In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy import signal

# Exploring the dataset

### Importing one of the csv files to check out the format

In [None]:
df = pd.read_csv("./processed/backwarda.csv")
df.head()

### Analyzing the EEG Signal

Following code will plot the signal's fourier transform, which will help in visualizing the EEG signal frequencies. The fourier transform will then be split into multiple bands to better understand the hidden relationships inside the EEG signal

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Path to the directory containing CSV files
directory = './processed'

# Get list of CSV files in the directory
csv_files = [file for file in os.listdir(directory) if file.endswith('.csv')]

# Define frequency bands
bands = {
    'Delta': (0.5, 4),
    'Theta': (4, 8),
    'Alpha': (8, 13),
    'Beta': (13, 30),
    'Gamma': (30, 100)
}

# Create subplots
num_plots = len(csv_files)
fig, axes = plt.subplots(num_plots, len(bands)+2, figsize=(5*len(bands), 5*num_plots))

# Plot each CSV file and its Fourier transform
for i, file in enumerate(csv_files):
    # Read CSV file
    data = pd.read_csv(os.path.join(directory, file))
    
    # Compute Fourier transform
    y = data['Value']
    N = len(y)
    dt = 1 / 125  # Sampling interval
    freq = np.fft.fftfreq(N, dt)[:N//2]  # Frequencies
    y_fft = np.fft.fft(y)[:N//2]  # Fourier transform
    
    # Plot original data
    axes[i, 0].plot(data['timestamp'], data['Value'], label='Original')
    axes[i, 0].set_title(file)
    axes[i, 0].set_xlabel('TimeStamp')
    axes[i, 0].set_ylabel('Value')
    axes[i, 0].legend()

    # Plot Fourier transform
    axes[i, 1].plot(freq, np.abs(y_fft))
    axes[i, 1].set_title('Full Spectrum')
    axes[i, 1].set_xlabel('Frequency (Hz)')
    axes[i, 1].set_ylabel('Amplitude')

    # Plot Power Spectral Density
    axes[i, 1].psd(y, Fs=125, NFFT=N, color='r', alpha=0.5)

    # Plot filtered bands
    for j, (band_name, (low, high)) in enumerate(bands.items()):
        band_mask = (freq >= low) & (freq <= high)
        band_freq = freq[band_mask]
        band_fft = y_fft[band_mask]
        axes[i, j+2].plot(band_freq, np.abs(band_fft))
        axes[i, j+2].set_title(f'{band_name} Band')
        axes[i, j+2].set_xlabel('Frequency (Hz)')
        axes[i, j+2].set_ylabel('Amplitude')

# Adjust layout
plt.tight_layout()

# Show plot
plt.savefig("see.svg")
plt.show()

### Analyzing the PSD of the EEG signal

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Path to the directory containing CSV files
directory = './processed'

# Get list of CSV files in the directory
csv_files = [file for file in os.listdir(directory) if file.endswith('.csv')]

# Create subplots for PSD
num_plots = len(csv_files)
fig, axes = plt.subplots(num_plots, 1, figsize=(8, 6*num_plots))

# Plot each CSV file and its PSD
for i, file in enumerate(csv_files):
    # Read CSV file
    data = pd.read_csv(os.path.join(directory, file))
    
    # Compute PSD
    y = data['Value']
    axes[i].psd(y, Fs=125, NFFT=len(y), color='r', alpha=0.5)
    axes[i].set_title(f'PSD of {file}')
    axes[i].set_xlabel('Frequency (Hz)')
    axes[i].set_ylabel('Power/Frequency (dB/Hz)')

# Adjust layout
plt.tight_layout()

# Show plot
plt.show()



### Plotting the Spectogram of the Signal

Spectogram is a graphical representation that shows the distribution of energy in different frequency bands over time.

In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Path to the directory containing CSV files
directory = './processed'

# Get list of CSV files in the directory
csv_files = [file for file in os.listdir(directory) if file.endswith('.csv')]

# Create subplots for spectrograms
num_plots = len(csv_files)
fig, axes = plt.subplots(num_plots, 1, figsize=(8, 6*num_plots))

# Plot each CSV file and its spectrogram
for i, file in enumerate(csv_files):
    # Read CSV file
    data = pd.read_csv(os.path.join(directory, file))
    
    # Compute spectrogram
    y = data['Value']
    Fs = 125  # Sampling frequency
    NFFT = 256  # Length of the windowing segments
    noverlap = NFFT // 2  # Overlap between windows
    spec, freqs, times, _ = plt.specgram(y, NFFT=NFFT, Fs=Fs, noverlap=noverlap)
    
    # Plot spectrogram
    axes[i].imshow(spec, aspect='auto', cmap='viridis', origin='lower', extent=[times.min(), times.max(), freqs.min(), freqs.max()])
    axes[i].set_title(f'Spectrogram of {file}')
    axes[i].set_xlabel('Time (s)')
    axes[i].set_ylabel('Frequency (Hz)')

# Adjust layout
plt.tight_layout()

# Show plot
plt.show()


## Validating Sampling rate:

The code below will validate the sample rate of the processed dataset. The arduino was programmed to take 125 samples of the document.

In [None]:
import os
import pandas as pd
import numpy as np

# Path to the directory containing CSV files
directory = "./processed"

# Get list of CSV files in the directory
csv_files = [file for file in os.listdir(directory) if file.endswith(".csv")]

# Loop through each CSV file to calculate sampling rate
for file in csv_files:
    # Read CSV file
    data = pd.read_csv(os.path.join(directory, file))

    # Extract timestamps
    timestamps = data["timestamp"]

    # Calculate time difference between consecutive samples
    time_diff = np.diff(timestamps)

    # Calculate the average time difference (sampling period)
    sampling_period = np.mean(time_diff)

    # Calculate the sampling rate (samples per second)
    sampling_rate = 1 / sampling_period

    # Print sampling rate
    print(f"Sampling rate for {file}: {sampling_rate} Hz")

    # Validate the sampling rate (should be close to the expected value)
    expected_sampling_rate = 125  # Expected sampling rate
    if abs(sampling_rate - expected_sampling_rate) < 1:  # Acceptable deviation
        print("Sampling rate validated.")
    else:
        print("Sampling rate validation failed.")

### Creating the Dataset

Applying a sliding window algorithm to create the training dataset. A window of variable size will traverse the signal. The EEG signal inside that window will be processed, and features will be extracted from it.

Following features are extracted:

1. `Start Timestamp`: The starting timestamp of the window.
2. `End Timestamp`: The ending timestamp of the window.
3. `FFT Result`: Fast Fourier Transform (FFT) result of the signal within the window.
4. `Mean`: Mean value of the signal within the window.
5. `Max`: Maximum value of the signal within the window.
6. `Standard Deviation`: Standard deviation of the signal within the window.
7. `RMS`: Root Mean Square (RMS) value of the signal within the window.
8. `Kurtosis`: Kurtosis value of the signal within the window.
9. `Skewness`: Skewness value of the signal within the window.
10. `Peak-to-Peak`: Peak-to-peak amplitude of the signal within the window.
11. `Abs Diff Signal`: Absolute difference of the signal within the window.
12. `Alpha Power`: Power in the alpha band of the signal within the window.
13. `Beta Power`: Power in the beta band of the signal within the window.
14. `Gamma Power`: Power in the gamma band of the signal within the window.
15. `Delta Power`: Power in the delta band of the signal within the window.
16. `Theta Power`: Power in the theta band of the signal within the window.
17. `Label`: The corresponding label for the window i.e 0 for **Backward** 1 for **Left** etc.

The resulting dataset is stored inside `FinalDataset.csv`.

In [4]:
import os
import pandas as pd
import numpy as np
from scipy.fft import fft
from scipy.stats import kurtosis, skew

def sliding_window_with_timestamp(data, window_size, stride):
    timestamps = data['timestamp'].values
    values = data['Value'].values
    for i in range(0, len(timestamps) - window_size + 1, stride):
        window_timestamps = timestamps[i:i+window_size]
        window_values = values[i:i+window_size]
        yield window_timestamps, window_values

def compute_fft(signal):
    return np.abs(fft(signal))

def compute_statistics(signal):
    mean = np.mean(signal)
    max_val = np.max(signal)
    std_dev = np.std(signal)
    rms = np.sqrt(np.mean(signal**2))
    kurt = kurtosis(signal)
    skewness = skew(signal)
    peak_to_peak = np.ptp(signal)
    abs_diff_signal = np.sum(np.abs(np.diff(signal)))
    return mean, max_val, std_dev, rms, kurt, skewness, peak_to_peak, np.mean(abs_diff_signal)

def compute_eeg_frequency_bands(fft_result, sampling_rate):
    freq_bins = np.fft.fftfreq(len(fft_result), 1 / sampling_rate)
    alpha_band = (8, 12)
    beta_band = (12, 30)
    gamma_band = (30, 100)
    delta_band = (0.5, 4)
    theta_band = (4, 8)

    alpha_idx = np.where((freq_bins >= alpha_band[0]) & (freq_bins <= alpha_band[1]))[0]
    beta_idx = np.where((freq_bins >= beta_band[0]) & (freq_bins <= beta_band[1]))[0]
    gamma_idx = np.where((freq_bins >= gamma_band[0]) & (freq_bins <= gamma_band[1]))[0]
    delta_idx = np.where((freq_bins >= delta_band[0]) & (freq_bins <= delta_band[1]))[0]
    theta_idx = np.where((freq_bins >= theta_band[0]) & (freq_bins <= theta_band[1]))[0]

    alpha_power = np.sum(fft_result[alpha_idx])
    beta_power = np.sum(fft_result[beta_idx])
    gamma_power = np.sum(fft_result[gamma_idx])
    delta_power = np.sum(fft_result[delta_idx])
    theta_power = np.sum(fft_result[theta_idx])

    return alpha_power, beta_power, gamma_power, delta_power, theta_power

def process_window(start_timestamp, end_timestamp, window_values):
    fft_result = compute_fft(window_values)
    statistics = compute_statistics(window_values)
    alpha_power, beta_power, gamma_power, delta_power, theta_power = compute_eeg_frequency_bands(fft_result, 125)
    return start_timestamp, end_timestamp, fft_result, *statistics, alpha_power, beta_power, gamma_power, delta_power, theta_power

def adjust_window_parameters(window_duration, sampling_rate):
    window_size = int(window_duration * sampling_rate)
    stride = window_size // 2
    return window_size, stride

def process_data_with_sliding_window(df, window_duration, sampling_rate):
    window_size, stride = adjust_window_parameters(window_duration, sampling_rate)
    results = []
    timestamps = df['timestamp'].values
    for i, (window_timestamps, window_values) in enumerate(sliding_window_with_timestamp(df, window_size, stride)):
        start_timestamp = window_timestamps[0]
        end_timestamp = window_timestamps[-1]
        result = process_window(start_timestamp, end_timestamp, window_values)
        results.append(result)
    return results

def generate_training_matrix(directory, output_file):
    training_files = os.listdir(directory)
    
    window_duration = 1  # seconds, adjust according to your preference
    sampling_rate = 125  # Hz

    all_results = []
    for file in training_files:
        state = None
        if "backward" in file.lower():
            state = 0
        elif "right" in file.lower():
            state = 2
        elif "left" in file.lower():
            state = 1
        elif "forward" in file.lower():
            state = 3
        df = pd.read_csv(os.path.join(directory, file))
        results = process_data_with_sliding_window(df, window_duration, sampling_rate=sampling_rate)
        for result in results:
            lis = list(result)
            lis.append(state)
            result = tuple(lis)
            all_results.append(result)
            
    result_df = pd.DataFrame(all_results, columns=['Start Timestamp', 'End Timestamp', 'FFT Result', 'Mean', 'Max', 'Standard Deviation', 'RMS', 'Kurtosis', 'Skewness', 'Peak-to-Peak', 'Abs Diff Signal', 'Alpha Power', 'Beta Power', 'Gamma Power', 'Delta Power', 'Theta Power', 'Label'])
    fft_columns = [f'FFT_{i}' for i in range(125)]
    fft_data = pd.DataFrame(result_df['FFT Result'].tolist(), columns=fft_columns)
    result_df = pd.concat([result_df.drop('FFT Result', axis=1), fft_data], axis=1)
    # Move Label column to end
    label_column = result_df.pop('Label')
    result_df['Label'] = label_column
    result_df.to_csv(output_file, index=False)

    return result_df

# Call the function
result = generate_training_matrix("./processed", "FinalDataset.csv")
