<a href="https://www.kaggle.com/code/khunanonr/vision-trasformer?scriptVersionId=131556247" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
! pip install --q datasets transformers

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import re
import os

In [None]:
import os
import pandas as pd

# Class folder names under the training directory; a folder's position in this
# list becomes its integer label.
label_image = [
    'canal', 'electric', 'flooding', 'light', 'road',
    'sanitary', 'sewer', 'sidewalk', 'stray', 'traffic',
]

img_dir = "/kaggle/input/2110446-data-science-and-data-engineering-2023/TraffyFondue/train/"

img_label_map = {}  # class name -> integer label, filled while scanning
paths, image_names, label_nums = [], [], []

for label_num, label in enumerate(label_image):
    class_dir = os.path.join(img_dir, label)
    # The first os.walk() entry describes class_dir itself; only the files
    # directly inside it are used (sub-folders are not descended into).
    files = next(os.walk(class_dir))[2]
    img_label_map[label] = label_num
    paths.extend(os.path.join(class_dir, image_name) for image_name in files)
    label_nums.extend([label_num] * len(files))
    image_names.extend(files)

# One row per image: full path, folder-derived label, bare file name.
df_dataset = pd.DataFrame({"path": paths, "label": label_nums, 'filename': image_names})
print(df_dataset.shape)
df_dataset.tail()

In [None]:
# load file name
!pip --q install gdown
!gdown --q 14X-tmO3Ni5a7sKxvVtRaFAfVGJR1rKp8 # file name

In [None]:
# Keep only the images whose file name is listed in the CSV, and replace the
# folder-derived label with the one mapped from the CSV's `class` column.
clean_df = pd.read_csv("/kaggle/working/filenames (4).csv")
new_df = (
    clean_df
    .merge(df_dataset, on='filename', how='inner')
    .drop(columns=['filename', 'label'])
)
# pop() removes `class` from the frame and hands back its values for mapping.
new_df['label'] = new_df.pop('class').map(img_label_map)
df_dataset = new_df
new_df.shape

In [None]:
# Number of rows per integer label in the merged frame.
new_df['label'].value_counts()

# Data Preparation

In [None]:
from transformers import ViTFeatureExtractor
class TraffyFondueDataset(Dataset):
    """Image dataset backed by a DataFrame of file paths.

    Every item is the ViT-preprocessed image together with either its label
    or its file path.

    Args:
        df: DataFrame with a ``path`` column and, when ``label`` is true, a
            ``label`` column.
        label: If true, items are ``(pixel_values, label)``; otherwise they
            are ``(pixel_values, path)``.
    """

    def __init__(self, 
                 df, 
                label=True):
        super().__init__()
        self.df_data = df
        self.transform = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k')
        self.label = label

    def __len__(self):
        """Return the number of rows (images) in the backing DataFrame."""
        return len(self.df_data)

    def __getitem__(self, idx):
        """Load and preprocess the image at position ``idx``.

        Returns:
            ``(pixel_values, label)`` when ``self.label`` is true, otherwise
            ``(pixel_values, path)``. ``pixel_values`` is returned exactly as
            produced by the feature extractor; callers in this file reshape
            batches with ``view((-1, 3, 224, 224))``.
        """
        # Samplers hand out positions 0..len-1, so index by position (.iloc).
        # Label-based .loc only works when the frame has a default RangeIndex.
        path = self.df_data['path'].iloc[idx]
        # The context manager closes the file handle deterministically;
        # convert() returns a new, fully loaded image that outlives it.
        with Image.open(path) as raw:
            img = raw.convert('RGB')
        x = self.transform(img, return_tensors='pt')['pixel_values']
        if self.label:
            return x, self.df_data['label'].iloc[idx]
        return x, path

In [None]:
# Splitting Data 
from sklearn.model_selection import train_test_split

