# Imports

In [None]:
!pip install tensorflow==2.10.1  -q gwpy

In [None]:
!pip uninstall matplotlib
!pip install matplotlib==3.1.3

In [None]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
from google.colab import drive
import tensorflow as tf
import numpy as np
from typing import Tuple
import albumentations as A
import random
import pandas as pd
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.image as img
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import ImageDataGenerator,load_img,img_to_array,array_to_img
from sklearn.metrics import confusion_matrix
from keras.callbacks import *
import random
from keras import backend as K
from PIL import Image
from skimage.measure import label as label_fn
import cv2
import shutil
from tensorflow.keras.preprocessing.image import ImageDataGenerator
tfk = tf.keras
tfkl = tf.keras.layers
print(tf.__version__)
import albumentations as A
from itertools import combinations
import json
from sklearn.utils import class_weight
from sklearn.metrics import classification_report
from sklearn import svm
from skimage import filters
import scipy
from sklearn.model_selection import StratifiedKFold

# Mount the My Drive folder

In [None]:
# Mount Google Drive under /content/drive so the dataset folders used below
# are reachable from the Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

# Env setup

In [None]:
# Make the project folder the working directory: every relative path defined
# in the configuration cell ('data/...') is resolved against it.
%cd /content/drive/MyDrive/AAIB
# NOTE(review): PATH points to a different Drive folder than the working
# directory above and is not referenced in the visible part of the notebook -
# confirm whether it is still needed.
PATH = '/content/drive/MyDrive/tuberculosis-pneumonia-classification'

In [None]:
# Seed every random number generator in use (TensorFlow, NumPy, Python's
# `random`) and fix the hash seed.
SEED = 4224
tf.random.set_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
os.environ['PYTHONHASHSEED'] = str(SEED)

# Dataset locations, relative to the working directory.
labels_path = 'data/labels_train_clean.csv'
all_data_no_duplicates_path = 'data/train_all_no_duplicates'
clean_data_path = 'data/train_clean/'
noisy_data_path = 'data/train_noisy/'

# Split fractions. They are applied as nested splits, so they are not meant
# to sum to 1: `test_percentage` is the share of the whole dataset held out
# for testing, `validation_percentage` is the share of the remaining
# train+validation data held out for validation.
# NOTE(review): `train_percentage` is not used in the visible part of the notebook.
train_percentage = 0.8
validation_percentage = 0.15
test_percentage = 0.2
img_size = (224,224)  # passed to the generators as `out_shape`
batch_size = 16

#Data generator

