In [None]:
# Capture the output lines of the `nvidia-smi` shell command (IPython `!` syntax).
gpu_info = !nvidia-smi
# Collapse the captured lines into one newline-separated string.
gpu_info = '\n'.join(gpu_info)
# The output is treated as "no GPU" when it contains the word 'failed';
# otherwise the full nvidia-smi report is printed.
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
file_path = '/content/drive/My Drive/ProyectoTesis/AllFeatureData.pkl'


# Modelo

In [None]:
import pandas as pd
import tensorflow as tf
import numpy as np

In [None]:
allData = pd.read_pickle(file_path)

In [None]:
allData.head()

In [None]:
import matplotlib.pyplot as plt

In [None]:
labels = ['Spoof', 'Bonafide']
labels2 = ['Antes de Limpieza', 'Despues de limpieza']
data1 = [589212, 22617]
data2 = allData['Label'].value_counts()

# Both series are drawn at the same x positions, the second on top of the first.
# NOTE(review): value_counts() is ordered by frequency, not by `labels`;
# presumably the most frequent class is still 'Spoof' — confirm.
fig, ax = plt.subplots()
ax.bar(labels, data1)
ax.bar(labels, data2)
ax.legend(['Antes de limpieza', 'Despues de limpieza'])

plt.show()

In [None]:
# Every column except the last one is used as a feature
# (assumes the label column is last — TODO confirm).
features = allData.columns[:-1]
# Convert all feature cells in one pass instead of building a Series per row
# with iterrows(); the nesting (row -> feature -> cell contents) is unchanged.
X = np.array(allData[features].to_numpy().tolist())

In [None]:
X

In [None]:
y = allData['Label'].values

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler



In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)


In [None]:
def scale_sequences(X, scalers, fit=True):
    """Scale each slice ``X[:, :, i]`` with ``scalers[i]`` and return a new array.

    Args:
        X: 3-D array; one scaler is applied per index of the last axis.
        scalers: Sequence of scaler objects exposing ``fit_transform`` and
            ``transform``; ``scalers[i]`` handles ``X[:, :, i]``.
        fit: When True (default, the original behaviour) every scaler is
            re-fitted on the slice it scales. Pass False to apply scalers that
            were already fitted (e.g. on the training split) without touching
            their statistics.

    Returns:
        A scaled copy of ``X``; the input array is left unmodified.
    """
    X = np.asarray(X)
    # Work on a floating-point copy: writing into the caller's array made the
    # cell non-idempotent and truncated the result for integer inputs.
    out_dtype = X.dtype if np.issubdtype(X.dtype, np.floating) else np.float64
    scaled = np.array(X, dtype=out_dtype)
    for i in range(X.shape[2]):
        plane = X[:, :, i]
        if fit:
            scaled[:, :, i] = scalers[i].fit_transform(plane)
        else:
            scaled[:, :, i] = scalers[i].transform(plane)
    return scaled

In [None]:
def scale_sequences2(X, a=None):
  """Standardise ``X`` with a single freshly fitted ``StandardScaler``.

  ``X`` is flattened to 2-D keeping its last axis as the column axis, a new
  scaler is fitted on that view, and the result is reshaped back.

  Args:
    X: Array to scale.
    a: Accepted but never read by this function.

  Returns:
    The scaled data with the same shape as ``X``.
  """
  original_shape = X.shape
  flattened = X.reshape(-1, original_shape[-1])
  return StandardScaler().fit_transform(flattened).reshape(original_shape)

In [None]:
# One scaler per index along the last axis of the training array.
scalers = [StandardScaler() for _ in range(X_train.shape[2])]
# scale_sequences fits every scaler on the slice it is given: training data only.
X_train_scaled = scale_sequences(X_train, scalers)
# Apply the train-fitted scalers to the test split with transform() so that
# test statistics never leak into the scaling (and the scalers are not re-fitted).
X_test_scaled = np.stack(
    [scalers[i].transform(X_test[:, :, i]) for i in range(X_test.shape[2])],
    axis=2,
)


X_train_scaled.shape, X_test_scaled.shape


In [None]:
X_train_scaled = np.transpose(X_train_scaled, (0,2,1))
X_test_scaled = np.transpose(X_test_scaled, (0,2,1))

In [None]:
X_train_scaled.shape

In [None]:
from sklearn.preprocessing import LabelEncoder


In [None]:
label_encoder = LabelEncoder()

