# Import Data

In [None]:
import pickle as pkl

# Load the pickled train / test / validation splits.
# NOTE(review): pickle.load can execute arbitrary code -- only open files
# that come from a trusted source.
def _load_split(path):
    """Unpickle and return the object stored in the file at `path`."""
    with open(path, 'rb') as handle:
        return pkl.load(handle)


X_train_single = _load_split('X_train_single')
y_train_single = _load_split('y_train_single')
X_test_single = _load_split('X_test_single')
y_test_single = _load_split('y_test_single')
X_val_single = _load_split('X_val_single')
y_val_single = _load_split('y_val_single')

# Pair Creation

In [None]:
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from keras.models import Model
from keras.layers import Input, Conv1D, MaxPooling1D, Dropout, GlobalAveragePooling1D, Dense, Lambda, Bidirectional, LSTM, TimeDistributed, Flatten, BatchNormalization
import keras.backend as K
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.metrics import classification_report

In [None]:
# Beat pairs creation with label
def create_pairs(beat, labels):
    """Build similar / dissimilar beat pairs for siamese training.

    For every beat two pairs are emitted, in this order:
      1. the beat with a randomly drawn beat of the same class (pair label 1);
      2. the beat with a randomly drawn beat of another class (pair label 0).
    The same-class partner is drawn from all members of the class, so a beat
    may be paired with itself.

    Args:
        beat: integer-indexable collection of beats, aligned with `labels`.
        labels: 1-D array-like of class labels, one per beat. Label values do
            not need to be the contiguous integers 0..k-1.

    Returns:
        Tuple `(base, compared, pair_labels)` of numpy arrays holding
        `2 * len(beat)` entries each; every `pair_labels` entry is `[1]` or
        `[0]`.

    Raises:
        ValueError: if beats are supplied but fewer than two distinct classes
            exist, so no dissimilar partner can be drawn.
    """
    labels = np.asarray(labels)
    classes = np.unique(labels)

    if len(beat) > 0 and len(classes) < 2:
        raise ValueError(
            "create_pairs needs at least two distinct classes to build "
            "dissimilar pairs"
        )

    # Candidate partner indices per class, keyed by the actual label value and
    # computed once up front instead of once per beat.
    same_class_idx = {cls: np.where(labels == cls)[0] for cls in classes}
    other_class_idx = {cls: np.where(labels != cls)[0] for cls in classes}

    base_beats = []
    compared_beats = []
    pair_labels = []

    for ind in range(len(beat)):
        anchor = beat[ind]
        cls = labels[ind]

        # Similar pair: partner drawn from the anchor's own class.
        positive = beat[np.random.choice(same_class_idx[cls])]
        base_beats.append(anchor)
        compared_beats.append(positive)
        pair_labels.append([1])

        # Dissimilar pair: partner drawn from every other class.
        negative = beat[np.random.choice(other_class_idx[cls])]
        base_beats.append(anchor)
        compared_beats.append(negative)
        pair_labels.append([0])

    return np.array(base_beats), np.array(compared_beats), np.array(pair_labels)

In [None]:
# Initialize LabelEncoder
label_encoder = LabelEncoder()
# Learn the string -> integer mapping from the training labels only.
y_train_encoded = label_encoder.fit_transform(y_train_single)
# Reuse the fitted mapping for the other splits. Re-fitting per split would
# renumber the classes whenever a split does not contain exactly the same set
# of labels as the training data. `transform` raises ValueError on a label
# that was not seen during fitting.
y_val_encoded = label_encoder.transform(y_val_single)
y_test_encoded = label_encoder.transform(y_test_single)

In [None]:
# training and validation pairs creation
# create_pairs draws its partners with np.random, so seed the global generator
# first to make the sampled pairs repeatable across kernel restarts.
PAIR_SEED = 42
np.random.seed(PAIR_SEED)
training_pairs_base, training_pairs_comp, training_labels = create_pairs(X_train_single, y_train_encoded)
val_pairs_base, val_pairs_comp, val_labels = create_pairs(X_val_single, y_val_encoded)

In [None]:
# Define Loss function
def contrastiveLoss(y, y_preds, margin=1):
    """Contrastive loss over predicted pair distances.

    `y` is cast to the dtype of `y_preds`. Entries with y == 1 contribute the
    squared predicted distance; entries with y == 0 contribute the squared
    hinge `max(margin - y_preds, 0) ** 2`. The mean over all entries is
    returned.
    """
    targets = tf.cast(y, y_preds.dtype)
    similar_term = targets * K.square(y_preds)
    hinge = K.maximum(margin - y_preds, 0)
    dissimilar_term = (1 - targets) * K.square(hinge)
    return K.mean(similar_term + dissimilar_term)

In [None]:
# Define distance metric
def euclidean_distance(vecs):
    """Euclidean distance between two batches of embeddings.

    `vecs` unpacks into two tensors. Their squared differences are summed over
    axis 1 (the axis is kept), and the sum is clamped to at least
    `K.epsilon()` so the square root never receives zero.
    """
    left, right = vecs
    squared_diff = K.square(left - right)
    sum_squared = K.sum(squared_diff, axis=1, keepdims=True)
    clamped = K.maximum(sum_squared, K.epsilon())
    return K.sqrt(clamped)

