In [None]:
import pandas as pd
import numpy as np
import ast
import matplotlib.pyplot as plt
import neurokit2 as nk
import json
import os

In [None]:
# Load the heartbeat table; later cells read its 'signal' and 'label' columns.
# NOTE(review): unpickling can execute arbitrary code - only load files from a trusted source.
df = pd.read_pickle('/path_to_heartbeats.pkl')

In [None]:
# Build a class-balanced subset: 5000 randomly drawn rows for each label 0..3.
# The draw is seeded so that a fresh "Restart & Run All" selects the same rows
# (the train/validation/test split further down is seeded as well), and the
# four samples are concatenated in a single call instead of growing a frame
# inside a loop.
# sample() raises ValueError if a label has fewer than 5000 rows.
total_db_new = pd.concat(
    [df[df['label'] == i].sample(n=5000, random_state=42) for i in range(4)],
    ignore_index=True,
)

In [None]:
def resample_signal(row, sampling_rate=250, desired_sampling_rate=360, method="FFT"):
    """Resample the signal of one record and return it together with its label.

    Written for ``DataFrame.apply(resample_signal, axis=1)``: ``apply`` passes
    only ``row``, so the keyword defaults reproduce the original 250 -> 360
    conversion with the "FFT" method.

    Parameters
    ----------
    row : mapping with a 'signal' entry (sequence of samples) and a 'label' entry.
    sampling_rate : sampling rate the stored signal is assumed to have.
    desired_sampling_rate : sampling rate of the returned signal.
    method : resampling method forwarded to ``nk.signal_resample``.

    Returns
    -------
    pandas.Series with the keys 'signal' (resampled signal) and 'label'
    (copied unchanged from ``row``).
    """
    resampled_signal = nk.signal_resample(
        row['signal'],
        sampling_rate=sampling_rate,
        desired_sampling_rate=desired_sampling_rate,
        method=method,
    )
    return pd.Series({'signal': resampled_signal, 'label': row['label']})

In [None]:
# Resample every record row by row; each call returns a Series with the keys
# 'signal' and 'label', which apply() collects into a new frame.
resampled_df = total_db_new.apply(resample_signal, axis=1)
# Release the pre-resampling frame, then reuse its name for the resampled data.
del total_db_new
total_db_new = resampled_df

# Split and Normalization

In [None]:
from sklearn.model_selection import train_test_split

# First cut: 70 % of the rows for training, 30 % held out.
train_df, temp_df = train_test_split(total_db_new, test_size=0.3, random_state=42)
# Second cut: one third of the held-out rows for validation, two thirds for testing.
val_df, test_df = train_test_split(temp_df, test_size=(2/3), random_state=42)

# Give every split a fresh 0..n-1 index (the shuffled original index is discarded).
train_df, val_df, test_df = (
    split.reset_index(drop=True) for split in (train_df, val_df, test_df)
)

In [None]:
def find_max_min(current_df):
    """Return ``(min_value, max_value)`` over every sample of every signal.

    Parameters
    ----------
    current_df : DataFrame whose 'signal' column holds one numeric sequence
        (list or array) per row.

    Returns
    -------
    tuple ``(min_value, max_value)`` - note the order: minimum first.

    Each signal is reduced with NumPy instead of the Python builtins, which
    avoids a Python-level loop over every sample and gives a well-defined
    result when a signal contains NaN (the builtins' result then depends on
    the element order).
    """
    signals = current_df['signal']
    min_value = signals.apply(lambda x: np.min(x)).min()
    max_value = signals.apply(lambda x: np.max(x)).max()
    return min_value, max_value

# Compute and print the global (minimum, maximum) sample value of each split.
min_train_df, max_train_df = find_max_min(train_df)
print(min_train_df, max_train_df)

min_val_df, max_val_df = find_max_min(val_df)
print(min_val_df, max_val_df)

min_test_df, max_test_df = find_max_min(test_df)
print(min_test_df, max_test_df)

In [None]:
def _min_max_scale(signal, lower, upper):
    """Linearly rescale every sample of `signal` so that lower -> 0 and upper -> 1."""
    span = upper - lower
    return [(item - lower) / span for item in signal]


# All three splits are scaled with the TRAINING minimum/maximum. Scaling the
# validation and test data with their own statistics would map the same raw
# amplitude to different values in each split and would let held-out
# statistics influence preprocessing. Validation/test samples may therefore
# fall slightly outside [0, 1].
for split_df in (train_df, val_df, test_df):
    split_df['signal'] = split_df['signal'].apply(
        _min_max_scale, args=(min_train_df, max_train_df)
    )


# Quantization


In [None]:
# from https://github.com/joaomrcarvalho/diffquantizer.git

class DiffQuantizer:
    """Quantize a numeric signal into characters using value breakpoints.

    A sample is mapped to the index of the first breakpoint that is strictly
    greater than it; index 0 becomes 'A' (``chr(65)``), index 1 'B', and so on.
    Breakpoints are either supplied by the caller or learned with
    ``learn_breakpoints`` so that each symbol covers roughly the same number
    of samples.

    Parameters
    ----------
    alphabet_size : number of symbols (and of breakpoints) to learn.
    average_over : if not 1, the signal is first replaced by the averages of
        consecutive chunks of this many samples.
    filter : if true, ``_filter_signal`` is applied during preprocessing
        (no filter implementation is available in this notebook).
    breakpoints : optional initial breakpoints.
    use_diffs : if true, quantize first differences instead of raw samples.
    """

    def __init__(self, alphabet_size, average_over=1, filter=False, breakpoints=None, use_diffs=True):
        self.alphabet_size = alphabet_size
        self.average_over = average_over
        self.use_filter = filter
        self.breakpoints = breakpoints
        self.use_diffs = use_diffs

    def preprocess(self, tmp):
        """Apply the configured averaging, filtering and differencing, in that order."""
        if self.average_over != 1:
            tmp = self._average_over_n(tmp, self.average_over)

        if self.use_filter:
            tmp = self._filter_signal(tmp)

        if self.use_diffs:
            tmp = self._diff_signal(tmp)

        return tmp

    def perform_quantization(self, tmp, breakpoints=None):
        """Preprocess `tmp` and return an array with one character per sample.

        `breakpoints`, when given, replaces the stored breakpoints. When it is
        omitted the breakpoints already stored on the instance (for example
        by ``learn_breakpoints``) are used; previously they were overwritten
        with ``None`` and the call failed.

        Raises
        ------
        ValueError
            If no breakpoints are available, or a sample is not smaller than
            any breakpoint.
        """
        if breakpoints is not None:
            self.breakpoints = breakpoints
        if self.breakpoints is None:
            raise ValueError(
                "no breakpoints available: pass `breakpoints` or call learn_breakpoints() first"
            )
        return self._quantize_with_breakpoints(self.preprocess(tmp))

    def learn_breakpoints(self, arr):
        """Learn `alphabet_size` breakpoints from `arr`, store and return them.

        The preprocessed samples are sorted and the value at the end of each
        of `alphabet_size` equally sized portions becomes a breakpoint. The
        last breakpoint is replaced by 1e+100 so that the top symbol also
        covers values above the observed maximum.

        Raises
        ------
        ValueError
            If the preprocessed signal is empty.
        """
        res = self.preprocess(arr)

        sorted_array = np.sort(res)
        length = len(sorted_array)
        if length == 0:
            raise ValueError("cannot learn breakpoints from an empty signal")

        probs = [1 / self.alphabet_size for _ in range(self.alphabet_size)]
        cum_sum_breakpoints = [int(sum(probs[0:i + 1]) * length - 1) for i in range(len(probs))]
        # Cast to float64 so the 1e+100 sentinel below is representable even
        # when the input was an integer or float32 array.
        cum_sum_breakpoint_values = sorted_array[cum_sum_breakpoints].astype(np.float64)

        cum_sum_breakpoint_values[-1] = 1e+100

        self.breakpoints = cum_sum_breakpoint_values

        return cum_sum_breakpoint_values

    @staticmethod
    def _breakpoint_to_letter(float_num, breakpoints):
        """Return the letter of the first breakpoint strictly greater than `float_num`.

        Scalar reference implementation; `breakpoints` is scanned in order.
        Raises ValueError when no breakpoint is greater than `float_num`
        (this includes NaN).
        """
        for position, bound in enumerate(breakpoints):
            if float_num < bound:
                # 65 is the code point of 'A'.
                return chr(65 + position)
        raise ValueError(f"value {float_num!r} is not below any breakpoint")

    def _quantize_with_breakpoints(self, tmp):
        """Map every element of `tmp` to its letter; the input shape is kept."""
        bounds = np.asarray(list(self.breakpoints), dtype=np.float64)
        values = np.asarray(tmp)
        alphabet = np.array([chr(65 + k) for k in range(bounds.size)], dtype='<U1')

        if bounds.size and np.all(bounds[:-1] <= bounds[1:]):
            # Ascending breakpoints: "index of the first breakpoint > x" equals
            # the right-sided insertion point, computed for all samples at once.
            idx = np.asarray(np.searchsorted(bounds, values, side='right'))
            if np.any(idx >= bounds.size):
                raise ValueError("signal contains values that are not below any breakpoint")
            return alphabet[idx]

        # Unsorted (or empty) breakpoints: fall back to the in-order scalar scan.
        bound_list = bounds.tolist()
        letters = [self._breakpoint_to_letter(v, bound_list) for v in values.ravel()]
        return np.array(letters, dtype='<U1').reshape(values.shape)

    @staticmethod
    def _read_csv_file(input_file):
        """Read a headerless file of float values and return it as a 2-D array.

        The line terminator is no longer passed as ``sep``: pandas rejects
        "\\n" as a separator, and a file with one value per line is already
        parsed as a single column by the default settings.
        """
        tmp_file_content = pd.read_csv(input_file, header=None, dtype=np.float64)
        return np.array(tmp_file_content)

    @staticmethod
    def _filter_signal(tmp):
        """Placeholder for the optional filter step.

        The upstream code called ``butter_lowpass_filter(tmp)`` here, which is
        not defined in this notebook, so the failure is made explicit.
        """
        raise NotImplementedError(
            "filter=True is not supported: no filter implementation is available"
        )

    @staticmethod
    def _average_over_n(tmp, n):
        """Average consecutive chunks of `n` samples (the last chunk may be shorter)."""
        return np.array([np.average(tmp[i:i + n]) for i in range(0, len(tmp), n)])

    @staticmethod
    def _diff_signal(tmp):
        """Return first differences, with a leading 0 so the length is unchanged."""
        res = np.diff(tmp)
        return np.insert(res, 0, 0.0)

In [None]:
import math
import csv

class Preprocessing():
    """Window one record's signal and turn the windows into letter strings.

    Parameters
    ----------
    input_file : a single record (e.g. one DataFrame row) providing a
        'signal' sequence and a 'label'.
    discretizition_factor : stored on the instance; not read by the methods
        of this class.
    max_window_size : upper bound for the window length in samples.
    """

    def __init__(self, input_file, discretizition_factor, max_window_size):
        self.input_file = input_file
        self.discretizition_factor = discretizition_factor
        self.max_window_size = max_window_size

    def create_window(self):
        """Cut the signal into consecutive, non-overlapping windows.

        The window length is the smaller of the signal length and
        `max_window_size`. Trailing samples that do not fill a whole window
        are dropped. An empty signal (or a non-positive `max_window_size`)
        yields an empty list instead of a ZeroDivisionError.

        Returns
        -------
        list of windows, each a slice of the record's signal.
        """
        ecg_list = self.input_file['signal']
        window_size = min(len(ecg_list), self.max_window_size)
        if window_size <= 0:
            return []

        num_lines = len(ecg_list) // window_size
        return [ecg_list[i * window_size:(i + 1) * window_size] for i in range(num_lines)]

    def change_to_alphabet(self, quantizer, normalized_list):
        """Quantize every window into a string of letters.

        Parameters
        ----------
        quantizer : object with a ``breakpoints`` attribute and a
            ``perform_quantization(array, breakpoints=...)`` method returning
            one character per sample.
        normalized_list : list of windows, as returned by ``create_window``.

        Returns
        -------
        (qtz_signal, labels) : one string per window, and the record's label
        repeated once per window.
        """
        qtz_signal = []
        labels = []
        for window in normalized_list:
            r = quantizer.perform_quantization(np.array(window), breakpoints=quantizer.breakpoints)
            # r is an array of single characters; join it into one string
            qtz_signal.append(''.join(r))
            labels.append(self.input_file['label'])

        return qtz_signal, labels

In [None]:
### lloyd_max
def discretization_lloyd_max(discretizition_factor, total_data):
    """Create a quantizer and fit its breakpoints on `total_data`.

    Parameters
    ----------
    discretizition_factor : alphabet size handed to ``DiffQuantizer``.
    total_data : flat sequence of samples the breakpoints are learned from.

    Returns
    -------
    The fitted ``DiffQuantizer`` (raw values are quantized, not differences).

    NOTE(review): despite the name, the breakpoints come from
    ``DiffQuantizer.learn_breakpoints`` - confirm that this is the intended
    quantization scheme.
    """
    quantizer = DiffQuantizer(
        alphabet_size=discretizition_factor,
        breakpoints=None,
        use_diffs=False,
    )
    samples = np.array(total_data)
    quantizer.learn_breakpoints(samples)
    return quantizer

In [None]:
def run_Preprocessing(db, max_window_size=7500, discretizition_factor=100):
    """Quantize every record of `db` into letter strings.

    Steps: window each record, learn one set of breakpoints from the first
    window of every record, then quantize all windows with those breakpoints.

    Parameters
    ----------
    db : DataFrame with 'signal' and 'label' columns.
    max_window_size : maximum window length in samples.
    discretizition_factor : alphabet size of the quantizer.

    Returns
    -------
    (r_list, labels) : one string per window and the matching labels.

    Note: the breakpoints are learned from the `db` passed in, so every call
    (for example one per split) produces its own breakpoints.
    """
    # Window each record once and keep the helper for the quantization pass.
    # Rows are taken by position so the frame does not need a 0..n-1 index.
    records = []
    for n in range(len(db)):
        pre = Preprocessing(db.iloc[n], discretizition_factor, max_window_size)
        records.append((pre, pre.create_window()))

    # Only the first window of each record contributes to the breakpoints.
    total_data = []
    for _, windows in records:
        if windows:
            total_data.extend(windows[0])

    quantize_max_lloyd_on_total_data = discretization_lloyd_max(discretizition_factor, total_data)

    r_list, labels = [], []
    for pre, windows in records:
        r, l = pre.change_to_alphabet(quantize_max_lloyd_on_total_data, windows)
        r_list.extend(r)
        labels.extend(l)
    if records:
        # Same message as before: index of the last processed record.
        print(len(records) - 1, " : done")

    return r_list, labels

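**Note:** check `max_window_size` before running the next cell. A signal longer than `max_window_size` is cut into consecutive windows of that length, and trailing samples that do not fill a whole window are dropped.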

In [None]:
# Quantize each split into letter strings (one per window) with matching labels.
# Windows are at most 1080 samples long and the alphabet has 100 symbols.
# Each call learns its breakpoints from the split passed as `db`.
r_list_train, labels_train = run_Preprocessing(db=train_df, max_window_size=1080, discretizition_factor=100)

r_list_val, labels_val = run_Preprocessing(db=val_df, max_window_size=1080, discretizition_factor=100)

r_list_test, labels_test = run_Preprocessing(db=test_df, max_window_size=1080, discretizition_factor=100)

# Tokenizer

In [None]:
from transformers import AutoTokenizer, AutoModel

In [None]:
# Load the tokenizer used for the quantized strings.
# NOTE(review): "/path..." looks like a placeholder - point it at the real tokenizer location.
tokenizer_ft = AutoTokenizer.from_pretrained(pretrained_model_name_or_path="/path...")

In [None]:
def tokenize_function(examples, max_length = 512):
    """Tokenize `examples` with the notebook-level `tokenizer_ft`.

    The result is padded to exactly `max_length` tokens and truncated when
    it is longer. Returns whatever `tokenizer_ft` returns for that call.
    """
    tokenizer_options = dict(padding="max_length", truncation=True, max_length=max_length)
    return tokenizer_ft(examples, **tokenizer_options)

In [None]:
# Tokenize every quantized string on its own; each list holds one encoding per string.
tokenized_dataset_train = [tokenize_function(text) for text in r_list_train]

tokenized_dataset_val = [tokenize_function(text) for text in r_list_val]

tokenized_dataset_test = [tokenize_function(text) for text in r_list_test]

## input_ids, attention_masks, labels

In [None]:
def return_ids(tokenized_dataset):
    """Stack the token ids and attention masks of a list of encodings.

    Parameters
    ----------
    tokenized_dataset : sequence of mappings, each with the keys 'input_ids'
        and 'attention_mask' (any other key, such as 'token_type_ids', is
        ignored).

    Returns
    -------
    (input_ids, attention_masks) : an int32 array and a bool array with one
    row per encoding.

    The encodings are no longer modified. The earlier version deleted
    'token_type_ids' from each of them, which raised KeyError when the cell
    was run a second time or when the tokenizer did not emit that key.
    """
    input_ids = []
    attention_masks = []

    for encoding in tokenized_dataset:
        input_ids.append(np.array(encoding['input_ids'], dtype=np.int32))
        attention_masks.append(np.array(encoding['attention_mask'], dtype=bool))

    return np.array(input_ids), np.array(attention_masks)

In [None]:
# Convert the tokenizer output of each split into stacked NumPy arrays
# (token ids and attention masks).
input_ids_train, attention_masks_train = return_ids(tokenized_dataset_train)

input_ids_val, attention_masks_val = return_ids(tokenized_dataset_val)

input_ids_test, attention_masks_test = return_ids(tokenized_dataset_test)

In [None]:
# Store the labels of each split as compact int8 arrays (the names are rebound in place).
labels_train = np.array(labels_train,dtype=np.int8)

labels_val = np.array(labels_val,dtype=np.int8)

labels_test = np.array(labels_test,dtype=np.int8)

# DataLoader

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# Load a pretrained encoder.
# NOTE(review): no later cell visible in this notebook uses `bert_model` (the
# classifier defined further down loads its own encoder) - confirm whether
# this copy is still needed.
bert_model = AutoModel.from_pretrained("/path...")

In [None]:
# Move the encoder to the selected device.
bert_model = bert_model.to(device)

In [None]:
# Put the encoder into evaluation mode.
bert_model.eval()

In [None]:
# Freeze every parameter of the standalone encoder so it receives no gradients.
for param in bert_model.parameters():
  param.requires_grad = False

In [None]:
# Mini-batch size shared by the train, validation and test DataLoaders below.
batch_size = 8

In [None]:
# Training data. Each batch is a tuple (input_ids, labels, attention_mask);
# the loops below unpack batches in this order. Batches are reshuffled every epoch.
dataset_ = TensorDataset(torch.tensor(input_ids_train), torch.tensor(labels_train),
                         torch.tensor(attention_masks_train))
dataloader = DataLoader(dataset_, batch_size=batch_size, shuffle = True)

In [None]:
# Validation data, same (input_ids, labels, attention_mask) layout as the training set.
# NOTE(review): shuffling is enabled here as well; it does not affect the
# per-epoch averages and could be turned off.
dataset_valid = TensorDataset(torch.tensor(input_ids_val), torch.tensor(labels_val),
                          torch.tensor(attention_masks_val))
dataloader_valid = DataLoader(dataset_valid, batch_size=batch_size, shuffle = True)

In [None]:
# Test data, same (input_ids, labels, attention_mask) layout; kept in its original order.
dataset_test = TensorDataset(torch.tensor(input_ids_test), torch.tensor(labels_test),
                          torch.tensor(attention_masks_test))
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle = False)

# BertBiLSTMClassifier

In [None]:
class BertBiLSTMClassifier(nn.Module):
    """Pretrained encoder followed by a bidirectional LSTM and a linear classifier.

    Parameters
    ----------
    num_classes : number of output logits.
    bert_frozen_layers : the embeddings and the first `bert_frozen_layers`
        encoder layers are frozen; the remaining layers stay trainable.
    input_size : feature size of the encoder output fed to the LSTM.
    lstm_hidden_size : hidden size of each LSTM direction.
    pretrained_model_name_or_path : location of the pretrained encoder
        (defaults to the path previously hard-coded in this class).
    """

    def __init__(self, num_classes, bert_frozen_layers=3, input_size=768, lstm_hidden_size=128,
                 pretrained_model_name_or_path="/path..."):
        super().__init__()

        # HeartBERT model with frozen layers
        self.bert = AutoModel.from_pretrained(
            pretrained_model_name_or_path=pretrained_model_name_or_path
        )

        frozen_modules = [self.bert.embeddings, *self.bert.encoder.layer[:bert_frozen_layers]]
        for module in frozen_modules:
            for param in module.parameters():
                param.requires_grad = False

        # Bi-LSTM layer
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=lstm_hidden_size,
                            bidirectional=True, batch_first=True)

        # Fully connected layer for classification (forward + backward state)
        self.fc = nn.Linear(lstm_hidden_size * 2, num_classes)

    def forward(self, input_ids, attention_mask=None):
        """Return the class logits, shape (batch, num_classes).

        Parameters
        ----------
        input_ids : token ids, shape (batch, sequence length).
        attention_mask : optional mask with 1/True for real tokens. When
            given it is passed to the encoder and padded positions are
            skipped by the LSTM; this path assumes the padding is at the end
            of each sequence. When omitted the full sequence is processed,
            as before.

        The sequence is summarized by the final hidden state of BOTH LSTM
        directions. Taking ``lstm_out[:, -1, :]`` instead would pair the
        forward direction's final state with the backward direction's state
        after it has seen only the last token.
        """
        if attention_mask is None:
            bert_output = self.bert(input_ids)[0]
            _, (h_n, _) = self.lstm(bert_output)
        else:
            bert_output = self.bert(input_ids, attention_mask=attention_mask)[0]
            # pack_padded_sequence needs CPU lengths of at least 1.
            lengths = attention_mask.long().sum(dim=1).clamp(min=1).cpu()
            packed = nn.utils.rnn.pack_padded_sequence(
                bert_output, lengths, batch_first=True, enforce_sorted=False
            )
            _, (h_n, _) = self.lstm(packed)

        # h_n[-2] is the final forward state, h_n[-1] the final backward state.
        sequence_summary = torch.cat((h_n[-2], h_n[-1]), dim=1)

        # Raw logits; CrossEntropyLoss applies the softmax itself.
        return self.fc(sequence_summary)

In [None]:
# Classifier with four output classes (labels 0-3), moved to the selected device.
model = BertBiLSTMClassifier(4)
model = model.to(device)

In [None]:
# Count the parameters that are not frozen (requires_grad is True) and display the total.
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
trainable_params

In [None]:
# Cross-entropy on the raw logits returned by the model.
criterion = nn.CrossEntropyLoss()
# NOTE(review): every parameter is handed to Adam, including the frozen ones;
# consider passing only parameters with requires_grad=True.
optimizer = optim.Adam(model.parameters(), lr=3e-4) # setup the proper lr

# Metrics

In [None]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

def compute_metrics(eval_pred, labels):
    """Print per-class, micro-averaged and macro-averaged metrics plus accuracy.

    Parameters
    ----------
    eval_pred : per-class scores; the predicted class is the argmax over the
        last axis.
    labels : true class of every sample.

    Nothing is returned; the report is printed. Classes are numbered from 1
    in the printed report.
    """
    predictions = np.argmax(eval_pred, axis=-1)

    # Overall accuracy and the per-class scores (average=None keeps one value per class).
    accuracy = accuracy_score(labels, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, predictions, average=None)

    # Aggregated scores.
    micro_scores = precision_recall_fscore_support(labels, predictions, average='micro')
    macro_scores = precision_recall_fscore_support(labels, predictions, average='macro')
    micro_precision, micro_recall, micro_f1 = micro_scores[:3]
    macro_precision, macro_recall, macro_f1 = macro_scores[:3]

    print("Class-wise Metrics:")
    for i, _ in enumerate(precision):
        print(f"Class {i+1}: Precision={precision[i]:.4f}, Recall={recall[i]:.4f}, F1={f1[i]:.4f}")

    print("\nMicro Average Metrics:")
    print(f"Precision={micro_precision:.4f}, Recall={micro_recall:.4f}, F1={micro_f1:.4f}")

    print("\nMacro Average Metrics:")
    print(f"Precision={macro_precision:.4f}, Recall={macro_recall:.4f}, F1={macro_f1:.4f}")

    print("\nAccuracy:")
    print(f"Accuracy={accuracy:.4f}")

def display_confusion_matrix(eval_pred, labels, num_classes=4):
    """Plot the confusion matrix of the argmax predictions as a heatmap.

    Parameters
    ----------
    eval_pred : per-class scores; the predicted class is the argmax over the
        last axis.
    labels : true class index (0 .. num_classes-1) of every sample.
    num_classes : number of classes shown; defaults to the four classes used
        in this notebook.

    The matrix always has `num_classes` rows and columns, so the tick labels
    (numbered from 1) stay aligned even if a class is missing from both the
    labels and the predictions.
    """
    predictions = np.argmax(eval_pred, axis=-1)

    # Plain integer arrays, so class indices stored as floats are handled too.
    y_true = np.asarray(labels).astype(np.int64)
    y_pred = np.asarray(predictions).astype(np.int64)

    cm = confusion_matrix(y_true, y_pred, labels=np.arange(num_classes))

    # Plot confusion matrix
    plt.figure(figsize=(num_classes, num_classes))

    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False,
                xticklabels=np.arange(1, num_classes+1),
                yticklabels=np.arange(1, num_classes+1))

    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()


# Train

In [None]:
# Per-epoch mean losses, appended to by the training cell below.
# Re-run this cell before re-running the training cell so the lists start empty.
loss_epoch_train = []
loss_epoch_valid = []
# Number of passes over the training data.
num_epochs = 50

In [None]:
from numpy import mean
# Per-epoch accuracy of the training and validation passes.
total_train_acc = np.zeros((num_epochs,))
total_valid_acc = np.zeros((num_epochs,))
for epoch in range(num_epochs):
  loss_batch_train = []
  # 1.0 / 0.0 per sample: whether the prediction matched the label.
  correct_train = []
  correct_valid = []
  accuracy_train = []

  # ---- training pass ----
  model.train()
  for batch in dataloader:

    # The attention mask (third element) is not passed to the model here.
    inputs, labels, _ = batch

    inputs = inputs.to(device)
    # CrossEntropyLoss needs int64 targets; convert on the device instead of
    # going through a CPU LongTensor.
    labels = labels.to(device).long()
    outputs = model(inputs)
    loss = criterion(outputs, labels)
    predicted = torch.argmax(outputs.detach(), dim=1)
    # Plain floats, so the list does not hold one GPU tensor per sample.
    correct_train.extend(predicted.eq(labels).float().tolist())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    loss_batch_train.append(loss.item())

  # if (epoch%10==0):
  #   torch.save(model.state_dict(), '/content/drive/MyDrive/'+str(epoch)+'.pt')

  total_train_acc[epoch] = torch.tensor(correct_train).mean().item()
  loss_train_per_epoch = mean(loss_batch_train)
  loss_epoch_train.append(loss_train_per_epoch)
  print('train-loss',epoch,':', loss_epoch_train[-1])

  # ---- validation pass ----
  model.eval()
  loss_batch_valid = []

  # No gradients are needed for evaluation; without no_grad() every batch
  # would build and keep an autograd graph.
  with torch.no_grad():
    for batch in dataloader_valid:
      inputs, labels, _ = batch
      inputs = inputs.to(device)
      labels = labels.to(device).long()
      outputs = model(inputs)
      loss = criterion(outputs, labels)
      predicted = torch.argmax(outputs, dim=1)
      correct_valid.extend(predicted.eq(labels).float().tolist())
      loss_batch_valid.append(loss.item())

  total_valid_acc[epoch] = torch.tensor(correct_valid).mean().item()
  loss_valid_per_epoch = mean(loss_batch_valid)
  loss_epoch_valid.append(loss_valid_per_epoch)
  print(total_train_acc,total_valid_acc)
  print('valid-loss',epoch,':',loss_epoch_valid[-1])
  print('**********')


In [None]:
# Per-epoch accuracies: training first, then validation.
print(total_train_acc,total_valid_acc)

## Plot

In [None]:
import matplotlib.pyplot as plt
# Training loss in red, validation loss in blue, one point per epoch.
fig, ax = plt.subplots()
ax.plot(loss_epoch_train, 'r')
ax.plot(loss_epoch_valid, 'b')
ax.set_title('train-loss vs eval-loss')
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.legend(['loss_train', 'loss_eval']);

In [None]:
# Accuracy per epoch; title, axis labels and legend make the figure readable on its own.
plt.plot(total_train_acc, label='train')
plt.plot(total_valid_acc, label='validation')
plt.title('train-accuracy vs eval-accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend();

# Test

In [None]:
model.eval()
loss_batch_test = []

# Index 0 of outputs_temp / labels_temp is an all-zero placeholder row: the
# cells below slice it off with [1:], so it is kept for compatibility.
# The number of logits is read from the classifier head rather than from an
# `outputs` variable left over from the training cell.
num_outputs = model.fc.out_features
output_batches = [torch.zeros((1, num_outputs)).to(device)]
label_batches = [torch.zeros((1,))]

# Test results are no longer appended to `correct_train`, which belongs to
# the training loop and was never read afterwards.
with torch.no_grad():
  for batch in dataloader_test:
    # The attention mask (third element) is not passed to the model here.
    inputs, labels, _ = batch
    inputs = inputs.to(device)
    labels = labels.to(device)
    outputs = model(inputs)

    output_batches.append(outputs)
    label_batches.append(labels.to('cpu'))

    loss = criterion(outputs, labels.long())
    loss_batch_test.append(loss.item())

# Concatenate once after the loop instead of re-allocating on every batch.
outputs_temp = torch.cat(output_batches, dim=0)
labels_temp = torch.cat(label_batches, dim=0)

print('test-loss: ', np.mean(loss_batch_test))

In [None]:
# [1:] skips the all-zero placeholder row that the test cell puts at index 0.
compute_metrics(outputs_temp[1:].to('cpu'),labels_temp[1:].to('cpu'))

In [None]:
# [1:] skips the all-zero placeholder row that the test cell puts at index 0.
display_confusion_matrix(outputs_temp[1:].to('cpu'),labels_temp[1:].to('cpu'))