In [None]:
##STEP 1: Open all files

import pandas as pd

anime_train = pd.read_csv(".//Database//Anime_train.csv")
anime_test = pd.read_csv(".//Database//Anime_test.csv")

# Folders that hold the character portraits ("<character id>.png") for each split.
impath_train = ".//Database//char train//"
impath_test = ".//Database//char test//"


def parse_char_ids(raw):
    """Convert the CSV text form of a character list into a list of ints.

    The CSV stores each list as text such as "[12, 7, 301]". The surrounding
    brackets are dropped and every comma-separated token is converted with
    int(), which tolerates surrounding whitespace.

    Args:
        raw: String of the form "[a, b, c]". "[]" is accepted and yields [].

    Returns:
        list[int]: The character ids in their original order.

    Raises:
        ValueError: If a non-empty token is not a valid integer.
    """
    inner = raw.strip()[1:-1]
    # Skipping blank tokens makes "[]" parse to [] instead of failing on int('').
    return [int(token) for token in inner.split(",") if token.strip()]


# Original format for characters: a list of integer ids per row.
anime_train["Main Characters"] = anime_train["Main Characters"].apply(parse_char_ids)
anime_test["Main Characters"] = anime_test["Main Characters"].apply(parse_char_ids)

In [None]:
###ALL GENERAL VALUES:

# --- Batch / sequence / epoch sizes ---
batch_size = 16
max_length_syn = 128
max_length_char = 256
epochs = 5  # 1000 (alternative value kept from the original comment)

# --- Optimizer settings (passed to AdamW as lr / eps further down) ---
learning_rate = 5e-5
eps_value = 1e-8

# --- Pretrained checkpoint names ---
syn_model_name_or_path = 'gpt2'
img_model_name_or_path = 'microsoft/resnet-50'

import torch

# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"torch running in {device}")

from transformers import set_seed

# Fixed seed so that runs are repeatable.
set_seed(42)

In [None]:
###STEP 2: Open all datasets

from torch.utils.data import Dataset
import numpy as np
import os
from PIL import Image 
import numpy as np

from tqdm import tqdm

def scaler(values, ratio):
    """Min-max rescale ``values`` linearly so that they span [0, ratio].

    The smallest value maps to 0 and the largest to ``ratio``. The arithmetic
    only uses differences from the minimum, so negative inputs are handled
    the same way as positive ones.

    Args:
        values: Iterable of numbers (it is materialised once, so generators work).
        ratio: Upper end of the output range.

    Returns:
        list: Rescaled values in input order. An empty input gives []. When
        every value is identical there is no spread to scale, so every output
        is 0.0 instead of dividing by zero.
    """
    values = list(values)
    if not values:
        return []

    # Compute the extremes once instead of once per element.
    lowest, highest = min(values), max(values)
    spread = highest - lowest
    if spread == 0:
        return [0.0 for _ in values]

    factor = ratio / spread
    return [(v - lowest) * factor for v in values]

