# Подключение основных библиотек и загрузка данных


### Для Google Colaboratory

In [1]:
# Mount Google Drive into the Colab virtual machine
from google.colab import drive
drive.mount('/content/drive')

# Copy the ICBEB .npy data from Google Drive to the current directory of the
# virtual machine (the loaders below read from the relative path 'ICBEBnpy/').
!cp -r /content/drive/MyDrive/practice_2022-2023/data/ICBEBnpy/ .
# The PTB-XL copy is disabled; enable it before calling load_ptbxl().
#!cp -r /content/drive/MyDrive/practice_2022-2023/data/ptbxlnpy/ .

Mounted at /content/drive


### Подключение пакетов

In [2]:
# Для работы с данными
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt   # plotting
import seaborn as sns   # plotting heatmap

# Для работы с моделями
import tensorflow as tf
from tensorflow import keras
from keras import layers

# Для метрик
from keras import backend as K
from keras.metrics import AUC, Recall, Precision, Accuracy, TruePositives, TrueNegatives, FalsePositives, FalseNegatives
from sklearn.metrics import fbeta_score, precision_score, recall_score, accuracy_score, roc_auc_score
from sklearn.metrics import auc, roc_curve

# Функции
# Загрузка ICBEB
def load_ICBEB(task):
  """Load the ICBEB train/validation arrays for one classification task.

  Reads four ``.npy`` files from the ``ICBEBnpy/`` directory (relative to the
  current working directory), named ``<part>_ICBEB_<task>.npy`` where
  ``<part>`` is ``X_train``, ``y_train``, ``X_val`` or ``y_val``.

  Args:
    task: One of 'diag', 'superdiag', 'subdiag', 'rhythm', 'form'.

  Returns:
    Tuple ``(X_train, y_train, X_test, y_test)``. Note that the "test"
    arrays are read from the ``*_val_*`` files.

  Raises:
    ValueError: If ``task`` is not one of the supported task names.
  """
  tasks = ('diag', 'superdiag', 'subdiag', 'rhythm', 'form')
  if task not in tasks:
    # Fail early with a readable message; a misspelled task would otherwise
    # only surface as an UnboundLocalError at the return statement.
    raise ValueError(f"Unknown ICBEB task {task!r}; expected one of {tasks}")

  parts = ('X_train', 'y_train', 'X_val', 'y_val')
  X_train, y_train, X_test, y_test = (
      np.load(f'ICBEBnpy/{part}_ICBEB_{task}.npy') for part in parts)
  return X_train, y_train, X_test, y_test

# Загрузка ptbxl
def load_ptbxl(task):
  """Load the PTB-XL train/validation arrays for one classification task.

  Reads four ``.npy`` files from the ``ptbxlnpy/`` directory (relative to the
  current working directory), named ``<part>_ptbxl_<task>.npy`` where
  ``<part>`` is ``X_train``, ``y_train``, ``X_val`` or ``y_val``.

  Args:
    task: One of 'diag', 'superdiag', 'subdiag', 'rhythm', 'form'.

  Returns:
    Tuple ``(X_train, y_train, X_test, y_test)``. Note that the "test"
    arrays are read from the ``*_val_*`` files.

  Raises:
    ValueError: If ``task`` is not one of the supported task names.
  """
  tasks = ('diag', 'superdiag', 'subdiag', 'rhythm', 'form')
  if task not in tasks:
    # Fail early with a readable message; a misspelled task would otherwise
    # only surface as an UnboundLocalError at the return statement.
    raise ValueError(f"Unknown ptbxl task {task!r}; expected one of {tasks}")

  parts = ('X_train', 'y_train', 'X_val', 'y_val')
  X_train, y_train, X_test, y_test = (
      np.load(f'ptbxlnpy/{part}_ptbxl_{task}.npy') for part in parts)
  return X_train, y_train, X_test, y_test

