<a href="https://colab.research.google.com/github/aryankotru/MINERVA/blob/min_beta/MINERVA_Preprocessing_Unit.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [129]:
import scipy.io
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import pywt
import math
import csv
import os
from collections import Counter
from scipy.stats import entropy, kurtosis, skew
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from scipy.signal import butter, sosfilt, sosfreqz, lfilter, iirnotch, spectrogram
from google.colab import drive

#TODO:
#test with GUI in real-time
#motor imagery data
#collect a lot more data for each predictor
#ssvep binary classification
#motor imagery - left foot, right hand
#more data, less predictors
#siamese network - one shot learning
#fix shannon entro and other features

In [2]:
def read_input_file(input_file):
    data = []
    with open(input_file, 'r') as file:
        for line in file:
            line = line.strip()
            if line.startswith('%Sample Rate'):
                key_value = line[1:].strip().split(' = ')
                value = key_value[1]
                fs = int(value.strip().split(' ')[0])

            elif line.startswith('Sample Index'):
                header = line.split(', ')
            else:
                data.append(line.split(', ')) #typecast as int'

    data = [sublist[1:][:8] for sublist in data[3:]]
    converted_array = np.array(data, dtype=float).tolist()
    return fs, header[:9], converted_array

def convert_and_write_output(output_file, header, data):
    with open(output_file, 'w', newline='') as file:
        writer = csv.writer(file)

        # Write metadata
        #writer.writerow(['% Metadata'])
        """for key, value in metadata.items():
          if key == 'Sample Rate':
            fs = int(value.strip().split(' ')[0])"""
            #print(fs)
            #writer.writerow([f'% {key} = {value}'])

        # Write data header
        #writer.writerow(['% Data'])
        """update_header = []
        for header_val in header:
          if header_val.startswith('EXG Channel'):
            header_val = header_val[:].split('EXG Channel ')
            update_header.append(int(header_val[1]))
        writer.writerow(update_header[:])"""

        # Write data rows
         #removes index and keeps only 8 channel data from OpenBCI format
        writer.writerows(data)
    return fs, data



def get_latest_subdirs(b):
  result = []
  for d in os.listdir(b):
    bd = os.path.join(b, d)
    if os.path.isdir(bd): result.append(bd)
    latest_subdir = max(result, key=os.path.getmtime)
  return latest_subdir


def get_input_raw_file(latest_subdir):
  file_names = []
  for x in os.listdir(latest_subdir):
    if x.endswith(".txt"):
      filepath = os.path.join(latest_subdir, x)
    else:
      return "No matching file"
  return filepath


global variable declaration

In [6]:
epoch = 710 #calc epoch len from fs and epoch time len
num_epochs = 12
# Sample rate and desired cutoff frequencies (in Hz)*
fs = 250
fs = fs
lowcut = 1
highcut = 30
filter_order = 9
notch_quality_factor = 0.693
wavelet = 'sym9'
drive_mounted = False

freq_bands = {"delta": [0.3, 4], "theta": [4, 8], "alpha": [8, 13], "beta": [13, 30], "gamma": [30, 50]}
chars = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "0", "*", "#"]
targets = [1 , 2 ,3 , 4, 5, 6,7,8,9,10,11,12]
feature_names = ['Mean', 'Standard Deviation', 'Skewness', 'Entropy', 'Energy', 'Kurtosis', 'Median' ,'Variance']

channel_names = ["POz", "PO3", "PO4", "PO5", "PO6", "Oz", "O1", "O2"]
channel_numbers = [1,2,3,4,5,6,7,8]

T =  2.84
nsamples = T * fs
t = np.arange(0, epoch) / fs


data filtering

In [7]:
def notch_filter(data, rate, freq, quality):
      x = scipy.signal.filtfilt(*scipy.signal.iirnotch(freq / (rate / 2), quality), data)
      #https://neuraldatascience.io/7-eeg/erp_filtering.html
      return x

def butter_bandpass(lowcut, highcut, fs, order=filter_order):
        nyq = 0.5 * fs #
        low = lowcut / nyq
        high = highcut / nyq
        sos = butter(order, [low, high], analog=False, btype='band', output='sos')
        return sos

def butter_bandpass_filter(data, lowcut, highcut, fs, order=filter_order):
        sos = butter_bandpass(lowcut, highcut, fs, order=order)
        y = sosfilt(sos, data)
        return y

