# Importing the Required Modules

In [None]:
# Standard Libraries
import re
import pickle

# Data Libraries
import pandas as pd
import numpy as np

# Functionality
from typing import List, Dict, Union

In [None]:
from torch.utils.data.dataset import Dataset
from torch.utils.data.dataloader import DataLoader
from torchvision import transforms
import torchvision.models as models
import torch.optim as optim
import torch.nn as nn
import torch
import time
import os
import copy
import argparse

In [None]:
from sklearn.model_selection import train_test_split
from torch.autograd import Variable, Function
from sklearn import metrics
from tqdm import tqdm

In [None]:
from pathlib import Path
# Project root used to build dataset and checkpoint paths; the path is
# relative to the current working directory.
PATH = Path("drive/MyDrive/ACS_AI_A1/")

In [None]:
# Shared research folder, given relative to the current working directory
# (presumably a mounted Google Drive -- TODO confirm).
PATH_SHARED=Path("drive/MyDrive/research/") 
# IPython shell escape: list the folder's contents. Not valid plain Python.
!ls $PATH_SHARED

# Loading the Dataset

In [None]:
# Read the combined sarcastic / non-sarcastic CSV from the project folder.
# Later cells use its 'photo_path' and 'sarcastic' columns.
data_news_all = pd.read_csv(
    f"{PATH}/Research/Combined/sarcastic_nonsarcastic_img.csv"
)

In [None]:
# Pull the image paths and their labels out of the frame as parallel arrays.
images, label = (
    data_news_all[column].values for column in ('photo_path', 'sarcastic')
)

In [None]:
# Keep a 17% random subsample of the data (fixed seed for repeatability);
# the other 83% lands in image_rem / label_rem.
images, image_rem, label, label_rem = train_test_split(
    images,
    label,
    train_size=0.17,
    random_state=42,
)

In [None]:
# 80% of the subsample is used for training. The held-out 20% is then divided
# 40/60 into validation and test. Both splits use the same fixed seed.
image_train, image_rem, label_train, label_rem = train_test_split(
    images, label, random_state=42, train_size=0.8
)
image_valid, image_test, label_valid, label_test = train_test_split(
    image_rem, label_rem, random_state=42, test_size=0.6
)

In [None]:
# Report the number of samples in each split (paths and labels should match).
print('Shape of training data: ', image_train.shape, label_train.shape, sep='\n')

print('Shape of val data: ', image_valid.shape, label_valid.shape, sep='\n')

print('Shape of test data: ', image_test.shape, label_test.shape, sep='\n')

In [None]:
# Training split as a two-column frame: image path first, label second.
dataset_train = dict(photo_path=image_train, image_label=label_train)
dataframe_train = pd.DataFrame(data=dataset_train)

In [None]:
# Test split as a two-column frame: image path first, label second.
dataset_test = dict(photo_path=image_test, image_label=label_test)
dataframe_test = pd.DataFrame(data=dataset_test)

In [None]:
# Validation split as a two-column frame: image path first, label second.
dataset_valid = dict(photo_path=image_valid, image_label=label_valid)
dataframe_valid = pd.DataFrame(data=dataset_valid)

