In [1]:
import mne
import numpy as np
import os
import matplotlib.pyplot as plt
import statistics as stat
import sys
import csv
from random import randint
from math import floor

Script converts a raw voltage reading into a time frequency power reading across a specified frequency range 

In [2]:
# THIS IS THE ONLY CELL THAT WILL BE ALTERED AFTER EXTRACTION HAS BEGUN

auth_user = "Keaton"
outfile = auth_user + ".csv"
song_id = 1

total_trials = 4

In [3]:
from enum import Enum
class Sensor(Enum):
    AF3 = 0
    F7 = 1
    F3 = 2
    FC5 = 3
    T7 = 4
    P7 = 5
    O1 = 6
    O2 = 7
    P8 = 8
    T8 = 9
    FC6 = 10
    F4 = 11
    F8 = 12
    AF4 = 13

In [4]:
# global settings

silent_dir = "silent/"
music_dir = "music/"
alpha_lower_freq = 8
beta_lower_freq = 13
beta_upper_freq = 30

# holds the time intervals that will be extracted
# alpha values placed first
# beta values placed second
# order determined by sensor arrays 
time_intervals = []
interval_seconds = 15     # seconds
sample_time_seconds = 0.5 # seconds 
points_per_row = 0                  # dynamically calculated
csv_needs_header = True

# used to denote what sensors are being recorded
alpha_sensors = [Sensor.AF4, Sensor.P8, Sensor.FC5, Sensor.F3, Sensor.AF3, Sensor.F7, Sensor.T8, Sensor.FC6, Sensor.F4, Sensor.F8]
beta_sensors = [Sensor.AF4, Sensor.P8, Sensor.FC5, Sensor.F3, Sensor.AF3, Sensor.F7, Sensor.T8, Sensor.FC6, Sensor.F4, Sensor.F8]    

# names of participants 
names = ["Keaton", "Alyse", "Richard", "Morty", "Jay"]

This section finds the two files to be analyzed based off of the name and trial number
    1. silent reading
    2. music reading

In [5]:
class TagError(Exception):
    pass

def find_filename_by_tag(tag, dir_path):
    """
    function for finding a single file with a given tag
    """
    
    tag_len = len(tag)

    filenames = []

    for filename in os.listdir(dir_path):
        if filename[0:tag_len] == tag:
            filenames.append(filename)

    # check to make sure only one file matched the tag
    # if this is not the case, there is an error
    if len(filenames) != 1:
        print("Error with file lookup: Tag {}".format(tag))
        print("Total files found: {}".format(len(filenames)))
        for filename in filenames:
            print(filename)
        raise TagError("Tag Failed to Locate a Single File")    
        
    return filenames[0]
    

This section generates frequency power readings from the two files using the MNE library

In [6]:
def preprocess_raw(raw):
    """
    prepares a MNE raw object created from an Emotiv generated EDF file to have time frequency analysis done on it
    """
    # drop unnecessary channels
    extra_ch = raw.ch_names[0:2] + raw.ch_names[16:40]
    raw.drop_channels(extra_ch)
    
    # manually add stim channel
    stim_data = np.zeros((1, len(raw.times)))
    info = mne.create_info(['STI'], raw.info['sfreq'], ['stim'])
    stim_raw = mne.io.RawArray(stim_data, info)
    raw.add_channels([stim_raw], force_update_info=True)

In [7]:
def get_power(raw, l_freq, h_freq):
    """
    converts a raw MNE object that has been created using an Emotiv Pro generated EDF 
    file into a set of the average power across the specified frequencies
    
    raw: raw object
    l_freq: frequency lower bound
    h_freq: frequency upper bound
    
    return an MNE TRAverage with the average frequency from l_freq to h_freq
    """
    # define time interval over which to extract
    tmin = 0    # starting second
    tmax = 39   # ending second
    start, stop = raw.time_as_index([tmin, tmax])
    
    # define montage 
    montage = mne.channels.read_montage('standard_1020')
    raw.set_montage(montage)
    
    # add events 
    events = np.array([[0, 0, 1]]) 
    raw.add_events(events)
    
    # add picks
    picks = mne.pick_types(raw.info, eeg=True, eog=False, meg=False, stim=False, 
            exclude='bads')
    
    # add epochs
    event_id, tmin, tmax = 1, 0, 39
    baseline = (None, 0)
    epochs = mne.Epochs(raw, events, event_id, tmin, tmax,
        picks=picks,
        baseline=baseline,
        preload=True)
    
    # extract the average power across the specified range of frequencies
    freqs = np.logspace(*np.log10([l_freq, h_freq]), num=1)
    n_cycles = freqs / 2
    power, itc = mne.time_frequency.tfr_morlet(epochs, freqs=freqs, 
                                                n_cycles=n_cycles,
                                                use_fft=True,
                                                return_itc=True,
                                                decim=3, 
                                                n_jobs=1)
    
    return power

