# EEG Data Preprocessing

## Converting Files to CSVs

In [1]:
# Notebook: EEG File Converter to CSV with Integrated Annotations
import os
import mne
import pandas as pd

# Suppress MNE warnings
mne.set_log_level('ERROR')

# Define paths
root_path = './data/selfmade_dataset/'       # adjust if necessary
output_path = './csv_output/'

# Ensure output directory exists
os.makedirs(output_path, exist_ok=True)

# Channels to mark as misc
misc_chs = ['Aux1', 'Aux2', 'x_dir', 'y_dir', 'z_dir']

# Iterate over Person and Recording folders
for person_dir in sorted(os.listdir(root_path)):
    person_path = os.path.join(root_path, person_dir)
    if not os.path.isdir(person_path):
        continue

    for recording_dir in sorted(os.listdir(person_path)):
        recording_path = os.path.join(person_path, recording_dir)
        if not os.path.isdir(recording_path):
            continue

        # Process .vhdr files
        for file in sorted(os.listdir(recording_path)):
            if not file.endswith('.vhdr'):
                continue

            vhdr_path = os.path.join(recording_path, file)
            basename = file[:-5]  # strip .vhdr

            # Fix .vhdr references
            with open(vhdr_path, 'r', encoding='utf-8') as f:
                lines = f.readlines()
            with open(vhdr_path, 'w', encoding='utf-8') as f:
                for line in lines:
                    if line.startswith('DataFile='):
                        f.write(f'DataFile={basename}.eeg\n')
                    elif line.startswith('MarkerFile='):
                        f.write(f'MarkerFile={basename}.vmrk\n')
                    else:
                        f.write(line)

            # Load raw data, specifying eog as empty list and misc channels
            raw = mne.io.read_raw_brainvision(
                vhdr_path,
                preload=True,
                eog=[],
                misc=misc_chs
            )

            # Convert to DataFrame
            df = raw.to_data_frame()

            # Add annotation column, default ''
            df['annotation'] = ''
            if raw.annotations and len(raw.annotations) > 0:
                for onset, duration, desc in zip(raw.annotations.onset,
                                                 raw.annotations.duration,
                                                 raw.annotations.description):
                    mask = (df['time'] >= onset) & (df['time'] < onset + duration)
                    df.loc[mask, 'annotation'] = desc

            # Save combined CSV
            out_csv = os.path.join(output_path, f'{basename}.csv')
            df.to_csv(out_csv, index=False)
            print(f'Saved combined CSV: {out_csv}')


Saved combined CSV: ./csv_output/Person1Recording1.csv
Saved combined CSV: ./csv_output/Person1Recording2.csv
Saved combined CSV: ./csv_output/Person1Recording3.csv
Saved combined CSV: ./csv_output/Person1Recording3.csv
Saved combined CSV: ./csv_output/Person2Recording1.csv
Saved combined CSV: ./csv_output/Person2Recording2.csv
Saved combined CSV: ./csv_output/Person2Recording3.csv
Saved combined CSV: ./csv_output/Person3Recording1.csv
Saved combined CSV: ./csv_output/Person3Recording2.csv
Saved combined CSV: ./csv_output/Person3Recording3.csv
Saved combined CSV: ./csv_output/Person4Recording1.csv
Saved combined CSV: ./csv_output/Person4Recording2.csv
Saved combined CSV: ./csv_output/Person5Recording4.csv
Saved combined CSV: ./csv_output/Person6Recording4.csv
Saved combined CSV: ./csv_output/Person6Recording5.csv
Saved combined CSV: ./csv_output/Person6Recording6.csv
Saved combined CSV: ./csv_output/Person7Recording7.csv
Saved combined CSV: ./csv_output/Person8Recording8.csv


## Extend annotations for all rows and clean annotations 

In [2]:
import os, pandas as pd, numpy as np

