In [15]:
# Google Colab only: mount the user's Google Drive so the HW5 files stored
# there can be reached from this runtime.
from google.colab import drive
# The Drive contents become reachable under the /content/gdrive mount point.
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [16]:
# for google colab
# copy all files from "HW5" directory in Google drive to current directory
!cp -r ./gdrive/MyDrive/HW5/* .

In [17]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import csv
import cv2
import numpy as np
import random
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms.functional import to_tensor, to_pil_image

from tqdm import tqdm
import random
import numpy as np

# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'

In [18]:
colabPath = '/content/gdrive/MyDrive/HW5'
# unzip capcha-hacker.zip
zipPath = os.path.join(colabPath, 'captcha-hacker.zip')
!unzip $zipPath 

Archive:  /content/gdrive/MyDrive/HW5/captcha-hacker.zip
replace sample_submission.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: ㄙ
error:  invalid response [ㄙ]
replace sample_submission.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: N


In [19]:
# Alphabets used to encode/decode labels. Index 0 is '-', the symbol that
# decode() skips as the blank.
characters1 = '-0123456789'
characters2 = '-0123456789abcdefghijklmnopqrstuvwxyz'
# Number of output classes = size of each alphabet (the '-' entry included).
n_classes_1 = len(characters1)
n_classes_2 = len(characters2)

In [20]:
# Directory names for the two data splits (TEST_PATH is the image root passed
# to the test datasets; TRAIN_PATH is presumably its training counterpart).
TRAIN_PATH, TEST_PATH = "train", "test"

In [21]:
class TaskDataset(Dataset):
  """Captcha dataset backed by a list of ``(filename, label)`` rows.

  Each item's image is read from ``{root}/{filename}``, converted from BGR to
  grayscale and passed through ``to_tensor``.

  Args:
    data: indexable collection of ``(filename, label)`` pairs.
    root: directory that contains the image files.
    characters: alphabet string; every label character is encoded as its
      position in this string (``str.find``, so an unknown character maps to -1).
    input_length: value stored in the ``input_length`` tensor of every item.
    label_length: value stored in the ``target_length`` tensor of every item.
    return_filename: when True, items are ``(img, filename)`` instead of
      ``(img, target, input_length, target_length)``.
  """

  def __init__(self, data, root, characters, input_length, label_length, return_filename=False):
    self.return_filename = return_filename
    self.root = root
    self.data = data
    self.input_length = input_length
    self.label_length = label_length
    self.characters = characters

  def __getitem__(self, index):
    """Load item ``index``.

    Returns:
      ``(img, filename)`` when ``return_filename`` is set, otherwise
      ``(img, target, input_length, target_length)`` where the last two are
      1-element long tensors.

    Raises:
      IndexError: propagated from ``self.data[index]`` for an out-of-range index.
      FileNotFoundError: if the image file cannot be read.
    """
    filename, label = self.data[index]
    path = f"{self.root}/{filename}"
    img = cv2.imread(path)
    if img is None:
      # cv2.imread reports a missing/unreadable file by returning None rather
      # than raising; fail here with the offending path instead of letting
      # cvtColor raise an opaque assertion error.
      raise FileNotFoundError(f"could not read image: {path}")
    img = to_tensor(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))

    if self.return_filename:
      # Inference mode: the label is not needed, so skip building targets.
      return img, filename

    target = torch.tensor([self.characters.find(x) for x in label], dtype=torch.long)
    input_length = torch.full(size=(1, ), fill_value=self.input_length, dtype=torch.long)
    target_length = torch.full(size=(1, ), fill_value=self.label_length, dtype=torch.long)
    return img, target, input_length, target_length

  def __len__(self):
    """Number of ``(filename, label)`` rows."""
    return len(self.data)

In [22]:
class Model(nn.Module):
  """CNN + bidirectional LSTM + linear classifier applied per image column.

  The CNN is a VGG-style stack of 3x3 conv / batch-norm / ReLU pairs. Its
  output is flattened over (channels, height) and read by the LSTM one column
  at a time, so the output sequence length equals the width of the CNN
  feature map.

  Args:
    n_classes: number of output classes per time step.
    input_shape: ``(channels, height, width)`` of a single input image; used
      to size the first convolution and the LSTM input.
  """

  def __init__(self, n_classes, input_shape=(1, 72, 72)):
    super().__init__()
    self.input_shape = input_shape
    hidden_size = 128
    # VGG-style feature extractor. The layer order is kept flat and unchanged
    # so that parameter names/indices stay compatible with saved checkpoints.
    self.cnn = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 32, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(128, 256, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            # Last pooling halves the height only, keeping the width
            # (= sequence length) intact.
            nn.MaxPool2d((2, 1)),
            nn.Dropout(0.25, inplace=True)
            )

    self.lstm = nn.LSTM(input_size=self.infer_features(), hidden_size=hidden_size, num_layers=2, bidirectional=True)
    # A bidirectional LSTM concatenates both directions: 2 * hidden_size features.
    self.fc = nn.Linear(in_features=2 * hidden_size, out_features=n_classes)

  def infer_features(self):
    """Return the per-column feature size (channels * height) of the CNN output.

    The probe forward pass runs in eval mode under ``torch.no_grad()`` so it
    neither updates the BatchNorm running statistics nor builds an autograd
    graph; the CNN's previous train/eval mode is restored afterwards.
    """
    was_training = self.cnn.training
    self.cnn.eval()
    try:
      with torch.no_grad():
        x = self.cnn(torch.zeros((1,) + tuple(self.input_shape)))
    finally:
      self.cnn.train(was_training)
    x = x.reshape(x.shape[0], -1, x.shape[-1])
    return x.shape[1]

  def forward(self, x):
    """Map a batch of images ``(N, C, H, W)`` to scores ``(W', N, n_classes)``.

    ``W'`` is the width of the CNN feature map; it becomes the leading
    (sequence) axis because the LSTM is used with its default
    ``batch_first=False`` layout.
    """
    x = self.cnn(x)
    # (N, C', H', W') -> (N, C' * H', W'): one feature vector per column.
    x = x.reshape(x.shape[0], -1, x.shape[-1])
    # -> (W', N, C' * H'): sequence-first layout expected by the LSTM.
    x = x.permute(2, 0, 1)
    x, _ = self.lstm(x)
    x = self.fc(x)
    return x

In [23]:
def decode(sequence, characters, blank='-'):
  """Greedy CTC decoding of a sequence of class indices.

  Every index is mapped to its character, consecutive repeats are collapsed
  into one symbol, and blanks are dropped. A blank between two equal
  characters keeps them separate, so ``"4-4"`` decodes to ``"44"`` while
  ``"44"`` decodes to ``"4"``.

  Args:
    sequence: iterable of class indices (ints or 0-d tensors/arrays).
    characters: alphabet string; ``characters[i]`` is the symbol for class ``i``.
    blank: the symbol treated as the CTC blank.

  Returns:
    The decoded string; empty for an empty or all-blank sequence.
  """
  output = []
  previous = None
  for index in sequence:
    symbol = characters[int(index)]
    # Emit a symbol only at the start of a run, and never emit the blank.
    if symbol != previous and symbol != blank:
      output.append(symbol)
    previous = symbol
  return ''.join(output)

def decode_label(sequence, characters):
  """Turn a sequence of class indices into its label string.

  Each index is looked up in ``characters``; any space characters in the
  result are removed.
  """
  symbols = [characters[index] for index in sequence]
  return ''.join(symbols).replace(' ', '')

def calc_acc(label, output, characters):
  """Fraction of samples whose decoded prediction equals the decoded label.

  ``output`` is a 3-D score tensor whose first two axes are swapped before the
  argmax over the last axis, so each resulting row of class indices is paired
  with one row of ``label`` (assumes ``output`` is laid out as
  (time, batch, classes) -- TODO confirm against the caller).

  Raises:
    ZeroDivisionError: if there are no samples to compare.
  """
  targets = label.cpu().numpy()
  predictions = output.detach().permute(1, 0, 2).argmax(dim=-1).cpu().numpy()
  # One boolean per sample: does the CTC-decoded prediction match the label?
  matches = [
    decode_label(true, characters) == decode(pred, characters)
    for true, pred in zip(targets, predictions)
  ]
  return sum(matches) / len(matches)

In [24]:
# Rows of sample_submission.csv, split by the task prefix of column 0.
test_data1, test_data2, test_data3 = [], [], []
task_buckets = (("task1", test_data1), ("task2", test_data2), ("task3", test_data3))

with open('sample_submission.csv', newline='') as csvfile:
  for row in csv.reader(csvfile, delimiter=','):
    # First matching prefix wins; rows matching no task (e.g. the header) are skipped.
    for prefix, bucket in task_buckets:
      if row[0].startswith(prefix):
        bucket.append(row)
        break

# One inference dataset per task; return_filename=True makes items (img, filename).
test_ds1 = TaskDataset(
  test_data1, root=TEST_PATH, characters=characters1,
  input_length=4, label_length=1, return_filename=True)
test_ds2 = TaskDataset(
  test_data2, root=TEST_PATH, characters=characters2,
  input_length=4, label_length=2, return_filename=True)
test_ds3 = TaskDataset(
  test_data3, root=TEST_PATH, characters=characters2,
  input_length=6, label_length=4, return_filename=True)

In [25]:
# test model1
# model1.pth holds a fully pickled module. Unpickling looks the Model class up
# by name, so the cell defining Model must have run, but no fresh instance has
# to be constructed first.
# NOTE(review): torch.load unpickles arbitrary objects -- only load trusted
# checkpoints. Recent PyTorch releases may also require weights_only=False to
# load a fully pickled module; confirm against the installed version.
# map_location places the weights on the selected device, so a checkpoint
# saved on a GPU still loads on a CPU-only runtime.
model1 = torch.load('model1.pth', map_location=device)
model1.eval()

with open('submission.csv', 'w', newline='') as csvfile:
  csv_writer = csv.writer(csvfile, delimiter=',')
  csv_writer.writerow(["filename", "label"])
  # Inference only: disable autograd bookkeeping.
  with torch.no_grad():
    for image, filename in test_ds1:
      # Add a batch axis of size 1, then take the best class per time step.
      output = model1(image.unsqueeze(0).to(device))
      output_argmax = output.permute(1, 0, 2).argmax(dim=-1)
      # Task 1 uses the digit-only alphabet.
      csv_writer.writerow([filename, decode(output_argmax[0], characters1)])

In [26]:
# test model2
# model2.pth holds a fully pickled module. Unpickling looks the Model class up
# by name, so the cell defining Model must have run, but no fresh instance has
# to be constructed first.
# NOTE(review): torch.load unpickles arbitrary objects -- only load trusted
# checkpoints. Recent PyTorch releases may also require weights_only=False to
# load a fully pickled module; confirm against the installed version.
# map_location places the weights on the selected device, so a checkpoint
# saved on a GPU still loads on a CPU-only runtime.
model2 = torch.load('model2.pth', map_location=device)
model2.eval()

# Append to the submission file started by the task-1 cell.
with open('submission.csv', 'a', newline='') as csvfile:
  csv_writer = csv.writer(csvfile, delimiter=',')
  # Inference only: disable autograd bookkeeping.
  with torch.no_grad():
    for image, filename in test_ds2:
      # Add a batch axis of size 1, then take the best class per time step.
      output = model2(image.unsqueeze(0).to(device))
      output_argmax = output.permute(1, 0, 2).argmax(dim=-1)
      csv_writer.writerow([filename, decode(output_argmax[0], characters2)])

In [27]:
# test model3
# model3.pth holds a fully pickled module. Unpickling looks the Model class up
# by name, so the cell defining Model must have run, but no fresh instance has
# to be constructed first.
# NOTE(review): torch.load unpickles arbitrary objects -- only load trusted
# checkpoints. Recent PyTorch releases may also require weights_only=False to
# load a fully pickled module; confirm against the installed version.
# map_location places the weights on the selected device, so a checkpoint
# saved on a GPU still loads on a CPU-only runtime.
model3 = torch.load('model3.pth', map_location=device)
model3.eval()

# Append to the submission file started by the task-1 cell.
with open('submission.csv', 'a', newline='') as csvfile:
  csv_writer = csv.writer(csvfile, delimiter=',')
  # Inference only: disable autograd bookkeeping.
  with torch.no_grad():
    for image, filename in test_ds3:
      # Add a batch axis of size 1, then take the best class per time step.
      output = model3(image.unsqueeze(0).to(device))
      output_argmax = output.permute(1, 0, 2).argmax(dim=-1)
      csv_writer.writerow([filename, decode(output_argmax[0], characters2)])

In [28]:
!cp submission.csv ./gdrive/MyDrive/HW5/submission.csv 