In this notebook we fetch the data from MongoDB into an EMR cluster and develop several machine learning models for accurately predicting the event type from the EEG signals. 
We compare the models in terms of both the area under the ROC curve (AUC) and the time taken to train each model and classify the validation data.
The raw EEG signals have been pre-processed and stored in MongoDB along with their corresponding event data. The pre-processing step is done in a separate notebook.


In [1]:
from pyspark import SparkContext, SparkConf

# Resource settings go on the SparkConf *before* the SparkContext is created.
# The SparkSession builder in the next cell calls getOrCreate(), which reuses
# this context; core settings such as executor/driver memory passed to the
# builder at that point do not take effect on an already-running context.
# NOTE(review): spark.driver.memory is only honoured if this call launches the
# driver JVM; if the JVM was already started by spark-submit / the notebook
# kernel, set it there (e.g. --driver-memory) instead -- confirm for this cluster.
conf = (
    SparkConf()
    .setAppName("myApp")
    .set("spark.executor.memory", "22g")
    .set("spark.driver.memory", "10g")
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "3g")
)
sc = SparkContext(conf=conf)

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import *

# Builder settings, applied to the SparkSession builder in exactly this order.
# NOTE: getOrCreate() reuses the SparkContext created in In [1]; core settings
# such as executor/driver memory passed here do not take effect on an
# already-running context, so they must also be set before that context starts.
spark_settings = [
    ("spark.executor.memory", "22g"),
    ("spark.driver.memory", "10g"),
    ("spark.memory.offHeap.enabled", True),
    ("spark.memory.offHeap.size", "3g"),
]
session_builder = SparkSession.builder.appName("myApp")
for setting_key, setting_value in spark_settings:
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

In [3]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import asc, desc

In [4]:
# Load every feature CSV under ../all/features/ as one RDD of raw text lines.
# Each file's header line ("id,0,1,...") is kept at this stage; it is removed
# later by the id != 'id' filter on the DataFrame.
features_glob = "../all/features/*.csv"
rdd = sc.textFile(features_glob)

In [5]:
def toFloat(x):
    """Convert one split CSV row into typed values.

    The first field (the row id) is kept as a string; every subsequent field
    is converted with ``float``.

    Args:
        x: sequence of field values for one row, e.g. the result of
           ``line.split(",")``.

    Returns:
        A new list of the same length: ``[str(x[0]), float(x[1]), ...]``.

    Raises:
        ValueError: if a field after the first cannot be parsed as a float.
    """
    return [str(field) if position == 0 else float(field)
            for position, field in enumerate(x)]

In [6]:
# Peek at the first raw lines (the header line included) before parsing.
raw_preview = rdd.take(4)
raw_preview

['id,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25',
 '0,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000',
 '1,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000',
 '2,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00003,0.00006,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000']

In [7]:
# Parse each CSV line in one pass: split on commas, then convert with toFloat
# (first field kept as a string id, remaining fields as floats).
# NOTE: this rebinds `rdd`, so re-running this cell without first re-running
# In [4] would try to split rows that are already parsed.
rdd = rdd.map(lambda line: toFloat(line.split(",")))

In [8]:
# 'id' is the row identifier kept as text (toFloat leaves the first field as a
# string); '0'..'25' are the numeric fields. Every field is nullable.
schema = StructType(
    [StructField('id', StringType(), True)]
    + [StructField(str(col_index), DoubleType(), True) for col_index in range(26)]
)
df = spark.createDataFrame(rdd, schema)

In [9]:
# Every CSV file contributed its header line ("id,0,1,...") as a data row; those
# rows carry the literal value 'id' in the id column and are dropped here.
df = df.where(df['id'] != 'id')

In [10]:
# Confirm the column names and types match the explicit schema built in In [8].
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)
 |-- 10: double (nullable = true)
 |-- 11: double (nullable = true)
 |-- 12: double (nullable = true)
 |-- 13: double (nullable = true)
 |-- 14: double (nullable = true)
 |-- 15: double (nullable = true)
 |-- 16: double (nullable = true)
 |-- 17: double (nullable = true)
 |-- 18: double (nullable = true)
 |-- 19: double (nullable = true)
 |-- 20: double (nullable = true)
 |-- 21: double (nullable = true)
 |-- 22: double (nullable = true)
 |-- 23: double (nullable = true)
 |-- 24: double (nullable = true)
 |-- 25: double (nullable = true)



In [11]:
# Inspect the first 10 rows of the parsed, header-free DataFrame.
df.show(n=10)

