RAW (MOABB) to CSV

This code converts datasets from RAW format to CSV format using MOABB.

It has been specifically conceived for BCI data.

This script is for Schirrmeister2017

In [None]:
import numpy as np
import pandas as pd
import mne
from moabb import datasets

# Import decimate 
import sys
import os
sys.path.append(os.path.abspath('..'))
from EEGtools import decimate

In [None]:
# WARNING: If you plan to use this script, know that the Schirrmeister2017 database is quite extensive.
# Therefore, I recommend loading only part of the database at a time by passing a
# list of subjects, e.g. m_dataset.get_data(m_dataset.subject_list[:7]).
# Load the database (all subjects)
m_dataset = datasets.Schirrmeister2017()
m_data = m_dataset.get_data()

In [None]:
# Inspect all channel names (EEG, misc, stim, ...) for subject 1
raw_tr = m_data[1]['0']['0train']
raw_te = m_data[1]['0']['1test']
# mne.concatenate_raws modifies its first element in place. Concatenating a
# copy keeps the recording stored in m_data untouched, so that later cells
# (and re-runs of this one) still see the original training run.
raw = mne.concatenate_raws([raw_tr.copy(), raw_te])
print("Canal list :", raw.ch_names)

In [None]:
# Get events from annotations
events_from_annot, event_dict = mne.events_from_annotations(raw)

# Create an empty stim channel: one row, one column per sample
stim_data = np.zeros((1, len(raw.times)))

# Write each event code at its sample position. The sample numbers returned by
# MNE include raw.first_samp, so it is subtracted to obtain indices into the
# data array (this changes nothing when first_samp is 0).
event_samples = events_from_annot[:, 0] - raw.first_samp
stim_data[0, event_samples] = events_from_annot[:, 2]

# Add the stim channel to the raw object
info = mne.create_info(['STIM'], raw.info['sfreq'], ['stim'])
stim_raw = mne.io.RawArray(stim_data, info)
raw.add_channels([stim_raw], force_update_info=True)

print("Canal list :", raw.ch_names)
print(raw.ch_names[-1])

In [None]:
# Quick check
# Extract events back from the stim channel
events_from_stim = mne.find_events(raw)

# Verify that the stim channel carries the same events as the annotations
print("Number of events from annonations :", len(events_from_annot))
print("Number of events from STIM canal :", len(events_from_stim))
# np.array_equal yields a single True/False and simply returns False when the
# two arrays have different shapes, whereas `==` compares element by element
# and cannot broadcast arrays with a different number of events.
print(np.array_equal(events_from_annot, events_from_stim))

In [None]:
# Locate the stim channel among the channel names
stim_name = 'STIM'
channel_names = raw.ch_names
stim_idx = channel_names.index(stim_name)
print(f"Canal index {stim_name} is : {stim_idx}")

In [None]:
# Count how often each distinct value occurs in the stim channel
stim_data = raw.get_data(picks=stim_idx)
print(stim_data.shape)
unique_vals, counts = np.unique(stim_data, return_counts=True)

for marker, n_occurrences in zip(unique_vals, counts):
    print(f"Value : {marker}, Occurences count : {n_occurrences}")

In [None]:
# Downsampling
# NOTE(review): `decimate` comes from the project-local EEGtools module; the
# meaning of its arguments is not visible here — confirm against that module.
sfreq = 500  # presumably the sampling frequency (Hz) of `raw` — TODO confirm
decimation_factor = 2  # presumably keeps one sample out of two — TODO confirm
raw_decimated = decimate(raw, sfreq, decimation_factor, stim_name)

In [None]:
# Swap the two axes of the data array (rows <-> columns)
data = raw_decimated.get_data()
dataT = np.transpose(data)
print(dataT.shape)

In [None]:
# Standardize labels in the stimulation channel (last column).
# Every mask is taken from a snapshot of the column, so the (old, new) pairs
# can be applied one after another without a temporary placeholder value.
# Net effect: 1 -> 3, 2 -> 1, 4 -> 2, 3 -> 4, and a pre-existing 5 -> 3.
labels_before = dataT[:, -1].copy()
for old_marker, new_marker in ((1, 3), (2, 1), (4, 2), (3, 4), (5, 3)):
    dataT[labels_before == old_marker, -1] = new_marker
print("Shape of dataT:", dataT.shape)

In [None]:
# Take the last column (stim channel) and tally its distinct values
stim_col = dataT[:, -1]
unique_vals, counts = np.unique(stim_col, return_counts=True)

