# Aditya Sawant's Version of SPN_IP_5Shot.ipynb

### Libraries Used

- tensorflow
- sklearn
- numpy
- matplotlib
- pandas
- scipy
- tensorflow_probability


## Initial setup

### Install Libraries

In [None]:
%pip install easyfsl tensorflow sklearn numpy matplotlib scipytensorflow_probability pandas 

### Timer Function

In [None]:
from functools import wraps
from time import perf_counter, time  # `time` is kept so the name stays available to other cells


def timeIt(func):
    """
    timeIt is a decorator function to time the execution of a function.

    Every call to the decorated function prints its elapsed wall-clock time and then
    returns the function's own result unchanged.

    :param func: function to be timed
    :return: wrapper function
    """
    @wraps(func)  # keep func's __name__ / __doc__ on the wrapper
    def wrap_func(*args, **kwargs):
        # perf_counter is a monotonic, high-resolution clock intended for measuring intervals.
        t1 = perf_counter()
        result = func(*args, **kwargs)
        t2 = perf_counter()
        print(f'Function {func.__name__!r} executed in {(t2-t1):.4f}s')
        return result
    return wrap_func

### Plot Data

In [36]:
# trainingData = pd.DataFrame(columns=['Loss', 'Accuracy'])
def plotData(data, testing=False):
    """
    plotData draws loss/accuracy curves for the recorded history, then asks IPython to
    clear the cell output once new output arrives.

    :param data: sequence of per-step records. With testing=False each record is read as
                 (loss, accuracy); with testing=True as (accuracy 1, accuracy 2, loss 1, loss 2).
    :param testing: when truthy two figures are drawn (one per accuracy/loss pair),
                    otherwise a single figure.
    :return: None
    """

    def column(position):
        # Pull one field out of every record.
        return [record[position] for record in data]

    def draw(losses, accuracies, title):
        # One figure: loss in red and accuracy in blue on shared axes.
        plt.figure(figsize=(10, 6))
        plt.plot(losses, color='red', label='Loss')
        plt.plot(accuracies, color='blue', label='Accuracy')
        plt.xlabel('Epoch/Episode')
        plt.ylabel('Value')
        plt.title(title)
        plt.legend()
        plt.grid(True)
        plt.show()

    if not testing:
        losses, accuracies = column(0), column(1)
        draw(losses, accuracies, 'Loss and Accuracy')
    else:
        # All four series are extracted before anything is drawn.
        first_accuracy, second_accuracy = column(0), column(1)
        first_loss, second_loss = column(2), column(3)
        draw(first_loss, first_accuracy, 'Loss and Overall Accuracy 1')
        draw(second_loss, second_accuracy, 'Loss and Overall Accuracy 2')
    clear_output(wait=True)

### Import Libraries

In [None]:
import random
import statistics
import os
from IPython.display import clear_output
  
from operator import truediv

import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
import pandas as pd
import scipy.io as sio
import matplotlib.pyplot as plt
from tensorflow.python.keras import backend as K

from tensorflow.keras import Sequential, layers
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.compat.v1.distributions import Bernoulli

from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score

from plotly.offline import init_notebook_mode
init_notebook_mode(connected=True)



## Global Variables

In [None]:

# Test each code block individually
TEST_BLOCKS: bool = False


# Data Loading and Preprocessing

# Dataset Used : Indian Pines
DATASET: str = 'indian_pines'
BASE_PATH: str = 'D:\\HSI FSL BE-10 Major Project\\'
PATH_TO_DATASET: str = BASE_PATH + 'Datasets\\'
# Number of labelled classes. Assigned here (same value as loadData's Indian Pines
# settings) because classWisePatches reads it as a module-level global.
NUM_CLASSES: int = 16

# PCA
PCA_COMPONENTS: int = 30 # Number of components to keep after PCA reduction

# Window size for forming image cubes
WINDOW_SIZE: int = 11

# Image dimensions after forming image cubes
IMAGE_WIDTH: int
IMAGE_HEIGHT: int
IMAGE_DEPTH: int
IMAGE_CHANNEL: int 
IMAGE_WIDTH, IMAGE_HEIGHT, IMAGE_DEPTH, IMAGE_CHANNEL = 11, 11, 30, 1

# Model Parameters

N_TIMES = 1 # Number of forward passes per episode; passed to the Prototypical model as n_times

# Learning Rate
LEARNING_RATE: float = 0.00001


# Training & Testing Parameters
# The values below mirror loadData's Indian Pines settings. They are assigned at module
# level because the episode/epoch functions and the checkpoint paths read them as globals.
TRAINING_CLASSES: list = [1,2,4,5,7,9,10,11,13,14] # Classes to be used for training
TRAINING_LABELS: list = [c + 1 for c in TRAINING_CLASSES] # Labels to be used for training

TESTING_CLASSES: list = [0,3,6,8,12,15] # Classes to be used for testing
TESTING_LABELS: list = [c + 1 for c in TESTING_CLASSES] # Labels to be used for testing

TUNNING_CLASSES: list # Declared in Main Block. Classes to be used for tuning
TUNNING_LABELS: list = TESTING_LABELS  # Labels to be used for tuning


TRAIN_C: int = 5 # Number of classes per training episode (passed as C to createTrainingEpisode)
TRAIN_K: int = 5 # Number of patches per class to be used for support during training
TRAIN_N: int = 15 # Number of patches per class to be used for query during training

TUNE_C: int = 3 # Number of classes per tuning episode (passed as C to createTunningEpisodes)
TUNE_K: int = 1 # Number of patches per class to be used for support during tuning
TUNE_N: int = 4 # Number of patches per class to be used for query during tuning

TEST_C: int = 3 # Number of classes per testing episode
TEST_K: int = 5 # Number of patches per class to be used for support during testing
TEST_N: int = 5 # Number of patches per class to be used for query during testing
# ===================================
# DO NOT REMOVE THIS.
tC = 3   # classes in a test episode; Prototypical.call sizes its class-wise accuracy lists with it
# Weight of the summed prediction standard deviation that Prototypical.call adds to the loss
MC_LOSS_WEIGHT: int = 5 
# DIRECTLY USED IN PROTOTYPICAL NETWORK CLASS IN TESTING CASE
# ===================================



