In [1]:
import os
from pathlib import Path

import pyspark
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the Spark session for this analysis.
# The master URL defaults to spark://spark-master:7077 and can be overridden with the
# SPARK_MASTER environment variable, so the notebook runs against another cluster
# (or e.g. "local[*]") without editing code.
master = os.environ.get("SPARK_MASTER", "spark://spark-master:7077")
appName = "Drowsiness"
spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

In [3]:
# Load the raw feature table. Only the header option is set (no schema inference),
# so every column arrives as a string and is cast in a later cell.
raw_csv_path = 'file:///jupyter-data/data/total-re.csv'
df = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('multiLine', True)
    .load(raw_csv_path)
)
df.show()
n_records = df.count()
print(f'Record count is: {n_records}')

+-------------------+-------------------+-------------------+------------------+---+----------+
|                EAR|                MAR|        Circularity|               MOE|  Y|respondent|
+-------------------+-------------------+-------------------+------------------+---+----------+
| 0.2579303520876644| 0.3495349908593761| 0.4176893651189145|1.3551526139916152|0.0|         4|
| 0.2746267554059725| 0.3735141394929232|  0.366795128235752|1.3600792061966736|0.0|         4|
| 0.2508982799024048| 0.3434171155554891| 0.4021396462592153|1.3687503783966657|0.0|         4|
|0.30037087831054704|0.35083811828693096| 0.4150342711218802|1.1680164210999404|0.0|         4|
| 0.2935651535998499| 0.3206486623934628| 0.4489237073878338|1.0922572330588307|0.0|         4|
| 0.2674082098871901| 0.3795517964687489| 0.4210985139768859|1.4193722647067124|0.0|         4|
| 0.2754654478027501| 0.3677560596277339| 0.4476255954755593| 1.335035165248999|0.0|         4|
| 0.2993355473569827| 0.3379413643031209

In [4]:
# Inspect column types: the CSV was read without schema inference, so every column
# is a string here and must be cast to a numeric type before any arithmetic.
df.dtypes

[('EAR', 'string'),
 ('MAR', 'string'),
 ('Circularity', 'string'),
 ('MOE', 'string'),
 ('Y', 'string'),
 ('respondent', 'string')]

In [5]:
# Cast every feature / label / respondent column from string to double.
from functools import reduce

NUMERIC_COLUMNS = ["EAR", "MAR", "Circularity", "MOE", "Y", "respondent"]


def _cast_to_double(frame, column):
    """Return `frame` with `column` replaced by its cast to Spark double."""
    # NOTE(review): under Spark's default (non-ANSI) casting, unparsable strings
    # become null rather than raising — confirm no nulls appear after this step.
    return frame.withColumn(column, frame[column].cast('double'))


# Fold the casts over the column list, starting from the raw string-typed frame.
changedTypedf = reduce(_cast_to_double, NUMERIC_COLUMNS, df)

In [6]:
# Confirm the casts took effect, then preview the typed frame.
# A bare `changedTypedf.dtypes` in the middle of a cell is evaluated and discarded
# (only a cell's last expression is displayed), so print it explicitly.
print(changedTypedf.dtypes)
changedTypedf.show()

+-------------------+-------------------+-------------------+------------------+---+----------+
|                EAR|                MAR|        Circularity|               MOE|  Y|respondent|
+-------------------+-------------------+-------------------+------------------+---+----------+
| 0.2579303520876644| 0.3495349908593761| 0.4176893651189145|1.3551526139916152|0.0|       4.0|
| 0.2746267554059725| 0.3735141394929232|  0.366795128235752|1.3600792061966736|0.0|       4.0|
| 0.2508982799024048| 0.3434171155554891| 0.4021396462592153|1.3687503783966657|0.0|       4.0|
|0.30037087831054704|0.35083811828693096| 0.4150342711218802|1.1680164210999404|0.0|       4.0|
| 0.2935651535998499| 0.3206486623934628| 0.4489237073878338|1.0922572330588307|0.0|       4.0|
| 0.2674082098871901| 0.3795517964687489| 0.4210985139768859|1.4193722647067124|0.0|       4.0|
| 0.2754654478027501| 0.3677560596277339| 0.4476255954755593| 1.335035165248999|0.0|       4.0|
| 0.2993355473569827| 0.3379413643031209