In [None]:
class CustomGenerator(tf.keras.utils.Sequence):
  """
    Keras Sequence that serves batches of (images, labels).

    Every image is read as single-channel grayscale, resized to `out_shape`,
    optionally cleaned up (`preprocess_input`), optionally augmented
    (`augment`) and finally replicated on three channels.

    Only full batches are produced: `__len__` uses floor division, so up to
    `batch_size - 1` trailing samples are never served.

    Args:
      dataframe: table with a `file` column (image file names) and a `label`
        column (class ids; presumably integers, as `to_categorical` is applied).
      base_path: directory containing the images, or a tuple/list of
        directories. With several directories, rows whose file is found in
        none of them are dropped and a file present in more than one
        directory is taken from the first one.
      preprocessing_function: optional callable applied to every 3-channel
        image right before batching (e.g. the one used for transfer learning).
      batch_size: number of samples per batch.
      out_shape: size passed to `cv2.resize`.
      shuffle: reshuffle the sample order at the end of every epoch.
      categorical: one-hot encode the labels.
      augment: apply the albumentations pipeline to every image.
      seed: stored on the instance; shuffling itself uses NumPy's global RNG.
      flow_from_directory: read images lazily from disk when True, otherwise
        load (and preprocess) all of them once in the constructor.
      preprocess_input: run `preprocess` on every image after resizing.
  """

  def __init__(self, 
               dataframe, # dataframe of the dataset  
               base_path,
               preprocessing_function=None, # Preprocessing function (e.g., the one used for transfer learning)
               batch_size=16, # Batch size
               out_shape = (100,100),
               shuffle=False,
               categorical = True,
               augment = False,
               seed = SEED,
               flow_from_directory = True,
               preprocess_input = False):

    super().__init__()

    file_names = list(dataframe.file)
    file_labels = list(dataframe.label)

    if isinstance(base_path, (tuple, list)):
      # Map every file name to the first directory that contains it, then walk
      # the dataframe in its own order so that filenames and labels stay aligned.
      directory_of = {}
      for directory in base_path:
        for name in self.folderToPaths(directory, full_path = False):
          directory_of.setdefault(name, directory)

      found = [(name, lab) for name, lab in zip(file_names, file_labels) if name in directory_of]
      self.filenames = [os.path.join(directory_of[name], name) for name, _ in found]
      kept_labels = [lab for _, lab in found]
    else:
      self.filenames = [os.path.join(base_path, name) for name in file_names]
      kept_labels = file_labels

    if categorical:
      # Size the one-hot vectors on the whole dataframe so that dropping rows
      # cannot change the number of classes.
      num_classes = int(max(file_labels)) + 1 if file_labels else None
      self.labels = tfk.utils.to_categorical(kept_labels, num_classes = num_classes)
    else:
      self.labels = kept_labels

    # Sample order; reshuffled in place by on_epoch_end when shuffle is enabled
    self.indices = np.arange(len(self.filenames))

    # Save dataset parameters as class attributes
    self.base_path = base_path
    self.preprocessing_function = preprocessing_function
    self.out_shape = out_shape
    self.batch_size = batch_size
    self.shuffle = shuffle
    self.augment = augment
    self.seed = seed
    self.flow_from_directory = flow_from_directory
    self.data_augmentation = A.Compose([
        A.RandomBrightnessContrast(brightness_limit = 0.05, contrast_limit=0.05, p=0.5),
        A.ShiftScaleRotate(p = 0.8, rotate_limit = 20, scale_limit = 0.3, border_mode =  cv2.BORDER_CONSTANT, value = 0),
        A.CLAHE(p=0.2)
    ])
    self.preprocess_input = preprocess_input

    if preprocess_input:
      # CLAHE operators used by the noise and blur detectors
      self.noisyClahe = cv2.createCLAHE(clipLimit = 300, tileGridSize = (50, 50))
      self.blurredClahe = cv2.createCLAHE(clipLimit = 1.8, tileGridSize = (4, 4))

    if not self.flow_from_directory:
      self.images = self.load_all_imgs()

  def augmentation(self, images):
    """Run the albumentations pipeline; returns its result dict (key 'image')."""
    return self.data_augmentation(image = images)

  def __filterNoisyOnClahe(self, image):
    """Smooth the image (median then uniform filter) when it is detected as noisy."""
    probe = cv2.resize(image, (400, 400))
    probe = scipy.ndimage.gaussian_laplace(probe, sigma = 6)
    probe = self.noisyClahe.apply(probe)
    # High variance of the contrast-stretched LoG response is used as noise indicator
    if np.var(probe) > 800:
      image = cv2.medianBlur(image, ksize=5)
      return scipy.ndimage.uniform_filter(image, size=3)
    return image

  def __sharpenImage(self, image):
    """Sharpen with a 3x3 kernel (centre 9, neighbours -1)."""
    sharpen_kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
    return cv2.filter2D(image, -1, sharpen_kernel)

  def __invert_image(self, img):
    """Return `255 - img` when the corner patches are mostly bright, else `img`."""
    otsu_thresh = filters.threshold_otsu(img)
    masked_image = (img > otsu_thresh) * 1.0

    # Corner patches of the Otsu mask.
    # NOTE(review): the 220:244 slices are clipped by NumPy to the image size,
    # so on a 224x224 input those patches are much smaller than the 0:25 ones -
    # confirm the intended patch size.
    corners = [
        (slice(220, 244), slice(220, 244)),
        (slice(0, 25), slice(0, 25)),
        (slice(0, 25), slice(220, 244)),
        (slice(220, 244), slice(0, 25)),
    ]
    # valsTot[0]: dark pixel count, valsTot[1]: bright pixel count over all corners
    valsTot = sum(
        np.histogram(masked_image[rows, cols], bins=2, range=(0, 1))[0]
        for rows, cols in corners
    )

    labels = label_fn(masked_image)

    # Masks with 100 or more distinct label values are left untouched
    if len(np.unique(labels)) < 100 and valsTot[0] <= valsTot[1]:
      return 255 - img
    return img

  def __filterBlurred(self, image):
    """Sharpen the image when it is detected as blurred."""
    minThresh = 2
    probe = cv2.resize(image, (400, 400))
    probe = scipy.ndimage.gaussian_laplace(probe, sigma = 2)
    probe = self.blurredClahe.apply(probe)
    _, probe = cv2.threshold(probe, minThresh, 255, cv2.THRESH_BINARY)
    # Low variance of the thresholded LoG response is used as blur indicator
    if np.var(probe) < 5:
      return self.__sharpenImage(image)
    return image

  def __filterUnderexposed(self, image):
    """Apply CLAHE when the mean intensity of the (resized) image is below 71."""
    if np.mean(cv2.resize(image, (400, 400))) < 71:
      clahe = cv2.createCLAHE(clipLimit = 2, tileGridSize = (2, 2))
      return clahe.apply(image)
    return image

  def preprocess(self, image):
    """Chain the clean-up filters: inversion, exposure, noise, blur (in this order)."""
    image = self.__invert_image(image)
    image = self.__filterUnderexposed(image)
    image = self.__filterNoisyOnClahe(image)
    image = self.__filterBlurred(image)
    return image

  def __len__(self):
    # Number of full batches: #images // batch_size (the remainder is dropped)
    return len(self.filenames) // self.batch_size

  def on_epoch_end(self):
    """Hook called by Keras after every epoch: reshuffle the sample order."""
    if self.shuffle:
      np.random.shuffle(self.indices)

  def on_epoch_begin(self):
    """Backward-compatible alias of `on_epoch_end` (Keras does not call this hook)."""
    self.on_epoch_end()

  def _read_image(self, path):
    """Read one grayscale image, resize it and apply the optional clean-up."""
    image = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    if image is None:
      # cv2.imread signals failure by returning None instead of raising
      raise FileNotFoundError(f"Could not read image: {path}")
    image = cv2.resize(image, self.out_shape)
    if self.preprocess_input:
      image = self.preprocess(image)
    return image

  def load_all_imgs(self):
    """Load every image of the dataset in memory, in `self.filenames` order."""
    return np.array([self._read_image(path) for path in self.filenames])

  def get_image_and_label(self, index):
    """Return the 3-channel image and the label of the sample at `index`."""
    if self.flow_from_directory:
      image = self._read_image(self.filenames[index])
      if self.augment:
        image = self.augmentation(image)['image']
    else:
      image = self.images[index]
      if self.augment:
        # The pipeline returns a dict: the augmented array is under 'image'
        image = self.augmentation(image)['image']
      image = np.squeeze(image)

    # Replicate the single grayscale channel on three channels
    image = np.stack([image, image, image], axis = -1)
    return image, self.labels[index]

  def __getitem__(self, index):
    """Build batch number `index` as (images, labels) arrays."""
    start = index * self.batch_size
    current_indices = self.indices[start:start + self.batch_size]

    batch_images = []
    batch_labels = []

    for idx in current_indices:
      image, label = self.get_image_and_label(idx)

      # Apply the preprocessing function
      if self.preprocessing_function is not None:
        image = self.preprocessing_function(image)

      batch_images.append(image)
      batch_labels.append(label)

    # Stack the samples over a new leading batch dimension
    return np.stack(batch_images, axis=0), np.array(batch_labels)

  def folderToPaths(
        self,
        full_img_dir,
        full_path = True
):
    """Return the sorted entries of `full_img_dir`, as full paths or bare names."""
    entries = os.listdir(full_img_dir)
    if full_path:
      entries = [os.path.join(full_img_dir, entry) for entry in entries]
    return sorted(entries)

#Cyclical LR