# Training Epochs
TRAINING_EPOCH: int = 10

# Training Episode
TRAINING_EPISODE: int = 50

# Tuning Epochs
TUNNING_EPOCH: int = 41

# Tuning Episode
TUNNING_EPISODE: int = 100

# Testing Epochs
TESTING_EPOCH: int = 1000

# Metrics to be used for evaluation
train_loss = tf.metrics.Mean(name='train_loss')
train_acc = tf.metrics.Mean(name='train_accuracy')
tune_loss = tf.metrics.Mean(name='tune_loss')
tune_acc = tf.metrics.Mean(name='tune_accuracy')
test_loss = tf.metrics.Mean(name='test_loss')
test_acc = tf.metrics.Mean(name='test_accuracy')

# Per-step histories consumed by plotData
trainingData = []
tunningData = []
testingData = []



# TRAIN_C is an int, so it is converted explicitly before being joined into the path.
checkpoint_dir = BASE_PATH + 'saves\\checkpoints\\' + DATASET + '\\' + str(TRAIN_C) + '\\' + 'Train'
checkpoint_prefix_train = os.path.join(checkpoint_dir, "ckpt")

checkpoint_dir1 = BASE_PATH + 'saves\\checkpoints\\' + DATASET + '\\' + str(TRAIN_C) + '\\' + 'Train\\Tune'
checkpoint_prefix_tune = os.path.join(checkpoint_dir1, "ckpt")

checkpoint = None  # To be used for loading checkpoints. Declared in the Main Block
ProtoModel = None  # Prototypical Network Object. Declared in the Main Block
model = None  # Model Object. Declared in the Main Block
optimizer = None  # Optimizer Object. Declared in the Main Block

## Data Loading and Preprocessing

In [None]:
def loadData(name: str) -> (np.ndarray, np.ndarray):
    '''
     loadData loads the data from the .mat files

     Besides returning the arrays, it stores the dataset-specific class split and episode
     sizes in the module-level globals (NUM_CLASSES, TRAINING_*, TESTING_*, TUNNING_LABELS,
     TRAIN_*, TUNE_*, TEST_*) that the other cells read. Previously these were assigned
     to function locals and therefore discarded on return.

     :param name: name of the dataset. 'IP' and 'indian_pines' both select Indian Pines.
     :return: (data, labels) read from 'indian_pines_corrected.mat' and 'indian_pines_gt.mat'
     :raises ValueError: if name is not a supported dataset
    '''
    global NUM_CLASSES
    global TRAINING_CLASSES, TRAINING_LABELS, TESTING_CLASSES, TESTING_LABELS, TUNNING_LABELS
    global TRAIN_C, TRAIN_K, TRAIN_N, TUNE_C, TUNE_K, TUNE_N, TEST_C, TEST_K, TEST_N

    # The notebook's DATASET constant is 'indian_pines', so accept it next to the short name.
    if name not in ('IP', 'indian_pines'):
        raise ValueError(f"Unsupported dataset {name!r}; expected 'IP' or 'indian_pines'")

    NUM_CLASSES = 16
    # Training & Testing Parameters
    TRAINING_CLASSES = [1,2,4,5,7,9,10,11,13,14] # Classes to be used for training
    TRAINING_LABELS = [c + 1 for c in TRAINING_CLASSES] # Labels to be used for training

    TESTING_CLASSES = [0,3,6,8,12,15] # Classes to be used for testing
    TESTING_LABELS = [c + 1 for c in TESTING_CLASSES] # Labels to be used for testing

    # TUNNING_CLASSES is intentionally not touched here: it is declared in the Main Block.
    TUNNING_LABELS = TESTING_LABELS # Labels to be used for tuning

    TRAIN_C = 5 # Number of classes per training episode
    TRAIN_K = 5 # Number of patches per class to be used for support during training
    TRAIN_N = 15 # Number of patches per class to be used for query during training

    TUNE_C = 3 # Number of classes per tuning episode
    TUNE_K = 1 # Number of patches per class to be used for support during tuning
    TUNE_N = 4 # Number of patches per class to be used for query during tuning

    TEST_C = 3 # Number of classes per testing episode
    TEST_K = 5 # Number of patches per class to be used for support during testing
    TEST_N = 5 # Number of patches per class to be used for query during testing

    # Each .mat file stores its array under a key equal to the file's base name.
    data = sio.loadmat(f'{PATH_TO_DATASET}indian_pines_corrected.mat')['indian_pines_corrected']
    labels = sio.loadmat(f'{PATH_TO_DATASET}indian_pines_gt.mat')['indian_pines_gt']

    return data, labels

if TEST_BLOCKS:
    print(loadData(DATASET))

In [None]:
def applyPCA(X: np.ndarray, n_components: int = 30) -> np.ndarray:
    """
    applyPCA shrinks the last axis of a data cube with a whitened PCA.

    :param X: The data to be reduced, indexed as (axis 0, axis 1, bands).
    :param n_components: The number of principal components to keep.
    :return: Array of shape (X.shape[0], X.shape[1], n_components).
    """
    reducer = PCA(n_components=n_components, whiten=True)
    first_dim, second_dim, n_bands = X.shape[0], X.shape[1], X.shape[2]

    # Flatten the two leading axes so every row is one band vector, fit + project,
    # then restore the leading axes around the reduced band axis.
    flat = np.reshape(X, (-1, n_bands))
    projected = reducer.fit_transform(flat)
    return np.reshape(projected, (first_dim, second_dim, n_components))

if TEST_BLOCKS:
    # TODO: Implement a test for applyPCA
    pass

In [None]:

def padWithZeros(X: np.ndarray, margin:int) -> np.ndarray:
    """
    padWithZeros surrounds the first two axes of X with a border of zeros; the third axis is left as is.

    :param X: input array of shape (W, H, C).
    :param margin: number of zeros added before and after each of the first two axes.
    :return: array of shape (W + 2*margin, H + 2*margin, C) with X in the centre.
    """
    bordered = (margin, margin)   # (before, after) for a padded axis
    untouched = (0, 0)            # the last axis gets no padding
    return np.pad(
        X,
        pad_width=(bordered, bordered, untouched),
        mode='constant',
        constant_values=0,
    )

