# Test pipeline of the project

## Dataset

In [None]:
import os
from src.utils.data_loading_functions import *

In [None]:
# Locations of the label and expression CSV files, relative to the working directory.
label_path="data/MLinApp_course_data/tcga_mir_label.csv"
data_path="data/MLinApp_course_data/tcga_mir_rpm.csv"

# load_data is presumably provided by the star import from
# src.utils.data_loading_functions; note it takes the data path first and
# returns the labels first.
miRna_labels, miRna_data, miRna_tissues = load_data(data_path, label_path)

### Preprocessing data: class balancing, normalization and split.

In [None]:
# class_balancing yields six values; only the first two (data, labels) are
# kept and the remaining four are discarded.
miRna_data, miRna_labels , _, _, _, _ = class_balancing(miRna_labels, miRna_data, miRna_tissues)

miRna_data = normalize_data(miRna_data)

# The later cells visible in this notebook only read test_data and test_label.
train_data, test_data, train_label, test_label = split_data(miRna_data, miRna_labels)

We will use only the test data, because no training is needed here.

## Intelligent Reasoning

In [None]:
from src.utils.metadata_functions import *
from src.utils.utils import *
from src.utils.statistics import *

These superclasses were selected during the training phase of the representation process.

In [None]:
# Superclass id (as a string key) -> labels grouped under that superclass.
superclasses = {
    '0': ['BRCA', 'KICH', 'KIRC', 'LUAD', 'LUSC', 'MESO', 'SARC', 'UCEC'],
    '1': ['BLCA', 'CESC', 'HNSC', 'KIRP', 'PAAD', 'READ', 'STAD'],
    '2': ['DLBC', 'LGG', 'PRAD', 'TGCT', 'THYM', 'UCS'],
    '3': ['ACC', 'CHOL', 'LIHC'],
    '4': ['ESCA', 'PCPG', 'SKCM', 'THCA', 'UVM'],
}

### Metadata creation and classification

In [None]:
# Convert the test labels to superclass labels using the mapping defined above
# (lab2super is a project helper; its exact output format is not visible here).
test_superlabel = lab2super(test_label, superclasses)

In [None]:
# Load the two saved objects identified by the '_trained_1' suffix
# (presumably one-vs-one and one-vs-rest classifiers — confirm in load_ovo_ovr).
ovo, ovr = load_ovo_ovr('_trained_1')

Creation of test metadata

In [None]:
# Build the metadata for the test split from the two loaded objects,
# the test data and the test superclass labels.
meta_data = meta_data_creation_test(ovo, ovr, test_data, test_superlabel)

In [None]:
# Sanity check: display the dimensions of the generated metadata.
meta_data.shape

In [None]:
# Load the saved 'rf_trained_1' model from the metadata models folder.
# NOTE(review): this two-argument load_model presumably comes from a star
# import; a later cell defines a four-argument load_model with the same name,
# so this cell fails if re-run after that one.
rf = load_model('src/models/metadata/', 'rf_trained_1')

In [None]:
# Predict a superclass for every row of the test metadata.
prediction = rf.predict(meta_data)

In [None]:
# Compare true and predicted superclasses; the dict keys are passed as the
# class names (presumably rendered as a confusion matrix — see show_matrix).
# NOTE(review): relies on `superclasses` still being the dict; a later cell
# rebinds the name to a list, which has no .keys().
show_matrix(test_superlabel, prediction, superclasses.keys())


In [None]:
# Print scikit-learn's text classification report for the superclass predictions.
from sklearn.metrics import classification_report
print(classification_report(test_superlabel, prediction))

## CNN and SCNN classification

In [None]:
# TODO: add all the test pipeline of the CNN and SCNN classification.
from src.utils.dataloader import load_dataset
from src.models.CNN import CNN
from src.models.SCNN import SCNN

import torch
import torch.nn as nn
import torch.optim as optim

import os
import json

def get_device():
    '''
    Pick the torch device to run on.

    Returns:
        device (str): 'cuda' when CUDA is available, otherwise 'mps' when the
            MPS backend exists and is available, otherwise 'cpu'.
    '''
    if torch.cuda.is_available():
        return 'cuda'
    # Older torch builds have no `torch.backends.mps`; treat that as "not available"
    # instead of failing with AttributeError.
    mps_backend = getattr(torch.backends, 'mps', None)
    if mps_backend is not None and mps_backend.is_available():
        return 'mps'
    return 'cpu'

# NOTE(review): automatic device selection is disabled and every model/tensor
# in the cells below is placed on the CPU; the reason is not stated — confirm
# before re-enabling get_device().
# device = get_device()
device = 'cpu'

