In [None]:
import numpy as np
import pandas as pd
import csv
import cv2
import numpy as np
import random
import os
from tqdm import tqdm

import torch
import torch.nn as nn
import torchvision.models
import torchvision.transforms as trans
from torch.utils.data import Dataset, DataLoader

In [None]:
'''
path define block
'''
# Directories holding the training and test images (relative to the notebook).
TRAIN_PATH = "train"
TEST_PATH = "test"

# Number of entries generated per annotated image. Each entry carries an ID in
# range(data_multiple) that the dataset turns into a rotation angle.
data_multiple = 5

# Seed every RNG this notebook draws from so that the train/validation split,
# batch shuffling and head initialisation are reproducible after
# "Restart Kernel -> Run All".
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Device selection: keep using the second GPU when the machine has one,
# otherwise fall back to the first GPU and finally to the CPU, instead of
# failing later with an invalid device ordinal.
# (Change your settings/accelerator to GPU if you want the notebook to run faster.)
if torch.cuda.device_count() > 1:
    device = torch.device("cuda", 1)
elif torch.cuda.is_available():
    device = torch.device("cuda", 0)
else:
    device = torch.device("cpu")

In [None]:
'''
Self-defined dataset
'''
class multi_Dataset(Dataset):
    """Image dataset that yields resized, rotated and normalised tensors.

    Every entry of ``data`` is unpacked as ``(filename, label, ID)``.
    The image ``{root}/{filename}`` is read with OpenCV, converted to RGB,
    resized to 64x64, rotated and finally normalised per channel.

    Args:
        data: Indexable collection of ``(filename, label, ID)`` rows.
        root: Directory that contains the image files.
        return_filename: When True, ``__getitem__`` applies a random rotation
            and returns ``(img, filename)``; when False, the rotation is
            derived from ``ID`` and ``(img, label_code)`` is returned.
        len: Stored as ``self.len``; it is not read anywhere in this class.
    """

    # Per-channel statistics handed to normalize(); these match the values
    # torchvision documents for its pretrained models.
    _MEAN = [0.485, 0.456, 0.406]
    _STD = [0.229, 0.224, 0.225]
    _IMAGE_SIZE = (64, 64)
    _DEGREES_PER_STEP = 10

    def __init__(self, data, root, return_filename=False, len = 1):
        self.data = data
        self.return_filename = return_filename
        self.root = root
        self.len = len

    def __getitem__(self, index):
        """Load one sample.

        Returns:
            ``(img, filename)`` if ``return_filename`` is set, otherwise
            ``(img, label_code)`` where ``label_code`` is a list with one
            integer per label character (digits map to their value, letters
            to ``ord(c) - ord('a') + 10``).

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        filename, label, ID = self.data[index]

        path = f"{self.root}/{filename}"
        img = cv2.imread(path)
        if img is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"could not read image: {path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, self._IMAGE_SIZE)

        if self.return_filename:
            step = random.randint(0, data_multiple - 1)
        else:
            step = ID
        angle = (step - data_multiple / 2) * self._DEGREES_PER_STEP

        # to_tensor scales the uint8 image into [0, 1]. Rotate BEFORE
        # normalising so that the corners exposed by the rotation can be
        # filled with white (1.0). Filling with 255 after normalisation
        # injected values far outside the range of every real pixel.
        img = trans.functional.to_tensor(img)
        img = trans.functional.rotate(img, angle, fill=1.0)
        img = trans.functional.normalize(img, mean=self._MEAN, std=self._STD)

        if self.return_filename:
            return img, filename

        # Labels are only needed (and only validated) when they are returned.
        label_code = [
            ord(character) - ord('a') + 10 if character.isalpha() else int(character)
            for character in label
        ]
        return img, label_code

    def __len__(self):
        """Number of rows in ``data``."""
        return len(self.data)

In [None]:
'''
dataset division block
'''
# Fraction of source images assigned to training; the rest go to validation.
TRAIN_FRACTION = 0.7
BATCH_SIZE = 500

task1_train = []
task2_train = []
task3_train = []
task1_val = []
task2_val = []
task3_val = []

# Filename prefix -> (training rows, validation rows).
_rows_by_task = {
    "task1": (task1_train, task1_val),
    "task2": (task2_train, task2_val),
    "task3": (task3_train, task3_val),
}

with open(f'{TRAIN_PATH}/annotations.csv', newline='') as csvfile:
    for row in csv.reader(csvfile, delimiter=','):
        # csv.reader yields an empty list for a blank line; row[0] would raise.
        if not row:
            continue
        task_prefix = next(
            (prefix for prefix in _rows_by_task if row[0].startswith(prefix)), None
        )
        if task_prefix is None:
            # Not one of the three tasks (e.g. a header line): ignore it.
            continue
        train_rows, val_rows = _rows_by_task[task_prefix]
        # Draw ONCE per image so that all of its rotated copies end up in the
        # same split. Drawing per copy put the same image into both training
        # and validation, which inflates the validation accuracy.
        target_rows = train_rows if random.random() < TRAIN_FRACTION else val_rows
        target_rows.extend(row + [i] for i in range(data_multiple))


def _make_loaders(train_rows, val_rows, label_len):
    """Build (train dataset, train loader, val dataset, val loader) for one task.

    The training loader shuffles and drops the last incomplete batch; the
    validation loader keeps every sample in file order.
    """
    train_ds = multi_Dataset(train_rows, root=TRAIN_PATH, len=label_len)
    train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, num_workers=1,
                          drop_last=True, shuffle=True)
    val_ds = multi_Dataset(val_rows, root=TRAIN_PATH, len=label_len)
    val_dl = DataLoader(val_ds, batch_size=BATCH_SIZE, num_workers=1,
                        drop_last=False, shuffle=False)
    return train_ds, train_dl, val_ds, val_dl


task1_ds, task1_dl, task1_val_ds, task1_val_dl = _make_loaders(task1_train, task1_val, 1)
task2_ds, task2_dl, task2_val_ds, task2_val_dl = _make_loaders(task2_train, task2_val, 2)
task3_ds, task3_dl, task3_val_ds, task3_val_dl = _make_loaders(task3_train, task3_val, 4)

In [None]:
'''
define model_train function
'''
def model_train(model, model_name, dl, val, label_len, epoch, save_dir='109550110'):
    """Train ``model`` and checkpoint it whenever validation accuracy improves.

    Args:
        model: Module whose forward pass returns a sequence of logits tensors,
            one per label position.
        model_name: File name of the checkpoint inside ``save_dir``.
        dl: Iterable of ``(image, label)`` training batches, where ``label``
            is a sequence holding one class-index tensor per label position.
        val: Iterable of validation batches in the same format.
        label_len: Number of label positions (model heads) to train and score.
        epoch: Number of passes over ``dl``.
        save_dir: Directory for the checkpoint; created if it does not exist.

    Each epoch prints the whole-label validation accuracy (a sample counts as
    correct only if every position is right) and the accuracy of the first
    position alone. The state dict is saved when the whole-label accuracy
    exceeds the best value seen so far. Returns None.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.CrossEntropyLoss()

    # Move batches to wherever the model already lives, so the function cannot
    # hit a model/data device mismatch.
    model_device = next(model.parameters()).device

    # torch.save does not create missing parent directories.
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, model_name)

    max_acc = 0
    for e in range(epoch):
        print(f"Epoch [{e}]")
        model.train()

        for image, label in dl:
            image = image.to(model_device)
            targets = [label[i].to(model_device) for i in range(label_len)]
            pred = model(image)

            # Total loss is the sum of the per-position cross-entropies.
            loss = loss_fn(pred[0], targets[0])
            for i in range(1, label_len):
                loss = loss + loss_fn(pred[i], targets[i])

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        sample_count = 0
        correct_count = 0
        first_correct = 0
        # No gradients are needed for scoring; skipping the autograd graph
        # saves memory and time.
        with torch.no_grad():
            for image, label in val:
                image = image.to(model_device)
                pred = model(image)

                first_match = None
                all_match = None
                for i in range(label_len):
                    match = torch.argmax(pred[i], dim=1) == label[i].to(model_device)
                    if i == 0:
                        first_match = match
                        all_match = match
                    else:
                        all_match = all_match & match

                sample_count += len(image)
                first_correct += int(first_match.sum())
                correct_count += int(all_match.sum())

        if sample_count == 0:
            # Nothing to score: avoid dividing by zero and keep training.
            print("accuracy (validation): n/a (validation loader is empty)")
            continue

        accuracy = correct_count / sample_count
        print("accuracy (validation):", accuracy)
        if accuracy > max_acc:
            max_acc = accuracy
            torch.save(model.state_dict(), save_path)
            print("model_save: " + str(e))
        print("first-character accuracy (validation):", first_correct / sample_count)

