In [55]:
import sys
import matplotlib.pyplot as plt
import numpy as np
from scipy.io import wavfile
from scipy import signal
import pyedflib
from pyedflib import highlevel
import os
import h5py
import torch

In [50]:
#Constants 
dataPath = r"C:\Users\markd\Documents\GitHub\NAThacks\data\files"
h5Path = r"h5py/val.h5"
numChannels = 64
labels = {
    "T0" : "rest",
    "T1" : ["leftHand", "bothHands"],
    "T2" : ["rightHand", "bothFeet"]
}
labelsDict = {
    "rest": 0,
    "leftHand": 1,
    "rightHand": 2,
    "bothHands": 3,
    "bothFeet": 4
}
frequency = 160
time_period = 640
prc_overlap = .90
chunk_size = 2500

In [51]:
rest = []
leftHand = []
rightHand = []
bothHands = []
bothFeet = []

In [4]:
with h5py.File(h5Path, "w") as specs:
    specs.create_dataset("rest",(chunk_size,81,31,64), maxshape=(None,81,31,64),compression="gzip")
    specs.create_dataset("leftHand",(chunk_size,81,31,64), maxshape=(None,81,31,64), compression="gzip")
    specs.create_dataset("rightHand",(chunk_size,81,31,64), maxshape=(None,81,31,64), compression="gzip")
    specs.create_dataset("bothHands",(chunk_size,81,31,64), maxshape=(None,81,31,64), compression="gzip")
    specs.create_dataset("bothFeet",(chunk_size,81,31,64), maxshape=(None,81,31,64), compression="gzip")

def storeData(data, dataset : str, size):
    with h5py.File(h5Path, "r+") as f:
        df = f[dataset]
        current_shape = df.shape
        df.resize((current_shape[0] + size, 81, 31, 64))
        df[current_shape[0]:, :] = data
        print(f"{dataset} adjusted to {current_shape[0] + size}")

In [52]:
with h5py.File(h5Path, "w") as specs:
    specs.create_dataset("Samples",(chunk_size,81,31,64), maxshape=(None,81,31,64),compression="gzip")
    specs.create_dataset("Classes",(chunk_size,5), maxshape=(None,5),compression="gzip")

def storeDataSingle(data, dataset : str, size):
    with h5py.File(h5Path, "r+") as f:
        df = f['Samples']
        current_shape = df.shape
        df.resize((current_shape[0] + size, 81, 31, 64))
        df[current_shape[0]:, :] = data
        print(f"Samples adjusted to {current_shape[0] + size}")
        
        # Store class label
        # Classes need to be one-hot encoded
        class_idx = labelsDict[dataset]
        label = np.zeros([size, 5])
        label[:,class_idx] = 1
        
        labels = f['Classes']
        current_shape = labels.shape
        labels.resize((current_shape[0] + size, 5))
        labels[current_shape[0]:, :] = label  

In [53]:
#Generating spectograms Images
person = 1
for folder in os.listdir(dataPath)[90:]:
    for fileName in os.listdir(os.path.join(dataPath,folder)):

        #Skipping files .event files
        if fileName.find("event") != -1:
            continue

        #reading EDF file and extracting data
        filePath = os.path.join(dataPath,folder,fileName)
        annotations = ""
        file = ""

        #Getting the annotations and Data
        file = pyedflib.EdfReader(filePath)
        annotations = file.readAnnotations()
        file.close()

        signals, signal_headers, header = highlevel.read_edf(filePath)
        
        count = 0
        
        #Looping through the various 
        for i, period in enumerate(annotations[0]):
            #Getting signal data for 4 second period
            data = signals[:,int(annotations[0][i]):int(annotations[0][i]+time_period)]

            f, t, image = signal.spectrogram(data,frequency, nperseg = frequency, noverlap = frequency * prc_overlap)
            image = np.transpose(image,(1,2,0))

            #Assigning label to file
            label = annotations[2][i]

            #Categories of the different tests
            leftOrRight = ["03","04","07","08","11","12"]
            if label == "T1":
                if any(x == fileName[5:7] for x in leftOrRight):
                    leftHand.append(image)
                    if len(leftHand) == chunk_size:
                        storeDataSingle(leftHand,"leftHand", chunk_size)
                        leftHand = []
                else: 
                    bothHands.append(image)
                    if len(bothHands) == chunk_size:
                        storeDataSingle(bothHands,"bothHands", chunk_size)
                        bothHands = []
            elif label == "T2":
                if any(x == fileName[5:7] for x in leftOrRight):
                    rightHand.append(image)
                    if len(rightHand) == chunk_size:
                        storeDataSingle(rightHand,"rightHand", chunk_size)
                        rightHand = []
                else:
                    bothFeet.append(image)
                    if len(bothFeet) == chunk_size:
                        storeDataSingle(bothFeet,"bothFeet", chunk_size)
                        bothFeet = []
            else:
                rest.append(image)
                if len(rest) == chunk_size:
                        storeDataSingle(rest,"rest", chunk_size)
                        rest = []
            count += 1
    print(f"person {person} completed")
    person +=1

person 1 completed
person 2 completed
person 3 completed
person 4 completed
person 5 completed
person 6 completed
person 7 completed
person 8 completed
person 9 completed
person 10 completed
person 11 completed
person 12 completed
person 13 completed


In [54]:
storeDataSingle(rest,"rest", len(rest))
rest = []
storeDataSingle(bothFeet,"bothFeet", len(bothFeet))
bothFeet = []
storeDataSingle(leftHand,"leftHand", len(leftHand))
leftHand = []
storeDataSingle(rightHand, "rightHand",len(rightHand))
rightHand = []
storeDataSingle(bothHands,"bothHands",len(bothHands))
bothHands = []

Samples adjusted to 4856
Samples adjusted to 5435
Samples adjusted to 6020
Samples adjusted to 6605
Samples adjusted to 7185


In [60]:
a = torch.tensor([4,1,2,4,2])
b = torch.tensor([4,1,3,3,3])
torch.sum(a == b).double() / len(a)

tensor(0.4000, dtype=torch.float64)