In [9]:
import torch.nn as nn
import torch
import torch.nn.functional as F
import pandas as pd
import numpy as np
import cv2
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from os.path import join
from google.colab import drive

In [10]:
# Mount Google Drive at /content/drive; the dataset, model-weight and
# submission paths configured in this notebook all live under this mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


- Note 1: Please specify where the dataset is located; its directory structure must match the contents of "captcha-hacker-2023-spring.zip".
- Note 2: sample_submission.csv is also needed to indicate the label length of the test data in each task. If the TAs are going to use this inference code to predict on the private set on Kaggle, there must be another sample_submission.csv for the private set, in the same format as the file provided before. Apologies for any inconvenience ><".
- Note 3: Please specify where the model weights are stored and where the predictions should be saved.
- Note 4: Please change the runtime type to 'GPU'.

In [11]:
# Character vocabulary: the ten digits followed by upper- then lower-case letters.
# The position of a character in ALL_CHAR_SET is its class index.
NUMBER = list('0123456789')
ALPHABET = list(
	'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
	'abcdefghijklmnopqrstuvwxyz'
)

ALL_CHAR_SET = NUMBER + ALPHABET
ALL_CHAR_SET_LEN = len(ALL_CHAR_SET)

# Number of character positions the network predicts per image.
MAX_CAPTCHA = 4

# Lookup tables between characters and their class indices (both directions).
INDEX_TO_CAPTCHA_DICT = dict(enumerate(ALL_CHAR_SET))
CAPTCHA_TO_INDEX_DICT = {char: index for index, char in INDEX_TO_CAPTCHA_DICT.items()}

# Note 1: root directory of the dataset.
dataset_path = '/content/drive/MyDrive/Pattern_Recognition_Final_Project/dataset'

# Note 2: sample submission file, expected inside the dataset directory.
sample_submission_path = join(dataset_path, 'sample_submission.csv')

# Note 3: trained weights to load, and where the prediction CSV is written.
model_weight_path = '/content/drive/MyDrive/Pattern_Recognition_Final_Project/ResNet_34_epoch_50_acc_0.9724.pkl'
submission_path = '/content/drive/MyDrive/Pattern_Recognition_Final_Project/submission_ResNet_temp.csv'

In [12]:
class CaptchaDataset(Dataset):
	"""Dataset yielding ``(image, label, file_name)`` triples for inference.

	Args:
		dataset_path: Root directory of the dataset; images are read from
			``<dataset_path>/test/<file_name>``.
		df: DataFrame with a ``filename`` column and a ``label`` column. In
			this notebook the ``label`` column carries the number of
			characters to decode for each image.
		is_predict: Must be True to fetch items. Only the inference path is
			implemented; fetching an item with ``is_predict=False`` raises
			``NotImplementedError`` instead of the ``AttributeError`` the
			unset ``labels`` attribute used to cause.
	"""

	def __init__(self, dataset_path, df, is_predict=False):
		self.dataset_path = dataset_path
		self.data = df["filename"].tolist()
		self.is_predict = is_predict
		# Always keep the labels so the attribute exists in every mode.
		self.labels = np.array(df["label"].tolist())

	def __getitem__(self, index):
		"""Return ``(float image tensor, float label tensor, file_name)``.

		Raises:
			NotImplementedError: if the dataset was built with ``is_predict=False``.
			FileNotFoundError: if the image is missing or cannot be decoded.
		"""
		if not self.is_predict:
			raise NotImplementedError(
				"CaptchaDataset only implements inference mode; "
				"construct it with is_predict=True"
			)

		file_name, label = self.data[index], self.labels[index]
		img_path = join(self.dataset_path, "test", file_name)
		img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
		if img is None:
			# cv2.imread reports failure by returning None instead of raising.
			raise FileNotFoundError(f"Could not read image: {img_path}")

		# Add an explicit trailing channel axis: (H, W) -> (H, W, 1).
		img = img.reshape(img.shape[0], img.shape[1], -1)
		# NOTE(review): cv2 yields a uint8 array for 8-bit images, so
		# `img - 128` is evaluated in uint8 and wraps around for pixels below
		# 128 instead of going negative. Left unchanged on purpose: the saved
		# checkpoint was presumably trained with this exact preprocessing --
		# confirm against the training code before changing it.
		img = (img - 128) / 128
		# Conv2d expects the channel dimension first: (H, W, C) -> (C, H, W).
		img = np.transpose(img, (2, 0, 1))
		return (
			torch.tensor(img, dtype=torch.float),
			torch.tensor(label, dtype=torch.float),
			file_name,
		)

	def __len__(self):
		"""Number of files listed in the DataFrame."""
		return len(self.data)