# AUC metric computed with Keras
def AUC_Keras(y_true, y_pred):
    """Compute the AUC of ``y_pred`` against ``y_true`` with ``keras.metrics.AUC``.

    The metric object is created with its default constructor arguments
    (per the Keras documentation this is the ROC curve -- confirm for the
    installed version), fed the data once and then read out.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted scores, same shape as ``y_true``.

    Returns:
        The value of ``metric.result()`` (a scalar tensor).
    """
    # keras.metrics.AUC is a stateful metric class: its constructor takes
    # configuration, not data, so the data has to go through update_state().
    # No session/variable initialisation is needed in eager (TF2) execution.
    metric = keras.metrics.AUC()
    metric.update_state(y_true, y_pred)
    return metric.result()

# Компиляция и обучение модели
def compile_fit(model, X_train, y_train, X_val = None, y_val = None, validation_split = 0.0, early_stopping = None, model_checkpoint = None):
  """Compile ``model`` and train it for up to 30 epochs.

  The model is compiled with categorical cross-entropy loss, the Adam
  optimizer (default settings) and the 'AUC' metric.

  Args:
    model: Keras model to compile and fit.
    X_train: Training inputs.
    y_train: Training targets.
    X_val: Optional validation inputs. When given, ``(X_val, y_val)`` is
      used as validation data and ``validation_split`` is ignored.
    y_val: Validation targets; only used together with ``X_val``.
    validation_split: Fraction of the training data held out for validation
      when ``X_val`` is not given.
    early_stopping: Optional callback; skipped when ``None``.
    model_checkpoint: Optional callback; skipped when ``None``.

  Returns:
    The history object returned by ``model.fit``.
  """
  model.compile(loss = keras.losses.CategoricalCrossentropy(),
                optimizer=tf.optimizers.Adam(),
                metrics=['AUC'])

  # Drop callbacks that were left at their None default so that fit() only
  # ever receives real callback objects (checkpoint first, as before).
  callbacks = [cb for cb in (model_checkpoint, early_stopping) if cb is not None]

  # Identity check: `X_val == None` on a NumPy array compares element-wise
  # and cannot be used as an `if` condition.
  if X_val is None:
    validation_data = None
  else:
    validation_data = (X_val, y_val)
    validation_split = 0.0  # explicit validation data takes precedence

  history = model.fit(X_train, y_train,
                      epochs = 30,
                      validation_data = validation_data,
                      validation_split = validation_split,
                      callbacks = callbacks)
  return history

# TP TN FP FN
def tp_tn_fp_fn(y_true, y_pred):
  """Count true/false positives and negatives with Keras metric objects.

  Each of the four Keras metrics is created with its default constructor
  arguments (per the Keras documentation the default decision threshold is
  0.5 -- confirm for the installed version), updated once with the given
  data and read out as a NumPy value.

  Args:
    y_true: Ground-truth labels.
    y_pred: Predicted scores, same shape as ``y_true``.

  Returns:
    Tuple ``(TP, TN, FP, FN)``.
  """
  metric_classes = (TruePositives, TrueNegatives, FalsePositives, FalseNegatives)
  counters = [metric_cls() for metric_cls in metric_classes]
  for counter in counters:
    counter.update_state(y_true, y_pred)
  tp, tn, fp, fn = [counter.result().numpy() for counter in counters]
  return tp, tn, fp, fn

# Подсчет метрик
def calc_metrics(t, p, flag = 0): # t - y_true, p - y_pred
  """Compute macro F-beta and G-beta scores (beta = 2) for predictions.

  Class labels are obtained with ``argmax`` along axis 1, so ``t`` and ``p``
  are expected to be 2-D (presumably one-hot targets and per-class scores --
  confirm against the data). F2, precision and recall are macro-averaged
  over those labels. G2 is ``TP / (TP + FP + beta * FN)``, with the counts
  taken by ``tp_tn_fp_fn`` on the raw ``t`` / ``p`` arrays; the denominator
  is not guarded against zero.

  Args:
    t: Ground-truth array (y_true).
    p: Prediction array (y_pred), same shape as ``t``.
    flag: 0 to return ``(f2, g2)``; 1 to return
      ``(f2, g2, precision, recall)``.

  Returns:
    A 2-tuple or 4-tuple of scores, depending on ``flag``.

  Raises:
    ValueError: If ``flag`` is neither 0 nor 1.
  """
  if flag not in (0, 1):
    # An unsupported flag used to fall off the end and return None, which
    # then failed confusingly when the caller unpacked the result.
    raise ValueError(f"flag must be 0 or 1, got {flag!r}")

  y_true=np.argmax(t, axis=1)
  y_pred=np.argmax(p, axis=1)
  beta = 2  # single source of truth for both the F-beta and G-beta scores

  f2_score = fbeta_score(y_true, y_pred, average='macro', beta=beta)
  TP, TN, FP, FN = tp_tn_fp_fn(t, p)
  g2_score = TP/(TP+FP+beta*FN)

  if flag == 0:
    return f2_score, g2_score

  precision = precision_score(y_true, y_pred, average='macro')
  recall = recall_score(y_true, y_pred, average='macro')
  return f2_score, g2_score, precision, recall

