In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk /kaggle/input recursively and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        # os.walk yields bare file names; join with the directory to get a usable path.
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All"
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
#importing necessary libraries
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch.utils.data import Dataset
import torchvision
from torchvision import datasets,transforms
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from PIL import Image
import albumentations as A
from tqdm import tqdm
import torch.optim as optim
from albumentations.pytorch import ToTensorV2
from sklearn.preprocessing import OneHotEncoder
import torch.nn as nn
import torch.nn.functional as F

In [None]:
#loading the training dataset
# Read the training CSV; later cells use its "drawing" and "word" columns.
df_train  = pd.read_csv("/kaggle/input/train-images-doodle-detectives/train.csv")
# memory_usage="deep" inspects object (string) columns so the reported footprint is the real one.
df_train.info(memory_usage="deep")

In [None]:
#using the given function to get the image from the given csv file
from PIL import Image, ImageDraw
import numpy as np
import json

def vector_to_numpy(drawing, side=256):
    """Rasterise a JSON-encoded stroke drawing and return it as a NumPy array.

    Args:
        drawing: JSON string describing the strokes; forwarded unchanged to
            ``vector_to_image``.
        side: Side length, in pixels, of the square canvas.

    Returns:
        The rendered image converted with ``np.array``.
    """
    return np.array(vector_to_image(drawing, side))

def vector_to_image(drawing, side=256):
    """Render a JSON-encoded stroke drawing onto a square greyscale canvas.

    Args:
        drawing: JSON string that decodes to a sequence of strokes, each
            stroke being a pair ``(x_coords, y_coords)``.
        side: Side length, in pixels, of the square canvas.

    Returns:
        A PIL image in mode ``'L'`` with a white background and the strokes
        drawn as 1-pixel-wide black lines, centred on the canvas.
    """
    strokes = json.loads(drawing)
    left, top, right, bottom = calculate_bounding_box(strokes)

    # Padding that centres the drawing's bounding box on the canvas.
    pad_x = (side - (right - left + 1)) // 2
    pad_y = (side - (bottom - top + 1)) // 2

    canvas = Image.new('L', (side, side), color='white')
    pen = ImageDraw.Draw(canvas)

    for xs, ys in strokes:
        points = []
        for px, py in zip(xs, ys):
            # Move the point to the bounding-box origin, then apply the padding.
            points.append((px - left + pad_x, py - top + pad_y))
        pen.line(points, fill='black', width=1)

    return canvas

def calculate_bounding_box(drawing):
    """Return the axis-aligned bounding box of a stroke drawing.

    Args:
        drawing: Sequence of strokes, each stroke being a pair
            ``(x_coords, y_coords)`` of non-empty coordinate sequences.

    Returns:
        Tuple ``(min_x, min_y, max_x, max_y)`` over all strokes.

    Raises:
        ValueError: If ``drawing`` is empty, a stroke has no points, or a
            stroke is not a two-element pair.
    """
    xs_per_stroke, ys_per_stroke = [], []
    for stroke_x, stroke_y in drawing:
        xs_per_stroke.append(stroke_x)
        ys_per_stroke.append(stroke_y)

    # Reduce each stroke to its extreme first, then reduce across strokes.
    left = min(map(min, xs_per_stroke))
    top = min(map(min, ys_per_stroke))
    right = max(map(max, xs_per_stroke))
    bottom = max(map(max, ys_per_stroke))

    return left, top, right, bottom


In [None]:
# Number of training rows per class label in the "word" column (most frequent first).
df_train["word"].value_counts()

In [None]:
#one hot encoding of labels
def one_hot_enc(df_train):
    """One-hot encode the ``word`` column of ``df_train``.

    Side effects:
        ``df_train`` is modified in place: ``word`` is converted to a pandas
        categorical and a new ``word_new`` column holding the integer
        category codes is added.

    Args:
        df_train: DataFrame with a ``word`` column.

    Returns:
        Dense array produced by fitting a ``OneHotEncoder`` on the
        ``word_new`` column.
    """
    df_train['word'] = df_train['word'].astype('category')
    df_train['word_new'] = df_train['word'].cat.codes

    # OneHotEncoder expects 2-D input, hence the double-bracket column selection.
    codes_frame = df_train[['word_new']]
    encoder = OneHotEncoder()
    encoded = encoder.fit_transform(codes_frame)
    return encoded.toarray()

