# Activity detection with accelerometer data

In this notebook, we will compare the performance of an activity detection model trained with and without using featurization with Fast Fourier Transform

This notebook must be run in a Synapse Analytics workspace in order to work as expected

In [None]:
container_name = "YOUR_STORAGE_CONTAINER"
account_name = "YOUR_STORAGE_ACCOUNT"

workspace = mssparkutils.runtime.context["workspace"]
mssparkutils.fs.mount( 
    f"abfss://{container_name}@{account_name}.dfs.core.windows.net", 
    "/data",
    {"linkedService":f"{workspace}-WorkspaceDefaultStorage"} 
)

In [None]:
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00287/Activity%20Recognition%20from%20Single%20Chest-Mounted%20Accelerometer.zip

In [None]:
# PASTE THE JOB ID PRINTED BELOW IN THE NEXT CELL
mssparkutils.env.getJobId()

In [None]:
!unzip -d /synfs/JOB_ID_HERE/data/ "Activity Recognition from Single Chest-Mounted Accelerometer.zip"

In [None]:
!rm "Activity Recognition from Single Chest-Mounted Accelerometer.zip"

## 0. Libraries
We'll begin by importing all the required libraries:

In [None]:
pip install more_spark_transformers

In [None]:
import numpy as np
from pyspark import keyword_only
from pyspark.sql import Window, SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import lit, to_timestamp, lag, window, mean, stddev, max, min, skewness, kurtosis, col, rand, last
from pyspark.ml import Transformer, Pipeline, PipelineModel, Estimator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.param.shared import HasInputCols, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import MulticlassMetrics

import plotly
import plotly.express as px
import plotly.graph_objects as go

from more_spark_transformers import FFTTransformer

In [None]:
spark = SparkSession \
    .builder \
    .appName("FFT with IoT data sample") \
    .getOrCreate()

## Parameterization

In [None]:
# Numerical input columns to be analyzed
INPUT_COLUMNS = ['AccelerationX', 'AccelerationY', 'AccelerationZ']
# Multiclass label column. Should be of type int
OUTPUT_COLUMN = 'Label'
# Column to partition data before training. Sensor identifier, etc. Should be of type int
PARTITION_COLUMN = 'ParticipantId'

# If data does not contain an explicit timestamp column, provide the measure index and time between samples. 
# Sampling rate should be constant.
SAMPLING_COLUMN = 'SequentialNumber'
SAMPLING_PERIOD = 1 / 52
TIMESTAMP_COLUMN = 'Timestamp' # If not present, will be created based on the above

# FFT transformation parameters. Window duration should be as small as possible while allowing a prediction - imagine if a human were to make the prediction, how much time would they need observing the inputs? Window slide duration defines how much overlap to leave between data points - more overlap means more training data, but also more risk of overfitting the model.
FFT_WINDOW_DURATION = 8 / 52
FFT_WINDOW_SLIDE_DURATION = 8 / 52


In [None]:
FFT_WINDOW_SIZE = round(1 / SAMPLING_PERIOD * FFT_WINDOW_DURATION) # Calculated based on the above

## 1. Importing data to Spark

In [None]:
acceleration_data_raw = spark.read.csv(f"synfs:/{mssparkutils.env.getJobId()}/data/Activity Recognition from Single Chest-Mounted Accelerometer")

acceleration_data_raw = acceleration_data_raw.withColumnRenamed("_c0", "SequentialNumber")
acceleration_data_raw = acceleration_data_raw.withColumnRenamed("_c1", "AccelerationX")
acceleration_data_raw = acceleration_data_raw.withColumnRenamed("_c2", "AccelerationY")
acceleration_data_raw = acceleration_data_raw.withColumnRenamed("_c3", "AccelerationZ")
acceleration_data_raw = acceleration_data_raw.withColumnRenamed("_c4", "Label")
acceleration_data_raw = acceleration_data_raw.withColumn("ParticipantId", F.udf(lambda x: x.split('/')[-1][:-4])(F.input_file_name()))

In [None]:
acceleration_data_raw.head(5)

In [None]:
def format_data(
    raw_data, 
    inputCols=[],
    outputCol=None,
    partitionCol=None,
    samplingCol=None,
    samplingPeriod=None,
    timestampCol=None,
):
    data = raw_data

    for input_column in inputCols:
        data = data.withColumn(input_column, col(input_column).cast("int"))
        
    data = data.withColumn(partitionCol, col(partitionCol))
    data = data.withColumn(samplingCol, col(samplingCol).cast("int"))
    data = data.withColumn(outputCol, col(outputCol).cast("int"))

    if samplingCol and samplingPeriod:
        data = data.withColumn(timestampCol, to_timestamp(samplingPeriod * col(samplingCol)))

    data = data.select(timestampCol, partitionCol, *inputCols, outputCol)
    data = data.orderBy(partitionCol, timestampCol)
    return data

