In [None]:
# Mount Google Drive at /content/drive so the dataset files stored there can
# be read through ordinary file paths. `google.colab` is only importable
# inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import numpy as np
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import sys
import os
import time
from PIL import Image
import cv2
import csv
import copy
import json
from collections import OrderedDict
from scipy import spatial
import glob

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler

from torchvision import models
import torchvision.transforms as transforms

import PIL.Image
import pickle
from joblib import dump, load

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Root folder of the dataset on the mounted Drive. Dataset files are resolved
# against this single setting so that switching location needs one change.
data_dir = '/content/drive/MyDrive/progetto_2021_dataset'
#data_dir = './progetto_2021_dataset'

# Split description: dataset_json[split][film_name] is the list of labels of
# that film (the notebook reads the 'train' and 'test' splits).
with open(os.path.join(data_dir, 'train_test_split_dict.json')) as f:
    dataset_json = json.load(f)

In [None]:
# Every distinct label that appears in any split of the dataset description.
labels = {
    label
    for split in dataset_json.values()
    for film_labels in split.values()
    for label in film_labels
}

# Stable label -> column index mapping: labels are numbered in sorted order.
label_idx = dict(zip(sorted(labels), range(len(labels))))

In [None]:
def pull_diz(name):
    """Load and return the object pickled in the file at ``name``.

    NOTE(review): ``pickle.load`` can execute arbitrary code while
    deserialising, so only call this on files from a trusted source.
    """
    with open(name, "rb") as handle:
        return pickle.load(handle)

In [None]:
# Per-film frame dictionary, stored as a pickle despite the .txt extension.
# FrameDataset looks films up by name and iterates the keys of each entry as
# frame paths; the values are presumably frame scores (TODO confirm).
# Resolved against data_dir so the dataset location is configured in one place.
dict_test_score = pull_diz(os.path.join(data_dir, 'dict_score_film.txt'))

In [None]:
# Prefix that FrameDataset puts in front of each stored frame path (after
# dropping the path's first character) to reach the file on the mounted Drive.
path_drive='/content/drive/MyDrive'

# Alternative prefix; not referenced by the code visible in this notebook
# (presumably meant for running outside Colab - TODO confirm).
path_without_drive='.'

In [None]:

class FrameDataset(Dataset):
    """Multi-view dataset: each item is a fixed-size stack of frames of one film.

    Frames are taken from the module-level ``dict_test_score`` entry of the
    film (its keys are used as frame paths, in dictionary order). When the
    film has fewer than ``num_views`` frames the first frame is repeated; when
    it has none at all (or is missing from ``dict_test_score``) the first
    image of the film folder under ``root`` is used for every view.

    Args:
        root: dataset folder that contains one sub-folder per film.
        labels_dict: mapping ``film_name -> iterable of label names``.
        num_views: number of frames stacked per film (default 10).
        augment: when True (default) a random horizontal flip and a random
            rotation of up to 45 degrees are applied to every frame. Pass
            False to get a deterministic pipeline, e.g. for evaluation.

    Relies on the module-level names ``dict_test_score``, ``path_drive`` and
    ``label_idx``.
    """

    def __init__(self, root, labels_dict, num_views=10, augment=True):
        if num_views < 1:
            raise ValueError('num_views must be at least 1')
        self.root = root
        self.labels_dict = labels_dict
        self.film_names = np.array(list(self.labels_dict.keys()))  # folder names
        self.num_views = num_views
        self.augment = augment
        # Built once instead of once per frame.
        self._pipeline = self._build_pipeline(augment)

    @staticmethod
    def _build_pipeline(augment):
        """Compose the per-frame preprocessing (crop, optional augmentation, tensor)."""
        steps = [transforms.CenterCrop(224)]
        if augment:
            steps.append(transforms.RandomHorizontalFlip(p=0.5))
            steps.append(transforms.RandomRotation(degrees=45))
        steps.append(transforms.ToTensor())
        return transforms.Compose(steps)

    def _transform(self, image):
        """Apply the preprocessing pipeline to one PIL image and return a tensor."""
        return self._pipeline(image)

    def _load_frame(self, path):
        """Open the image at ``path``, convert it to RGB and preprocess it."""
        # The context manager closes the file handle as soon as it is decoded.
        with Image.open(path) as image:
            return self._transform(image.convert('RGB'))

    def _frame_paths(self, film_name):
        """Return exactly ``num_views`` image paths for ``film_name``."""
        # A film missing from the dictionary is treated like a film without
        # frames instead of failing on ``None``.
        scored = dict_test_score.get(film_name) or {}
        # The first character of each stored path is dropped and replaced by
        # the Drive prefix (presumably the paths are './...' - TODO confirm).
        paths = [path_drive + frame[1:] for frame in list(scored)[:self.num_views]]

        if not paths:
            folder = f'{self.root}/{film_name}'
            # Sorted so the fallback frame does not depend on directory order.
            candidates = sorted(os.listdir(folder))
            if not candidates:
                raise FileNotFoundError(f'no images found for film {film_name!r} in {folder}')
            paths = [f'{folder}/{candidates[0]}']

        # Pad with the first frame; each repetition is transformed separately,
        # so with augmentation enabled the copies differ.
        paths += [paths[0]] * (self.num_views - len(paths))
        return paths

    def __len__(self):
        """Number of films in the split."""
        return len(self.film_names)

    def __getitem__(self, index):
        """Return ``(film, labels)`` for the film at ``index``.

        ``film`` stacks the ``num_views`` preprocessed frames along a new
        first dimension; ``labels`` is a float32 multi-hot vector of length
        ``len(label_idx)``.
        """
        film_name = self.film_names[index]  # folder name

        film = torch.stack([self._load_frame(path) for path in self._frame_paths(film_name)])

        labels = torch.zeros(len(label_idx), dtype=torch.float32)
        for label in self.labels_dict[film_name]:
            labels[label_idx[label]] = 1

        return film, labels  # tensors