# Besides returning the one-hot matrix, this call adds the "word_new" code column
# to df_train in place; the train/validation split below reads that column.
enc_data = one_hot_enc(df_train)
enc_data

In [None]:
#performing transforms on the given images
# Target size of every image handed to the network.
IMAGE_HEIGHT = 256
IMAGE_WIDTH  = 256
# Training pipeline: resize, random geometric augmentation, scale to [0, 1], convert to tensor.
train_transform = A.Compose(
        [
            A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
            A.Rotate(limit=35, p=1.0),  # always rotate, by a random angle within +/-35 degrees
            A.HorizontalFlip(p=0.5),  # mirror left-right for half of the samples
            A.VerticalFlip(p=0.1),  # mirror top-bottom for 10% of the samples
            # mean=0, std=1 with max_pixel_value=255 reduces to dividing pixel values by 255.
            A.Normalize(
                mean=[0.0],
                std=[1.0],
                max_pixel_value=255.0,
            ),
            ToTensorV2(),  # NumPy image -> torch tensor
        ],
    )

# Validation pipeline: same resize and scaling as training, but no random augmentation.
val_transform = A.Compose(
        [
            A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
            A.Normalize(
                mean=[0.0],
                std=[1.0],
                max_pixel_value=255.0,
            ),
            ToTensorV2(),
        ],
    )

In [None]:
#creating a train dataset
class train_images(Dataset):
    """Dataset yielding ``(image, label)`` pairs rendered from stroke drawings.

    Args:
        X: DataFrame with a ``drawing`` column holding JSON-encoded strokes
            (rows are accessed positionally through ``.iloc``).
        Y: Labels aligned positionally with ``X`` (accessed through ``.iloc``).
        transform: Callable invoked as ``transform(image=array)`` that returns
            a mapping whose ``"image"`` entry is the transformed tensor.
    """

    def __init__(self,X,Y,transform):
        self.dataframe = X
        self.encodings = Y
        self.transform = transform

    def __len__(self):
        """Number of drawings in the dataset."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return the ``idx``-th image as a ``(1, 256, 256)`` tensor with its label."""
        row = self.dataframe.iloc[idx]
        raster = vector_to_numpy(row["drawing"])
        transformed = self.transform(image=raster)["image"]
        # The canvas is white with black strokes; `1 - x` inverts the transformed
        # values (strokes become the high values when the transform scales to [0, 1]).
        inverted = 1 - transformed.reshape((1, 256, 256))
        label = torch.tensor(self.encodings.iloc[idx])
        return inverted, label



In [None]:
#creating a test dataset
class test_images(Dataset):
    """Label-free dataset yielding images rendered from stroke drawings.

    Args:
        X: DataFrame with a ``drawing`` column holding JSON-encoded strokes
            (rows are accessed positionally through ``.iloc``).
        transform: Callable invoked as ``transform(image=array)`` that returns
            a mapping whose ``"image"`` entry is the transformed tensor.
    """

    def __init__(self,X,transform):
        self.dataframe = X
        self.transform = transform

    def __len__(self):
        """Number of drawings in the dataset."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return the ``idx``-th image as a ``(1, 256, 256)`` tensor."""
        row = self.dataframe.iloc[idx]
        raster = vector_to_numpy(row["drawing"])
        transformed = self.transform(image=raster)["image"]
        # Same inversion as the training dataset: `1 - x` on the transformed values.
        return 1 - transformed.reshape((1, 256, 256))