# Convolutional Model

In [None]:
# Definition of siamese model based on Convolutional layers
def siamese_model(input_shape, embeddingDim=48):
    """Build the convolutional encoder shared by both siamese branches.

    Two Conv1D -> MaxPooling1D -> Dropout(0.4) stages (128 then 256 filters,
    kernel size 2, "same" padding, ReLU) are followed by global average
    pooling and a linear Dense layer with `embeddingDim` units.

    Note: a later cell of this notebook defines a BiLSTM encoder under the
    same function name; whichever definition cell ran last is the one in
    effect.
    """
    inputs = Input(input_shape)

    features = inputs
    for n_filters in (128, 256):
        features = Conv1D(n_filters, 2, padding="same", activation="relu")(features)
        features = MaxPooling1D(pool_size=2)(features)
        features = Dropout(0.4)(features)

    pooled = GlobalAveragePooling1D()(features)
    embedding = Dense(embeddingDim)(pooled)
    return Model(inputs, embedding)

In [None]:
# Hyper-parameters used for model construction and training
beat_shape, batch_size, epochs = (100, 1), 1024, 50

# One input per branch of the siamese pair
beatA, beatB = (Input(shape=beat_shape) for _ in range(2))

# A single encoder instance is applied to both inputs, so the branches share weights
model_build = siamese_model(beat_shape)
beatA_embedding, beatB_embedding = (
    model_build(branch_input) for branch_input in (beatA, beatB)
)

# The model output is the distance between the two embeddings.
# Note: `model` is rebound by the BiLSTM build cell further down, so only the
# most recently executed build cell's model gets compiled and trained.
distance_layer = Lambda(euclidean_distance)
distance = distance_layer([beatA_embedding, beatB_embedding])
model = Model(inputs=[beatA, beatB], outputs=distance)
model.summary()

# BiLSTM Model

In [None]:
# Definition of siamese model based on BiLSTM layer
def siamese_model(input_shape, embeddingDim=48):
    """Build the BiLSTM encoder shared by both siamese branches.

    A bidirectional LSTM (64 units per direction, full sequence returned) is
    followed by Dropout(0.4), a Flatten layer and a linear Dense layer with
    `embeddingDim` units.

    Note: this reuses the name of the convolutional encoder defined earlier
    in the notebook, so running this cell replaces that definition.
    """
    inputs = Input(input_shape)

    recurrent = Bidirectional(LSTM(64, return_sequences=True))
    sequence_features = recurrent(inputs)
    regularised = Dropout(0.4)(sequence_features)
    flattened = Flatten()(regularised)

    embedding = Dense(embeddingDim)(flattened)
    return Model(inputs, embedding)

In [None]:
# Build the model
# Hyper-parameters used for model construction and training
beat_shape, batch_size, epochs = (100, 1), 1024, 50

# One input per branch of the siamese pair
beatA, beatB = (Input(shape=beat_shape) for _ in range(2))

# A single encoder instance is applied to both inputs, so the branches share weights
model_build = siamese_model(beat_shape)
beatA_embedding, beatB_embedding = (
    model_build(branch_input) for branch_input in (beatA, beatB)
)

# The model output is the distance between the two embeddings
distance_layer = Lambda(euclidean_distance)
distance = distance_layer([beatA_embedding, beatB_embedding])
model = Model(inputs=[beatA, beatB], outputs=distance)
model.summary()

In [None]:
# Compile the siamese model: Adam optimizer, contrastive loss on the predicted distance
model.compile(optimizer="adam", loss=contrastiveLoss)

In [None]:
# Append a trailing axis to every pair array so it matches the model input format
x_train_expand, x_train_c_expand, x_val_expand, x_val_c_expand = (
    np.expand_dims(pair_array, axis=-1)
    for pair_array in (
        training_pairs_base,
        training_pairs_comp,
        val_pairs_base,
        val_pairs_comp,
    )
)

In [None]:
# Early-stopping callback driven by the validation loss.
# NOTE(review): the build cell sets epochs = 50; with patience = 30 and
# start_from_epoch = 10 there is little room for this callback to fire --
# confirm these values are intended.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=30,
    start_from_epoch=10,
)

In [None]:
# Train the model on the (base, compared) pairs and validate on the validation pairs
train_inputs = [x_train_expand, x_train_c_expand]
val_inputs = [x_val_expand, x_val_c_expand]

history = model.fit(
    x=train_inputs,
    y=training_labels,
    validation_data=(val_inputs, val_labels),
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[early_stop],
)

# Model Assessment

Define a template beat for each class, then classify the test set by comparing every test beat against these templates, which keeps prediction as fast as possible.

In [None]:
# Split the training beats by encoded class.
# NOTE(review): assumes encoded labels 0 / 1 / 2 correspond to N / S / V --
# confirm against label_encoder.classes_.
label_0_indices, label_1_indices, label_2_indices = (
    np.where(y_train_encoded == class_id)[0] for class_id in (0, 1, 2)
)