In [None]:
class CyclicLR(Callback):
    """This callback implements a cyclical learning rate policy (CLR).
    The method cycles the learning rate between two boundaries with
    some constant frequency, as detailed in this paper (https://arxiv.org/abs/1506.01186).
    The amplitude of the cycle can be scaled on a per-iteration or 
    per-cycle basis.
    This class has three built-in policies, as put forth in the paper.
    "triangular":
        A basic triangular cycle w/ no amplitude scaling.
    "triangular2":
        A basic triangular cycle that scales initial amplitude by half each cycle.
    "exp_range":
        A cycle that scales initial amplitude by gamma**(cycle iterations) at each 
        cycle iteration.
    For more detail, please see paper.
    
    # Example
        ```python
            clr = CyclicLR(base_lr=0.001, max_lr=0.006,
                                step_size=2000., mode='triangular')
            model.fit(X_train, Y_train, callbacks=[clr])
        ```
    
    Class also supports custom scaling functions:
        ```python
            clr_fn = lambda x: 0.5*(1+np.sin(x*np.pi/2.))
            clr = CyclicLR(base_lr=0.001, max_lr=0.006,
                                step_size=2000., scale_fn=clr_fn,
                                scale_mode='cycle')
            model.fit(X_train, Y_train, callbacks=[clr])
        ```    
    # Arguments
        base_lr: initial learning rate which is the
            lower boundary in the cycle.
        max_lr: upper boundary in the cycle. Functionally,
            it defines the cycle amplitude (max_lr - base_lr).
            The lr at any cycle is the sum of base_lr
            and some scaling of the amplitude; therefore 
            max_lr may not actually be reached depending on
            scaling function.
        step_size: number of training iterations per
            half cycle. Authors suggest setting step_size
            2-8 x training iterations in epoch.
        mode: one of {triangular, triangular2, exp_range}.
            Default 'triangular'.
            Values correspond to policies detailed above.
            If scale_fn is not None, this argument is ignored.
        gamma: constant in 'exp_range' scaling function:
            gamma**(cycle iterations)
        scale_fn: Custom scaling policy defined by a single
            argument lambda function, where 
            0 <= scale_fn(x) <= 1 for all x >= 0.
            mode parameter is ignored 
        scale_mode: {'cycle', 'iterations'}.
            Defines whether scale_fn is evaluated on 
            cycle number or cycle iterations (training
            iterations since start of cycle). Default is 'cycle'.

    # Raises
        ValueError: if scale_fn is None and mode is not one of the
            built-in policies.
    """

    def __init__(self, base_lr=0.001, max_lr=0.006, step_size=2000., mode='triangular',
                 gamma=1., scale_fn=None, scale_mode='cycle'):
        super().__init__()

        self.base_lr = base_lr
        self.max_lr = max_lr
        self.step_size = step_size
        self.mode = mode
        self.gamma = gamma
        if scale_fn is None:
            if self.mode == 'triangular':
                self.scale_fn = lambda x: 1.
                self.scale_mode = 'cycle'
            elif self.mode == 'triangular2':
                self.scale_fn = lambda x: 1/(2.**(x-1))
                self.scale_mode = 'cycle'
            elif self.mode == 'exp_range':
                self.scale_fn = lambda x: gamma**(x)
                self.scale_mode = 'iterations'
            else:
                # Fail at construction time instead of on the first batch,
                # where the missing scale_fn attribute would surface.
                raise ValueError(
                    "mode must be one of 'triangular', 'triangular2', "
                    f"'exp_range' when scale_fn is None, got {mode!r}")
        else:
            self.scale_fn = scale_fn
            self.scale_mode = scale_mode
        self.clr_iterations = 0.
        self.trn_iterations = 0.
        self.history = {}
        self._reset()

    def _reset(self, new_base_lr=None, new_max_lr=None,
               new_step_size=None):
        """Resets cycle iterations.
        Optional boundary/step size adjustment.
        """
        if new_base_lr is not None:
            self.base_lr = new_base_lr
        if new_max_lr is not None:
            self.max_lr = new_max_lr
        if new_step_size is not None:
            self.step_size = new_step_size
        self.clr_iterations = 0.
        
    def clr(self):
        """Return the learning rate for the current cycle iteration."""
        # 1-based index of the current cycle (one cycle = 2 * step_size iterations)
        cycle = np.floor(1+self.clr_iterations/(2*self.step_size))
        # Distance from the peak of the cycle: 1 at the boundaries, 0 at the peak
        x = np.abs(self.clr_iterations/self.step_size - 2*cycle + 1)
        amplitude = (self.max_lr-self.base_lr)*np.maximum(0, (1-x))
        if self.scale_mode == 'cycle':
            return self.base_lr + amplitude*self.scale_fn(cycle)
        return self.base_lr + amplitude*self.scale_fn(self.clr_iterations)
        
    def on_train_begin(self, logs=None):
        """Set the optimizer learning rate to the start (or current) value of the cycle."""
        if self.clr_iterations == 0:
            K.set_value(self.model.optimizer.lr, self.base_lr)
        else:
            K.set_value(self.model.optimizer.lr, self.clr())        
            
    def on_batch_end(self, epoch, logs=None):
        """Record the learning rate and batch logs, then move to the next cycle step."""
        logs = logs or {}
        self.trn_iterations += 1
        self.clr_iterations += 1

        # Log the rate that was used for the batch that just finished
        self.history.setdefault('lr', []).append(K.get_value(self.model.optimizer.lr))
        self.history.setdefault('iterations', []).append(self.trn_iterations)

        for k, v in logs.items():
            self.history.setdefault(k, []).append(v)
        
        K.set_value(self.model.optimizer.lr, self.clr())

#Data loading (all data, no duplicates)

In [None]:
def encode(x):
  """Map a class letter to its integer id: 'N' -> 0, 'P' -> 1, anything else -> 2."""
  return 0 if x == 'N' else (1 if x == 'P' else 2)

In [None]:
def folderToPaths(
        full_img_dir,
):
    """Return the sorted list of paths of every entry found in `full_img_dir`."""
    return sorted(
        os.path.join(full_img_dir, entry) for entry in os.listdir(full_img_dir)
    )

In [None]:
# Load the label table; the cells below read its `file` and `label` columns.
labelsDF = pd.read_csv(labels_path)
display(labelsDF.head(20))

In [None]:
# Replace the class letters with their integer ids (see `encode`)
labelsDF['label'] = labelsDF['label'].apply(encode)
display(labelsDF.head(20))

In [None]:
len(set(labelsDF.file)) # number of distinct files: 1 acquisition per patient

In [None]:
# Sorted paths of every file in the de-duplicated image folder.
all_data_no_duplicates_path_list = folderToPaths(full_img_dir = all_data_no_duplicates_path)
# The string literal below is disabled code (listing of the clean / noisy
# folders) kept for reference; it is not executed.
"""clean_data_path_list = folderToPaths(full_img_dir = clean_data_path)
noisy_data_path_lust = folderToPaths(full_img_dir = noisy_data_path)"""

In [None]:
# Number of entries found in the de-duplicated image folder
len(all_data_no_duplicates_path_list)

In [None]:
# Hold out `test_percentage` of the samples as test set, stratified on the
# label and seeded for reproducibility; the rest is train + validation.
train_val, test = train_test_split(labelsDF, test_size = test_percentage, shuffle = True, stratify = labelsDF.label, random_state=SEED)