In [None]:
#defining a Custom neural network for the final predictions
class Custom_nn(nn.Module):
    """Two-layer fully connected head: Linear -> Tanh -> Linear.

    Args:
        Net_size: Width of the hidden layer.
        input_shape: Number of input features per sample.
        output_shape: Number of output values (class logits) per sample.
    """

    def __init__(self,Net_size = 256,input_shape = 512, output_shape =101):
        super().__init__()
        hidden = nn.Linear(input_shape, Net_size)
        activation = nn.Tanh()
        readout = nn.Linear(Net_size, output_shape)
        # The attribute is named `nn` on purpose: it fixes the state_dict keys ("nn.0.weight", ...).
        self.nn = nn.Sequential(hidden, activation, readout)

    def forward(self,input):
        """Map ``(batch, input_shape)`` features to ``(batch, output_shape)`` logits."""
        logits = self.nn(input)
        return logits



In [None]:
# a = torch.randn(size = (8,1,256,256))
# model_final(a).shape

In [None]:
#train-test-split
from sklearn.model_selection import train_test_split

# Hold out 30% of the rows for validation. X_* keep every column of df_train (including
# "drawing", "word" and "word_new"); y_* are the integer class codes from "word_new".
# random_state fixes the shuffle so the split is reproducible.
X_train,X_val, y_train,y_val = train_test_split(df_train,df_train['word_new'],test_size = 0.3,shuffle = True,random_state=42)

In [None]:
# Sanity check: shapes of the training/validation features and labels.
X_train.shape,y_train.shape,X_val.shape,y_val.shape

In [None]:
# Lookup from integer class code ("word_new") back to the class name ("word").
# Only classes that occur in X_train end up in the mapping.
keys = X_train["word"]
values = X_train["word_new"]
# Pair the two columns in a single pass instead of indexing row by row with
# `.iloc` in a Python loop; the resulting keys and values are the same objects
# the loop produced (duplicates simply overwrite the same code -> word entry).
dict_ = dict(zip(values.to_numpy(), keys.to_numpy()))

dict_


In [None]:
# Remove the name `df_train`; the following cells work on X_train / X_val only.
# NOTE: earlier cells that reference df_train cannot be re-run after this without reloading the CSV.
del df_train

In [None]:
#sample code to check if the train data is working fine
# Build the training dataset and fetch its first sample.
train_dataset = train_images(X_train,y_train, train_transform)
train_image,label = train_dataset.__getitem__(0)
# The sample is a (1, 256, 256) tensor; drop the channel axis so imshow can render it.
plt.imshow(train_image.numpy().reshape(256,256))
# Display the label that belongs to the plotted sample.
label

In [None]:
# Shape of the sample fetched in the previous cell.
train_image.shape

In [None]:
#sample code to check if the test data is working fine

# Reuse the validation frame as stand-in input for the label-free dataset.
test_dataset = test_images(X_val,val_transform)
# NOTE: this rebinds `train_image` from the earlier sample-check cell.
train_image = test_dataset.__getitem__(244)
plt.imshow(train_image.numpy().reshape(256,256))

In [None]:
#data_loaders
def get_loaders(train_dataset,val_dataset,batch_size = 32):
    """Wrap the training and validation datasets in ``DataLoader`` objects.

    Args:
        train_dataset: Dataset used for training; batches are shuffled.
        val_dataset: Dataset used for validation; iterated in order.
        batch_size: Number of samples per batch for both loaders.

    Returns:
        Tuple ``(train_loader, val_loader)``; both use two worker processes.
    """
    shared_options = {"batch_size": batch_size, "num_workers": 2}
    train_loader = torch.utils.data.DataLoader(train_dataset, shuffle=True, **shared_options)
    val_loader = torch.utils.data.DataLoader(val_dataset, **shared_options)
    return train_loader, val_loader

