# Activity Classification from Smartphone and Smartwatch Data

## Dataset and Goal Description
In this project, we use the [WISDM Smartphone and Smartwatch Activity and Biometrics Dataset](https://archive.ics.uci.edu/ml/datasets/WISDM+Smartphone+and+Smartwatch+Activity+and+Biometrics+Dataset+) to build an activity classifier based on sensor data. There are 51 test subjects, with subject IDs from 1600 to 1650. Each subject wears a smartphone and a smartwatch while performing 18 different activities (coded from A to S; the code N is not used).

- A: Walking 
- B: Jogging
- C: Stairs
- D: Sitting
- E: Standing
- F: Typing
- G: Brushing Teeth
- H: Eating Soup
- I: Eating Chips
- J: Eating Pasta
- K: Drinking from Cup
- L: Eating Sandwich
- M: Kicking Soccer Ball
- O: Playing Catch w/Tennis Ball
- P: Dribbling Basketball
- Q: Writing
- R: Clapping
- S: Folding Clothes

Each device has two kinds of sensors: an accelerometer and a gyroscope. During each activity, these sensors collect the following data:
- x: represents the sensor reading (accelerometer or gyroscope) for the x dimension 
- y: represents the sensor reading (accelerometer or gyroscope) for the y dimension 
- z: represents the sensor reading (accelerometer or gyroscope) for the z dimension

In [4]:
%load_ext autoreload
%autoreload 2

In [8]:
wisdm_files = "WISDM/*/*"

In [85]:
from pyspark import SparkContext

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import asc, desc
from pyspark.sql.functions import avg, round

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

from utils import *

## Create Spark Context

In [6]:
ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

## Load the Data

In [14]:
files_rdd = sc.wholeTextFiles(wisdm_files)

In [15]:
schema = StructType([
    StructField("subject_id", IntegerType(), False),
    StructField("sensor", StringType(), False),
    StructField("device", StringType(), False),
    StructField("activity_code", StringType(), False),
    StructField("timestamp", LongType(), False),
    StructField("x", FloatType(), False),
    StructField("y", FloatType(), False),
    StructField("z", FloatType(), False)
])

In [16]:
df_activity = create_activity_df(ss, files_rdd, schema)
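
`create_activity_df` comes from the project's `utils` module and is not shown here. As a rough illustration of the kind of per-line parsing such a helper might perform, the sketch below turns one raw record into a tuple ordered like `schema`. It assumes a raw line layout of `subject,activity,timestamp,x,y,z;` and that the sensor and device are already known (for example, from the file path). `parse_wisdm_line` is a hypothetical name, and the sample line is illustrative, built from rounded values.

```python
def parse_wisdm_line(line, sensor, device):
    """Parse one raw record into a tuple ordered like `schema`.

    Assumed layout: "subject,activity,timestamp,x,y,z;" (hypothetical helper,
    not the project's actual implementation).
    """
    fields = line.strip().rstrip(";").split(",")
    if len(fields) != 6:
        return None  # skip blank or malformed lines
    subject_id, activity_code, timestamp, x, y, z = fields
    return (int(subject_id), sensor, device, activity_code,
            int(timestamp), float(x), float(y), float(z))


# Illustrative sample line; sensor and device are supplied by the caller.
row = parse_wisdm_line("1600,A,252207918580802,-4.333,13.361,-0.719;", "accel", "phone")
print(row)
```

Returning `None` for malformed lines lets the caller filter them out before building the DataFrame, which matters here because every field in `schema` is declared non-nullable.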

In [19]:
df_activity.show(5)

+----------+------+------+-------------+---------------+------------+------------+------------+
|subject_id|sensor|device|activity_code|      timestamp|           x|           y|           z|
+----------+------+------+-------------+---------------+------------+------------+------------+
|      1613|  gyro| phone|            A|178468071944614|-0.020240024|-0.004261058|-0.023435818|
|      1613|  gyro| phone|            A|178468104194617|  -2.5750105|  0.18109496|   1.3864417|
|      1613|  gyro| phone|            A|178468142811857|  -1.5739282|   0.6668556|    1.320928|
|      1613|  gyro| phone|            A|178468183987271|  -1.5041534|   1.7973675|    0.824781|
|      1613|  gyro| phone|            A|178468225406856| -0.50786483|   1.6002935|  0.45833004|
+----------+------+------+-------------+---------------+------------+------------+------------+
only showing top 5 rows



## Check data balance

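We want to check the number of items for each activity, sensor, and device. First, group the data by "activity_code", "device", and "sensor", then count the number of rows in each group. As the result shows, the groups are of comparable size: among the 20 rows displayed, the counts range from roughly 180,000 to 280,000, with the phone accelerometer consistently producing the most rows.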

In [24]:
# count() already names its output column "count", so no alias is needed
df_check_balance = df_activity.groupBy("activity_code", "device", "sensor") \
                              .count() \
                              .orderBy("activity_code", "device", "sensor")

In [25]:
df_check_balance.show()

+-------------+------+------+------+
|activity_code|device|sensor| count|
+-------------+------+------+------+
|            A| phone| accel|279817|
|            A| phone|  gyro|203919|
|            A| watch| accel|210495|
|            A| watch|  gyro|192531|
|            B| phone| accel|268409|
|            B| phone|  gyro|200252|
|            B| watch| accel|205787|
|            B| watch|  gyro|187833|
|            C| phone| accel|255645|
|            C| phone|  gyro|197857|
|            C| watch| accel|207312|
|            C| watch|  gyro|180416|
|            D| phone| accel|264592|
|            D| phone|  gyro|202370|
|            D| watch| accel|213018|
|            D| watch|  gyro|195050|
|            E| phone| accel|269604|
|            E| phone|  gyro|202351|
|            E| watch| accel|216529|
|            E| watch|  gyro|194103|
+-------------+------+------+------+
only showing top 20 rows
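
To put a number on how balanced these groups are, one option is the ratio between the largest and smallest group. The sketch below is plain Python rather than Spark, and it uses only the 20 counts printed above (activities A to E), so it says nothing about the remaining activities.

```python
# Counts copied from the df_check_balance output above (activities A to E only).
counts = {
    ("A", "phone", "accel"): 279817, ("A", "phone", "gyro"): 203919,
    ("A", "watch", "accel"): 210495, ("A", "watch", "gyro"): 192531,
    ("B", "phone", "accel"): 268409, ("B", "phone", "gyro"): 200252,
    ("B", "watch", "accel"): 205787, ("B", "watch", "gyro"): 187833,
    ("C", "phone", "accel"): 255645, ("C", "phone", "gyro"): 197857,
    ("C", "watch", "accel"): 207312, ("C", "watch", "gyro"): 180416,
    ("D", "phone", "accel"): 264592, ("D", "phone", "gyro"): 202370,
    ("D", "watch", "accel"): 213018, ("D", "watch", "gyro"): 195050,
    ("E", "phone", "accel"): 269604, ("E", "phone", "gyro"): 202351,
    ("E", "watch", "accel"): 216529, ("E", "watch", "gyro"): 194103,
}

smallest = min(counts.values())
largest = max(counts.values())
imbalance_ratio = largest / smallest

print(f"smallest group: {smallest}, largest group: {largest}")
print(f"largest/smallest ratio: {imbalance_ratio:.2f}")  # 1.55 for these 20 groups
```

A ratio of about 1.55 means the largest group is roughly one and a half times the smallest. Most of that spread comes from the sensor and device combination rather than from the activity.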



## Feature Engineering

1. Separate data by sensor type: accelerometer and gyroscope.

In [27]:
df_accel = df_activity.filter("sensor == 'accel'") \
                      .withColumnRenamed("x", "accel_x") \
                      .withColumnRenamed("y", "accel_y") \
                      .withColumnRenamed("z", "accel_z")

In [29]:
df_gyro  = df_activity.filter("sensor == 'gyro'") \
                      .withColumnRenamed("x", "gyro_x") \
                      .withColumnRenamed("y", "gyro_y") \
                      .withColumnRenamed("z", "gyro_z")

2. Join both DataFrames on activity code, device, and timestamp, so that each row holds the accelerometer and gyroscope readings taken at the same moment.

In [32]:
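# join condition
# Note: subject_id is not part of the condition, so rows from different subjects
# would also match if they shared an activity code, device, and timestamp.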
join_cond = (df_accel.activity_code == df_gyro.activity_code) & \
            (df_accel.device == df_gyro.device) & \
            (df_accel.timestamp == df_gyro.timestamp)

In [87]:
df_both = df_accel.join(df_gyro, join_cond, 'inner') \
                  .select(
    
                      df_accel.activity_code,
                      df_accel.subject_id,
                      df_accel.device,
                      df_accel.timestamp,
                      df_accel.accel_x,
                      df_accel.accel_y,
                      df_accel.accel_z,
                      df_gyro.gyro_x,
                      df_gyro.gyro_y,
                      df_gyro.gyro_z,
                    
                  ).distinct().cache()
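
The effect of the inner join above can be illustrated with a small plain-Python sketch: only keys present on both sides survive, and the two readings are placed side by side. The readings below are made up for illustration and are not taken from the dataset.

```python
# Toy readings keyed by (activity_code, device, timestamp); values are made up.
accel = {
    ("A", "phone", 100): (0.1, 9.8, 0.3),
    ("A", "phone", 150): (0.2, 9.7, 0.1),  # no gyro reading at this timestamp
}
gyro = {
    ("A", "phone", 100): (0.01, 0.02, 0.03),
    ("A", "phone", 175): (0.0, 0.1, 0.0),  # no accel reading at this timestamp
}

# Inner join: keep only keys found on both sides and concatenate the readings.
joined = {key: accel[key] + gyro[key] for key in accel.keys() & gyro.keys()}

print(joined)
```

Only the reading at timestamp 100 survives in this toy example. In the same way, any accelerometer or gyroscope row in the real data without an exact timestamp match on the other side is dropped by the join.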

In [88]:
df_both.select([c for c in df_both.columns[:4]]+[round(c, 3).alias(c) for c in df_both.columns[4:]]).show(5)

+-------------+----------+------+-------------+-------+-------+-------+------+------+------+
|activity_code|subject_id|device|    timestamp|accel_x|accel_y|accel_z|gyro_x|gyro_y|gyro_z|
+-------------+----------+------+-------------+-------+-------+-------+------+------+------+
|            A|      1623| phone|2520873744873| -2.717| -2.945|  0.465| 0.656|-0.797|-0.462|
|            A|      1623| phone|2525808428600|  4.735|-13.048| -1.625|-1.616| -0.51|-0.425|
|            A|      1623| phone|2529887102917| -0.137| -9.755| -1.037| 1.076| 0.412| 0.555|
|            A|      1623| phone|2541569242466|  2.737|-11.875| -1.139| 1.696| 1.246| 0.165|
|            A|      1623| phone|2546604630192| -1.418| -3.727|  0.475|-0.962|-1.724|-0.561|
+-------------+----------+------+-------------+-------+-------+-------+------+------+------+
only showing top 5 rows



In [89]:
df_both.count()

5901089

3. For each combination of subject_id, activity_code and device, add `lead_{i}_{sensor}` columns that hold the sensor value i rows after the current row (ordered by timestamp), for i from 1 to `window_size`. We add these features so that the classifier can use the time-series order of the readings.

In [90]:
w = Window.partitionBy("subject_id", "activity_code", "device") \
          .orderBy("timestamp")

In [91]:
window_size = 5

In [92]:
col_array = ['accel_x', 'accel_y', 'accel_z', 'gyro_x', 'gyro_y', 'gyro_z']
scaled_array = [col_array]

In [93]:
for i in range(1, window_size+1):
    lead_array = []
    for sensor in col_array:
        df_both = df_both.withColumn(f"lead_{i}_{sensor}", lead(f"{sensor}", i).over(w))
        lead_array.append(f"lead_{i}_{sensor}")
    scaled_array.append(lead_array)
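
To make the meaning of the lead columns concrete, the sketch below reproduces the shifting logic in plain Python on a single toy partition. The local `lead` function is a stand-in written for this illustration, not the Spark function, and the readings are the rounded `accel_x` values displayed in the tables of this section.

```python
def lead(values, offset):
    """Return the values shifted `offset` positions ahead, padded with None."""
    return values[offset:] + [None] * min(offset, len(values))


# One toy partition (same subject, activity, device), ordered by timestamp.
accel_x = [-4.333, -0.319, 1.566, -0.324, -1.812]

lead_1 = lead(accel_x, 1)  # value one row ahead of each row
lead_2 = lead(accel_x, 2)  # value two rows ahead of each row

for current, ahead_1, ahead_2 in zip(accel_x, lead_1, lead_2):
    print(current, ahead_1, ahead_2)
```

The last rows of the toy partition have no later reading, so they receive `None`. Spark's `lead` behaves the same way at the end of each window partition, producing nulls unless a default value is supplied, so those rows may need to be dropped or filled before training.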


In [94]:
df_both = df_both.orderBy("subject_id", "activity_code", "device", "timestamp").cache()

In [105]:
base_cols = ["subject_id", "activity_code", "device", "timestamp"]

- Show the first 5 rows of data

In [109]:
truncate_show(df_both, truncate_cols=scaled_array[0], selected_cols=base_cols+scaled_array[0])

+----------+-------------+------+---------------+-------+-------+-------+------+------+------+
|subject_id|activity_code|device|      timestamp|accel_x|accel_y|accel_z|gyro_x|gyro_y|gyro_z|
+----------+-------------+------+---------------+-------+-------+-------+------+------+------+
|      1600|            A| phone|252207918580802| -4.333| 13.361| -0.719|-0.853| 0.297|  0.89|
|      1600|            A| phone|252207968934806| -0.319| 13.318| -0.232|-0.875| 0.015| 0.162|
|      1600|            A| phone|252208019288809|  1.566|  9.515| -0.018| -0.72| 0.388|-0.284|
|      1600|            A| phone|252208069642813| -0.324|  5.263|  0.322|-0.572| 1.227|-0.242|
|      1600|            A| phone|252208119996817| -1.812|  3.711|  1.374| -0.38| 1.203|-0.213|
+----------+-------------+------+---------------+-------+-------+-------+------+------+------+
only showing top 5 rows



In [110]:
truncate_show(df_both, truncate_cols=scaled_array[1], selected_cols=scaled_array[1])

+--------------+--------------+--------------+-------------+-------------+-------------+
|lead_1_accel_x|lead_1_accel_y|lead_1_accel_z|lead_1_gyro_x|lead_1_gyro_y|lead_1_gyro_z|
+--------------+--------------+--------------+-------------+-------------+-------------+
|        -0.319|        13.318|        -0.232|       -0.875|        0.015|        0.162|
|         1.566|         9.515|        -0.018|        -0.72|        0.388|       -0.284|
|        -0.324|         5.263|         0.322|       -0.572|        1.227|       -0.242|
|        -1.812|         3.711|         1.374|        -0.38|        1.203|       -0.213|
|        -1.134|         4.538|         2.298|       -0.226|        0.558|        0.124|
+--------------+--------------+--------------+-------------+-------------+-------------+
only showing top 5 rows



In [114]:
truncate_show(df_both, truncate_cols=scaled_array[5], selected_cols=scaled_array[5])

+--------------+--------------+--------------+-------------+-------------+-------------+
|lead_5_accel_x|lead_5_accel_y|lead_5_accel_z|lead_5_gyro_x|lead_5_gyro_y|lead_5_gyro_z|
+--------------+--------------+--------------+-------------+-------------+-------------+
|        -1.134|         4.538|         2.298|       -0.226|        0.558|        0.124|
|         0.093|         6.706|           1.9|        0.128|         0.38|        0.553|
|        -1.036|        15.612|         2.642|       -0.514|         0.38|        0.332|
|         0.752|         9.683|         3.001|        0.446|       -0.511|       -0.113|
|        -0.497|        18.677|         0.937|       -1.357|       -0.435|       -0.455|
+--------------+--------------+--------------+-------------+-------------+-------------+
only showing top 5 rows