class AnimeDataset(Dataset):
    """In-memory dataset pairing one stitched character-portrait image with a score.

    For every row of ``df`` the portraits of its main characters are loaded
    from ``img_path`` ("<id>.png"), pasted side by side into one RGB image and
    stored in ``self.img``. The "Score" column is min-max scaled to [0, 1]
    with ``scaler`` and stored in ``self.labels``.

    NOTE(review): the scaling uses the min/max of the dataframe passed in, so
    two dataset instances (e.g. train and test) are scaled independently.

    Args:
        df: DataFrame with a "Score" column; its index is used to look up ``mchars``.
        mchars: Mapping/Series from each index label of ``df`` to an iterable of character ids.
        img_path: Prefix prepended to "<id>.png" to build each portrait path.
        transform: Optional callable applied to each stitched image before it is stored.

    Raises:
        ValueError: If a row has no main characters (nothing to stitch).
    """

    def __init__(self, df, mchars, img_path, transform=None):
        self.transform = transform

        raw_scores = [df["Score"][x] for x in df.index]

        self.img = []
        for x in tqdm(df.index, desc="Concatenating portraits"):
            portrait_paths = [img_path + str(char_id) + ".png" for char_id in mchars[x]]
            if not portrait_paths:
                # Fail with a readable message instead of a tuple-unpacking error.
                raise ValueError(f"Row {x!r} has no main characters; cannot build a portrait strip.")

            retratos = self._concat_portraits(portrait_paths)

            if self.transform:
                retratos = self.transform(retratos)

            self.img.append(retratos)

        self.labels = scaler(raw_scores, 1)

    @staticmethod
    def _concat_portraits(paths):
        """Open every image in ``paths`` as RGB and paste them left to right into one image."""
        chars = []
        for img_name in paths:
            # The context manager releases the file once the RGB copy has been made.
            with Image.open(img_name) as personaje:
                chars.append(personaje.convert('RGB'))

        # Canvas is as wide as all portraits together and as tall as the tallest one.
        widths, heights = zip(*(img.size for img in chars))
        retratos = Image.new('RGB', (sum(widths), max(heights)))

        x_offset = 0
        for img in chars:
            retratos.paste(img, (x_offset, 0))
            x_offset += img.width

        return retratos

    def __len__(self):
        """Number of samples (one per scaled label)."""
        return len(self.labels)

    def __getitem__(self, item):
        """Return the sample at position ``item`` as {'img': ..., 'label': ...}."""
        return {'img': self.img[item], 'label': self.labels[item]}
    

# Optional callable applied to every stitched portrait image before it is stored
# in the dataset. With None the image is stored unchanged.
transf = None
# A transform pipeline could be used instead, for example:
#
# transf = transforms.Compose([
#     transforms.Resize(size),        # Define the tuple "size" first. This can help save memory.
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize images. Values are just illustrative.
#     ])
#
# NOTE(review): `transforms` is not imported anywhere in the visible cells, and
# Normalize presumably needs a tensor input (a to-tensor step first) - confirm before enabling.

train_dataset = AnimeDataset(
    df=anime_train,
    mchars=anime_train["Main Characters"],
    img_path=impath_train,
    transform=transf,
)
test_dataset = AnimeDataset(
    df=anime_test,
    mchars=anime_test["Main Characters"],
    img_path=impath_test,
    transform=transf,
)

In [None]:
###STEP 3: Collate all the data into a unified input

class AnimeRegressionCollator(object):
    """Collate function that turns a list of dataset samples into one model batch.

    Each sample is a dict with the keys 'img' and 'label'. The images are handed
    to ``img_processor`` in one call; the labels become a float tensor.
    """

    def __init__(self, img_processor):
        # Callable invoked as img_processor(images=[...], return_tensors="pt").
        self.img_processor = img_processor

    def __call__(self, sequences):
        """Build the batch dict {'portraits': <processor output>, 'labels': float tensor}."""
        targets = [sample['label'] for sample in sequences]
        pictures = [sample['img'] for sample in sequences]

        return {
            'portraits': self.img_processor(images=pictures, return_tensors="pt"),
            'labels': torch.tensor(np.array(targets), dtype=torch.float),
        }

In [None]:
from transformers import AutoImageProcessor

# Image processor loaded from the same checkpoint name as the image backbone.
processor = AutoImageProcessor.from_pretrained(
    pretrained_model_name_or_path=img_model_name_or_path
)

# Collator that runs the processor over each batch of portrait images.
regression_collator = AnimeRegressionCollator(img_processor=processor)

from torch.utils.data import DataLoader

# Both loaders use the same batch size and collator.
# NOTE(review): no shuffle argument is passed, so the training loader iterates
# in dataset order every epoch - confirm this is intended.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    collate_fn=regression_collator,
)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    collate_fn=regression_collator,
)

