In [166]:
import numpy as np
import pandas as pd
import os 
import json

In [167]:
morning = r'C:\Users\Administrator\Desktop\our_fatigue\Morning'
evening = r'C:\Users\Administrator\Desktop\our_fatigue\Evening'
label = r'C:\Users\Administrator\Desktop\our_fatigue\label.csv'

In [168]:
label = pd.read_csv(label)  # IF Morning Fatigue < Evening Fatigue ---> Label = 0

In [169]:

def read_json_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
        return data
    except Exception as e:
        print(f"An error occurred: {e}")
        return None



def read_eeg_data(folder):
    data = {}
    for filename in os.listdir(folder):
        if filename.endswith(".json"): 
            subject_id = filename.split('_')[0]
            file_path = os.path.join(folder, filename)
            json_data = read_json_file(file_path)
            left_wave_all, right_wave_all = [], []
            for raw_data in json_data['ProcessedDatas']:
                left_wave = raw_data['LeftWave']
                right_wave = raw_data['RightWave']
                if any(np.isnan(left_wave)) or any(np.isnan(right_wave)):
                    print(f"NaNs detected in raw data for subject {subject_id}")


                left_wave_all += left_wave
                right_wave_all += right_wave
            final_data = np.column_stack((left_wave_all,right_wave_all))
            final_data = final_data[900:]
            if np.isnan(final_data).any():
                print(f"NaNs detected in final data for subject {subject_id}")

            data[subject_id] = final_data
    return data

def equalize_length(data1, data2):
    min_length = min(len(data1), len(data2))
    return data1[:min_length], data2[:min_length]



def split_into_chunks(data, chunk_size):
    num_chunks = data.shape[0] // chunk_size
    return data[:num_chunks * chunk_size,:].reshape(num_chunks,2, chunk_size)



def apply_fft(data):
    feat = []
    for i in range(data.shape[0]):
        chunk = data[i]
        ch_feat = []
        for channel in chunk:
            fft_data = np.abs(np.fft.fft(channel))[:64]
            sum = np.sum(fft_data)
            if sum == 0:
                print('sum is zero')
            fft_data = fft_data / sum
            ch_feat.extend(fft_data)
        feat.append(ch_feat)
    return feat

In [170]:
morning

'C:\\Users\\Administrator\\Desktop\\our_fatigue\\Morning'

In [171]:
morning_data = read_eeg_data(morning)
evening_data = read_eeg_data(evening)

In [172]:
def splitting(subjects,labels, morning= morning,evening = evening):
    morning_data = read_eeg_data(morning)
    evening_data = read_eeg_data(evening)
    eeg_data = {}
    for subject in subjects:
        morning_data_subject = np.array(morning_data[subject])
        evening_data_subject = np.array(evening_data[subject])
        
        # Equalize length
        morning_data_subject, evening_data_subject = equalize_length(morning_data_subject, evening_data_subject)
        
        eeg_data[subject] = {
            'morning': apply_fft(split_into_chunks(morning_data_subject, 225)),
            'evening': apply_fft(split_into_chunks(evening_data_subject, 225)),
            'label': labels[subject]
        }
    return eeg_data


#subjects = list(set(morning_data.keys()).intersection(evening_data.keys()))
train_positive = ['A06','A04']  # Label = 0
train_negative = ['A05','A03'] # Label = 1
test_positive = ['A02'] # label = 1
test_negative = ['A01'] # label = 0


train_data = train_positive + train_negative
test_data = test_negative + test_positive
labels = {subject: 1 for subject in train_positive + test_positive}
labels.update({subject: 0 for subject in train_negative + test_negative})


train_eeg_data = splitting(train_data,labels= labels)
test_eeg_data = splitting(test_data,labels= labels)


In [174]:
def length_check(data, label):
    total = []
    for i in label:
        dat = data[i]['morning']
        total.extend(dat)
    total = np.array(total)
    return len(total)


