<a href="https://colab.research.google.com/github/JamesBolt22/Supervised_Contrastive_learning_for_onset_detection/blob/main/Contrastive_model_and_encodings.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Import Libraries
%matplotlib inline
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
import numpy as np
import math
import h5py
import keras.backend as K
import glob

In [None]:
#@title Plot Spec
def plot_spec(spectrogram):
    
    fig, ax = plt.subplots(nrows=1, sharex=True, figsize=(10,10))
    cax = ax.matshow(spectrogram, aspect='auto', origin='lower')

In [None]:
#@title Load fold data

with h5py.File('/content/drive/MyDrive/data_fold_0/train_data.h5', 'r') as hf:
    train_data = hf["input_data"][:]
with h5py.File('/content/drive/MyDrive/data_fold_0/train_labels.h5', 'r') as hf:
    train_label = hf["input_data"][:]

with h5py.File('/content/drive/MyDrive/data_fold_0/val_data.h5', 'r') as hf:
    val_data = hf["input_data"][:]
with h5py.File('/content/drive/MyDrive/data_fold_0/val_label.h5', 'r') as hf:
    val_label = hf["input_data"][:]

with h5py.File('/content/drive/MyDrive/data_fold_0/weights_data.h5', 'r') as hf:
    train_weights = hf["input_data"][:]    

In [None]:
#@title Combine val data
val_full = (val_data, val_label)

In [None]:
#@title Encoder

#Creates the encoder network
def new_encoder(input_shape, projection_size):  

  inputs = keras.Input(shape=input_shape)

  x = tf.keras.layers.Conv2D(filters=10, kernel_size=(3,7), strides=1, activation="relu")(inputs)

  x = tf.keras.layers.MaxPooling2D(pool_size=(3,1))(x)

  x = tf.keras.layers.Conv2D(filters=20, kernel_size=(3,3), strides=1, activation="relu")(x)

  x = tf.keras.layers.MaxPooling2D(pool_size=(3,1))(x)

  x = tf.keras.layers.Flatten()(x)

  x = tf.keras.layers.Dense(2048, activation = "relu")(x)

  x = tf.keras.layers.Lambda(lambda x: K.l2_normalize(x,axis=1))(x)
  
  x = tf.keras.layers.Dense(projection_size, activation = "relu")(x)

  outputs = tf.keras.layers.Lambda(lambda x: K.l2_normalize(x,axis=1))(x)

  model = tf.keras.Model(inputs=inputs, outputs=outputs)

  return model

In [None]:
#@title Classifier

#Creates the classifier network
def create_classifier(encoder, trainable=True):

    #determines whether encoder weights should be frozen
    for layer in encoder.layers:
        layer.trainable = trainable

    inputs = tf.keras.Input(shape=input_shape)
    features = encoder(inputs)
    features  = tf.keras.layers.Dropout(0.5)(features)
    features = tf.keras.layers.Dense(128, activation="relu")(features)
    outputs = tf.keras.layers.Dense(1, activation="sigmoid")(features)
    class_model = tf.keras.Model(inputs=inputs, outputs=outputs, name="classifier")
    class_model.compile(optimizer = tf.keras.optimizers.Adam() ,loss = tf.keras.losses.BinaryCrossentropy(),metrics=tf.keras.metrics.BinaryAccuracy())
    return class_model