X_train_N, X_train_S, X_train_V = (
    X_train_single[class_indices]
    for class_indices in (label_0_indices, label_1_indices, label_2_indices)
)

# Number of training beats per class
tuple(len(class_beats) for class_beats in (X_train_N, X_train_S, X_train_V))

In [None]:
# Append a trailing axis to each per-class array of beats
X_train_N_expanded, X_train_S_expanded, X_train_V_expanded = (
    np.expand_dims(class_beats, axis=-1)
    for class_beats in (X_train_N, X_train_S, X_train_V)
)
# Check the shape of the expanded data
X_train_N_expanded.shape

In [None]:
# Convert each per-class array into a nested Python list of beats
X_train_N_expanded_list, X_train_S_expanded_list, X_train_V_expanded_list = (
    class_beats.tolist()
    for class_beats in (X_train_N_expanded, X_train_S_expanded, X_train_V_expanded)
)
# Check the length of the list
len(X_train_N_expanded_list)

In [None]:
def generate_template(beats_list, method="median"):
    """Collapse a collection of equally shaped beats into one template beat.

    Args:
        beats_list: sequence of beats (arrays or nested lists) that all share
            the same shape.
        method: element-wise statistic taken across the beats; "median"
            (default, the original behaviour) or "mean".

    Returns:
        numpy array with the shape of a single beat.

    Raises:
        ValueError: if `beats_list` is empty or `method` is not supported.
    """
    reducers = {"median": np.median, "mean": np.mean}
    if method not in reducers:
        raise ValueError(
            f"Unsupported method {method!r}; expected one of {sorted(reducers)}"
        )
    if len(beats_list) == 0:
        raise ValueError("generate_template needs at least one beat")

    # Stack the beats along a new leading axis, then reduce across that axis
    stacked_beats = np.stack(beats_list, axis=0)
    return reducers[method](stacked_beats, axis=0)

In [None]:
# Build one template beat per class from that class's training beats
template_N, template_S, template_V = (
    generate_template(class_beats)
    for class_beats in (
        X_train_N_expanded_list,
        X_train_S_expanded_list,
        X_train_V_expanded_list,
    )
)

In [None]:
# Visualize the templates on one labelled figure so the curves can be told apart
fig, ax = plt.subplots()
for class_name, template in (("N", template_N), ("S", template_S), ("V", template_V)):
    ax.plot(template, label=class_name)
ax.set_title("Class templates")
ax.set_xlabel("Sample index")
ax.set_ylabel("Template value")
ax.legend(title="Class")
plt.show()

In [None]:
# Repeat each template once per test beat so templates and test beats line up
ref_len = len(X_test_single)
tile_reps = (ref_len, 1, 1)
template_N_copied, template_S_copied, template_V_copied = (
    np.tile(class_template, tile_reps)
    for class_template in (template_N, template_S, template_V)
)

template_N_copied.shape

In [None]:
# Make predictions
# The training pair arrays and the templates all receive a trailing axis via
# np.expand_dims(..., axis=-1). Give the test beats the same axis explicitly
# when they have one axis fewer than the tiled templates, instead of leaving
# the rank mismatch for the framework to reconcile.
# NOTE(review): assumes X_test_single is stored like X_train_single -- confirm.
if np.ndim(X_test_single) == np.ndim(template_N_copied) - 1:
    X_test_expanded = np.expand_dims(X_test_single, axis=-1)
else:
    X_test_expanded = X_test_single

prediction_N = model.predict([template_N_copied, X_test_expanded])
prediction_S = model.predict([template_S_copied, X_test_expanded])
prediction_V = model.predict([template_V_copied, X_test_expanded])

In [None]:
# Put the per-template distances side by side: one column per class template
distance_columns = (prediction_N, prediction_S, prediction_V)
Prediction = np.concatenate(distance_columns, axis=1)
# Check the shape of the prediction
Prediction.shape

In [None]:
# The predicted class is the column (template) with the smallest distance
y_pred = Prediction.argmin(axis=1)
# Check the shape of the prediction
y_pred.shape

In [None]:
# Merge class 2 into class 1 in both predictions and ground truth for binary evaluation
y_binary, y_test_binary = (
    np.where(class_ids == 2, 1, class_ids)
    for class_ids in (y_pred, y_test_encoded)
)

In [None]:
# Row-normalised confusion matrix of the binary predictions
confusion_mtx = confusion_matrix(y_test_binary, y_binary, normalize='true')

# Draw the matrix as an annotated heatmap on an explicit axes object
fig, ax = plt.subplots(figsize=(8, 5))
sns.heatmap(confusion_mtx, annot=True, fmt='.2f', ax=ax)
ax.set_ylabel('True Label')
ax.set_xlabel('Predicted Label')
ax.set_title('Confusion Matrix')
plt.show()

# Per-class precision / recall / F1 summary
print(classification_report(y_test_binary, y_binary))

In [None]:
# Destination for the trained model.
# NOTE(review): absolute path, presumably a Google Drive mount in Colab --
# confirm it exists in the environment where this notebook runs.
save_path = '/content/drive/MyDrive/AI_project'

# Persist the trained siamese model
model.save(filepath=save_path)