In [175]:
length_of_train_positve = length_check(train_eeg_data,train_positive)
length_of_train_negative = length_check(train_eeg_data,train_negative)
length_of_test_positve = length_check(test_eeg_data,test_positive)
length_of_test_negative = length_check(test_eeg_data,test_negative)



print("Length of train positive ",length_of_train_positve )
print("Length of train negative ",length_of_train_negative )
print("Length of test positive ",length_of_test_positve )
print("Length of test negative ",length_of_test_negative )


Length of train positive  506
Length of train negative  638
Length of test positive  316
Length of test negative  318


In [176]:
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

class CustomEEGDataset(Dataset):
    def __init__(self, eeg_data):
        self.data = []
        for subject in eeg_data:
            morning_chunks = eeg_data[subject]['morning']
            evening_chunks = eeg_data[subject]['evening']
            label = eeg_data[subject]['label']
            
            # Assuming morning_chunks and evening_chunks are lists of shape [316, 128]
            for i in range(len(morning_chunks)):
                morning_chunk = torch.tensor(morning_chunks[i], dtype=torch.float32)
                evening_chunk = torch.tensor(evening_chunks[i], dtype=torch.float32)
                self.data.append((subject, morning_chunk, evening_chunk, torch.tensor(label, dtype=torch.float32)))
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

# Create datasets
train_dataset = CustomEEGDataset(train_eeg_data)
test_dataset = CustomEEGDataset(test_eeg_data)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [179]:
for batch in train_loader:
    sub, input1, input2, label = batch
    print(f"subject shape: {sub}")
    print(f"Input1 shape: {input1.shape}")
    print(f"Input2 shape: {input2.shape}")
    print(f"Label shape: {label.shape}")
    break  # Remove this break to inspect all batches

subject shape: ('A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06', 'A06')
Input1 shape: torch.Size([128, 128])
Input2 shape: torch.Size([128, 128])
Label shape: t

In [180]:
from torch import nn
from torch import optim
class RankNet(nn.Module):
    def __init__(self, input_size):
        super(RankNet, self).__init__()
        self.linear = nn.Sequential(
            nn.Linear(input_size, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)
        )

    def forward(self, x):
        x = self.linear(x)
        return x

In [181]:
from datetime import datetime
import os
current_date = datetime.now().strftime("%Y-%m-%d")
outdir = 'outdir'
pth_file_path = os.path.join(outdir, f'testing-best_model_{current_date}.pth')
onnx_file_path = os.path.join(outdir, f"eeg_testing-best_model_{current_date}.onnx")

In [182]:
import torch.nn.functional as F
def rank_loss(output1, output2, label):
    # Subtract scores
    diff = output1 - output2
    prob = torch.sigmoid(diff).squeeze()
    assert (0 <= prob).all() and (prob <= 1).all(), "Probabilities are not in [0, 1] range"
    loss = F.binary_cross_entropy(prob, label)
    return loss

def rank_accuracy(output1, output2, label):
    # Compute the predicted ranking (0 or 1)
    #print("Input shape ", output1.shape)
    diff = output1 - output2
    #print("diff_shape", diff.shape)
    prob = torch.sigmoid(diff).squeeze()
    #print('prob',prob.shape)
    pred_label = (prob > 0.5)
    #print('pred_label',pred_label.shape)
    test_correct = (pred_label == label).float().sum()
    #print("test_correct",test_correct.shape, test_correct)
    return test_correct


