In [66]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, accuracy_score
import os
import numpy as np
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import StandardScaler
import torch.nn.functional as F
import pickle


# Constants
EEG_DATA_DIR = r'C:\Users\User\Documents\Lie detect data\7M_EEGData'
max_length = 3750  # Define maximum length for padding

# Define EEGNet model
class SeparableConv2d(nn.Module):
    def __init__(self, c_in: int, c_out: int, kernel_size: tuple, padding: tuple = 0):
        super().__init__()
        self.c_in = c_in
        self.c_out = c_out
        self.kernel_size = kernel_size
        self.padding = padding
        self.depthwise_conv = nn.Conv2d(self.c_in, self.c_in, kernel_size=self.kernel_size,
                                        padding=self.padding, groups=self.c_in)
        self.conv2d_1x1 = nn.Conv2d(self.c_in, self.c_out, kernel_size=1)

    def forward(self, x: torch.Tensor):
        y = self.depthwise_conv(x)
        y = self.conv2d_1x1(y)
        return y

class SeparableConv1d(nn.Module):
    def __init__(self, c_in: int, c_out: int, kernel_size: tuple, padding: tuple = 0):
        super().__init__()
        self.c_in = c_in
        self.c_out = c_out
        self.kernel_size = kernel_size
        self.padding = padding
        self.depthwise_conv = nn.Conv1d(self.c_in, self.c_in, kernel_size=self.kernel_size,
                                        padding=self.padding, groups=self.c_in)
        self.conv1d_1x1 = nn.Conv1d(self.c_in, self.c_out, kernel_size=1)

    def forward(self, x: torch.Tensor):
        y = self.depthwise_conv(x)
        y = self.conv1d_1x1(y)
        return y

