In [1]:
from pyspark import SparkContext, SparkConf
import os 
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import numpy as np
import plotly.figure_factory as ff

In [73]:
from pyspark.sql.functions import mean, stddev 

In [2]:
# Ask spark-submit to pull the hadoop-aws connector (provides S3AFileSystem) when the
# Spark JVM is launched. This only takes effect if it is set before the SparkContext starts.
# NOTE(review): the hadoop-aws version should match the Hadoop version bundled with Spark — confirm.
os.environ.update(PYSPARK_SUBMIT_ARGS='--packages "org.apache.hadoop:hadoop-aws:2.7.4" pyspark-shell')

In [3]:
# Reuse the active SparkContext if there is one, otherwise start a new one.
# A newly launched JVM picks up PYSPARK_SUBMIT_ARGS from the environment.
sc = SparkContext.getOrCreate()

In [4]:
# Configure the S3A connector for reading the dataset from S3.
# Credentials come from the standard AWS environment variables and are never written
# into the notebook, so they cannot leak through version control or shared copies.
# Any key pair that was ever hard-coded in this cell must be treated as compromised
# and rotated in IAM.
# Raises KeyError naming the missing variable if either one is unset.
hadoop_conf = sc._jsc.hadoopConfiguration()
# Also route the plain s3:// scheme through S3AFileSystem (the data path used here is s3a://).
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set('fs.s3a.access.key', os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set('fs.s3a.secret.key', os.environ['AWS_SECRET_ACCESS_KEY'])

In [5]:
# Raw merged sensor dataset on S3. Spark reads it lazily, one RDD element per text line.
file = "s3a://group-4-distributed-data-systems/merged.txt"
rdd = sc.textFile(name=file)

In [6]:
# Keep the raw text lines in memory after the first action so they are not re-read from S3.
# The `rdd = rdd.map(...)` cells below rebind the name, but this cached parent stays in their lineage.
rdd.cache()

s3a://group-4-distributed-data-systems/merged.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [7]:
def toDoubleSafe(v):
    """Best-effort numeric parse of a single field.

    Returns ``float(v)`` when ``v`` parses as a number. Otherwise returns
    ``str(v)``, so text fields (e.g. device, sensor, activity) pass through
    as strings. Only ``ValueError`` is caught; e.g. ``None`` still raises
    ``TypeError``.
    """
    try:
        parsed = float(v)
    except ValueError:
        # Not numeric: keep the field as text.
        parsed = str(v)
    return parsed

In [8]:
# Split each raw text line into its comma-separated fields.
# Only surrounding whitespace and a trailing ';' record terminator are stripped
# (presumably every line ends with ';' — confirm against merged.txt). Blindly
# dropping the last character would silently cut the final digit off the z value
# on any line that lacks the terminator, and that value would still parse as a float.
rdd = rdd.map(lambda line: line.strip().rstrip(';').split(','))

In [9]:
# Convert every field of every record: numeric fields become floats, the rest stay strings.
rdd = rdd.map(lambda fields: list(map(toDoubleSafe, fields)))

In [11]:
# Sanity check: show the first parsed record (a mix of floats and strings).
rdd.first()

[1613.0,
 'phone',
 'gyro',
 'A',
 178468071944614.0,
 -0.020240024,
 -0.004261058,
 -0.023435818]

In [12]:
from pyspark.sql.types import *
# NOTE(review): star import kept as-is; later cells may rely on other names it brings in.

# Column layout of each parsed record, in field order; every column is nullable.
# person_id is held as a double because toDoubleSafe turns every numeric field into a float.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("person_id", DoubleType()),
        ("device", StringType()),
        ("sensor", StringType()),
        ("activity", StringType()),
        ("time_stamp", DoubleType()),
        ("x", DoubleType()),
        ("y", DoubleType()),
        ("z", DoubleType()),
    ]
])

In [13]:
from pyspark.sql import SparkSession

# getOrCreate reuses the already-running SparkContext. The explicit schema is applied
# instead of letting Spark infer types. With schema verification on (the default), a
# record whose numeric column held a non-numeric string would fail here.
ss = SparkSession.builder.getOrCreate()
merged_df = ss.createDataFrame(data=rdd, schema=schema)

In [14]:
# Persist the typed DataFrame: each per-sensor feature cell filters and aggregates it repeatedly.
merged_df.cache()