In [7]:
import pandas as pd
import numpy as np

# Collect the typed Spark DataFrame onto the driver as a pandas DataFrame.
# Every later cell works on the pandas copy, so the Spark session is released here.
# toPandas() must run before spark.stop(): changedTypedf depends on the live session.
pddf = changedTypedf.toPandas()
spark.stop()

In [8]:
# Preview the collected pandas frame (long frames are shown truncated by pandas).
pddf

Unnamed: 0,EAR,MAR,Circularity,MOE,Y,respondent
0,0.257930,0.349535,0.417689,1.355153,0.0,4.0
1,0.274627,0.373514,0.366795,1.360079,0.0,4.0
2,0.250898,0.343417,0.402140,1.368750,0.0,4.0
3,0.300371,0.350838,0.415034,1.168016,0.0,4.0
4,0.293565,0.320649,0.448924,1.092257,0.0,4.0
...,...,...,...,...,...,...
17995,0.348480,0.475887,0.490477,1.365606,5.0,60.0
17996,0.262352,0.470458,0.395392,1.793234,5.0,60.0
17997,0.319431,0.578497,0.490811,1.811024,5.0,60.0
17998,0.256146,0.540343,0.378786,2.109511,5.0,60.0


In [10]:
# Keep only the rows labelled "Alert" (Y == 0.0); these rows provide each
# respondent's baseline for the normalisation further down.
is_alert = pddf["Y"] == 0.0
df_alert = pddf.loc[is_alert]

In [11]:
# Preview the Alert-only subset; rows keep their index labels from pddf.
df_alert

Unnamed: 0,EAR,MAR,Circularity,MOE,Y,respondent
0,0.257930,0.349535,0.417689,1.355153,0.0,4.0
1,0.274627,0.373514,0.366795,1.360079,0.0,4.0
2,0.250898,0.343417,0.402140,1.368750,0.0,4.0
3,0.300371,0.350838,0.415034,1.168016,0.0,4.0
4,0.293565,0.320649,0.448924,1.092257,0.0,4.0
...,...,...,...,...,...,...
17515,0.291844,0.428875,0.461614,1.469536,0.0,60.0
17516,0.273138,0.407160,0.436752,1.490676,0.0,60.0
17517,0.289664,0.410811,0.453121,1.418236,0.0,60.0
17518,0.291827,0.410597,0.451550,1.406987,0.0,60.0


In [12]:
# Select each respondent's first, second and third "Alert" frame.
# Frames are numbered within each respondent with groupby().cumcount()
# (0 = first Alert row of that respondent, in row order). Nothing here assumes a
# fixed number of Alert rows per respondent, so a respondent with more or fewer
# frames cannot shift the selection onto another respondent's rows.
alert_frame_no = df_alert.groupby("respondent").cumcount()
df_alert_1 = df_alert[alert_frame_no == 0]
df_alert_2 = df_alert[alert_frame_no == 1]
df_alert_3 = df_alert[alert_frame_no == 2]
print(df_alert_1)
print(df_alert_2)
print(df_alert_3)

            EAR       MAR  Circularity       MOE    Y  respondent