In [None]:
# Test generator. Shuffling is disabled: the evaluation code reads the labels
# from `test` in dataframe order, so the generator must keep that same order.
test_gen = CustomGenerator(
    dataframe = test,
    base_path = 'data/train_all_no_duplicates',
    batch_size = batch_size,
    out_shape = img_size,
    shuffle = False,
    flow_from_directory = True,
    preprocess_input = True,
    categorical = True,
    augment = False,
)

In [None]:
# Distinct (already encoded) class ids present in the label table
dataset_labels = np.array(list(set(labelsDF.label)), dtype=int)

In [None]:
# Show the first batch of the test generator on a 4x4 grid
iterator = iter(test_gen)
images, labels = next(iterator)

fig, axis = plt.subplots(4, 4, figsize = (20, 20))
axis = axis.flatten()

for i, picture in enumerate(images):
  panel = axis[i]
  panel.imshow(picture.squeeze(), cmap='gray')
  panel.set_axis_off()

plt.show()

# SVM
Code adapted from https://github.com/AryaAftab/svm-tensorflow

In [None]:
import sys
if 'ipykernel' in sys.modules:
    from tqdm.notebook import tqdm
else:
    from tqdm import tqdm

import math
import tensorflow as tf
from tensorflow.keras import callbacks


class ShowProgress(callbacks.Callback):
    """Show training progress on a tqdm bar, one tick per epoch.

    Args:
        epochs: total number of epochs (length of the bar).
        step_show: refresh the bar every `step_show` epochs.
        metric: name of the logged metric to display; its validation
            counterpart is looked up as 'val_' + metric.
    """

    def __init__(self, epochs, step_show=1, metric="accuracy"):
        super().__init__()
        self.epochs = epochs
        self.step_show = step_show
        self.metric = metric

    def on_train_begin(self, logs=None):
        self.pbar = tqdm(range(self.epochs))

    def on_epoch_end(self, epoch, logs=None):
        if (epoch + 1) % self.step_show != 0:
            return

        logs = logs or {}

        def shown(key):
            # A metric can be absent (e.g. no validation data was given to fit)
            value = logs.get(key)
            return "n/a" if value is None else round(value, 4)

        self.pbar.set_description(f"""Epoch : {epoch + 1} / {self.epochs}, 
            Train {self.metric} : {shown(self.metric)}, 
            Valid {self.metric} : {shown('val_' + self.metric)}""")

        self.pbar.update(self.step_show)

    def on_train_end(self, logs=None):
        # Release the bar so that later output is not mixed with a stale one
        pbar = getattr(self, "pbar", None)
        if pbar is not None:
            pbar.close()

            
class BestModelWeights(callbacks.Callback):
    """Keep the weights of the best epoch and restore them when training ends.

    Args:
        metric: name of the logged metric to monitor.
        metric_type: "min" if lower is better, "max" if higher is better.

    Raises:
        NameError: if `metric_type` is neither "min" nor "max".
    """

    def __init__(self, metric="val_accuracy", metric_type="max"):
        super().__init__()
        self.metric = metric
        self.metric_type = metric_type
        if self.metric_type not in ["min", "max"]:
            raise NameError('metric_type must be min or max')

    def on_train_begin(self, logs=None):
        # Start from the worst possible value for the chosen direction
        self.best_metric = math.inf if self.metric_type == "min" else -math.inf
        self.best_epoch = 0
        self.model_best_weights = None

    def on_epoch_end(self, epoch, logs=None):
        current = logs[self.metric]
        # Ties count as an improvement: the most recent best epoch wins
        if self.metric_type == "min":
            improved = self.best_metric >= current
        else:
            improved = self.best_metric <= current

        if improved:
            self.model_best_weights = self.model.get_weights()
            self.best_metric = current
            self.best_epoch = epoch

    def on_train_end(self, logs=None):
        if self.model_best_weights is None:
            # No epoch produced a usable value (e.g. training stopped before
            # the first epoch ended): leave the current weights untouched.
            print("\nNo best weights were recorded, model weights left unchanged\n")
            return
        self.model.set_weights(self.model_best_weights)
        print(f"\nBest weights is set, Best Epoch was : {self.best_epoch+1}\n")

In [None]:
import types

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers



#classes

class LinearSVC(layers.Layer):
    """Linear scoring layer `inputs @ w + b` that registers an L2 penalty on `w`.

    Args:
        num_classes: number of output scores (columns of `w`).
    """

    def __init__(self, num_classes=2, **kwargs):
        super().__init__(**kwargs)
        self.num_classes = num_classes

        # 0.5 * ||w||^2
        self.reg_loss = lambda weight : 0.5 * tf.reduce_sum(tf.square(weight))

    def build(self, input_shape):
        feature_dim = input_shape[-1]
        self.w = self.add_weight(
            shape=(feature_dim, self.num_classes),
            initializer=tf.random_normal_initializer(stddev=0.1),
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.num_classes,),
            initializer=tf.constant_initializer(value=0.1),
            trainable=True,
        )

    def call(self, inputs):
        # Register the penalty on the weights through `add_loss` at every
        # call, then return the class scores.
        self.add_loss(self.reg_loss(self.w))
        scores = tf.matmul(inputs, self.w)
        return scores + self.b

    def get_config(self):
        return {**super().get_config(), "num_classes": self.num_classes}