In [None]:
'''
task1 model
'''
class task1_Resnet50(torchvision.models.resnet.ResNet):
    """ResNet-50 backbone followed by a single 10-way classification head.

    The backbone is built from Bottleneck blocks and initialised with the
    torchvision pretrained ResNet-50 weights; two linear layers
    (2048 -> 512 -> 10) replace the stock classifier in the forward pass.
    The forward pass returns a one-element list so that single- and
    multi-head models can be consumed the same way.
    """

    def __init__(self):
        # Same constructor arguments torchvision uses for resnet50:
        # https://github.com/pytorch/vision/blob/e130c6cca88160b6bf7fea9b8bc251601a1a75c5/torchvision/models/resnet.py#L260
        super().__init__(torchvision.models.resnet.Bottleneck, [3, 4, 6, 3])

        # Copy the pretrained weights before the extra layers exist, so the
        # state dict keys line up exactly with the stock architecture.
        pretrained_weights = torchvision.models.resnet50(pretrained=True).state_dict()
        self.load_state_dict(pretrained_weights)

        self.mylayer1 = nn.Linear(2048, 512)
        self.mylayer2 = nn.Linear(512, 10)

    def _forward_impl(self, x):
        """Run the backbone and the custom head; returns ``[logits]``."""
        backbone = (
            self.conv1, self.bn1, self.relu, self.maxpool,
            self.layer1, self.layer2, self.layer3, self.layer4,
        )
        features = x
        for module in backbone:
            features = module(features)

        pooled = torch.flatten(self.avgpool(features), 1)
        hidden = self.mylayer1(pooled)
        logits = self.mylayer2(hidden)
        return [logits]

    def forward(self, x):
        """Delegate to ``_forward_impl``."""
        return self._forward_impl(x)