if TEST_BLOCKS:
    # Smoke test: print a random (3, 3, 2) array after padding it with a 2-wide zero border.
    test_margin = 2
    test_X = np.random.randn(3, 3, 2)
    print(padWithZeros(test_X, test_margin))

In [None]:
@timeIt
def createImageCubes(X: np.ndarray, Y: np.ndarray, windowSize: int) -> (np.ndarray, np.ndarray):
    """
    createImageCubes cuts one square neighbourhood out of X for every (row, column) position and pairs it with the label found at that position in Y. The last axis of X (the bands) is kept whole in every cube.

    :param X: input image
    :param Y: input label
    :param windowSize: size of the image cube to be created. Each cube spans 2*(windowSize//2) + 1 rows and columns, which equals windowSize when windowSize is odd.
    :return (dataPatches, dataLabels): dataPatches holds the cubes with a trailing axis of length 1 appended; dataLabels is an array with the label of each cube, in the same row-major order.

    Algorithm:
    - Compute the half-window margin and zero-pad the image by it.
    - Walk over every position of the unpadded image in row-major order.
    - Slice the window whose centre is that position out of the padded image.
    - Append a trailing channel axis to the stacked windows.
    - Collect the label at every position.
    """

    half = int(windowSize // 2)
    edge = 2 * half + 1
    padded = padWithZeros(X, half)

    # The padded image is `half` larger on every side, so removing both margins gives
    # back the number of positions along each of the first two axes.
    n_rows = padded.shape[0] - 2 * half
    n_cols = padded.shape[1] - 2 * half
    positions = [(row, col) for row in range(n_rows) for col in range(n_cols)]

    # In padded coordinates the window centred on (row, col) starts at (row, col).
    cubes = [padded[row:row + edge, col:col + edge] for row, col in positions]
    cubes = np.expand_dims(cubes, axis=-1)
    cube_labels = [Y[row, col] for row, col in positions]
    return cubes, np.array(cube_labels)

if TEST_BLOCKS:
    ip_x1, ip_y = loadData(DATASET)
    ip_x2 = applyPCA(ip_x1, n_components=30)
    ip_X, ip_Y = createImageCubes(ip_x2, ip_y, windowSize=IMAGE_WIDTH)
    print(ip_X.shape, ip_Y.shape)

In [None]:
def classWisePatches(X:np.ndarray, Y:np.ndarray) -> list:
    """
    classWisePatches groups the patches in X by their label in Y.

    Labels 1..NUM_CLASSES are considered (label 0 is skipped); element i-1 of the result holds the patches whose label is i. X is indexed as a 5-D array, so each element keeps the four trailing axes of X.

    :param X: Patches, one per entry of Y along the first axis
    :param Y: Label of each patch
    :return: List with NUM_CLASSES arrays of patches, ordered by class label
    """
    grouped = []
    for class_label in range(1, NUM_CLASSES + 1):
        members = Y == class_label          # boolean mask over the first axis of X
        grouped.append(X[members, :, :, :, :])
    return grouped
    

## Model

### Model Construction


In [None]:
def createModel():
    """
    createModel() builds the 3D/2D CNN that embeds an image cube into a 128-dimensional vector.
    :return: model

    Layers, in order:
    1. Input of shape (IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH, IMAGE_CHANNEL)
    2. Conv3D, 8 filters, kernel (3,3,7), relu, padding 'same'
    3. SpatialDropout3D, rate 0.3
    4. Conv3D, 16 filters, kernel (3,3,5), relu, padding 'same'
    5. SpatialDropout3D, rate 0.3
    6. Conv3D, 32 filters, kernel (3,3,3), relu (default padding)
    7. Reshape that merges the last two axes so the result can feed a 2D convolution
    8. Conv2D, 64 filters, kernel (3,3), relu
    9. Flatten
    10. Dropout, rate 0.4
    11. Dense, 256 units, relu
    12. Dropout, rate 0.4
    13. Dense, 128 units, relu (the model output)

    Every dropout layer is called with training=True, so dropout stays active on every forward pass.
    """

    cube_shape = (IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH, IMAGE_CHANNEL)
    inputs = layers.Input(shape=cube_shape)

    # 3D convolutional stage
    features = layers.Conv3D(filters=8, kernel_size=(3,3,7), activation='relu', input_shape=cube_shape, padding='same')(inputs)
    features = layers.SpatialDropout3D(rate=0.3, data_format='channels_last')(features, training=True)
    features = layers.Conv3D(filters=16, kernel_size=(3,3,5), activation='relu', padding='same')(features)
    features = layers.SpatialDropout3D(rate=0.3, data_format='channels_last')(features, training=True)
    features = layers.Conv3D(filters=32, kernel_size=(3,3,3), activation= 'relu')(features)

    # Fold axes 3 and 4 of the 3D feature map into a single axis for the 2D convolution.
    volume_shape = features.shape
    merged_shape = (volume_shape[1], volume_shape[2], volume_shape[3] * volume_shape[4])
    features = layers.Reshape(merged_shape)(features)

    # 2D convolution followed by the dense head
    features = layers.Conv2D(filters=64, kernel_size=(3,3), activation='relu')(features)
    features = layers.Flatten()(features)
    features = layers.Dropout(rate=0.4)(features, training=True)
    features = layers.Dense(256, activation='relu')(features)
    features = layers.Dropout(0.4)(features, training=True)
    embedding = layers.Dense(128, activation='relu')(features)

    return Model(inputs=inputs, outputs=embedding)


if TEST_BLOCKS:
    model = createModel()
    print(model.summary())
    

### Prototypical Network

In [None]:
def calc_euclidian_dists(x, y):
    """
    calc_euclidian_dists: Pairwise mean squared difference between the rows of two tensors.

    The value returned for a pair is the squared difference averaged over the feature
    axis, i.e. the squared Euclidean distance divided by d - not the Euclidean distance itself.

    :param x: Tensor of shape (n, d)
    :param y: Tensor of shape (m, d)
    :return: Tensor of shape (n, m) whose entry (i, j) is mean((x[i] - y[j]) ** 2)
    """
    # Broadcasting (n, 1, d) against (1, m, d) produces every pairwise difference without
    # materialising tiled copies of x and y, and without needing the static sizes n and m.
    pairwise_diff = tf.expand_dims(x, 1) - tf.expand_dims(y, 0)
    return tf.reduce_mean(tf.math.pow(pairwise_diff, 2), 2)

In [None]:
class Prototypical(Model):
    """
    Prototypical network built around an embedding model.

    An episode is embedded with `self.encoder`, class prototypes are the mean embedding of
    each class's support patches, and queries are scored by a softmax over the negated
    output of calc_euclidian_dists between query embeddings and prototypes. The episode
    is evaluated n_times and the spread of the predictions across those passes is added
    to the loss, weighted by the global MC_LOSS_WEIGHT.
    """

    def __init__(self, model, w, h, d, c):
        """
        :param model: embedding model applied to the concatenated support and query patches
        :param w, h, d, c: patch width, height, depth and channel count (used by load)
        """
        super(Prototypical, self).__init__()
        self.w, self.h, self.d, self.c = w, h, d, c
        self.encoder = model

    @staticmethod
    def _one_hot_targets(support_labels, query_labels, n_queries, C):
        """Return an (n_queries, C) one-hot array; the hot column of a query is the position of its label in support_labels."""
        y = np.zeros((n_queries, C))
        for row in range(n_queries):
            y[row][support_labels.index(query_labels[row])] = 1.
        return y

    def _episode_pass(self, support, query, y, n_class, n_support):
        """Run one forward pass over the episode and return (cross-entropy loss, query class probabilities)."""
        # Support and query go through the encoder together, support first.
        cat = tf.concat([support, query], axis=0)
        z = self.encoder(cat)
        # The first n_class * n_support embeddings belong to the support set, grouped
        # class by class; their per-class mean is the prototype of that class.
        z_prototypes = tf.reshape(z[:n_class * n_support], [n_class, n_support, z.shape[-1]])
        z_prototypes = tf.math.reduce_mean(z_prototypes, axis=1)
        z_query = z[n_class * n_support:]
        dists = calc_euclidian_dists(z_query, z_prototypes)
        # Smaller distance -> larger score, hence the negation.
        log_p_y = tf.nn.log_softmax(-dists, axis=-1)
        pass_loss = -tf.reduce_mean(tf.reduce_sum(tf.multiply(y, log_p_y), axis=-1))
        predictions = tf.nn.softmax(-dists, axis=-1)
        return pass_loss, predictions

    @staticmethod
    def _mean_accuracy(mean_predictions, y):
        """Fraction of queries whose highest mean probability falls on the true class."""
        mean_eq = tf.cast(tf.equal(
            tf.cast(tf.argmax(mean_predictions, axis=-1), tf.int32),
            tf.cast(tf.argmax(y, axis=-1), tf.int32)), tf.float32)
        return tf.reduce_mean(mean_eq)

    def call(self, support, query, support_labels, query_labels, K, C, N,n_times,training=True):
        """
        Evaluate one episode.

        :param support: support patches; C*K entries along the first axis, grouped class by class in the order of support_labels
        :param query: query patches; C*N entries along the first axis
        :param support_labels: list with the label of each class, in prototype order
        :param query_labels: label of every query patch
        :param K: number of support patches per class
        :param C: number of classes in the episode
        :param N: number of query patches per class
        :param n_times: number of forward passes over the episode
        :param training: selects the return value.
            Truthy: (loss, mean_accuracy, mean_predictions).
            Falsy:  (loss, mc_predictions, mean_accuracy, classwise_mean_acc, y) where
            mc_predictions is the list of per-pass probabilities and y the one-hot targets.
        """
        if training:
            return self._forward_training(support, query, support_labels, query_labels, K, C, N, n_times)
        return self._forward_evaluation(support, query, support_labels, query_labels, K, C, N, n_times)

    def _forward_training(self, support, query, support_labels, query_labels, K, C, N, n_times):
        """Training-mode episode: returns (loss, mean_accuracy, mean_predictions)."""
        n_queries = int(C * N)
        y = self._one_hot_targets(support_labels, query_labels, n_queries, C)

        loss = 0
        mc_predictions = []                      # predictions of every pass
        for _ in range(n_times):
            pass_loss, predictions = self._episode_pass(support, query, y, C, K)
            loss += pass_loss
            mc_predictions.append(predictions)

        # NOTE(review): the stacked predictions go through NumPy, so the std term below is
        # added to the loss value but carries no gradient back to the encoder. That is
        # how the original behaved and it is kept as is - confirm whether it is intended.
        mc_predictions = tf.convert_to_tensor(np.reshape(np.asarray(mc_predictions), (n_times, n_queries, C)))
        std_predictions = tf.math.reduce_std(mc_predictions, axis=0)
        # Only the spread of the true-class probability of each query is accumulated.
        std = tf.reduce_sum(tf.reduce_sum(tf.multiply(std_predictions, y), axis=1))
        loss += MC_LOSS_WEIGHT * std

        mean_predictions = tf.reduce_mean(mc_predictions, axis=0)
        mean_accuracy = self._mean_accuracy(mean_predictions, y)
        return loss, mean_accuracy, mean_predictions

    def _forward_evaluation(self, support, query, support_labels, query_labels, K, C, N, n_times):
        """Evaluation-mode episode: returns (loss, mc_predictions, mean_accuracy, classwise_mean_acc, y)."""
        n_queries = int(C * N)
        y = self._one_hot_targets(support_labels, query_labels, n_queries, C)

        loss = 0
        mc_predictions = []                      # predictions of every pass
        for _ in range(n_times):
            pass_loss, predictions = self._episode_pass(support, query, y, C, K)
            loss += pass_loss
            mc_predictions.append(predictions)

        mean_predictions = tf.reduce_mean(mc_predictions, axis=0)
        mean_accuracy = self._mean_accuracy(mean_predictions, y)
        mean_pred_index = tf.argmax(mean_predictions, axis=1)

        # Position of every query's true class among the prototypes.
        true_index = [support_labels.index(query_labels[q]) for q in range(n_queries)]

        # Class-wise accuracy. NOTE(review): the lists are sized by the global tC rather
        # than by C, so this assumes the episode has exactly tC classes.
        totals = [0] * tC
        hits = [0] * tC
        for q, x in enumerate(true_index):
            totals[x] += 1
            if mean_pred_index[q] == x:
                hits[x] += 1
        classwise_mean_acc = [[hits[k] / totals[k]] for k in range(tC)]

        # Spread of the true-class probability over the passes, summed over all queries.
        std = 0
        for q, x in enumerate(true_index):
            p_q = np.array([p[q, :] for p in mc_predictions])
            std_q = tf.math.reduce_std(p_q, axis=0)
            std += std_q[x]
        loss += MC_LOSS_WEIGHT * std
        return loss, mc_predictions, mean_accuracy, classwise_mean_acc, y

    def save(self, model_path, *args, **kwargs):
        """Save the encoder to model_path; extra arguments are forwarded to the encoder's save()."""
        self.encoder.save(model_path, *args, **kwargs)

    def load(self, model_path):
        """Load encoder weights from model_path."""
        # Call the encoder once on a dummy batch of one patch so its variables exist
        # before the weights are restored. The patch has a depth axis as well.
        self.encoder(tf.zeros([1, self.w, self.h, self.d, self.c]))
        self.encoder.load_weights(model_path)

## Data Loaders for Model

### For Training

In [None]:
def createTrainingEpisode(patches:list, labels:list, K:int, C:int, N:int ):
    """
    createTrainingEpisode creates a training episode for the C-way K-shot learning task.

    :param patches: list of all patches classified into different classes; the patches of label n are read from patches[n-1].
    :param labels: list of classes from which the training episode is to be created.
    :param K: number of patches per class in the support set.
    :param C: number of classes in the training episode.
    :param N: number of patches per class in the query set.
    :return queryPatches, queryLabels, supportPatches, supportLabels: training episode. queryPatches and supportPatches are float32 tensors, queryLabels is a tuple aligned with queryPatches, supportLabels lists the selected classes in support order.

    Algorithm:
    - Select C distinct classes from the list of labels.
    - For each class, draw K+N distinct patches.
        - The first K are support patches.
        - The last N are query patches.
        - Append the class label to queryLabels N times.
    - Shuffle the queryPatches and queryLabels in the same order.
    - Convert the queryPatches and supportPatches to tensors.

    """

    selectedLabels = random.sample(labels, C)
    supportLabels = list(selectedLabels)
    supportPatches = []
    queryPatches = []
    queryLabels = []

    for n in selectedLabels:
        class_patches = patches[n-1]
        available = len(class_patches)
        if available >= K + N:
            # One draw without replacement, split in two, keeps support and query disjoint.
            drawn = np.random.choice(available, K + N, replace=False)
            support_idx, query_idx = drawn[:K], drawn[K:]
        else:
            # Too few patches for disjoint sets: fall back to independent draws (which
            # may overlap) so that small classes keep working as they did before.
            support_idx = np.random.choice(available, K, replace=False)
            query_idx = np.random.choice(available, N, replace=False)
        supportPatches.extend(class_patches[support_idx,:,:,:,:])
        queryPatches.extend(class_patches[query_idx,:,:,:,:])
        queryLabels.extend([n]*N)

    # Shuffle queries and their labels together; the support set keeps its class-by-class order.
    shuffled = list(zip(queryPatches, queryLabels))
    random.shuffle(shuffled)
    queryPatches, queryLabels = zip(*shuffled)

    queryPatches = tf.convert_to_tensor(np.reshape(np.asarray(queryPatches),(C*N,IMAGE_HEIGHT,IMAGE_WIDTH,IMAGE_DEPTH,IMAGE_CHANNEL)),dtype=tf.float32)
    supportPatches = tf.convert_to_tensor(np.reshape(np.asarray(supportPatches),(C*K,IMAGE_HEIGHT,IMAGE_WIDTH,IMAGE_DEPTH,IMAGE_CHANNEL)),dtype=tf.float32)

    return queryPatches, queryLabels, supportPatches, supportLabels


In [None]:
def train_step(support, query, support_labels, query_labels, K, C, N):
    """
    train_step runs one training episode through the global ProtoModel and applies one optimizer update.

    The gradient of the episode loss is taken with respect to the trainable variables of the
    global `model` and applied with the global `optimizer`; the loss and mean accuracy are
    then accumulated into the train_loss / train_acc metrics.

    :param support: support patches of the episode
    :param query: query patches of the episode
    :param support_labels: labels of the support classes
    :param query_labels: labels of the query patches
    :param K: number of support patches per class
    :param C: number of classes in the episode
    :param N: number of query patches per class
    :return: None
    """
    # Forward pass recorded on the tape so the loss can be differentiated
    with tf.GradientTape() as tape:
        episode_loss, episode_accuracy, _ = ProtoModel(support, query, support_labels, query_labels, K, C, N, N_TIMES, training=True)

    # Backward pass and weight update
    weights = model.trainable_variables
    grads = tape.gradient(episode_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

    # Log loss and accuracy for step
    train_loss(episode_loss)
    train_acc(episode_accuracy)


In [37]:
@timeIt
def trainingEpochs(patches, labels, n_epochs, n_episodes):
    """
    trainingEpochs function trains the model for n_epochs and n_episodes.

    Each episode is sampled with createTrainingEpisode using the global TRAIN_K, TRAIN_C and
    TRAIN_N, run through train_step, logged to trainingData and re-plotted. The running
    loss/accuracy metrics are reset at the start of every epoch.

    :param patches: image patches to be trained
    :param labels: corresponding labels to be used
    :param n_epochs: number of epochs
    :param n_episodes: number of episodes
    :return: None
    """

    template = 'Epoch {}/{}, Episode {}/{}, Train Loss: {:.2f}, Train Accuracy: {:.2f}'

    for epoch in range(n_epochs):
        train_loss.reset_states()
        train_acc.reset_states()

        for episode in range(n_episodes):
            queryPatches, queryLabels, supportPatches, supportLabels = createTrainingEpisode(patches, labels, TRAIN_K, TRAIN_C, TRAIN_N)
            train_step(supportPatches, queryPatches,supportLabels,  queryLabels, TRAIN_K, TRAIN_C, TRAIN_N)
            trainingData.append([train_loss.result(),  train_acc.result()*100])
            print(template.format(epoch+1, n_epochs, episode+1, n_episodes, train_loss.result(), train_acc.result()*100))
            plotData(trainingData)
        # Checkpoint after every 5th completed epoch (5, 10, ...), the same cadence as
        # tunningEpochs. The previous `epoch and epoch % 5 == 0` test fired after epochs
        # 6, 11, ... and so never saved the final state of a 10-epoch run.
        if (epoch + 1) % 5 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix_train)
        

### For Tuning

In [None]:
def createTunningEpisodes(patches:list, labels:list, K:int, C:int, N:int):
    """
    createTuningEpisodes creates a tuning episode for the C-way K-shot learning task.

    :param patches: list of all patches classified into different classes; the patches of labels[i] are read from patches[i].
    :param labels: list of classes from which the tuning episode is to be created.
    :param K: number of patches per class in the support set.
    :param C: number of classes in the tuning episode.
    :param N: number of patches per class in the query set.
    :return queryPatches, queryLabels, supportPatches, supportLabels: tuning episode. queryPatches and supportPatches are float32 tensors, queryLabels is a tuple aligned with queryPatches, supportLabels lists the selected classes in support order.

    Algorithm:
    - Select C classes from the list of labels. They should be unique.
    - For each selected class.
        - Draw a random ordering of the patches of that class (the caller's array is left untouched).
        - First K patches of the ordering are support patches.
        - Next N patches are query patches.
        - Append the support patches to supportPatches.
        - Append the query patches to queryPatches.
        - Append the class label to queryLabels N times.
    - Shuffle the queryPatches and queryLabels in the same order.
    - Convert the queryPatches and supportPatches to tensors.

    """

    selected_classes = np.random.choice(labels,C,replace=False)
    supportLabels  = list(selected_classes)
    queryLabels = []
    supportPatches = []
    queryPatches = []

    for x in selected_classes :
        class_patches = patches[labels.index(x)]
        # Permute indices instead of shuffling the array itself: the caller's data is not
        # reordered and only the K+N selected patches are copied.
        order = np.random.permutation(len(class_patches))
        supportPatches.extend(class_patches[order[:K],:,:,:,:])      # 1st K patches for support set
        queryPatches.extend(class_patches[order[K:K+N],:,:,:,:])     # next N patches for query set
        queryLabels.extend([x]*N)                                    # one label per query patch

    shuffled = list(zip(queryPatches, queryLabels))
    random.shuffle(shuffled)
    queryPatches, queryLabels = zip(*shuffled)

    queryPatches = tf.convert_to_tensor(np.reshape(np.asarray(queryPatches),(C*N,IMAGE_HEIGHT,IMAGE_WIDTH,IMAGE_DEPTH,IMAGE_CHANNEL)),dtype=tf.float32)
    supportPatches = tf.convert_to_tensor(np.reshape(np.asarray(supportPatches),(C*K,IMAGE_HEIGHT,IMAGE_WIDTH,IMAGE_DEPTH,IMAGE_CHANNEL)),dtype=tf.float32)

    return queryPatches, queryLabels, supportPatches, supportLabels
    

In [None]:
def tune_step(support, query, support_labels, query_labels, K, C, N):
    """
    tune_step runs a single tuning episode and applies one optimizer update.

    :param support: support patches of the episode (forwarded to ProtoModel)
    :param query: query patches of the episode (forwarded to ProtoModel)
    :param support_labels: labels of the support classes
    :param query_labels: labels of the query patches
    :param K: number of support patches per class
    :param C: number of classes in the episode
    :param N: number of query patches per class
    :return: None (the loss and accuracy are fed into tune_loss / tune_acc)
    """
    # The forward pass has to happen inside the tape so that the loss can be
    # differentiated with respect to the model weights afterwards.
    with tf.GradientTape() as grad_tape:
        episode_loss, episode_accuracy, _ = ProtoModel(
            support, query, support_labels, query_labels,
            K, C, N, N_TIMES, training=True,
        )

    weights = model.trainable_variables
    weight_grads = grad_tape.gradient(episode_loss, weights)
    optimizer.apply_gradients(zip(weight_grads, weights))

    # Accumulate the step results into the running tuning metrics.
    tune_loss(episode_loss)
    tune_acc(episode_accuracy)

In [None]:
@timeIt
def tunningEpochs(patches, labels, n_epochs, n_episodes):
    """
    tunningEpochs function fine-tunes the model for n_epochs epochs of
    n_episodes episodes each.

    After every epoch the accumulated tuning loss and accuracy are printed,
    appended to tunningData and plotted. A checkpoint is written to
    checkpoint_prefix_tune after every 5th epoch.

    :param patches: class-wise image patches used to build the tuning episodes
    :param labels: class labels corresponding to the entries of patches
    :param n_epochs: number of epochs
    :param n_episodes: number of episodes per epoch
    :return: None
    """
    template = 'Epoch {}/{}, Tune Loss: {:.2f}, Tune Accuracy: {:.2f}'

    for epoch in range(n_epochs):
        # The metrics accumulate over the episodes of one epoch only.
        tune_loss.reset_states()
        tune_acc.reset_states()

        # Run exactly n_episodes episodes; the previous range(n_episodes+1)
        # executed one episode more than documented.
        for _ in range(n_episodes):
            queryPatches, queryLabels, supportPatches, supportLabels = createTunningEpisodes(patches, labels, TUNE_K, TUNE_C, TUNE_N)
            tune_step(supportPatches, queryPatches, supportLabels, queryLabels, TUNE_K, TUNE_C, TUNE_N)

        epoch_loss = tune_loss.result()
        epoch_accuracy = tune_acc.result()*100
        print(template.format(epoch+1, n_epochs, epoch_loss, epoch_accuracy))
        tunningData.append([epoch_loss, epoch_accuracy])
        plotData(tunningData)

        if (epoch+1) % 5 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix_tune)