class SVMTrainer(tf.keras.Model):
    """Feature extractor ("bone") followed by a `LinearSVC` head, trained with
    a categorical hinge loss plus the regularisation losses of the model.

    Args:
        num_class: number of classes (outputs of the SVM head).
        C: weight of the hinge term relative to the regularisation term.
        bone: callable/model producing the features; identity when None.
        name: model name.

    Metrics passed to `compile` receive sigmoid-squashed scores: the score of
    class 1 only when `num_class == 2`, all the scores otherwise.
    """

    def __init__(
        self,
        num_class,
        C=1.0,
        bone=None,
        name="SVMTrainer",
        **kwargs
    ):
        super().__init__(name=name, **kwargs)

        self.num_class = num_class

        if bone is None:
            self.bone = lambda x: tf.identity(x)
        else:
            self.bone = bone

        self.linear_svc = LinearSVC(self.num_class)
        self.C = C

        self.loss_tracker = tf.keras.metrics.Mean(name="loss")

    def svc_loss(self, y_true, y_pred, sample_weight, reg_loss):
        """Return `reg_loss + C * hinge`, one value per sample.

        `reg_loss` may be a tensor or a list of scalar tensors such as
        `self.losses`; a list is summed first so that any number of
        regularisation terms (including none) is supported.
        """
        loss = tf.keras.losses.categorical_hinge(y_true, y_pred)
        if sample_weight is not None:
            loss = sample_weight * loss

        if isinstance(reg_loss, (list, tuple)):
            reg_loss = tf.add_n(reg_loss) if reg_loss else 0.0

        return reg_loss + self.C * loss

    def compile(self, **kwargs):
        super().compile(**kwargs)
        # The loss is computed by `svc_loss`, not by a compiled Keras loss
        self.compiled_loss = None

    def call(self, x, training=False):
        x = self.bone(x)
        x = self.linear_svc(x)
        return x

    @staticmethod
    def _unpack(data):
        """Split a batch into (x, y, sample_weight); the weight may be absent."""
        if len(data) == 3:
            return data
        x, y = data
        return x, y, None

    def _update_metrics(self, y, y_pred, loss, sample_weight):
        """Update the loss tracker and the compiled metrics, return their values."""
        if self.num_class == 2:
            y = y[..., 1]
            y_pred = tf.sigmoid(y_pred[..., 1])
        else:
            # Raw SVM scores are unbounded, while threshold-based metrics such
            # as Precision/Recall require predictions in [0, 1]. Squash them
            # like the binary branch and the exported head do; the ranking of
            # the classes is unchanged.
            y_pred = tf.sigmoid(y_pred)

        self.loss_tracker.update_state(loss)
        self.compiled_metrics.update_state(y, y_pred, sample_weight)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

    def train_step(self, data):
        x, y, sample_weight = self._unpack(data)

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            loss = self.svc_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                reg_loss=self.losses,
            )

        # Compute gradients and update weights
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        return self._update_metrics(y, y_pred, loss, sample_weight)

    def test_step(self, data):
        x, y, sample_weight = self._unpack(data)

        y_pred = self(x, training=False)
        loss = self.svc_loss(
            y,
            y_pred,
            sample_weight=sample_weight,
            reg_loss=self.losses,
        )

        return self._update_metrics(y, y_pred, loss, sample_weight)

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch or at the start of `evaluate()`.
        compiled = getattr(self, "compiled_metrics", None)
        # Before `compile` there are no compiled metrics yet
        return [self.loss_tracker] + (compiled.metrics if compiled is not None else [])

    def save(self, model_path=None, input_shape=None):
        """Export bone + head as a plain Keras model and save it to `model_path`.

        The SVM head is exported as a Dense layer with sigmoid activation
        carrying the trained weights.

        Args:
            model_path: destination passed to `Model.save`.
            input_shape: shape of one input sample, without the batch dimension.

        Raises:
            ValueError: if `model_path` or `input_shape` is missing.
        """
        if model_path is None or input_shape is None:
            raise ValueError("save() requires both model_path and input_shape")

        sample_shape = list(input_shape)

        # Run a dummy batch through the network so that every layer is built
        dumy_input = np.random.rand(1, *sample_shape)
        dumy_body_output = self.bone(dumy_input)
        dumy_head_output = self.linear_svc(dumy_body_output)

        head_part = layers.Dense(units=dumy_head_output.shape[-1], activation="sigmoid")
        _ = head_part(dumy_body_output)
        head_part.set_weights(self.linear_svc.get_weights())

        if isinstance(self.bone, types.FunctionType):
            body_part = layers.Lambda(lambda x: self.bone(x))
        else:
            body_part = self.bone

        inputs = layers.Input(shape=sample_shape)
        x = body_part(inputs)
        x = head_part(x)

        model = tf.keras.models.Model(inputs, x)
        model.save(model_path)

# Learning model no k fold

In [None]:
# Split train+validation, stratified on the label and seeded for reproducibility
train, val = train_test_split(train_val, test_size = validation_percentage, shuffle = True, stratify = train_val.label, random_state=SEED)

# Settings shared by the three generators
generator_options = dict(
    base_path = 'data/train_all_no_duplicates',
    batch_size = batch_size,
    out_shape = img_size,
    flow_from_directory = True,
    preprocess_input = True,
    categorical = True,
    augment = False,
)

train_gen = CustomGenerator(dataframe = train, shuffle = True, **generator_options)
valid_gen = CustomGenerator(dataframe = val, shuffle = True, **generator_options)
# The test generator must keep the dataframe order: the evaluation code reads
# the true labels from `test` in that order.
test_gen = CustomGenerator(dataframe = test, shuffle = False, **generator_options)

In [None]:
def get_supernet():
  """Build an ImageNet-pretrained EfficientNetV2B3 without its top.

  The first 79 layers are frozen, every following layer is trainable.
  The total number of layers is printed.
  """
  backbone = tf.keras.applications.efficientnet_v2.EfficientNetV2B3(
      include_top=False,
      weights="imagenet",
      input_shape=(224,224,3),
  )

  print(len(backbone.layers))

  # 1-based position: layers 1..79 are frozen, from the 80th on they are trainable
  for position, layer in enumerate(backbone.layers, start=1):
      layer.trainable = position >= 80

  return backbone

In [None]:
# Metrics tracked during training and validation
METRICS = [
    tf.keras.metrics.BinaryAccuracy(name='accuracy'),
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
]

# Feature extractor ("bone") feeding the SVM head; passing None to SVMTrainer
# as bone gives a plain linear SVM instead
Bone = tf.keras.models.Sequential([
    tfk.Input((224,224,3)),
    get_supernet(),
    tf.keras.layers.GlobalAveragePooling2D(),
])

# Half cycle of the cyclical learning rate, in training iterations
training_samples = int(len(train_gen)*batch_size)
step_size = (6*training_samples) // batch_size

clr = CyclicLR(
    mode='triangular',
    base_lr=1e-5,
    max_lr=1e-4,
    step_size=step_size,
)

svm_model = SVMTrainer(num_class=3, bone=Bone, C = 10)
svm_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    metrics=METRICS,
)

epochs = 100

# Stop when the validation accuracy has not improved for 15 epochs and
# restore the best weights
early_stopping = tfk.callbacks.EarlyStopping(
    monitor= 'val_accuracy',
    mode='max',
    patience=15,
    restore_best_weights=True,
)

# Train
history = svm_model.fit(
    train_gen,
    epochs=epochs,
    validation_data = valid_gen,
    callbacks = [early_stopping, clr],
    workers =8,
    use_multiprocessing = True,
)