for file in sorted(os.listdir(output_path)):
    if not file.endswith('.csv'):
        continue
    csv_file = os.path.join(output_path, file)
    df = pd.read_csv(csv_file)

    # Replace empty strings with NaN for ffill
    df['annotation'] = df['annotation'].replace('', np.nan)
    # Remove 'Stimulus/' prefix
    df['annotation'] = df['annotation'].str.replace(r'^Stimulus/', '', regex=True)
    # Forward-fill
    df['annotation'] = df['annotation'].ffill()
    df['annotation'] = df['annotation'].fillna('')

    df.to_csv(csv_file, index=False)
    print(f'Updated annotations in: {csv_file}')


Updated annotations in: ./csv_output/Person1Recording1.csv
Updated annotations in: ./csv_output/Person1Recording2.csv
Updated annotations in: ./csv_output/Person1Recording3.csv
Updated annotations in: ./csv_output/Person2Recording1.csv
Updated annotations in: ./csv_output/Person2Recording2.csv
Updated annotations in: ./csv_output/Person2Recording3.csv
Updated annotations in: ./csv_output/Person3Recording1.csv
Updated annotations in: ./csv_output/Person3Recording2.csv
Updated annotations in: ./csv_output/Person3Recording3.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person4Recording1.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person4Recording2.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person5Recording4.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person6Recording4.csv
Updated annotations in: ./csv_output/Person6Recording5.csv


  df = pd.read_csv(csv_file)


Updated annotations in: ./csv_output/Person6Recording6.csv
Updated annotations in: ./csv_output/Person7Recording7.csv
Updated annotations in: ./csv_output/Person8Recording8.csv


## Add columns for Person and Recording

In [3]:
import os, re, pandas as pd

for file in sorted(os.listdir(output_path)):
    if not file.endswith('.csv'):
        continue
    csv_file = os.path.join(output_path, file)
    m = re.match(r'Person(\d+)Recording(\d+)\.csv$', file)
    if not m:
        print(f"Filename does not match pattern: {file}")
        continue
    person_num, recording_num = map(int, m.groups())

    df = pd.read_csv(csv_file)
    df.insert(0, 'Recording', recording_num)
    df.insert(0, 'Person',    person_num)
    df.to_csv(csv_file, index=False)
    print(f'Added Person and Recording columns in: {csv_file}')


Added Person and Recording columns in: ./csv_output/Person1Recording1.csv
Added Person and Recording columns in: ./csv_output/Person1Recording2.csv
Added Person and Recording columns in: ./csv_output/Person1Recording3.csv
Added Person and Recording columns in: ./csv_output/Person2Recording1.csv
Added Person and Recording columns in: ./csv_output/Person2Recording2.csv
Added Person and Recording columns in: ./csv_output/Person2Recording3.csv
Added Person and Recording columns in: ./csv_output/Person3Recording1.csv
Added Person and Recording columns in: ./csv_output/Person3Recording2.csv
Added Person and Recording columns in: ./csv_output/Person3Recording3.csv
Added Person and Recording columns in: ./csv_output/Person4Recording1.csv
Added Person and Recording columns in: ./csv_output/Person4Recording2.csv
Added Person and Recording columns in: ./csv_output/Person5Recording4.csv
Added Person and Recording columns in: ./csv_output/Person6Recording4.csv
Added Person and Recording columns in:

## Remove New Segment, START, END annotations

In [4]:
import os, pandas as pd

unwanted = ['New Segment/', 'START', 'END']
for file in sorted(os.listdir(output_path)):
    if not file.endswith('.csv'):
        continue
    csv_file = os.path.join(output_path, file)
    df = pd.read_csv(csv_file)

    df = df[~df['annotation'].isin(unwanted)]
    df.to_csv(csv_file, index=False)
    print(f'Removed unwanted annotations in: {csv_file}')