In [None]:
# Label groups as a list, so that superclasses[i] holds the labels of
# superclass i. NOTE: this rebinds the `superclasses` name that an earlier
# cell defined as a dict.
superclasses = [
    group.split()
    for group in (
        'BRCA KICH KIRC LUAD LUSC MESO SARC UCEC',
        'BLCA CESC HNSC KIRP PAAD READ STAD',
        'DLBC LGG PRAD TGCT THYM UCS',
        'ACC CHOL LIHC',
        'ESCA PCPG SKCM THCA UVM',
    )
]

In [None]:
def load_params(typeof='cnn', num_class=0):
    '''
    Load the parameters of the model from its JSON file.

    The file is read from data/params/<typeof>/<typeof>_class<num_class>.json,
    relative to the current working directory.

    Args:
        typeof (str): type of the model; must start with 'cnn' or 'scnn'.
        num_class (int): index used in the file name (the notebook passes the
            superclass index here).

    Returns:
        params (dict): dictionary with the parameters of the model.

    Raises:
        AssertionError: if typeof does not start with 'cnn' or 'scnn'.
        FileNotFoundError: if the parameter file does not exist.
    '''
    assert typeof.startswith(('cnn', 'scnn')), 'Type of model not supported'
    params_path = os.path.join('data', 'params', typeof, f'{typeof}_class{num_class}.json')

    # Context manager so the file handle is closed deterministically.
    with open(params_path, encoding='utf-8') as params_file:
        return json.load(params_file)

def set_model(model_name: str, num_class: int, params: dict):
    '''
    Build a CNN or SCNN from a parameter dictionary and move it to `device`.

    Args:
        model_name (str): name of the model; names starting with 'cnn' build a
            CNN, any other name builds an SCNN.
        num_class (int): value forwarded as the first constructor argument
            (experiment passes the number of labels of the superclass).
        params (dict): must contain the keys nf1-nf4, cw1-cw3, pw1-pw3,
            dropout_0 and dropout_1.

    Returns:
        model (nn.Module): freshly built model, placed on the global `device`.
    '''
    # Collect the three per-layer values of every hyper-parameter family.
    filters = [params[key] for key in ('nf1', 'nf2', 'nf3')]
    conv_windows = [params[key] for key in ('cw1', 'cw2', 'cw3')]
    pool_windows = [params[key] for key in ('pw1', 'pw2', 'pw3')]
    dropout_rates = [params[key] for key in ('dropout_0', 'dropout_1')]
    last_nf = params['nf4']

    # Both architectures take the same constructor arguments.
    network_cls = CNN if model_name.startswith('cnn') else SCNN
    network = network_cls(num_class, filters, conv_windows, pool_windows, last_nf, dropout_rates)
    network.to(device)
    return network

def load_model(model_path: str, typeof: str, num_class: int, params: dict, num_outputs=None):
    '''
    Rebuild a model and load its saved weights.

    The weights are read from <model_path>/<typeof>_class<num_class>.pth.

    NOTE(review): this definition reuses the name of the two-argument
    load_model called earlier in the notebook for the random forest.

    Args:
        model_path (str): directory containing the saved weights.
        typeof (str): type of the model; must start with 'cnn' or 'scnn'.
        num_class (int): class index used in the weights file name.
        params (dict): dictionary with the parameters of the model.
        num_outputs (int, optional): class count passed to set_model. Defaults
            to num_class (the previous behaviour); pass the number of labels
            the model was trained with when that differs from the file index.

    Returns:
        model (nn.Module): model loaded.

    Raises:
        AssertionError: if typeof does not start with 'cnn' or 'scnn'.
    '''
    assert typeof.startswith(('cnn', 'scnn')), 'Type of model not supported'
    filepath = os.path.join(model_path, f'{typeof}_class{num_class}.pth')
    model = set_model(typeof, num_class if num_outputs is None else num_outputs, params)
    # map_location lets weights saved on another device (e.g. a GPU) load on `device`.
    model.load_state_dict(torch.load(filepath, map_location=device))
    return model

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    '''
    Run one optimisation pass over every batch of the dataloader.

    Args:
        dataloader: iterable yielding (X, y) batches of tensors.
        model (nn.Module): model to train; it is switched to training mode.
        loss_fn: callable computing the loss from (prediction, target).
        optimizer: torch optimizer holding the model parameters.
    '''
    model.train()
    for X, y in dataloader:
        # Inputs must be on the same device as the model.
        X, y = X.to(device), y.to(device)
        loss = loss_fn(model(X), y)
        # Clear the gradients of the previous step before back-propagating.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()