### For Testing

In [None]:
def createTestingEpisode(patches, labels, K, C, i, f):
    """
    createTestingEpisode builds one testing episode from the classes labels[i:f].

    For every selected class the first K patches become support patches and
    all remaining patches become query patches. The query patches and their
    labels are then shuffled together.

    :param patches: list of class-wise patches, indexed like labels
    :param labels: list of class labels
    :param K: number of support patches per class
    :param C: number of classes in the episode; the support reshape uses C*K,
              so it is expected to equal the number of selected classes (f - i)
    :param i: start index (inclusive) into labels of the selected classes
    :param f: end index (exclusive) into labels of the selected classes
    :return query_patches, support_patches, query_labels, support_labels, x:
            the episode tensors and labels; x is the total number of query patches

    Note: unpacking the shuffled pairs raises ValueError when no selected
    class has more than K patches (there would be no query patch at all).
    """
    episode_classes = labels[i:f]
    support_labels = list(episode_classes)

    support_patches = []
    query_patches = []
    query_labels = []
    for class_label in episode_classes:
        class_patches = patches[labels.index(class_label)]
        class_queries = class_patches[K:,:,:,:,:]
        support_patches.extend(class_patches[:K,:,:,:,:])
        query_patches.extend(class_queries)
        # One label per query patch of this class.
        query_labels.extend([class_label] * class_queries.shape[0])

    # Shuffle patches and labels together so that they stay aligned.
    paired = list(zip(query_patches, query_labels))
    random.shuffle(paired)
    query_patches, query_labels = zip(*paired)

    n_query = len(query_labels)
    patch_shape = (IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_DEPTH, IMAGE_CHANNEL)
    query_patches = tf.convert_to_tensor(
        np.reshape(np.asarray(query_patches), (n_query,) + patch_shape),
        dtype=tf.float32,
    )
    support_patches = tf.convert_to_tensor(
        np.reshape(np.asarray(support_patches), (C*K,) + patch_shape),
        dtype=tf.float32,
    )
    return query_patches, support_patches, query_labels, support_labels, n_query

