In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import csv
import cv2
import numpy as np
import random
import os

from tqdm import tqdm

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision import transforms
from PIL import Image

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Directory that holds the test images; passed as `root` to the Dataset classes below.
TEST_PATH = "./dataset/test"
# Auto-detecting alternative (currently disabled):
# device = (torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu'))
device = "cpu"
# Set device = "cuda" (and switch the notebook accelerator to GPU, e.g. in Kaggle's
# environment settings) if you want inference to run faster.

In [3]:
# Output locations used by this notebook: one for model checkpoints, one for results.
cpt_dir = './model_weights/'
phase_dir = './results/'

# Create both directories up front; exist_ok makes the cell safe to re-run.
for _required_dir in (cpt_dir, phase_dir):
    os.makedirs(_required_dir, exist_ok=True)

In [4]:
# Class vocabulary: lowercase letters, then uppercase letters, then digits.
# A character's position in this string is its class index.
alphabets = (
    "abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "0123456789"
)
# Reverse lookup: character -> class index.
alphabets2index = dict(zip(alphabets, range(len(alphabets))))

Task1

In [5]:
class Task1Dataset(Dataset):
    """Dataset of single-character images whose filenames start with "task1".

    Args:
        data: iterable of CSV rows ``[filename, label]``. Rows whose filename
            does not start with "task1" (including a header row) and blank
            rows are dropped.
        root: directory that the filenames are relative to.
        return_filename: when True, ``__getitem__`` returns ``(image, filename)``
            instead of ``(image, class_index)``; the label column is then ignored.
    """

    def __init__(self, data, root, return_filename=False):
        # `sample and ...` skips the empty lists csv.reader yields for blank lines.
        self.data = [sample for sample in data if sample and sample[0].startswith("task1")]
        self.return_filename = return_filename
        self.root = root
        self.transform = transforms.Compose([
            transforms.ToTensor()
        ])

    def __getitem__(self, index):
        """Load and preprocess one image.

        Returns:
            ``(image, filename)`` if ``return_filename`` is set, otherwise
            ``(image, alphabets2index[label])``.

        Raises:
            FileNotFoundError: if the image cannot be read from disk.
            KeyError: in label mode, if the label is not in ``alphabets2index``.
        """
        filename, label = self.data[index]
        path = "{}/{}".format(self.root, filename)
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError("Could not read image: {}".format(path))

        img = cv2.resize(img, (96, 96))
        img = cv2.medianBlur(img, 5)
        # Average the colour channels to get a single-channel float image.
        img = np.mean(img, axis=2)
        img = Image.fromarray(img)
        img = self.transform(img)
        # NOTE(review): the image is float here, so ToTensor presumably leaves the
        # 0-255 range unscaled before this shift/scale - confirm against training.
        img = torch.FloatTensor((img - 128) / 128)

        if self.return_filename:
            return img, filename
        return img, alphabets2index[label]

    def __len__(self):
        return len(self.data)

In [6]:
def downsample(in_ch, out_ch, stride):
    """Build the projection shortcut: a bias-free 1x1 convolution followed by BatchNorm.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
        stride: stride of the 1x1 convolution.

    Returns:
        An ``nn.Sequential`` of ``Conv2d`` and ``BatchNorm2d``.
    """
    return nn.Sequential(
        nn.Conv2d(in_ch, out_ch, kernel_size=(1, 1), stride=stride, bias=False),
        nn.BatchNorm2d(out_ch))

