# **Imports and data linking to Drive**

In [None]:
import torch
import torchvision.models as models
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import os
import random
import cv2
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
import seaborn as sn

In [None]:
# Load the Drive helper and mount Google Drive at /content/drive
# (requires the google.colab package, i.e. a Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

# Extract both archives from Drive onto the local disk:
#   final_char_data.zip -> /content/char40_split
#   hhd40.zip           -> /content/pretrain_set
!unzip "/content/drive/MyDrive/Data/final_char_data.zip" -d "/content/char40_split"
!unzip "/content/drive/MyDrive/Data/hhd40.zip" -d "/content/pretrain_set"


# **Dataloader**

In [None]:
# Locations of the extracted datasets. Each directory is handed to CharDataset,
# which expects one sub-folder per label.
# Training split of the character data.
train_path = "/content/char40_split/final_char_data/train"
# "val" split; the cells below use it as the test set.
test_path =  "/content/char40_split/final_char_data/val"
# "val_no_morph" split; presumably the same data without morphological
# augmentation -- TODO confirm against the dataset preparation script.
test_path_no_morph = "/content/char40_split/final_char_data/val_no_morph"
# Data extracted from hhd40.zip, used for pretraining.
pretrain_path = "/content/pretrain_set/binarized_hhd_40x40"

class ThresholdTransform(object):
    """Binarize a tensor with values in [0, 1].

    Elements strictly below the threshold become 1 and all others become 0.
    The threshold is supplied on the 0..255 gray scale and rescaled to [0, 1].
    """

    def __init__(self, thr_255):
        # Rescale the 8-bit gray-level threshold to the [0..1] range.
        self.thr = thr_255 / 255.

    def __call__(self, x):
        below_threshold = x < self.thr
        # Cast the boolean mask back so the input's dtype is preserved.
        return below_threshold.to(x.dtype)

class CharDataset(Dataset):
    """Dataset of character images stored as one sub-folder per label.

    ``char_directory`` must contain one directory per class. The directory
    name is returned (as a string) as the label of every image inside it.
    Samples are visited in an order that is shuffled once at construction
    time with the global ``random`` state.

    Args:
        char_directory: path to the directory holding the label folders.
        size: ``(height, width)`` every image is resized to.
        normalize: if True, return a gray-scale tensor normalized with
            mean 0.5 / std 0.5 (range [-1, 1]); otherwise return a binarized
            tensor in which pixels darker than gray level 200 are 1.
    """

    def __init__(self, char_directory, size=(40, 40), normalize=False):
        super().__init__()
        # this should be the path to the directory where the char folders are
        self.char_directory = char_directory
        self.char_idx = dict()    # sample id -> image path
        self.label_idx = dict()   # sample id -> label (folder name)

        # Walk the label folders in sorted order so sample ids are
        # reproducible across machines (os.listdir order is arbitrary).
        cnt = 0
        for label in sorted(os.listdir(self.char_directory)):
            label_dir = os.path.join(self.char_directory, label)
            # Stray files next to the label folders are not classes.
            if not os.path.isdir(label_dir):
                continue
            for fl in sorted(os.listdir(label_dir)):
                self.label_idx[cnt] = label
                self.char_idx[cnt] = os.path.join(label_dir, fl)
                cnt += 1
        self.idx = 1

        # Randomly permuted list of sample ids; __getitem__ goes through it so
        # sequential access still visits the samples in a random order.
        list_char = list(self.char_idx.keys())
        random.shuffle(list_char)
        self.random_keys = list_char
        self.range = len(self.char_idx)

        steps = [
            transforms.Resize(size),
            transforms.Grayscale(num_output_channels=1),
            transforms.ToTensor(),
        ]
        if normalize:
            # The tensor has a single channel here, so Normalize needs exactly
            # one mean/std value; three-channel statistics raise at runtime.
            steps.append(transforms.Normalize(mean=[0.5], std=[0.5]))
        else:
            steps.append(ThresholdTransform(thr_255=200))
        self.transform = transforms.Compose(steps)

    def __getitem__(self, idx):
        """Return ``(image_tensor, label)`` for position ``idx`` of the shuffled order."""
        key = self.random_keys[idx]
        # Close the file handle as soon as the image has been transformed.
        with Image.open(self.char_idx[key]) as char_img:
            char_img = self.transform(char_img)
        char_label = self.label_idx[key]

        return char_img, char_label

    def __len__(self):
        """Return the number of images found under ``char_directory``."""
        return len(self.char_idx)