data normalization

In [128]:
def dataset_scaler(dataframe, column_names):
  scaler = MinMaxScaler() #normalizing data points, object of class StandardScaler
  minmax_scaled_data = scaler.fit_transform((dataframe))
  np.reshape(minmax_scaled_data, dataframe.shape)
  scaled_data = np.vstack((minmax_scaled_data))
  dfa = pd.DataFrame(minmax_scaled_data)
  dfa.columns = column_names #index for columns
  return dfa

In [116]:
def eta(data, unit='shannon'):
    base = {
        'shannon' : 2.,
        'natural' : math.exp(1),
        'hartley' : 10.}
    if len(data) <= 1:
        return 0
    counts = Counter()
    for d in data:
        counts[d] += 1
    ent = 0
    probs = [round(float(c)) / len(data) for c in counts.values()]
    for p in probs:
        if p > 0.:
            ent -= p * math.log(p, base[unit])
    return ent
  #https://stackoverflow.com/questions/15450192/fastest-way-to-compute-entropy-in-python

def calculate_features(coeffs):
    mean_val = np.mean(coeffs)
    std_val = np.std(coeffs) #maybe try variance
    skewness_val = skew(coeffs)
    entropy_val = eta(coeffs)
    energy = np.sum(coeffs**2)
    kurtosis_value = kurtosis(coeffs)
    median_value = np.median(coeffs)
    variance_value = np.var(coeffs)
    return mean_val, std_val, skewness_val, entropy_val, energy, kurtosis_value, median_value, variance_value

wavelet transform & feature extraction

In [130]:
def feature_generator(df, channel_names, epoch_len):
    num_epochs = int(len(df) / epoch_len)
    all_features = []

    for n in range(num_epochs):
        for channel in channel_numbers:
            data = df.iloc[n * epoch_len:(n + 1) * epoch_len, channel-1]
            coeffs = pywt.wavedec(data, wavelet, level = 5) #returns level+1 values, level detail coeffs and 1 approx
            """Frequency Bands:
              The frequency range corresponding to each level is calculated. For a sampling frequency of 250 Hz, the levels correspond roughly to:
              Level 1: 62.5 - 125 Hz
              Level 2: 31.25 - 62.5 Hz
              Level 3: 15.625 - 31.25 Hz
              Level 4: 7.8125 - 15.625 Hz
              Level 5: 3.9062 - 7.8125 Hz
              Approximation: 0 - 3.9062 Hz
              The alpha band (8-13 Hz) lies within Level 4."""
            for i, coeff in enumerate(coeffs[3:], 1):
                features = calculate_features(coeff)
                all_features.append([channel] + list(features))
            features = calculate_features(coeffs[0])
            all_features.append([channel] + list(features))

    feature_df = pd.DataFrame(all_features, columns=['Channel'] + feature_names)
    feature_df.set_index('Channel', inplace=True)

    return feature_df

dataset labeller

In [132]:
level_map = [1,2,3,4,5]
feature_map = [1,2,3,4]
chars = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

#num = 20 for label, num = 1 for feature,  num = 4 for level
#n = 0 for label, n = 1 for feature, n = 2 for level, in order

def drop_column(dataframe, column_name):
  try:
    df = dataframe.drop(columns=column_name, axis =1)
    return df
  except Exception as e:
    return dataframe

def dataset_labeller(dataframe, num_features_per_epoch, chars, n, label):
  num_samples = len(dataframe) // num_features_per_epoch
  num_labels = len(chars)  # 12 labels

  # Create a list of labels for each 32-row segment
  labels = []
  for i in range(num_samples):
      labels.extend([chars[i % num_labels]] * num_features_per_epoch)
  # Add the labels to the DataFrame
  dataframe.insert(dataframe.shape[1]-n, label , labels, allow_duplicates = False) #df.shape[1] returns the number of columns
  return dataframe

main function

In [134]:
def preprocessing(dataframe, epoch_len):

   filtered_df = pd.DataFrame(columns=dataframe.columns)
   num_epochs = round(int(len(dataframe.index)/epoch_len))
   #print(num_epochs)

   for channel, i in enumerate(dataframe.columns):
      filtered_col_data = []
      for n in range(num_epochs):
          data = dataframe.iloc[n*epoch:(n+1)*epoch, channel]
          data = notch_filter(data, fs, 50, notch_quality_factor)
          data = butter_bandpass_filter(data, lowcut, highcut, fs, order=filter_order)
          filtered_col_data.extend(data)
      filtered_df[i] = filtered_col_data
   feature_df = feature_generator(filtered_df, channel_names, epoch_len) #extract and store features
   scaled_df = dataset_scaler(feature_df,  feature_names) #scale dataset AFTER filtering
   return scaled_df