In [13]:
class ResidualBlock(nn.Module):
	"""Two 3x3 conv + batch-norm layers summed with a shortcut branch.

	The shortcut is an empty ``nn.Sequential`` (identity) when the block keeps
	both the channel count and the resolution; otherwise it is a strided 1x1
	conv + batch-norm + ReLU projection. No activation is applied after the
	sum. Sub-module names and ordering are part of the checkpoint layout and
	must not change.

	Args:
		inchannel: Number of input channels.
		outchannel: Number of output channels.
		stride: Stride of the first 3x3 conv and of the projection conv.
	"""

	def __init__(self, inchannel, outchannel, stride=1):
		super().__init__()
		keeps_shape = stride == 1 and inchannel == outchannel

		# Main branch: conv -> BN -> ReLU -> conv -> BN.
		main_branch = [
			nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False),
			nn.BatchNorm2d(outchannel, track_running_stats=True),
			nn.ReLU(inplace=True),
			nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False),
			nn.BatchNorm2d(outchannel, track_running_stats=True),
		]
		self.left = nn.Sequential(*main_branch)

		if keeps_shape:
			# Identity shortcut.
			self.shortcut = nn.Sequential()
		else:
			# Projection shortcut so both branches produce the same shape.
			self.shortcut = nn.Sequential(
				nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),
				nn.BatchNorm2d(outchannel, track_running_stats=True),
				nn.ReLU(),
			)

	def forward(self, x):
		"""Return ``left(x) + shortcut(x)``, accumulated in place on the main-branch output."""
		summed = self.left(x)
		summed += self.shortcut(x)
		return summed

class ResNet(nn.Module):
	"""ResNet-34-style backbone (3/4/6/3 residual blocks) for single-channel images.

	The head is one linear layer emitting ``MAX_CAPTCHA * ALL_CHAR_SET_LEN``
	scores per image, i.e. one group of ``ALL_CHAR_SET_LEN`` scores for each
	character position.

	Args:
		ResidualBlock: Block class used to build every stage; it is called as
			``block(in_channels, out_channels, stride)``.
		num_classes: Currently unused -- the output width is taken from the
			module-level ``MAX_CAPTCHA`` and ``ALL_CHAR_SET_LEN``. Kept for
			interface compatibility.
	"""

	def __init__(self, ResidualBlock, num_classes=62):
		super().__init__()
		# Running input-channel count, advanced by make_layer.
		self.inchannel = 64
		self.conv1 = nn.Sequential(
			nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False),
			nn.BatchNorm2d(64, track_running_stats=True),
			nn.ReLU(),
		)
		# ResNet-34 stage depths: 3, 4, 6, 3 basic blocks.
		self.layer1 = self.make_layer(ResidualBlock, 64, 3, stride=1)
		self.layer2 = self.make_layer(ResidualBlock, 128, 4, stride=2)
		self.layer3 = self.make_layer(ResidualBlock, 256, 6, stride=2)
		self.layer4 = self.make_layer(ResidualBlock, 512, 3, stride=2)
		self.drop = nn.Dropout(0.5)
		self.rfc = nn.Sequential(
			nn.Linear(512, MAX_CAPTCHA * ALL_CHAR_SET_LEN),
		)

	def make_layer(self, block, channels, num_blocks, stride):
		"""Build one stage of ``num_blocks`` blocks.

		Only the first block uses ``stride``; the remaining ones use stride 1.
		Updates ``self.inchannel`` so the next stage starts from ``channels``.
		"""
		# e.g. stride=2, num_blocks=4 -> [2, 1, 1, 1]
		block_strides = [stride] + [1] * (num_blocks - 1)
		layers = []
		for block_stride in block_strides:
			layers.append(block(self.inchannel, channels, block_stride))
			self.inchannel = channels
		return nn.Sequential(*layers)

	def forward(self, x):
		"""Map a ``(N, 1, H, W)`` batch to ``(N, MAX_CAPTCHA * ALL_CHAR_SET_LEN)`` scores.

		The shape comments below follow the original author's example of a
		batch of 100 images of 96x96 pixels.
		"""
		x = self.conv1(x)
		# 100, 64, 96, 96
		x = self.layer1(x)
		# 100, 64, 96, 96
		x = self.layer2(x)
		# 100, 128, 48, 48
		x = self.layer3(x)
		# 100, 256, 24, 24
		x = self.layer4(x)
		# 100, 512, 12, 12
		# Functional global average pooling: same result as nn.AdaptiveAvgPool2d(1)
		# without building a new module on every call.
		x = F.adaptive_avg_pool2d(x, 1)
		# 100, 512, 1, 1
		x = x.view(-1, 512)
		# 100, 512
		x = self.drop(x)
		x = self.rfc(x)
		# 100, 248
		return x