# Build the datasets. Every CharDataset shuffles its sample order once, at
# construction, using the global `random` state, so the order of these calls
# influences which permutation each dataset gets.
train_data = CharDataset(train_path)
# Batch-size-1 loader over the training data (train_dl is re-created by later cells).
train_dl = DataLoader(train_data, batch_size=1)
test_data = CharDataset(test_path)
test_data_no_morph = CharDataset(test_path_no_morph)
# Batch-size-1 loader over the test data (test_dl is re-created by later cells).
test_dl = DataLoader(test_data, batch_size=1)
# Dataset used by the pretraining loop.
pretrain_data = CharDataset(pretrain_path)

# Recognizer Model


In [None]:
class TheRecognizer(nn.Module):
    """Small CNN classifier for 40x40 single-channel character images.

    Two conv(5x5) -> BatchNorm -> ReLU -> max-pool(2) stages reduce a
    ``(N, 1, 40, 40)`` input to ``(N, 15, 7, 7)``; two linear layers then map
    it to ``num_classes`` log-probabilities (``LogSoftmax``), so the matching
    loss is ``nn.NLLLoss``.

    Args:
        l2_reg: weight decay passed to the built-in Adam optimizer.
        lr: learning rate passed to the built-in Adam optimizer.
        num_classes: number of output classes (defaults to 27).

    Attributes:
        opt: Adam optimizer over all parameters of the network.
    """

    def __init__(self, l2_reg, lr, num_classes=27):
        super().__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=10, kernel_size=5, stride=1, padding=0),
            nn.BatchNorm2d(10),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(10, 15, 5, 1, 0),
            nn.BatchNorm2d(15),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.lin_layers = nn.Sequential(
            nn.Linear(7 * 7 * 15, 300),
            nn.ReLU(),
            nn.Linear(300, num_classes),
            nn.LogSoftmax(dim=1),
        )

        self.opt = torch.optim.Adam(params=self.parameters(), weight_decay=l2_reg, lr=lr)

    def forward(self, x):
        """Return log-probabilities of shape ``(N, num_classes)`` for a batch ``x``."""
        x = self.conv_layers(x)
        # Flatten per sample. Keeping the batch dimension fixed means a wrongly
        # sized input fails in the linear layer instead of being silently
        # reshaped into a different number of samples.
        x = x.flatten(start_dim=1)
        x = self.lin_layers(x)

        return x

    def load_checkpoint(self, ckpt_path, map_location=None):
        """Read and return the checkpoint dict stored at ``ckpt_path``.

        NOTE(review): ``torch.load`` unpickles the file; only load checkpoints
        from a trusted source.
        """
        ckpt = torch.load(ckpt_path, map_location=map_location)
        return ckpt

    def save_checkpoint(self, state, save_path):
        """Serialize ``state`` to ``save_path`` with ``torch.save``."""
        torch.save(state, save_path)

    def load_model(self, ckpt):
        """Restore epoch, network weights and optimizer state from ``ckpt``.

        ``ckpt`` must provide the keys ``'epoch'``, ``'weights'`` and
        ``'optimizer'``.
        """
        self.epoch = ckpt['epoch']
        self.load_state_dict(ckpt['weights'])
        self.opt.load_state_dict(ckpt['optimizer'])

# Helper functions to compute accuracy

In [None]:
# Maps the label (folder) name of each character class to its class index.
name2idx = {'Alef': 0, 'Ayin': 1, 'Bet': 2, 'Dalet': 3, 'Gimel' : 4, 'He': 5,
            'Het': 6, 'Kaf': 7, 'Kaf-final': 8, 'Lamed': 9, 'Mem': 10, 
            'Mem-medial': 11, 'Nun-final': 12, 'Nun-medial': 13, 'Pe': 14,
            'Pe-final': 15, 'Qof': 16, 'Resh': 17, 'Samekh': 18, 'Shin': 19,
            'Taw': 20, 'Tet': 21, 'Tsadi-final': 22, 'Tsadi-medial': 23,
            'Waw': 24, 'Yod': 25, 'Zayin': 26}


def _collect_predictions(data_loader, model, device, to_index):
    """Run ``model`` over ``data_loader`` and return ``(y_true, y_pred)`` int lists.

    The model is switched to eval mode (so BatchNorm uses its running
    statistics and does not update them) and gradients are disabled; the
    previous training mode is restored afterwards. Batches of any size are
    supported. ``to_index`` converts one raw label to its class index.
    """
    y_true = []
    y_pred = []
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for img, label in data_loader:
                out = model(img.to(device))
                # One prediction per sample, not one for the whole batch.
                predicted = out.argmax(dim=1).cpu().tolist()
                for raw_label, pred in zip(label, predicted):
                    y_true.append(to_index(raw_label))
                    y_pred.append(pred)
    finally:
        model.train(was_training)
    return y_true, y_pred


