## Skilled Forgery using MI1 task

In [None]:
target_subject = 'S5'
test_session = ['S3']
path = 'give the path here'
all_subjects = ['S1', 'S2', 'S3', 'S4', 'S5', 'S6','S7', 'S8', 'S9', 'S10','S11', 'S12', 'S13', 'S14', 'S15', 'S16','S17', 'S18', 'S19', 'S20']


In [344]:
# Import necessary libraries
import torch
import numpy as np
import random
import pandas as pd
import matplotlib.pyplot as plt
import os
import seaborn as sns
from sklearn.preprocessing import normalize
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_curve, roc_auc_score
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.utils import shuffle


In [345]:

# Set random seeds for reproducibility
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)
random.seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


In [346]:

class EEGCNN_GRU(nn.Module):
    def __init__(self, input_channels, sequence_length, hidden_size, num_classes):
        super(EEGCNN_GRU, self).__init__()
        self.conv1 = nn.Conv1d(input_channels, 64, 3)
        self.bn1 = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(64, 64, 3)
        self.bn2 = nn.BatchNorm1d(64)
        self.conv3 = nn.Conv1d(64, 64, 3)
        self.bn3 = nn.BatchNorm1d(64)
        self.dropout1 = nn.Dropout(0.5)
        self.maxpool = nn.MaxPool1d(2)
        self.gru = nn.GRU(64, hidden_size, batch_first=True)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = F.elu(self.bn1(self.conv1(x)))
        x = F.elu(self.bn2(self.conv2(x)))
        x = F.elu(self.bn3(self.conv3(x)))
        x = self.dropout1(x)
        x = self.maxpool(x)
        x = x.permute(0, 2, 1)
        x, _ = self.gru(x)
        x = self.dropout2(x)
        x = x[:, -1, :]
        x = F.elu(self.fc1(x))
        x = F.log_softmax(self.fc2(x), dim=1)
        return x


In [347]:
#batch_size = 32
batch_size = 32

num_classes = 2

input_channels = 20

sequence_length = 100

#hidden_size = 128
hidden_size = 200

#num_epochs = 40
num_epochs = 60

num_folds = 10

In [None]:
# "D:\EEG_work_2024\work_may_2025\MI2_results\results_MI2_S1\S1_model.pth"
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = EEGCNN_GRU(input_channels, sequence_length, hidden_size, num_classes).to(device)
criterion = torch.nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
model.load_state_dict(torch.load(f'D:\\EEG_work_2024\\work_may_2025\\MI1_results\\results_MI1_{target_subject}\\{target_subject}_model.pth'))
model.eval()

  model.load_state_dict(torch.load(f'D:\\EEG_work_2024\\work_may_2025\\MI4_results\\results_MI4_{target_subject}\\{target_subject}_model.pth'))