In [None]:
from PIL import Image
class CustomDatasetFromCSV(Dataset):
    """Image-classification dataset backed by a pandas DataFrame.

    The frame must expose a ``photo_path`` column holding image file paths,
    and its second column (position 1) must hold the class labels.

    Args:
        csv: DataFrame describing the samples. Despite the name this is an
            already-loaded frame, not a path to a CSV file.
        transforms: Optional callable applied to each RGB ``PIL.Image``.
            When ``None`` the RGB PIL image itself is returned.
    """

    def __init__(self, csv, transforms=None):
        self.data = csv
        # Labels are read by position (second column), not by column name.
        self.labels = np.asarray(self.data.iloc[:, 1])
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at position ``index``."""
        single_image_label = self.labels[index]
        # Positional lookup keeps the path aligned with ``self.labels`` (also
        # positional) even when the frame's index is not 0..n-1.
        single_image_path = self.data["photo_path"].iloc[index]

        # The context manager releases the file handle. ``convert`` decodes
        # the file and returns an independent RGB image, so palette, CMYK,
        # 1-bit and 16-bit inputs are converted by PIL itself rather than
        # through a lossy numpy uint8 round-trip.
        with Image.open(str(single_image_path)) as raw_image:
            image = raw_image.convert('RGB')

        # Without transforms, hand back the PIL image instead of failing on
        # an unbound variable.
        if self.transforms is not None:
            image = self.transforms(image)
        return (image, single_image_label)

    def __len__(self):
        """Return the number of rows in the backing frame."""
        return len(self.data.index)

In [None]:
transformations = transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])


In [None]:
train_dataset = \
    CustomDatasetFromCSV(dataframe_train, transformations)
loader_train = torch.utils.data.DataLoader(dataset=train_dataset,
                                                    batch_size=10,
                                                    shuffle=False)
test_dataset = \
    CustomDatasetFromCSV(dataframe_test, transformations)
loader_test = torch.utils.data.DataLoader(dataset=test_dataset,
                                                    batch_size=10,
                                                    shuffle=False)
valid_dataset = \
    CustomDatasetFromCSV(dataframe_valid, transformations)
loader_valid = torch.utils.data.DataLoader(dataset=valid_dataset,
                                                    batch_size=10,
                                                    shuffle=False)

In [None]:
# Detect if we have a GPU available
# Use the first CUDA device when one is visible; otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device

device(type='cpu')

# Alexnet


## Training

In [None]:
import torch.nn.functional as F
class Alexnet(nn.Module):
    """Two-class image classifier on top of a pretrained torchvision AlexNet.

    Every backbone parameter is frozen except the last two parameter tensors
    (the final dense layer's weights and bias). The backbone output passes
    through a 64-unit ReLU projection and a 2-way linear + softmax head, so
    ``forward`` returns class probabilities, not logits.
    """

    def __init__(self):
        super().__init__()

        backbone = models.alexnet(pretrained=True)

        # Freeze the whole backbone first ...
        backbone_params = list(backbone.parameters())
        for tensor in backbone_params:
            tensor.requires_grad = False
        # ... then re-enable the last dense layer's weights and bias.
        for tensor in backbone_params[-2:]:
            tensor.requires_grad = True

        backbone_out_dim = backbone.classifier._modules['6'].out_features
        # The attribute is called ``vgg`` although it holds AlexNet; the name
        # is part of the state_dict keys, so it is kept as is.
        self.vgg = backbone
        self.image_fc1 = nn.Linear(backbone_out_dim, 64)
        # These two layers are registered (and therefore saved in the
        # state_dict) but forward() does not use them.
        self.image_adv = nn.Linear(64, 64)
        self.image_encoder = nn.Linear(64, 64)

        head = nn.Sequential()
        head.add_module('c_fc1', nn.Linear(64, 2))
        head.add_module('c_softmax', nn.Softmax(dim=1))
        self.class_classifier = head

    def forward(self, image):
        """Map an image batch to per-class probabilities."""
        features = self.vgg(image)
        hidden = F.relu(self.image_fc1(features))
        return self.class_classifier(hidden)

In [None]:
 def train_loop(model: Alexnet, train_loader, valid_loader, num_epochs = 10, lr = 0.001, verbose = True):
    """Train ``model`` and checkpoint it after every epoch.

    Args:
        model: Network to train. Its forward pass must return class
            probabilities (the Alexnet head ends in Softmax); the loss below
            takes their logarithm.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        valid_loader: Passed unchanged to ``evaluate_loop``, which indexes
            samples one by one, so this must be an indexable dataset of
            ``(image_tensor, label)`` pairs rather than a DataLoader.
        num_epochs: Number of passes over ``train_loader``.
        lr: Initial learning rate; it is annealed each epoch as
            ``lr / (1 + 10 * epoch / num_epochs) ** 0.75``.
        verbose: When true, print a summary line after every epoch.

    Returns:
        Path of the checkpoint with the best validation accuracy, or ``None``
        if no epoch ever improved on an accuracy of 0.0 (including
        ``num_epochs == 0``).
    """
    # The model already applies Softmax, and CrossEntropyLoss would apply a
    # second log-softmax on top of it. Use NLLLoss on log-probabilities.
    criterion = nn.NLLLoss()
    optimizer = torch.optim.Adam(
        [param for param in model.parameters() if param.requires_grad],
        lr=lr,
        weight_decay=0.1
    )

    # Create the checkpoint folder once, up front.
    checkpoint_dir = f'{PATH}/Research/Alexnet'
    os.makedirs(checkpoint_dir, exist_ok=True)

    best_valid_acc = 0.0
    best_model_path = None

    for epoch in tqdm(range(num_epochs)):

        progress = float(epoch) / num_epochs

        # Assigning ``optimizer.lr`` has no effect; the rate actually used by
        # the optimizer lives in its param_groups.
        epoch_lr = lr / (1. + 10 * progress) ** 0.75
        for group in optimizer.param_groups:
            group['lr'] = epoch_lr

        cost_vector = []
        class_cost_vector = []
        acc_vector = []

        model.train()
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            class_outputs = model(inputs)

            # Clamp so an exactly-zero probability does not produce -inf.
            log_probs = torch.log(class_outputs.clamp_min(1e-12))
            class_loss = criterion(log_probs, labels)

            loss = class_loss

            loss.backward()
            optimizer.step()

            argmax = class_outputs.argmax(dim=1)
            accuracy = (labels == argmax).float().mean()

            class_cost_vector.append(class_loss.item())
            cost_vector.append(loss.item())
            acc_vector.append(accuracy.item())

        model.eval()
        results = evaluate_loop(model, valid_loader)
        model.train()

        valid_acc = results['label']['accuracy']
        best = valid_acc > best_valid_acc
        if best:
            best_valid_acc = valid_acc

        # Every epoch is saved; the best one so far gets a '-best' suffix.
        model_name = str(epoch + 1)
        if best:
            model_name = model_name + '-best'
            best_model_path = os.path.join(checkpoint_dir, model_name)

        torch.save(model.state_dict(), os.path.join(checkpoint_dir, model_name))

        if verbose:
            print('Epoch [%d/%d],  Loss: %.4f, Class Loss: %.4f, Train_Acc: %.4f,  Validate_Acc: %.4f.' % \
                (
                    epoch + 1,
                    num_epochs,
                    np.mean(cost_vector),
                    np.mean(class_cost_vector),
                    np.mean(acc_vector),
                    valid_acc
                )
            )

    return best_model_path

In [None]:
def evaluate_loop(model, test_dataset):
    """Evaluate ``model`` one sample at a time and compute metrics.

    The model is switched to eval mode for the duration of the call and its
    previous mode is restored afterwards. Inference runs without gradient
    tracking, on whatever device the model's parameters live on.

    Args:
        model: Module mapping a single-image batch to per-class scores.
        test_dataset: Indexable dataset (not a DataLoader) supporting
            ``len()`` and yielding ``(image_tensor, label)`` pairs.

    Returns:
        ``{'label': {...}}`` where the inner dict holds ``accuracy``,
        ``f1_score``, ``precision``, ``recall`` (macro-averaged),
        ``confusion_matrix`` and ``report``.
    """
    # Send inputs to the model's device; a parameter-free model gets CPU.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        model_device = first_param.device
    else:
        model_device = torch.device("cpu")

    label_pred = []
    label_true = []

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for i in range(len(test_dataset)):
                # Fetch each sample once: loading decodes the image from disk.
                image, label = test_dataset[i]

                # Add the batch dimension instead of hard-coding the shape.
                label_outputs = model(image.unsqueeze(0).to(model_device))

                label_pred.append(int(label_outputs.argmax(dim=1).item()))
                label_true.append(label)
    finally:
        # Leave the model in the mode the caller had it in.
        model.train(was_training)

    results = {
        'label': {
            'accuracy': metrics.accuracy_score(label_true, label_pred),
            'f1_score': metrics.f1_score(label_true, label_pred, average='macro'),
            'precision': metrics.precision_score(label_true, label_pred, average='macro'),
            'recall': metrics.recall_score(label_true, label_pred, average='macro'),
            'confusion_matrix': metrics.confusion_matrix(label_true, label_pred),
            'report': metrics.classification_report(label_true, label_pred),
        }
    }

    return results

In [None]:
# Build the network and move it to the selected device. ``Module.to``
# returns the module itself, so the trailing expression still displays it.
model = Alexnet().to(device)
model

In [None]:
# Train for a single epoch. The validation *dataset* (not a loader) is
# passed as valid_loader because evaluate_loop indexes samples directly.
model_path = train_loop(model, loader_train, valid_dataset, num_epochs=1)
model_path

In [None]:
def predict(model, test_image_tensor):
    """Return the top-1 class index for every row of the input batch.

    The model is put in eval mode and run without gradient tracking. Its
    output is exponentiated before the top-1 pick; ``exp`` is monotonic, so
    this does not change which class ranks first. The class of the first
    sample is printed, and the full ``(batch, 1)`` index tensor is returned.
    """
    model.eval()
    with torch.no_grad():
        scores = torch.exp(model(test_image_tensor))
        _, best_class = scores.topk(1, dim=1)
    first_prediction = best_class.cpu().numpy()[0][0]
    print("Output class :  ", first_prediction)
    return best_class

In [None]:
# Score the trained model on the held-out test split and display the metrics.
results = evaluate_loop(model=model, test_dataset=test_dataset)
results

{'label': {'accuracy': 0.6929460580912863,
  'confusion_matrix': array([[176,  79],
         [ 69, 158]]),
  'f1_score': 0.6925172413793104,
  'precision': 0.6925170068027211,
  'recall': 0.6931156603610606,
  'report': '              precision    recall  f1-score   support\n\n           0       0.72      0.69      0.70       255\n           1       0.67      0.70      0.68       227\n\n    accuracy                           0.69       482\n   macro avg       0.69      0.69      0.69       482\nweighted avg       0.69      0.69      0.69       482\n'}}

## Saving the model

In [None]:
# Persist only the learned parameters (the state_dict), not the module.
torch.save(
    obj=model.state_dict(),
    f=f"{PATH}/Research/Alexnet/weights.h5",
)

In [None]:
# Serialise the whole module object (architecture reference + parameters).
torch.save(
    obj=model,
    f=f"{PATH}/Research/Alexnet/model.pth",
)

In [None]:
# Compile the model to TorchScript and write the scripted module to disk.
model_scripted = torch.jit.script(model)
torch.jit.save(model_scripted, f"{PATH}/Research/Alexnet/model.pt")