def _accuracy(y_true, y_pred):
    """Return the fraction of matching pairs, or 0.0 when there are no samples."""
    if not y_true:
        return 0.0
    hits = sum(1 for truth, pred in zip(y_true, y_pred) if truth == pred)
    return hits / len(y_true)


def get_test_acc(test_dl, model, device, con_mat = False):
    """Return the accuracy of ``model`` on ``test_dl``.

    Labels yielded by ``test_dl`` must be class names present in ``name2idx``.
    When ``con_mat`` is True the confusion matrix is also drawn as a heatmap.
    """
    y_true, y_pred = _collect_predictions(test_dl, model, device, name2idx.__getitem__)
    if con_mat:
        c_mat = confusion_matrix(y_true, y_pred)
        sn.heatmap(c_mat)
    return _accuracy(y_true, y_pred)


def get_train_acc(train_dl, model, device):
    """Return the accuracy of ``model`` on ``train_dl`` (labels are class names)."""
    y_true, y_pred = _collect_predictions(train_dl, model, device, name2idx.__getitem__)
    return _accuracy(y_true, y_pred)


def get_pretrain_acc(train_dl, model, device):
    """Return the accuracy of ``model`` on ``train_dl`` whose labels are integer-like.

    Each label is converted with ``int`` (so numeric strings and integer
    tensors both work) and compared with the predicted class index.
    """
    y_true, y_pred = _collect_predictions(train_dl, model, device, int)
    return _accuracy(y_true, y_pred)

# Pretraining Loop

In [None]:
# Hyperparameters for pretraining
lr = 0.001
batch_size = 15
epochs = 5
l2_reg = 0.0001

# NOTE(review): the device is hard-coded, so this cell needs a CUDA runtime.
device = 'cuda'
model = TheRecognizer(l2_reg, lr)
model.to(device)
# The network ends in LogSoftmax, so NLLLoss is the matching criterion.
criterion = nn.NLLLoss()
# Use the Adam optimizer the model builds for itself from l2_reg and lr.
opt = model.opt
pretrain_dl = DataLoader(pretrain_data, batch_size = batch_size)
# Batch-size-1 loader over the same pretraining data, used only to report accuracy.
pretrain_dl_test = DataLoader(pretrain_data, batch_size = 1)
test_dl = DataLoader(test_data, batch_size=1)


for i in range(epochs):
  for j, data in enumerate(pretrain_dl):
    opt.zero_grad()
    img, label = data
    # Labels are converted to an integer array here, so the pretraining
    # labels must be integer-like (presumably numeric folder names -- TODO confirm).
    label = np.asarray(label, dtype=int)
    img = img.to(device)
    out = model(img)
    target = torch.LongTensor(label).to(device)
    loss = criterion(out, target)
    loss.backward()
    opt.step()
  # Accuracy is measured on the pretraining data itself, not on held-out data.
  print("train acc this epoch", get_pretrain_acc(pretrain_dl_test, model, device))



In [None]:
# Persist the pretrained network: weights plus the state of the optimizer
# used during pretraining. The stored epoch counter is 0.
save_dict = {}
save_dict['epoch'] = 0
save_dict['weights'] = model.state_dict()
save_dict['optimizer'] = opt.state_dict()

model.save_checkpoint(save_dict, 'pretrained_net.ckpt')

# Cross-validation on Pretrained model


In [None]:
def test_fold(test_list, model, device):
    """Return the accuracy of ``model`` on the held-out fold ``test_list``.

    ``test_list`` is an iterable of ``(img, label)`` batches as produced by a
    DataLoader; every label must be a class name present in ``name2idx``.
    The model is evaluated in eval mode with gradients disabled (BatchNorm
    then uses its running statistics and does not update them) and its
    previous training mode is restored afterwards. Batches of any size are
    supported, and an empty fold yields 0.0 instead of dividing by zero.
    """
    correct = 0
    total = 0
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for img, label in test_list:
                out = model(img.to(device))
                # One prediction per sample in the batch.
                predicted = out.argmax(dim=1).cpu().tolist()
                for name, pred in zip(label, predicted):
                    if pred == name2idx[name]:
                        correct += 1
                    total += 1
    finally:
        model.train(was_training)
    if total == 0:
        return 0.0
    return correct / total

In [None]:
# Fixed hyperparameters for the cross-validation sweep. The learning rate is
# the quantity being searched, so it is set by the loop below.
l2_reg = 0
epochs = 10
batch_size = 1
# Fall back to the CPU when no GPU is available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
criterion = nn.NLLLoss()
k = 5
# Collects one (learning_rate, mean_fold_accuracy) tuple per candidate.
lr_list = []
train_dl = DataLoader(train_data, batch_size = batch_size)