y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
import matplotlib.pyplot as plt

In [None]:
def create_lstm_model(input_shape):
    """Build and compile the two-layer LSTM binary classifier.

    Args:
        input_shape: Shape passed as ``input_shape`` to the first LSTM layer.

    Returns:
        A compiled ``Sequential`` model (Adam optimizer, binary cross-entropy
        loss, accuracy metric).
    """
    stack = [
        LSTM(128, return_sequences=True, input_shape=input_shape),
        Dropout(0.5),
        LSTM(64),
        Dense(1, activation='sigmoid'),  # single sigmoid unit for binary classification
    ]
    network = Sequential(stack)
    network.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return network

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
input_shape = (X_train_scaled.shape[1], X_train_scaled.shape[2])
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

model = create_lstm_model(input_shape)

model.summary()

In [None]:
from io import BytesIO
from PIL import Image
from tensorflow.keras.utils import plot_model


In [None]:
def display_model(model):
    """Render the model architecture diagram and show it with matplotlib.

    ``plot_model`` writes its output to a filesystem path, so the diagram is
    rendered into a temporary PNG file that is read back and displayed.

    Args:
        model: Keras model to draw.
    """
    import os
    import tempfile

    with tempfile.TemporaryDirectory() as tmp_dir:
        png_path = os.path.join(tmp_dir, 'model.png')
        plot_model(model, to_file=png_path, show_shapes=True, show_layer_names=True)
        # Hand the pixels to matplotlib before the temporary file is removed.
        with Image.open(png_path) as img:
            plt.imshow(img)

    plt.axis('off')
    plt.show()

In [None]:
plot_model(model, to_file='prueba.png', show_shapes=True, show_layer_names=True)


In [None]:
history = model.fit(X_train_scaled, y_train_encoded, epochs=20, batch_size=32,
                    validation_split=0.2, callbacks=[early_stopping])

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
y_pred = model.predict(X_test_scaled)
conf_matrix = confusion_matrix(y_test_encoded, y_pred.round())
#print(classification_report(y_test_encoded, y_pred))

In [None]:
test_loss, test_acc = model.evaluate(X_test_scaled, y_test_encoded)


In [None]:
unique, counts = np.unique(y_test_encoded, return_counts=True)

for label, count in zip(unique, counts):
    print(f"Label {label}: {count} samples")

In [None]:
test_loss, test_acc

In [None]:
conf_matrix

In [None]:
# Save to the same location the following load_model cell reads from, so a
# fresh run evaluates the model that was just trained.
model.save('/content/drive/My Drive/ProyectoTesis/LSTM/FirstLSTMModel.keras')

In [None]:
new_model = tf.keras.models.load_model('/content/drive/My Drive/ProyectoTesis/LSTM/FirstLSTMModel.keras')


In [None]:
from sklearn.metrics import classification_report, confusion_matrix
y_pred = new_model.predict(X_test_scaled)
conf_matrix = confusion_matrix(y_test_encoded, y_pred.round())


In [None]:
conf_matrix

In [None]:
print(classification_report(y_test_encoded, y_pred.round()))

In [None]:
load_models = [(new_model, "LSTM_WITH_ADAM")]

In [None]:
import time

## Response Time

In [None]:
def MeasureResponseTime(loaded_models, X=None, n_samples=1000):
  """Measure the mean single-sample prediction latency of each model.

  Args:
    loaded_models: Iterable of ``(model, model_name)`` pairs; each model must
      expose ``predict``.
    X: Samples to predict one at a time. Defaults to the notebook-level
      ``X_test_scaled`` when omitted.
    n_samples: Maximum number of single-sample predictions per model
      (default 1000, the original hard-coded value). It is capped at
      ``len(X)`` so no empty slices are timed.

  Returns:
    List of ``(model_name, mean_seconds_per_prediction)`` tuples.

  Raises:
    ValueError: If there is no sample to time.
  """
  # Local import: the name `time` is rebound by loops elsewhere in the notebook.
  import time

  samples = X_test_scaled if X is None else X
  n_calls = min(n_samples, len(samples))
  if n_calls <= 0:
    raise ValueError("MeasureResponseTime needs at least one sample to time")

  rtimes = []
  for model, model_name in loaded_models:
    ttime = 0.0
    for i in range(n_calls):
      # perf_counter is monotonic and has higher resolution than time.time().
      start_time = time.perf_counter()

      instance = samples[i:i+1]
      model.predict(instance)

      ttime += time.perf_counter() - start_time
    # Average over the predictions actually made, not a fixed 1000.
    mean_time = ttime / n_calls
    rtimes.append((model_name, mean_time))
    print(f"Model: {model_name}\t\t Execution time: {mean_time:.6f} seconds")

  return rtimes