In [None]:
#@title Supervised Contrastive Loss
class SupervisedContrastiveLoss(tf.keras.losses.Loss):
    
    def __init__(self, temperature=1, name=None):
        super(SupervisedContrastiveLoss, self).__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):

        #feature_vectors = tf.math.l2_normalize(feature_vectors, axis = 1)
        batch_size = tf.shape(labels)[0]
        #creates positive anchor mask
        mask = tf.cast(tf.equal(labels, tf.transpose(labels)), tf.float32)

        #creates diagonal mask
        diagonal_mask = tf.ones_like(mask) - tf.eye(batch_size)
        mask = mask * diagonal_mask
    

        #creates dot product between the feature vectors
        dot_tensor = tf.math.divide(tf.linalg.matmul(feature_vectors,tf.transpose(feature_vectors)),self.temperature)
        max_dot_tensor = tf.reduce_max(dot_tensor, axis=1, keepdims=True)
        dot_tensor = dot_tensor - max_dot_tensor
        
        #take exponential of all values in tensor
        exp_tensor = tf.math.exp(dot_tensor) 
        
        #removes the diagonals
        exp_tensor_no_diagonal = exp_tensor * diagonal_mask
        
        
        #take summation of rows of tensor
        denominator_summations = tf.reduce_sum(exp_tensor_no_diagonal, 1, keepdims = True)

        #Creates the final fraction
        log_prob = dot_tensor - tf.math.log(denominator_summations) * diagonal_mask
        
        #summates across all positives
        mean_log_prob = tf.divide(tf.reduce_sum(log_prob * mask, 1, keepdims = True), tf.reduce_sum(mask,1, keepdims = True))
        total_loss = - mean_log_prob * (self.temperature/0.07)

        return total_loss

In [None]:
#@title Create encoder

#Creates and compiles the encoded with projection layers
def contrastive_encoder(input_shape, projection_size): 

    encoder = new_encoder(input_shape, projection_size)
    optimizer = tf.keras.optimizers.Adam(0.005) 
    encoder.compile(optimizer=optimizer,loss=SupervisedContrastiveLoss(temperature),metrics=tf.keras.metrics.CosineSimilarity())
    
    return encoder

In [None]:
#@title Set Params
hop_len = 441
nfft = 256
num_of_bins = 80
size_of_slice = 14
input_shape = (num_of_bins,size_of_slice+1,3)
projection_size = 128
temperature = 0.5

#calculates 1% of total training data for batch size
total_data = train_labels.shape[0]
batch_size = math.ceil(train_labels.shape[0]/100)
num_of_epochs = 300

In [None]:
#@title create model

#creates the final model for supervised contrastive training
model = contrastive_encoder(input_shape, projection_size)
model.summary()

In [None]:
#@title Adds callback
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath="/content/drive/MyDrive/Checkpoints",save_weights_only=True,monitor='val_loss',mode='min',save_best_only=True)

In [None]:
#@title Fit model
history = model.fit(x = train_data, y = train_label, batch_size = batch_size, epochs=num_of_epochs, validation_data = val_full, shuffle=True, callbacks=[cp_callback],sample_weight=train_weights)

In [None]:
#@title Load previous weights
model.load_weights("/content/drive/MyDrive/Checkpoints")

In [None]:
#@title Import More libraries for producing grouped vectors
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.manifold import MDS
%matplotlib inline

In [None]:
#@title Plot embeddings
def plot_embeddings(emb,labels):
    tl=TSNE()
    embedding=tl.fit_transform(emb)
    return embedding

In [None]:
#@title create encoded vectors
encoded_vector=model.predict(val_data)
print(encoded_vector.shape)

In [None]:
#@title Calculate Tsne
embedding = plot_embeddings(encoded_vector,val_label)

In [None]:
#@title Plot Tsne
fig = plt.figure(figsize = (10, 10))
labels_name = "/content/drive/MyDrive/Processed_data/labels_1"
sns.scatterplot(x = embedding[:,0], y = embedding[:,1], hue= np.loadtxt(labels_name))
plt.show()

In [None]:
#@title Create classifier

#If training cross-entropy model set trainable to true
new_model = tf.keras.models.Sequential(model.layers[1:-2])
final_model = create_classifier(new_model, trainable=False)
final_model.summary()

In [None]:
#@title Set callback
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath="/content/drive/MyDrive/Checkpoints3",save_weights_only=True,monitor='val_loss',mode='min',save_best_only=True)

In [None]:
#@title Fit Classifier
history = final_model.fit(x = train_data,  y = train_label, batch_size = 256, epochs=25, validation_data = val_full, shuffle=True,callbacks=[cp_callback],sample_weight=train_weights)

In [None]:
#@title Load weights
final_model.load_weights("/content/drive/MyDrive/Checkpoints3")

In [None]:
#@title Save Model
final_model.save("/content/drive/MyDrive/Masters/Models/Hyperparam/temp/temp_0.6")