# Read every batch from disk once; the loader is not shuffled and the
# transforms are deterministic, so all folds and learning rates can reuse it.
all_batches = list(train_dl)
fold_length = len(all_batches) / k
# Integer fold boundaries: fold f covers [fold_bounds[f], fold_bounds[f + 1]),
# so every batch is held out exactly once.
fold_bounds = [round(f * fold_length) for f in range(k + 1)]

# cross-val loop
for lr in np.arange(0.0001, 0.005, 0.0005):
    total_acc = 0
    for fold in range(k):
        fold_start = fold_bounds[fold]
        fold_end = fold_bounds[fold + 1]
        # cross-val data split
        test_list = all_batches[fold_start:fold_end]
        train_list = all_batches[:fold_start] + all_batches[fold_end:]

        model = TheRecognizer(l2_reg, lr)
        # load the pretrained model each time to reset the parameters
        model.load_model(model.load_checkpoint('pretrained_net.ckpt', map_location=torch.device(device)))
        model.to(device)
        # Fresh optimizer so no state from pretraining is carried over.
        opt = torch.optim.Adam(params=model.parameters(), weight_decay=l2_reg, lr=lr)

        # train on part of the data
        model.train()
        for u in range(epochs):
            for img, label in train_list:
                opt.zero_grad()
                img = img.to(device)
                out = model(img)
                # translate the class names of the batch to class indices
                label_idx = [name2idx[name] for name in label]
                target = torch.LongTensor(label_idx).to(device)
                loss = criterion(out, target)
                loss.backward()
                opt.step()
        # evaluate on other part
        total_acc += test_fold(test_list, model, device)

    print("current lr rate: ", lr)
    print("Acc: ", total_acc / k)
    lr_list.append((lr, total_acc / k))

current lr rate:  0
Acc:  0.9575327130276895
current lr rate:  0
Acc:  0.953432871778635


# Training loop on whole train data and testing accuracy on test set

In [None]:
# Best hyperparameters found using cross-validation
lr = 0.00155
batch_size = 1
epochs = 20
l2_reg = 0.0001
# Fall back to the CPU when no GPU is available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = TheRecognizer(l2_reg, lr)
# start from the pretrained weights
model.load_model(model.load_checkpoint('pretrained_net.ckpt', map_location=torch.device(device)))
model.to(device)

# Pass lr explicitly: without it Adam silently uses its default learning
# rate and the cross-validated value above would be ignored.
optimizer = torch.optim.Adam(params=model.parameters(), weight_decay=l2_reg, lr=lr)
criterion = nn.NLLLoss()

train_dl = DataLoader(train_data, batch_size = batch_size)
test_dl = DataLoader(test_data, batch_size=1)
test_dl_no_morph = DataLoader(test_data_no_morph, batch_size=1)
best_acc = 0

for i in range(epochs):
    # Make sure the network is in training mode for the optimisation pass.
    model.train()
    for img, label in train_dl:
        optimizer.zero_grad()
        img = img.to(device)
        out = model(img)
        # translate the class names of the batch to class indices
        label_idx = [name2idx[name] for name in label]
        target = torch.LongTensor(label_idx).to(device)
        loss = criterion(out, target)
        loss.backward()
        optimizer.step()
    print(i)
    test_acc = get_test_acc(test_dl, model, device)
    print("Train acc this epoch", get_train_acc(train_dl, model, device))
    print("Test acc this epoch", test_acc)
    # NOTE(review): the checkpoint is selected on the test set, so best_acc is
    # an optimistic estimate; a separate validation split would avoid that.
    if test_acc > best_acc:
        best_acc = test_acc
        save_dict = {
            'epoch': 0,
            'weights': model.state_dict(),
            'optimizer': optimizer.state_dict()
        }

        model.save_checkpoint(save_dict, save_path='40_char_rec.ckpt')
        print("BEST ACCURACY: ", best_acc)
    print("Test acc this epoch no morph", get_test_acc(test_dl_no_morph, model, device))


In [None]:
# Save the recognizer as it is after the last training epoch.
# NOTE(review): this writes to the same path the training loop uses for its
# best-accuracy checkpoint, so that file is replaced by the last-epoch weights.
save_dict = {}
save_dict['epoch'] = 0
save_dict['weights'] = model.state_dict()
save_dict['optimizer'] = optimizer.state_dict()

model.save_checkpoint(save_dict, '40_char_rec.ckpt')