0      0.257930  0.349535     0.417689  1.355153  0.0         4.0
720    0.337310  0.351927     0.455003  1.043333  0.0         6.0
1440   0.338392  0.388842     0.509606  1.149090  0.0         9.0
2160   0.318259  0.358237     0.487157  1.125618  0.0        11.0
2880   0.316337  0.333333     0.428656  1.053729  0.0        12.0
3600   0.269122  0.326717     0.422226  1.214011  0.0        15.0
4320   0.366856  0.307433     0.509569  0.838023  0.0        16.0
5040   0.335025  0.333744     0.474382  0.996175  0.0        17.0
5760   0.433148  0.366611     0.556329  0.846387  0.0        23.0
6480   0.324675  0.326979     0.459347  1.007096  0.0        24.0
7200   0.323640  0.380465     0.456716  1.175582  0.0        27.0
7920   0.159193  0.204802     0.390018  1.286500  0.0        29.0
8640   0.275790  0.378217     0.392747  1.371398  0.0        34.0
9360   0.273502  0.152652     0.459583  0.558139  0.0        35.0
10080  0.2

In [13]:
# Stack the first/second/third-frame selections into one frame and restore the
# original row order (sorted by index label) so each respondent's frames are adjacent.
alert_first3 = [df_alert_1, df_alert_2, df_alert_3]
df_alert_first3 = pd.concat(alert_first3).sort_index()
print(df_alert_first3)

            EAR       MAR  Circularity       MOE    Y  respondent
0      0.257930  0.349535     0.417689  1.355153  0.0         4.0
1      0.274627  0.373514     0.366795  1.360079  0.0         4.0
2      0.250898  0.343417     0.402140  1.368750  0.0         4.0
720    0.337310  0.351927     0.455003  1.043333  0.0         6.0
721    0.341356  0.374622     0.466900  1.097454  0.0         6.0
...         ...       ...          ...       ...  ...         ...
16561  0.294118  0.348454     0.467201  1.184745  0.0        55.0
16562  0.333180  0.249727     0.467301  0.749524  0.0        55.0
17280  0.320167  0.392274     0.453660  1.225218  0.0        60.0
17281  0.362153  0.425641     0.519951  1.175306  0.0        60.0
17282  0.349003  0.393761     0.496627  1.128244  0.0        60.0

[75 rows x 6 columns]


In [14]:
# Per-respondent baseline statistics taken from the first three "Alert" frames.
# pandas .std() is the sample standard deviation (ddof=1).
baseline_features = ["EAR", "MAR", "Circularity", "MOE"]
baseline_groups = df_alert_first3.groupby("respondent")[baseline_features]
df_means = baseline_groups.mean()
df_std = baseline_groups.std()

In [15]:
# Show the per-respondent baseline means, followed by the standard deviations.
print(df_means,df_std)

                 EAR       MAR  Circularity       MOE
respondent                                           
4.0         0.261152  0.355489     0.395541  1.361327
6.0         0.345302  0.355346     0.469180  1.030365
9.0         0.356151  0.381092     0.536731  1.072131
11.0        0.305206  0.359569     0.471850  1.179719
12.0        0.345067  0.334884     0.468970  0.973817
15.0        0.294268  0.343453     0.441576  1.170173
16.0        0.376711  0.277123     0.519738  0.736635
17.0        0.353838  0.331065     0.510286  0.938805
23.0        0.426725  0.393335     0.561573  0.923091
24.0        0.274061  0.331920     0.417000  1.364586
27.0        0.300963  0.382958     0.455608  1.282504
29.0        0.192115  0.207137     0.383478  1.094498
34.0        0.273529  0.381552     0.414526  1.395214
35.0        0.277838  0.149736     0.450161  0.539200
36.0        0.280088  0.224770     0.426765  0.803408
37.0        0.252752  0.296860     0.388790  1.177233
39.0        0.287642  0.2383

In [16]:
# Per-respondent accessors for the baseline statistics. The mean_*/std_* wrappers
# look up the module-level df_means / df_std when called (not when defined), so
# re-running the statistics cell is picked up without redefining them.
def respondent_stat(stats, respondent, feature):
    """Return one baseline statistic for a respondent.

    Parameters
    ----------
    stats : pandas.DataFrame
        Statistics table indexed by respondent, one column per feature
        (e.g. ``df_means`` or ``df_std``).
    respondent : scalar
        Respondent id as stored in the ``respondent`` column.
    feature : str
        Feature column name, e.g. "EAR", "MAR", "Circularity" or "MOE".

    Raises
    ------
    KeyError
        If ``respondent`` or ``feature`` is not present in ``stats``.
    """
    # One .loc[row, col] lookup instead of chained .loc[row][col].
    return stats.loc[respondent, feature]


