In [None]:
!pip install torchsummary

In [1]:
import torch
import torchvision
from torchvision import transforms
import torch.utils.data as data
import torch.optim as optim
import torch.nn as nn
from torchsummary import summary
import time
import numpy as np
import os
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms,models
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image, ImageFont, ImageDraw
from pathlib import Path
import pandas as pd
import cv2
import requests
from sklearn.metrics import f1_score
from torchmetrics import F1Score, Accuracy, ConfusionMatrix
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
import albumentations as A

In [None]:
seed = 7
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
#random.seed(seed)


In [3]:
device = torch.device('cuda')
device=torch.device(device)
import gc
gc.collect()
torch.cuda.empty_cache()

In [4]:
BATCH_SIZE = 64
learning_rate = 0.0001
numClasses = 205

In [5]:
class TrafficData(Dataset):
    def __init__(self, df, image_dir, transforms=None):
        self.image_ids = df['Path'].unique()
        self.df = df
        self.image_dir = image_dir
        self.transforms = transforms

    def __getitem__(self, index, size = [112, 112]):
        image_path = self.image_ids[index]
        records = self.df[self.df['Path'] == image_path]

        #print(f'{self.image_dir}/{image_path}')
        image = cv2.imread(f'{self.image_dir}/{image_path}', cv2.IMREAD_COLOR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, size)
        image = image.astype(float) / 255.0

        target = records['ClassId'].values

        if self.transforms:
            image = self.transforms(**image)

        return image, target

    def __len__(self) -> int:
        return self.image_ids.shape[0]

    @staticmethod
    def create_dataset(df, dir, transform=None):
       dataset = TrafficData(df, dir)
       return dataset

    @staticmethod 
    def loader(dataset, batch_size, shuffle=True, num_workers=0,):
       data_loader = DataLoader(
          dataset,
          batch_size=batch_size,
          shuffle=shuffle,
          num_workers=num_workers,
      )
       return data_loader

data_transforms = transforms.Compose([
    transforms.Resize([112, 112]),
    transforms.ToTensor()
    ])

In [7]:
path = Path("../input/traffic-signs-gtsrb-plus-162-custom-classes/Data_images")
df_train = pd.read_csv("../input/data-info/Train_balanced_data.csv")
df_test = pd.read_csv("../input/data-info/Test_data_cleaned.csv")
#df_inference = pd.read_csv('Data_images/data_inference.csv')

train_data = TrafficData.create_dataset(df_train, path, data_transforms)
train_set, val_set = torch.utils.data.random_split(train_data, [29000, 6465])

train_data_loader = TrafficData.loader(train_set, BATCH_SIZE)
val_data_loader = TrafficData.loader(val_set, BATCH_SIZE)

test_data = TrafficData.create_dataset(df_test, path, data_transforms)
test_set, _ = torch.utils.data.random_split(test_data, [10000, 31692])
test_data_loader = TrafficData.loader(test_set, BATCH_SIZE)
big_test_data_loader = TrafficData.loader(test_data, BATCH_SIZE)

In [9]:
for X, y in test_data_loader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

In [10]:
eff_net_b0 = models.efficientnet_b0(pretrained=True)
#vgg16 = models.vgg16(pretrained=True)
#ResNet = models.resnet34(pretrained=True)

class Eff_net(nn.Module):
    def __init__(self, pretrained_model):
        super(Eff_net,self).__init__()
        self.Eff_net = pretrained_model
        #num_inputs = pretrained_model.classifier[1].in_features
        self.fl1 = nn.Sequential(nn.Linear(1000, 256), nn.ReLU())
        self.fl2 = nn.Sequential(nn.Linear(256, numClasses), nn.Softmax())
        
    def forward(self, X):
        X = self.Eff_net(X)
        X = self.fl1(X)
        #X = F.dropout(X, p=0.25)
        X = self.fl2(X)
        return X
    
    def pred(self, X):
        X = cv2.resize(X, (112, 112))
        X = np.reshape(X, (1, 112, 112, 3))
        X = torch.tensor(X)
        X = X.float()
        X = X.permute(0, 3, 1, 2)
        X = X.cuda()
        prediction = self.forward(X)
        prediction = torch.argmax(prediction, axis=1)
        return prediction.cpu().numpy()


In [11]:
model1 = Eff_net(eff_net_b0)
#model2 = Eff_net(vgg16)
#model3 = Eff_net(ResNet)
model = model1

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

if torch.cuda.is_available():
    model = model.cuda()
    criterion = criterion.cuda()

In [12]:
def calculate_accuracy(y_pred, y):
    top_pred = y_pred.argmax(1, keepdim = True)
    top_pred = torch.reshape(top_pred, (-1,))
    true_pred = (top_pred == y).float().sum()
#     correct = top_pred.eq(y.view_as(top_pred)).sum()
#     acc = correct.float() / y.shape[0]
    return true_pred / len(y)

def calculate_f1_score(output,labels):
    predictions = output.argmax(1, keepdim = True)
    predictions = torch.reshape(predictions, (-1,))
    f1 = F1Score(num_classes=205).to(device)
    return f1(predictions, labels)

def show_meta(i):
    img = cv2.imread(str(path / ('Meta/' + str(int(i)) + '.png')))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.imshow(img)
    
