<a href="https://colab.research.google.com/github/VardhanVelamakanni/Mental_stres_Prediction_BandDecomposition_CWT/blob/main/Pipeline4V2(EEG).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import files
uploaded = files.upload()


Saving DASPS+HAM labels.zip to DASPS+HAM labels.zip
Saving Documents and Code.zip to Documents and Code.zip
Saving Raw data mat.zip to Raw data mat.zip


In [2]:
import os, shutil

BASE = "/content/DASPS"
os.makedirs(BASE, exist_ok=True)

for f in uploaded.keys():
    shutil.move(f, BASE+"/"+f)

os.listdir(BASE)


['Raw data mat.zip', 'Documents and Code.zip', 'DASPS+HAM labels.zip']

In [3]:
import zipfile

def unzip(z):
    with zipfile.ZipFile(z,'r') as f:
        f.extractall(BASE)

unzip(BASE+"/Raw data mat.zip")
unzip(BASE+"/DASPS+HAM labels.zip")


In [4]:
!pip install pywavelets scipy h5py scikit-learn opencv-python tqdm




In [49]:
import numpy as np, os, h5py, cv2, pywt, shutil, glob
from scipy.signal import butter, filtfilt
from tqdm import tqdm
from sklearn.model_selection import train_test_split


In [50]:
def bandpass(sig, low, high, fs=128):
    nyq = 0.5*fs
    b,a = butter(4,[low/nyq,high/nyq],btype='band')
    return filtfilt(b,a,sig,padlen=0)


In [51]:
def get_bands(signal):
    return [
        bandpass(signal,1,4),    # Delta
        bandpass(signal,4,8),    # Theta
        bandpass(signal,8,13),   # Alpha
        bandpass(signal,13,30),  # Beta
        bandpass(signal,30,45)   # Gamma
    ]


In [52]:
def bands_to_5ch(bands):
    scales=np.arange(1,64)
    imgs=[]
    for b in bands:
        cwt,_=pywt.cwt(b,scales,'morl')
        img=np.abs(cwt)
        img=(img-img.min())/(img.max()-img.min()+1e-8)
        img=cv2.resize(img,(128,128))
        imgs.append(img)
    return np.stack(imgs,axis=0)   # (5,128,128)


In [53]:
f=h5py.File(BASE+"/DASPS+HAM labels/DASPS+HAM labels.mat",'r')
labels=np.array(f[list(f.keys())[0]]).squeeze()


In [54]:
def decode_label(l):
    if isinstance(l,bytes):
        l=l.decode()
    l=str(l)
    if l.startswith("0"):
        return 0   # no stress
    else:
        return 1   # stress (1* and 2*)


In [55]:
from collections import Counter
print(Counter([decode_label(l) for l in labels]))


Counter({1: 545, 0: 8})


In [56]:
RAW=BASE+"/Raw data mat"
TEMP="/content/temp_all"
os.makedirs(TEMP,exist_ok=True)

count=0

for file in tqdm(os.listdir(RAW)):
    subject=file.replace(".mat","")
    with h5py.File(RAW+"/"+file,'r') as f:
        key=list(f.keys())[0]
        eeg=np.array(f[key])   # (trials,time,channels)

    idx=int(subject.replace("S",""))-1
    lab=decode_label(labels[idx])

    for trial in range(eeg.shape[0]):
        for ch in range(eeg.shape[2]):

            signal=eeg[trial,:,ch]
            bands=get_bands(signal)
            sample=bands_to_5ch(bands)

            cname="stress" if lab==1 else "no_stress"
            np.save(f"{TEMP}/{subject}_T{trial}_CH{ch}_{cname}.npy",sample)
            count+=1

print("Total samples:",count)


100%|██████████| 23/23 [06:32<00:00, 17.09s/it]

Total samples: 3864





In [57]:
print("Stress:",len(glob.glob(TEMP+"/*_stress.npy")))
print("NoStress:",len(glob.glob(TEMP+"/*_no_stress.npy")))


Stress: 3864
NoStress: 1344


In [58]:
DATASET="/content/dataset_5ch"

if os.path.exists(DATASET):
    shutil.rmtree(DATASET)

for s in ["train","val","test"]:
    for c in ["stress","no_stress"]:
        os.makedirs(f"{DATASET}/{s}/{c}",exist_ok=True)


In [59]:
stress=glob.glob(TEMP+"/*_stress.npy")
nostress=glob.glob(TEMP+"/*_no_stress.npy")

def split_and_copy(files,cls):
    train,tmp=train_test_split(files,test_size=0.3,random_state=42)
    val,test=train_test_split(tmp,test_size=0.5,random_state=42)

    for f in train:
        shutil.copy(f,f"{DATASET}/train/{cls}/"+os.path.basename(f))
    for f in val:
        shutil.copy(f,f"{DATASET}/val/{cls}/"+os.path.basename(f))
    for f in test:
        shutil.copy(f,f"{DATASET}/test/{cls}/"+os.path.basename(f))

split_and_copy(stress,"stress")
split_and_copy(nostress,"no_stress")


In [60]:
for s in ["train","val","test"]:
    for c in ["stress","no_stress"]:
        print(s,c,len(os.listdir(f"{DATASET}/{s}/{c}")))


train stress 2704
train no_stress 940
val stress 580
val no_stress 202
test stress 580
test no_stress 202


In [61]:
shutil.make_archive("/content/pipeline4_5ch_dataset","zip",DATASET)


'/content/pipeline4_5ch_dataset.zip'

In [62]:
from google.colab import files
files.download("/content/pipeline4_5ch_dataset.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>