In [14]:
def parse_submission(sample_submission_path):
	"""Read the sample submission CSV and set ``label`` to each file's label length.

	The length is chosen from the prefix of the ``filename`` value:
	``task1`` -> 1, ``task2`` -> 2, anything else -> 4.

	Args:
		sample_submission_path: Path to a CSV file with a ``filename`` column.

	Returns:
		The loaded DataFrame with its ``label`` column set to the lengths.
	"""
	prefix_lengths = (('task1', 1), ('task2', 2))

	def assign_label_len(x):
		# First matching prefix wins; every other file gets 4 characters.
		for prefix, length in prefix_lengths:
			if x.startswith(prefix):
				return length
		return 4

	submission = pd.read_csv(sample_submission_path)
	submission['label'] = submission['filename'].apply(assign_label_len)
	return submission

In [15]:
def predict(model, data_loader, df):
	"""Run inference over ``data_loader`` and write the submission CSV.

	Loads the weights from the module-level ``model_weight_path`` into
	``model``, decodes the first ``label`` character positions of every
	prediction, stores the decoded strings in ``df['label']`` (``df`` is
	modified in place) and writes ``df`` to the module-level ``submission_path``.

	Args:
		model: Network whose output row holds consecutive groups of
			``ALL_CHAR_SET_LEN`` scores, one group per character position.
		data_loader: Yields ``(image, label, file_name)`` batches, where
			``label`` is the number of characters to decode for each image.
			Any batch size is accepted.
		df: DataFrame with one row per sample, in loader order.

	Raises:
		ValueError: if a label length is outside the range the model can
			decode, or the number of predictions does not match ``len(df)``.
	"""
	# Use the device the model already lives on, so inputs and weights always agree.
	device = next(model.parameters()).device
	# map_location lets a checkpoint saved on GPU load on a CPU-only runtime.
	# NOTE(security): torch.load unpickles the file -- only load trusted checkpoints.
	model.load_state_dict(torch.load(model_weight_path, map_location=device))
	model.eval()

	batches = tqdm(data_loader, total=len(data_loader), leave=False)
	preds = []

	# Inference only: skip autograd bookkeeping.
	with torch.no_grad():
		for image, label, file_name in batches:
			scores = model(image.to(device)).cpu().numpy()
			# (batch, positions * charset) -> (batch, positions, charset)
			scores = scores.reshape(scores.shape[0], -1, ALL_CHAR_SET_LEN)
			best_indices = scores.argmax(axis=2)

			for char_indices, raw_length in zip(best_indices, label.tolist()):
				n_chars = int(raw_length)
				if not 1 <= n_chars <= len(char_indices):
					raise ValueError(
						f'Unsupported label length {raw_length}; '
						f'expected 1..{len(char_indices)}'
					)
				pred_decoded = ''.join(
					INDEX_TO_CAPTCHA_DICT[idx] for idx in char_indices[:n_chars]
				)
				preds.append(pred_decoded)

			# The progress bar reports the last sample of the current batch.
			batches.set_description(f'[label_length:{label[-1].item()}/Pred:{pred_decoded}]')
			batches.set_postfix(file = file_name)

	if len(preds) != len(df):
		raise ValueError(
			f'Got {len(preds)} predictions for {len(df)} rows; '
			'the data loader must cover every row exactly once'
		)
	df['label'] = preds

	df.to_csv(submission_path,index=False)

In [16]:
	# Build the test frame (file names + label lengths) from the sample submission.
	df_test = parse_submission(sample_submission_path)
	test_ds = CaptchaDataset(dataset_path, df_test, is_predict=True)
	# One image per batch, in file order, so predictions line up with df_test rows.
	test_dl = DataLoader(
		test_ds,
		batch_size=1,
		shuffle=False,
		num_workers=2,
		drop_last=True,
	)
	# nn.Module.to() returns the module itself, so construction and placement can be chained.
	model = ResNet(ResidualBlock).to(
		torch.device("cuda" if torch.cuda.is_available() else "cpu")
	)
	predict(model, test_dl, df_test)