# Stratified 80/10/10 split: hold out 10% for test first, then 1/9 of the
# remaining 90% (i.e. another 10% of the whole) for validation.
X, y = df_dataset[['path']], df_dataset[['label']]
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.1,
    random_state=42,
    stratify=y,
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train,
    test_size=1/9,
    random_state=42,
    stratify=y_train,
)

# Re-attach labels to paths; reset_index() gives each frame a fresh 0..n-1
# index and keeps the previous index as an `index` column.
train, test, val = (
    features.join(targets).reset_index()
    for features, targets in ((X_train, y_train), (X_test, y_test), (X_val, y_val))
)

train.shape, test.shape, val.shape

In [None]:
# Number of rows per integer label in the training split.
train['label'].value_counts()

In [None]:
# Rebalance the training split so every class contributes exactly
# `target_amount` rows: classes with at least that many rows are sampled
# without replacement, smaller classes are sampled with replacement.
count_cat = train['label'].value_counts()
target_amount = 1100
balanced_parts = []
# Iterate over the classes actually present (ascending label) instead of a
# hard-coded range(10), so the cell keeps working if the class list changes.
for class_id, class_count in count_cat.sort_index().items():
    class_rows = train.loc[train['label'] == class_id, ['path', 'label']]
    balanced_parts.append(
        class_rows.sample(
            n=target_amount,
            replace=bool(class_count < target_amount),
            random_state=25,  # fixed seed keeps the resampling reproducible
        )
    )
# ignore_index gives the 0..n-1 index that positional dataset access expects.
new_train = pd.concat(balanced_parts, ignore_index=True)
new_train['label'].value_counts()

In [None]:
BATCH_SIZE = 64

# One dataset per split; training uses the rebalanced frame.
trainset, valset, testset = (
    TraffyFondueDataset(frame) for frame in (new_train, val, test)
)

# Only the training loader shuffles; validation and test keep row order.
train_loader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
val_loader, test_loader = (
    DataLoader(split, batch_size=BATCH_SIZE, shuffle=False)
    for split in (valset, testset)
)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# functions to show an image
def imshow(img):
    """Un-normalize an image tensor and display it in a 16x16-inch figure.

    The tensor is scaled by 0.5 and shifted by 0.5 (undoing a mean-0.5 /
    std-0.5 normalization), then its first axis is moved last before being
    handed to matplotlib.
    """
    # Scalar 0.5, used as both the scale and the offset of the un-normalization.
    half = torch.tensor([0.5, 0.5, 0.5]).mean()
    restored = (img * half + half).numpy()
    plt.figure(figsize=(16, 16))
    plt.imshow(restored.transpose(1, 2, 0))
    plt.show()

# get some random training images
dataiter = iter(train_loader)
images, labels = next(dataiter)
# Flatten the batch to (N, 3, 224, 224). Using -1 instead of BATCH_SIZE also
# handles a batch shorter than BATCH_SIZE and matches the reshape used in the
# training / evaluation loops.
# NOTE(review): presumably each dataset item carries its own leading axis of
# size 1 from the feature extractor, which is what this view collapses - confirm.
images = images.view((-1, 3, 224, 224))

# show images as a grid with `nrow` images per row
nrow = 9
imshow(torchvision.utils.make_grid(images, nrow=nrow))
print(labels)

# Model

In [None]:
from transformers import ViTModel
from transformers.modeling_outputs import SequenceClassifierOutput
import torch.nn as nn
import torch.nn.functional as F