DataFrame[person_id: double, device: string, sensor: string, activity: string, time_stamp: double, x: double, y: double, z: double]

In [99]:
phone_gyro_df = merged_df.filter("device=='phone' and sensor=='gyro'")

# One aggregate per statistic, each keyed by (person_id, activity). Spark names the
# output columns "<function>(<axis>)", e.g. "avg(x)", "stddev(y)", "max(z)".
# NOTE(review): four separate shuffles plus three joins over the same grouping; a single
# groupBy(...).agg(...) could compute every statistic in one pass.
phone_gyro_avg_df, phone_gyro_stdev_df, phone_gyro_max_df, phone_gyro_min_df = [
    phone_gyro_df.groupBy(["person_id", "activity"]).agg({"x": fn, "y": fn, "z": fn})
    for fn in ("avg", "stddev", "max", "min")
]
phone_gyro_minmax_df = phone_gyro_max_df.join(phone_gyro_min_df, on=["person_id", "activity"])

# Per-axis spread (max - min). The raw max/min columns are not carried forward.
phone_gyro_range_df = phone_gyro_minmax_df.select(
    "person_id",
    "activity",
    *[
        (phone_gyro_minmax_df["max({})".format(axis)]
         - phone_gyro_minmax_df["min({})".format(axis)]).alias("range_" + axis)
        for axis in ("x", "y", "z")
    ]
)

# Mean, sample standard deviation and range of each axis per (person_id, activity).
phone_gyro_features = (
    phone_gyro_avg_df
    .join(phone_gyro_stdev_df, on=["person_id", "activity"])
    .join(phone_gyro_range_df, on=["person_id", "activity"])
)

# Tag every statistic with the device/sensor prefix ("pgyro") so the per-sensor
# feature sets can be joined side by side without column-name clashes.
phone_gyro_features_renamed = phone_gyro_features.select(
    "person_id",
    "activity",
    *[
        phone_gyro_features[source.format(axis)].alias("pgyro_{}_{}".format(axis, stat))
        for stat, source in (("avg", "avg({})"), ("stdev", "stddev({})"), ("range", "range_{}"))
        for axis in ("x", "y", "z")
    ]
)
phone_gyro_features_renamed.show(5)

+---------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-----------------+
|person_id|activity|         pgyro_x_avg|         pgyro_y_avg|         pgyro_z_avg|       pgyro_x_stdev|       pgyro_y_stdev|       pgyro_z_stdev|      pgyro_x_range|       pgyro_y_range|    pgyro_z_range|
+---------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-----------------+
|   1605.0|       P|-0.02017300288020...|-0.11814119451513522|-0.00850835712610...|  0.5294417079280406|  1.0516658194483952| 0.37540071690689336|  8.839031599999998|  15.428757300000001|        9.6664762|
|   1610.0|       A|  0.0342472453562128|-0.03603498108390...| 0.07850520876840324|  1.2036294662463933|   1.416102004226747|  0.9598808278599757|          8.2720336|          

In [100]:
phone_accel_df = merged_df.filter("device=='phone' and sensor=='accel'")

# One aggregate per statistic, each keyed by (person_id, activity). Spark names the
# output columns "<function>(<axis>)", e.g. "avg(x)", "stddev(y)", "max(z)".
# NOTE(review): four separate shuffles plus three joins over the same grouping; a single
# groupBy(...).agg(...) could compute every statistic in one pass.
phone_accel_avg_df, phone_accel_stdev_df, phone_accel_max_df, phone_accel_min_df = [
    phone_accel_df.groupBy(["person_id", "activity"]).agg({"x": fn, "y": fn, "z": fn})
    for fn in ("avg", "stddev", "max", "min")
]
phone_accel_minmax_df = phone_accel_max_df.join(phone_accel_min_df, on=["person_id", "activity"])

# Per-axis spread (max - min). The raw max/min columns are not carried forward.
phone_accel_range_df = phone_accel_minmax_df.select(
    "person_id",
    "activity",
    *[
        (phone_accel_minmax_df["max({})".format(axis)]
         - phone_accel_minmax_df["min({})".format(axis)]).alias("range_" + axis)
        for axis in ("x", "y", "z")
    ]
)

# Mean, sample standard deviation and range of each axis per (person_id, activity).
phone_accel_features = (
    phone_accel_avg_df
    .join(phone_accel_stdev_df, on=["person_id", "activity"])
    .join(phone_accel_range_df, on=["person_id", "activity"])
)

