In [10]:
import numpy as np
import scipy as sp
import pandas as pd

from scipy.interpolate import interp1d

In [20]:

def slide_func(data, window_size, iter):
    if iter >= window_size:
        if iter % (1 * 512) == 0:
            sliding_window_start = iter - window_size
            sliding_window_end = iter
            sliding_window = np.array(data[sliding_window_start:sliding_window_end])  # sliding_window ~ y
            return sliding_window
        return None
    return None

def filter_data(data):
    # Bandpass filter
    band = [0.5 / (0.5 * 512), 40 / (0.5 * 512)]
    b, a = sp.signal.butter(4, band, btype='band', analog=False, output='ba')
    data = sp.signal.lfilter(b, a, data)

    # Filter for EMG by interpolated
    filtered_data = data[(np.abs(data) <= 256)]
    x = np.arange(len(filtered_data))
    interpolated_data = interp1d(x, filtered_data)(np.linspace(0, len(filtered_data) - 1, len(data)))
    return interpolated_data

def hjorth_parameters(signal):
    diff1 = np.diff(signal) # Tính đạo hàm bậc nhất
    diff2 = np.diff(diff1)  # Tính đạo hàm bậc hai

    activity = np.var(signal)      # Activity (A)
    mobility = np.sqrt(np.var(diff1) / activity)  # Mobility (B)
    complexity = np.sqrt(np.var(diff2) / np.var(diff1)) / mobility  # Complexity (C)

    return activity, mobility, complexity

def FeatureExtract(data, number):
    f, t, Zxx = sp.signal.stft(data, 512, nperseg=15 * 512, noverlap=14 * 512)
    delta = np.array([], dtype=float)
    theta = np.array([], dtype=float)
    alpha = np.array([], dtype=float)
    beta = np.array([], dtype=float)

    for i in range(0, int(t[-1])):
        indices = np.where((f >= 0.5) & (f <= 4))[0]
        delta = np.append(delta, np.sum(np.abs(Zxx[indices, i])))

        indices = np.where((f >= 4) & (f <= 8))[0]
        theta = np.append(theta, np.sum(np.abs(Zxx[indices, i])))

        indices = np.where((f >= 8) & (f <= 13))[0]
        alpha = np.append(alpha, np.sum(np.abs(Zxx[indices, i])))

        indices = np.where((f >= 13) & (f <= 30))[0]
        beta = np.append(beta, np.sum(np.abs(Zxx[indices, i])))

    abr = alpha / beta
    tbr = theta / beta
    dbr = delta / beta
    tar = theta / alpha
    dar = delta / alpha
    dtabr = (alpha + beta) / (delta + theta)

    diction = {
        "delta": delta,
        "theta": theta,
        "alpha": alpha,
        "beta": beta,
        "abr": abr,
        "tbr": tbr,
        "dbr": dbr,
        "tar": tar,
        "dar": dar,
        "dtabr": dtabr,
        "label": number
    }

    return diction

def save_to_csv(data, filename):
    df = pd.DataFrame(data)
    df.to_csv(filename, index=False)



In [68]:
import os
folder_path = '../Data'

task1_data = []
task2_data = []
task3_data = []
task4_data = []
task5_data = []

print(os.listdir(folder_path))

for dir in os.listdir(folder_path)[1:]:
    subject_path = os.path.join(folder_path, dir)

    for filename in os.listdir(subject_path):
        # Kiểm tra nếu là file .txt
        if filename.endswith(".txt"):
            # Đọc dữ liệu từ file
            file_data = np.loadtxt(os.path.join(subject_path, filename))

        match filename[:-4]:
            case "task1": 
                task1_data.append(file_data)
            case "task2":
                task2_data.append(file_data)
            case "task3":
                task3_data.append(file_data)
            case "task4":
                task4_data.append(file_data)
            case "task5":
                task5_data.append(file_data)
            case _:
                print("file name is not correct!")
            # print(filename)
        

# task1_data = np.concatenate(task1_data, axis = 0)
task2_data = np.concatenate(task2_data, axis = 0)
task3_data = np.concatenate(task3_data, axis = 0)
task4_data = np.concatenate(task4_data, axis = 0)
task5_data = np.concatenate(task5_data, axis = 0)

# print(task1_data.shape)
print(task2_data.shape)
print(task3_data.shape)
print(task4_data.shape)
print(task5_data.shape)



In [82]:
# Task1 = filter_data(task1_data)
Task2 = filter_data(task2_data)
Task3 = filter_data(task4_data)
Task4 = filter_data(task3_data)
Task5 = filter_data(task5_data)


# Extract features
# task1_features = FeatureExtract(Task1, 0)
task2_features = FeatureExtract(Task2, 0)
task3_features = FeatureExtract(Task3, 1)
task4_features = FeatureExtract(Task4, 2)
task5_features = FeatureExtract(Task5, 3)


# Save features to CSV
# save_to_csv(task1_features, '../Data/CSV/Task1.csv')
save_to_csv(task2_features, '../CSV/Task2.csv')
save_to_csv(task3_features, '../CSV/Task3.csv')
save_to_csv(task4_features, '../CSV/Task4.csv')
save_to_csv(task5_features, '../CSV/Task5.csv')

print("EEG features saved to csv")

In [83]:
# Task1 = pd.read_csv('../CSV/Task1.csv')
Task2 = pd.read_csv('../CSV/Task2.csv')
Task3 = pd.read_csv('../CSV/Task3.csv')
Task4 = pd.read_csv('../CSV/Task4.csv')
Task5 = pd.read_csv('../CSV/Task5.csv')