# Results table for the ICBEB experiments; rows are added by edit_table(),
# columns hold the AUC, F2 and G2 scores.
table_res_ICBEB = pd.DataFrame(columns = ('AUC', 'F2', 'G2'))

# Занесение новых результатов в таблицу
def edit_table(table, model, X, y, index_name): # X - X_test, y - y_test
  """Evaluate ``model`` on ``(X, y)`` and store its scores as a table row.

  Args:
    table: DataFrame whose columns are, in order, 'AUC', 'F2', 'G2'.
    model: Compiled Keras model; ``model.evaluate`` must return the loss
      followed by the AUC metric (as set up by ``compile_fit``).
    X: Evaluation inputs (X_test).
    y: Evaluation targets (y_test).
    index_name: Row label under which the scores are stored; an existing
      row with the same label is overwritten.

  Returns:
    The same ``table`` object, with the row added/updated in place.
  """
  score = model.evaluate(X, y)
  y_pr = model.predict(X) # y_pr - y_test_pred
  f2_score, g2_score = calc_metrics(y, y_pr, flag = 0)
  auc_score = score[1]  # score[0] is the loss, score[1] the first metric
  # The order must match the table's ('AUC', 'F2', 'G2') columns; writing
  # [F2, G2, AUC] here would put every value under the wrong header.
  table.loc[index_name] = [auc_score, f2_score, g2_score]
  return table

# Loss and AUC curves
def _find_history_key(logs, base):
  """Return the key of ``logs`` that is ``base`` or ``base_<number>``, else None."""
  if base in logs:
    return base
  prefix = base + '_'
  for key in logs:
    if key.startswith(prefix) and key[len(prefix):].isdigit():
      return key
  return None


def plot_loss_and_accuracy_curves(_history):
  """Plot training/validation loss and AUC curves side by side.

  Despite the function name, the right-hand panel shows the AUC metric
  that the models in this file are compiled with, not accuracy.

  Args:
    _history: Object with a ``history`` dict of per-epoch value lists, as
      returned by ``model.fit``. It must contain 'loss'; validation curves
      and AUC curves are drawn only when their keys are present.
  """
  logs = _history.history
  fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(18,6))

  axs[0].plot(logs['loss'], color='b', label='Training loss')
  if 'val_loss' in logs:  # absent when training ran without validation data
    axs[0].plot(logs['val_loss'], color='r', label='Validation loss')
  axs[0].set_title("Loss curves")
  axs[0].legend(loc='best', shadow=True)

  # Keras may suffix metric names ('auc_1', 'auc_2', ...) when several
  # models are built in one session, so the keys are looked up flexibly.
  train_key = _find_history_key(logs, 'auc')
  val_key = _find_history_key(logs, 'val_auc')
  if train_key is not None:
    axs[1].plot(logs[train_key], color='b', label='Training AUC')
  if val_key is not None:
    axs[1].plot(logs[val_key], color='r', label='Validation AUC')
  axs[1].set_title("AUC curves")
  axs[1].legend(loc='best', shadow=True)
  plt.show()

