## Problem definition and data description

### Problem Definition
Our company (Spotify) would like to dynamically target advertising to non-premium members based on their physical activity while using Spotify services. For example, while a listener is enjoying a podcast and folding their laundry, they would receive an ad for laundry detergent. 

In addition, Spotify wishes to cater to our premium members by enhancing music recommendation and auto-play options based on a member's physical activity. For example, play up-tempo music while a user is exercising, and Italian classics while a user is eating pasta.

### Data Description

Accelerometer (measures proper acceleration) and Gyroscope (measures orientation and angular velocity) data was collected from 51 volunteer subjects. Each subject was asked to perform 18 tasks for 3 minutes each. The 18 tasks were a mix of physical activities that could be distinctly identified, such as walking, eating, laundry, etc. We (Spotify) tried to collect data for activities that our members might be doing while using our services. The tasks are listed below.

![image info](./images/Activity-Code-Table.png)

Each subject wore a smartwatch on their dominant hand and carried a smartphone in their pocket. The smartphone and smartwatch each had an accelerometer and a gyroscope, yielding four sensors in total (Phone - Gyroscope, Phone - Accelerometer, Watch - Gyroscope, Watch - Accelerometer).

![image info](./images/Human-With-Sensors.png)

To accommodate the four sensors, the data is split into four subdirectories, one for each device and sensor combination.

![image info](./images/Sensor-Subdirectories.png)

Each directory contains the sensor results for the 51 subjects' performance of the 18 activities. The results for each subject are stored in a comma-delimited text file. Since there are 51 subjects and 4 different sensors, there are a total of 204 text files. Each text file has the same six attributes: Subject-id, Activity Code, Timestamp, x, y, z.

![image info](./images/Raw-Data-Description.png)
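As a rough illustration of the record layout, the sketch below parses one raw line into the six attributes. The sample line is made up, and the trailing semicolon is an assumption based on the clean-up step in this notebook's conversion code, which strips `;` from the `z` column. `SensorRecord` and `parse_record` are hypothetical names, not part of the notebook.

```python
from typing import NamedTuple


class SensorRecord(NamedTuple):
    subject_id: int
    code: str
    timestamp: int
    x: float
    y: float
    z: float


def parse_record(line: str) -> SensorRecord:
    """Parse one comma-delimited sensor line (hypothetical helper)."""
    # Drop surrounding whitespace and the assumed trailing ';' terminator
    fields = line.strip().rstrip(";").split(",")
    if len(fields) != 6:
        raise ValueError(f"expected 6 fields, got {len(fields)}")
    subject_id, code, timestamp, x, y, z = fields
    return SensorRecord(
        int(subject_id), code, int(timestamp), float(x), float(y), float(z)
    )


# Made-up sample line, for illustration only
sample = "1600,A,252207666810782,-0.36,8.79,1.05;"
record = parse_record(sample)
print(record)
```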

## Data preparation process

Our data is fairly clean, so it needs little preprocessing or data engineering; most of the work in this project is on the machine learning side.

To clean and prepare the data and to train our models, we chose Dask. While our data is large (approx. 15 million records, ~1 GB), it is not large enough to warrant the use of Spark. Dask also integrates naturally with Python and has syntax similar to pandas. The image below summarizes our thoughts on the choice between Dask and Spark.

![image info](./images/Pandas-Dask-Spark-Compare.png)
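The record count can be sanity-checked with a back-of-the-envelope estimate. The sketch below assumes every sensor takes one reading every 50 ms (the interval used by the aggregation code in this notebook) and that every subject completed every task for the full 3 minutes; real recordings will deviate from this nominal figure.

```python
# Nominal study design, taken from the data description
n_subjects = 51
n_tasks = 18
task_seconds = 3 * 60
n_sensors = 4  # phone/watch x accelerometer/gyroscope

# Assumption: one reading every 50 ms, i.e. 20 readings per second
sample_interval_ms = 50
samples_per_second = 1000 // sample_interval_ms

rows_per_task = task_seconds * samples_per_second
nominal_rows = n_subjects * n_tasks * rows_per_task * n_sensors

print(f"rows per task per sensor: {rows_per_task:,}")
print(f"nominal total rows:       {nominal_rows:,}")
```

Under these assumptions the nominal total is roughly 13 million rows, the same order of magnitude as the approximately 15 million records in the dataset.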

### Importing the data

To sidestep the inconvenience of downloading and importing over 200 text files, we decided to host all the data on GitHub for easy access (https://github.com/gojandrooo/DSE-230/tree/main/data). To quickly load the data into a Dask dataframe, we defined a function `collate_dask_df` that pulls in all data matching the given parameters.
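A minimal sketch of how the file locations can be enumerated: the base URL, directory layout, file-name pattern, and subject ids 1600 to 1650 follow the conversion code in this notebook, while `build_file_urls` is a hypothetical helper used only for illustration.

```python
from itertools import product
from typing import List

BASE_URL = "https://raw.githubusercontent.com/gojandrooo/DSE-230/main/data"
DEVICES = ["phone", "watch"]
DATA_TYPES = ["accel", "gyro"]
SUBJECT_IDS = range(1600, 1651)  # 51 subjects


def build_file_urls(device: str, data_type: str) -> List[str]:
    """Return the raw text-file URLs for one device/sensor pair (hypothetical helper)."""
    return [
        f"{BASE_URL}/{device}/{data_type}/data_{subject_id}_{data_type}_{device}.txt"
        for subject_id in SUBJECT_IDS
    ]


# One URL list per (device, sensor) combination
all_urls = {
    (device, data_type): build_file_urls(device, data_type)
    for device, data_type in product(DEVICES, DATA_TYPES)
}

total_files = sum(len(urls) for urls in all_urls.values())
print(total_files)  # 51 subjects x 4 sensors
```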

Begin by importing all the necessary libraries.

When running within a *Docker* container, you will need to install any libraries not already included in the image:
- Uncomment the `%pip install` cell (below).
- Run the cell, wait for the packages to install, and then restart the notebook.
- Once the installs are complete, comment out the cell and run all.

In [None]:
%pip install plotly
%pip install dask_distance

In [None]:
#set a random state seed for replication
seed=23

In [None]:
# standard libraries
import os
import pandas as pd
import numpy as np
import itertools as it

# plotting libraries
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

# distributed libraries
import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client
from dask import delayed
import joblib

# model processing libraries
import dask_distance
from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import StandardScaler
#from sklearn.model_selection import GridSearchCV
import dask_ml.model_selection as dcv

# models
# will need to update these with the models we use
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.cluster import KMeans

import ssl
# needed to request files from GitHub when running within docker container
ssl._create_default_https_context = ssl._create_unverified_context

In [None]:
# Start and connect to a local dask.distributed client
client = Client(processes=True) # use all 4 cores
client.connection_args

Get data from github and prep files for analysis
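The activity key maps each activity name to a short code. The sketch below mirrors, in plain Python, the strip-spaces-then-split-on-`=` logic that the following cell applies with pandas. The sample lines are assumptions about the file's `name = code` layout, not its verified contents, and `parse_activity_key` is a hypothetical helper.

```python
# Assumed sample of the key file's "name = code" layout (illustrative only)
sample_lines = [
    "walking = A",
    "jogging = B",
    "stairs = C",
]


def parse_activity_key(lines):
    """Map activity code -> activity name from 'name = code' lines."""
    key = {}
    for line in lines:
        # Remove all spaces, then split on the first '=' only
        activity, code = line.replace(" ", "").split("=", 1)
        key[code] = activity
    return key


activity_lookup = parse_activity_key(sample_lines)
print(activity_lookup)
```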

In [None]:
# key for understanding which activity is being measured in a record
activity_key_url = r"https://raw.githubusercontent.com/gojandrooo/DSE-230/main/data/activity_key.txt"

#read the activity table from github
activity_key = pd.read_csv(activity_key_url, header=None)

#split the data into a proper table
activity_key = activity_key[0].str.replace(" ", "").str.split("=", expand=True)
activity_key.columns = ['activity', 'code']

In [None]:
# #do not run unless you need cleaned parquet files locally
# #cleaned parquet files should already be located on github

# #below function takes the raw data on github and converts to parquet files
# #then stores the files on local machine
# '''
# def convert_raw_to_parquet():
#     #base URL where raw data can be easily grabbed
#     base_url = r"https://raw.githubusercontent.com/gojandrooo/DSE-230/main/data"

#     # TOGGLE FOR DEVICE
#     devices = ["phone", "watch"]

#     # TOGGLE FOR MEASUREMENT TYPE
#     data_types = ["accel", "gyro"]
    
    
#     # create list of local folders
#     for data_type in data_types:
#         for device in devices:
#             os.makedirs(r"data/parquet/" + "/" + device + "/" + data_type, exist_ok=True) 
    

#     locs = {}
#     for data_type in data_types:
#         for device in devices:
#             file_locs = []
#             for user_id in range(1600, 1651):
#                 url = base_url + "/" + device + "/" + data_type + f"/data_{user_id}_{data_type}_{device}.txt"
#                 df = pd.read_csv(url, header=None)
#                 df.columns = ['subject_id', 'code', 'timestamp', 'x', 'y', 'z']
#                 custom_dtypes = {"subject_id": "int16", "x": "float32", "y": "float32", "z": "float32"}
#                 df['z'] = df['z'].str.replace(";", "")
#                 #df = df.reset_index(drop = True)
#                 df = df.astype(custom_dtypes)
#                 df['index'] = df['subject_id'].astype('str') + df['code'] + df['timestamp'].astype('str')
#                 fname = r"data/parquet/" + "/" + device + "/" + data_type + f"/data_{user_id}_{data_type}_{device}.gzip"
#                 df.to_parquet(fname)
#                 file_locs.append(fname)
#             locs[device, data_type] = file_locs
# '''

In [None]:
# NOTE
# this reads all 51 files per sensor; slice loop_urls (e.g. loop_urls[:3]) for faster development runs

# dask_df = dd.read_parquet('./data/parquet/phone/accel/data_1600_accel_phone.gzip', index = 'index')
def collate_dask_df(device, data_type):

    '''
    returns a single dask dataframe built from the parquet files for one device and sensor
    
    device: "phone" or "watch"
    
    data_type: "accel" or "gyro"
    ----------------------------
    '''
    
    #base_url = r"https://raw.githubusercontent.com/gojandrooo/DSE-230/main/data"
    # base_url = r"https://github.com/garrett391/DSE-230/blob/main/data/parquet"
    #base_url = r"https://github.com/gojandrooo/DSE-230/blob/main/data/parquet"
    base_url = './data/parquet'

    # create list of all file names
    #file_names = [f"/data_{user_id}_{data_type}_{device}.txt" for user_id in range(1600, 1651)]
    # file_names = [f"/data_{user_id}_{data_type}_{device}.gzip?raw=true" for user_id in range(1600, 1651)]
    file_names = [f"/data_{user_id}_{data_type}_{device}.gzip" for user_id in range(1600, 1651)]

    # create urls of all files
    loop_urls = [base_url + "/" + device + "/" + data_type + file_name for file_name in file_names]
    
    #setting datatypes to save memory
    
    #dask_df = dd.read_parquet(loop_urls[:3], index = 'index') # for dev this is only the first three files
    dask_df = dd.read_parquet(loop_urls, index = 'index') # PRODUCTION, all of the files

    #dask_df = dd.multi.concat([pd.read_csv(url, header=None) for url in loop_urls[:3]]) # for dev this is only the first three files
    #dask_df = dd.multi.concat([pd.read_csv(url, header=None) for url in loop_urls]) # PRODUCTION, all of the files
    
    #dask_df.columns = ['subject_id', 'code', 'timestamp', 'x', 'y', 'z']
    #dask_df['z'] = dask_df['z'].str.replace(";", "").astype('float64')
    #dask_df = dask_df.reset_index(drop = True)
    
    return dask_df # dask df output

### Loading the sensor data

In [None]:
client.restart()

In [None]:
%%time
dd_ref = {
    'phone_accel': collate_dask_df("phone", "accel"),
    'phone_gyro': collate_dask_df("phone", "gyro"),
    'watch_accel': collate_dask_df("watch", "accel"),
    'watch_gyro': collate_dask_df("watch", "gyro")
}

# loop variable is named ddf so it does not shadow the dask.dataframe alias dd
print(pd.DataFrame({
    'dd': [k for k in dd_ref.keys()],
    'rows': [ddf.shape[0].compute() for ddf in dd_ref.values()],
    'columns': [ddf.shape[1] for ddf in dd_ref.values()]
}))

In [None]:
%%time
for k, v_dd in dd_ref.items():
    dd_ref[k] = v_dd.assign(
        xy = v_dd['x'] * v_dd['y'],
        yz = v_dd['y'] * v_dd['z'],
        xz = v_dd['x'] * v_dd['z'],
        x2 = v_dd['x']**2,
        y2 = v_dd['y']**2,
        z2 = v_dd['z']**2
    )
    
# loop variable is named ddf so it does not shadow the dask.dataframe alias dd
print(pd.DataFrame({
    'dd': [k for k in dd_ref.keys()],
    'rows': [ddf.shape[0].compute() for ddf in dd_ref.values()],
    'columns': [ddf.shape[1] for ddf in dd_ref.values()]
}))

### Merge files based on index
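Each Parquet file is indexed by a composite key built from the subject id, activity code, and timestamp, so an inner join keeps only the readings that both sensors recorded at the same instant. The sketch below illustrates this on made-up rows using plain dictionaries. `make_key` and `inner_join` are hypothetical helpers, not the Dask calls used in this notebook, and unlike pandas the sketch adds a suffix to every column rather than only to overlapping ones.

```python
def make_key(subject_id, code, timestamp):
    """Concatenate the three identifying fields into one string key."""
    return f"{subject_id}{code}{timestamp}"


def inner_join(left, right, suffixes):
    """Join two {key: {column: value}} mappings on their shared keys."""
    joined = {}
    for key in left.keys() & right.keys():
        row = {f"{col}{suffixes[0]}": val for col, val in left[key].items()}
        row.update({f"{col}{suffixes[1]}": val for col, val in right[key].items()})
        joined[key] = row
    return joined


# Made-up readings: only timestamps 100 and 150 appear in both sensors
accel = {
    make_key(1600, "A", 100): {"x": 0.1},
    make_key(1600, "A", 150): {"x": 0.2},
    make_key(1600, "A", 200): {"x": 0.3},
}
gyro = {
    make_key(1600, "A", 100): {"x": 1.1},
    make_key(1600, "A", 150): {"x": 1.2},
}

merged = inner_join(accel, gyro, ("_phone_accel", "_phone_gyro"))
print(sorted(merged))
```

The accelerometer reading at timestamp 200 has no gyroscope counterpart, so it is dropped, which is why the merged row count can be smaller than either input.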

In [None]:
feat_cols = ['x', 'y', 'z', 'xy', 'yz', 'xz', 'x2', 'y2', 'z2']

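def merge_dfs(df1, df2, suffixes):
    # inner-join two sensor dataframes on their shared index, taking only the feature columns from df2
    df1partitions = df1.npartitions
    df2partitions = df2.npartitions
    partitions = min(df1partitions, df2partitions)
    merged = dd.merge(
        df1, df2[feat_cols], how='inner', left_index=True, right_index=True, suffixes=suffixes
    ).reset_index(drop = True)
    # materialize the join in memory, then split it back into a dask dataframe
    return dd.from_pandas(merged.compute(), npartitions = partitions)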

shape_ref = {}

In [None]:
%%time
phone_df = merge_dfs(dd_ref['phone_accel'], dd_ref['phone_gyro'][feat_cols], ('_phone_accel', '_phone_gyro'))
shape_ref['phone_df'] = phone_df.shape[0].compute()
client.cancel(dd_ref['phone_accel'])
client.cancel(dd_ref['phone_gyro'])


In [None]:
%%time
watch_df = merge_dfs(dd_ref['watch_accel'], dd_ref['watch_gyro'][feat_cols], ('_watch_accel', '_watch_gyro'))
shape_ref['watch_df'] = watch_df.shape[0].compute()
client.cancel(dd_ref['watch_accel'])
client.cancel(dd_ref['watch_gyro'])


In [None]:
del dd_ref

In [None]:
# takes the combined sensor data and bins it into windows of num_seconds,
# aggregating (mean and sum) within each window, activity code and subject
def group_into_seconds(df, num_seconds):
    # calculates the number of rows per window by converting seconds to ms and dividing by 50 (sensor interval in ms)
    
    n_rows = (num_seconds*1000)/50
    print('Grouped every', n_rows, 'rows')
    
    tempdf = df.reset_index(drop=True).reset_index()
    
    # rename of the index column
    # print(tempdf['index'].compute())
    
    tempdf = tempdf.rename(columns= {'index': 'grouper'})
    
    # creates a variable to group within n_seconds
    tempdf['grouper'] = tempdf['grouper']//n_rows
    
    # aggregate to n_seconds
    tempdf = tempdf.groupby(by = ['grouper', 'code', 'subject_id']).agg(['mean', 'sum']).reset_index()
    # drop superfluous grouper column
    # del tempdf['grouper']
    
    #tempdf.columns = list(map(''.join, tempdf.columns.values))
    
    return tempdf
    # return df.groupby(np.arange(len(df))//n_rows).mean().compute()

agg_time = 2
grouped_phone_df = dask.compute(group_into_seconds(phone_df, agg_time))[0]
shape_ref['grouped_phone_df'] = grouped_phone_df.shape[0]

client.cancel(phone_df)
del phone_df

grouped_watch_df = dask.compute(group_into_seconds(watch_df, agg_time))[0]
shape_ref['grouped_watch_df'] = grouped_watch_df.shape[0]
client.cancel(watch_df)
del watch_df


In [None]:
print(shape_ref)

In [None]:
# just testing to make sure it returns the exact same data frame when grouping rows = 1
# group_into_seconds(phone_df.compute(),50/1000)

In [None]:
# group_into_seconds(phone_df.compute(),2)

In [None]:
# pass this variable in to all our aggregation functions
# it is the number of seconds we are aggregating to
agg_time = 3

In [None]:
# calculate the averages within our time interval


# <font color='red'>warning - this is when it gets REALLY slow</font>

In [None]:
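# calculate the created features within our time interval
# NOTE: requires cos_cor_aggregation to be defined, and phone_df / watch_df to still exist
# (they are deleted after the grouping step above)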
synth_phone_df, synth_watch_df = dask.compute(
    cos_cor_aggregation(phone_df, agg_time, 64),
    cos_cor_aggregation(watch_df, agg_time, 64)
    )    

In [None]:
'''
merge grouped averages with synthetic features
they must be the same shapes
we also need to test for unexpected shuffling behavior
'''

# merge the new features from each sensor into one df
prepped_phone_df = dd.merge(
    grouped_phone_df, 
    synth_phone_df, 
    how='inner', 
    left_index = True, 
    right_index = True, 
        )

prepped_watch_df = dd.merge(
    grouped_watch_df, 
    synth_watch_df, 
    how='inner', 
    left_index = True, 
    right_index = True, 
        )

In [None]:
dd.compute(prepped_phone_df.shape, prepped_watch_df.shape)

In [None]:
prepped_phone_df.head()

In [None]:
prepped_watch_df.head()

### Create CSV files for faster recall

In [None]:
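# write out file to csv
file_name = 'prepped_phone_df'
df = prepped_phone_df
# make sure the output directory exists before writing
os.makedirs('./prepped-data', exist_ok=True)
# should output as .csv to retain data structure
df.to_csv(fr'./prepped-data/{file_name}.csv')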

In [None]:
# write out file to csv
file_name = 'prepped_watch_df'
df = prepped_watch_df
# should output as .csv to retain data structure
df.to_csv(fr'./prepped-data/{file_name}.csv')

In [None]:
# # write out to excel (wireframe)
# file_name = 'file_name'
# writer = pd.ExcelWriter(f'{file_name}.xlsx', engine='xlsxwriter')
# df.to_excel(writer, sheet_name='sheet-name')
# writer.save()

In [None]:
# # output the to .tsv/csv (wireframe)
# file_name = 'file_name'
# df = df#.astype(str) #preserve dtype with str if not already
# # should output as .tsv to retain data structure
# df.to_csv(fr'{file_name}.tsv', sep='\t', index=False)

In [None]:
# # serialize file (wireframe)
# file_name = 'file_name'
# df = df
# df.to_pickle(f"./{file_name}.pkl")

In [None]:
# # read serialized file (wireframe)
# file_name = 'file_name'
# unpickled_df = pd.read_pickle(f"./{file_name}.pkl")

In [None]:
# # uncompress file and read in to dask (wireframe)
# file_name = 'file_name'
# unpickled_df = pd.read_pickle(f"./{file_name}.pkl")
# ddf = dd.from_pandas(unpickled_df, npartitions=8)

In [None]:
# # read in file as a dask dataframe
# phone_accel = dd.read_csv(f"prepped-data/{file_name}.csv")

**<font color='red'>I don't think we actually need hadoop. saving in case we do and/or syntax for running other  bash commands</font>**

In [None]:
# %%bash
# dir

**create hadoop directory**

In [None]:
# %%bash
# hadoop fs -mkdir /hdfs-data

**copy from local into hadoop**

In [None]:
# %%bash
# hadoop fs -copyFromLocal prepped-data/data_phone_accel.csv /hdfs-data

**make sure file is in hadoop**

In [None]:
# %%bash
# hadoop fs -ls /hdfs-data

- Use PySpark or Dask
- Include one classification or regression or cluster analysis task
- Describe problem
    - To include:  Explain why problem is interesting, what real-life application is being addressed
- Describe analysis task
    - To include:  type of task (e.g., classification), how the task relates to the business problem
- Describe data
    - To include:  data quality issues, characteristics of the dataset (summary statistics, correlation, outliers, etc.), plots
- Describe data preparation process
    - To include:  data cleaning steps, features used, train/validation/test datasets
- Describe analysis approaches
    - To include:  input, setup, and output of model(s)
- Describe challenges and solutions
    - To include:  challenges encountered, solutions to address challenges
- Describe analysis results and insights gained
    - To include:  discussion of results, insights gained from analysis
- Describe future work
    - To include:  lessons learned, next steps, what you would have done differently




Measures movement data over ten-second intervals while subjects perform the various tasks.
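For reference, the time binning in this notebook converts a window length in seconds into a row count, assuming one reading every 50 ms, and then aggregates consecutive rows. The sketch below reproduces that arithmetic in plain Python on made-up values. `rows_per_window` and `window_means` are hypothetical helpers, and the sketch ignores the additional grouping by subject and activity code that the notebook applies.

```python
from statistics import mean

SAMPLE_INTERVAL_MS = 50  # assumed sensor interval


def rows_per_window(num_seconds):
    """Number of consecutive readings that fall into one window."""
    return int(num_seconds * 1000 // SAMPLE_INTERVAL_MS)


def window_means(values, num_seconds):
    """Average consecutive readings in fixed-size windows."""
    n_rows = rows_per_window(num_seconds)
    groups = {}
    for i, value in enumerate(values):
        # Integer division assigns each row to its window number
        groups.setdefault(i // n_rows, []).append(value)
    return [mean(group) for _, group in sorted(groups.items())]


# 80 made-up readings = 4 seconds of data at 50 ms per reading
readings = [float(i) for i in range(80)]
two_second_means = window_means(readings, num_seconds=2)
print(rows_per_window(2), two_second_means)
```

With this interval, a ten-second window would span 200 rows, so longer windows trade temporal resolution for fewer, smoother training examples.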

## Analysis approaches

### Model Selection

<font color='red'>this section is wildly incomplete</font>

[**sklearn - Decision Tree Regression with AdaBoost**](https://scikit-learn.org/stable/auto_examples/ensemble/plot_adaboost_regression.html)
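Before running an exhaustive grid search, it helps to estimate how many models will be fit. The sketch below counts the candidates for a grid with the same shape as the one used in this section (9 depths, 8 split thresholds, 2 criteria) under 10-fold cross-validation. It uses only the standard library and does not train anything.

```python
from itertools import product

param_grid = {
    "max_depth": list(range(1, 10)),
    "min_samples_split": list(range(2, 10)),
    "criterion": ["gini", "entropy"],
}
cv_folds = 10

# Every combination of parameter values is one candidate model
candidates = [
    dict(zip(param_grid, values)) for values in product(*param_grid.values())
]

n_candidates = len(candidates)
n_fits = n_candidates * cv_folds  # excludes any final refit on the best candidate

print(n_candidates, n_fits)
```

Because the fit count grows multiplicatively with each added parameter, narrowing one range (for example, fewer `max_depth` values) is usually the quickest way to shorten the search.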

In [None]:
# NOTE: phone_accel must be loaded first (e.g. with the commented-out dd.read_csv cell above)
phone_accel.head()

In [None]:
# TRAIN TEST SPLIT

# split off labels
feat_cols = ['x', 'y', 'z']
label_col = ['code']

feature_df = phone_accel[feat_cols]
label_df = phone_accel[label_col]

# NOTE: test_size=0.8 holds out 80% of the rows for testing and trains on the remaining 20%
X_train, x_test, y_train, y_test = train_test_split(feature_df, label_df, test_size=0.8, shuffle=True, random_state=seed)

In [None]:
# SCALE DATA

# instantiate scaler
scaler = StandardScaler()
# fit the scaler
scalerModel = scaler.fit(X_train)
# scale the training data
X_train_scaled = scalerModel.transform(X_train)
# scale the test data
X_test_scaled = scalerModel.transform(x_test)

In [None]:
# set up grid search parameters
param_grid = {'max_depth'        : list(range(1, 10)), # play around with max depth
              'min_samples_split': list(range(2, 10)), # must start at 2+
              'criterion'        : ['gini','entropy'],
             }

In [None]:
# GRID SEARCH

# instantiate base model
dt_model = DecisionTreeClassifier(random_state=seed)

# instantiate grid search object
dt_model_grid_dask = dcv.GridSearchCV(dt_model, param_grid, cv=10)

# execute grid search
# dask_ml's GridSearchCV schedules its fits on dask directly,
# so the joblib backend context is likely not needed here
with joblib.parallel_backend("dask"):
    dt_model_grid_dask.fit(X_train_scaled, y_train)

In [None]:
best_params = dt_model_grid_dask.best_params_
print(best_params)

In [None]:
print(dt_model_grid_dask.best_score_)

In [None]:
# now that we've performed a grid search, use the parameters from our best model

# instantiate best model
best_dt_model = DecisionTreeClassifier(
    max_depth=best_params['max_depth'],
    min_samples_split=best_params['min_samples_split'],
    criterion=best_params['criterion'],
    random_state=seed
)

# fit model to training data
# DecisionTreeClassifier has no n_jobs option, so the joblib backend
# context likely has no effect on this single fit
with joblib.parallel_backend("dask"):
    best_dt_model.fit(X_train_scaled, y_train)

# check accuracy from this model on test data
best_dt_model.score(X_test_scaled, y_test)

## Analysis results

## Challenges & solutions

## Insights gained

## Future work

## References

1. Dask vs. Spark picture: https://medium.datadriveninvestor.com/pandas-dask-or-pyspark-what-should-you-choose-for-your-dataset-c0f67e1b1d36
2. Accelerometer information: https://en.wikipedia.org/wiki/Accelerometer
3. Gyroscope information: https://en.wikipedia.org/wiki/Gyroscope

In [None]:
# always close client connection at end of workflow
client.shutdown()