<a href="https://colab.research.google.com/github/HassenLin/eliza_colab/blob/main/HW3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install torchinfo



In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader, Subset
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import transforms
import torchvision
from torchinfo import summary
import numpy as np
import matplotlib.pyplot as plt
from typing import Literal
import os
from pathlib import Path
from PIL import Image
import zipfile
from glob import glob
import shutil
from tqdm import tqdm
import json
import pandas as pd
import cv2

In [9]:
from google.colab import drive
drive.mount("/content/drive")

dataset_path_str = r"/content/data/Midterm_Project"
dataset_path = Path(dataset_path_str)

for f in os.listdir("/content/drive/MyDrive/Colab Notebooks/data/"):
 print(f)
dataset_path.mkdir( parents=True, exist_ok=True )

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Midterm_Project.zip


In [None]:
with zipfile.ZipFile(r"/content/drive/MyDrive/Colab Notebooks/data/Midterm_Project.zip", "r" ) as zip_ref:
  zip_ref.extractall( dataset_path_str )

In [None]:
for f in os.listdir(dataset_path):
 print(f)

In [4]:
mapping = {}
for i, char in enumerate( "0123456789+-*÷") :
   mapping[char] = i
def normalize_label( label_str ):
  ret=""
  for i, char in enumerate( label_str ) :
    if char in mapping:
      ret= ret + char
  return ret

In [5]:
def label_to_tensor( label_str ):
  t=[]
  for i, char in enumerate( label_str ) :
    if char in mapping:
      t.append(mapping[char])
    else:
      print(label_str + " has '"+char+"'")
  return torch.tensor( t, dtype = torch.long )

In [6]:
class CatchDataset(Dataset):
  def __init__( self, split:Literal["train_data01", "train_data02"], transform:transforms=None ) -> None:
    super().__init__()

    self.transform = transform

    if split == "train_data01":
      csv_path = os.path.join( dataset_path, "train_data01.csv" )
      img_dir = os.path.join( dataset_path, "train_data01" )
    elif split == "train_data02":
      csv_path = os.path.join( dataset_path, "train_data02.csv" )
      img_dir = os.path.join( dataset_path, "train_data02" )
    self.imgs = []
    self.labels = []
    df = pd.read_csv( csv_path, sep='\t', lineterminator='\n' )
    # print( df.head() )
    # print( df.shape )
    for _, row in df.iterrows():
      if len(row) >= 3:
        filename = str(os.path.join(img_dir, "p"+str(row.iloc[0])+".jpg"))
        label = normalize_label(str(row.iloc[1])) # +"\t"+ str(row.iloc[2])
        if os.path.exists(filename):
          self.imgs.append(filename)
          self.labels.append(label)
        else:
          print(filename + " not exist!!")
    for i in range(0, 10) :
        print(self.imgs[i] +" ==> "+ self.labels[i])

  def __len__(self):
    return len(self.imgs)

  def __getitem__( self, index ):
    image_path = self.imgs[index]
    label = self.labels[index]

    image = cv2.imread(image_path)
    img_np = np.array( image )
    img_gray = cv2.cvtColor( img_np, cv2.COLOR_BGR2GRAY)
    img_denoise = cv2.fastNlMeansDenoising( img_gray )

    canny = cv2.Canny( img_denoise, 150, 200 )
    canny_dilate = cv2.dilate( canny, np.ones( (3, 3), np.uint8) )
    contours, _ = cv2.findContours( canny_dilate, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE )

    contour_sort = sorted( [(c, cv2.boundingRect(c)[0]) for c in contours], key = lambda x: x[1] )
    filter_contours = []
    for ( c, _ ) in contour_sort:
      x, y, w, h = cv2.boundingRect(c)
      area = w * h
      if area > 15:
        filter_contours.append( (x, y, w, h) )

    characters_imgs = []
    for x, y, w, h in filter_contours:
      char_img = img_gray[ y:y + h, x: x + w ]
      char_img = cv2.resize( char_img, (32, 32) )
      char_img = cv2.cvtColor( char_img, cv2.COLOR_GRAY2RGB )
      char_img = Image.fromarray( char_img )
      char_img = self.transform( char_img )
      characters_imgs.append( char_img )
    while len( characters_imgs ) < 9:
      characters_imgs.append( torch.zeros((3, 32, 32)) )
    characters_imgs = characters_imgs[:9]

    characters_imgs = torch.stack( characters_imgs )
    label_tensor = label_to_tensor( label )
    return characters_imgs, label_tensor

In [7]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize( mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225] )
])

In [10]:
from sklearn.model_selection import train_test_split

full_dataset = CatchDataset( split="train_data01", transform=transform )

train_indices, val_indices = train_test_split(
    list( range( len(full_dataset) ) ),
    test_size = 0.2,
    random_state = 1,
    shuffle = True
)
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)
train_loader = DataLoader( full_dataset, sampler=train_sampler, batch_size=32, shuffle=False )
val_loader = DataLoader( full_dataset, sampler=val_sampler, batch_size=32, shuffle=False )

/content/data/Midterm_Project/train_data01/p1.jpg ==> 13*513032
/content/data/Midterm_Project/train_data01/p2.jpg ==> 785+38787
/content/data/Midterm_Project/train_data01/p3.jpg ==> 3+49807+8
/content/data/Midterm_Project/train_data01/p4.jpg ==> 3*3585982
/content/data/Midterm_Project/train_data01/p5.jpg ==> 5*9*18652
/content/data/Midterm_Project/train_data01/p6.jpg ==> 5483+1815
/content/data/Midterm_Project/train_data01/p7.jpg ==> 77+7*97+7
/content/data/Midterm_Project/train_data01/p8.jpg ==> 8-80+6-74
/content/data/Midterm_Project/train_data01/p9.jpg ==> 7+4653543
/content/data/Midterm_Project/train_data01/p10.jpg ==> 4+471*8*9


