In [1]:
import csv
import cv2
import numpy as np
import random
import os

from tqdm import tqdm

import torch
import torch.nn as nn
from torchvision import models,transforms, datasets
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from torch.autograd import Variable
import torch.nn.functional as F
import torch.optim as optim
from captcha.image import ImageCaptcha
import string

In [2]:
TRAIN_PATH = "train"
TEST_PATH = "test"
BATCH = 50
device = "cuda" if torch.cuda.is_available() else "cpu"
# try device = "cuda" 
# and change your settings/accelerator to GPU if you want it to run faster

In [3]:
ALPHABET = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
NUMBER = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
NUM_ALPHA = NUMBER + ALPHABET
def encode(label):
    ohlabel = []
    for l in label:
        oh = [0]*len(NUM_ALPHA)
        idx = NUM_ALPHA.index(l)
        oh[idx] = 1
        ohlabel += oh
    #print(label)
    #print(ohlabel)
    return np.array(ohlabel)

        

In [4]:
#generate data for task3
def gen_train_data(width, height, n_len, task):
    n_class = len(NUM_ALPHA)
    for i in range(5000):
        generator = ImageCaptcha(width=width, height=height)
        random_str = ''.join([random.choice(NUM_ALPHA) for j in range(n_len)])
        img = generator.generate_image(random_str)

        with open('./train/annotations.csv', 'a', newline='') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow([f"task{task}/moretrain{i}.png", random_str])
        img.save(f"./train/task{task}/moretrain{i}.png")

#gen_train_data(72, 72, 2, 2)
#gen_train_data(96, 72, 4, 3)

'from captcha.image import ImageCaptcha\nimport matplotlib.pyplot as plt\nimport numpy as np\nimport random\n\nimport string\n\nwidth, height, n_len, n_class = 72, 72, 2, len(NUM_ALPHA)\nfor i in range(5000):\n    generator = ImageCaptcha(width=width, height=height)\n    random_str = \'\'.join([random.choice(NUM_ALPHA) for j in range(n_len)])\n    img = generator.generate_image(random_str)\n\n    with open(\'./train/annotations.csv\', \'a\', newline=\'\') as csvfile:\n        csv_writer = csv.writer(csvfile)\n        csv_writer.writerow([f"task2/moretrain{i}.png", random_str])\n    img.save(f"./train/task2/moretrain{i}.png")\n'