In [8]:
# replace values outside of a set critical value with the channel average    
def clean_channel(power, sensor_id):
    avg = stat.mean(power.data[sensor_id][0])
    std = stat.mean(power.data[sensor_id][0])
    outliers = []
    crit = 5
    
#     print(Sensor(sensor_id).name)
    for i in range(len(power.data[sensor_id][0])):
        z = (power.data[sensor_id][0][i] - avg) / std
        if z >= crit:
#             print("value: {}".format(power.data[sensor_id][0][i]))
#             print("time: {}".format(power.times[i]))
#             print("z score: {}".format(z))
#             print()
            outliers.append(i)
    
    # remove points and recalculate average
    avg = avg * len(power.data[sensor_id][0])
    for outlier in outliers:
        avg -= power.data[sensor_id][0][outlier]
    
    avg = avg / (len(power.data[sensor_id][0]) - len(outliers))
    
    # set all outliers to the new average
    for outlier in outliers:
        power.data[sensor_id][0][outlier] = avg

Clean the data from the music reading with the following processes
    1. any data point with a z score outside of a set critical value will be replaced with the average of the reading

In [9]:
def get_time_interval(music_power, sensor, time_length):
    """
    finds the four time intervals across the power array that correspond to the maximum change in the frequency band
    
    music_power: the MNE AverageTF Power object created from Emotiv Pro 
    sensor: integer corresponding to the index of the sensor's data in music_power
    time_length: a float representing the length of the time interval to be calculated
    
    returns an array of tuples
        each has a starting index and an ending index
    """
    total = 0
    start = 0
    end = 0
    
    lower = 0
    upper = 0
    
    total = 0
    maximum = 0
    
    intervals = []
    
    # size the interval
    while music_power.times[end] - music_power.times[start] < time_length:
        total += music_power.data[sensor][0][end]
        end += 1
    
    upper = end
    maximum = total
    
    while end < len(music_power.data[sensor][0]):
        total += music_power.data[sensor][0][end]
        total -= music_power.data[sensor][0][start]
        start += 1
        end += 1
        
        if total > maximum:
            lower = start
            upper = end
            maximum = total
            
        
        
    return (lower, upper)
    
    

In [10]:
def extract(name, song, trial, is_user):
    """
    extract the trial information from a user
    @param name: the name associated with the trial file
    @param song: songs are numbered 1 - 5
    @param trial: trials are numbered 1 - 3
    """
    
    global csv_needs_header
    global points_per_row
    
    music_tag = "{}_{}_{}_".format(name, song, trial)
    silent_tag= "{}_s_{}_".format(auth_user, 0)
    
    # Create tag to locate files in respective directories
    silent_filename = ""
    music_filename = ""

    try:
        silent_filename = find_filename_by_tag(silent_tag, silent_dir)
        music_filename = find_filename_by_tag(music_tag, music_dir)
    except TagError:
        print("A Tag Error Occured")
        sys.exit()

    print(silent_filename)
    print(music_filename)

    # read in files
    raw_silent = mne.io.read_raw_edf(silent_dir + silent_filename, preload=True)
    raw_music = mne.io.read_raw_edf(music_dir + music_filename, preload=True)

    preprocess_raw(raw_silent)
    preprocess_raw(raw_music)

    print(raw_music.ch_names)
    print(raw_silent.ch_names)

    # extra average power over alpha and beta frequencies for each file
    alpha_silent = get_power(raw_silent, alpha_lower_freq, beta_lower_freq)
    beta_silent = get_power(raw_silent, beta_lower_freq, beta_upper_freq)
    alpha_music = get_power(raw_music, alpha_lower_freq, beta_lower_freq)
    beta_music = get_power(raw_music, beta_lower_freq, beta_upper_freq)

    # test code
    # plt.plot(alpha_silent.times[:], alpha_silent.data[0][0][:])
    # plt.title(alpha_silent.ch_names[0])
    # plt.show()

    # print(beta_music.ch_names)

    alpha_silent_means = []
    beta_silent_means = []

    alpha_silent_stds = []
    beta_silent_stds = []

    # power.data[ch][freq][times]

    for i in range(len(alpha_silent.data)):
        clean_channel(alpha_silent, i)
        alpha_silent_means.append(stat.mean(alpha_silent.data[i][0]))
        alpha_silent_stds.append(stat.stdev(alpha_silent.data[i][0]))

    for i in range(len(beta_silent.data)):
        clean_channel(beta_silent, i)
        beta_silent_means.append(stat.mean(beta_silent.data[i][0]))
        beta_silent_stds.append(stat.stdev(beta_silent.data[i][0]))

    # print(alpha_silent_means)
    # print(alpha_silent_stds)
    # print(beta_silent_means)
    # print(beta_silent_stds)    

    for sensor in alpha_sensors:
        clean_channel(alpha_music, sensor.value)

    for sensor in beta_sensors:
        clean_channel(beta_music, sensor.value)  

    # extract time intervals from the first trial only
    # this was the registration phase of our model and will be used
    # for model metadata

    if len(time_intervals) == 0:

        for sensor in alpha_sensors:
            time_intervals.append(get_time_interval(alpha_music, sensor.value, interval_seconds))

        for sensor in beta_sensors:
            time_intervals.append(get_time_interval(beta_music, sensor.value, interval_seconds))

        points_per_row = floor((time_intervals[0][1] - time_intervals[0][0]) / (interval_seconds / sample_time_seconds))

    print("Time Interval Indices: {}".format(time_intervals))
    print("Length of Interval: {}".format(interval_seconds))
    print("Length of Sub-Interval: {}".format(sample_time_seconds))
    print("Points Per Sample: {}".format(points_per_row))
    
    if csv_needs_header:
        write_header()
        csv_needs_header = False
        
    write_data_points(alpha_music, alpha_silent_means, alpha_silent_stds, beta_music, beta_silent_means, beta_silent_stds, is_user)


    