In [None]:
###STEP 4: Initialize the neural network

import torch.nn as nn
from transformers import ResNetModel

class AniNet(nn.Module):
    """Score regressor: pretrained ResNet features followed by a small MLP head.

    The head maps 49 input features down to one value (49 -> 24 -> 12 -> 6 -> 3 -> 1).
    It ends in a ReLU, so the model output is never negative.

    Args:
        pretr_img: Checkpoint name or path given to ``ResNetModel.from_pretrained``.
    """

    def __init__(self, pretr_img):
        super().__init__()

        # CHARACTERS: pretrained image backbone.
        self.img_net = ResNetModel.from_pretrained(pretr_img)

        # Regression head, listed in execution order.
        head_layers = [
            nn.Dropout(p=0.1),
            nn.Linear(49, 24, bias=True),
            nn.Tanh(),
            nn.Linear(24, 12, bias=True),
            nn.Tanh(),
            nn.Linear(12, 6, bias=True),
            nn.ReLU(),
            nn.Linear(6, 3, bias=True),
            nn.ReLU(),
            nn.Linear(3, 1, bias=True),
            nn.ReLU(),
        ]
        self.char_classifier = nn.Sequential(*head_layers)

    def forward(self, img_input_ids):
        """Return the head output for a batch of image tensors.

        Only index 0 along dimension 1 of the backbone's ``last_hidden_state``
        is kept; the remainder is flattened per sample and fed to the head.
        """
        backbone_out = self.img_net(img_input_ids)
        first_slice = backbone_out.last_hidden_state[:, 0, :]

        # Flatten to [bsize, 49] - presumably a 7x7 map per sample; TODO confirm for other input sizes.
        flat = first_slice.view(first_slice.shape[0], -1)

        return self.char_classifier(flat)
    

# Build the network from the image checkpoint named in the configuration cell.
# `model` is a module-level global: training_loop() and validation() read it directly.
model = AniNet(pretr_img=img_model_name_or_path)
# Move the parameters to the selected device (as the last expression of the cell,
# its return value is what the notebook displays).
model.to(device)

In [None]:
def training_loop(train_loader, predictions, true_labels, optimizer_, scheduler_, device_, loss_fn):
    """Train the global ``model`` for one pass over ``train_loader``.

    Args:
        train_loader: Sized iterable of batches shaped like the collator output:
            ``batch['portraits']['pixel_values']`` (image tensor) and
            ``batch['labels']`` (CPU tensor of targets).
        predictions: List that is extended in place with one Python float per sample.
        true_labels: List that is extended in place with the target of every sample.
        optimizer_: Optimizer stepped once per batch.
        scheduler_: Learning-rate scheduler stepped once per batch, after the optimizer.
        device_: Device the inputs and targets are moved to.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.

    Returns:
        tuple: ``(true_labels, predictions, avg_epoch_loss)`` where the average
        is the mean of the per-batch losses.
    """
    global model

    model.train()

    total_loss = 0

    for batch in tqdm(train_loader, total=len(train_loader), desc="Batch"):

        true_labels += batch['labels'].numpy().flatten().tolist()

        model.zero_grad()

        ##INPUTS:
        img_key = 'pixel_values' #'input_ids'
        img_input_ids = batch['portraits'][img_key].type(torch.float).to(device_)
        targets = batch['labels'].float().reshape(-1).to(device_)

        logits = model(img_input_ids=img_input_ids)

        # reshape(-1) keeps a 1-D tensor even for a batch of one sample, where
        # squeeze() would give a 0-d tensor that broadcasts in the loss and
        # cannot be added to a list.
        predictions_loss = logits.reshape(-1)

        loss = loss_fn(predictions_loss, targets)
        total_loss += loss.item()

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer_.step()
        scheduler_.step()

        # Store plain floats: keeping the tensors would hold every batch's
        # autograd graph (and device memory) alive until the epoch ends.
        predictions += predictions_loss.detach().cpu().tolist()

    avg_epoch_loss = total_loss / len(train_loader)
    return true_labels, predictions, avg_epoch_loss