In [None]:
def test_step(support, query, support_labels, query_labels, K, C, y):
    """
    test_step evaluates one testing episode with ProtoModel (training=False).

    :param support: support patches of the episode
    :param query: query patches of the episode
    :param support_labels: labels of the support classes
    :param query_labels: labels of the query patches
    :param K: number of support patches per class
    :param C: number of classes in the episode
    :param y: value forwarded in the argument slot that tune_step uses for N
              (number of query patches per class)
    :return: loss, mc_predictions, mean_accuracy, classwise_mean_acc, y as
             returned by ProtoModel (the returned y replaces the input y;
             presumably the true query labels - TODO confirm against ProtoModel)
    """
    episode_loss, mc_preds, mean_acc, per_class_acc, model_y = ProtoModel(
        support, query, support_labels, query_labels,
        K, C, y, N_TIMES, training=False,
    )
    return episode_loss, mc_preds, mean_acc, per_class_acc, model_y

In [None]:
@timeIt
def testingEpochs(patches, labels, n_epochs):
    """
    testingEpochs function tests the model for n_epochs.

    The testing classes are evaluated in two groups of TEST_C classes each:
    labels[0:TEST_C] and labels[TEST_C:2*TEST_C]. For every epoch the overall
    accuracy, class-wise accuracy and loss of both groups are printed, appended
    to testingData and plotted.

    :param patches: class-wise image patches to be tested
    :param labels: corresponding labels to be used
    :param n_epochs: number of epochs (must be at least 1, otherwise the
                     returned names are never assigned)
    :return: mc_predictions1, mc_predictions2, y1, y2 of the last epoch
    """
    for epoch in range(n_epochs):
        test_loss.reset_states()
        test_acc.reset_states()

        # Both groups use TEST_K / TEST_C consistently instead of the literal
        # 5 / 3 that were previously hard-coded for the second group.
        # x / TEST_C is forwarded as the per-class query count; this assumes
        # every class contributes the same number of query patches - TODO confirm.
        tquery_patches1, tsupport_patches1, query_labels1, support_labels1, x1 = createTestingEpisode(
            patches, labels, TEST_K, TEST_C, 0, TEST_C)
        loss1, mc_predictions1, mean_accuracy1, classwise_mean_acc1, y1 = test_step(
            tsupport_patches1, tquery_patches1, support_labels1, query_labels1,
            TEST_K, TEST_C, y=x1/TEST_C)

        tquery_patches2, tsupport_patches2, query_labels2, support_labels2, x2 = createTestingEpisode(
            patches, labels, TEST_K, TEST_C, TEST_C, 2*TEST_C)
        loss2, mc_predictions2, mean_accuracy2, classwise_mean_acc2, y2 = test_step(
            tsupport_patches2, tquery_patches2, support_labels2, query_labels2,
            TEST_K, TEST_C, y=x2/TEST_C)

        print("=========================================")
        print(f"Epoch {epoch+1}/{n_epochs}")
        print("-----------------------------------------")
        print(f"Overall Accuracy 1 (OA1): {mean_accuracy1}")
        # Class Wise Accuracy
        for i in range(TEST_C):
            print(f"Class {i+1} Accuracy: {classwise_mean_acc1[i]}")
        print(f"Loss: {loss1.numpy():.3f}")
        print("-----------------------------------------")
        print(f"Overall Accuracy 2 (OA2): {mean_accuracy2}")
        # Class Wise Accuracy (class numbers continue after the first group)
        for i in range(TEST_C):
            print(f"Class {i+1+TEST_C} Accuracy: {classwise_mean_acc2[i]}")
        print(f"Loss: {loss2.numpy():.3f}")
        print("=========================================")

        testingData.append([mean_accuracy1*100, mean_accuracy2*100, loss1.numpy(), loss2.numpy()])
        plotData(testingData, testing=True)

    return mc_predictions1, mc_predictions2, y1, y2