+---+---+---+---+---+---+---+-------+-------+-------+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| id|  0|  1|  2|  3|  4|  5|      6|      7|      8|      9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25|
+---+---+---+---+---+---+---+-------+-------+-------+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  0|0.0|0.0|0.0|0.0|0.0|0.0|    0.0|    0.0|    0.0|    0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|  1|0.0|0.0|0.0|0.0|0.0|0.0|    0.0|    0.0|    0.0|    0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|  2|0.0|0.0|0.0|0.0|0.0|0.0|    0.0|    0.0| 3.0E-5| 6.0E-5|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|  3|0.0|0.0|0.0|0.0|0.0|0.0| 3.0E-5|    0.0| 3.3E-4| 5.6E-4|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|  4|0.0|0.0|0.0|0.0|0.0|0.0| 2.1E-4| 9.0E-5|0.00215|0.00302|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0

In [12]:
from pyspark.ml.feature import VectorAssembler

# Columns '0'-'5' hold the six event labels and columns '6'-'25' hold the model
# inputs (they are aliased to event names and assembled into vectors below).
target_cols = [str(x) for x in range(0, 6)]
feature_cols = [str(x) for x in range(6, 26)]
# Event names, in the same order as target_cols; used as the label column names.
event_names = ['HandStart', 'FirstDigitTouch', 'BothStartLoadPhase',
               'LiftOff', 'Replace', 'BothReleased']
# zip() would silently truncate on a length mismatch, so check it explicitly.
assert len(target_cols) == len(event_names), "one event name per target column"

# Pack the 20 input columns into a single vector column named "features".
va = VectorAssembler(outputCol="features", inputCols=feature_cols)

# Keep the feature vector plus each target column renamed to its event name.
# The select is driven by target_cols so the label list cannot drift from it.
lpoints = va.transform(df).select(
    "features",
    *[df[col].alias(name) for col, name in zip(target_cols, event_names)]
)

In [13]:
# Inspect the assembled feature vectors alongside the six renamed event labels.
lpoints.show()

