In [184]:
import os
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import ImageFolder
import torchvision.transforms as T
from PIL import Image
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from torchvision import models
import torch.nn as nn
import torch.nn.functional as F
from colorama import Fore

import torch
from torch.autograd import Variable
from tqdm import tqdm

In [185]:
plt.rcParams["figure.figsize"] = (10,10)

In [186]:
gpu = torch.device('cuda')

In [187]:
PATH_TRAIN = '../data/train.pkl'
PATH_TEST = '../data/test.pkl'
PATH_LABEL = '../data/train_y.csv'
PATH_IMAGE_TRAIN = '../images/train'
PATH_IMAGE_TEST = '../images/test'

In [188]:
epochs = 5
num_classes = 10
gru_hidden_size = 128
gru_num_layers = 2
cnn_output_height = 16
cnn_output_width = 64
digits_per_sequence = 3
dataset_sequences = []
dataset_labels = []

In [189]:
images_train = pd.read_pickle(PATH_TRAIN)
mean = images_train.mean()
std = images_train.std()

In [190]:
transform = T.Compose([
    T.Resize([64, 64]),
    T.ToTensor(),
    T.Grayscale(),
    T.Normalize(mean=mean, std=std)
])

In [191]:
batch_size = 64

dataset = ImageFolder(root=PATH_IMAGE_TRAIN, transform=transform)

# Split
train_set, val_set = torch.utils.data.random_split(dataset, [35000, 5000])

# Create dataloader
train_loader = DataLoader(dataset=train_set, shuffle=True, batch_size=batch_size, pin_memory=True)
val_loader = DataLoader(dataset=val_set, shuffle=True, batch_size=1, pin_memory=True)

In [192]:
# ================================================= MODEL ==============================================================
class CRNN(nn.Module):

    def __init__(self):
        super(CRNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 64, kernel_size=(6, 6))
        self.norm1 = nn.InstanceNorm2d(64)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=(6, 6), stride=2)
        self.norm2 = nn.InstanceNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=(6, 6))
        self.norm3 = nn.InstanceNorm2d(128)
        self.conv4 = nn.Conv2d(128, 128, kernel_size=(6, 6), stride=2)
        self.norm4 = nn.InstanceNorm2d(64)
        self.gru_input_size = cnn_output_height * 128
        self.gru = nn.GRU(self.gru_input_size, gru_hidden_size, gru_num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(gru_hidden_size * 2, num_classes)

    def forward(self, x):
        batch_size = x.shape[0]
        out = self.conv1(x)
        out = self.norm1(out)
        out = F.leaky_relu(out)
        out = self.conv2(out)
        out = self.norm2(out)
        out = F.leaky_relu(out)
        out = self.conv3(out)
        out = self.norm3(out)
        out = F.leaky_relu(out)
        out = self.conv4(out)
        out = self.norm4(out)
        out = F.leaky_relu(out)
        out, _ = self.gru(out)
        out = torch.stack([F.log_softmax(self.fc(out[i]), dim=-1) for i in range(out.shape[0])])
        return out

In [193]:
model = CRNN().to(DEVICE)
criterion = nn.CTCLoss(reduction='mean', zero_infinity=True)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [195]:
# ================================================ TRAINING MODEL ======================================================
for _ in range(epochs):
    # ============================================ TRAINING ============================================================
    train_correct = 0
    train_total = 0
    for x_train, y_train in tqdm(train_loader,
                                 position=0, leave=True,
                                 file=sys.stdout, bar_format="{l_bar}%s{bar}%s{r_bar}" % (Fore.GREEN, Fore.RESET)):
        print(x_train.shape)
        batch_size = x_train.shape[0]
        optimizer.zero_grad()
        y_pred = model(x_train.cuda())
        input_lengths = torch.IntTensor(batch_size).fill_(cnn_output_width)
        target_lengths = torch.IntTensor([len(t) for t in y_train])
        loss = criterion(y_pred, y_train, input_lengths, target_lengths)
        loss.backward()
        optimizer.step()
        _, max_index = torch.max(y_pred, dim=2)  # max_index.shape == torch.Size([32, 64])
        for i in range(batch_size):
            raw_prediction = list(max_index[:, i].detach().cpu().numpy())  # len(raw_prediction) == 32
            prediction = torch.IntTensor([c for c, _ in groupby(raw_prediction) if c != blank_label])
            if len(prediction) == len(y_train[i]) and torch.all(prediction.eq(y_train[i])):
                train_correct += 1
            train_total += 1
    print('TRAINING. Correct: ', train_correct, '/', train_total, '=', train_correct / train_total)

    # ============================================ VALIDATION ==========================================================
    val_correct = 0
    val_total = 0
    for x_val, y_val in tqdm(val_loader,
                             position=0, leave=True,
                             file=sys.stdout, bar_format="{l_bar}%s{bar}%s{r_bar}" % (Fore.BLUE, Fore.RESET)):
        batch_size = x_val.shape[0]
        y_pred = model(x_val.cuda())
        y_pred = y_pred.permute(1, 0, 2)
        input_lengths = torch.IntTensor(batch_size).fill_(cnn_output_width)
        target_lengths = torch.IntTensor([len(t) for t in y_val])
        criterion(y_pred, y_val, input_lengths, target_lengths)
        _, max_index = torch.max(y_pred, dim=2)
        for i in range(batch_size):
            raw_prediction = list(max_index[:, i].detach().cpu().numpy())
            prediction = torch.IntTensor([c for c, _ in groupby(raw_prediction) if c != blank_label])
            if len(prediction) == len(y_val[i]) and torch.all(prediction.eq(y_val[i])):
                val_correct += 1
            val_total += 1
    print('TESTING. Correct: ', val_correct, '/', val_total, '=', val_correct / val_total)

  0%|[32m          [39m| 0/547 [00:00<?, ?it/s]torch.Size([64, 1, 64, 64])
  0%|[32m          [39m| 0/547 [00:00<?, ?it/s]


RuntimeError: input must have 3 dimensions, got 4