class EEGNet(nn.Module):
    def __init__(self, nb_classes: int = 2, Chans: int = 65, Samples: int = 3750,
                 dropoutRate: float = 0.5, kernLength: int = 63,
                 F1:int = 8, D:int = 2):
        super().__init__()

        F2 = F1 * D

        # Make kernel size and odd number
        try:
            assert kernLength % 2 != 0
        except AssertionError:
            raise ValueError("ERROR: kernLength must be odd number")

        # In: (B, Chans, Samples, 1)
        # Out: (B, F1, Samples, 1)
        self.conv1 = nn.Conv1d(Chans, F1, kernLength, padding=(kernLength // 2))
        self.bn1 = nn.BatchNorm1d(F1) # (B, F1, Samples, 1)
        # In: (B, F1, Samples, 1)
        # Out: (B, F2, Samples - Chans + 1, 1)
        self.conv2 = nn.Conv1d(F1, F2, Chans, groups=F1)
        self.bn2 = nn.BatchNorm1d(F2) # (B, F2, Samples - Chans + 1, 1)
        # In: (B, F2, Samples - Chans + 1, 1)
        # Out: (B, F2, (Samples - Chans + 1) / 4, 1)
        self.avg_pool = nn.AvgPool1d(4)
        self.dropout = nn.Dropout(dropoutRate)

        # In: (B, F2, (Samples - Chans + 1) / 4, 1)
        # Out: (B, F2, (Samples - Chans + 1) / 4, 1)
        self.conv3 = SeparableConv1d(F2, F2, kernel_size=31, padding=15)
        self.bn3 = nn.BatchNorm1d(F2)
        # In: (B, F2, (Samples - Chans + 1) / 4, 1)
        # Out: (B, F2, (Samples - Chans + 1) / 32, 1)
        self.avg_pool2 = nn.AvgPool1d(8)
        # In: (B, F2 *  (Samples - Chans + 1) / 32)
        self.fc = nn.Linear(F2 * ((Samples - Chans + 1) // 32), nb_classes)

    def forward(self, x: torch.Tensor):
        # Block 1
        y1 = self.conv1(x)
        #print("conv1: ", y1.shape)
        y1 = self.bn1(y1)
        #print("bn1: ", y1.shape)
        y1 = self.conv2(y1)
        #print("conv2", y1.shape)
        y1 = F.relu(self.bn2(y1))
        #print("bn2", y1.shape)
        y1 = self.avg_pool(y1)
        #print("avg_pool", y1.shape)
        y1 = self.dropout(y1)
        #print("dropout", y1.shape)

        # Block 2
        y2 = self.conv3(y1)
        #print("conv3", y2.shape)
        y2 = F.relu(self.bn3(y2))
        #print("bn3", y2.shape)
        y2 = self.avg_pool2(y2)
        #print("avg_pool2", y2.shape)
        y2 = self.dropout(y2)
        #print("dropout", y2.shape)
        y2 = torch.flatten(y2, 1)
        #print("flatten", y2.shape)
        y2 = self.fc(y2)
        #print("fc", y2.shape)

        return y2
        
# Function to load and label data (same as in your training script)
def load_data(data_dir, max_length):
    X = []
    y = []
    file_list = os.listdir(data_dir)
    for file in file_list:
        with open(os.path.join(data_dir, file), 'rb') as f:
            data = pickle.load(f)
        label = 1 if 'truth' in file else 0
        if data.shape[1] > max_length:
            processed_data = data[:, :max_length]  # Cut data if it exceeds max_length
        else:
            processed_data = np.zeros((data.shape[0], max_length))
            processed_data[:, :data.shape[1]] = data  # Pad data if it is shorter than max_length
        X.append(processed_data)
        y.append(label)
    return np.array(X), np.array(y)

# Define dataset class
class EEGDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        # Ensure the data is reshaped to [1, Chans, Samples]
        return torch.tensor(self.X[idx], dtype=torch.float32), torch.tensor(self.y[idx], dtype=torch.long)


# Main script
if __name__ == "__main__":
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load the saved model
    model_path = r'C:\Users\User\Documents\Lie detect data\Model\revise_model_fold_4.pth'
    model = EEGNet().to(device)
    checkpoint = torch.load(model_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval

    # Move the model to the appropriate device
    model.to(device)

    # Load and preprocess the training data to create the scaler
    X, y = load_data(EEG_DATA_DIR, max_length)

    # Load the scaler
    with open(r'C:\Users\User\Documents\Lie detect data\Model\RevisedEEGNet_scaler_4.pkl', 'rb') as f:
        scaler = pickle.load(f)

    X = scaler.transform(X.reshape(X.shape[0], -1))
    X = X.reshape(-1, 65, max_length)
    

    test_dataset = EEGDataset(X, y)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    all_labels = []
    all_predictions = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        print(f"batch shape: {X_batch.shape}")
        
        X_batch = X_batch.to(device)
        labels = y_batch.to(device)
        
        outputs = model(X_batch)
        _, predicted = torch.max(outputs.data, 1)
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

# Convert lists to numpy arrays for metric calculations
all_labels = np.array(all_labels)
all_predictions = np.array(all_predictions)

# Calculate metrics
accuracy = accuracy_score(all_labels, all_predictions)
precision = precision_score(all_labels, all_predictions)
recall = recall_score(all_labels, all_predictions)
f1 = f1_score(all_labels, all_predictions)
auc = roc_auc_score(all_labels, all_predictions)
conf_matrix = confusion_matrix(all_labels, all_predictions)


# Print the metrics
print(f"Accuracy: {accuracy:.4f}")
print(f'Precision: {precision}, Recall: {recall}, F1-score: {f1}, AUC: {auc}')
print('Confusion Matrix:')
print(conf_matrix)

batch shape: torch.Size([32, 65, 3750])
batch shape: torch.Size([32, 65, 3750])
batch shape: torch.Size([32, 65, 3750])
batch shape: torch.Size([32, 65, 3750])
batch shape: torch.Size([15, 65, 3750])
Accuracy: 0.8182
Precision: 0.8421052631578947, Recall: 0.8205128205128205, F1-score: 0.8311688311688312, AUC: 0.8179487179487179
Confusion Matrix:
[[53 12]
 [14 64]]