# Tag every statistic with the device/sensor prefix ("paccel") so the per-sensor
# feature sets can be joined side by side without column-name clashes.
phone_accel_features_renamed = phone_accel_features.select(
    "person_id",
    "activity",
    *[
        phone_accel_features[source.format(axis)].alias("paccel_{}_{}".format(axis, stat))
        for stat, source in (("avg", "avg({})"), ("stdev", "stddev({})"), ("range", "range_{}"))
        for axis in ("x", "y", "z")
    ]
)
phone_accel_features_renamed.show(5)

+---------+--------+-------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+
|person_id|activity|       paccel_x_avg|       paccel_y_avg|       paccel_z_avg|      paccel_x_stdev|     paccel_y_stdev|      paccel_z_stdev|     paccel_x_range|     paccel_y_range|     paccel_z_range|
+---------+--------+-------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+
|   1605.0|       P|0.29971730265547114|  9.663056926791107|0.29237940635036935|  1.9132575692937814| 1.2609094944102477|  1.7352211860881996| 35.224692000000005|          18.200122| 35.862148000000005|
|   1610.0|       A| 3.9965780043537724|  8.843990061600895|-1.1485782778474645|   4.336511672419491| 3.6220266729242785|   3.399216390815849|          30.938767|         19.1988066|      

In [101]:
watch_gyro_df = merged_df.filter("device=='watch' and sensor=='gyro'")

# One aggregate per statistic, each keyed by (person_id, activity). Spark names the
# output columns "<function>(<axis>)", e.g. "avg(x)", "stddev(y)", "max(z)".
# NOTE(review): four separate shuffles plus three joins over the same grouping; a single
# groupBy(...).agg(...) could compute every statistic in one pass.
watch_gyro_avg_df, watch_gyro_stdev_df, watch_gyro_max_df, watch_gyro_min_df = [
    watch_gyro_df.groupBy(["person_id", "activity"]).agg({"x": fn, "y": fn, "z": fn})
    for fn in ("avg", "stddev", "max", "min")
]
watch_gyro_minmax_df = watch_gyro_max_df.join(watch_gyro_min_df, on=["person_id", "activity"])

# Per-axis spread (max - min). The raw max/min columns are not carried forward.
watch_gyro_range_df = watch_gyro_minmax_df.select(
    "person_id",
    "activity",
    *[
        (watch_gyro_minmax_df["max({})".format(axis)]
         - watch_gyro_minmax_df["min({})".format(axis)]).alias("range_" + axis)
        for axis in ("x", "y", "z")
    ]
)

# Mean, sample standard deviation and range of each axis per (person_id, activity).
watch_gyro_features = (
    watch_gyro_avg_df
    .join(watch_gyro_stdev_df, on=["person_id", "activity"])
    .join(watch_gyro_range_df, on=["person_id", "activity"])
)

# Tag every statistic with the device/sensor prefix ("wgyro") so the per-sensor
# feature sets can be joined side by side without column-name clashes.
watch_gyro_features_renamed = watch_gyro_features.select(
    "person_id",
    "activity",
    *[
        watch_gyro_features[source.format(axis)].alias("wgyro_{}_{}".format(axis, stat))
        for stat, source in (("avg", "avg({})"), ("stdev", "stddev({})"), ("range", "range_{}"))
        for axis in ("x", "y", "z")
    ]
)
watch_gyro_features_renamed.show(5)

+---------+--------+--------------------+--------------------+--------------------+------------------+-------------------+------------------+-------------+-----------------+------------------+
|person_id|activity|         wgyro_x_avg|         wgyro_y_avg|         wgyro_z_avg|     wgyro_x_stdev|      wgyro_y_stdev|     wgyro_z_stdev|wgyro_x_range|    wgyro_y_range|     wgyro_z_range|
+---------+--------+--------------------+--------------------+--------------------+------------------+-------------------+------------------+-------------+-----------------+------------------+
|   1605.0|       P|  0.0356906530786702| -0.2210499653278735|   0.324840953022115| 2.028071754734538| 3.6041482973311023| 2.329876180625323|    23.019284|        18.817906|        13.9443107|
|   1610.0|       A|0.003728009696669442|-0.06234500139497911|-0.01526350389822...|1.6830062472496243|    0.8734566787434|  2.62031842039935|    15.745168|9.788881799999999|13.499043400000001|
|   1627.0|       P| 0.061892510663