In [13]:
def dataset_combiner(file_names, pattern):
  data = [pd.read_excel(file) for file in file_names]
  df = pd.concat(data, axis = 0)
  return df

In [135]:
def dataset_output(file_path):
  files = []
  for x in os.listdir(file_path):
    if x.endswith(".csv"):
      files.append(x)
      files.sort()
  for file in files:
    df = pd.read_csv(file, header = None)
    df = np.transpose(df)
    df.columns = channel_names
    da = preprocessing(df, 710)
    output = da.to_excel(f'MIN{files.index(file)+1}eta.xlsx', sheet_name='sheet1',  float_format = "%.8f")
  return dataset_combiner(files, "eta.xlsx")

In [None]:
if not drive.mount('/content/drive'):
  drive_mounted = True

In [124]:
def main(input_method, output_file):
  if input_method == 'system':
    #dir = get_latest_subdirs('/content')
    #input_file = get_input_raw_file("/content/OpenBCI-RAW-2024-06-10_14-01-02.txt")  # Replace with your input file path
    fs, header, data = read_input_file(f"/content/OpenBCI-RAW-2024-06-10_14-01-02.txt")
    #fs, data = convert_and_write_output(output_file,  header, data)
    df = pd.DataFrame(data, columns = channel_names)

  elif input_method == 'manual':
    #Load EEG data from the csv file
    df = pd.read_csv("S001.csv", header = None)
    #df = pd.read_csv(f"/content/drive/MyDrive/Final Year Project/Machine Learning/ML Models/Datasets/MINERVA_dataset/S001.csv", header = None)
    df = np.transpose(df)
    df.columns = channel_names
    num_rows, num_cols = df.shape
    num_epochs = int((num_rows)/epoch)

  df = preprocessing(df, 710)
  print(df.columns)
  dataset = flatten(df)
  labelled_dataset = dataset_labeller(dataset, 1, targets, 0, 'Label')
  output = labelled_dataset.to_excel(f'MINtheta.xlsx', sheet_name='sheet1',  float_format = "%.8f")
  return labelled_dataset

if __name__ == '__main__':
    output_file = 'output.csv'  # Replace with your desired output file path
    df = main('system', output_file)

Index(['Mean', 'Standard Deviation', 'Skewness', 'Entropy', 'Energy',
       'Kurtosis', 'Median', 'Variance'],
      dtype='object')
Original data shape: (5664, 8)
Flattened data shape: (177, 256)


In [136]:
def flatten(dataframe):
  # Define the segment size
  segment_height = 32
  segment_width = 8
  # Calculate the number of segments
  num_segments = dataframe.shape[0] // segment_height

  # Initialize a list to hold the flattened vectors
  flattened_vectors = []

  for i in range(num_segments):
      # Extract the 8x32 segment
      segment = dataframe.iloc[i * segment_height:(i + 1) * segment_height, :]

      # Flatten the segment into a 1D vector of length 256
      flattened_vector = segment.to_numpy().flatten()

      # Add the flattened vector to the list
      flattened_vectors.append(flattened_vector)

  # Convert the list of vectors to a NumPy array
  flattened_vectors = np.array(flattened_vectors)

  #print("Original data shape:", dataframe.shape)
  #print("Flattened data shape:", flattened_vectors.shape)
  da =  pd.DataFrame(flattened_vectors)
  return da

game interface

In [137]:
def model_output():
  model_output = 3
  with open(r"D://Documents\College\Semester-VIII\B.E. Project\MINERVA_dataset\Output\MINERVA_output.txt", "w") as file:
    file.write(str(model_output))

siamese network

In [None]:
from scipy.spatial import distance

arr1 = df.iloc[0,:-1]
arr2 = df.iloc[24, :-1]
#print(arr1)
dist = distance.euclidean(arr1, arr2)
print(f'Euclidean distance: {dist}')
#THIS TELLS ME THIS DATASET IS A LOAD OF CRAP