In [None]:
#validation loss
def check_accuracy(loader, model, device="cuda", max_batches=10):
    """Estimate the mean cross-entropy loss of ``model`` on batches from ``loader``.

    Despite its name, this function reports a loss, not an accuracy.

    Args:
        loader: Iterable yielding ``(inputs, class_index_targets)`` batches.
        model: Module mapping a batch of inputs to class logits.
        device: Device the batches are moved to before the forward pass.
        max_batches: Upper bound on the number of batches evaluated; ``None``
            evaluates the whole loader.

    Returns:
        Zero-dimensional tensor holding the mean of the per-batch losses.

    Raises:
        ValueError: If no batch could be evaluated (empty loader or
            ``max_batches == 0``).
    """
    from itertools import islice

    criteria = nn.CrossEntropyLoss()
    total_loss = 0.0
    num_batches = 0

    model.eval()
    try:
        with torch.no_grad():
            # islice stops after `max_batches` items without fetching an extra batch.
            for x, y in islice(loader, max_batches):
                x = x.to(device)
                # CrossEntropyLoss requires int64 class indices.
                y = y.to(device=device, dtype=torch.long)
                total_loss = total_loss + criteria(model(x), y)
                # Count only batches that were actually evaluated, so the mean
                # below is divided by the right number.
                num_batches += 1
    finally:
        # Always hand the model back in training mode, even if evaluation failed.
        model.train()

    if num_batches == 0:
        raise ValueError("check_accuracy received no batches to evaluate")

    mean_loss = total_loss / num_batches
    print(f"loss: {mean_loss}")
    return mean_loss

In [None]:
# Early sanity check of the validation loss. It needs `val_loader`, `model` and
# `DEVICE`, which are only created in cells further down, so on a fresh kernel
# ("Restart & Run All") this cell would raise NameError. Skip it in that case
# and re-run it manually once the setup cells have been executed.
_missing = [name for name in ("val_loader", "model", "DEVICE") if name not in globals()]
if _missing:
    print(f"Skipping validation check - run the setup cells first (undefined: {_missing})")
    loss_ = None
else:
    loss_ = check_accuracy(val_loader, model, device=DEVICE)
loss_

In [None]:
#training function
def train_fn(loader, model, optimizer, loss_fn, scaler,loss_arr):
    """Train ``model`` for one pass over ``loader`` using mixed precision.

    Batches are moved to the module-level ``DEVICE``. The model weights are
    written to ``model.pth`` every 100 batches.

    Args:
        loader: Iterable yielding ``(data, targets)`` batches.
        model: Module being trained.
        optimizer: Optimizer that updates ``model``'s parameters.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar loss.
        scaler: Gradient scaler used for the backward pass and optimizer step.
        loss_arr: List that receives the loss value of every batch (mutated in place).

    Returns:
        ``loss_arr``, extended by one float per processed batch.
    """
    save_path = "model.pth"
    save_every = 100   # checkpoint interval, in batches
    log_every = 500    # interval of the "model_saved" message, in batches

    loop = tqdm(loader)

    for batch_idx, (data, targets) in enumerate(loop):
        data = data.to(DEVICE)
        # The loss expects int64 class indices.
        targets = targets.to(device=DEVICE, dtype=torch.long)

        with torch.cuda.amp.autocast():
            predictions = model(data)
            loss = loss_fn(predictions, targets)  # calculate loss
        batch_loss = loss.item()
        loss_arr.append(batch_loss)

        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Checkpoint on the batch index. The previous hand-rolled counter was only
        # incremented inside this branch, so it went 0 -> 1 and the model was
        # saved on the first batch only.
        if batch_idx % save_every == 0:
            torch.save(model.state_dict(), save_path)
            if batch_idx % log_every == 0:
                print("model_saved")

        # update tqdm loop
        loop.set_postfix(loss=batch_loss)
    return loss_arr

In [None]:
#Defining an encoder part of UNET
import torch
import torch.nn as nn