+--------------------+---------+---------------+------------------+-------+-------+------------+
|            features|HandStart|FirstDigitTouch|BothStartLoadPhase|LiftOff|Replace|BothReleased|
+--------------------+---------+---------------+------------------+-------+-------+------------+
|          (20,[],[])|      0.0|            0.0|               0.0|    0.0|    0.0|         0.0|
|          (20,[],[])|      0.0|            0.0|               0.0|    0.0|    0.0|         0.0|
|(20,[2,3],[3.0E-5...|      0.0|            0.0|               0.0|    0.0|    0.0|         0.0|
|(20,[0,2,3],[3.0E...|      0.0|            0.0|               0.0|    0.0|    0.0|         0.0|
|(20,[0,1,2,3],[2....|      0.0|            0.0|               0.0|    0.0|    0.0|         0.0|
|(20,[0,1,2,3],[9....|      0.0|            0.0|               0.0|    0.0|    0.0|         0.0|
|(20,[0,1,2,3],[0....|      0.0|            0.0|               0.0|    0.0|    0.0|         0.0|
|(20,[0,1,2,3],[0....|      0.

In [14]:
# Hold out 20% of the rows for validation. A fixed seed makes the split -- and
# therefore every AUC reported below -- reproducible on Restart & Run All,
# provided the input data and its partitioning are unchanged.
splits = lpoints.randomSplit([0.8, 0.2], seed=42)
# Both splits are reused by every model trained below, so cache them.
eeg_train = splits[0].cache()
eeg_valid = splits[1].cache()

In [15]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import time

In [136]:

# labels are the event columns we are trying to classify; one binary model is
# fitted per event.
labels = ['HandStart', 'FirstDigitTouch', 'BothStartLoadPhase', 'LiftOff', 'Replace', 'BothReleased']
# Iterate over the events: fit a logistic regression model on the training split
# and score it on the validation split for each one.
start = time.time()
for label in labels:
    lr = LogisticRegression(regParam=0.01, maxIter=100, fitIntercept=True, labelCol=label)
    lrmodel = lr.fit(eeg_train.select('features', label))
    validpredicts = lrmodel.transform(eeg_valid.select('features', label))
    # The evaluator is rebuilt per event because the label column changes.
    bceval = BinaryClassificationEvaluator(labelCol=label)
    auc = bceval.evaluate(validpredicts)
    print('area Under ROC ' + label + " : " + str(auc))
# Stop the timer once, after the loop: the reported time is the total for
# training + validation across all events (and is defined even if `labels`
# is empty).
duration = time.time() - start
print('Time taken for logistic regression ' + str(duration) + 's')

area Under ROC HandStart : 0.5219793799831756
area Under ROC FirstDigitTouch : 0.5631759281062706
area Under ROC BothStartLoadPhase : 0.5610746282201716
area Under ROC LiftOff : 0.5777764418900291
area Under ROC Replace : 0.5993477665100534
area Under ROC BothReleased : 0.5822078067124323
Time taken for logistic regression 2400.504782676697s


In [137]:
from pyspark.ml.classification import RandomForestClassifier


In [138]:
# labels are the event columns we are trying to classify; one binary model is
# fitted per event.
labels = ['HandStart', 'FirstDigitTouch', 'BothStartLoadPhase', 'LiftOff', 'Replace', 'BothReleased']
start = time.time()
# Iterate over the events: fit a random forest on the training split and score
# it on the validation split for each one.
for label in labels:
    # Explicit seed: PySpark derives the default seed from a Python hash, which
    # may differ between kernel sessions, making the AUCs non-reproducible.
    rf = RandomForestClassifier(maxDepth=10, labelCol=label, seed=42)
    rfmodel = rf.fit(eeg_train.select('features', label))
    validpredicts = rfmodel.transform(eeg_valid.select('features', label))
    # The evaluator is rebuilt per event because the label column changes.
    bceval = BinaryClassificationEvaluator(labelCol=label)
    auc = bceval.evaluate(validpredicts)
    print('area Under ROC ' + label + " : " + str(auc))
# Total training + validation time across all events, measured once after the
# loop (and defined even if `labels` is empty).
duration = time.time() - start
print('Time taken for Random Forest ' + str(duration) + 's')

area Under ROC HandStart : 0.8569108168696032
area Under ROC FirstDigitTouch : 0.8236509395086055
area Under ROC BothStartLoadPhase : 0.8305707939287095
area Under ROC LiftOff : 0.8203077534031135
area Under ROC Replace : 0.8064275157300104
area Under ROC BothReleased : 0.8407604777963608
Time taken for Random Forest 3924.458245038986s


In [139]:
from pyspark.ml.classification import LinearSVC

In [140]:
# labels are the event columns we are trying to classify; one binary model is
# fitted per event.
labels = ['HandStart', 'FirstDigitTouch', 'BothStartLoadPhase', 'LiftOff', 'Replace', 'BothReleased']
start = time.time()
# Iterate over the events: fit a linear support vector classifier (LinearSVC)
# on the training split and score it on the validation split for each one.
for label in labels:
    svc = LinearSVC(labelCol=label)
    svcmodel = svc.fit(eeg_train.select('features', label))
    validpredicts = svcmodel.transform(eeg_valid.select('features', label))
    # The evaluator is rebuilt per event because the label column changes.
    bceval = BinaryClassificationEvaluator(labelCol=label)
    auc = bceval.evaluate(validpredicts)
    print('areaUnderROC ' + label + " : " + str(auc))
# Total training + validation time across all events, measured once after the
# loop (and defined even if `labels` is empty).
duration = time.time() - start
print('Time taken for Linear SVC  ' + str(duration) + 's')

areaUnderROC HandStart : 0.49729977713214596
areaUnderROC FirstDigitTouch : 0.5660934681431121
areaUnderROC BothStartLoadPhase : 0.49135874243233607
areaUnderROC LiftOff : 0.5268992717459255
areaUnderROC Replace : 0.5472355448846024
areaUnderROC BothReleased : 0.5200579970168281
Time taken for Linear SVC  19596.57626605034s


In [16]:
from pyspark.ml.classification import GBTClassifier

In [17]:
# labels are the event columns we are trying to classify; one binary model is
# fitted per event.
labels = ['HandStart', 'FirstDigitTouch', 'BothStartLoadPhase', 'LiftOff', 'Replace', 'BothReleased']
start = time.time()
# Iterate over the events: fit a gradient-boosted tree model on the training
# split and score it on the validation split for each one.
for label in labels:
    # Explicit seed: PySpark derives the default seed from a Python hash, which
    # may differ between kernel sessions, making the AUCs non-reproducible.
    gbt = GBTClassifier(maxIter=10, maxDepth=10, labelCol=label, seed=42)
    gbtmodel = gbt.fit(eeg_train.select('features', label))
    validpredicts = gbtmodel.transform(eeg_valid.select('features', label))
    # The evaluator is rebuilt per event because the label column changes.
    bceval = BinaryClassificationEvaluator(labelCol=label)
    auc = bceval.evaluate(validpredicts)
    print('areaUnderROC ' + label + " : " + str(auc))
# Total training + validation time across all events, measured once after the
# loop (and defined even if `labels` is empty).
duration = time.time() - start
print('Time taken for Gradient Boosted Tree  ' + str(duration) + 's')

areaUnderROC HandStart : 0.8451880202290933
areaUnderROC FirstDigitTouch : 0.8136384438900479
areaUnderROC BothStartLoadPhase : 0.8025608893173575
areaUnderROC LiftOff : 0.8046389658993593
areaUnderROC Replace : 0.7749284826024881
areaUnderROC BothReleased : 0.8241372970951558
Time taken for Gradient Boosted Tree  18350.97699737549s
