<a href="https://colab.research.google.com/github/deepak427/Psychi-learn/blob/main/Prepare_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Download and prepare EEG dataset files from PhysioNet

In [None]:
# Mount Google Drive at /content/drive; the rest of this notebook reads and
# writes its files under /content/drive/MyDrive/physionet/.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import requests
from bs4 import BeautifulSoup

def download_file(url, index):
    """Download ``url`` into the Drive dataset folder and return the local path.

    Args:
        url: Address of the file to fetch. The text after its last ``/`` is
            used as the local file name.
        index: Not used by this function; kept so existing callers that pass
            it keep working.

    Returns:
        Path of the written file inside
        ``/content/drive/MyDrive/physionet/eegmmidb/``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    target_directory = "/content/drive/MyDrive/physionet/eegmmidb/"
    os.makedirs(target_directory, exist_ok=True)  # Create the directory if it doesn't exist

    local_filename = os.path.join(target_directory, url.split('/')[-1])

    # stream=True avoids holding the whole file in memory; the context manager
    # releases the connection even if writing fails part-way.
    with requests.get(url, stream=True) as r:
        # Fail loudly instead of saving an HTML error page under an .edf name.
        r.raise_for_status()
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=64 * 1024):
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
    return local_filename

# Walk the per-subject folders S001 ... S109 and fetch every .edf file that
# each folder's index page links to.
for i in range(1, 110):
    root_link = "https://archive.physionet.org/pn4/eegmmidb/S%03d/" % i

    try:
        r = requests.get(root_link)
    except requests.RequestException as exc:
        print("errors occur: " + root_link + " -> " + str(exc))
        continue

    if r.status_code != 200:
        print("errors occur: " + root_link + " returned HTTP " + str(r.status_code))
        continue

    soup = BeautifulSoup(r.text, features="html.parser")

    index = 1
    for link in soup.find_all('a'):
        href = link.get('href')
        # Anchors without an href give None, which cannot be concatenated;
        # skip those and every link that is not an .edf recording
        # (the .edf.event companion files are not downloaded).
        if not href or not href.endswith(".edf"):
            continue

        new_link = root_link + href
        try:
            file_path = download_file(new_link, str(index))
        except requests.RequestException as exc:
            # One failed file should not abort the remaining downloads.
            print("errors occur: " + new_link + " -> " + str(exc))
            continue
        print("downloading:" + new_link + " -> " + file_path)
        index += 1

# Printed once, after every subject folder has been processed.
print("all download finished")

In [None]:
!pip install pyedflib numpy scipy

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import os
import pyedflib
import scipy.io as sio
from typing import Tuple  # For type hinting (optional)

# Motor-imagery window inside a trial, expressed in samples at 160 samples/s.
MOVEMENT_START = 160 * 1  # imagery starts one second after the trial begins
MOVEMENT_END = 160 * 5    # imagery lasts four seconds, i.e. ends at second five

# Electrode labels keyed by their 1-based channel number; the tuple below is
# listed in channel order, so enumerate(..., start=1) rebuilds the numbering.
PHYSIONET_ELECTRODES = dict(enumerate((
    "FC5", "FC3", "FC1", "FCz", "FC2", "FC4", "FC6",
    "C5", "C3", "C1", "Cz", "C2", "C4", "C6",
    "CP5", "CP3", "CP1", "CPz", "CP2", "CP4", "CP6",
    "Fp1", "Fpz", "Fp2",
    "AF7", "AF3", "AFz", "AF4", "AF8",
    "F7", "F5", "F3", "F1", "Fz", "F2", "F4", "F6", "F8",
    "FT7", "FT8",
    "T7", "T8", "T9", "T10",
    "TP7", "TP8",
    "P7", "P5", "P3", "P1", "Pz", "P2", "P4", "P6", "P8",
    "PO7", "PO3", "POz", "PO4", "PO8",
    "O1", "Oz", "O2",
    "Iz",
), start=1))


def load_edf_signals(path: str) -> Tuple[np.ndarray, tuple]:
    """Read every signal and the annotations from the EDF file at ``path``.

    Args:
        path: Location of the EDF file, passed to ``pyedflib.EdfReader``.

    Returns:
        A pair ``(signals, annotations)``. ``signals`` has one row per sample
        and one column per signal in the file. ``annotations`` is the value of
        ``EdfReader.readAnnotations()`` unchanged (presumably the onset,
        duration and description arrays -- confirm against the pyedflib docs).
    """
    with pyedflib.EdfReader(path) as f:
        n = f.signals_in_file
        # The buffer is sized from the first signal's sample count, so every
        # signal in the file is expected to have that same length.
        sigbuf = np.zeros((n, f.getNSamples()[0]))

        for j in range(n):
            sigbuf[j, :] = f.readSignal(j)
        annotations = f.readAnnotations()
    # Stored as (signal, sample); callers index it as (sample, signal).
    return sigbuf.transpose(), annotations



def load_physionet_data(subject_id: int, num_classes: int = 4, long_edge: bool = False) -> Tuple[np.ndarray, np.ndarray, int]:
    """Cut one subject's recordings into fixed-length trials with one-hot labels.

    Runs 4, 8 and 12 are always read; runs 6, 10 and 14 are added when
    ``num_classes >= 4``. Each annotation description is expected to carry a
    digit as its second character: a non-zero digit opens a trial with that
    class code and the next ``0`` closes it. In runs 6, 10 and 14 only trials
    with code 2 are kept and they are relabelled as class 3 (presumably the
    feet-imagery task of the PhysioNet protocol -- confirm). A trial that is
    never closed by a following ``0`` annotation is not emitted.

    Every kept trial is re-centred to exactly ``TRIAL_LENGTH`` seconds by
    padding or trimming both ends symmetrically.

    Args:
        subject_id: Subject number used to build the ``S%03dR%02d.edf`` name.
        num_classes: Number of label columns. With 3 or more, the remaining
            rows up to ``21 * num_classes`` are filled with randomly placed
            windows from run 1 and labelled as the last class.
        long_edge: Use 6-second trials instead of 4-second ones.

    Returns:
        ``(trials, labels, sample_rate)``. ``trials`` has shape
        ``(n, TRIAL_LENGTH * 160, 64)`` and ``labels`` has shape
        ``(n, num_classes)``. ``n`` is the number of extracted trials when
        ``num_classes < 3`` and ``21 * num_classes`` otherwise.

    Raises:
        IndexError: If more trials are extracted than ``21 * num_classes``.
        ValueError: If a run is longer than 125 s or a re-centred trial window
            falls outside the run, so the slice shapes no longer match.
    """
    SAMPLE_RATE = 160
    EEG_CHANNELS = 64
    BASELINE_RUN = 1
    MI_RUNS = [4, 8, 12]
    if num_classes >= 4:
        MI_RUNS += [6, 10, 14]

    RUN_LENGTH = 125 * SAMPLE_RATE
    TRIAL_LENGTH = 4 if not long_edge else 6
    TRIAL_SAMPLES = TRIAL_LENGTH * SAMPLE_RATE
    NUM_TRIALS = 21 * num_classes

    # Runs shorter than RUN_LENGTH stay zero-padded at the end.
    X = np.zeros((len(MI_RUNS), RUN_LENGTH, EEG_CHANNELS))
    # Each entry is [run index, class code, first sample, end sample].
    events = []

    base_path = '/content/drive/MyDrive/physionet/eegmmidb/' + 'S%03dR%02d.edf'

    for i_run, current_run in enumerate(MI_RUNS):
        path = base_path % (subject_id, current_run)
        signals, annotations = load_edf_signals(path)
        X[i_run, :signals.shape[0], :] = signals

        # True for runs 6, 10 and 14, whose code-2 trials become class 3.
        third_class_run = (current_run - 6) % 4 == 0
        current_event = [i_run, 0, 0, 0]

        # Iterate the annotation columns in lockstep instead of stacking them
        # into a string array and parsing the onsets back into floats.
        for onset, _duration, description in zip(*annotations):
            t = int(float(onset) * SAMPLE_RATE)
            action = int(description[1])

            if action == 0 and current_event[1] != 0:
                # Close the open trial and stretch/shrink it symmetrically to
                # TRIAL_SAMPLES; the odd leftover sample goes to the start.
                duration = t - current_event[2]
                pad = (TRIAL_SAMPLES - duration) // 2

                current_event[2] -= pad + duration % 2
                current_event[3] = t + pad

                if not third_class_run or current_event[1] == 2:
                    if third_class_run:
                        current_event[1] = 3
                    events.append(current_event)

            elif action > 0:
                current_event = [i_run, action, t, 0]

    num_mi_trials = len(events)
    trials = np.zeros((NUM_TRIALS, TRIAL_SAMPLES, EEG_CHANNELS))
    labels = np.zeros((NUM_TRIALS, num_classes))

    for i, (run_idx, class_code, start, stop) in enumerate(events):
        trials[i, :, :] = X[run_idx, start:stop]
        labels[i, class_code - 1] = 1.

    if num_classes < 3:
        return (trials[:num_mi_trials, ...],
                labels[:num_mi_trials, ...],
                SAMPLE_RATE)

    # Fill the remaining rows with random windows from run 1, labelled as the
    # last class.
    path = base_path % (subject_id, BASELINE_RUN)
    signals, _ = load_edf_signals(path)
    for i in range(num_mi_trials, NUM_TRIALS):
        offset = np.random.randint(0, signals.shape[0] - TRIAL_SAMPLES)
        trials[i, :, :] = signals[offset: offset + TRIAL_SAMPLES, :]
        labels[i, -1] = 1.
    return trials, labels, SAMPLE_RATE



def load_raw_data(electrodes: list, subject=None, num_classes=4, long_edge=False) -> Tuple[np.ndarray, np.ndarray]:
    """Load trials for several subjects and stack them into two arrays.

    Args:
        electrodes: Channel indices selected on the last axis of each
            subject's trial array.
        subject: ``None`` (subjects 11-14 are used), a single value
            convertible with ``int()``, or an iterable of subject ids.
        num_classes: Forwarded to ``load_physionet_data``.
        long_edge: Forwarded to ``load_physionet_data``.

    Returns:
        ``(X, Y)`` as float64 arrays. ``X`` is the stacked per-subject trials
        with a trailing axis of length 1 appended; ``Y`` is the stacked labels.
        Subjects whose loading raises are reported with ``print`` and skipped,
        and with ``num_classes == 2`` subjects without exactly 42 trials are
        skipped silently.

    Raises:
        ValueError: If no subject produced any data, so there is nothing to
            stack.
    """
    trials = []
    labels = []

    if subject is None:
        subject_ids = range(11, 15)  # Or adjust the range as needed
    else:
        try:
            subject_ids = [int(subject)]
        except (TypeError, ValueError):
            # Not a single number: treat it as an iterable of subject ids.
            subject_ids = subject

    for subject_id in subject_ids:
        try:
            t, l, _ = load_physionet_data(subject_id, num_classes, long_edge=long_edge)
            if num_classes == 2 and t.shape[0] != 42:
                continue
            trials.append(t[:, :, electrodes])
            labels.append(l)
        except Exception as e:
            # Best effort: report the failing subject and keep going.
            print(f"Error loading data for subject {subject_id}: {e}")

    if not trials:
        raise ValueError("no trials could be loaded for any requested subject")

    stacked = np.array(trials, dtype=np.float64)
    return stacked[..., np.newaxis], np.array(labels, dtype=np.float64)




print("Start to save the Files!")


nclasses = 4
SAVE = '/content/drive/MyDrive/physionet/' + '20-Subjects'
# Also creates missing parent folders and is a no-op when the folder exists.
os.makedirs(SAVE, exist_ok=True)

subject = range(1, 21)

# Read the recordings once for all 64 channels instead of once per electrode.
# Besides avoiding 64 passes over every EDF file, this keeps the randomly
# placed rest windows identical across the per-electrode files; reloading for
# each electrode would draw new random offsets every time.
all_electrodes = list(range(0, 64))
X_all, Y = load_raw_data(electrodes=all_electrodes, subject=subject, num_classes=nclasses)

for i in all_electrodes:
    electrodes = [i]
    # Keep a length-1 channel axis while slicing so that squeeze() produces
    # the same shape as loading this single electrode on its own.
    X = np.squeeze(X_all[:, :, :, electrodes, :])
    sio.savemat(os.path.join(SAVE, 'Dataset_%d.mat' % int(i + 1)), {'Dataset': X})
    sio.savemat(os.path.join(SAVE, 'Labels_%d.mat' % int(i + 1)), {'Labels': Y})
    print("Finished saving {} electrodes".format(int(i+1)))