# Imports and Data Location

In [9]:
import timm
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import StratifiedKFold
from tqdm.notebook import tqdm
import tensorflow as tf
from PIL import Image
import re
import math
from timm.data import resolve_data_config
from timm.data.transforms_factory import create_transform
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
import os
import pandas as pd
!pip install torch-summary
from torchsummary import summary
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
import json
from tabulate import tabulate
from sklearn.manifold import TSNE
import pandas as pd
import seaborn as sns
from sklearn.decomposition import PCA
from keras.preprocessing.image import img_to_array
import operator
from mpl_toolkits.axes_grid1 import ImageGrid
colab = False
if colab:
    MODEL_PATH ='/content/gdrive/Shareddrives/520 Project/Saved Models/ViT/best_ViT_one_layer.pth'
    DATA_PATH = '/content/gdrive/Shareddrives/520 Project/Data/wikipaintings_test'
    DICT_PATH = '/content/gdrive/Shareddrives/520 Project/styles_dictionary.json'
else:
    MODEL_PATH ='/projectnb2/dl523/projects/Sarcasm/520 Project/Saved_Models/best_ViT_one_layer.pth'
    MODEL_PATH_EFF = '/projectnb2/dl523/projects/Sarcasm/520 Project/effnetv2_model'
    DATA_PATH = '/projectnb/dl523/projects/Sarcasm/wikipaintings_full/wikipaintings_test'
    DICT_PATH = '/projectnb/dl523/projects/Sarcasm/styles_dictionary.json'
    
# Enable GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/share/pkg.7/python3/3.8.6/install/bin/python3.8 -m pip install --upgrade pip' command.[0m


In [2]:
!pip install tensorflow_addons

import os
import sys
import tensorflow.compat.v1 as tf

# Download source code.
if "efficientnetv2" not in os.getcwd():
    !git clone --depth 1 https://github.com/google/automl
    os.chdir('automl/efficientnetv2')
    sys.path.append('.')
else:
    !git pull

def download(m):
    if m not in os.listdir():
        !wget https://storage.googleapis.com/cloud-tpu-checkpoints/efficientnet/v2/{m}.tgz
        !tar zxf {m}.tgz
    ckpt_path = os.path.join(os.getcwd(), m)
    return ckpt_path

import effnetv2_model

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/share/pkg.7/python3/3.8.6/install/bin/python3.8 -m pip install --upgrade pip' command.[0m
fatal: destination path 'automl' already exists and is not an empty directory.


# ViT Loading and Helper Functions

In [None]:
# hook for the selected input layer [name] in ViT model
features = {}
def get_features(name):
    def hook(model, input, output):
        features[name] = output.detach()
    return hook