acceleration_data = format_data(
    acceleration_data_raw,
    inputCols=INPUT_COLUMNS,
    outputCol=OUTPUT_COLUMN,
    partitionCol=PARTITION_COLUMN,
    samplingCol=SAMPLING_COLUMN,
    samplingPeriod=SAMPLING_PERIOD,
    timestampCol=TIMESTAMP_COLUMN,
)

## 2. Exploratory data analysis

Based on the documentation in the README and what we see in the data,
we have the following information:

- **ParticipantId**: participant identifier.
- **SequentialNumber**: incremental column for each participant in the study, 
indicating the point in time the measure was taken. Each increment is
equivalent to 1/52nd of a second.
- **AccelerationX, AccelerationY and AccelerationZ**: accelerometer readings with regards to the three dimensions
of space. The accelerometer is chest mounted, which means the axes are
constantly aligned with the participant's body.
- **Label**: activity label index, from 1-7, indicating seven different
activities we would like to predict, based on movement.

As we are to predict a **categorical value** (activity), we will treat
this as a **classification problem**.

One of the first activities in Exploratory Data Analysis is to understand whether the input columns correlate
with the output in some way. This can be done in many ways, including visually.
Next, as an example let's look at how AccelerationZ (as an example) correlates to the activity being performed.

In [None]:
target_df = acceleration_data.where('ParticipantId = 1').toPandas()

fig = go.Figure(data=[
    go.Line(x=target_df['Timestamp'], y=(target_df['AccelerationZ']-target_df['AccelerationZ'].mean()+11)/target_df['AccelerationZ'].std()),
    go.Line(x=target_df['Timestamp'], y=target_df['Label'])
])
fig.update_yaxes(
    range=[-3,8],  # sets the range of xaxis
    constrain="domain",  # meanwhile compresses the xaxis by decreasing its "domain"
)
fig.show()

Notice how the patterns are significantly different for different activities:
- Activity 1 (working at a computer): Long periods with low activity;
- Activity 4 (walking): Constant activity;

Also notice how an instantaneous acceleration reading is probably not enough to determine the current activity.
We will need to perform some kind of window aggregation to find meaning in our inputs.

Now, turning our attention to the labels, we can plot the distribution of activities to acess whether the dataset
is imbalanced, which would require additional precautions.

In [None]:
activity_counts = acceleration_data.groupBy(OUTPUT_COLUMN).agg({'*': 'count'}).toPandas()
fig = px.bar(activity_counts, x=OUTPUT_COLUMN, y='count(1)')
fig.show()

A few problems are identified, and the following additional steps are taken:

- An unidentified label "0" is present in the dataset. We will drop these rows as we cannot interpret what they mean.
- Labels 2, 5, and 6 are very low frequency (under 10% of the dataset) and likely similar to other labels.
We propose eliminating label 2, as it's in fact a combination of multiple other actions in sequence, and joining
labels 4 and 6, as both mean the participant is walking. As label 5 refers to going up/down stairs, we expect
it will be easily identifiable, so we will not alter it right now.

In [None]:
from pyspark.sql.functions import when

acceleration_data_clean = acceleration_data\
    .where(col(OUTPUT_COLUMN) != 0)\
    .where(col(OUTPUT_COLUMN) != 2)\
    .withColumn(OUTPUT_COLUMN, when(acceleration_data.__getattr__(OUTPUT_COLUMN) == 6, 4).otherwise(acceleration_data.__getattr__(OUTPUT_COLUMN)))


## 3. Modeling

We'll train and evaluate two models using the exact same data:

- On model 1, we will use the data as-is, without feature engineering;
- On model 2, we will use FFT decomposition before modeling.

Check out the blog (linked in the README) for details on what the FFT transform is doing.

In [None]:
training_data = acceleration_data_clean.where(acceleration_data_clean.ParticipantId != "1")
test_data = acceleration_data_clean.where(acceleration_data_clean.ParticipantId == "1")

In [None]:
# At this point, we're using an imported FFTTransformer published to pip.
# This is to ensure that pickling and sharing models is a smoother experience.
# In case you're wondering what the source code for the transformer looks like, it is copied below

# import numpy as np
# import pyspark.sql.types as T
# import pyspark.sql.functions as F
# from pyspark import keyword_only
# from pyspark.sql.functions import window, col, rand, last
# from pyspark.ml import Transformer
# from pyspark.ml.param.shared import HasInputCols, HasOutputCol, Param, Params, TypeConverters
# from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


# class FFTTransformer(Transformer, HasInputCols, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
#     inputCols = Param(Params._dummy(), "inputCols", "inputCols", typeConverter=TypeConverters.toListString)
#     windowDuration = Param(Params._dummy(), "windowDuration", "windowDuration", typeConverter=TypeConverters.toFloat)
#     windowSlideDuration = Param(Params._dummy(), "windowSlideDuration", "windowSlideDuration", typeConverter=TypeConverters.toFloat)
#     samplingPeriod = Param(Params._dummy(), "samplingPeriod", "samplingPeriod", typeConverter=TypeConverters.toFloat)
#     partitionColumn = Param(Params._dummy(), "partitionColumn", "partitionColumn", typeConverter=TypeConverters.toString)
#     timestampColumn = Param(Params._dummy(), "timestampColumn", "timestampColumn", typeConverter=TypeConverters.toString)
#     outputCol = Param(Params._dummy(), "outputCol", "outputCol", typeConverter=TypeConverters.toString)