In [None]:
# Plot the training curves: one figure per logged quantity, training against
# validation. The loss is the one computed by SVMTrainer.svc_loss
# (categorical hinge plus regularisation), not a cross-entropy.
for key, title in (('loss', 'SVM loss (categorical hinge + regularization)'),
                   ('accuracy', 'Accuracy')):
    plt.figure(figsize=(20,5))
    plt.plot(history.history[key], label='Training', alpha=.8, color='#ff7f0e')
    plt.plot(history.history['val_' + key], label='Validation', alpha=.8, color='#4D61E2')
    plt.legend(loc='upper left')
    plt.title(title)
    plt.grid(alpha=.3)

plt.show()

In [None]:
#prediction on test set
predictions = svm_model.predict(test_gen)

# The generator only serves full batches, so there can be fewer predictions
# than test rows: keep as many true labels as there are predictions.
# This relies on the generator serving the samples in dataframe order.
y = np.asarray(list(test.label), dtype=int)[:len(predictions)]
pred = np.argmax(predictions, axis=-1)
target_names = ['N', 'P', 'T']
cm = confusion_matrix(y, pred, normalize="true")


# Compute the classification metrics
accuracy = accuracy_score(y, pred)
precision = precision_score(y, pred, average='macro')
recall = recall_score(y, pred, average='macro')
f1 = f1_score(y, pred, average='macro')
# Built-in round() works for both plain floats and NumPy scalars
print('Accuracy:', round(accuracy, 4))
print('Precision:', round(precision, 4))
print('Recall:', round(recall, 4))
print('F1:', round(f1, 4))
print(classification_report(y, pred, target_names=target_names, digits=4))

# Make sure the output folder exists before writing any artefact
os.makedirs('data_for_report', exist_ok=True)

# Plot the confusion matrix (transposed: true labels on x, predictions on y)
plt.figure(figsize=(10,10))
hm = sns.heatmap(cm.T, xticklabels=[0,1,2], yticklabels=[0,1,2])
plt.xlabel('True labels')
plt.ylabel('Predicted labels')
# Save before showing: some backends release the figure once it is shown
hm.get_figure().savefig("data_for_report/heatmap_eff_svm_all_data.pdf")
plt.show()

# Model saving
model_directory = './'
filename = 'svm_all_data'
filename_chosen = os.path.join(model_directory, filename)
svm_model.save(filename_chosen + '.h5', input_shape=[224, 224, 3])


with open('data_for_report/train_all_data_eff_svm_history.json' , 'w') as fp:
    json.dump(history.history, fp)

report = classification_report(y, pred, target_names=target_names, digits=4, output_dict = True)
with open('data_for_report/test_all_data_eff_svm_report.json' , 'w') as fp:
    json.dump(report, fp)

#Learning model k fold

In [None]:
def get_supernet(first_trainable_layer=80):
  """Build an ImageNet-pretrained EfficientNetV2B3 backbone with its first layers frozen.

  Args:
    first_trainable_layer: 1-based position of the first layer that stays
      trainable; every layer before it is frozen. The default (80) freezes
      the first 79 layers, as the original hard-coded loop did.

  Returns:
    The headless (include_top=False) Keras model for 224x224x3 inputs.
  """
  supernet1 = tf.keras.applications.efficientnet_v2.EfficientNetV2B3(
      include_top=False,
      weights="imagenet",
      input_shape=(224, 224, 3),
  )

  # Log the total layer count so the freeze boundary can be put in context.
  print(len(supernet1.layers))
  for position, layer in enumerate(supernet1.layers, start=1):
    layer.trainable = position >= first_trainable_layer

  return supernet1

In [None]:
# Per-fold training curves (one list of per-epoch values per fold).
acc_per_fold = []
loss_per_fold = []
val_acc_per_fold = []
val_loss_per_fold = []
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state = SEED)

# Stratified 5-fold cross-validation: for every fold a fresh backbone + SVM
# head is trained, evaluated on the test generator and saved to disk.
for i, (train_index, valid_index) in enumerate(skf.split(train_val.file.to_numpy(), train_val.label.to_numpy())):
  # skf.split yields POSITIONAL indices, so rows must be selected with .iloc.
  # DataFrame.filter(items=...) matches index LABELS and would silently pick
  # wrong/fewer rows whenever train_val does not carry a default 0..n-1 index.
  trainDF = train_val.iloc[train_index]
  validDF = train_val.iloc[valid_index]

  train_gen = CustomGenerator(dataframe = trainDF, base_path = 'data/train_all_no_duplicates',
                              batch_size = batch_size, out_shape = img_size, shuffle = True,
                              flow_from_directory=True, preprocess_input = True,
                              categorical = True, augment = False)
  valid_gen = CustomGenerator(dataframe = validDF, base_path = 'data/train_all_no_duplicates',
                              batch_size = batch_size, out_shape = img_size, shuffle = True,
                              flow_from_directory=True, preprocess_input = True,
                              categorical = True, augment = False)

  # Define metrics (kept in the global METRICS name; the model-loading cell reuses it)
  METRICS = [tf.keras.metrics.BinaryAccuracy(name='accuracy'),
        tf.keras.metrics.Precision(name='precision'),
        tf.keras.metrics.Recall(name='recall'),
  ]

  # Define Bone, if you want linear svm, you can pass None to SVMTrainer as bone
  Bone = tf.keras.models.Sequential([
      tfk.Input((224,224,3)),
      get_supernet(),
      tf.keras.layers.GlobalAveragePooling2D()
  ])

  # Cyclic LR half-cycle length: 6 * (number of batches in train_gen).
  training_samples = int(len(train_gen)*batch_size)
  step_size = 6*training_samples // batch_size

  clr = CyclicLR(
      mode='triangular',
      base_lr=1e-5,
      max_lr=1e-4,
      step_size= step_size
      )

  svm_model = SVMTrainer(num_class=3, bone=Bone, C = 10)
  svm_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
                    metrics=METRICS)

  epochs = 100

  # Train; early stopping restores the weights of the best val_accuracy epoch.
  history = svm_model.fit(train_gen,
                          epochs=epochs,  validation_data = valid_gen,
                          callbacks = [tfk.callbacks.EarlyStopping(monitor= 'val_accuracy', mode='max', patience=15, restore_best_weights=True), clr], workers =8, use_multiprocessing = True
                          )

  # Keep the per-epoch curves exactly as recorded (accuracy as a fraction).
  # `list * 100` would repeat the list 100 times instead of scaling its values.
  val_acc_per_fold.append(list(history.history["val_accuracy"]))
  val_loss_per_fold.append(history.history["val_loss"])
  acc_per_fold.append(list(history.history["accuracy"]))
  loss_per_fold.append(history.history["loss"])

  # Plot the training
  plt.figure(figsize=(20,5))
  plt.plot(history.history['loss'], label='Training', alpha=.8, color='#ff7f0e')
  plt.plot(history.history['val_loss'], label='Validation', alpha=.8, color='#4D61E2')
  plt.legend(loc='upper left')
  plt.title('Category Crossentropy')
  plt.grid(alpha=.3)

  plt.figure(figsize=(20,5))
  plt.plot(history.history['accuracy'], label='Training', alpha=.8, color='#ff7f0e')
  plt.plot(history.history['val_accuracy'], label='Validation', alpha=.8, color='#4D61E2')
  plt.legend(loc='upper left')
  plt.title('Accuracy')
  plt.grid(alpha=.3)

  plt.show()

  #prediction on test set
  predictions = svm_model.predict(test_gen)

  # NOTE(review): the [:-2] presumably matches samples test_gen does not yield — confirm.
  y = np.argmax(tfk.utils.to_categorical(list(test.label))[:-2], axis = -1)
  pred = np.argmax(predictions, axis=-1)
  target_names = ['N', 'P', 'T']
  cm = confusion_matrix(y, pred, normalize="true")

  # Compute the classification metrics
  accuracy = accuracy_score(y, pred)
  precision = precision_score(y, pred, average='macro')
  recall = recall_score(y, pred, average='macro')
  f1 = f1_score(y, pred, average='macro')
  print('Accuracy:',accuracy.round(4))
  print('Precision:',precision.round(4))
  print('Recall:',recall.round(4))
  print('F1:',f1.round(4))
  print(classification_report(y, pred, target_names=target_names, digits=4))

  # Plot the confusion matrix (transposed: x = true labels, y = predicted labels)
  plt.figure(figsize=(10,10))
  sns.heatmap(cm.T, xticklabels=[0,1,2], yticklabels=[0,1,2])
  plt.xlabel('True labels')
  plt.ylabel('Predicted labels')
  plt.show()

  # Model saving: one checkpoint per fold, numbered from 1
  model_directory = './'
  filename = 'svm_kFold_' + str(i+1)
  filename_chosen = os.path.join(model_directory, filename)
  svm_model.save(filename_chosen + '.h5', input_shape=[224, 224, 3])