## Statistics

### Overall Accuracy

In [None]:
@timeIt
def calculateAccuracy(mc_predictions1, mc_predictions2, y1, y2):
    """
    Prints the overall accuracy over both testing groups.

    The predictions of each group are averaged over axis 0 (presumably the
    repeated stochastic forward passes - TODO confirm), the two groups are
    concatenated, and the argmax of the averaged predictions is compared with
    the argmax of the true labels.

    :param mc_predictions1: stacked predictions of the first group
    :param mc_predictions2: stacked predictions of the second group
    :param y1: true labels of the first group (argmax is taken over the last axis)
    :param y2: true labels of the second group (argmax is taken over the last axis)
    :return: None
    """
    averaged = [tf.reduce_mean(mc, axis=0) for mc in (mc_predictions1, mc_predictions2)]
    overall_predictions = tf.concat(averaged, axis=0)
    overall_true_labels = tf.concat([y1, y2], axis=0)

    predicted_class = tf.cast(tf.argmax(overall_predictions, axis=-1), tf.int32)
    true_class = tf.cast(tf.argmax(overall_true_labels, axis=-1), tf.int32)
    # 1.0 where the prediction matches the true class, 0.0 otherwise.
    hits = tf.cast(tf.equal(predicted_class, true_class), tf.float32)

    o_acc = tf.reduce_mean(hits)
    print(f"Overall accuracy:{o_acc.numpy():.3f}")