In [None]:
def GetConfMatrix(model):
  """Return the confusion matrix of ``model`` on the notebook-level test split.

  Predictions on ``X_test_scaled`` are rounded and compared against
  ``y_test_encoded``.
  """
  from sklearn.metrics import classification_report, confusion_matrix
  rounded_predictions = model.predict(X_test_scaled).round()
  return confusion_matrix(y_test_encoded, rounded_predictions)

In [None]:
def GetConfMatrix2(model, X_test, y_test_encoded):
  """Return the confusion matrix of ``model`` on the given data.

  Args:
    model: Object exposing ``predict``.
    X_test: Inputs passed to ``model.predict``.
    y_test_encoded: True labels compared against the rounded predictions.
  """
  from sklearn.metrics import classification_report, confusion_matrix
  rounded_predictions = model.predict(X_test).round()
  return confusion_matrix(y_test_encoded, rounded_predictions)

In [None]:
def MetricsDfConverter(classes, metrics):
  """Build a table of metrics formatted as 4-decimal strings.

  Args:
    classes: Row labels, one per entry of ``metrics``.
    metrics: Exactly three rows of ``[precision, recall, f1]`` values.

  Returns:
    DataFrame indexed by ``classes`` with columns Precision, Recall and
    F1-Score, every value rendered as a string with four decimals.
  """
  # Unpacking enforces that exactly three metric rows were supplied.
  _first, _second, _macro = metrics
  columns = ["Precision", "Recall", "F1-Score"]
  table = pd.DataFrame(data=metrics, index=classes, columns=columns)

  for name in columns:
    table[name] = table[name].map(lambda value: format(value, ".4f"))

  return table

In [None]:
def MetricsCalculator(confusion_matrix, model=None):
  """Print per-class and macro-averaged metrics from a 2x2 confusion matrix.

  Args:
    confusion_matrix: Two rows of two counts each, unpacked as
      ``bonafide, spoof``. Within each row, index 0 is counted as
      "predicted bonafide" and index 1 as "predicted spoof".
    model: Label shown in the printed report header.

  Returns:
    ``[macro_precision, macro_recall, macro_f1]``.
  """
  def _ratio(numerator, denominator):
    # A zero denominator (class never predicted / never present) yields 0.0
    # for every metric instead of nan or a ZeroDivisionError.
    return float(numerator) / float(denominator) if denominator else 0.0

  bonafide, spoof = confusion_matrix

  # Spoof class: precision, recall, F1.
  ps = _ratio(spoof[1], spoof[1] + bonafide[1])
  rs = _ratio(spoof[1], sum(spoof))
  f1s = _ratio(2 * ps * rs, ps + rs)
  s_metrics = [ps, rs, f1s]

  # Bonafide class: precision, recall, F1.
  pb = _ratio(bonafide[0], bonafide[0] + spoof[0])
  rb = _ratio(bonafide[0], sum(bonafide))
  f1b = _ratio(2 * pb * rb, pb + rb)
  b_metrics = [pb, rb, f1b]

  # Unweighted mean of the two classes.
  macro_p = (ps + pb) / 2
  macro_r = (rs + rb) / 2
  macro_f1 = (f1s + f1b) / 2
  macro_metrics = [macro_p, macro_r, macro_f1]

  classes = ["spoof", "bonafide", "Macro-Avg"]
  metrics = [s_metrics, b_metrics, macro_metrics]

  accuracy = _ratio(spoof[1] + bonafide[0], sum(bonafide) + sum(spoof))

  print(f"\n************ {model} ************")
  print(f"Accuracy: {accuracy:.4f}\n")
  print(MetricsDfConverter(classes, metrics))
  print("\n**********************************\n")

  return macro_metrics


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
#categories = ['Kernel radial', 'Kernel polinomial', 'Kernel sigmoide']
#modelos = [svm_rad, svm_pol, svm_sig]


