In [None]:
# Colab only: mount Google Drive at /drive so the following cells can work
# inside the project folder stored there.
from google.colab import drive
drive.mount('/drive')

In [None]:
%cd ../drive/My Drive/hw2ml

In [None]:
# Tell the Kaggle CLI to look for its credentials in /content.
# Upload kaggle.json into /content before running this cell.
import os
os.environ.update({"KAGGLE_CONFIG_DIR": "/content"})

In [None]:
# Download the One Piece image dataset with the Kaggle CLI.
# The next cell expects the archive as one-piece-image-classifier.zip in the
# current working directory.
!kaggle datasets download -d ibrahimserouis99/one-piece-image-classifier

In [None]:
from zipfile import ZipFile

# Unpack every member of the downloaded dataset archive into the current
# working directory; the archive is closed when the block exits.
with ZipFile('one-piece-image-classifier.zip', 'r') as archive:
    archive.extractall()

In [None]:
# Character names in label order: the position of a name in this list is the
# integer class index used in the annotations and in the one-hot labels.
classnames = [
    "Ace", "Akainu", "Brook", "Chopper", "Crocodile", "Franky",
    "Jinbei", "Kurohige", "Law", "Luffy", "Mihawk", "Nami",
    "Rayleigh", "Robin", "Sanji", "Shanks", "Usopp", "Zoro",
]

# Lookup table: character name -> class index.
classes = dict(zip(classnames, range(len(classnames))))



In [None]:
#read the data folder and create a file with the annotations
import os

# Each line of annotations.txt is "<image path> <class index>".
# The class index is always the last space-separated field, so image paths
# that contain spaces can still be recovered by splitting from the right.
with open("./annotations.txt", "w") as f:
    for name, index in classes.items():
        class_dir = "./Data/Data/" + name
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # annotation file identical from one run to the next.
        for file in sorted(os.listdir(class_dir)):
            f.write(class_dir + "/" + file + " " + str(index) + "\n")
        # Report progress once per class (inside the class loop).
        print(name + " done")

In [None]:
import torch
from torch.utils.data import Dataset
from PIL import Image
import numpy as np

class OnePieceDataset(Dataset):
    """Image classification dataset driven by an annotation file.

    Every non-empty line of the annotation file has the form
    ``<image path> <class index>``; the class index is the last
    space-separated field, so the image path itself may contain spaces.

    Args:
        w: width the images are resized to.
        h: height the images are resized to.
        annotations_path: annotation file to read
            (defaults to ``./annotations.txt``).

    Each sample is a pair ``(image, label)``:
        image: float32 tensor built from the resized RGB image, laid out as
            (h, w, 3) with the raw 0-255 pixel values (no normalisation).
        label: one-hot float tensor with ``len(classes)`` entries.
    """

    def __init__(self,w,h,annotations_path="./annotations.txt"):
        self.w=w
        self.h=h
        # Per-instance list of (path, class index). A class-level list would be
        # shared by all instances and grow every time a dataset is created.
        self.items=[]
        with open(annotations_path, "r") as f:
            for line in f:
                line=line.strip()
                if not line:
                    # tolerate blank lines (e.g. a trailing newline)
                    continue
                # split once from the right: the path may contain spaces
                path,class_index=line.rsplit(" ",1)
                self.items.append((path,int(class_index)))

    def __len__(self):
        """Number of annotated images."""
        return len(self.items)

    def __getitem__(self, idx):
        """Load, resize and convert the idx-th image; return (image, one-hot label)."""
        path,class_index=self.items[idx]

        #one hot encoding of the class of the item
        label=torch.zeros(len(classes))
        label[class_index]=1

        #load the image as pil image; the context manager closes the file
        #as soon as the pixel data has been converted
        with Image.open(path) as source:
            image=source.convert('RGB')
        image=image.resize((self.w, self.h))

        #convert it to a tensor
        image=torch.tensor(np.array(image),dtype=torch.float32)

        return image, label

In [None]:
import torch
import torch.nn as nn