In [None]:
# Evaluate the model to extract intermediate weights
def test_vit(testloader):
    our_ViT.eval()
    correct = 0
    total = 0
    predictions = []
    feats = []
    lab = []
    m = nn.Sigmoid()

    with torch.no_grad():
        for idx,(images,labels) in enumerate(tqdm(testloader,total = len(testloader))):

            images, labels = images.to(device), labels.to(device)

            outputs = our_ViT(images)
            probabilities = m(outputs)

            predicted = torch.argmax(outputs,1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            predictions.append(predicted.detach().cpu().numpy())
            # Extracting intermediate weights
            feats.append(features['final_layer'].cpu().numpy())
            lab.extend(labels.cpu())
        acc = correct/total * 100
    # Getting output values into correct format
    predictions = np.concatenate(predictions)
    feats = np.concatenate(feats)
    lab = np.array(lab)
    predictions.squeeze()
    lab.squeeze()
    return acc,predictions,feats,lab

In [None]:
def load_vitdata(DATA_PATH,batch_size = 128):


    test_dataset = torchvision.datasets.ImageFolder(DATA_PATH,transform = vit_transform)

    testloader = torch.utils.data.DataLoader(test_dataset, shuffle=False, num_workers=2, batch_size=batch_size)
    
    styles = list(test_dataset.class_to_idx.keys())
    styles_for_labels = [re.sub(r'[^A-Za-z0-9 "()" "ï" -]+', ' ', i) for i in styles]

    return testloader,styles_for_labels

In [None]:
def load_vit(MODEL_PATH):
    our_ViT = timm.create_model('vit_huge_patch14_224_in21k', pretrained = True, num_classes = 25)
    # basic pre-processing tasks for proper ViT data ingestion
    our_ViT.load_state_dict(torch.load(MODEL_PATH))
    config = resolve_data_config({}, model=our_ViT)
    vit_transform = create_transform(**config)


    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    our_ViT.to(device)
    our_ViT.eval()
    return our_ViT,vit_transform

# Effnet Load and Helper Functions

In [None]:
def predict_and_save(model, test_generator):
    preds = []
    true_labels = []
    progbar = tf.keras.utils.Progbar(len(test_generator))

    print("Predicting....")
    for batch in range(len(test_generator)):
        images, labels = test_generator.next()
        for i in range(len(images)):
            image = images[i, :, :, :]
            label = np.argmax(labels[i])
            prediction_scores = model(np.expand_dims(image, axis=0))
            pred = np.argmax(prediction_scores)
            preds.append(pred)
            true_labels.append(label)
        progbar.update(batch)

    return preds, true_labels

In [None]:
# Effnet model helper functions
def get_effnetv2(do_fine_tuning, model_name, weights = 'imagenet', unfreeze = 0):
    tf.keras.backend.clear_session()
    
    base_model = effnetv2_model.get_model(model_name, weights = weights, include_top=False)
    
    if unfreeze > 0:
        base_model = unfreeze_effnet(base_model, unfreeze)
    else:
        base_model.trainable = do_fine_tuning
        
    return base_model

#func to get overall model
def get_new_model(unfreeze = 0):
    model_name = 'efficientnetv2-l' #@param {type:'string'}
    do_fine_tuning = False
    weights = 'imagenet21k'
    image_size = 223
    tf.keras.backend.clear_session()
    model = tf.keras.models.Sequential([
        tf.keras.layers.InputLayer(input_shape=[image_size, image_size, 3]),
        get_effnetv2(do_fine_tuning, model_name, weights = weights, unfreeze = unfreeze),
        tf.keras.layers.Dropout(rate=0.2),
        tf.keras.layers.Dense(25, activation='softmax'),
    ])
    model.build((None, image_size, image_size, 3))
    model.summary()
    return model


#restore model from checkpoint
def restore_model(checkpoint_path, compile_m = True, learning_rate = 0.001, momentum = 0.9):
    model_new = get_new_model()
    
    if compile_m:
        model_new.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=momentum), 
          loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False, label_smoothing=0.1),
          metrics=['accuracy', 'top_k_categorical_accuracy'])
        
    model_new.load_weights(checkpoint_path)
    return model_new



In [6]:
def eff_transform(img):
    eff_tensor = img.resize((223, 223))
    eff_tensor = img_to_array(eff_tensor)
    eff_tensor = eff_tensor*(1/255)
    eff_tensor = eff_tensor.reshape((1,eff_tensor.shape[0],eff_tensor.shape[1],eff_tensor.shape[2]))
    return eff_tensor

In [None]:
def load_effnet(MODEL_PATH_EFF):
    effnet_model = restore_model(MODEL_PATH_EFF)
    return effnet_model

In [None]:
def load_effnetdata(DATA_PATH, batch_size = 64, image_size = 223):
    # Preprocessing test data for effnet

    datagen_kwargs = dict(rescale=1./255)
    dataflow_kwargs = dict(target_size=(image_size, image_size),batch_size=batch_size, interpolation="bilinear")

    test_datagen = tf.keras.preprocessing.image.ImageDataGenerator(rescale=1./255)
    test_generator = test_datagen.flow_from_directory(DATA_PATH, shuffle=False, **dataflow_kwargs)
    label_map = (test_generator.class_indices)
    styles = list(label_map.keys())
    styles_for_labels = [re.sub(r'[^A-Za-z0-9 "()" "ï" -]+', ' ', i) for i in styles]

    return test_generator,styles_for_labels

# Functions for Calculating and Graphing Final Results

In [None]:
def Calc_Results(labels,predictions,art_style):
    """
    Takes in model predictions,labels, and styles, calculates and displays:
    F-score
    Accuracy (overall and by class)
    Precision
    Recall (overall and by class)
    """
    #Confusion matrix and overall model accuracy
    conf_mat = confusion_matrix(predictions,labels)
    disp = ConfusionMatrixDisplay(confusion_matrix=conf_mat,display_labels=art_style)
    fig, ax = plt.subplots(figsize=(20,20))
    ax.xaxis.set_ticks_position('top')
    ax.xaxis.set_label_position('top')
    disp.plot(ax = ax)
    plt.xticks(rotation=90)

    #Precision Recall F1 score and Accuracy
    class_report = classification_report(labels,predictions,target_names =art_style)
    print('\033[1m'+'Precision, Recall and Accuracy for All Classes:\n')
    print(class_report)

    #Table of accuracy info by class
    col_names = ["Art Style","Accuracy"]
    class_acc = confusion_matrix(labels,predictions,normalize = "true").diagonal()
    # print(class_acc)
    combined_list = list(zip(art_style,class_acc))
    combined_list = sorted(combined_list, key = operator.itemgetter(1),reverse = True)
    print('Accuracy by Class')
    print(tabulate(combined_list, headers = col_names,tablefmt="fancy_grid"))


