<a href="https://colab.research.google.com/github/alebjanes/fire-susceptibility-mapping/blob/main/SiameseNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from numpy import load
from sklearn.model_selection import train_test_split
from keras.models import Sequential, Model
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import BatchNormalization
from keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout, Activation, Input
from tensorflow.keras import regularizers
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.layers.merge import concatenate
import random
from sklearn.model_selection import KFold

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
Dataset = np.load('/content/drive/My Drive/MT/Samples/samples5x5_v3.npy')

X = Dataset[:,:,:,1:21]
target = Dataset[:,2,2,0]
y = np.expand_dims(target, axis=1)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
X=X_train
y=y_train

del y_test
del X_test
del target


In [None]:
#CV
num_folds = 5

# Define the K-fold Cross Validator
kfold = KFold(n_splits=num_folds, shuffle=False)

# Define per-fold score containers <-- these are new
acc_per_fold = []
loss_per_fold = []

acc_per_fold_train = []
loss_per_fold_train = []

In [None]:
#Create batch with (batch_size) number of pairs
def create_batch(batch_size, x, target):
    Left_inputs = np.zeros((batch_size, 5, 5, 20))
    Right_inputs = np.zeros((batch_size, 5, 5, 20))
    Label = np.zeros((batch_size,), dtype = np.int)
    
    for i in range(batch_size):

      random_index1 = random.randint(0, x.shape[0] - 1)
      random_index2 = random.randint(0, x.shape[0] - 1)
        
      while random_index1 == random_index2:
        random_index1 = random.randint(0, x.shape[0]-1)
        random_index2 = random.randint(0, x.shape[0]-1)

      left = x[random_index1]
      right = x[random_index2]

      #Label
      if target[random_index1] == target[random_index2]:
        Label[i] = int(1)
      else: Label[i] = int(0)

      Left_inputs[i] = left
      Right_inputs[i] = right

    return [Left_inputs, Right_inputs], Label

In [None]:
def data_generator(batch_size, Xtrain, ytrain):
    while True:
        x, y = create_batch(batch_size, Xtrain, ytrain)
        yield x, y


def testdata_generator(batch_size, X_test, y_test):
    while True:
        x, y = create_batch(batch_size, X_test, y_test)
        yield x, y

In [None]:
from keras.layers import Dense, Conv2D, MaxPooling2D, Dropout
from keras.models import Sequential
#Embedding model

#encoding_size = 32
def create_Net():
  Net = Sequential([
    BatchNormalization(momentum=0.99, input_shape = (5, 5, 20)),
    Conv2D(64, 3, activation = 'relu', padding = 'same'),
    MaxPooling2D(2),
    Conv2D(128, 3, activation = 'relu', padding = 'same'),
    BatchNormalization(momentum = 0.99),
    MaxPooling2D(2),
    Conv2D(128, 3, activation = 'relu', padding = 'same'),
    Flatten(),
    Dense(2048, activation='sigmoid', kernel_regularizer = tf.keras.regularizers.L1L2(l1=0.001, l2=0.01)),

  ], name = 'Net')
  return Net
#Net.summary()

In [None]:
from keras.layers import Input, Lambda, subtract, MaxPooling2D, concatenate, Activation, Dense, Conv2D, Dropout, Flatten
from keras.preprocessing import image
from keras.applications.densenet import preprocess_input
import tensorflow as tf
from keras.models import Model
from keras import backend as K

# create model
def create_model(input_shape):
  left_input = Input(input_shape, name="left_input")
  right_input = Input(input_shape, name="right_input") 

  # create the inputs
  enc_left = Net(left_input)
  enc_right = Net(right_input)

  Euc_layer = Lambda(lambda tensor : K.abs(tensor[0] - tensor[1]), name = 'Distance')

  # use and add the distance function
  Euc_distance = Euc_layer([enc_left, enc_right])

  #identify the prediction
  prediction = Dense(1, activation='sigmoid')(Euc_distance)
  siamese_net = Model(inputs = [left_input, right_input], outputs = [prediction], name = 'SiameseNet')
  
  #MODO1
  #output = concatenate([enc_left, enc_right], axis=1)
  #siamese_net = Model(inputs=[left_input, right_input], outputs=output)
  #siamese_net.summary()

  siamese_net.compile(loss='binary_crossentropy', optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001), metrics=['accuracy'])
        
  return siamese_net

In [None]:
#Callbacks

