In [None]:
import os
import pickle
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader
from torchvision.utils import save_image
import torch.nn as nn
import cv2
import random
import torchvision.transforms as transforms
from torch.utils import data
from torch.nn import functional as F
from torchvision import models
import matplotlib.pyplot as plt
import pandas as pd
import torchvision
import numpy as np
import torch.optim as optim
import torchvision.datasets as td
from torchvision import transforms
from PIL import Image
from skimage.feature import hog as hog
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import plot_confusion_matrix,f1_score, accuracy_score, average_precision_score
from sklearn.preprocessing import OneHotEncoder, MultiLabelBinarizer

In [None]:
from google.colab import drive
import pathlib

drive.mount('/content/drive', force_remount=True)

drive = pathlib.Path('./drive/MyDrive') / 'ML_Project' / 'ML_FP_2022'

Mounted at /content/drive


# Load data

In [None]:
with open(drive / "data2022" / "multi-task" / "label_v1" / "eval_label.pickle", 'rb') as pickle_file:
  eval_ = pickle.load(pickle_file)
pickle_file.close()

with open(drive / "data2022" / "multi-task" / "label_v1" / "test_label.pickle", 'rb') as pickle_file:
  test_ = pickle.load(pickle_file)
pickle_file.close()

with open(drive / "data2022" / "multi-task" / "label_v1" / "train_label.pickle", 'rb') as pickle_file:
  train_ = pickle.load(pickle_file)
pickle_file.close()

# Define preprocessor and label encoder

In [None]:
#data augmentation, define image preprocessor
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((256, 256)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
   
])

labels_c = train_['creator']+test_['creator']+eval_['creator']
encoder_c = MultiLabelBinarizer()
encoder_c.fit(labels_c)

labels_m = train_['material']+test_['material']+eval_['material']
encoder_m = MultiLabelBinarizer()
encoder_m.fit(labels_m)

labels_t = train_['type']+test_['type']+eval_['type']
encoder_t = MultiLabelBinarizer()
encoder_t.fit(labels_t)

MultiLabelBinarizer()

In [None]:
encoder_m.classes_

array(['Japans papier', 'dekverf', 'doek', 'ebbenhout', 'eikenhout',
       'fluweel', 'goud', 'hout', 'inkt', 'koper', 'krijt', 'leer',
       'olieverf', 'paneel', 'papier', 'porselein', 'potlood', 'staal',
       'steengoed', 'verf', 'waterverf', 'zilver'], dtype=object)

# Dataset

In [None]:
class train_dataset():
  def __init__(self, dict_ = train_, encoder = [encoder_c, encoder_m, encoder_t]):
    #load data
    self.transform = preprocess
    self.image = dict_['identifier']
    self.creator = encoder[0].transform(dict_['creator'])
    self.material = encoder[1].transform(dict_['material'])
    self.art_type = encoder[2].transform(dict_['type'])
    
  def __len__(self):
    return len(self.image)

  def __getitem__(self, index, img_path=drive / "data2022" / "multi-task" / 'data' / "train set new"):
    # img augmentation
    img = cv2.imread(os.path.join(img_path / self.image[index]))
    img = self.transform(img)
    
    #label
    creator = self.creator[index]
    material = self.material[index]
    art_type = self.art_type[index]

    sample = {'image' : img, 'creator' : creator, 'material' : material, 'type' : art_type}
    return sample

In [None]:
class test_dataset():
  def __init__(self, dict_ = test_, encoder = [encoder_c, encoder_m, encoder_t]):
    #load data
    self.transform = preprocess
    self.image = dict_['identifier']
    self.creator = encoder[0].transform(dict_['creator'])
    self.material = encoder[1].transform(dict_['material'])
    self.art_type = encoder[2].transform(dict_['type'])
    
  def __len__(self):
    return len(self.image)

  def __getitem__(self, index, img_path=drive / "data2022" / "multi-task" / 'data' / "test set"):
    # img augmentation
    img = cv2.imread(os.path.join(img_path / self.image[index]))
    img = self.transform(img)
    
    #label
    creator = self.creator[index]
    material = self.material[index]
    art_type = self.art_type[index]

    sample = {'image' : img, 'creator' : creator, 'material' : material, 'type' : art_type}
    return sample

In [None]:
class val_dataset():
  def __init__(self, dict_ = eval_, encoder = [encoder_c, encoder_m, encoder_t]):
    #load data
    self.transform = preprocess
    self.image = dict_['identifier']
    self.creator = encoder[0].transform(dict_['creator'])
    self.material = encoder[1].transform(dict_['material'])
    self.art_type = encoder[2].transform(dict_['type'])
    
  def __len__(self):
    return len(self.image)

  def __getitem__(self, index, img_path=drive / "data2022" / "multi-task" / 'data' / "eval set"):
    # img augmentation
    img = cv2.imread(os.path.join(img_path / self.image[index]))
    img = self.transform(img)
    
    #label
    creator = self.creator[index]
    material = self.material[index]
    art_type = self.art_type[index]

    sample = {'image' : img, 'creator' : creator, 'material' : material, 'type' : art_type}
    return sample

In [None]:
train = train_dataset()
test = test_dataset()
val = val_dataset()

# Data Loader

In [None]:
BATCH_SIZE = 32