# Training function
def train(model, data_loader, test_loader, optimizer, epochs):
    #model.train()
    best_accuracy = 0.0
    for epoch in range(epochs):
        total_loss = 0
        #total_accuracy = 0
        for _,input1, input2, label in data_loader:
            optimizer.zero_grad()
            output1 = model(input1)
            output2 = model(input2)
            
            loss = rank_loss(output1, output2, label)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        model.eval()  # Set the model to evaluation mode
        test_correct = 0
        total_samples = 0

        with torch.no_grad():
            for _,t_in1,t_in2,lab in test_loader:
                out1 = model(t_in1)
                out2 = model(t_in2)
                total_samples += lab.size(0)
                
                acc = rank_accuracy(out1,out2,lab)
                test_correct += acc.item()
                #print('final test_correct',test_correct)
        #print('total_samples',total_samples)   
        accuracy = test_correct / total_samples
        print(f'Epoch: {epoch} Train Loss: {loss} Test Accuracy: {accuracy}')
        # Save the best model
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            torch.save(model.state_dict(), pth_file_path)
            print("Best Accuracy Model: {}".format(best_accuracy))

            # Export the best model to ONNX
            dummy_input = torch.randn(1, 128)
            torch.onnx.export(model, dummy_input, onnx_file_path, input_names=['input'], output_names=['output'])

input_size = 128
model = RankNet(input_size)  # Ensure your model outputs match input_size
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Start training
train(model, train_loader,test_loader, optimizer, epochs=100)



Epoch: 0 Train Loss: 0.6893543601036072 Test Accuracy: 0.48264984227129337
Best Accuracy Model: 0.48264984227129337
Epoch: 1 Train Loss: 0.6190316081047058 Test Accuracy: 0.6009463722397477
Best Accuracy Model: 0.6009463722397477
Epoch: 2 Train Loss: 0.3601899743080139 Test Accuracy: 0.5993690851735016
Epoch: 3 Train Loss: 0.31114310026168823 Test Accuracy: 0.6451104100946372
Best Accuracy Model: 0.6451104100946372
Epoch: 4 Train Loss: 0.2976534068584442 Test Accuracy: 0.637223974763407
Epoch: 5 Train Loss: 0.2875290513038635 Test Accuracy: 0.6198738170347003
Epoch: 6 Train Loss: 0.2866315543651581 Test Accuracy: 0.6246056782334385
Epoch: 7 Train Loss: 0.2836405634880066 Test Accuracy: 0.6324921135646687
Epoch: 8 Train Loss: 0.27383777499198914 Test Accuracy: 0.6324921135646687
Epoch: 9 Train Loss: 0.271921843290329 Test Accuracy: 0.6356466876971609
Epoch: 10 Train Loss: 0.2688020169734955 Test Accuracy: 0.6309148264984227
Epoch: 11 Train Loss: 0.26468032598495483 Test Accuracy: 0.6324

Epoch: 12 Train Loss: 0.2620687782764435 Test Accuracy: 0.6277602523659306
Epoch: 13 Train Loss: 0.25606799125671387 Test Accuracy: 0.6277602523659306
Epoch: 14 Train Loss: 0.2508523464202881 Test Accuracy: 0.613564668769716
Epoch: 15 Train Loss: 0.25124606490135193 Test Accuracy: 0.6230283911671924
Epoch: 16 Train Loss: 0.2447555810213089 Test Accuracy: 0.613564668769716
Epoch: 17 Train Loss: 0.24286018311977386 Test Accuracy: 0.6072555205047319
Epoch: 18 Train Loss: 0.2361968606710434 Test Accuracy: 0.613564668769716
Epoch: 19 Train Loss: 0.23514172434806824 Test Accuracy: 0.5851735015772871
Epoch: 20 Train Loss: 0.2252461314201355 Test Accuracy: 0.5914826498422713
Epoch: 21 Train Loss: 0.22587554156780243 Test Accuracy: 0.580441640378549
Epoch: 22 Train Loss: 0.23001471161842346 Test Accuracy: 0.5741324921135647
Epoch: 23 Train Loss: 0.2180643379688263 Test Accuracy: 0.5851735015772871
Epoch: 24 Train Loss: 0.21771186590194702 Test Accuracy: 0.5725552050473186
Epoch: 25 Train Loss: 

In [183]:
import glob
import numpy as np
import onnxruntime as ort

from collections import defaultdict

In [230]:
import glob
import os
import numpy as np
import torch
import onnxruntime as ort
from collections import defaultdict