def test(dataloader, model, loss_fn):
    '''
    Measure the accuracy of the model over the dataloader.

    Args:
        dataloader: iterable yielding (X, y) batches and exposing `.dataset`.
        model (nn.Module): model to evaluate; it is switched to eval mode.
        loss_fn: kept for interface compatibility; the loss was never
            returned or reported, so it is no longer computed.

    Returns:
        accuracy (float): correct predictions divided by the dataset size,
            or 0.0 when the dataset is empty.
    '''
    size = len(dataloader.dataset)
    model.eval()
    if size == 0:
        # Nothing to evaluate; avoid dividing by zero.
        return 0.0

    correct = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            # The predicted class is the index of the largest score along dim 1.
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    return correct / size

def experiment(model_name: str, params: dict, metalabel: int, labels_of_metaclass, epochs:int, mode: str = 'train'):
    '''
    Train or evaluate the model of one superclass.

    In 'train' mode a fresh model is trained for `epochs` epochs, evaluated
    after each epoch, and its weights are saved to
    data/models/<model_name>/<model_name>_class<metalabel>.pth.
    In any other mode the weights are loaded from that file and the model is
    evaluated once; `epochs` is ignored because repeated evaluation of the
    same weights gives the same accuracy.

    Args:
        model_name (str): name of the model ('cnn...' builds a CNN, otherwise SCNN).
        params (dict): model parameters; 'batch_size' and 'lr' are read here,
            the rest by set_model.
        metalabel (int): index of the superclass; used for the dataset
            selection and in the weights file name.
        labels_of_metaclass: labels belonging to the superclass; its length is
            the number of classes of the model.
        epochs (int): number of training epochs.
        mode (str): 'train' to train and save, anything else to load and test.
    '''
    num_classes = len(labels_of_metaclass)
    training = mode == 'train'
    model_dir = os.path.join('data', 'models', model_name)
    weights_path = os.path.join(model_dir, f'{model_name}_class{metalabel}.pth')

    print(f'METACLASS LABELS: {labels_of_metaclass}')
    train_dataloader, test_dataloader = load_dataset(name='cancer', batch_size=params['batch_size'],
                                                     metalabel=metalabel, labels_of_metaclass=labels_of_metaclass)

    # The network is always sized by the number of labels: the saved weights
    # were produced by a model built this way, so building it from the
    # superclass index instead would not match the checkpoint.
    model = set_model(model_name, num_classes, params)
    if not training:
        # map_location lets weights saved on another device load on `device`.
        model.load_state_dict(torch.load(weights_path, map_location=device))

    loss_fn = nn.CrossEntropyLoss()

    # Stays None when training is requested with zero epochs.
    accuracy = None
    if training:
        optimizer = torch.optim.Adam(model.parameters(), lr=params['lr'])
        for t in range(epochs):
            print(f"Epoch {t + 1}\n-------------------------------")
            train(train_dataloader, model, loss_fn, optimizer)
            accuracy = test(test_dataloader, model, loss_fn)
            print(accuracy)
    else:
        accuracy = test(test_dataloader, model, loss_fn)
        print(accuracy)
    print("Done!")
    print(f"Accuracy for superclass {metalabel}: {accuracy}")

    if training:
        # Only training writes to disk, so only training creates the folder.
        os.makedirs(model_dir, exist_ok=True)
        torch.save(model.state_dict(), weights_path)



In [None]:
# Settings shared by the CNN/SCNN cells below.
model_name = 'cnn'
# NOTE: the training-loop cell reuses `metalabel` as its loop variable, so
# this value is overwritten once that cell has run.
metalabel = 0
epochs = 10

In [None]:

# Train (default mode) one model per superclass. Iterating over the list
# itself keeps the loop in sync with the number of superclasses.
# `metalabel` is deliberately the loop variable: it stays bound to the last
# index afterwards, exactly as before.
for metalabel, metaclass_labels in enumerate(superclasses):
    params = load_params(model_name, metalabel)
    experiment(
        model_name=model_name,
        params=params,
        metalabel=metalabel,
        labels_of_metaclass=metaclass_labels,
        epochs=epochs,
    )

In [None]:
# Evaluate the saved model (mode='test') of the superclass currently held in
# `metalabel`.
# NOTE(review): if the training-loop cell ran just before, `metalabel` is the
# last loop index rather than the value set in the settings cell — confirm
# which superclass is meant to be tested.
params = load_params(model_name, metalabel)
metaclass_labels = superclasses[metalabel]

experiment(
    model_name=model_name,
    params=params,
    metalabel= metalabel,
    labels_of_metaclass=metaclass_labels,
    epochs=epochs,
    mode='test'
)