In [None]:
'''
task2 model
'''
class task2_Resnet50(torchvision.models.resnet.ResNet):
    """ResNet-50 backbone with two parallel 36-way classification heads.

    A shared linear layer (2048 -> 512) feeds two independent linear heads
    (512 -> 36), one per label position.

    Args:
        pretrained: When True (default), initialise the backbone with the
            torchvision pretrained ResNet-50 weights. When False, keep the
            random initialisation, e.g. when a checkpoint is loaded right
            afterwards and no download is wanted.
        len: Stored as ``self.len``; the forward pass does not read it.
    """

    def __init__(self, pretrained=True, len = 2):
        # Pass default resnet50 arguments to super init
        # https://github.com/pytorch/vision/blob/e130c6cca88160b6bf7fea9b8bc251601a1a75c5/torchvision/models/resnet.py#L260
        super(task2_Resnet50, self).__init__(torchvision.models.resnet.Bottleneck, [3, 4, 6, 3])
        # Honour the flag: previously the weights were loaded unconditionally.
        if pretrained:
            self.load_state_dict(torchvision.models.resnet50(pretrained=True).state_dict())
        self.mylayer1 = nn.Linear(2048, 512)
        self.len = len
        self.mylayer2A = nn.Linear(512, 36)
        self.mylayer2B = nn.Linear(512, 36)

    def _forward_impl(self, x):
        """Run the backbone and both heads; returns ``[logits_A, logits_B]``."""
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.mylayer1(x)
        x1 = self.mylayer2A(x)
        x2 = self.mylayer2B(x)

        return [x1, x2]

    def forward(self, x):
        """Delegate to ``_forward_impl``."""
        return self._forward_impl(x)

In [None]:
'''
task3 model
'''
class task3_Resnet50(torchvision.models.resnet.ResNet):
    """ResNet-50 backbone with four parallel 36-way classification heads.

    A shared linear layer (2048 -> 512) feeds four independent linear heads
    (512 -> 36), one per label position.

    Args:
        pretrained: When True (default), initialise the backbone with the
            torchvision pretrained ResNet-50 weights. When False, keep the
            random initialisation, e.g. when a checkpoint is loaded right
            afterwards and no download is wanted.
        len: Stored as ``self.len``; the forward pass does not read it and
            always produces four outputs regardless of this value.
    """

    def __init__(self, pretrained=True, len = 2):
        # Pass default resnet50 arguments to super init
        # https://github.com/pytorch/vision/blob/e130c6cca88160b6bf7fea9b8bc251601a1a75c5/torchvision/models/resnet.py#L260
        super(task3_Resnet50, self).__init__(torchvision.models.resnet.Bottleneck, [3, 4, 6, 3])
        # Honour the flag: previously the weights were loaded unconditionally.
        if pretrained:
            self.load_state_dict(torchvision.models.resnet50(pretrained=True).state_dict())
        self.mylayer1 = nn.Linear(2048, 512)
        self.len = len
        self.mylayer2A = nn.Linear(512, 36)
        self.mylayer2B = nn.Linear(512, 36)
        self.mylayer2C = nn.Linear(512, 36)
        self.mylayer2D = nn.Linear(512, 36)

    def _forward_impl(self, x):
        """Run the backbone and all heads; returns four logits tensors in a list."""
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.mylayer1(x)
        x1 = self.mylayer2A(x)
        x2 = self.mylayer2B(x)
        x3 = self.mylayer2C(x)
        x4 = self.mylayer2D(x)

        return [x1, x2, x3, x4]

    def forward(self, x):
        """Delegate to ``_forward_impl``."""
        return self._forward_impl(x)

In [None]:
'''
model training block
(models are saved in the folder "109550110/"; note that the saving is done inside the model_train function)
'''

# Task 1: one label position, 30 epochs.
task1_model = task1_Resnet50()
task1_model = task1_model.to(device)
model_train(
    model=task1_model,
    model_name='task1_model_1.pt',
    dl=task1_dl,
    val=task1_val_dl,
    label_len=1,
    epoch=30,
)

# Task 2: two label positions, 50 epochs.
task2_model = task2_Resnet50(len=2)
task2_model = task2_model.to(device)
model_train(
    model=task2_model,
    model_name='task2_model_1.pt',
    dl=task2_dl,
    val=task2_val_dl,
    label_len=2,
    epoch=50,
)

# Task 3: four label positions, 50 epochs.
task3_model = task3_Resnet50(len=4)
task3_model = task3_model.to(device)
model_train(
    model=task3_model,
    model_name='task3_model_1.pt',
    dl=task3_dl,
    val=task3_val_dl,
    label_len=4,
    epoch=50,
)