In [31]:
import mne
import numpy as np
from scipy.fft import fft, fftfreq
import matplotlib.pyplot as plt

In [44]:
import os


In [43]:
def get_dominant_frequency(filename):
    
    raw = mne.io.read_raw_bdf(filename)
    signals, times = raw[:]
    sampling_rate = raw.info['sfreq']

    samples_to_remove = int(5 * sampling_rate)

    # Remove the first 5 seconds from the data
    if signals.shape[1] > samples_to_remove:
        signals = signals[:, samples_to_remove:]
        times = times[samples_to_remove:]
    else:
        print("Warning: The data is shorter than 5 seconds.  No data removed.")
    freq_list = []
    for chn in range(9):
    # Choose the first channel
        signal = signals[chn, :]
        n = len(signal)

        # Perform FFT
        yf = fft(signal)
        xf = fftfreq(n, 1 / sampling_rate)

        # Calculate magnitude spectrum for positive frequencies
        magnitude = np.abs(yf)
        positive_frequencies = xf[:n//2]
        positive_magnitude = magnitude[:n//2] * 2

        # Find dominant frequency
        dominant_frequency_index = np.argmax(positive_magnitude)
        dominant_frequency = positive_frequencies[dominant_frequency_index]
        freq_list.append(dominant_frequency)
        # print(f"The dominant frequency for {chn} is : {dominant_frequency:.2f} Hz")
    return max(freq_list)

In [45]:

# filename = 'real_data/first.bdf'
final_freq_list = []
files_list = os.listdir('real_data')
for file_name in files_list:
    if file_name != 'sixth.bdf':
        continue
    maximum_freq = get_dominant_frequency('real_data/'+file_name)
    final_freq_list.append(maximum_freq)
print(final_freq_list)

Extracting EDF parameters from c:\Users\Keerti\Downloads\MySolution\HackGtech\ssvep\real_data\sixth.bdf...
BDF file detected
Setting channel info structure...
Creating raw.info structure...
[np.float64(3.834735279564572)]


# CCA implementation

In [41]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter
from collections import defaultdict
from sklearn.cross_decomposition import CCA
import mne  # Import MNE

SAMPLING_RATE = 128
CHESS_BOARD_TILES = [f'{chr(ord("A") + col)}{row}' for col in range(8) for row in range(1, 9)]
NUM_CHANNELS = 8
EPOCH_DURATION = 1.0
NUM_TARGETS = len(CHESS_BOARD_TILES)
FREQUENCY_START = 2.0
FREQUENCY_STEP = 0.1
if NUM_TARGETS > 0:
    TARGET_FREQUENCIES = np.linspace(FREQUENCY_START, FREQUENCY_START + (NUM_TARGETS - 1) * FREQUENCY_STEP, NUM_TARGETS)
    MAPPING = {freq: tile for freq, tile in zip(TARGET_FREQUENCIES, CHESS_BOARD_TILES)}
    # REVERSE_MAPPING = {tile: freq for freq, tile in MAPPING.items()}
else:
    TARGET_FREQUENCIES = []
    MAPPING = {}
    # REVERSE_MAPPING = {}

print(f"Number of Chessboard Tiles: {len(CHESS_BOARD_TILES)}")
print(f"Number of Target Frequencies: {len(TARGET_FREQUENCIES)}")
print(f"Frequency Mapping: {MAPPING}")

# --- Data Loading ---
def load_bdf_data(file_path):
    """Loads EEG data from a BDF file using MNE-Python."""
    try:
        raw = mne.io.read_raw_bdf(file_path, preload=True)  # Read and preload data
        data = raw.get_data()  # Get the EEG data as a NumPy array (channels x time points)
        sampling_rate = raw.info['sfreq']  # Get the sampling rate
        channel_labels = raw.ch_names  # Get channel names
        return data, sampling_rate, channel_labels
    except Exception as e:
        print(f"Error loading BDF file with MNE: {e}")
        return None, None, None

def read_sample_data(file_path):
    """Reads EEG data from either BDF, EDF, or MAT file."""
    if file_path.endswith(".BDF") or file_path.endswith(".bdf"):
        data, sr, labels = load_bdf_data(file_path)
        file_type = "BDF"
    else:
        print("Unsupported file format.")
        return None, None, None, None
    return data, sr, labels, file_type

# --- Data Analysis ---
def plot_eeg_signal(data, sampling_rate, channel_labels):
    """Plots a segment of the raw EEG signal."""
    num_channels = data.shape[0]
    num_samples = data.shape[1]
    time = np.linspace(0, num_samples / sampling_rate, num_samples)
    fig, axes = plt.subplots(num_channels, 1, figsize=(15, 3 * num_channels), sharex=True)
    for i, ax in enumerate(axes):
        ax.plot(time, data[i, :])
        ax.set_ylabel(channel_labels[i])
    ax.set_xlabel("Time (s)")
    plt.suptitle("Raw EEG Signal")
    plt.tight_layout()
    plt.show()

def plot_frequency_spectrum(data, sampling_rate, channel_labels, channel_index=0):
    """Plots the frequency spectrum of a single EEG channel."""
    n = data.shape[1]
    yf = np.fft.fft(data[channel_index, :])
    xf = np.fft.fftfreq(n, 1 / sampling_rate)
    plt.figure(figsize=(12, 6))
    plt.plot(xf[n//2:], np.abs(yf[n//2:]))
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Amplitude")
    plt.title(f"Frequency Spectrum - Channel: {channel_labels[channel_index]}")
    plt.grid(True)
    plt.show()

# --- Preprocessing ---
def butterworth_filter(data, lowcut, highcut, fs, order=5):
    """Applies a Butterworth bandpass filter."""
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    filtered_data = lfilter(b, a, data)
    return filtered_data

def preprocess_eeg(raw_eeg, sampling_rate):
    """Applies basic preprocessing to the EEG data."""
    # Example: Bandpass filter between 1 and 30 Hz
    filtered_eeg = butterworth_filter(raw_eeg, 1.0, 30.0, sampling_rate)
    return filtered_eeg

def epoch_data(processed_eeg, sampling_rate, epoch_duration, stimulus_info=None):
    """Segments the EEG data into epochs.
    stimulus_info (dict, optional): Dictionary where keys are timestamps and values are stimulus labels.
                                    If provided, epochs will be created around these events.
    """
    if stimulus_info:
        epochs = defaultdict(list)
        for timestamp, stimulus in stimulus_info.items():
            start_sample = int(timestamp * sampling_rate)
            end_sample = start_sample + int(epoch_duration * sampling_rate)
            if 0 <= start_sample < processed_eeg.shape[1] and end_sample <= processed_eeg.shape[1]:
                epochs[stimulus].append(processed_eeg[:, start_sample:end_sample])
        return epochs
    else:
        num_samples_per_epoch = int(sampling_rate * epoch_duration)
        num_epochs = processed_eeg.shape[1] // num_samples_per_epoch
        epochs = processed_eeg[:, :num_epochs * num_samples_per_epoch].reshape(processed_eeg.shape[0], num_epochs, num_samples_per_epoch)
        return epochs

def it_cca_train(epochs, sampling_rate):
    """Trains individual templates for each stimulus using averaged epochs."""
    templates = {}
    for stimulus, epoch_list in epochs.items():
        if epoch_list:
            templates[stimulus] = np.mean(np.array(epoch_list), axis=0)
        else:
            print(f"Warning: No epochs found for stimulus {stimulus} during training.")
    return templates

def it_cca_predict(eeg_epoch, templates):
    """Predicts the target stimulus using Individual Template based CCA."""
    correlations = {}
    cca = CCA(n_components=1)
    for stimulus, template in templates.items():
        if eeg_epoch.shape[1] == template.shape[1]:
            cca.fit(eeg_epoch.T, template.T)
            U_test, V_template = cca.transform(eeg_epoch.T, template.T)
            correlation = np.corrcoef(U_test.T, V_template.T)[0, 1]
            correlations[stimulus] = correlation
        else:
            print(f"Warning: Epoch length mismatch for stimulus {stimulus}.")
            correlations[stimulus] = -1  # Indicate mismatch

    if correlations:
        best_stimulus = max(correlations, key=correlations.get)
        return best_stimulus, correlations[best_stimulus]
    else:
        return "Unknown", -1


Number of Chessboard Tiles: 64
Number of Target Frequencies: 64
Frequency Mapping: {np.float64(2.0): 'A1', np.float64(2.1): 'A2', np.float64(2.2): 'A3', np.float64(2.3): 'A4', np.float64(2.4): 'A5', np.float64(2.5): 'A6', np.float64(2.6): 'A7', np.float64(2.7): 'A8', np.float64(2.8): 'B1', np.float64(2.9): 'B2', np.float64(3.0): 'B3', np.float64(3.1): 'B4', np.float64(3.2): 'B5', np.float64(3.3): 'B6', np.float64(3.4000000000000004): 'B7', np.float64(3.5): 'B8', np.float64(3.6): 'C1', np.float64(3.7): 'C2', np.float64(3.8): 'C3', np.float64(3.9000000000000004): 'C4', np.float64(4.0): 'C5', np.float64(4.1): 'C6', np.float64(4.2): 'C7', np.float64(4.300000000000001): 'C8', np.float64(4.4): 'D1', np.float64(4.5): 'D2', np.float64(4.6): 'D3', np.float64(4.7): 'D4', np.float64(4.800000000000001): 'D5', np.float64(4.9): 'D6', np.float64(5.0): 'D7', np.float64(5.1): 'D8', np.float64(5.2): 'E1', np.float64(5.300000000000001): 'E2', np.float64(5.4): 'E3', np.float64(5.5): 'E4', np.float64(5.6):

In [53]:
edf_file1 = "real_data/"+files_list[0]

data1, sr1, labels1, type1 = read_sample_data(edf_file1)

if data1 is not None and sr1 is not None and labels1 is not None:
    print(f"Read {type1} file: {edf_file1} with shape {data1.shape} and sampling rate {sr1} Hz.")
    print(f"Channel Labels: {labels1}")
    stimulus_info_train = {
        1: "A1",  # At 1 second, user looked at A1
        3: "B1",  # At 3 seconds, user looked at B1
        5: "A1",
        7: "C1",
        9: "B1",
        11: "A1",
    }

    # Preprocess the training data
    processed_data1 = data1 #preprocess_eeg(data1, sr1)
    train_epochs = epoch_data(processed_data1, sr1, EPOCH_DURATION, stimulus_info_train)

    # 3. Implement IT-CCA for decision making (Training phase)
    templates = it_cca_train(train_epochs, sr1)
    print("\nLearned Templates (average shape):")
    for stimulus, template in templates.items():
        print(f"{stimulus}: {template.shape}")

    # Simulate a new test epoch (replace with real-time data later)
    test_start_time = 5  # Simulate a test epoch starting at 15 seconds
    test_end_time = test_start_time + EPOCH_DURATION
    start_sample_test = int(test_start_time * sr1)
    end_sample_test = int(test_end_time * sr1)

    if 0 <= start_sample_test < processed_data1.shape[1] and end_sample_test <= processed_data1.shape[1]:
        test_epoch = processed_data1[:, start_sample_test:end_sample_test]

        # Prediction phase
        predicted_stimulus, correlation = it_cca_predict(test_epoch.reshape(processed_data1.shape[0], -1), templates)
        print(f"\nPredicted Stimulus for the test epoch: {predicted_stimulus} with correlation {correlation:.4f}")
        if predicted_stimulus in MAPPING.values():
            predicted_tile = list(MAPPING.keys())[list(MAPPING.values()).index(predicted_stimulus)]
            print(f"Predicted Chess Tile: {predicted_tile}")
        else:
            print("Predicted stimulus not mapped to a chess tile.")
    else:
        print("Simulated test epoch out of bounds.")


Extracting EDF parameters from c:\Users\Keerti\Downloads\MySolution\HackGtech\ssvep\real_data\fifth.bdf...
BDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 2508  =      0.000 ...    10.032 secs...
Read BDF file: real_data/fifth.bdf with shape (9, 2509) and sampling rate 250.0 Hz.
Channel Labels: ['EEG 1', 'EEG 2', 'EEG 3', 'EEG 4', 'EEG 5', 'EEG 6', 'EEG 7', 'EEG 8', 'Status']

Learned Templates (average shape):
A1: (9, 250)
B1: (9, 250)
C1: (9, 250)

Predicted Stimulus for the test epoch: A1 with correlation 0.9429
Predicted Chess Tile: 2.0


In [52]:
# 0 <= start_sample_test < processed_data1.shape[1]
start_sample_test
# processed_data1.shape[1]

3750