train_dataloader = DataLoader(train, shuffle=True, batch_size=BATCH_SIZE)
test_dataloader = DataLoader(test, shuffle=False, batch_size=BATCH_SIZE)
val_dataloader = DataLoader(val, shuffle=False, batch_size=BATCH_SIZE)

# Train and Evaluate Module

In [None]:
###main train and evaluate function ###
def train_main(model,loss_fn,optimizer,train_loader,device):
  loss_epoch=[]
  for batchind,data in enumerate(train_loader):
    model.train()
    ##(b,c,h,w)
    images = torch.Tensor(data['image']).to(device)
    label = torch.Tensor(data['material']).to(device)
    #print(images.shape)
    optimizer.zero_grad()
    outputs=model(images)
    outputs=outputs.unsqueeze(1).float()
    label=label.unsqueeze(1).float()

    loss=loss_fn(outputs,label)
    loss_epoch.append(loss.item())

    loss.backward()
    optimizer.step()
    print("batch : ", batchind, " is finished.")
  return loss_epoch

def evaluate_whole(model,evaluate_loader):
  predict_list=[]
  y_ = []
  model.eval()
  with torch.no_grad():
    for batchind,data in enumerate(evaluate_loader):
      images = torch.Tensor(data['image']).to(device)
      label = torch.Tensor(data['material']).to(device).float()
      ##change to (1,c,h,w)
      outputs=model(images).detach().float()
      predict_list.append(torch.Tensor.round(outputs))
      y_.append(torch.Tensor.round(label))
      
  #mAP
  print(y_[0].shape)
  print(predict_list[0].shape)
  mAp = 0
  for index in range(len(predict_list)-1):
    for i in range(BATCH_SIZE):
      mAp = mAp + average_precision_score(predict_list[index][i, :].detach().cpu().numpy(), y_[index][i, :].detach().cpu().numpy())
    mAp = mAp / BATCH_SIZE
  return predict_list, mAp

# ResNet 50

In [None]:
##load pretrained resnet50 network ###
resnet50 = torch.hub.load('NVIDIA/DeepLearningExamples:torchhub', 'nvidia_resnet50', pretrained=True)
resnet50=nn.Sequential(
    resnet50,
    nn.Linear(1000,22),
    nn.Sigmoid()
)

Downloading: "https://github.com/NVIDIA/DeepLearningExamples/zipball/torchhub" to /root/.cache/torch/hub/torchhub.zip
Downloading: "https://api.ngc.nvidia.com/v2/models/nvidia/resnet50_pyt_amp/versions/20.06.0/files/nvidia_resnet50_200821.pth.tar" to /root/.cache/torch/hub/checkpoints/nvidia_resnet50_200821.pth.tar


  0%|          | 0.00/97.7M [00:00<?, ?B/s]

In [None]:
num_epoch=3
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model=resnet50.to(device)
model.load_state_dict(torch.load(drive / "model" / "ResNet50_material_v2" / 'model_paramenter_ResNet50_material_v2.txt'))
loss_fn = nn.BCELoss()
optimizer=optim.Adam(filter(lambda p:p.requires_grad,model.parameters()),lr=1e-4)
state_dict = torch.load(drive / "model" / "ResNet50_material_v2" / 'optimizer_paramenter_ResNet50_material_v2.txt')
print(type(state_dict))
# optimizer.load_state_dict(state_dict)

<class 'dict'>


In [None]:
train_loss=[]
evaluate_score=[]
for i in range(num_epoch):
  loss=train_main(model,loss_fn,optimizer,train_dataloader, device)
  train_loss.append(np.mean(loss))
  predict_r, mAp =evaluate_whole(model, evaluate_loader=val_dataloader)
  evaluate_score.append(mAp)
  print(f'epoch{i+1} has been trained')

batch :  0  is finished.
batch :  1  is finished.
batch :  2  is finished.
batch :  3  is finished.
batch :  4  is finished.
batch :  5  is finished.
batch :  6  is finished.
batch :  7  is finished.
batch :  8  is finished.
batch :  9  is finished.
batch :  10  is finished.
batch :  11  is finished.
batch :  12  is finished.
batch :  13  is finished.
batch :  14  is finished.
batch :  15  is finished.
batch :  16  is finished.
batch :  17  is finished.
batch :  18  is finished.
batch :  19  is finished.
batch :  20  is finished.
batch :  21  is finished.
batch :  22  is finished.
batch :  23  is finished.
batch :  24  is finished.
batch :  25  is finished.
batch :  26  is finished.
batch :  27  is finished.
batch :  28  is finished.
batch :  29  is finished.
batch :  30  is finished.
batch :  31  is finished.
batch :  32  is finished.
batch :  33  is finished.
batch :  34  is finished.
batch :  35  is finished.
batch :  36  is finished.
batch :  37  is finished.
batch :  38  is finish

In [None]:
root_path = drive / "model" / "ResNet50_material_v2"
path=os.path.join(root_path / 'model_paramenter_ResNet50_material_v2.txt')
torch.save(model.state_dict(),path)

path2=os.path.join(root_path / 'optimizer_paramenter_ResNet50_material_v2.txt')
torch.save(optimizer.state_dict(),path2)

In [None]:
mAp

1.032258064516129

In [None]:
predict_r, mAp =evaluate_whole(model, evaluate_loader=test_dataloader)

AttributeError: ignored

.