In [11]:
class ResBlock(nn.Module):

  def __init__( self, in_channels, out_channels, stride=1 ):
    super().__init__()
    self.res_function = nn.Sequential(
        nn.Conv2d( in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=stride, padding=1 ),
        nn.BatchNorm2d( out_channels ),
        nn.ReLU(),
        nn.Conv2d( in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1 ),
        nn.BatchNorm2d( out_channels )
    )
    self.identity_function = nn.Sequential()
    if stride != 1 or in_channels != out_channels:
      self.identity_function = nn.Sequential(
          nn.Conv2d( in_channels, out_channels, kernel_size=1, stride=stride, bias=False ),
          nn.BatchNorm2d( out_channels )
      )

  def forward( self, x ):
    identity = self.identity_function( x )
    out = self.res_function( x )
    out += identity
    return F.relu( out )

class ResNet18(nn.Module):

  def __init__( self, Resblock, num_blocks ):
    super().__init__()
    self.in_channels = 64
    self.conv1 = nn.Sequential(
        nn.Conv2d( in_channels=3, out_channels=64, kernel_size=3, padding=1 ),
        nn.BatchNorm2d( 64 ),
        nn.ReLU()
    )
    self.conv2_x = self._make_layer( Resblock, 64, num_blocks[0], 1 )
    self.conv3_x = self._make_layer( Resblock, 128, num_blocks[1], 1 )
    self.conv4_x = self._make_layer( Resblock, 256, num_blocks[2], 1 )
    self.conv5_x = self._make_layer( Resblock, 512, num_blocks[3], 1 )

    self.avg_pooling = nn.AdaptiveAvgPool2d( (1,1) )
    self.dropout = nn.Dropout( 0.5 )
    self.fc = nn.Linear( 512, 14 )

  def _make_layer( self, Resblock, out_channels, num_blocks, stride ):
    strides = [stride] + [1] * ( num_blocks - 1 )
    layers = []
    for stride in strides:
      layers.append( Resblock( self.in_channels, out_channels, stride) )
      self.in_channels = out_channels
    return nn.Sequential( *layers )

  def forward( self, x ):
    x = self.conv1(x)
    x = self.conv2_x(x)
    x = self.conv3_x(x)
    x = self.conv4_x(x)
    x = self.conv5_x(x)
    x = self.avg_pooling(x)
    x = torch.flatten( x, 1 )
    x = self.dropout(x)
    x = self.fc(x)
    return x


In [12]:
DEVICE = torch.device( "cuda" if torch.cuda.is_available() else "cpu" )
model = ResNet18( ResBlock, [2, 2, 2, 2] )
model = model.to(DEVICE)

In [13]:
loss_func = nn.CrossEntropyLoss()
optimizer = optim.SGD( model.parameters(), lr = 1e-3, momentum = 0.9 )

In [None]:
loss_list = []
test_loss_list = []
train_acc_list = []
test_acc_list = []

EPOCH = 30
patience = 10
early_stop = 0
best_test_acc = 0.0

for epoch in range(EPOCH):
  model.train()
  epoch_loss = 0.0
  correct = 0
  total = 0
  print(f'Epoch: {epoch+1} :')
  for idx, (img, labels) in enumerate(train_loader):
    img = img.to(DEVICE)
    labels = labels.to(DEVICE)
    optimizer.zero_grad()
    batch_loss = 0.0
    print(f'  idx: {idx}', end='')
    for i in range(9):
      char_imgs = img[:, i, :, :, :]
      char_labels = labels[ :, i ]
      outputs = model( char_imgs )
      loss = loss_func( outputs, char_labels )
      batch_loss += loss

      _, pred = torch.max( outputs, dim=1 )
      correct += ( pred == char_labels ).sum().item()
      total += char_imgs.shape[0]
      print('.', end='')
    print('|')
    batch_loss.backward()
    optimizer.step()
    epoch_loss += batch_loss.item()

  avg_loss = epoch_loss / len( train_loader )
  loss_list.append( avg_loss )
  train_acc_list.append( correct / total )
  print( f'\tTrain Loss: {avg_loss:.3f} \tTrain Acc: {100 * correct / total:.2f}%')

  model.eval()
  correct = 0
  total = 0
  test_loss = 0.0

  with torch.no_grad():
    for idx, (img, labels) in enumerate(val_loader):
      img = img.to(DEVICE)
      labels = labels.to(DEVICE)
      batch_loss = 0.0

      for i in range(9):
        char_imgs = img[:, i, :, :, :]
        char_labels = labels[:, i]
        outputs = model( char_imgs )
        loss = loss_func( outputs, labels )
        batch_loss += loss

        _, pred = torch.max( outputs, dim=1 )
        correct += ( pred == char_labels ).sum().item()
        total += char_imgs.shape[0]

      test_loss += batch_loss.item()

  avg_test_loss = test_loss / len( val_loader )
  test_loss_list.append( avg_test_loss )
  test_acc = correct / total
  test_acc_list.append( test_acc )
  print(f"Epoch: {epoch + 1}\tTest Loss: {avg_test_loss:.3f} \tTest Acc: {100 * test_acc:.2f}%")

  if test_acc > best_test_acc:
    best_test_acc = test_acc
    early_stop = 0
    # torch.save( model.state_dict(), "model.pth" )
  else:
    early_stop += 1

  if early_stop >= patience:
    break

Epoch: 1 :
  idx: 0.........|
  idx: 1.........|
  idx: 2........