def GraphDifferences(categories, modelos):
  """Draw grouped bars of precision, recall and F1-score per model.

  Args:
    categories: Tick labels, one per model.
    modelos: One ``[precision, recall, f1]`` sequence per model.
  """
  palette = sns.color_palette("rocket", 3)

  # Regroup the per-model triples into one series per metric.
  precision = [modelo[0] for modelo in modelos]
  recall = [modelo[1] for modelo in modelos]
  f1_score = [modelo[2] for modelo in modelos]

  positions = np.arange(len(categories))
  bar_width = 0.2

  fig, ax = plt.subplots(figsize=(8, 6))

  # One bar per metric, side by side around each tick position.
  ax.bar(positions - bar_width, precision, bar_width, label='Precision', color=palette[0])
  ax.bar(positions, recall, bar_width, label='Recall', color=palette[1])
  ax.bar(positions + bar_width, f1_score, bar_width, label='F1-Score', color=palette[2])

  ax.set_xlabel('Modelos')
  ax.set_ylabel('Puntaje')
  ax.set_xticks(positions)
  ax.set_xticklabels(categories, rotation=90)
  ax.legend(loc='lower left', framealpha=0.95)

  plt.tight_layout()
  plt.show()


In [None]:
MeasureResponseTime(load_models)

In [None]:
start_time = time.time()

first_prediction = new_model.predict(X_test_scaled)

end_time = time.time()

execution_time = end_time - start_time

print(f"Model: LSTM_WITH_ADAM\t\t Execution time: {execution_time} seconds")



In [None]:
len(X_test_scaled)

# More models

In [None]:
from keras.optimizers import Adam
from keras.layers import LSTM, Dropout, Dense, Input
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
def CreateModel(input_shape, learning_rate):
    """Build and compile the LSTM classifier with a configurable Adam learning rate.

    Args:
        input_shape: Shape given to the model's ``Input`` layer.
        learning_rate: Learning rate passed to the Adam optimizer.

    Returns:
        A compiled ``Sequential`` model (binary cross-entropy loss, accuracy
        metric).
    """
    network = Sequential([
        Input(shape=input_shape),
        LSTM(128, return_sequences=True),
        Dropout(0.5),
        LSTM(64),
        Dense(1, activation='sigmoid'),
    ])

    network.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return network

In [None]:
models_lrs = []

In [None]:
lrs = [0.0001, 0.0005, 0.001, 0.005, 0.01]
for lr in lrs:
  models_lrs.append((CreateModel(input_shape, lr), lr))

In [None]:
models_lrs

In [None]:
histories = []

In [None]:
import time

In [None]:
# Train one model per learning rate and record (history, model, lr, seconds).
# Loop-specific names are used so the notebook-level `model`, `history` and
# `early_stopping` defined by earlier cells are neither rebound nor deleted.
for lr_model, lr in models_lrs:
  print(f"**************** {lr} *****************")
  st = time.time()
  # A fresh callback per model so early-stopping state is not shared.
  lr_early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
  lr_history = lr_model.fit(X_train_scaled, y_train_encoded, epochs=20, batch_size=32,
                            validation_split=0.2, callbacks=[lr_early_stopping])
  et = time.time()
  training_time = et-st
  print(training_time)
  print(f"**************** {lr} *****************\n")

  histories.append((lr_history, lr_model, lr, training_time))

### Guardar modelos

In [None]:
path="/content/drive/My Drive/ProyectoTesis/LSTM/LRModelHistories.pkl"

In [None]:
import pickle as pkl

In [None]:
# with open(path, 'wb') as file:  # Open the file in write-binary mode
#   pkl.dump(histories, file)

## Arreglar set de datos

In [None]:
indexes_for_human_dataset = [42253, 44952, 70840, 115974, 164782, 170033, 172663, 253854, 269530, 290934]

In [None]:
new_data = allData.iloc[indexes_for_human_dataset]

In [None]:
new_data

### Pruebas con filtros

In [None]:
import numpy as np
from scipy.ndimage import uniform_filter1d, median_filter

window_size = 11

In [None]:
datos = new_data.copy()

In [None]:
features_media = datos.columns[:-1]

In [None]:
def MeanFilter(row, window_size):
  """Smooth ``row`` along axis 0 with a moving-average window.

  Args:
    row: Array-like values to smooth.
    window_size: Length of the averaging window.

  Returns:
    The result of ``uniform_filter1d`` applied to ``row``.
  """
  return uniform_filter1d(row, size=window_size, axis=0)

In [None]:
from scipy.ndimage import median_filter