class BasicBlock(nn.Module):
    '''
    Residual block:
    input -> conv2d(3x3) -> BN -> ReLU -> conv2d(3x3) -> BN -> (+ shortcut) -> ReLU -> output

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
        downsample_stride: ``None`` for an identity shortcut (the first conv then
            uses stride 1, and ``in_ch`` must equal ``out_ch`` for the residual
            add to work). Otherwise the stride applied both to the first 3x3
            convolution and to the 1x1 projection shortcut.
    '''
    def __init__(self, in_ch, out_ch, downsample_stride):
        super(BasicBlock, self).__init__()
        if downsample_stride is None:
            self.conv1 = nn.Conv2d(in_ch, out_ch, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            self.downsample = None
        else:
            # The main path must use the same stride as the shortcut; otherwise the
            # two branches have different spatial sizes and cannot be summed.
            self.conv1 = nn.Conv2d(in_ch, out_ch, kernel_size=(3, 3), stride=downsample_stride, padding=(1, 1), bias=False)
            self.downsample = downsample(in_ch, out_ch, downsample_stride)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_ch, out_ch, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        self.bn2 = nn.BatchNorm2d(out_ch)

    def forward(self, x):
        """Apply the two conv/BN stages and add the (possibly projected) input."""
        ori = x
        out = self.bn1(self.conv1(x))
        out = self.relu(out)
        out = self.bn2(self.conv2(out))
        if self.downsample is not None:
            ori = self.downsample(ori)
        out = self.relu(out + ori)
        return out

In [7]:
class ResNet18(nn.Module):
    """ResNet-18 style network for single-channel images with one classification head.

    The head emits ``len(alphabets)`` logits per image. Attribute names are kept
    stable because saved checkpoints are keyed by them.
    """

    def __init__(self):
        super().__init__()

        def make_stage(in_ch, out_ch, stride):
            # Two BasicBlocks per stage; only the first may change stride/channels.
            return nn.Sequential(
                BasicBlock(in_ch, out_ch, stride),
                BasicBlock(out_ch, out_ch, None),
            )

        # Stem: 7x7 stride-2 convolution, BatchNorm, ReLU, 3x3 stride-2 max-pool.
        self.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages.
        self.layer1 = make_stage(64, 64, None)
        self.layer2 = make_stage(64, 128, (2, 2))
        self.layer3 = make_stage(128, 256, (2, 2))
        self.layer4 = make_stage(256, 512, (2, 2))

        # Global pooling and classifier.
        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.fc = nn.Linear(512, len(alphabets))

    def forward(self, x):
        """Return class logits of shape ``(batch, len(alphabets))``."""
        feat = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            feat = stage(feat)
        pooled = self.avgpool(feat)
        flat = pooled.reshape(pooled.shape[0], -1)
        return self.fc(flat)

In [9]:
# Build the Task 1 network, then move its parameters to the configured device.
model = ResNet18()
model = model.to(device)

In [10]:
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads on a CPU-only machine.
# NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
model.load_state_dict(torch.load('./model_weights/task1.pt', map_location=device))

<All keys matched successfully>

In [11]:
# Read the (filename, placeholder label) rows; Task1Dataset keeps only "task1" files.
with open('./dataset/sample_submission.csv', newline='') as csvfile:
    test_data = list(csv.reader(csvfile, delimiter=','))

test_ds = Task1Dataset(test_data, root=TEST_PATH, return_filename=True)
test_dl = DataLoader(test_ds, batch_size=32, num_workers=0, drop_last=False, shuffle=False)

# Predictions are appended to submission.csv; the header is written only when the
# file is created here. Re-running this cell appends duplicate rows, so delete
# submission.csv before starting a fresh run.
write_header = not os.path.exists('submission.csv')

model.eval()
# The context manager closes (and flushes) the file; no_grad avoids building an
# autograd graph during inference.
with open('submission.csv', 'a', newline='') as submission_file, torch.no_grad():
    csv_writer = csv.writer(submission_file)
    if write_header:
        csv_writer.writerow(["filename", "label"])

    batch_pbar = tqdm(test_dl)
    for image, filenames in batch_pbar:
        image = image.to(device)

        pred = torch.argmax(model(image), dim=1)

        for filename, class_index in zip(filenames, pred.tolist()):
            csv_writer.writerow([filename, alphabets[class_index]])

100%|████████████████████████████████████████████████████████████████████████████████| 204/204 [06:57<00:00,  2.05s/it]


Task2

In [12]:
class Task2Dataset(Dataset):
    """Dataset of multi-character images whose filenames start with "task2".

    Args:
        data: iterable of CSV rows ``[filename, label]``. Rows whose filename
            does not start with "task2" (including a header row) and blank
            rows are dropped.
        root: directory that the filenames are relative to.
        return_filename: when True, ``__getitem__`` returns ``(image, filename)``
            and the label column is not decoded at all, so placeholder labels
            in a submission template cannot cause errors.
    """

    def __init__(self, data, root, return_filename=False):
        # `sample and ...` skips the empty lists csv.reader yields for blank lines.
        self.data = [sample for sample in data if sample and sample[0].startswith("task2")]
        self.return_filename = return_filename
        self.root = root
        self.transform = transforms.Compose([
            transforms.ToTensor()
        ])

    def __getitem__(self, index):
        """Load and preprocess one image.

        Returns:
            ``(image, filename)`` if ``return_filename`` is set. Otherwise
            ``(image, label_list)`` where ``label_list`` is an ``(n + 1, 1)``
            array: the label length ``n`` followed by the class index of each
            of its characters.

        Raises:
            FileNotFoundError: if the image cannot be read from disk.
            KeyError: in label mode, if a label character is not in ``alphabets2index``.
        """
        filename, label = self.data[index]
        path = "{}/{}".format(self.root, filename)
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError("Could not read image: {}".format(path))

        img = cv2.resize(img, (96, 96))
        img = cv2.medianBlur(img, 5)
        # Average the colour channels to get a single-channel float image.
        img = np.mean(img, axis=2)
        img = Image.fromarray(img)
        img = self.transform(img)
        # NOTE(review): the image is float here, so ToTensor presumably leaves the
        # 0-255 range unscaled before this shift/scale - confirm against training.
        img = torch.FloatTensor((img - 128) / 128)

        if self.return_filename:
            # Inference path: return before touching the label.
            return img, filename

        label_list = np.array([[alphabets2index[digit]] for digit in label])
        length = [len(label_list)]
        # Prepend the sequence length as the first row.
        label_list = np.append([length], label_list, axis=0)
        return img, label_list

    def __len__(self):
        return len(self.data)

In [13]:
class ResNet18_t2(nn.Module):
    """ResNet-18 style network for single-channel images with three linear heads.

    ``forward`` returns ``[digitlength, digit1, digit2]``: a one-unit output and
    two outputs of ``len(alphabets)`` logits. Attribute names are kept stable
    because saved checkpoints are keyed by them.
    """

    def __init__(self):
        super().__init__()

        def make_stage(in_ch, out_ch, stride):
            # Two BasicBlocks per stage; only the first may change stride/channels.
            return nn.Sequential(
                BasicBlock(in_ch, out_ch, stride),
                BasicBlock(out_ch, out_ch, None),
            )

        # Stem: 7x7 stride-2 convolution, BatchNorm, ReLU, 3x3 stride-2 max-pool.
        self.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages.
        self.layer1 = make_stage(64, 64, None)
        self.layer2 = make_stage(64, 128, (2, 2))
        self.layer3 = make_stage(128, 256, (2, 2))
        self.layer4 = make_stage(256, 512, (2, 2))

        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))

        # Output heads, all fed by the same pooled 512-d feature vector.
        num_classes = len(alphabets)
        self.digitlength = nn.Linear(512, 1)
        self.digit1 = nn.Linear(512, num_classes)
        self.digit2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return ``[yl, y1, y2]`` computed from the shared backbone features."""
        feat = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            feat = stage(feat)
        pooled = self.avgpool(feat)
        flat = pooled.reshape(pooled.shape[0], -1)
        heads = (self.digitlength, self.digit1, self.digit2)
        return [head(flat) for head in heads]

In [14]:
# Build the Task 2 network, then move its parameters to the configured device.
model = ResNet18_t2()
model = model.to(device)

In [15]:
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads on a CPU-only machine.
# NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
model.load_state_dict(torch.load('./model_weights/task2.pt', map_location=device))

<All keys matched successfully>

In [16]:
# Read the (filename, placeholder label) rows; Task2Dataset keeps only "task2" files.
with open('./dataset/sample_submission.csv', newline='') as csvfile:
    test_data = list(csv.reader(csvfile, delimiter=','))

test_ds = Task2Dataset(test_data, root=TEST_PATH, return_filename=True)
test_dl = DataLoader(test_ds, batch_size=1, num_workers=0, drop_last=False, shuffle=False)

# Predictions are appended to submission.csv; the header is written only when the
# file is created here. Re-running this cell appends duplicate rows, so delete
# submission.csv before starting a fresh run.
write_header = not os.path.exists('submission.csv')

model.eval()
# The context manager closes (and flushes) the file; no_grad avoids building an
# autograd graph during inference.
with open('submission.csv', 'a', newline='') as submission_file, torch.no_grad():
    csv_writer = csv.writer(submission_file)
    if write_header:
        csv_writer.writerow(["filename", "label"])

    batch_pbar = tqdm(test_dl)
    for image, filenames in batch_pbar:
        image = image.to(device)

        outputs = model(image)

        # outputs[0] (the digitlength head) is not used; every prediction is two
        # characters. argmax over dim=1 keeps this correct for any batch size.
        pred1 = torch.argmax(outputs[1], dim=1).tolist()
        pred2 = torch.argmax(outputs[2], dim=1).tolist()

        for filename, first_index, second_index in zip(filenames, pred1, pred2):
            res = alphabets[first_index] + alphabets[second_index]
            csv_writer.writerow([filename, res])

100%|██████████████████████████████████████████████████████████████████████████████| 2500/2500 [05:24<00:00,  7.71it/s]


Task3

In [17]:
class Task3Dataset(Dataset):
    """Dataset of multi-character images whose filenames start with "task3".

    Args:
        data: iterable of CSV rows ``[filename, label]``. Rows whose filename
            does not start with "task3" (including a header row) and blank
            rows are dropped.
        root: directory that the filenames are relative to.
        return_filename: when True, ``__getitem__`` returns ``(image, filename)``
            and the label column is not decoded at all, so placeholder labels
            in a submission template cannot cause errors.
    """

    def __init__(self, data, root, return_filename=False):
        # `sample and ...` skips the empty lists csv.reader yields for blank lines.
        self.data = [sample for sample in data if sample and sample[0].startswith("task3")]
        self.return_filename = return_filename
        self.root = root

        self.transform = transforms.Compose([
            transforms.ToTensor()
        ])

    def __getitem__(self, index):
        """Load and preprocess one image.

        Returns:
            ``(image, filename)`` if ``return_filename`` is set. Otherwise
            ``(image, label_list)`` where ``label_list`` is an ``(n + 1, 1)``
            array: the label length ``n`` followed by the class index of each
            of its characters.

        Raises:
            FileNotFoundError: if the image cannot be read from disk.
            KeyError: in label mode, if a label character is not in ``alphabets2index``.
        """
        filename, label = self.data[index]
        path = "{}/{}".format(self.root, filename)
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError("Could not read image: {}".format(path))

        img = cv2.resize(img, (96, 96))
        img = cv2.medianBlur(img, 5)
        # Average the colour channels to get a single-channel float image.
        img = np.mean(img, axis=2)
        img = Image.fromarray(img)
        img = self.transform(img)
        # NOTE(review): the image is float here, so ToTensor presumably leaves the
        # 0-255 range unscaled before this shift/scale - confirm against training.
        img = torch.FloatTensor((img - 128) / 128)

        if self.return_filename:
            # Inference path: return before touching the label.
            return img, filename

        label_list = np.array([[alphabets2index[digit]] for digit in label])
        length = [len(label_list)]
        # Prepend the sequence length as the first row.
        label_list = np.append([length], label_list, axis=0)
        return img, label_list

    def __len__(self):
        return len(self.data)

In [18]:
class ResNet18_t3(nn.Module):
    """ResNet-18 style network for single-channel images with five linear heads.

    ``forward`` returns ``[digitlength, digit1, digit2, digit3, digit4]``: a
    one-unit output followed by four outputs of ``len(alphabets)`` logits.
    Attribute names are kept stable because saved checkpoints are keyed by them.
    """

    def __init__(self):
        super().__init__()

        def make_stage(in_ch, out_ch, stride):
            # Two BasicBlocks per stage; only the first may change stride/channels.
            return nn.Sequential(
                BasicBlock(in_ch, out_ch, stride),
                BasicBlock(out_ch, out_ch, None),
            )

        # Stem: 7x7 stride-2 convolution, BatchNorm, ReLU, 3x3 stride-2 max-pool.
        self.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four residual stages.
        self.layer1 = make_stage(64, 64, None)
        self.layer2 = make_stage(64, 128, (2, 2))
        self.layer3 = make_stage(128, 256, (2, 2))
        self.layer4 = make_stage(256, 512, (2, 2))

        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))

        # Output heads, all fed by the same pooled 512-d feature vector.
        num_classes = len(alphabets)
        self.digitlength = nn.Linear(512, 1)
        self.digit1 = nn.Linear(512, num_classes)
        self.digit2 = nn.Linear(512, num_classes)
        self.digit3 = nn.Linear(512, num_classes)
        self.digit4 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return ``[yl, y1, y2, y3, y4]`` computed from the shared backbone features."""
        feat = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            feat = stage(feat)
        pooled = self.avgpool(feat)
        flat = pooled.reshape(pooled.shape[0], -1)
        heads = (self.digitlength, self.digit1, self.digit2, self.digit3, self.digit4)
        return [head(flat) for head in heads]