early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', verbose=1, patience=100, min_delta=0.01, restore_best_weights=True)
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.9, patience=25, min_lr=0.0001, verbose=1)
steps = 1000

fold_no = 1
for train, test in kfold.split(X, y):
  
 # Define the model architecture
  Net = create_Net()
  model = create_model((5,5,20))

  # Generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  # Fit data to model
  history = model.fit(data_generator(16, X[train], y[train]), 
                      validation_data = testdata_generator(16, X[test], y[test]), 
                      validation_steps = 500, 
                      epochs = 200, 
                      steps_per_epoch = steps, 
                      callbacks = [early_stop], 
                      verbose = 0)
  
  Net.save('/content/drive/My Drive/MT/CV_models/SN5'+ str(fold_no))

  del model
  del Net
  K.clear_session()
  

  # Generate generalization metrics on VALIDATION
  #scores = SN.evaluate(X[test], y[test], verbose=0)
  #print(f'TEST: Score for fold {fold_no}: {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')
  #acc_per_fold.append(scores[1] * 100)
  #loss_per_fold.append(scores[0])

  # Generate generalization metrics on TRAINING
  #scores_train = SN.evaluate(X[train], y[train], verbose=0)
  #print(f'TRAIN: Score for fold {fold_no}: {model.metrics_names[0]} of {scores_train[0]}; {model.metrics_names[1]} of {scores_train[1]*100}%')
  #acc_per_fold_train.append(scores_train[1] * 100)
  #loss_per_fold_train.append(scores_train[0])

  # Increase fold number
  fold_no = fold_no + 1



In [None]:
def build_model(lr, fold_no):
  reconstructed_model = keras.models.load_model('/content/drive/My Drive/MT/CV_models/SN5'+ str(fold_no))
  reconstructed_model.trainable = False
  model2 = Sequential()
  model2.add(reconstructed_model),
  model2.add(Dense(1, activation = 'sigmoid'))
  
  model2.compile(loss=tf.keras.losses.binary_crossentropy, optimizer=tf.keras.optimizers.Adam(learning_rate=lr, beta_1=0.9, beta_2=0.9999, epsilon=1e-07), metrics=['accuracy'])

  return model2

In [None]:
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', verbose=1, patience=100, min_delta=0.01, restore_best_weights=True)
import keras

fold_no = 1
for train, test in kfold.split(X, y):

  # Define the model architecture
  model = build_model(0.0001, fold_no)

  # Generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  # Fit data to model
  history = model.fit(X[train], y[train],
              batch_size=32,
              epochs=200,
              callbacks=[early_stop],
              validation_data = (X[test], y[test]), verbose=1)


  # Generate generalization metrics on VALIDATION
  scores = model.evaluate(X[test], y[test], verbose=0)
  print(f'Score for fold {fold_no}: {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')
  acc_per_fold.append(scores[1] * 100)
  loss_per_fold.append(scores[0])

  # Generate generalization metrics on TRAINING
  scores_train = model.evaluate(X[train], y[train], verbose=0)
  print(f'Score for fold {fold_no}: {model.metrics_names[0]} of {scores_train[0]}; {model.metrics_names[1]} of {scores_train[1]*100}%')
  acc_per_fold_train.append(scores_train[1] * 100)
  loss_per_fold_train.append(scores_train[0])

  del model
  K.clear_session()
  # Increase fold number
  fold_no = fold_no + 1


  # == Provide average scores ==
print('------------------------------------------------------------------------')
print('Score per fold on validation set')
for i in range(0, len(acc_per_fold)):
  print('------------------------------------------------------------------------')
  print(f'> Fold {i+1} - Loss: {loss_per_fold[i]} - Accuracy: {acc_per_fold[i]}%')
print('------------------------------------------------------------------------')
print('Average scores for all folds:')
print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
print(f'> Loss: {np.mean(loss_per_fold)}')
print('------------------------------------------------------------------------')


print('------------------------------------------------------------------------')
print('Score per fold on training set')
for i in range(0, len(acc_per_fold_train)):
  print('------------------------------------------------------------------------')
  print(f'> Fold {i+1} - Loss: {loss_per_fold_train[i]} - Accuracy: {acc_per_fold_train[i]}%')
print('------------------------------------------------------------------------')
print('Average scores for all folds:')
print(f'> Accuracy: {np.mean(acc_per_fold_train)} (+- {np.std(acc_per_fold_train)})')
print(f'> Loss: {np.mean(loss_per_fold_train)}')
print('------------------------------------------------------------------------')