class Mlp(nn.Module):
    """Fully connected classifier for flattened RGB images.

    Five 128-unit hidden layers with ReLU activations, followed by a linear
    output layer that returns raw class scores (logits).

    Args:
        w: image width.
        h: image height.
        num_classes: number of output scores (defaults to 18).

    The input of ``forward`` must have ``w*h*3`` features per sample.
    """

    def __init__(self,w,h,num_classes=18):
        # initialise nn.Module first, then attach attributes and layers
        super(Mlp, self).__init__()
        self.w=w
        self.h=h
        self.fc1 = nn.Linear(self.w*self.h*3, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 128)
        self.fc4 = nn.Linear(128, 128)
        self.fc5 = nn.Linear(128, 128)
        self.fc6 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return the class logits for a batch of flattened images."""
        # hidden layers: linear + ReLU
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4, self.fc5):
            x=torch.relu(layer(x))

        #do not need to use softmax or sigmoid because we use cross entropy loss and it does it for us
        return self.fc6(x)
    

In [None]:
import torch
from dataset import OnePieceDataset
from torch.utils.data import DataLoader
from torch import nn
from nn import Mlp
import torchvision
import wandb

import os  # used below to create the folder the trained model is saved in

#device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

#declare parameters
num_epochs=5
batch_size=32
w,h=50,50
w_and_b=True
nn_type="mlp"

if w_and_b:
    wandb.init(project='hw2ml', entity='bbooss97',name=nn_type)

#read the dataset
dataset=OnePieceDataset(w,h)

#split in train and test set
#random_split needs lengths that add up to len(dataset), so the test set
#takes whatever is left after the 80% training share
train_size=int(0.8*len(dataset))
test_size=len(dataset)-train_size
train,test = torch.utils.data.random_split(dataset,[train_size,test_size])

#dataloader
train_dataloader = DataLoader(train, batch_size=batch_size, shuffle=True , drop_last=True)
test_dataloader = DataLoader(test, batch_size=batch_size, shuffle=True , drop_last=True)

#types where have to change the input in the train and test
typesToChange=["resnetFrom0","resnetPretrainedFineTuneFc","resnetPretrainedFineTuneAll","mobilenetPretrainedFineTuneAll"]


def prepare_batch(images):
    """Arrange a batch of images the way the selected model expects it.

    The models listed in typesToChange receive channels-first input
    (batch, 3, h, w); every other model (the MLP) receives one flat feature
    vector per image. The batch size is read from the tensor itself, so a
    batch of any size is handled.
    """
    if nn_type in typesToChange:
        #(batch, h, w, 3) -> (batch, 3, h, w)
        return images.permute(0,3,1,2)
    return images.reshape(images.shape[0],-1)


#define the model
if nn_type=="mlp":
    model=Mlp(w,h)
elif nn_type=="resnetPretrainedFineTuneFc":
    model=torch.hub.load('pytorch/vision:v0.6.0', 'resnet18', pretrained=True)
    model.fc=torch.nn.Linear(512,18)
    #freeze everything except the weight and bias of the new fc layer
    toFreeze=[j for i,j in model.named_parameters()][:-2]
    for i in toFreeze:
        i.requires_grad=False
elif nn_type=="resnetPretrainedFineTuneAll":
    model=torch.hub.load('pytorch/vision:v0.6.0', 'resnet18', pretrained=True)
    model.fc=torch.nn.Linear(512,18)
elif nn_type=="resnetFrom0":
    model=torch.hub.load('pytorch/vision:v0.6.0', 'resnet18',pretrained=False)
    model.fc=torch.nn.Linear(512,18)
elif nn_type=="mobilenetPretrainedFineTuneAll":
    # NOTE(review): the name says "Pretrained" but no pretrained weights are
    # requested here - confirm whether that is intended.
    model=torchvision.models.mobilenet_v3_small()
    model.classifier[3]=torch.nn.Linear(1024,18)

if w_and_b:
    wandb.watch(model)

model.to(device)

#define loss and the optimizer
loss=nn.CrossEntropyLoss()
optimizer=torch.optim.Adam(model.parameters())

for epoch in range(num_epochs):

    #train
    model.train()
    if w_and_b:
        wandb.log({"epoch":epoch})

    #train loop
    for i, (images, labels) in enumerate(train_dataloader):

        #move the data to the device
        images=images.to(device)
        labels=labels.to(device)

        #reshape the images for the selected model
        images=prepare_batch(images)

        #forward pass
        outputs=model(images)

        #calculate the loss
        l=loss(outputs,labels)

        #backpropagation
        optimizer.zero_grad()
        l.backward()
        optimizer.step()

        #print the loss
        print("epoch: {}/{}, step: {}/{}, loss: {}".format(epoch+1,num_epochs,i+1,len(train_dataloader),l.item()))
        if w_and_b:
            if i%20==0:
                wandb.log({"loss_train":l.item()})

    #test
    model.eval()

    # Running sums of the per-batch test loss and accuracy
    test_loss_sum = 0.0
    test_accuracy_sum = 0.0

    # Loop over the data in the test set
    with torch.no_grad():
        for i,(images, labels) in enumerate(test_dataloader):

            # Move the data to the device
            images = images.to(device)
            labels = labels.to(device)

            # Reshape the images exactly as in the training loop
            images = prepare_batch(images)

            # Forward pass: compute predictions and loss
            outputs = model(images)
            ls = loss(outputs, labels)

            # Labels are one-hot, so argmax recovers the class index
            test_loss_sum += ls.item()
            test_accuracy_sum += (outputs.argmax(dim=1) == labels.argmax(dim=1)).float().mean().item()

    # Compute average metrics
    avg_loss = test_loss_sum / len(test_dataloader)
    avg_accuracy = test_accuracy_sum / len(test_dataloader)

    # Print the metrics
    print(f'Test loss: {avg_loss:.4f}')
    print(f'Test accuracy: {avg_accuracy:.4f}')
    if w_and_b:
        wandb.log({"avg_loss_test":avg_loss,"avg_accuracy_test":avg_accuracy})

#save the model (create the target folder first so torch.save cannot fail on a missing directory)
models_dir="./hw2/models"
os.makedirs(models_dir,exist_ok=True)
torch.save(model,models_dir+"/"+nn_type+".pt")

if w_and_b:
    wandb.finish()

print("finished")