class ViTForImageClassification(nn.Module):
    """Pretrained ViT backbone followed by dropout and a linear classifier.

    Args:
        num_labels: Number of output classes of the linear head.
    """

    def __init__(self, num_labels=3):
        super(ViTForImageClassification, self).__init__()
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.vit.config.hidden_size, num_labels)
        self.num_labels = num_labels

    def forward(self, pixel_values, labels=None):
        """Classify a batch of images.

        Args:
            pixel_values: Batch of preprocessed images passed to the backbone.
            labels: Optional integer class targets. Defaults to ``None`` so
                the model can be called for inference without a placeholder.

        Returns:
            ``(logits, loss)``. ``loss`` is ``None`` when ``labels`` is
            ``None``; otherwise it is the cross-entropy as a plain Python
            float (``.item()``), i.e. detached from the graph and suitable
            for reporting only, not for ``backward()``.
        """
        outputs = self.vit(pixel_values=pixel_values)
        # Position 0 of the last hidden state is used as the image representation.
        pooled = self.dropout(outputs.last_hidden_state[:, 0])
        logits = self.classifier(pooled)

        if labels is None:
            return logits, None

        loss_fct = nn.CrossEntropyLoss()
        loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
        return logits, loss.item()

In [None]:
import torch.nn as nn
import numpy as np
from tqdm.notebook import tqdm
from sklearn.metrics import classification_report
import torch

num_classes = len(img_label_map)
LEARNING_RATE = 2e-5

# Prefer the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = ViTForImageClassification(num_classes)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
loss_func = nn.CrossEntropyLoss()

# Moves the module's parameters in place, so the optimizer created above
# still refers to the same parameter objects.
model.to(device)

# Train

In [None]:
import torch.utils.data as data
from torch.autograd import Variable


EPOCHS = 5

# Per-epoch metrics, one slot per epoch.
history_train = {'loss':np.zeros(EPOCHS), 'acc':np.zeros(EPOCHS), 'f1-score':np.zeros(EPOCHS)}
history_val = {'loss':np.zeros(EPOCHS), 'acc':np.zeros(EPOCHS), 'f1-score':np.zeros(EPOCHS)}
min_val_loss = 1e10
PATH = './best_model.pth'