def get_onnxfile():
    outdir = './outdir'
    onnx_files = glob.glob(os.path.join(outdir, '*.onnx'))
    if onnx_files:
        onnx_files.sort()
        last_onnx_file = onnx_files[-1]
        return last_onnx_file
    return None

onnx_model_path = get_onnxfile()
if onnx_model_path is None:
    raise FileNotFoundError("No ONNX model file found in the specified directory.")

# Load the ONNX model
ort_session = ort.InferenceSession(onnx_model_path)

# Prediction function
def predict(features):
    inputs = {ort_session.get_inputs()[0].name: features}
    outputs = ort_session.run(None, inputs)
    return outputs[0]

# Helper function to predict probabilities
def predict_probabilities(features):
    prediction = predict(features)
    return torch.sigmoid(torch.from_numpy(prediction)).squeeze().item()

# Initialize results dictionary



In [234]:
def processing(train_loader):
    subject_results = defaultdict(list)
    # Iterate through the train loader and calculate predictions
    for subject, features1_batch, features2_batch, label_batch in train_loader:
        for i in range(features1_batch.shape[0]):
            features1 = features1_batch[i].numpy().astype(np.float32).reshape(1, -1)
            features2 = features2_batch[i].numpy().astype(np.float32).reshape(1, -1)
            label = label_batch[i].item()

            p1 = predict_probabilities(features1)
            p2 = predict_probabilities(features2)
            
            diff = predict(features1) - predict(features2)
            prob = torch.sigmoid(torch.from_numpy(diff)).squeeze().item()
            pred_label = (prob > 0.5)
            
            subject_results[subject].append((pred_label, label, [p1, p2]))

    # Combine subject results into a new structure
    new_subject_results = defaultdict(list)
    for subjects, pairs in subject_results.items():
        if isinstance(subjects, tuple):
            for subject in subjects:
                if subject:
                    new_subject_results[subject].extend(pairs)
        else:
            new_subject_results[subjects].extend(pairs)

    # Convert defaultdict to a regular dictionary if needed
    new_subject_results = dict(new_subject_results)

    # Calculate and print accuracy for each subject
    for subject, results in new_subject_results.items():
        if results:  # Ensure there are results to calculate accuracy
            correct = sum(pred == true for pred, true, _ in results)
            accuracy = correct / len(results)
            print(f'Subject: {subject}, Accuracy: {accuracy:.2f}')
        else:
            print(f'Subject: {subject} has no results, accuracy cannot be calculated.')
    p1_f = 0
    p2_f = 0
    subject_features = defaultdict(list)
    # Iterate over each result
    for subject, feat in new_subject_results.items():
        p1_f = 0
        p2_f = 0
        for _, _, features in feat:
            # Debug: Print the features to understand its structure
            #print("Features:", features)
            if features[0] > features[1]:
                p1_f += 1
            else:
                p2_f +=1
        subject_features[subject].append({'Morning': p1_f, 'Evening': p2_f})
    print(dict(subject_features))
    return 



train_positive = ['A06','A04']  # Label = 0

train_negative = ['A05','A03'] # Label = 1

test_positive = ['A02'] # label = 1

test_negative = ['A01'] # label = 0

In [235]:
processing(train_loader)

Subject: A06, Accuracy: 0.39
Subject: A04, Accuracy: 0.72
Subject: A05, Accuracy: 0.66
Subject: A03, Accuracy: 0.84
{'A06': [{'Morning': 8904, 'Evening': 14136}], 'A04': [{'Morning': 30512, 'Evening': 11216}], 'A05': [{'Morning': 14288, 'Evening': 26416}], 'A03': [{'Morning': 6336, 'Evening': 33664}]}


In [236]:
processing(test_loader)

Subject: A01, Accuracy: 0.62
Subject: A02, Accuracy: 0.67
{'A01': [{'Morning': 15318, 'Evening': 25386}], 'A02': [{'Morning': 25468, 'Evening': 14248}]}