Removed unwanted annotations in: ./csv_output/Person1Recording1.csv
Removed unwanted annotations in: ./csv_output/Person1Recording2.csv
Removed unwanted annotations in: ./csv_output/Person1Recording3.csv
Removed unwanted annotations in: ./csv_output/Person2Recording1.csv
Removed unwanted annotations in: ./csv_output/Person2Recording2.csv
Removed unwanted annotations in: ./csv_output/Person2Recording3.csv
Removed unwanted annotations in: ./csv_output/Person3Recording1.csv
Removed unwanted annotations in: ./csv_output/Person3Recording2.csv
Removed unwanted annotations in: ./csv_output/Person3Recording3.csv
Removed unwanted annotations in: ./csv_output/Person4Recording1.csv
Removed unwanted annotations in: ./csv_output/Person4Recording2.csv
Removed unwanted annotations in: ./csv_output/Person5Recording4.csv
Removed unwanted annotations in: ./csv_output/Person6Recording4.csv
Removed unwanted annotations in: ./csv_output/Person6Recording5.csv
Removed unwanted annotations in: ./csv_output/Pe

## Replace lost-sample marker with REST

In [5]:
import os, pandas as pd

for fname in sorted(os.listdir(output_path)):
    if not fname.endswith('.csv'):
        continue
    fullpath = os.path.join(output_path, fname)
    df = pd.read_csv(fullpath)

    df['annotation'] = df['annotation'].replace(
        'New Segment/LostSamples: 1', 'REST'
    )
    df.to_csv(fullpath, index=False)
    print(f"Oppdatert annotering i {fname}")


Oppdatert annotering i Person1Recording1.csv
Oppdatert annotering i Person1Recording2.csv
Oppdatert annotering i Person1Recording3.csv
Oppdatert annotering i Person2Recording1.csv
Oppdatert annotering i Person2Recording2.csv
Oppdatert annotering i Person2Recording3.csv
Oppdatert annotering i Person3Recording1.csv
Oppdatert annotering i Person3Recording2.csv
Oppdatert annotering i Person3Recording3.csv
Oppdatert annotering i Person4Recording1.csv
Oppdatert annotering i Person4Recording2.csv
Oppdatert annotering i Person5Recording4.csv
Oppdatert annotering i Person6Recording4.csv
Oppdatert annotering i Person6Recording5.csv
Oppdatert annotering i Person6Recording6.csv
Oppdatert annotering i Person7Recording7.csv
Oppdatert annotering i Person8Recording8.csv


In [6]:
import os
import pandas as pd

print("Recording durations:\n")

for file in sorted(os.listdir(output_path)):
    if not file.endswith('.csv'):
        continue

    csv_file = os.path.join(output_path, file)
    df = pd.read_csv(csv_file)

    # Duration is last time value minus first time value
    start_time = df['time'].iloc[0]
    end_time = df['time'].iloc[-1]
    duration_sec = end_time - start_time

    minutes = int(duration_sec // 60)
    seconds = int(duration_sec % 60)

    print(f"{file}: {minutes} min {seconds} sec")


Recording durations:

Person1Recording1.csv: 4 min 43 sec
Person1Recording2.csv: 4 min 43 sec
Person1Recording3.csv: 4 min 43 sec
Person2Recording1.csv: 4 min 43 sec
Person2Recording2.csv: 4 min 43 sec
Person2Recording3.csv: 4 min 43 sec
Person3Recording1.csv: 4 min 43 sec
Person3Recording2.csv: 4 min 43 sec
Person3Recording3.csv: 4 min 43 sec
Person4Recording1.csv: 14 min 10 sec
Person4Recording2.csv: 14 min 10 sec
Person5Recording4.csv: 14 min 19 sec
Person6Recording4.csv: 4 min 53 sec
Person6Recording5.csv: 4 min 53 sec
Person6Recording6.csv: 4 min 53 sec
Person7Recording7.csv: 4 min 53 sec
Person8Recording8.csv: 4 min 53 sec