# Работа с моделями lstm и lstm_bidir
def _build_recurrent_model(type_model, input_shape, num_classes):
  """Build the two-layer LSTM ('lstm') or bidirectional LSTM ('lstm_bidir') classifier."""
  if type_model == 'lstm':
    def wrap(layer):
      return layer
  elif type_model == 'lstm_bidir':
    wrap = layers.Bidirectional
  else:
    raise ValueError(
        f"Unknown type_model {type_model!r}; expected 'lstm' or 'lstm_bidir'")

  model = keras.Sequential()
  model.add(wrap(layers.LSTM(units=256,
                             return_sequences=True,
                             stateful=False, unroll=False)))
  model.add(layers.LeakyReLU())
  model.add(wrap(layers.LSTM(units=256,
                             return_sequences=False,
                             stateful=False, unroll=False)))
  model.add(layers.LeakyReLU())
  model.add(layers.Dense(units=num_classes, activation='softmax'))

  # Build explicitly instead of passing input_shape to the first layer, so
  # both variants are built the same way (batch dimension left open).
  model.build(input_shape=(None,) + tuple(input_shape))
  return model


def type_comp_fit_save_model_score(table, X_train, y_train, X_test, y_test, type_model, save_name, index_model_task):
  """Build, train, save, plot and score one LSTM model; record it in ``table``.

  Steps: build the model selected by ``type_model``, train it through
  ``compile_fit`` with a 10% validation split plus checkpoint and
  early-stopping callbacks, save the weights to ``save_name``, plot the
  training curves and add the test scores to ``table`` via ``edit_table``.

  Args:
    table: Results DataFrame passed on to ``edit_table``.
    X_train: Training inputs; the per-sample shape ``X_train.shape[1:]`` is
      used as the model input shape (presumably (1000, 12) -- confirm
      against the data).
    y_train: Training targets; ``y_train.shape[1]`` is the number of classes.
    X_test: Test inputs.
    y_test: Test targets.
    type_model: 'lstm' or 'lstm_bidir'.
    save_name: File path for ``model.save_weights``.
    index_model_task: Row label for the results table.

  Returns:
    The updated results table.

  Raises:
    ValueError: If ``type_model`` is not a supported architecture name.
  """
  # Number of classes comes from the width of the target matrix
  num_classes = y_train.shape[1]

  # Choose and build the architecture
  model = _build_recurrent_model(type_model, X_train.shape[1:], num_classes)
  model.summary()  # summary() prints by itself

  # Checkpointing of the best weights and early stopping
  checkpoint_filepath = './checkpoint_' + type_model + '/'
  model_checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_filepath,
                                                        save_weights_only=True,
                                                        save_best_only=True)
  early_stopping = keras.callbacks.EarlyStopping(patience=15,
                                                 restore_best_weights=True)

  # Training
  History = compile_fit(model, X_train, y_train, validation_split=0.1,
                        early_stopping=early_stopping,
                        model_checkpoint=model_checkpoint)

  # Save the trained weights
  model.save_weights(save_name)

  # Plot the training curves
  plot_loss_and_accuracy_curves(History)

  # Record the test scores in the table
  table = edit_table(table, model, X_test, y_test, index_model_task)
  return table
                        
# Fix TensorFlow's global random seed
tf.random.set_seed(42)
# IPython magic: show matplotlib figures inline in the notebook
%matplotlib inline

# Работа с lstm и lstm_bidir

### lstm

In [None]:
# LSTM on the ICBEB 'diag' task: load, train/score, then drop the arrays
X_train, y_train, X_test, y_test = load_ICBEB(task='diag')
table_res_ICBEB = type_comp_fit_save_model_score(
    table_res_ICBEB, X_train, y_train, X_test, y_test,
    type_model='lstm', save_name='lstm_diag.h5', index_model_task='lstm_diag')
del X_train, y_train, X_test, y_test

In [None]:
# LSTM on the ICBEB 'superdiag' task: load, train/score, then drop the arrays
X_train, y_train, X_test, y_test = load_ICBEB(task='superdiag')
table_res_ICBEB = type_comp_fit_save_model_score(
    table_res_ICBEB, X_train, y_train, X_test, y_test,
    type_model='lstm', save_name='lstm_superdiag.h5', index_model_task='lstm_superdiag')
del X_train, y_train, X_test, y_test