### Confusion Matrix

In [None]:
@timeIt
def calculateConfusionMatrix(mc_predictions1, mc_predictions2, y1, y2):
    """
    Prints the confusion matrix and classification report over both testing groups.

    Each group's predictions are averaged over axis 0 and reduced to class
    indices with argmax. The indices of the second group (predictions and true
    labels alike) are shifted by TEST_C so that both groups share one label
    space before they are concatenated.

    :param mc_predictions1: stacked predictions of the first group
    :param mc_predictions2: stacked predictions of the second group
    :param y1: true labels of the first group (argmax is taken over the last axis)
    :param y2: true labels of the second group (argmax is taken over the last axis)
    :return overall_true_labels, overall_predictions: concatenated class indices
    """
    # The first group keeps its local indices; the second group is offset by
    # the group size (TEST_C) instead of a hard-coded 3.
    cm_pred1 = tf.argmax(tf.reduce_mean(mc_predictions1, axis=0), axis=-1)
    cm_pred2 = tf.argmax(tf.reduce_mean(mc_predictions2, axis=0), axis=-1) + TEST_C
    overall_predictions = tf.concat([cm_pred1, cm_pred2], axis=0)

    cm_true1 = tf.argmax(y1, axis=-1)
    cm_true2 = tf.argmax(y2, axis=-1) + TEST_C
    overall_true_labels = tf.concat([cm_true1, cm_true2], axis=0)

    results = confusion_matrix(overall_true_labels, overall_predictions)
    print('Confusion Matrix :')
    print(results)
    print('Report : ')
    print(classification_report(overall_true_labels, overall_predictions))
    return overall_true_labels, overall_predictions