In [None]:
# For every fold, pick the epoch with the best training accuracy and the epoch
# with the best validation accuracy, and record accuracy + loss at those epochs.
best_train_acc_fold = []
best_train_loss_fold = []
best_val_acc_fold = []
best_val_loss_fold = []
for fold_acc, fold_loss, fold_val_acc, fold_val_loss in zip(
    acc_per_fold, loss_per_fold, val_acc_per_fold, val_loss_per_fold
):
  # Training side: epoch with the highest training accuracy.
  best_acc_train_index = np.argmax(fold_acc)
  best_acc_train = fold_acc[best_acc_train_index]
  best_loss_train = fold_loss[best_acc_train_index]

  # Validation side: read the VALIDATION curves at the best-validation epoch
  # (the training curves were indexed here before, which reported training
  # metrics under the validation names).
  best_acc_val_index = np.argmax(fold_val_acc)
  best_acc_val = fold_val_acc[best_acc_val_index]
  best_loss_val = fold_val_loss[best_acc_val_index]

  best_train_acc_fold.append(best_acc_train)
  best_train_loss_fold.append(best_loss_train)
  best_val_acc_fold.append(best_acc_val)
  best_val_loss_fold.append(best_loss_val)

In [None]:
# Print the best-epoch scores of every fold, then mean and standard deviation
# across folds.
separator = '------------------------------------------------------------------------'
print(separator)
print('Score per fold')
fold_scores = zip(best_train_loss_fold, best_train_acc_fold,
                  best_val_loss_fold, best_val_acc_fold)
for i, (t_loss, t_acc, v_loss, v_acc) in enumerate(fold_scores, start=1):
  print(separator)
  # Accuracies are fractions (cf. the 0.99... values in the saved summary);
  # the '%' format spec scales by 100 so the printed unit is actually correct.
  print(f'> Fold {i} - TLoss: {t_loss} - TAccuracy: {t_acc:.4%}')
  print(f'> Fold {i} - VLoss: {v_loss} - VAccuracy: {v_acc:.4%}')
print(separator)
print('Average scores for all folds:')
# The averages below are left as plain fractions (no unit is printed).
print(f'> Training Accuracy: {np.mean(best_train_acc_fold)} (+- {np.std(best_train_acc_fold)})')
print(f'> Training Loss: {np.mean(best_train_loss_fold)} (+- {np.std(best_train_loss_fold)})')
print(f'> Validation Accuracy: {np.mean(best_val_acc_fold)} (+- {np.std(best_val_acc_fold)})')
print(f'> Validation Loss: {np.mean(best_val_loss_fold)} (+- {np.std(best_val_loss_fold)})')
print(separator)

## Save training statistics

In [None]:
# Fold-score summary kept as a hard-coded string literal. It is NOT rebuilt
# from the current variables, so it must be updated by hand after re-training.
# NOTE(review): looks like pasted output of an earlier run of the score-printing cell — confirm.
s = """ ------------------------------------------------------------------------
Score per fold
------------------------------------------------------------------------
> Fold 1 - TLoss: 0.06644788384437561 - TAccuracy: 0.998160183429718%
> Fold 1 - VLoss: 0.0760158970952034 - VAccuracy: 0.9977272748947144%
------------------------------------------------------------------------
> Fold 2 - TLoss: 0.04653624817728996 - TAccuracy: 0.9995659589767456%
> Fold 2 - VLoss: 0.06799011677503586 - VAccuracy: 0.9987521767616272%
------------------------------------------------------------------------
> Fold 3 - TLoss: 0.05929543823003769 - TAccuracy: 0.9968156218528748%
> Fold 3 - VLoss: 0.18274760246276855 - VAccuracy: 0.9939550757408142%
------------------------------------------------------------------------
> Fold 4 - TLoss: 0.05379674956202507 - TAccuracy: 0.9990285038948059%
> Fold 4 - VLoss: 0.1970527321100235 - VAccuracy: 0.9954123497009277%
------------------------------------------------------------------------
> Fold 5 - TLoss: 0.046857770532369614 - TAccuracy: 0.9990285038948059%
> Fold 5 - VLoss: 0.12490435689687729 - VAccuracy: 0.995088517665863%
------------------------------------------------------------------------
Average scores for all folds:
> Training Accuracy: 0.99851975440979 (+- 0.000963904054787102)
> Training Loss: 0.05458681806921959 (+- 0.007589862249069231)
> Validation Accuracy: 0.9961870789527894 (+- 0.001774306115726234)
> Validation Loss: 0.1297421410679817 (+- 0.053034932931961924)
------------------------------------------------------------------------"""