def MedianFilter(row, window_size):
    """Smooth ``row`` with a median filter using 'nearest' edge handling.

    Args:
        row: Array-like values to smooth.
        window_size: Size of the median window.

    Returns:
        The result of ``median_filter`` applied to ``row``.
    """
    return median_filter(row, size=window_size, mode='nearest')

In [None]:
for feature in features_media:
  datos[feature] = datos[feature].apply(MedianFilter, window_size=window_size)

In [None]:
# Every column except the last one is used as a feature
# (assumes the label column is last — TODO confirm).
features = datos.columns[:-1]
# Same nesting as the row-by-row build: sample -> feature -> cell contents.
new_x = np.array(datos[features].to_numpy().tolist())

In [None]:
# NOTE(review): scale_sequences2 never reads its second argument and fits a
# fresh StandardScaler on new_x itself, so `scalers` has no effect here —
# confirm this is intended rather than reusing the scaling applied to the
# training data. The cell also overwrites new_x, so running it twice rescales
# data that is already scaled.
new_x = scale_sequences2(new_x, scalers)

In [None]:
new_x.shape

In [None]:
# new_x[0][0] #87 frames para el primer dato en la columna zcr

In [None]:
# for i in range(len(new_x[0])):
#   print(new_x[0][i][0]) #Lo mismo que sacar transpuesta y despues new_x[0][0]

In [None]:
new_y = new_data['Label'].values

In [None]:
label_encoder = LabelEncoder()

new_y_encoded = label_encoder.fit_transform(new_y)

In [None]:
new_x = np.transpose(new_x, (0,2,1))

In [None]:
# new_x[1][0] #Cada valor del feature para el primer frame para el primer dato

### Cargar Modelos

In [None]:
with open(path, 'rb') as file:
  lr_models = pkl.load(file)

In [None]:
lr_models

In [None]:
# The last item of each entry is the training time in seconds and the item
# before it is the model's label. Named variables are used so the `time`
# module is not shadowed and the cell does not rely on `_`.
for *entry_head, training_seconds in lr_models:
  print(f"Model {entry_head[-1]}: {training_seconds/60:.4f}")

In [None]:
conf_matrices = []

In [None]:
for data in lr_models:
  history, model, model_name, training_time = data
  conf_matrix = GetConfMatrix2(model, new_x, new_y_encoded)
  conf_matrices.append((conf_matrix, model_name))

In [None]:
conf_matrices

In [None]:
#Prueba Filtro de media con window_size 7
# [(array([[3, 2],
#          [0, 5]]),
#   0.0001),
#  (array([[2, 3],
#          [0, 5]]),
#   0.0005),
#  (array([[4, 1],
#          [0, 5]]),
#   0.001),
#  (array([[3, 2],
#          [0, 5]]),
#   0.005),
#  (array([[1, 4],
#          [0, 5]]),
#   0.01)]

#Median filter window_size 3
# [(array([[3, 2],
#          [0, 5]]),
#   0.0001),
#  (array([[3, 2],
#          [0, 5]]),
#   0.0005),
#  (array([[3, 2],
#          [0, 5]]),
#   0.001),
#  (array([[3, 2],
#          [0, 5]]),
#   0.005),
#  (array([[2, 3],
#          [0, 5]]),
#   0.01)]

#Median filter window_size 11
# [(array([[2, 3],
#          [0, 5]]),
#   0.0001),
#  (array([[3, 2],
#          [0, 5]]),
#   0.0005),
#  (array([[4, 1],
#          [0, 5]]),
#   0.001),
#  (array([[2, 3],
#          [0, 5]]),
#   0.005),
#  (array([[2, 3],
#          [0, 5]]),
#   0.01)]

In [None]:
cats = ['bonafide', 'spoof', 'macro-avg']
metrics_lrs = []

In [None]:
# Each entry is (confusion_matrix, model_name); distinct loop names keep the
# notebook-level `model` from being rebound to a tuple.
for lr_conf_matrix, lr_name in conf_matrices:
  metrics_lrs.append(MetricsCalculator(lr_conf_matrix, lr_name))

In [None]:
lr_models

In [None]:
ld_mod = [(item[1], item[2]) for item in lr_models]

In [None]:
rtimes = MeasureResponseTime(ld_mod)

In [None]:
rtimes

In [None]:
# Use a dedicated name for the measured seconds so the `time` module stays usable.
for model_name, mean_seconds in rtimes:
  print(f"Model {model_name}, Execution time: {mean_seconds:.6f}")

In [None]:
GraphDifferences(lrs, metrics_lrs)