### Cohen Kappa Score

In [None]:
@timeIt
def calculateKappaScore(overall_true_labels, overall_predictions):
    """
    Prints Cohen's kappa score between the true labels and the predictions.

    :param overall_true_labels: true class indices
    :param overall_predictions: predicted class indices
    :return: None
    """
    kappa = cohen_kappa_score(overall_true_labels, overall_predictions)
    print("Cohen's Kappa Score: ", kappa)

## Main

In [None]:
# Load the dataset selected by DATASET.
# X is fed to applyPCA and Y to createImageCubes in the cells below
# (presumably X is the image data and Y the ground-truth labels - TODO confirm).
X, Y = loadData(DATASET)

In [None]:

# Apply PCA to reduce dimensionality of the data (n_components=PCA_COMPONENTS).
# NOTE: the same call is repeated at the top of the next cell.
X_pca = applyPCA(X, n_components=PCA_COMPONENTS)

In [38]:
# clear_output(wait=True)


# Apply PCA to reduce dimensionality of the data
# (repeats the call from the previous cell so this cell can run on its own once X is loaded)
X_pca = applyPCA(X, n_components=PCA_COMPONENTS)

# Create image cubes and their labels from the PCA-reduced data
X_cubes, Y_cubes = createImageCubes(X_pca, Y, windowSize=WINDOW_SIZE)

# Create class-wise patches from the image cubes
patches = classWisePatches(X_cubes, Y_cubes)

# Separate the patches into train and test sets
TRAINING_PATCHES: list = [patches[i] for i in TRAINING_CLASSES]
TESTING_PATCHES: list = [patches[i] for i in TESTING_CLASSES]

# Create instance of the model
model = createModel()

# Create instance of the Prototypical Network
# NOTE(review): width is passed before height here, whereas the episode builders
# reshape to (IMAGE_HEIGHT, IMAGE_WIDTH, ...) - confirm against Prototypical's signature.
ProtoModel = Prototypical(model, IMAGE_WIDTH, IMAGE_HEIGHT, IMAGE_DEPTH, IMAGE_CHANNEL)

# Create instance of the Optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)

# Create instance of the Checkpoint (tracks the optimizer and ProtoModel)
checkpoint = tf.train.Checkpoint(optimizer=optimizer, ProtoModel = ProtoModel)

# Train the model
# NOTE(review): the full `patches` list is passed here, while TRAINING_PATCHES built
# above is not used in this cell - confirm this matches how trainingEpochs indexes classes.
trainingEpochs(patches, TRAINING_LABELS, TRAINING_EPOCH, TRAINING_EPISODE)

# Tuning set: the first 5 patches of each of the first 6 testing classes
TUNNING_PATCHES = [TESTING_PATCHES[i][:5,:,:,:,:] for i in range(6)]

# Fine-tune the model on the tuning patches
tunningEpochs(TUNNING_PATCHES, TESTING_LABELS, TUNNING_EPOCH, TUNNING_EPISODE)

# Test on the full testing patches; keeps the predictions and labels of the last epoch
mc_predictions1, mc_predictions2, y1, y2 =  testingEpochs(TESTING_PATCHES, TESTING_LABELS, TESTING_EPOCH)

# Statistics: overall accuracy, confusion matrix / classification report, Cohen's kappa
calculateAccuracy(mc_predictions1, mc_predictions2, y1, y2)

overall_true_labels, overall_predictions = calculateConfusionMatrix(mc_predictions1, mc_predictions2, y1, y2)

calculateKappaScore(overall_true_labels, overall_predictions)


KeyboardInterrupt: 