# Train the model: one optimisation pass and one validation pass per epoch,
# checkpointing whenever the mean validation loss improves.
for epoch in range(EPOCHS):

    print(f'epoch {epoch + 1} \nTraining ...')
    model.train()
    y_predict = []
    y_labels = []
    training_loss = 0.0
    n = 0  # number of batches seen, used to average the loss
    for x, y in tqdm(train_loader):
        # Flatten the loader output to (N, 3, 224, 224) and move it to the device.
        x = x.view((-1, 3, 224, 224)).to(device)
        y = y.to(device)

        # labels=None: take only the logits from the model and compute the
        # loss here, because the loss the model returns is a detached float
        # that cannot be back-propagated.
        output, _ = model(x, None)
        loss = loss_func(output, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        training_loss += loss.item()
        n += 1
        y_labels += list(y.cpu().numpy())
        y_predict += list(output.argmax(dim=1).cpu().numpy())

    # print statistics
    report = classification_report(y_labels, y_predict, digits = 4, output_dict = True)
    acc = report["accuracy"]
    f1 = report["weighted avg"]["f1-score"]
    support = report["weighted avg"]["support"]
    training_loss /= n
    print(f"training loss: {training_loss:.4}, acc: {acc*100:.4}%, f1-score: {f1*100:.4}%, support: {support}" )
    history_train['loss'][epoch] = training_loss
    history_train['acc'][epoch] = acc
    history_train['f1-score'][epoch] = f1

    print('validating ...')
    model.eval()
    # Clears gradients left over from the last training step; not required
    # for correctness because validation runs under no_grad().
    optimizer.zero_grad()

    y_predict = []
    y_labels = []
    validation_loss = 0.0
    n = 0
    with torch.no_grad():
        # Unpack directly instead of binding the batch to `data`, which would
        # shadow the `torch.utils.data as data` alias imported above.
        for inputs, labels in tqdm(val_loader):
            inputs = inputs.view((-1, 3, 224, 224)).to(device)
            labels = labels.to(device)

            # With labels supplied the model returns its loss as a float.
            outputs, loss = model(inputs, labels)

            validation_loss += loss

            y_labels += list(labels.cpu().numpy())
            y_predict += list(outputs.argmax(dim=1).cpu().numpy())
            n += 1

    # print statistics
    report = classification_report(y_labels, y_predict, digits = 4, output_dict = True)
    acc = report["accuracy"]
    f1 = report["weighted avg"]["f1-score"]
    support = report["weighted avg"]["support"]
    validation_loss /= n
    print(f"validation loss: {validation_loss:.4}, acc: {acc*100:.4}%, f1-score: {f1*100:.4}%, support: {support}" )
    history_val['loss'][epoch] = validation_loss
    history_val['acc'][epoch] = acc
    history_val['f1-score'][epoch] = f1

    # save the weights whenever the mean validation loss reaches a new minimum
    if validation_loss < min_val_loss:
        torch.save(model.state_dict(), PATH)
        min_val_loss = validation_loss
        print("Save best model")

print('Finished Training')

# Evaluate model

In [None]:
# !gdown 1eJ-78n7NYvMqc1u9-E2Evo0JraZsZ0VF

In [None]:
PATH = './best_model.pth'
best_model = ViTForImageClassification(num_classes)

best_model = best_model.to(device)
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU still loads in a CPU-only session.
best_model.load_state_dict(torch.load(PATH, map_location=device))

In [None]:
import matplotlib.pyplot as plt
import numpy as np


print('testing ...')
y_predict = []
y_labels = []

# eval() does not depend on the batch, so switch modes once before the loop.
best_model.eval()
with torch.no_grad():
    # Unpack directly rather than binding the batch to `data`, which would
    # shadow the `torch.utils.data as data` alias imported earlier.
    for inputs, target in tqdm(test_loader):
        # Flatten to (N, 3, 224, 224) and send to the appropriate computing device
        inputs = inputs.view((-1, 3, 224, 224)).to(device)
        target = target.to(device)

        # Generate prediction; the returned loss is not needed for the report
        prediction, _ = best_model(inputs, target)

        # Predicted class value using argmax
        y_predict += list(prediction.argmax(dim=1).cpu().numpy())
        y_labels += list(target.cpu().numpy())
report = classification_report(y_labels, y_predict, digits = 4)
print(report)

# Export output

In [None]:
test_dir = "/kaggle/input/2110446-data-science-and-data-engineering-2023/TraffyFondue/test/"
# Sorted listing so the submission rows come out in a deterministic order.
filename = [os.path.join(test_dir, x) for x in sorted(os.listdir(test_dir))]
test_submit_df = pd.DataFrame({"path":filename})
# label=False: the dataset yields (pixel_values, path) instead of a label.
test_submit_dataset = TraffyFondueDataset(test_submit_df, label=False)

test_submit_loader = DataLoader(test_submit_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Raw string: "\." in a normal literal is an invalid escape sequence and
# triggers a warning on current Python versions. The pattern is unchanged.
# NOTE(review): the greedy ".*test" strips everything through the LAST "test"
# in the path, so unless the file name itself contains "test" the resulting ID
# keeps a leading "/" - confirm against the expected submission IDs.
id_pattern = re.compile(r"(.*test|\.jpg)")

IDs = []
y_out = []

# eval() does not depend on the batch, so switch modes once before the loop.
best_model.eval()
with torch.no_grad():
    for inputs, path in tqdm(test_submit_loader):
        # Flatten to (N, 3, 224, 224) and send to the appropriate computing device
        inputs = inputs.view((-1, 3, 224, 224)).to(device)

        # Generate prediction (no labels, so the returned loss is None)
        prediction, _ = best_model(inputs, None)
        IDs += [id_pattern.sub("", x) for x in path]
        y_out += list(prediction.argmax(dim=1).cpu().numpy())

# One row per test image: predicted class index, keyed by the derived ID.
result = pd.DataFrame({"class":y_out, "ID": IDs})
result = result.set_index('ID')
result.to_csv("submiting_ds.csv")