In [None]:
import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns

import os, cv2, collections, glob
from pathlib import Path
from typing import List, Tuple

from sklearn.model_selection import train_test_split

import albumentations
import pretrainedmodels

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torch.nn.functional as F
from torch.optim import Adam

import catalyst
from catalyst.dl import utils
from catalyst.dl.runner import SupervisedRunner
from catalyst.dl.callbacks import EarlyStoppingCallback, AccuracyCallback
from catalyst.contrib.nn import OneCycleLRWithWarmup
from catalyst.utils import imread
from catalyst.contrib.callbacks.inference_callback import InferCallback


In [None]:
train_image_path = "/home/hasan/Data Set/Bengali Digit/training/images"
train_file_path = "/home/hasan/Data Set/Bengali Digit/training/files"
test_image_path = "/home/hasan/Data Set/Bengali Digit/testing"


In [None]:
train_img = os.listdir(train_image_path)
train_img.sort()
train_file = glob.glob(f"{train_file_path}/*.csv")
train_file.sort()
test_img = os.listdir(test_image_path)
test_img.sort()
print("Train image folder name : \n{}  \n\nTrain file name: \n{} \n\nTest image folder name : \n{}". format(train_img, train_file, test_img))

# Preprocessing Train Images

In [None]:
df = pd.read_csv(train_file[0], usecols=['filename', 'digit'])
df1 = pd.read_csv(train_file[1], usecols=['filename', 'digit'])
df2 = pd.read_csv(train_file[2], usecols=['filename', 'digit'])
df3 = pd.read_csv(train_file[3], usecols=['filename', 'digit'])
df4 = pd.read_csv(train_file[4], usecols=['filename', 'digit'])
new_df = df.append([df1, df2, df3, df4], ignore_index=True)
new_df.head()

In [None]:
train_dict = {"path": [], "label": []}

for i, folder in enumerate(train_img):
    path = os.path.join(train_image_path, folder)
    file = [df, df1, df2, df3, df4]
    
    for img in range(len(file[i])):
        file_one = file[i]
        image = file_one.filename[img]
        label = file_one.digit[img]
        
        full_path = os.path.join(path, image)
            
        #train_dict['image_name'].append(image)
        train_dict['label'].append(label)
        train_dict['path'].append(full_path)
        

In [None]:
df = pd.DataFrame(train_dict)
df = df.sample(frac=1).reset_index(drop=True)
df.head()

# Preprocessing Test Images

In [None]:
test_dict = {'path': [], 'label': []}

for d in test_img:
    path = os.path.join(test_image_path, d)
    cls = test_img.index(d) # this is not correct class! I took it just for formality 
    for img in os.listdir(path):
        full_path = os.path.join(path, img)
        
        test_dict['path'].append(full_path)
        test_dict['label'].append(cls) 
        

In [None]:
test_df = pd.DataFrame(test_dict)
test_df = test_df.sample(frac=1).reset_index(drop=True)
test_df.head()

# Dividing into Train and Valid dataset

In [None]:
x_train, x_valid, y_train, y_valid = train_test_split(df.path, df.label, test_size=.2, stratify=df.label)
print("x_train shape {} x_valid shape {} y_train shape {} y_valid shape {}".format(x_train.shape, x_valid.shape, y_train.shape, y_valid.shape))


In [None]:
# Feature of test dataset 
x_test = pd.Series(test_df.path)
y_test = pd.Series(test_df.label) # don't need this one! as your wish to take or not 


### Custom Dataset

In [None]:
class Custom_Dataset:
    def __init__(self, image, label, test_data, train_data=False):
        self.img_path = image
        self.img_label = label
        self.test_data = test_data

        if train_data:
            self.aug = albumentations.Compose([
                                albumentations.Resize(32, 32, always_apply=True),
                                albumentations.ShiftScaleRotate(shift_limit=0.0625,
                                                                scale_limit=0.1,
                                                                rotate_limit=5,
                                                                p=0.9),
                                #albumentations.RandomBrightnessContrast(always_apply=False),
                                albumentations.RandomRotate90(always_apply=False),
                                albumentations.HorizontalFlip(),
                                albumentations.VerticalFlip(),
                                albumentations.Normalize(mean=(0.485, 0.456, 0.406), 
                                                         std=(0.229, 0.224, 0.225), 
                                                         always_apply=True)              
                                                ])

        else:
            self.aug = albumentations.Compose([
                                albumentations.Resize(32, 32, always_apply=True),
                                albumentations.Normalize(mean=(0.485, 0.456, 0.406), 
                                                         std=(0.229, 0.224, 0.225),
                                                         always_apply=True) 
                                ])                       
            

    def __len__(self):
        return len(self.img_path) 


    def __getitem__(self, idx):
        image1 = self.img_path[idx]
        image2 = cv2.imread(image1)
        image = cv2.resize(image2, (32, 32)).astype(float)
        #image = image.reshape(128, 128, 3).astype(float)
        #image = Image.fromarray(image).convert("RGB")
        img = self.aug(image=np.array(image))['image']
        img1 = np.transpose(img, (2, 0, 1)).astype(np.float) 
        label = self.img_label[idx]
        
        img2 = torch.tensor(img1, dtype=torch.float)
        label2 = torch.tensor(label, dtype=torch.long)
        
        if self.test_data == 1:
            return img2
        
        return img2, label2 
          

In [None]:
train_set = Custom_Dataset(image = x_train.values, 
                           label = y_train.values, 
                           test_data=0,
                           train_data=True
                           )