In [None]:

def PCA_and_TSNE(feats,lab,style,pca_dimension = 50):
    '''
    Takes in predictions, true labels, art style, and desired pca dimensions from model
    Returns T-SNE graph for input classes, as well as percent variation explained by the input dimensions for PCA
    '''
     
    # Normalizing data for proper variance results
    scaler = StandardScaler()
    scaled_feats = scaler.fit_transform(feats)
    # Reduce dimensions through pca, then apply tsne to result
    pca = PCA(n_components=50)
    pca_reduction = pca.fit_transform(scaled_feats)
    var_exp = pca.explained_variance_ratio_
    (plt.figure());
    plt.bar(range(len(var_exp)),var_exp)
    plt.title('Percent of variance explianed by the nth component')
    plt.xlabel('Component');
    plt.ylabel('Percent');
    plt.show()
    
    tsne = TSNE(n_components = 2).fit_transform(pca_reduction)
    test = [style[i] for i in lab]
    df = pd.DataFrame(dict(Dimension_1 = tsne[:,0],Dimension_2 = tsne[:,1],Style = test))
    (plt.figure());
    sns.lmplot('Dimension_1','Dimension_2',data = df,hue = 'Style',fit_reg = False)
    if len(np.unique(lab)) == len(style):
        plt.title('T-SNE for All Classes')
    else:
        plt.title(f'T-SNE for Top %d Classes'%len(np.unique(lab)))
    plt.show()

In [None]:
def top_k_recall_tsne(labels,predictions,feats,styles,top_k,pca_dimension = 50):
    '''
    Takes model predictions, true labels, intermediate layer weights (feats), art style, and number of top styles
    defined by model recall, returns T-SNE for selected classes
    
    pca_dimension can be passed as optional argument to augment the level of dimension reduction in PCA_and_TSNE
    '''
    
    recalls = []

    find_tsne  = classification_report(labels,predictions,target_names =styles,output_dict=True)

    # Parsing out the top_k classes by model recall
    for i in range(len(styles)):
        recalls.append(find_tsne[styles[i]]['recall'])

    recall_index = np.argsort(recalls)
    recall_index = recall_index[-top_k:]
    recall_index = np.flip(recall_index)
    top_recalls = np.array(recalls)[recall_index]

    
    index = np.isin(labels,recall_index)
    new_feats = feats[index]
    new_label = labels[index]
    
    PCA_and_TSNE(new_feats,new_label,styles)
    

# ViT Model 

In [None]:
# Loading ViT model
our_ViT,vit_transform = load_vit(MODEL_PATH)

# set hook on final linear layer
our_ViT.pre_logits.register_forward_hook(get_features('final_layer'))

# Processing data for testing vit
testloader,styles = load_vitdata(DATA_PATH)


In [None]:
# Testing vit and getting all outputs into the right format
acc,predictions,feats,lab = test_vit(testloader)


In [None]:
# Results for ViT
Calc_Results(lab,predictions,styles)

top_k_recall_tsne(lab,predictions,feats,styles,3)

# Effnet Model

In [None]:
# Loading Effnetv2 model (tensorflow 2.7>)

effnet_model = load_effnet(MODEL_PATH_EFF)
test_generator,styles = load_effnetdata(DATA_PATH)


In [None]:
# Hooking the output of the final effnet model layer 

feature_extractor = tf.keras.Model(
    inputs=effnet_model.inputs,
    outputs=[effnet_model.layers[1].output,effnet_model.output]
)
features = feature_extractor.predict(test_generator)


In [None]:
# T-SNE for effnet_model

eff_labels = test_generator.labels
eff_predictions = np.argmax(features[1],axis = 1)
eff_features = features[0]

Calc_Results(eff_labels,eff_predictions,styles)

top_k_recall_tsne(eff_labels,eff_predictions,eff_features,styles,3)