class UNetEncoder(nn.Module):
    """Convolutional encoder: a large-kernel stem followed by four down-sampling blocks.

    For a 256x256 input the spatial size shrinks 256 -> 120 (stem) -> 57 -> 25
    -> 9 -> 1, so the output has shape ``(batch, bottleneck_channels, 1, 1)``.

    Args:
        in_channels: Number of channels of the input image.
        bottleneck_channels: Number of channels produced by the last block.
    """

    def __init__(self, in_channels, bottleneck_channels):
        super().__init__()
        # Stem: two unpadded large-kernel convolutions, then an overlapping max-pool.
        stem_layers = [
            nn.Conv2d(in_channels, 64, kernel_size=9, padding=0),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=7, padding=0),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        ]
        self.encoder = nn.Sequential(*stem_layers)
        # Channel widths double per block until the bottleneck width is reached.
        self.conv1 = self.conv_block(64, 128)
        self.conv2 = self.conv_block(128, 256)
        self.conv3 = self.conv_block(256, 512)
        self.conv4 = self.conv_block(512, bottleneck_channels)

    def conv_block(self, in_channels, out_channels):
        """Build one down-sampling block (three convolutions, two max-pools)."""
        layers = [
            # Size-preserving 5x5 convolution, then halve the resolution.
            nn.Conv2d(in_channels, out_channels, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(out_channels, out_channels, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            # The unpadded 3x3 convolution and the stride-1 pool trim 3 more pixels per side length.
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=0),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=1),
        ]
        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the stem and the four blocks in sequence; return the last feature map."""
        features = x
        for stage in (self.encoder, self.conv1, self.conv2, self.conv3, self.conv4):
            features = stage(features)
        return features

in_channels = 1  # grayscale image
bottleneck_channels = 512  # 512 feature maps at the bottleneck
encoder = UNetEncoder(in_channels, bottleneck_channels)
# The head's input width must equal the flattened encoder output
# (bottleneck_channels x 1 x 1 = 512 for 256x256 inputs).
model_custom = Custom_nn(input_shape = 512)
class final_model_unet(nn.Module):
    """Classifier that feeds flattened encoder features into a dense head.

    Args:
        encoder: Module producing a 4-D feature map ``(batch, C, H, W)``.
        model_custom: Module consuming ``(batch, C * H * W)`` features.
    """

    def __init__(self,encoder,model_custom):
        super().__init__()
        self.model1 = encoder
        self.model2 = model_custom

    def forward(self,input):
        """Encode ``input``, flatten each sample's feature map, and apply the head."""
        feature_maps = self.model1(input)
        dims = feature_maps.shape
        features_per_sample = dims[1] * dims[2] * dims[3]
        flat_features = feature_maps.reshape(dims[0], features_per_sample)
        return self.model2(flat_features)

# Assemble the full classifier (encoder + dense head) and display its layer structure.
model_final_unet = final_model_unet(encoder,model_custom)
model_final_unet

In [None]:
# Smoke test: push a random batch of 8 single-channel 256x256 images through the
# model; with the default head the expected output shape is torch.Size([8, 101]).
a = torch.randn(size = (8,1,256,256))
model_final_unet(a).shape

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
LEARNING_RATE = 1e-5
# Uncomment the next two lines to resume from a previously saved checkpoint.
# model_final_unet = final_model_unet(encoder,model_custom)
# model_final_unet.load_state_dict(torch.load("/kaggle/working/model.pth"))
# Module.to() moves the parameters in place and returns the same module,
# so `model` and `model_final_unet` refer to one object.
model = model_final_unet.to(DEVICE)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE) #initializing an Adam Optimizer

In [None]:
#datasets
# Training data gets the augmenting transform; validation data only the deterministic one.
train_dataset = train_images(X_train,y_train, train_transform)
# The validation split has labels too, so it reuses the labelled dataset class.
val_dataset   = train_images(X_val,y_val, val_transform)
train_loader, val_loader = get_loaders(train_dataset,val_dataset,batch_size = 32)

In [None]:
NUM_EPOCHS = 100
scaler = torch.cuda.amp.GradScaler()
# Collects every training batch loss, plus one validation loss after each epoch.
loss_epoch = []
# Training loop: one pass over the training data, then checkpoint and validate.
for epoch in range(NUM_EPOCHS):
    loss_epoch = train_fn(train_loader, model, optimizer, loss_fn, scaler,loss_epoch)
    try:
        print(f"Finished epoch {epoch + 1}/{NUM_EPOCHS}")
        save_path = "model.pth"
        torch.save(model.state_dict(),save_path)
        loss_ = check_accuracy(val_loader, model, device=DEVICE)
        # Store a plain float so the list does not keep (GPU) tensors alive.
        loss_epoch.append(float(loss_))
    except Exception as err:
        # Checkpointing/validation stays best-effort, but the failure is now
        # reported instead of silently swallowed; KeyboardInterrupt is no
        # longer caught, so the run can be stopped by hand.
        print(f"Checkpoint/validation failed after epoch {epoch + 1}: {err!r}")
        continue