In [None]:
# Bundle the textual summary and the per-fold best-epoch numbers, then write
# them to a single JSON file for the report.
training_data = {
    'summary': s,
    'best_training_losses': best_train_loss_fold,
    'best_validation_losses': best_val_loss_fold,
    'best_training_acc': best_train_acc_fold,
    'best_validation_acc': best_val_acc_fold,
}

with open('data_for_report/kfold_eff_svm_training_stats.json', 'w') as fp:
  fp.write(json.dumps(training_data))

## Load trained models

In [None]:
from keras.models import load_model

# Reload the per-fold checkpoints (svm_kFold_1.h5 ... svm_kFold_5.h5) and
# re-compile each one with the same optimizer settings and the METRICS list.
total_model = 5
model_directory = './'
trained_models = []
for model_n in range(total_model):
  filename = f'svm_kFold_{model_n + 1}'
  filename_chosen = os.path.join(model_directory, filename)
  svm_model_loaded = load_model(f'{filename_chosen}.h5')
  svm_model_loaded.compile(
      optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
      metrics=METRICS,
  )
  trained_models.append(svm_model_loaded)

## Evaluate trained models (for storing statistics)

In [None]:
# Test-set statistics of every fold model, keyed 'fold_1', 'fold_2', ...
validation_stats = {}
plt.get_current_fig_manager().full_screen_toggle() # toggle fullscreen mode

for idx, svm_model in enumerate(trained_models):
  fold_number = idx + 1
  predictions = svm_model.predict(test_gen)

  # Ground-truth class indices (one-hot encode, drop the last two rows, argmax).
  # NOTE(review): the [:-2] presumably matches samples test_gen does not yield — confirm.
  y = np.argmax(tfk.utils.to_categorical(list(test.label))[:-2], axis=-1)
  pred = np.argmax(predictions, axis=-1)
  target_names = ['N', 'P', 'T']
  cm = confusion_matrix(y, pred, normalize="true")

  # Overall accuracy plus macro-averaged precision / recall / F1.
  accuracy = accuracy_score(y, pred)
  precision = precision_score(y, pred, average='macro')
  recall = recall_score(y, pred, average='macro')
  f1 = f1_score(y, pred, average='macro')

  validation_stats[f'fold_{fold_number}'] = dict(
      accuracy=accuracy,
      precision=precision,
      recall=recall,
      f1=f1,
      classification_report=classification_report(
          y, pred, target_names=target_names, digits=4, output_dict=True
      ),
  )

  for metric_title, metric_value in (
      ('Accuracy:', accuracy),
      ('Precision:', precision),
      ('Recall:', recall),
      ('F1:', f1),
  ):
    print(metric_title, metric_value.round(4))
  print(classification_report(y, pred, target_names=target_names, digits=4))

  # Confusion-matrix heat-map, transposed so x = true labels, y = predicted labels.
  plt.figure(figsize=(10,10))
  hm = sns.heatmap(cm.T, xticklabels=[0,1,2], yticklabels=[0,1,2])
  plt.xlabel('True labels')
  plt.ylabel('Predicted labels')
  plt.show()

  # One PDF per fold for the report.
  hm.get_figure().savefig(f"data_for_report/heatmap_eff_svm_fold_{fold_number}.pdf")

# Persist the statistics of all folds in a single JSON file.
with open('data_for_report/test_stats_eff_svm.json' , 'w') as fp:
  fp.write(json.dumps(validation_stats))



## MAJORITY VOTING

In [None]:
# Collect the predicted class indices of every fold model on the test
# generator; preds ends up holding one prediction vector per model.
preds = []

for svm_model in trained_models:
  predictions = svm_model.predict(test_gen)
  # Ground-truth class indices, recomputed per model as before.
  one_hot_truth = tfk.utils.to_categorical(list(test.label))
  y = np.argmax(one_hot_truth[:-2], axis=-1)
  pred = np.argmax(predictions, axis=-1)
  preds += [pred]

In [None]:
# Majority vote across models: most frequent predicted class per test sample
# (axis 0 runs over the models). Depending on the SciPy version, mode() keeps
# the reduced axis (shape (1, n_samples)) or drops it (shape (n_samples,));
# ravel() yields the flat per-sample vector in both cases, whereas the former
# `[0][0]` indexing collapses to a single scalar when the axis is dropped.
preds_mv = np.ravel(scipy.stats.mode(np.asarray(preds), axis=0)[0])

In [None]:
# Show the majority-vote predictions for a quick visual check.
print(preds_mv)

In [None]:
# Evaluate the majority-vote ensemble against the test labels.
# NOTE(review): the [:-2] presumably matches samples test_gen does not yield — confirm.
one_hot_truth = tfk.utils.to_categorical(list(test.label))
y = np.argmax(one_hot_truth[:-2], axis=-1)
pred = preds_mv
target_names = ['N', 'P', 'T']
cm = confusion_matrix(y, pred, normalize="true")

# Overall accuracy plus macro-averaged precision / recall / F1.
accuracy = accuracy_score(y, pred)
precision = precision_score(y, pred, average='macro')
recall = recall_score(y, pred, average='macro')
f1 = f1_score(y, pred, average='macro')

test_stats = dict(
    accuracy=accuracy,
    precision=precision,
    recall=recall,
    f1=f1,
    classification_report=classification_report(
        y, pred, target_names=target_names, digits=4, output_dict=True
    ),
)

for metric_title, metric_value in (
    ('Accuracy:', accuracy),
    ('Precision:', precision),
    ('Recall:', recall),
    ('F1:', f1),
):
  print(metric_title, metric_value.round(4))
print(classification_report(y, pred, target_names=target_names, digits=4))

# Confusion-matrix heat-map, transposed so x = true labels, y = predicted labels.
plt.figure(figsize=(10,10))
hm = sns.heatmap(cm.T, xticklabels=[0,1,2], yticklabels=[0,1,2])
plt.xlabel('True labels')
plt.ylabel('Predicted labels')
plt.show()

# Persist the figure and the ensemble statistics for the report.
hm.get_figure().savefig("data_for_report/eff_svm_ensemble_heatmap.pdf")

with open('data_for_report/test_stats_ensemble_eff_svm.json' , 'w') as fp:
  fp.write(json.dumps(test_stats))