In [11]:
def write_header():
    with open(outfile, 'w', newline='') as csvfile:
        row = []
        w = csv.writer(csvfile, delimiter=',')

        # create 1 column for each sensor
        for sensor in alpha_sensors:
            row.append("{}_alpha".format(sensor.name))
        
        for sensor in beta_sensors:
            row.append("{}_beta".format(sensor.name))
        
        # write header for each time point
#         for i in range(len(alpha_sensors)):
#             for j in range(points_per_row):
#                 row.append("{}_alpha_{}".format(alpha_sensors[i].name, j))

#         for i in range(len(beta_sensors)):
#             for j in range(points_per_row):
#                 row.append("{}_beta_{}".format(beta_sensors[i].name, j))

        row.append("is_user")

        w.writerow(row)

In [12]:
# calculate and write to CSV file

def write_data_points(alpha_music, alpha_silent_means, alpha_silent_stds, beta_music, beta_silent_means, beta_silent_stds, is_user):

    with open(outfile, 'a', newline='') as csvfile:

        w = csv.writer(csvfile, delimiter=',')
        
        row = []
        
        offset = 0
        
        sum = 0

        # record the z score of each data point for the sample
        while time_intervals[0][0] + offset + points_per_row < time_intervals[0][1]:
            for i in range(len(alpha_sensors)):
                sum = 0
                
                for j in range(points_per_row):
                    sum += alpha_music.data[alpha_sensors[i].value][0][time_intervals[i][0] + offset + j]
                    #row.append(abs(alpha_music.data[alpha_sensors[i].value][0][time_intervals[i][0] + offset + j] 
                    #              - alpha_silent_means[alpha_sensors[i].value]) / alpha_silent_stds[alpha_sensors[i].value])
                row.append((sum / points_per_row - - alpha_silent_means[alpha_sensors[i].value]) / alpha_silent_stds[alpha_sensors[i].value] / points_per_row ** .5)
                    
            for i in range(len(beta_sensors)):
                sum = 0
                
                for j in range(points_per_row):
                    sum += beta_music.data[beta_sensors[i].value][0][time_intervals[i + len(alpha_sensors)][0] + offset + j]
                    # append the deviation from the mean for the data point in question
#                     row.append(abs(beta_music.data[beta_sensors[i].value][0][time_intervals[i + len(alpha_sensors)][0] + offset + j] 
#                                    - beta_silent_means[beta_sensors[i].value]) / beta_silent_stds[beta_sensors[i].value])                
                row.append((sum / points_per_row - beta_silent_means[beta_sensors[i].value]) / (beta_silent_stds[beta_sensors[i].value] / points_per_row ** .5))
    
            row.append(is_user)        

            offset += points_per_row

            w.writerow(row)

            row.clear()


In [13]:
attackers = []

for name in names:
    if name != auth_user:
        attackers.append(name)
        
# print(attackers)

# extract data for authorized user
for i in range(total_trials):
    extract(auth_user, song_id, i, 1)
    
# extract data for attackers 
for i in range(len(attackers)):
    extract(attackers[i], song_id, randint(1, total_trials - 1), 0)
    

FileNotFoundError: [Errno 2] No such file or directory: 'silent\\'