In [5]:
class Task1Dataset(Dataset):
    def __init__(self, data, root, return_filename=False):
        self.data = [sample for sample in data if sample[0].startswith("task1")]
        self.return_filename = return_filename
        self.root = root
        self.captchalen = 1
        self.h = None
        self.w = None
        self.c = 3
    
    def __getitem__(self, index):
        filename, label = self.data[index]
        img = cv2.imread(f"{self.root}/{filename}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        h, w= img.shape
        self.h = h
        self.w = w
        #img = cv2.resize(img, (512,512))
        #img = np.mean(img, axis=2)
        #print(img.shape)
        if self.return_filename:
            return torch.FloatTensor(img), filename
        else:
            return torch.FloatTensor(img), encode(label)

    def __len__(self):
        return len(self.data)

In [6]:
class Task2Dataset(Dataset):
    def __init__(self, data, root, return_filename=False):
        self.data = [sample for sample in data if sample[0].startswith("task2")]
        self.return_filename = return_filename
        self.root = root
        self.captchalen = 2
        
    def __getitem__(self, index):
        filename, label = self.data[index]
        img = cv2.imread(f"{self.root}/{filename}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        h, w= img.shape
        img = cv2.resize(img, (72,72))
        #img = np.mean(img, axis=2)
        #imgplot = plt.imshow(img)
        #plt.show()
        if self.return_filename:
            return torch.FloatTensor(img), filename
        else:
            return torch.FloatTensor(img), encode(label)

    def __len__(self):
        return len(self.data)

In [7]:
class Task3Dataset(Dataset):
    def __init__(self, data, root, return_filename=False):
        self.data = [sample for sample in data if sample[0].startswith("task3")]
        self.return_filename = return_filename
        self.root = root
        self.captchalen = 4
        
    def __getitem__(self, index):
        filename, label = self.data[index]
        img = cv2.imread(f"{self.root}/{filename}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        h, w= img.shape
        #print(h,w) 72 96
        #img = cv2.resize(img, (32, 32))
        #img = np.mean(img, axis=2)
        #imgplot = plt.imshow(img)
        #plt.show()
        if self.return_filename:
            return torch.FloatTensor(img), filename
        else:
            return torch.FloatTensor(img), encode(label)

    def __len__(self):
        return len(self.data)

In [8]:

train_data = []
val_data = []

with open(f'{TRAIN_PATH}/annotations.csv', newline='') as csvfile:
    for row in csv.reader(csvfile, delimiter=','):
        if random.random() < 0.7:
            train_data.append(row)
        else:
            val_data.append(row)

train_ds = Task1Dataset(train_data, root=TRAIN_PATH)
train_dl = DataLoader(train_ds, batch_size=BATCH, num_workers=0, drop_last=True, shuffle=True)
val_ds = Task1Dataset(val_data, root=TRAIN_PATH)
val_dl = DataLoader(val_ds, batch_size=BATCH, num_workers=0, drop_last=False, shuffle=False)

train2_ds = Task2Dataset(train_data, root=TRAIN_PATH)
train2_dl = DataLoader(train2_ds, batch_size=BATCH, num_workers=0, drop_last=True, shuffle=True)
val2_ds = Task2Dataset(val_data, root=TRAIN_PATH)
val2_dl = DataLoader(val2_ds, batch_size=BATCH, num_workers=0, drop_last=False, shuffle=False)

train3_ds = Task3Dataset(train_data, root=TRAIN_PATH)
train3_dl = DataLoader(train3_ds, batch_size=BATCH, num_workers=0, drop_last=True, shuffle=True)
val3_ds = Task3Dataset(val_data, root=TRAIN_PATH)
val3_dl = DataLoader(val3_ds, batch_size=BATCH, num_workers=0, drop_last=False, shuffle=False)

In [9]:

class Model(nn.Module):
    def __init__(self, OUTPUT_LEN, TEMP_OUT):
        super().__init__()
        self.OUTPUT_LEN = OUTPUT_LEN
        self.TEMP_OUT = TEMP_OUT
        self.conv1 = nn.Sequential(
            #nn.Conv2d(1, 3, kernel_size=3),
            nn.Conv2d(1, 8, kernel_size=3),
            nn.BatchNorm2d(8),
            nn.Conv2d(8, 16, kernel_size=3),
            nn.AvgPool2d(2),
            nn.BatchNorm2d(16),
            nn.ReLU()
        )
        self.conv2 = nn.Sequential(
            
            nn.Conv2d(16, 128, kernel_size=5),
            nn.BatchNorm2d(128),
            nn.Conv2d(128, 128, kernel_size=3),
            nn.AvgPool2d(2),
            nn.BatchNorm2d(128),
            nn.ReLU()
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3),
            nn.BatchNorm2d(256),
            nn.Conv2d(256, 256, kernel_size=5),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(256),
            nn.ReLU()
        )
        self.conv4 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=3),
            nn.Conv2d(512, 512, kernel_size=3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(512),
            nn.ReLU()
        )
        # convolutional layer (sees 30*30*3 tensor)
        # linear layer (28*28*3 -> 100)
        self.fc1 = nn.Linear(TEMP_OUT, 500)
        self.drop = nn.Dropout(0.2)
        # linear layer (100 -> 10)
        self.fc2 = nn.Linear(500, self.OUTPUT_LEN)
        
        
    def forward(self, x):
        #print(x.size())
        batch, height, width = x.shape
        x = x.view(batch, 1, height, width)
        #print(x.size())
        # sequance of convolutional layers with relu activation
        x = self.conv1(x)
        #print(x.size())
        x = self.conv2(x)
        #x = self.drop(x)
        #print(x.size())
        x = self.conv3(x)
        x = self.drop(x)
        #print(x.size())
        #x = self.conv4(x)
        # flatten the image input
        #print(x.shape)
        x = x.view(-1, self.TEMP_OUT)
        # 1st hidden layer with relu activation
        #print(x.size())
        x = F.relu(self.fc1(x))
        # output-layer
        #print(x.size())
        
        #print(x.size())
        x = self.fc2(x)
        #print(x.size())
        return x

In [11]:
def train(train_dl, val_dl, train_ds, epochs=80, TEMP_OUT=4096):
    task = train_ds.captchalen
    OUTPUT_LEN = len(NUM_ALPHA) * train_ds.captchalen
    model = Model(OUTPUT_LEN = OUTPUT_LEN, TEMP_OUT = TEMP_OUT).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)
    #optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    loss_fn = nn.CrossEntropyLoss()

    best_acc = 0
    for epoch in range(epochs):
        print(f"Epoch [{epoch}]")
        model.train()
        for image, ohlabel in train_dl:
            image = image.to(device)
            ohlabel = ohlabel.to(device,dtype=torch.float)
            #label = torch.cuda.LongTensor(label)
            
            pred = model(image)
            #print(pred.size())
            #print(ohlabel.size())
            loss = loss_fn(pred, ohlabel)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
        sample_count = 0
        correct_count = 0
        model.eval()
        for image, ohlabel in val_dl:
            image = image.to(device)
            ohlabel = ohlabel.to(device,dtype=torch.float)
            
            pred = model(image)
            #print(pred.size())
            #print(ohlabel.size())
            loss = loss_fn(pred, ohlabel)
            
            #pred = torch.argmax(pred, dim=1)
            

            #print(torch.argmax(pred[:,0:36], dim=1))
            #print(ohlabel)
            same_i = [0]*task
            for i in range(task):
                pred_i = torch.argmax(pred[:,i*36:(i+1)*36], dim=1)
                label = torch.argmax(ohlabel[:,i*36:(i+1)*36], dim=1)
                same_i[i] = torch.eq(pred_i, label).type(torch.uint8)
            result = same_i[0]
            for i in range(1,task):
                result = torch.logical_and(result, same_i[i]).type(torch.uint8)
            #print(result)
            sample_count += len(image)
            #print(label.size())
            #print(pred.size())
            correct_count += result.sum()
        val_acc = correct_count / sample_count
        print("accuracy (validation):", val_acc)
        if best_acc <= val_acc:
            best_acc = val_acc
            PATH=f"task{task}.pt"
            torch.save({
                    'epoch': epoch+1,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'loss': loss,
                    }, PATH)
    return PATH