In [None]:
# LSTM on the ICBEB 'subdiag' task: load, train/score, then drop the arrays
X_train, y_train, X_test, y_test = load_ICBEB(task='subdiag')
table_res_ICBEB = type_comp_fit_save_model_score(
    table_res_ICBEB, X_train, y_train, X_test, y_test,
    type_model='lstm', save_name='lstm_subdiag.h5', index_model_task='lstm_subdiag')
del X_train, y_train, X_test, y_test

In [None]:
# LSTM on the ICBEB 'rhythm' task: load, train/score, then drop the arrays
X_train, y_train, X_test, y_test = load_ICBEB(task='rhythm')
table_res_ICBEB = type_comp_fit_save_model_score(
    table_res_ICBEB, X_train, y_train, X_test, y_test,
    type_model='lstm', save_name='lstm_rhythm.h5', index_model_task='lstm_rhythm')
del X_train, y_train, X_test, y_test

In [None]:
# LSTM on the ICBEB 'form' task: load, train/score, then drop the arrays.
# Weights go to their own file so that the 'diag' weights are not overwritten.
X_train, y_train, X_test, y_test = load_ICBEB(task = 'form')
table_res_ICBEB = type_comp_fit_save_model_score(table_res_ICBEB, X_train, y_train, X_test, y_test, 'lstm', 'lstm_form.h5', 'lstm_form')
del X_train, y_train, X_test, y_test

### lstm_bidir

In [None]:
# Bidirectional LSTM on the ICBEB 'diag' task: load, train/score, then drop the arrays
X_train, y_train, X_test, y_test = load_ICBEB(task='diag')
table_res_ICBEB = type_comp_fit_save_model_score(
    table_res_ICBEB, X_train, y_train, X_test, y_test,
    type_model='lstm_bidir', save_name='lstm_bidir_diag.h5', index_model_task='lstm_bidir_diag')
del X_train, y_train, X_test, y_test

In [None]:
# Bidirectional LSTM on the ICBEB 'superdiag' task: load, train/score, then drop the arrays
X_train, y_train, X_test, y_test = load_ICBEB(task='superdiag')
table_res_ICBEB = type_comp_fit_save_model_score(
    table_res_ICBEB, X_train, y_train, X_test, y_test,
    type_model='lstm_bidir', save_name='lstm_bidir_superdiag.h5', index_model_task='lstm_bidir_superdiag')
del X_train, y_train, X_test, y_test

In [None]:
# Bidirectional LSTM on the ICBEB 'subdiag' task: load, train/score, then drop the arrays
X_train, y_train, X_test, y_test = load_ICBEB(task='subdiag')
table_res_ICBEB = type_comp_fit_save_model_score(
    table_res_ICBEB, X_train, y_train, X_test, y_test,
    type_model='lstm_bidir', save_name='lstm_bidir_subdiag.h5', index_model_task='lstm_bidir_subdiag')
del X_train, y_train, X_test, y_test

In [None]:
# Bidirectional LSTM on the ICBEB 'rhythm' task: load, train/score, then drop
# the arrays. The task name must be spelled 'rhythm' to match the loader and
# the data files; the weights file and row label use the same spelling.
X_train, y_train, X_test, y_test = load_ICBEB(task = 'rhythm')
table_res_ICBEB = type_comp_fit_save_model_score(table_res_ICBEB, X_train, y_train, X_test, y_test, 'lstm_bidir', 'lstm_bidir_rhythm.h5', 'lstm_bidir_rhythm')
del X_train, y_train, X_test, y_test

In [None]:
# Bidirectional LSTM on the ICBEB 'form' task: load, train/score, then drop the arrays
X_train, y_train, X_test, y_test = load_ICBEB(task='form')
table_res_ICBEB = type_comp_fit_save_model_score(
    table_res_ICBEB, X_train, y_train, X_test, y_test,
    type_model='lstm_bidir', save_name='lstm_bidir_form.h5', index_model_task='lstm_bidir_form')
del X_train, y_train, X_test, y_test

### Сохранение результатов в формат .csv

In [None]:
# Write the collected results table to a CSV file in the current directory
table_res_ICBEB.to_csv('table_res_ICBEB.csv')