In [102]:
watch_accel_df = merged_df.filter("device=='watch' and sensor=='accel'")

# One aggregate per statistic, each keyed by (person_id, activity). Spark names the
# output columns "<function>(<axis>)", e.g. "avg(x)", "stddev(y)", "max(z)".
# NOTE(review): four separate shuffles plus three joins over the same grouping; a single
# groupBy(...).agg(...) could compute every statistic in one pass.
watch_accel_avg_df, watch_accel_stdev_df, watch_accel_max_df, watch_accel_min_df = [
    watch_accel_df.groupBy(["person_id", "activity"]).agg({"x": fn, "y": fn, "z": fn})
    for fn in ("avg", "stddev", "max", "min")
]
watch_accel_minmax_df = watch_accel_max_df.join(watch_accel_min_df, on=["person_id", "activity"])

# Per-axis spread (max - min). The raw max/min columns are not carried forward.
watch_accel_range_df = watch_accel_minmax_df.select(
    "person_id",
    "activity",
    *[
        (watch_accel_minmax_df["max({})".format(axis)]
         - watch_accel_minmax_df["min({})".format(axis)]).alias("range_" + axis)
        for axis in ("x", "y", "z")
    ]
)

# Mean, sample standard deviation and range of each axis per (person_id, activity).
watch_accel_features = (
    watch_accel_avg_df
    .join(watch_accel_stdev_df, on=["person_id", "activity"])
    .join(watch_accel_range_df, on=["person_id", "activity"])
)

# Tag every statistic with the device/sensor prefix ("waccel") so the per-sensor
# feature sets can be joined side by side without column-name clashes.
watch_accel_features_renamed = watch_accel_features.select(
    "person_id",
    "activity",
    *[
        watch_accel_features[source.format(axis)].alias("waccel_{}_{}".format(axis, stat))
        for stat, source in (("avg", "avg({})"), ("stdev", "stddev({})"), ("range", "range_{}"))
        for axis in ("x", "y", "z")
    ]
)
watch_accel_features_renamed.show(5)

+---------+--------+------------------+-------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|person_id|activity|      waccel_x_avg|       waccel_y_avg|        waccel_z_avg|    waccel_x_stdev|    waccel_y_stdev|    waccel_z_stdev|    waccel_x_range|    waccel_y_range|    waccel_z_range|
+---------+--------+------------------+-------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|   1605.0|       P| 5.524821611024183|-3.9579231058513504|  3.6875186409674425| 7.539332142836476| 7.936871143736364| 7.818668838089347|39.226001999999994|39.226001999999994|         39.226001|
|   1610.0|       A|10.577153207567255|-2.3672677588918916|  1.4647325036963985|3.6628358025357985|  2.08720193374469|1.8282780969950123|21.659744699999997|         18.746599|        20.4728187|
|   1627.0|       P| 4.38

In [103]:
# Inner join: keeps only (person_id, activity) pairs that have both phone-gyro and phone-accel features.
all_phone = phone_gyro_features_renamed.join(
    phone_accel_features_renamed,
    on=["person_id", "activity"],
    how="inner",
)

In [104]:
# Inner join: keeps only (person_id, activity) pairs that have both watch-gyro and watch-accel features.
all_watch = watch_gyro_features_renamed.join(
    watch_accel_features_renamed,
    on=["person_id", "activity"],
    how="inner",
)

In [105]:
# One row per (person_id, activity) with watch features first, then phone features.
# Inner join, so pairs missing from either device are dropped.
all_features = all_watch.join(
    all_phone,
    on=["person_id", "activity"],
    how="inner",
)

In [106]:
# Preview the first 5 rows of the combined feature table.
all_features.show(5)

+---------+--------+--------------------+--------------------+--------------------+------------------+-------------------+------------------+-------------+-----------------+------------------+------------------+-------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-----------------+-------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+
|person_id|activity|         wgyro_x_avg|         wgyro_y_avg|         wgyro_z_avg|     wgyro_x_stdev|      wgyro_y_stdev|     wgyro_z_stdev|wgyro_x_range|    wgyro_y_range|     wgyro_z_range|      waccel_x_avg|       waccel_y_avg|        waccel_z_avg|    waccel_x_s