def validation(test_loader, device_, loss_fn):
    """Evaluate the global ``model`` on ``test_loader`` without computing gradients.

    Args:
        test_loader: Sized iterable of batches shaped like the collator output:
            ``batch['portraits']['pixel_values']`` (image tensor) and
            ``batch['labels']`` (CPU tensor of targets).
        device_: Device the inputs and targets are moved to.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.

    Returns:
        tuple: ``(true_labels, predictions, avg_epoch_loss)``; both lists hold
        one Python float per sample and the average is the mean of the
        per-batch losses.
    """
    global model

    predictions = []
    true_labels = []

    total_loss = 0

    model.eval()

    for batch in tqdm(test_loader, total=len(test_loader)):
        true_labels += batch['labels'].numpy().flatten().tolist()

        ##INPUTS:
        img_key = 'pixel_values' #'input_ids'
        img_input_ids = batch['portraits'][img_key].type(torch.float).to(device_)
        targets = batch['labels'].float().reshape(-1).to(device_)

        with torch.no_grad(): # No gradients are computed or stored, saving memory and speeding up validation
            logits = model(img_input_ids=img_input_ids)

            # reshape(-1) keeps a 1-D tensor even for a batch of one sample;
            # squeeze() would yield a 0-d tensor whose tolist() is a bare float.
            predictions_loss = logits.reshape(-1)

            # The loss is computed where the logits already live (no CPU round-trip).
            loss = loss_fn(predictions_loss, targets)

        predictions += predictions_loss.cpu().tolist()
        total_loss += loss.item()

    avg_epoch_loss = total_loss / len(test_loader)

    return true_labels, predictions, avg_epoch_loss

In [None]:
from Ca_Naxca import regression_report
from transformers import get_linear_schedule_with_warmup

##TRAIN THE MODEL.

import json
import os

optimizer_ = torch.optim.AdamW(model.parameters(), lr = learning_rate, eps = eps_value)
# One scheduler step is taken per batch, so the schedule spans every batch of every epoch.
total_steps = len(train_dataloader) * epochs
scheduler_ = get_linear_schedule_with_warmup(optimizer_, num_warmup_steps = 0, num_training_steps = total_steps)
loss_fn = nn.MSELoss()  # Loss function for regression problems

# Make sure the report folder exists before anything is written into it.
os.makedirs(".//final_reports//Img//", exist_ok=True)

# One [train_loss, val_loss] pair per epoch.
trainval = []

for epoch in tqdm(range(epochs), desc="Epoch"):
    # Train the model (fresh accumulator lists every epoch):
    true_labels, predictions, train_loss = training_loop(
        train_dataloader, [], [], optimizer_, scheduler_, device, loss_fn
    )
    # Test the model:
    valid_labels, valid_predict, val_loss = validation(test_dataloader, device, loss_fn)

    # Is it good enough?
    reporte = regression_report(valid_labels, valid_predict, [i for i in range(len(valid_labels))])
    reps = reporte.display()
    print(reps)
    # The report file is overwritten every epoch, so it ends up holding the last epoch.
    reps.to_csv(".//final_reports//Img//reporte_img.csv")

    # Preview up to ten (label, prediction) pairs; slicing avoids an IndexError
    # when the validation set has fewer than ten samples.
    for label, pred in zip(valid_labels[:10], valid_predict[:10]):
        print([label, pred])

    print("  train_loss: %.5f - val_loss: %.5f "%(train_loss, val_loss))
    print()
    trainval.append([train_loss, val_loss])


# The context manager closes the file even if json.dump raises.
with open(".//final_reports//Img//train_val_loss.json", "w") as jsonfile:
    json.dump(trainval, jsonfile)