In [19]:
Path1 = train(train_dl, val_dl, train_ds, epochs=30, TEMP_OUT=4096)
Path2 = train(train2_dl, val2_dl, train2_ds, epochs=100, TEMP_OUT=4096)
Path3 = train(train3_dl, val3_dl, train3_ds, epochs=120, TEMP_OUT=7168)

Epoch [0]
accuracy (validation): tensor(0.3022, device='cuda:0')
Epoch [1]
accuracy (validation): tensor(0.5961, device='cuda:0')
Epoch [2]
accuracy (validation): tensor(0.7303, device='cuda:0')
Epoch [3]
accuracy (validation): tensor(0.7382, device='cuda:0')
Epoch [4]
accuracy (validation): tensor(0.7732, device='cuda:0')
Epoch [5]
accuracy (validation): tensor(0.8140, device='cuda:0')
Epoch [6]
accuracy (validation): tensor(0.8224, device='cuda:0')
Epoch [7]
accuracy (validation): tensor(0.8272, device='cuda:0')
Epoch [8]
accuracy (validation): tensor(0.8557, device='cuda:0')
Epoch [9]
accuracy (validation): tensor(0.8390, device='cuda:0')
Epoch [10]
accuracy (validation): tensor(0.8399, device='cuda:0')
Epoch [11]
accuracy (validation): tensor(0.8632, device='cuda:0')
Epoch [12]
accuracy (validation): tensor(0.8614, device='cuda:0')
Epoch [13]
accuracy (validation): tensor(0.8724, device='cuda:0')
Epoch [14]
accuracy (validation): tensor(0.8566, device='cuda:0')
Epoch [15]
accuracy 