# Report each distinct value together with its number of occurrences
for marker, n_occurrences in zip(unique_vals, counts):
    print(f"Value : {marker}, Occurrence count : {n_occurrences}")

In [None]:
# Create the timestamps column and the header
n_times, n_channels = dataT.shape
timestamps = np.arange(n_times, dtype=int)
data_with_timestamp = np.column_stack((timestamps, dataT))
header = [""] + [str(i) for i in range(n_channels)]

df = pd.DataFrame(data_with_timestamp, columns=header)
# column_stack promotes the integer timestamps to float. Assigning by column
# label replaces the whole column (and its dtype), whereas writing through
# df.iloc[:, 0] sets values into the existing float column and keeps the
# decimals in recent pandas versions.
df[""] = df[""].astype(int)

In [None]:
# Test export: write the DataFrame to "data.csv" (without the pandas index
# column) so the resulting CSV layout can be inspected.
df.to_csv("data.csv", index=False)

In [None]:
# Export one CSV file per subject: concatenate the train/test runs, add a stim
# channel built from the annotations, downsample, relabel and save.
subject_list = list(m_data.keys())

# Downsampling parameters
sfreq = 500
decimation_factor = 2

# Name of the stimulation channel appended to every recording. Defined here so
# that this cell does not depend on a variable set in another cell.
stim_name = 'STIM'

# Relabelling applied to the stim channel: original code -> exported code.
# NOTE(review): the original codes come from mne.events_from_annotations;
# check `event_dict` to confirm which class each code stands for.
label_map = {1: 3, 2: 1, 3: 4, 4: 2}

for subject in subject_list:
    # Load training and testing runs
    raw_tr = m_data[subject]['0']['0train']
    raw_te = m_data[subject]['0']['1test']
    # mne.concatenate_raws modifies its first element in place; concatenating
    # a copy leaves m_data untouched, so this cell can safely be re-run.
    raw_session = mne.concatenate_raws([raw_tr.copy(), raw_te])

    # Get events from annotations
    events_from_annot, event_dict = mne.events_from_annotations(raw_session)
    # Create a stimulation channel (filled with zeros)
    stim_data = np.zeros((1, len(raw_session.times)))
    # Event sample numbers include first_samp; subtract it to obtain indices
    # into the data array, then write every event code at its position.
    event_samples = events_from_annot[:, 0] - raw_session.first_samp
    stim_data[0, event_samples] = events_from_annot[:, 2]
    # Add the stimulation channel to the raw object
    info = mne.create_info([stim_name], raw_session.info['sfreq'], ['stim'])
    stim_raw = mne.io.RawArray(stim_data, info)
    raw_session.add_channels([stim_raw], force_update_info=True)

    # Downsampling: decimate THIS subject's recording. (The previous code
    # passed the global `raw`, so every file contained the same subject.)
    raw_decimated = decimate(raw_session, sfreq, decimation_factor, stim_name)
    data = raw_decimated.get_data()

    # Transpose to get dataT of shape (total_timesamples, n_channels)
    dataT = data.T
    n_times, n_channels = dataT.shape

    # Standardize labels in the stimulation channel.
    # Assumes the stim channel is still the last column after `decimate`
    # — TODO confirm against EEGtools.
    # Masks are computed on a snapshot so the mappings cannot chain.
    original_labels = dataT[:, -1].copy()
    for old_code, new_code in label_map.items():
        dataT[original_labels == old_code, -1] = new_code
    print("Shape of dataT:", dataT.shape)

    # Create the timestamps column (sample counter) and the header
    timestamps = np.arange(n_times, dtype=int)
    datacsv = np.column_stack((timestamps, dataT))
    header = [""] + [str(i) for i in range(n_channels)]
    df = pd.DataFrame(datacsv, columns=header)
    # column_stack promoted the timestamps to float; store them as integers
    df[""] = df[""].astype(int)

    # Set filename and export
    subject_str = f"{int(subject):02d}"
    filename = f"subject_{subject_str}_session_01.csv"
    df.to_csv(filename, index=False)

    # Display the number of samples carrying each exported label
    events = df.iloc[:, -1]
    n_lh = int((events == 1).sum())
    n_rh = int((events == 2).sum())
    n_f = int((events == 3).sum())
    rest = int((events == 4).sum())
    print(f"Number of Left hand (1): {n_lh}")
    print(f"Number of Right hand (2): {n_rh}")
    print(f"Number of feet (3): {n_f}")
    print(f"Number of rest (4): {rest}")