In [None]:
# CREATE DATASET
# One FrameDataset per split, both reading frames from the same root folder.
dataset_train, dataset_test = (
    FrameDataset(data_dir, dataset_json[split]) for split in ('train', 'test')
)

In [None]:
# CREATE TRAIN AND TEST DATA LOADERS
# Training batches are reshuffled at every epoch so that mini-batch
# composition does not follow the fixed order of the split file.
train_loader = DataLoader(dataset_train, batch_size=32, shuffle=True, num_workers=2)
data_loaders = {'train': train_loader}
# Evaluation keeps the dataset order.
test_loader = DataLoader(dataset_test, batch_size=32, shuffle=False, num_workers=2)

In [None]:
class MVCNN(nn.Module):
    """Multi-view CNN: shared ResNet-50 trunk, mean pooling over views, dense head.

    Args:
        num_classes: size of the output layer.
        pretrained: forwarded to ``models.resnet50``.
    """

    def __init__(self, num_classes=3, pretrained=True):
        super().__init__()
        backbone = models.resnet50(pretrained = pretrained)
        feature_dim = backbone.fc.in_features
        # Keep every ResNet child except the last one (its fully connected layer).
        self.features = nn.Sequential(*list(backbone.children())[:-1])
        head_layers = [
            nn.Dropout(),
            nn.Linear(feature_dim, 2048),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(2048, 2048),
            nn.ReLU(inplace=True),
            nn.Linear(2048, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, inputs):
        """Return one row of raw class scores per sample.

        ``inputs`` has samples on dim 0 and views on dim 1; each view is an
        image batch as accepted by the ResNet trunk (the dataset in this
        notebook yields channels x height x width frames).
        """
        per_view = []
        # Put the view dimension first so that iterating yields one batch per view.
        for view in inputs.transpose(0, 1):
            feats = self.features(view)
            flat_dim = feats.shape[1:].numel()
            per_view.append(feats.view(feats.shape[0], flat_dim))

        # Average the per-view feature vectors, then classify.
        pooled_views = torch.stack(per_view).mean(dim=0)
        return self.classifier(pooled_views)

In [None]:
# BUILD AND VISUALIZE THE MODEL
# The output layer must have one unit per label: the targets produced by
# FrameDataset are vectors of length len(label_idx), so the size is derived
# from the vocabulary instead of being hard-coded.
model = MVCNN(num_classes=len(label_idx), pretrained=True)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


HBox(children=(FloatProgress(value=0.0, max=102530333.0), HTML(value='')))




In [None]:
# DEFINE THE DEVICE
# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device

device(type='cuda', index=0)

In [None]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25):
    """Train ``model`` on ``dataloaders['train']`` and return it.

    Args:
        model: network returning raw scores; a sigmoid is applied to its
            output before the loss is computed.
        dataloaders: mapping with a ``'train'`` entry that yields
            ``(inputs, labels)`` batches.
        criterion: loss taking ``(sigmoid(outputs), labels)``, i.e. one that
            expects probabilities such as ``nn.BCELoss``.
        optimizer: optimizer stepped once per batch.
        num_epochs: number of passes over the training data.

    Returns:
        The same ``model`` object, trained in place.

    Batches are moved to the module-level ``device``. The mean loss of every
    epoch is printed so that training progress can be monitored.
    """
    since = time.time()

    for epoch in range(1, num_epochs+1):
        print('Epoch {}/{}'.format(epoch, num_epochs))
        print('-' * 10)

        for phase in ['train']:

            model.train()  # Set model to training mode

            running_loss = 0.0
            num_samples = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                with torch.set_grad_enabled(phase == 'train'):
                    # Get model outputs and calculate loss
                    outputs = model(inputs)
                    outputs = torch.sigmoid(outputs)
                    loss = criterion(outputs, labels)

                    loss.backward()
                    optimizer.step()

                # Weight by batch size so a smaller last batch does not skew
                # the average (exact when the criterion uses mean reduction).
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                num_samples += batch_size

            # max(..., 1) keeps an empty loader from dividing by zero.
            epoch_loss = running_loss / max(num_samples, 1)
            print('{} Loss: {:.4f}'.format(phase, epoch_loss))
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))

    return model

