In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, TimeDistributed
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt
import seaborn as sns
import tensorflow as tf
import os

In [None]:
data_path1 = '/kaggle/input/csv-file/File CSV/MIT-BIH/'
data_path2 = '/kaggle/input/transfer-data/'
window_input= 40
window_out= 200
train_size = 0.8
test_size = 1 - train_size
data_set = {
  0: "testset",
  1: "trainset"
}

In [None]:
def get_data(data_path,istrainset):    
    missing_file = 0
    total_sample = 0
    directory = f"{data_path}{data_set[istrainset]}/"
    X, y = [], []
    for filename in os.listdir(directory):
        f = os.path.join(directory, filename)
        if os.path.isfile(f):
            df = pd.read_csv(f, header=None)
            data=df.drop(columns=187)
            data=data.values
            # Số lượng lặp qua dữ liệu
            num_samples = len(data) - window_input - window_out 
            # print("----------------------")
            # print("🍒Len data:", len(data))
            # print("🍒Num_samples:", num_samples)
            # Đối với tập train
            # Tạo dữ liệu train từ cửa sổ trượt
            if(num_samples>0):
                total_sample = total_sample + num_samples
                for i in range(num_samples):
                    X_window = data[i:i+window_input]
                    y_window = data[i+window_input+window_out:i+window_input+window_out+1]

                    X.append(X_window)
                    y.append(y_window)
            else:
                missing_file = missing_file + 1
    print(f"Num of file can not use due to its missing of length: {missing_file}")
    print(f"Number of sample: {len(y)}/{len(X)}/{total_sample}")
    return X,y

In [None]:
X_train1, y_train1 = get_data(data_path2,0)

In [None]:
class Dataset:
    def __init__(self, data, label):
        self.data = np.array(data)
        self.label = np.concatenate(label)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, i):
        # read data
        X = self.data[i]
        y = self.label[i]
        return X, y

In [None]:
class Dataloader(tf.keras.utils.Sequence):
    def __init__(self, dataset, batch_size,size):
        self.dataset = dataset
        self.batch_size = batch_size
        self.size= size

    def __getitem__(self, i):
        # collect batch data
        start = i * self.batch_size
        stop = (i + 1) * self.batch_size
        data = []
        for j in range(start, stop):
            data.append(self.dataset[j])

        batch = [np.stack(samples, axis=0) for samples in zip(*data)]
        return tuple(batch)

    def __len__(self):
        return self.size //self.batch_size

In [None]:
train_transfer_dataset = Dataset(X_train1, y_train1)

In [None]:
train_transferloader = Dataloader(train_transfer_dataset, 16,len(train_transfer_dataset))

In [None]:
# Tạo một mô hình LSTM
model = Sequential()
model.add(LSTM(64,activation='relu' ,input_shape=(40, 187), return_sequences=True))  # Đặt input_shape phù hợp với kích thước của mảng X_train
# model.add(LSTM(64,activation='relu' ,input_shape=(40, 187)))
model.add(LSTM(32, activation='relu'))
# model.add(Dense(187))# Đặt lớp Dcense phù hợp với kích thước của mảng y_train
model.add(Dense(187,activation='sigmoid'))
# Biên dịch mô hình
model.compile(optimizer='adam', loss='mean_squared_error', metrics = ['accuracy'])

In [None]:
model.summary()

In [None]:
# Frozen base_network
for layer in model.layers[:1]:
  layer.trainable = False

for layer in model.layers:
  print('Layer: {} ; Trainable: {}'.format(layer, layer.trainable))


In [None]:
model.fit(train_transferloader,epochs=1, verbose=1)

In [None]:
for layer in model.layers[:1]:
  layer.trainable = True

for layer in model.layers:
  print('Layer: {} ; Trainable: {}'.format(layer, layer.trainable))

In [None]:
X_train, y_train = get_data(data_path1,1)
X_test, y_test = get_data(data_path1,0)

In [None]:
train_dataset = Dataset(X_train, y_train)
test_dataset = Dataset(X_test, y_test)

In [None]:
train_loader = Dataloader(train_dataset, 16,len(train_dataset))
test_loader = Dataloader(test_dataset,16,len(test_dataset))

In [None]:
model.fit(train_loader,validation_data=test_loader,epochs=1, verbose=1)

In [None]:
# Dự đoán trên tập kiểm tra
y_pred = model.predict(test_dataset[:][0])

In [None]:
from sklearn.metrics import mean_squared_error, r2_score,mean_absolute_error
# Độ đo Mean Squared Error (MSE)
mse = mean_squared_error(test_dataset[:][1], y_pred)
print(f"Mean Squared Error (MSE): {mse}")

# Độ đo R-squared (R2)
r2 = r2_score(test_dataset[:][1], y_pred)
print(f"R-squared (R2): {r2}"),mean_absolute_error

In [None]:
# Độ đo Mean Squared Error (MSE)
mse = mean_squared_error(test_dataset[:][1], y_pred)

# Tính Mean Absolute Error (MAE)
mae = mean_absolute_error(test_dataset[:][1], y_pred)
print(f"Mean Absolute Error (MAE): {mae}")

# Tính Root Mean Squared Error (RMSE)
rmse = np.sqrt(mse)
print(f"Root Mean Squared Error (RMSE): {rmse}")

In [None]:
import matplotlib.pyplot as plt

# Chọn một ví dụ cụ thể để trực quan hóa
example_index = 40  # Thay đổi chỉ số ví dụ nếu cần

# Lấy giá trị dự đoán và giá trị thực tế tương ứng
y_pred_example = y_pred[example_index]
y_test_example = test_dataset[:][1][example_index]

# Vẽ biểu đồ cho giá trị dự đoán (màu xanh) và giá trị thực tế (màu đỏ)
plt.figure(figsize=(12, 6))
plt.plot(y_pred_example, label='Dự đoán', color='blue')
plt.plot(y_test_example, label='Thực tế', color='red')
plt.xlabel('Cột giá trị')
plt.ylabel('Giá trị')
plt.title('So sánh Dự đoán và Thực tế cho ví dụ cụ thể')
plt.legend()
plt.grid()
plt.show()