def predict_and_check(model, img):
    label_pred = model.pred(img)
    meta_img_bgr = cv2.imread(str(path / ('Meta/' + str(int(label_pred)) + '.png')))
    meta_img_bgr = meta_img_bgr / 255.0
    meta_img = np.zeros(meta_img_bgr.shape)
    meta_img[:, :, 0] = meta_img_bgr[:, :, 2]
    meta_img[:, :, 1] = meta_img_bgr[:, :, 1]
    meta_img[:, :, 2] = meta_img_bgr[:, :, 0]
    #meta_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    fig, ax = plt.subplots(ncols=2, figsize=(10, 10))
    ax[0].imshow(img)
    ax[0].set_title('your image')
    ax[1].imshow(meta_img)
    ax[1].set_title('predicted class: ' + str(int(label_pred)))
    

In [13]:
def train(model, loader, opt, criterion):
    print('training to', len(loader), end=':')
    epoch_loss = 0
    epoch_acc = 0

    model.train()
    
    i = 0
    for (images, labels) in loader:
        print(i, end=';')
        if i % 101 == 100:
            print('')
        i += 1
        images = images.permute(0,3,1,2)
        images = images.float()
        labels = torch.reshape(labels, (-1,))
        images = images.cuda()
        labels = labels.cuda()
        
        opt.zero_grad()
        
        output = model(images)
        loss = criterion(output, labels)
        

        loss.backward()
        
        acc = calculate_accuracy(output, labels)
        
        opt.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(loader), epoch_acc / len(loader)


In [14]:
def evaluate(model, loader, criterion):
    print('')
    print('validation to', len(loader), end=':')
    epoch_loss = 0
    epoch_acc = 0
    epoch_f1 = 0
    overall_test_output = torch.Tensor([]).to(device)
    overall_test_labels = torch.Tensor([]).to(device)
    
    
    model.eval()
    
    with torch.no_grad():
        i = 0
        for (images, labels) in loader:
            print(i, end=';')
            if i % 101 == 100:
                print('')
            i += 1
            images = images.permute(0,3,1,2)
            images = images.float()
            labels = torch.reshape(labels, (-1,))
            images = images.cuda()
            labels = labels.cuda()
            
            output = model(images)
            
            loss = criterion(output, labels)
            acc = calculate_accuracy(output, labels)
            f1_score = calculate_f1_score(output, labels)
            
            #overall_test_output, overall_test_labels = join_batches_output(output,labels,overall_test_output,overall_test_labels)
            
            epoch_loss += loss.item()
            epoch_acc += acc.item()
            epoch_f1 += f1_score.item()
    
    return epoch_loss / len(loader), epoch_acc / len(loader), epoch_f1 / len(loader)#, overall_test_output, overall_test_labels

In [15]:
'''
train_loss_list = []
train_acc_list = []
train_f1_list = []
val_loss_list = []
val_acc_list = []
val_f1_list = []
test_loss_list = []
test_acc_list = []
test_f1_list = []
'''

In [18]:
checkpoint = torch.load('../input/model-dict/model.pt')
model.load_state_dict(checkpoint['model_state_dict'])

In [19]:
EPOCHS = 2
'''
train_loss_list = [0]*EPOCHS
train_acc_list = [0]*EPOCHS
train_f1_list = [0]*EPOCHS
val_loss_list = [0]*EPOCHS
val_acc_list = [0]*EPOCHS
val_f1_list = [0]*EPOCHS
test_loss_list = [0]*EPOCHS
test_acc_list = [0]*EPOCHS
test_f1_list = [0]*EPOCHS
'''

for epoch in range(EPOCHS):
    print("Epoch-%d: " % (epoch))

    train_start_time = time.monotonic()
    train_loss, train_acc = train(model, train_data_loader, optimizer, criterion)
    train_end_time = time.monotonic()
    
    torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': train_loss,
            }, './model.pt')

    val_start_time = time.monotonic()
    val_loss, val_acc, val_f1 = evaluate(model, val_data_loader, criterion)
    val_end_time = time.monotonic()
    
    test_start_time = time.monotonic()
    test_loss, test_acc, test_f1 = evaluate(model, test_data_loader, criterion)
    test_end_time = time.monotonic()
    
    train_loss_list.append(train_loss)
    train_acc_list.append(train_acc)
    val_loss_list.append(val_loss)
    val_acc_list.append(val_acc)
    val_f1_list.append(val_f1)
    test_loss_list.append(test_loss)
    test_acc_list.append(test_acc)
    test_f1_list.append(test_f1)
    
    print("Training: Loss = %.4f, Accuracy = %.4f, Time = %.2f seconds" % (train_loss, train_acc, train_end_time - train_start_time))
    print("Validation: Loss = %.4f, Accuracy = %.4f, f1 = %.4f, Time = %.2f seconds" % (val_loss, val_acc, val_f1, val_end_time - val_start_time))
    print("Test: Loss = %.4f, Accuracy = %.4f, f1 = %.4f, Time = %.2f seconds" % (test_loss, test_acc, test_f1, val_end_time - val_start_time))
    print("")

In [None]:
img, _ = train_data.__getitem__(33000)
predict_and_check(model, img)