In [19]:
# Build the Task 3 network, then move its parameters to the configured device.
model = ResNet18_t3()
model = model.to(device)

In [20]:
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads on a CPU-only machine.
# NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
model.load_state_dict(torch.load('./model_weights/task3.pt', map_location=device))

<All keys matched successfully>

In [21]:
# Read the (filename, placeholder label) rows; Task3Dataset keeps only "task3" files.
with open('./dataset/sample_submission.csv', newline='') as csvfile:
    test_data = list(csv.reader(csvfile, delimiter=','))

test_ds = Task3Dataset(test_data, root=TEST_PATH, return_filename=True)
test_dl = DataLoader(test_ds, batch_size=1, num_workers=0, drop_last=False, shuffle=False)

# Predictions are appended to submission.csv; the header is written only when the
# file is created here. Re-running this cell appends duplicate rows, so delete
# submission.csv before starting a fresh run.
write_header = not os.path.exists('submission.csv')

model.eval()
# The context manager closes (and flushes) the file; no_grad avoids building an
# autograd graph during inference.
with open('submission.csv', 'a', newline='') as submission_file, torch.no_grad():
    csv_writer = csv.writer(submission_file)
    if write_header:
        csv_writer.writerow(["filename", "label"])

    batch_pbar = tqdm(test_dl)
    for image, filenames in batch_pbar:
        image = image.to(device)

        outputs = model(image)

        # outputs[0] (the digitlength head) is not used; every prediction is four
        # characters. argmax over dim=1 keeps this correct for any batch size.
        char_preds = [torch.argmax(head, dim=1).tolist() for head in outputs[1:]]

        for filename, *char_indices in zip(filenames, *char_preds):
            res = ''.join(alphabets[char_index] for char_index in char_indices)
            csv_writer.writerow([filename, res])

100%|██████████████████████████████████████████████████████████████████████████████| 1000/1000 [00:13<00:00, 74.88it/s]