In [None]:
# Freeze the convolutional trunk: none of its parameters will receive
# gradients, so only the classifier head is trained.
for backbone_param in model.features.parameters():
    backbone_param.requires_grad = False

In [None]:
# TRAIN THE CLASSIFIER BLOCK OF THE MODEL (I.E TOP DENSE LAYERS)
EPOCHS = 20
model = model.to(device)
# Binary cross-entropy on sigmoid outputs: one independent yes/no decision per label.
criterion = nn.BCELoss()
# Only the classifier parameters are handed to the optimizer.
optimizer = optim.Adam(model.classifier.parameters(), lr=0.0005)
model = train_model(model, data_loaders, criterion, optimizer, num_epochs=EPOCHS)

Epoch 1/20
----------

Epoch 2/20
----------

Epoch 3/20
----------

Epoch 4/20
----------

Epoch 5/20
----------

Epoch 6/20
----------

Epoch 7/20
----------

Epoch 8/20
----------

Epoch 9/20
----------

Epoch 10/20
----------

Epoch 11/20
----------

Epoch 12/20
----------

Epoch 13/20
----------

Epoch 14/20
----------

Epoch 15/20
----------

Epoch 16/20
----------

Epoch 17/20
----------


In [None]:
# Optional checkpoint of the trained model. It used to be disabled by wrapping
# the code in a string literal, which the notebook still evaluated and echoed;
# it is now real code behind a flag that is off by default.
SAVE_CHECKPOINT = False

if SAVE_CHECKPOINT:
    torch.save({
                'epoch': EPOCHS,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                # NOTE(review): this stores the criterion object, not a loss value.
                'loss': criterion,
                }, '/content/drive/MyDrive/progetto_2021_dataset/outputs/model_mean_final10.pth')
# plot and save the train and validation line graphs

In [None]:
# NOTE(review): these in-cell imports are kept because later cells may rely on
# them; neither name is used in this cell.
from PIL import Image

from skimage import io

# Number of labels predicted for every film: the top-k highest scores.
topk=10
predictions = []
y_true = []

# Switch to inference mode: without this Dropout keeps dropping units and
# BatchNorm normalises with batch statistics, so predictions are noisy and
# depend on batch composition.
model.eval()

# NOTE(review): FrameDataset._transform applies a random flip and rotation, so
# the test frames are randomly augmented as well.
with torch.no_grad():

  for inputs, labels in test_loader:

    inputs = inputs.to(device)
    logits = model(inputs)

    y_pred = torch.sigmoid(logits)

    # Indices of the k most probable labels of every sample.
    _, idx = y_pred.topk(topk, dim=1)

    # Multi-hot prediction: 1 at the top-k positions, 0 elsewhere.
    y_pred = torch.zeros_like(y_pred)
    y_pred.scatter_(1, idx, 1)
    predictions.append(y_pred.cpu())

    y_true.append(labels.cpu())


y_true, predictions = torch.cat(y_true, dim=0), torch.cat(predictions, dim=0)

In [None]:
from sklearn.metrics import classification_report

# Label names in sorted order, which is the order used to number the columns
# of the target and prediction matrices.
class_names = sorted(label_idx)
report = classification_report(y_true, predictions, target_names=class_names)
print(report)