# print(Task1.shape)
print(Task2.shape)
print(Task3.shape)
print(Task4.shape)
print(Task5.shape)



In [84]:
import matplotlib.pyplot as plt

# Giả sử Task1, Task2, Task3, Task4, Task5 là các DataFrame hoặc Series có cột 'delta'

# plt.plot(Task1['delta'], label='Task1')
plt.plot(Task2['delta'], label='Task2')
plt.plot(Task3['delta'], label='Task3')
plt.plot(Task4['delta'], label='Task4')
plt.plot(Task5['delta'], label='Task5')

# Thêm chú thích vào biểu đồ
plt.legend()

# Hiển thị biểu đồ
plt.show()


In [85]:
# plt.plot(Task1['alpha'], label='Task1')
plt.plot(Task2['alpha'], label='Task2')
plt.plot(Task3['alpha'], label='Task3')
plt.plot(Task4['alpha'], label='Task4')
plt.plot(Task5['alpha'], label='Task5')
plt.legend()

plt.show()

In [86]:
# plt.plot(Task1['beta'], label='Task1')
plt.plot(Task2['beta'], label='Task2')
plt.plot(Task3['beta'], label='Task3')
plt.plot(Task4['beta'], label='Task4')
plt.plot(Task5['beta'], label='Task5')

plt.legend()

plt.show()

In [87]:
# plt.plot(Task1['theta'], label='Task1')
plt.plot(Task2['theta'], label='Task2')
plt.plot(Task3['theta'], label='Task3')
plt.plot(Task4['theta'], label='Task4')
plt.plot(Task5['theta'], label='Task5')
plt.legend()

plt.show()

In [88]:
def df_to_X_y(df, window_size=15):
  df_as_np = df.to_numpy()
  X = []
  y = []
  for i in range(len(df_as_np)-window_size):
    row = [[a[:-1]] for a in df_as_np[i:i+window_size]]
    X.append(row)
    label = df_as_np[i+window_size][-1]
    y.append(label)
  return np.array(X), np.array(y)

# feature_task1, label_task2 = df_to_X_y(Task1)
feature_task2, label_task2 = df_to_X_y(Task2)
feature_task3, label_task3 = df_to_X_y(Task3)
feature_task4, label_task4 = df_to_X_y(Task4)
feature_task5, label_task5 = df_to_X_y(Task5)


# feature_task1 = np.squeeze(feature_task1)
feature_task2 = np.squeeze(feature_task2)
feature_task3 = np.squeeze(feature_task3)
feature_task4 = np.squeeze(feature_task4)
feature_task5 = np.squeeze(feature_task5)



In [89]:
print(feature_task2[1])


In [90]:
features = np.vstack((feature_task2, feature_task3, feature_task4, feature_task5))
labels = np.concatenate((label_task2, label_task3, label_task4, label_task5))
print(features.shape)
print(labels.shape)

In [91]:
from sklearn.model_selection import train_test_split
import tensorflow as tf
print(tf.__version__)
from tensorflow import keras

from keras.models import Sequential, Model
# from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Dense, Activation, Flatten, Dropout, BatchNormalization, MultiHeadAttention, Input, DepthwiseConv2D, LSTM, Bidirectional,Embedding, Input,GlobalAveragePooling2D,GlobalAveragePooling1D
from keras.layers import Conv2D, MaxPooling2D, GRU
from keras import regularizers
from keras.callbacks import LearningRateScheduler
import numpy as np
import pandas as pd

X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.3, random_state=42, stratify=labels)

batch_size = 32
data_train = tf.data.Dataset.from_tensor_slices((X_train, y_train))
data_test = tf.data.Dataset.from_tensor_slices((X_test, y_test))
data_train = data_train.repeat(10).batch(batch_size).cache().prefetch(tf.data.experimental.AUTOTUNE).shuffle(buffer_size=500)
data_test = data_test.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

print(X_train[1])

In [92]:
weight_decay = 0.001
model = Sequential()
model.add(Dense(64, activation='relu', input_shape=(15, 10), kernel_regularizer=regularizers.l2(weight_decay)))
model.add(Flatten())
model.add(Dense(4, activation='softmax'))
print(model.summary())

In [93]:
model.compile(loss='sparse_categorical_crossentropy', optimizer=tf.keras.optimizers.Adam(0.001), metrics=['accuracy'])

model.fit(data_train, validation_data=data_test, epochs=10)



In [94]:
loss_train, acc_train = model.evaluate(data_train)
loss_test, acc_test = model.evaluate(data_test)
print("Train results: ", acc_train)
print("Test results: ", acc_test)

In [95]:
train_predictions = model.predict(X_test)
train_predictions_labels = np.argmax(train_predictions, axis=1)
train_results = pd.DataFrame(data={'Train Predictions':train_predictions_labels, 'Actuals':y_test})
train_results

print(np.argmax(model.predict(features[1:3]), axis = 1))

In [99]:
# Make predictions on the test set
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay



# Print classification report
print(classification_report(y_test, train_predictions_labels, labels=[0, 1,2,3]))

# Display confusion matrix
disp = ConfusionMatrixDisplay(confusion_matrix(y_test, train_predictions_labels), display_labels=["Task2", "Task3", "Task4", "Task5"])
disp.plot()

plt.show()


In [77]:
import pickle
pickle.dump(model, open("../trained_model/ANN.h5", "wb"))

# pickle.dump(model, open("/Users/nguyentrithanh/Downloads/OneDrive_1_29-5-2024/ZZZZZ/trained_model/ANN.h5", "wb"))