EEGCNN_GRU(
  (conv1): Conv1d(20, 64, kernel_size=(3,), stride=(1,))
  (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv1d(64, 64, kernel_size=(3,), stride=(1,))
  (bn2): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv1d(64, 64, kernel_size=(3,), stride=(1,))
  (bn3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (dropout1): Dropout(p=0.5, inplace=False)
  (maxpool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (gru): GRU(64, 200, batch_first=True)
  (dropout2): Dropout(p=0.5, inplace=False)
  (fc1): Linear(in_features=200, out_features=200, bias=True)
  (fc2): Linear(in_features=200, out_features=2, bias=True)
)

In [None]:
## making test data
Data_VEP_test = pd.DataFrame()
for session in test_session:
    vep_path = os.path.join(path, target_subject, 'MI1', f'{session}.csv' )
    vep_data_test = pd.read_csv(vep_path)
    Data_VEP_test = pd.concat([Data_VEP_test, vep_data_test], ignore_index=True)

Data_VEP_test.drop(['time'], axis=1, inplace=True)
normalize_vep_test = normalize(Data_VEP_test, norm='max', axis=0)
Data_VEP_test = pd.DataFrame(normalize_vep_test, columns=Data_VEP_test.columns)

In [350]:
Data_VEP_test.shape

(12800, 20)

## Skilled user Test Dataset

In [None]:
Data_skilled_forgery = pd.DataFrame()
tasks_VEP = ['MI1']
for sub in all_subjects:
    if sub == target_subject:
        continue
    for task in tasks_VEP:
        for session in test_session:
            non_vep_path = os.path.join(path, sub, task, f'{session}.csv')
            non_vep_data_test = pd.read_csv(non_vep_path)
            Data_skilled_forgery = pd.concat([Data_skilled_forgery, non_vep_data_test], ignore_index=True)
Data_skilled_forgery.drop(['time'], axis=1, inplace=True)
normalize_non_vep_test = normalize(Data_skilled_forgery, norm='max', axis=0)
Data_skilled_forgery = pd.DataFrame(normalize_non_vep_test, columns=Data_skilled_forgery.columns)


In [352]:
Data_skilled_forgery.shape

(243200, 20)

In [353]:

# Function to create dataset
def create_dataset(df, lookback, label):
    df = df.to_numpy()
    X, y = [], []
    for i in range(0, len(df) - lookback + 1, lookback):
        X.append(df[i:i + lookback])
        y.append(label)
    return np.array(X), np.array(y)
lookback = 100

In [354]:
X_vep_test, y_vep_test = create_dataset(Data_VEP_test, lookback, label=1)
X_non_vep_test, y_non_vep_test = create_dataset(Data_skilled_forgery, lookback, label=0)

In [355]:
X_vep_test.shape, y_vep_test.shape, X_non_vep_test.shape, y_non_vep_test.shape

((128, 100, 20), (128,), (2432, 100, 20), (2432,))

In [356]:
X_test = np.concatenate((X_non_vep_test, X_vep_test), axis=0)
y_test = np.concatenate((y_non_vep_test, y_vep_test), axis=0)

In [357]:
X_test.shape, y_test.shape

((2560, 100, 20), (2560,))

In [358]:
X_test, y_test = torch.tensor(X_test).float(), torch.tensor(y_test).long()

In [359]:
X_test.shape, y_test.shape

(torch.Size([2560, 100, 20]), torch.Size([2560]))

In [None]:
model.eval()
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)
correct, total = 0, 0
predicted_labels, true_labels, predicted_probs_list = [], [], []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        predicted_labels.extend(predicted.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())
        predicted_probs = F.softmax(outputs, dim=1)
        predicted_probs_list.append(predicted_probs.cpu().numpy())
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

predicted_probs_np = np.concatenate(predicted_probs_list)
test_accuracy = 100 * correct / total

os.makedirs(f"Skilled_forgery_results_MI1_{target_subject}", exist_ok=True)

# Metrics calculations
acc = accuracy_score(true_labels, predicted_labels)
prec = precision_score(true_labels, predicted_labels, average='weighted')
rec = recall_score(true_labels, predicted_labels, average='weighted')
f1 = f1_score(true_labels, predicted_labels, average='weighted')

cm = confusion_matrix(true_labels, predicted_labels)
tn, fp, fn, tp = cm.ravel()
tpr = tp / (tp + fn)
fpr = fp / (fp + tn)

far = fp / (fp + tn) if (fp + tn) > 0 else 0  # False Acceptance Rate
frr = fn / (fn + tp) if (fn + tp) > 0 else 0  # False Rejection Rate

# Calculate EER
fpr_curve, tpr_curve, thresholds = roc_curve(true_labels, predicted_probs_np[:, 1])
eer_threshold = thresholds[np.nanargmin(np.abs(fpr_curve - (1 - tpr_curve)))]
eer = fpr_curve[np.nanargmin(np.abs(fpr_curve - (1 - tpr_curve)))]

auc = roc_auc_score(true_labels, predicted_probs_np[:, 1])

# Save metrics as CSV
metrics = {
    "test_accuracy": test_accuracy,
    "precision": prec,
    "recall": rec,
    "f1_score": f1,
    "TPR": tpr,
    "FPR": fpr,
    "AUC": auc,
    "EER": eer,
    "FAR": far,
    "FRR": frr
}

metrics_df = pd.DataFrame([metrics])
metrics_df.to_csv(f"Skilled_forgery_results_MI1_{target_subject}/metrics_subject_{target_subject}.csv", index=False)

# Save Confusion Matrix
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False, annot_kws={"size": 35})
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.savefig(f"Skilled_forgery_results_MI1_{target_subject}/confusion_matrix_subject_{target_subject}.png")
plt.close()

# Save ROC Curve
plt.figure()
plt.plot(fpr_curve, tpr_curve, label=f"ROC Curve (AUC = {auc:.2f})")
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend()
plt.savefig(f"Skilled_forgery_results_MI1_{target_subject}/roc_curve_subject_{target_subject}.png")
plt.close()

print(f"Completed processing for subject {target_subject}.")