In [None]:
#cross checking if the model saved is loading fine
# Rebuild the architecture from freshly initialised sub-modules so that the
# check really exercises the weights stored on disk. Reusing `encoder` /
# `model_custom` would alias the model that was just trained. The previous
# `final_model(...)` call referred to classes and variables that only exist in
# commented-out cells and therefore raised NameError.
model_dummy = final_model_unet(
    UNetEncoder(in_channels, bottleneck_channels),
    Custom_nn(input_shape = 512),
)
# map_location lets a checkpoint saved on the GPU load on a CPU-only session too.
model_dummy.load_state_dict(torch.load("/kaggle/working/model.pth", map_location=DEVICE))
model_dummy

In [None]:
# import torchvision.models as models

# model_EMSRB1  = EMSRB(1,16,5,7)
# model_EMSRB2  = EMSRB(16,3,3,5)
# model_resent = models.resnet18(pretrained=True)
# model_resent = nn.Sequential(*list(model_resent.children())[:-1])
# model_custom = Custom_nn(input_shape = 512)
# for params in model_resent.parameters():
#     params.requires_grad = False

# class final_model(nn.Module):
#     def __init__(self,model_EMSRB1,model_EMSRB2,model_resent,model_custom):
#         super().__init__()
#         self.model1    = model_EMSRB1
#         self.model2    = model_EMSRB2
#         self.resnet    = model_resent
#         for param in self.resnet.parameters():
#             param.requires_grad = False
#         self.model4    = model_custom
#     def forward(self,input):

#         out = self.model1(input)
#         out = self.model2(out)
#         out = self.resnet(out)
#         #print(out.shape)
#         out = self.model4(out.reshape(out.shape[0],out.shape[1]*out.shape[2]*out.shape[3] ))
#         #print(out.shape)
#         return out

# model_final = final_model(model_EMSRB1,model_EMSRB2,model_resent,model_custom)
# model_final

In [None]:
# class EMSRB(nn.Module):
#     def __init__(self,input_channels,num_features,f1,f2):
#         super().__init__()
#         self.ic    = input_channels
#         self.conv3_1 = nn.Conv2d(in_channels = self.ic, out_channels=num_features,kernel_size = f1)
#         self.conv5_1 = nn.Conv2d(in_channels = self.ic, out_channels=num_features,kernel_size = f2,padding=1)
#         self.conv1_1 = nn.Conv2d(in_channels = self.ic, out_channels=num_features,kernel_size = 1)

#         self.relu  = nn.ReLU()

#         self.conv3_2 = nn.Conv2d(in_channels = 2* num_features, out_channels=num_features,kernel_size = f1)
#         self.conv5_2 = nn.Conv2d(in_channels = 2* num_features, out_channels=num_features,kernel_size = f2,padding=1)
#         self.conv1_2 = nn.Conv2d(in_channels = 2* num_features, out_channels=num_features,kernel_size = 1,padding=self.get_padding(f1,f2))

#     def get_padding(self,f1,f2):
#         if f1 == 5 and f2 == 7:
#             return 4
#         else:
#             return 2

#     def forward(self,input):

#         in_path1 = self.relu(self.conv3_1(input))
#         #print(in_path1.shape)
#         in_path2 = self.relu(self.conv5_1(input))
#         #print(in_path2.shape)
#         concat_features = torch.cat(tensors = (in_path1,in_path2),dim = 1)
#         #print(concat_features.shape)
#         in_path1 = self.relu(self.conv3_2(concat_features))
#         #print(in_path1.shape)
#         in_path2 = self.relu(self.conv5_2(concat_features))
#         #print(in_path2.shape)
#         concat_features = torch.cat(tensors = (in_path1,in_path2),dim = 1)
#         #print(concat_features.shape)
#         concat_features = self.conv1_2(concat_features)
#         #print(concat_features.shape)
#         in_ = self.conv1_1(input)
#         #print(in_.shape)
#         concat_features = concat_features+in_
#         #print(concat_features.shape)
#         return concat_features