def mean_EAR(respondent):
    """Baseline mean EAR of `respondent` (from df_means)."""
    return respondent_stat(df_means, respondent, "EAR")


def mean_MAR(respondent):
    """Baseline mean MAR of `respondent` (from df_means)."""
    return respondent_stat(df_means, respondent, "MAR")


def mean_Circularity(respondent):
    """Baseline mean Circularity of `respondent` (from df_means)."""
    return respondent_stat(df_means, respondent, "Circularity")


def mean_MOE(respondent):
    """Baseline mean MOE of `respondent` (from df_means)."""
    return respondent_stat(df_means, respondent, "MOE")


def std_EAR(respondent):
    """Baseline EAR standard deviation of `respondent` (from df_std)."""
    return respondent_stat(df_std, respondent, "EAR")


def std_MAR(respondent):
    """Baseline MAR standard deviation of `respondent` (from df_std)."""
    return respondent_stat(df_std, respondent, "MAR")


def std_Circularity(respondent):
    """Baseline Circularity standard deviation of `respondent` (from df_std)."""
    return respondent_stat(df_std, respondent, "Circularity")


def std_MOE(respondent):
    """Baseline MOE standard deviation of `respondent` (from df_std)."""
    return respondent_stat(df_std, respondent, "MOE")

In [17]:
# Attach each respondent's baseline mean and std for every feature to every row.
# Series.map(stats_column) resolves all respondents against the stats index in one
# vectorised pass, avoiding a Python-level .loc lookup per row and per column.
# Columns are added as all *_mean columns, then all *_std columns; the CSV export
# writes them in this order.
baseline_features = ["EAR", "MAR", "Circularity", "MOE"]
for feature in baseline_features:
    pddf[f"{feature}_mean"] = pddf["respondent"].map(df_means[feature])
for feature in baseline_features:
    pddf[f"{feature}_std"] = pddf["respondent"].map(df_std[feature])

# .map yields NaN (instead of raising) for respondents without a baseline row;
# fail loudly so those rows are not silently left unnormalisable.
has_baseline = pddf["respondent"].isin(df_means.index)
assert has_baseline.all(), (
    "No Alert baseline for respondent(s): "
    f"{sorted(pddf.loc[~has_baseline, 'respondent'].unique())}"
)


In [18]:
# Z-score each feature against the respondent's own "Alert" baseline:
#     <feature>_N = (<feature> - <feature>_mean) / <feature>_std
# NOTE(review): a zero baseline std gives +/-inf (or NaN for 0/0) — confirm it cannot occur.
for feature in ["EAR", "MAR", "Circularity", "MOE"]:
    centred = pddf[feature] - pddf[f"{feature}_mean"]
    pddf[f"{feature}_N"] = centred / pddf[f"{feature}_std"]

In [19]:
# Export the results as two CSV files.
# NOTE(review): the input is read from file:///jupyter-data/data/, but these files are
# written relative to the kernel's working directory — confirm that is intended.
output_dir = Path('./data')
# DataFrame.to_csv does not create missing directories, so ensure the target exists.
output_dir.mkdir(parents=True, exist_ok=True)

# Full table: raw features, per-respondent baseline mean/std and normalised features.
pddf.to_csv(output_dir / 'totalwithallinfo-re.csv', index=False)

# Main table: the same rows without the intermediate baseline mean/std columns.
baseline_stat_columns = [
    f"{feature}_{stat}"
    for stat in ("mean", "std")
    for feature in ("EAR", "MAR", "Circularity", "MOE")
]
df_main = pddf.drop(columns=baseline_stat_columns)
df_main.to_csv(output_dir / 'totalwithmaininfo-re.csv', index=False)