valid_set = Custom_Dataset(image = x_valid.values, 
                           label =y_valid.values, 
                           test_data = 0,
                           train_data = False
                           )

test_set = Custom_Dataset(image = x_test.values,
                          label = y_test.values,
                          test_data = 1,
                          train_data = False
                          )


In [None]:
train_loader = DataLoader(train_set, 
                          batch_size = 32, 
                          shuffle = True,
                          num_workers = 0)

valid_loader = DataLoader(valid_set,
                          batch_size = 32,
                          shuffle = False,
                          num_workers = 0)

test_loader = DataLoader(test_set)

In [None]:
loaders = collections.OrderedDict()

loaders['train'] = train_loader
loaders['valid'] = valid_loader 
loaders['test'] = test_loader
loaders

# Model

In [None]:
class Resnet34(nn.Module):
    def __init__(self):
        super(Resnet34, self).__init__()
        self.model = pretrainedmodels.__dict__['resnet34'](pretrained='imagenet')  
        self.l0 = nn.Linear(512, 10)

    def forward(self, x):
        bs, c, h, w = x.shape
        x = self.model.features(x) 
        x = F.adaptive_avg_pool2d(x, 1).reshape(bs, -1)
        op_layer_one = self.l0(x)
        return op_layer_one


# Some Variable

In [None]:
num_epochs = 30
num_classes = 10
logdir = "./logs/bengali_digit_simple_notebook_1"
device = utils.get_device()

is_fp16_used = False
if is_fp16_used:
    fp16_params = dict(opt_level="01") # params for fp16
else:
    fp16_params = None

# Model Train

In [None]:

# model, criterion, optimizer
model = Resnet34()
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.0003)
scheduler = OneCycleLRWithWarmup(
                            optimizer,
                            num_steps=num_epochs, 
                            lr_range=(0.005, 0.00005),
                            warmup_steps=2,
                            momentum_range=(0.85, 0.95)
                            )


# model runner
runner = SupervisedRunner(device=device)


callbacks = [
    AccuracyCallback(
                    num_classes=num_classes
                    ),
    #AUCCallback(num_classes=num_classes,input_key="targets_one_hot", class_names=class_names),
    #F1ScoreCallback(input_key="targets_one_hot", activation="Softmax"),
    EarlyStoppingCallback(
                         patience=2, 
                         metric="loss", 
                         minimize=True, 
                         min_delta=1e-06
                         )
]

In [None]:
# model training
runner.train(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    loaders={"train": loaders['train'], "valid": loaders['valid']},
    valid_loader = "valid", 
    num_epochs=num_epochs,
    callbacks=callbacks,
    logdir=logdir,
    minimize_metric=False,
    fp16=fp16_params,
    verbose=True
)


# Accuracy and Loss Graph of Training

In [None]:
utils.plot_metrics(logdir=logdir)

In [None]:
utils.plot_metrics(logdir=logdir, step="epoch", metrics=["accuracy01"])

# Visualizing Some Images

In [None]:
root_path = "/home/hasan/Data Set/Bengali Digit/training/images"
folder_name = os.listdir(root_path)
folder_name.sort()

In [None]:
all_img_path = []

for i, fold in enumerate(folder_name):
    ALL_IMAGES = list(Path(os.path.join(root_path, fold)).glob("**/*.png"))
    ALL_IMAGES = list(filter(lambda x: not x.name.startswith("."), ALL_IMAGES))
    all_img_path = all_img_path + ALL_IMAGES
        

In [None]:
print("Total number of training images are :{}".format(len(l)))

In [None]:
def show_examples(images: List[Tuple[str, np.ndarray]]):
    _indexes = [(i, j) for i in range(2) for j in range(2)]
    
    f, ax = plt.subplots(2, 2, figsize=(16, 16))
    for (i, j), (title, img) in zip(_indexes, images):
        ax[i, j].imshow(img)
        ax[i, j].set_title(title)
    f.tight_layout()
    
    
def read_random_images(paths: List[Path]) -> List[Tuple[str, np.ndarray]]:
    data = np.random.choice(paths, size=4)
    result = []
    for d in data:
        title = f"{d.parent.name}: {d.name}"
        _image = imread(d)
        result.append((title, _image))
    
    return result

In [None]:
images = read_random_images(all_img_path)
show_examples(images)

# Predicting with Valid data

In [None]:
# predicted result of valid dataset
predictions = np.vstack(list(map(
    lambda x: x["logits"].cpu().numpy(), 
    runner.predict_loader(loader=loaders["valid"], resume=f"{logdir}/checkpoints/best.pth")
)))
print(predictions.shape)


# taking only one predicted result
print("logits: ", predictions[0])

probabilities = torch.softmax(torch.from_numpy(predictions[0]), dim=0)
print("probabilities: ", probabilities)

label = probabilities.argmax().item()
print("Predicted label is :{}".format(label))
#print(f"predicted: {class_names[label]}")


In [None]:
# All predicted labels
valid_label_dict = {"label": []}

for i in range(len(predictions)):
    probabilities = torch.softmax(torch.from_numpy(predictions[i]), dim=0)
    label = probabilities.argmax().item()
    valid_label_dict['label'].append(label)
    

# Predicting with Test data

In [None]:
# Here label columns data are incorrect 
test_df.head(3)

In [None]:
test_loader = collections.OrderedDict([("infer", loaders["test"])])

runner.infer(
    model=model,
    loaders=test_loader,
    callbacks=[InferCallback()],
)

In [None]:
predicted_pro = runner.callbacks[0].predictions["logits"]
predicted_pro.head()

In [None]:
predicted_pro.shape