#     @keyword_only
#     def __init__(self,inputCols=[], windowDuration=None, windowSlideDuration=None, samplingPeriod=None, partitionColumn=None, timestampColumn=None, outputCol=None):
#         super(FFTTransformer, self).__init__()
#         kwargs = self._input_kwargs
#         self.set_params(**kwargs)
#         self.uid = 'FFTTransformer'
#         self.fft_udf = F.udf(lambda z: [float(i) for i in np.fft.fft(z)], T.ArrayType(T.FloatType()))

  
#     @keyword_only
#     def set_params(self, **kwargs):
#         self._set(**kwargs)
    

#     def get_fft(self, column):
#         return self.fft_udf(F.collect_list(col(column))).alias(f"{column}_FFT")


#     def _transform(self, dataset):

#         inputCols = self.getOrDefault("inputCols")
#         windowDuration = self.getOrDefault("windowDuration")
#         windowSlideDuration = self.getOrDefault("windowSlideDuration")
#         samplingPeriod = self.getOrDefault("samplingPeriod")
#         partitionColumn = self.getOrDefault("partitionColumn")
#         timestampColumn = self.getOrDefault("timestampColumn")
#         outputCol = self.getOrDefault("outputCol")

#         windowDurationString = f'{round(1000 * windowDuration)} milliseconds'
#         windowSlideDurationString = f'{round(1000 * windowSlideDuration)} milliseconds'
#         windowSize = round(1 / samplingPeriod * windowDuration)

#         fft_columns = [self.get_fft(column) for column in inputCols]
#         fft_column_names = [f"{column}_FFT" for column in inputCols ]

#         df =  dataset\
#             .groupBy(partitionColumn, window(timestampColumn, windowDuration=windowDurationString, slideDuration=windowSlideDurationString))\
#             .agg(
#                 *fft_columns,
#                 last(col(outputCol))
#             )

#         fft_column_components = [df[f"{column}_FFT"][index] for column in inputCols for index in range(windowSize) ]

#         return df\
#             .select('*', *fft_column_components)\
#             .drop(*fft_column_names)


In [None]:
fftTransformer = FFTTransformer(
    inputCols=INPUT_COLUMNS,
    windowDuration=FFT_WINDOW_DURATION,
    windowSlideDuration=FFT_WINDOW_SLIDE_DURATION,
    samplingPeriod=SAMPLING_PERIOD,
    partitionColumn=PARTITION_COLUMN,
    timestampColumn=TIMESTAMP_COLUMN,
    outputCol=OUTPUT_COLUMN
)
FFT_COLUMNS = [f"{column}_FFT[{index}]" for column in INPUT_COLUMNS for index in range(FFT_WINDOW_SIZE)]
vectorAssembler_1 = VectorAssembler(inputCols=INPUT_COLUMNS, outputCol="features", handleInvalid="skip")
vectorAssembler_2 = VectorAssembler(inputCols=FFT_COLUMNS, outputCol="features", handleInvalid="skip")
scaler_1 = StandardScaler(inputCol="features", outputCol="features_scaled")
scaler_2 = StandardScaler(inputCol="features", outputCol="features_scaled")
rf_1 = RandomForestClassifier(numTrees=100, featuresCol="features_scaled", labelCol=OUTPUT_COLUMN)
rf_2 = RandomForestClassifier(numTrees=100, featuresCol="features_scaled", labelCol=f"last({OUTPUT_COLUMN})")


pipeline_1 = Pipeline(stages=[
    vectorAssembler_1, 
    scaler_1,
    rf_1
])
pipeline_2 = Pipeline(stages=[
    fftTransformer,
    vectorAssembler_2, 
    scaler_2,
    rf_2
])

finalModel_1 = pipeline_1.fit(training_data)
finalModel_2 = pipeline_2.fit(training_data)


## 4. Model evaluation

In [None]:
predictions_1 = finalModel_1.transform(test_data)
preds_and_labels_1 = predictions_1.select(['prediction','Label']).withColumn('label', col('Label').cast('float')).orderBy('prediction')
preds_and_labels_1 = preds_and_labels_1.select(['prediction','label'])

predictions_2 = finalModel_2.transform(test_data)
preds_and_labels_2 = predictions_2.select(['prediction','last(Label)']).withColumn('label', col('last(Label)').cast('float')).orderBy('prediction')
preds_and_labels_2 = preds_and_labels_2.select(['prediction','label'])

metrics_1= MulticlassMetrics(preds_and_labels_1.rdd.map(tuple))
metrics_2= MulticlassMetrics(preds_and_labels_2.rdd.map(tuple))

In [None]:
print(f"Regular data model accuracy: {metrics_1.accuracy}")

print(f"FFT data model accuracy: {metrics_2.accuracy}")