
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.


## Source of the data
Kaggle: https://www.kaggle.com/datasets/deepalisukhdeve/bellabeat

## Dataframes:
1. dailyActivity_merged
2. heartrate_seconds_merged
3. sleepDay_merged
4. weightLogInfo_merged


## Loading

In [0]:
from pyspark.sql.functions import isnan, when, count, col

In [0]:
df_dailyActivity_merged = spark.read.format("csv").option("header", "true").load("/FileStore/tables/dailyActivity_merged.csv")

In [0]:
df_sleepDay_merged = spark.read.format("csv").option("header", "true").load("/FileStore/tables/sleepDay_merged.csv")

In [0]:
df_heartrate_seconds_merged = spark.read.format("csv").option("header", "true").load("/FileStore/tables/heartrate_seconds_merged.csv")

In [0]:
df_weightLogInfo_merged = spark.read.format("csv").option("header", "true").load("/FileStore/tables/weightLogInfo_merged.csv")

In [0]:
from pyspark.sql.functions import to_date, to_timestamp, col, regexp_replace, date_format

In [0]:
df_dailyActivity_merged = df_dailyActivity_merged.withColumn("ActivityDate", to_date("ActivityDate", "M/d/y"))

for c in ['TotalSteps',
 'TotalDistance',
 'TrackerDistance',
 'LoggedActivitiesDistance',
 'VeryActiveDistance',
 'ModeratelyActiveDistance',
 'LightActiveDistance',
 'SedentaryActiveDistance',
 'VeryActiveMinutes',
 'FairlyActiveMinutes',
 'LightlyActiveMinutes',
 'SedentaryMinutes',
 'Calories']:
    df_dailyActivity_merged = df_dailyActivity_merged.withColumn(c, regexp_replace(col(c), ",", ".").cast("double"))

In [0]:
df_dailyActivity_merged = df_dailyActivity_merged.filter(df_dailyActivity_merged.Calories != 0).select("*")

In [0]:
display(df_sleepDay_merged)

In [0]:
df_sleepDay_merged = df_sleepDay_merged.withColumn(
    "SleepDay",
    to_timestamp("SleepDay", "M/d/y h:mm:ss a")
)

for c in ['TotalSleepRecords',
 'TotalMinutesAsleep',
 'TotalTimeInBed']:
    df_sleepDay_merged = df_sleepDay_merged.withColumn(c, regexp_replace(col(c), ",", ".").cast("double"))

In [0]:
df_sleepDay_merged.printSchema()

root
 |-- Id: string (nullable = true)
 |-- SleepDay: timestamp (nullable = true)
 |-- TotalSleepRecords: double (nullable = true)
 |-- TotalMinutesAsleep: double (nullable = true)
 |-- TotalTimeInBed: double (nullable = true)



In [0]:
display(df_dailyActivity_merged)

In [0]:
df_dailyActivity_merged.printSchema()

root
 |-- Id: string (nullable = true)
 |-- ActivityDate: date (nullable = true)
 |-- TotalSteps: double (nullable = true)
 |-- TotalDistance: double (nullable = true)
 |-- TrackerDistance: double (nullable = true)
 |-- LoggedActivitiesDistance: double (nullable = true)
 |-- VeryActiveDistance: double (nullable = true)
 |-- ModeratelyActiveDistance: double (nullable = true)
 |-- LightActiveDistance: double (nullable = true)
 |-- SedentaryActiveDistance: double (nullable = true)
 |-- VeryActiveMinutes: double (nullable = true)
 |-- FairlyActiveMinutes: double (nullable = true)
 |-- LightlyActiveMinutes: double (nullable = true)
 |-- SedentaryMinutes: double (nullable = true)
 |-- Calories: double (nullable = true)



In [0]:
display(df_heartrate_seconds_merged)

In [0]:
df_heartrate_seconds_merged = df_heartrate_seconds_merged.withColumn(
    "Time",
    to_timestamp("Time", "M/d/y h:mm:ss a")
)

df_heartrate_seconds_merged = df_heartrate_seconds_merged.withColumn("Value", regexp_replace(col("Value"), ",", ".").cast("double"))

In [0]:
df_heartrate_seconds_merged.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Value: double (nullable = true)



In [0]:
display(df_weightLogInfo_merged)

Id,Date,WeightKg,WeightPounds,Fat,BMI,IsManualReport,LogId
1503960366,5/2/2016 11:59:59 PM,52.5999984741211,115.963146545323,22.0,22.6499996185303,True,1462233599000
1503960366,5/3/2016 11:59:59 PM,52.5999984741211,115.963146545323,,22.6499996185303,True,1462319999000
1927972279,4/13/2016 1:08:52 AM,133.5,294.317120016975,,47.5400009155273,False,1460509732000
2873212765,4/21/2016 11:59:59 PM,56.7000007629395,125.002104340889,,21.4500007629395,True,1461283199000
2873212765,5/12/2016 11:59:59 PM,57.2999992370605,126.324874550011,,21.6900005340576,True,1463097599000
4319703577,4/17/2016 11:59:59 PM,72.4000015258789,159.614681185927,25.0,27.4500007629395,True,1460937599000
4319703577,5/4/2016 11:59:59 PM,72.3000030517578,159.394222287729,,27.3799991607666,True,1462406399000
4558609924,4/18/2016 11:59:59 PM,69.6999969482422,153.662190014971,,27.25,True,1461023999000
4558609924,4/25/2016 11:59:59 PM,70.3000030517578,154.984977044029,,27.4599990844727,True,1461628799000
4558609924,5/1/2016 11:59:59 PM,69.9000015258789,154.103124631302,,27.3199996948242,True,1462147199000


In [0]:
df_weightLogInfo_merged = df_weightLogInfo_merged.withColumn(
    "Date",
    to_timestamp("Date", "M/d/y h:mm:ss a")
)

for c in ['WeightKg',
 'WeightPounds']:
    df_weightLogInfo_merged = df_weightLogInfo_merged.withColumn(c, regexp_replace(col(c), ",", ".").cast("double"))

In [0]:
# Assuming you want to drop columns 'column1', 'column2', and 'column3'
columns_to_drop = ['Fat', 'LogId']

df_weightLogInfo_merged = df_weightLogInfo_merged.drop(*columns_to_drop)


In [0]:
df_weightLogInfo_merged.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- WeightKg: double (nullable = true)
 |-- WeightPounds: double (nullable = true)
 |-- BMI: string (nullable = true)
 |-- IsManualReport: string (nullable = true)



## EXPLORATION !!!!
All data loaded and formatted. Now we can start exploring

In [0]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [0]:
df_dailyActivity_merged.cache()

DataFrame[Id: string, ActivityDate: date, TotalSteps: double, TotalDistance: double, TrackerDistance: double, LoggedActivitiesDistance: double, VeryActiveDistance: double, ModeratelyActiveDistance: double, LightActiveDistance: double, SedentaryActiveDistance: double, VeryActiveMinutes: double, FairlyActiveMinutes: double, LightlyActiveMinutes: double, SedentaryMinutes: double, Calories: double]

In [0]:
from pyspark.sql import functions

In [0]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark import SparkContext
from operator import add

In [0]:
display(df_dailyActivity_merged.describe())

summary,Id,TotalSteps,TotalDistance,TrackerDistance,LoggedActivitiesDistance,VeryActiveDistance,ModeratelyActiveDistance,LightActiveDistance,SedentaryActiveDistance,VeryActiveMinutes,FairlyActiveMinutes,LightlyActiveMinutes,SedentaryMinutes,Calories
count,936.0,936.0,936.0,936.0,936.0,936.0,936.0,936.0,936.0,936.0,936.0,936.0,936.0,936.0
mean,4849840869.51282,7670.551282051282,5.513162387393686,5.498749993966383,0.1086332088606989,1.5091025640384064,0.5699679468893725,3.355096153497027,0.001613247841119,21.25534188034188,13.622863247863249,193.6367521367521,989.292735042735,2313.45405982906
stddev,2421440106.640881,5073.379140077313,3.916496666155722,3.899142895924891,0.621180621793722,2.662801641621189,0.8846865308575281,2.0332578626789246,0.0073611202496938,32.885735953264124,20.01036818755719,108.67557811602325,300.4746544350443,703.683874107321
min,1503960366.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,52.0
max,8877689391.0,36019.0,28.0300006866455,28.0300006866455,4.94214200973511,21.9200000762939,6.48000001907349,10.710000038147,0.109999999403954,210.0,143.0,518.0,1440.0,4900.0


lets check a little bit the distribution of hour dataset

In [0]:
data = np.array(df_dailyActivity_merged.select('Calories').rdd.flatMap(lambda x: x).collect())

plt.hist(data, bins=25, edgecolor='black')
# Calculate quartile values
q1 = np.percentile(data, 25)
q2 = np.percentile(data, 50)
q3 = np.percentile(data, 75)

# Plot the histogram using NumPy and Matplotlib
plt.xlabel('Values')
plt.ylabel('Frequency')
plt.title('Histogram of Numeric Column')

# Add red lines for each quartile
plt.axvline(q1, color='red', linestyle='dashed', linewidth=2, label='Q1')
plt.axvline(q2, color='red', linestyle='dashed', linewidth=2, label='Q2 (Median)')
plt.axvline(q3, color='red', linestyle='dashed', linewidth=2, label='Q3')

# Display legend
plt.legend()

plt.show()


Creating different dataframes to compare trends on Active Minutes and Distances

In [0]:
df_veryActive = df_dailyActivity_merged.select('Id','ActivityDate','VeryActiveMinutes','VeryActiveDistance')\
    .withColumn("Type",lit("VeryActive"))\
    .withColumnRenamed("VeryActiveMinutes","ActiveMinutes")\
    .withColumnRenamed("VeryActiveDistance","ActiveDistance")\
    .filter('ActiveMinutes != 0 And ActiveDistance != 0')
    

df_fairlyActive = df_dailyActivity_merged.select('Id','ActivityDate','FairlyActiveMinutes','ModeratelyActiveDistance')\
    .withColumn("Type",lit("Fairly"))\
    .withColumnRenamed("FairlyActiveMinutes","ActiveMinutes")\
    .withColumnRenamed("ModeratelyActiveDistance","ActiveDistance")\
    .filter('ActiveMinutes != 0 And ActiveDistance != 0')

df_lightActive = df_dailyActivity_merged.select('Id','ActivityDate','LightlyActiveMinutes','LightActiveDistance')\
    .withColumn("Type",lit("Light"))\
    .withColumnRenamed("LightlyActiveMinutes","ActiveMinutes")\
    .withColumnRenamed("LightActiveDistance","ActiveDistance")\
    .filter('ActiveMinutes != 0 And ActiveDistance != 0')

df_sedentaryActive = df_dailyActivity_merged.select('Id','ActivityDate','SedentaryMinutes','SedentaryActiveDistance')\
    .withColumn("Type",lit("Sedentary"))\
    .withColumnRenamed("SedentaryMinutes","ActiveMinutes")\
    .withColumnRenamed("SedentaryActiveDistance","ActiveDistance")

df_activity = df_veryActive.union(
    df_fairlyActive.union(
        df_lightActive.union(
            df_sedentaryActive
        )
        )
    )

display(df_activity)

In [0]:
display(
    df_activity.filter('ActiveMinutes == 0 And ActiveDistance == 0').groupBy('Type').count()
)

Type,count
Sedentary,1


In [0]:
display(
    df_activity.filter('ActiveDistance != 0').groupBy('Type').count()
)

Type,count
VeryActive,527
Fairly,554
Light,855
Sedentary,82


In [0]:
display(
    df_activity.filter('ActiveDistance == 0').groupBy('Type').count()
)

Type,count
Sedentary,854


In [0]:
display(
    df_activity.filter('ActiveMinutes!= 0 And ActiveDistance != 0').groupBy('Type').count()
)

Type,count
VeryActive,527
Fairly,554
Light,855
Sedentary,82


In [0]:
y_1 = np.array(df_activity.filter('Type == "VeryActive" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_1 = np.array(df_activity.filter('Type == "VeryActive" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y_2 = np.array(df_activity.filter('Type == "Fairly" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_2 = np.array(df_activity.filter('Type == "Fairly" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y_3 = np.array(df_activity.filter('Type == "Light" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_3 = np.array(df_activity.filter('Type == "Light" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y = np.array(df_activity.filter('Type == "Sedentary" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x = np.array(df_activity.filter('Type == "Sedentary" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

plt.scatter(x_3, y_3, c='yellow')
plt.scatter(x_1, y_1, edgecolor='black')
plt.scatter(x_2, y_2, c='green')
plt.scatter(x, y, c='black')

plt.xlabel('ActiveDistance')
plt.ylabel('ActiveMinutes')
plt.legend()

plt.show()


There is people have 0 active distance but this should not be sedentary, this should be GYM or stationary activity.
1. Anything between 60 mins and 400 mins i will keep it as Gym / Stationary activity
2. Anything greater than 400 mins or less that 60 mins will be keep as sedentary

In [0]:
df_gym = df_sedentaryActive.filter('ActiveMinutes>= 60 And ActiveMinutes <= 400').drop("Type").withColumn("Type",lit("Gym")).select('*')

In [0]:
df_sedentaryActive = df_sedentaryActive.filter('ActiveMinutes< 60 Or ActiveMinutes>600 ').select('*')

In [0]:
df_activity = df_veryActive.union(
    df_fairlyActive.union(
        df_lightActive.union(
            df_sedentaryActive
        ). union(
            df_gym
        )
        )
    )

display(df_activity)

In [0]:
y_1 = np.array(df_activity.filter('Type == "VeryActive" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_1 = np.array(df_activity.filter('Type == "VeryActive" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y_2 = np.array(df_activity.filter('Type == "Fairly" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_2 = np.array(df_activity.filter('Type == "Fairly" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y_3 = np.array(df_activity.filter('Type == "Light" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_3 = np.array(df_activity.filter('Type == "Light" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y = np.array(df_activity.filter('Type == "Sedentary" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x = np.array(df_activity.filter('Type == "Sedentary" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y_4 = np.array(df_activity.filter('Type == "Gym" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_4 = np.array(df_activity.filter('Type == "Gym" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

plt.scatter(x_3, y_3, c='yellow')
plt.scatter(x_1, y_1, edgecolor='black')
plt.scatter(x_2, y_2, c='green')
plt.scatter(x, y, c='black')
plt.scatter(x_4, y_4, c='purple')

plt.xlabel('ActiveDistance')
plt.ylabel('ActiveMinutes')
plt.legend()

plt.show()


In [0]:
df_activity.describe().show()

+-------+-------------------+------------------+------------------+----------+
|summary|                 Id|     ActiveMinutes|    ActiveDistance|      Type|
+-------+-------------------+------------------+------------------+----------+
|  count|               2800|              2800|              2800|      2800|
|   mean|4.992093813194643E9| 393.3510714285714| 1.817103570671752|      NULL|
| stddev|2.434044994498934E9|460.99851158904175|2.3113825576832325|      NULL|
|    min|         1503960366|               0.0|               0.0|    Fairly|
|    max|         8877689391|            1440.0|  21.9200000762939|VeryActive|
+-------+-------------------+------------------+------------------+----------+



In [0]:
df_activity.groupBy('Type').agg(
    functions.min('ActiveMinutes').alias('min_mins'),
    functions.max('ActiveMinutes').alias('min_max'),
    functions.avg('ActiveMinutes').alias('min_avg'),
    functions.count('ActiveMinutes').alias('min_count')
).show()

+----------+--------+-------+------------------+---------+
|      Type|min_mins|min_max|           min_avg|min_count|
+----------+--------+-------+------------------+---------+
|VeryActive|     1.0|  210.0|37.297912713472485|      527|
|    Fairly|     1.0|  143.0|  22.9927797833935|      554|
|     Light|     1.0|  518.0|211.98011695906433|      855|
| Sedentary|     0.0| 1440.0|1036.7587822014052|      854|
|       Gym|   111.0|  380.0|             235.4|       10|
+----------+--------+-------+------------------+---------+



Interesting! 
1. Looks like we have a lot of people that has 0 active Minutes but had a lot of active Distance. This could be an error from the sensor or maybe people that moved in different ways that dont generates calories expenditure. 

2. -> We may need to remove this dataoints if we want to check a relation of distance and calories. But it can be people in the Gym or not going outside where we can track distance.

3. Looks like people spend less time on Fairly Active and do less distance than in the Very Active or Light Active. 
It will give us a better picture if we can clasify our users depending on their activiy level.

For that we need to check if there is anyone that is 100% not sedentary. It would be a useless data point for us.

In [0]:
def _add(x,y,z):
    return x+y+z

_add= udf(_add,DoubleType())


display(
    df_dailyActivity_merged.groupBy('Id').sum('VeryActiveMinutes','FairlyActiveMinutes','LightlyActiveMinutes','SedentaryMinutes')\
        .select('Id',_add('sum(VeryActiveMinutes)', 'sum(FairlyActiveMinutes)', 'sum(LightlyActiveMinutes)'))\
        .withColumnRenamed("_add(sum(VeryActiveMinutes), sum(FairlyActiveMinutes), sum(LightlyActiveMinutes))", "Active_mins")\
        .sort(functions.asc("Active_mins"))
)

Id,Active_mins
4057192912,421.0
1927972279,1261.0
6775888955,1715.0
4020332650,2712.0
8792009665,2807.0
8253242879,2883.0
1844505072,3623.0
2347167796,5158.0
1624580081,5207.0
8583815059,5275.0


good, we dont have any data point where we have only sedentary data

Now we can apply a simple rule of thumb. Further exploration on the lightActive must be necessary


If:
 VeryActiveMinutes > fairlyActiveMinutes || lightActiveMinutes
Then:
  veryActive

If:
  fairlyActiveMinutes > VeryActiveMinutes || lightActiveMinutes
Then:
  fairlyActive

Else :
  lightActive 

In [0]:
display(df_dailyActivity_merged.select("Id","VeryActiveMinutes","FairlyActiveMinutes","LightlyActiveMinutes"))

In [0]:
display(
    df_dailyActivity_merged.filter("Id == 1503960366").select('ActivityDate','VeryActiveMinutes','FairlyActiveMinutes','LightlyActiveMinutes')
)

ActivityDate,VeryActiveMinutes,FairlyActiveMinutes,LightlyActiveMinutes
2016-04-12,25.0,13.0,328.0
2016-04-13,21.0,19.0,217.0
2016-04-14,30.0,11.0,181.0
2016-04-15,29.0,34.0,209.0
2016-04-16,36.0,10.0,221.0
2016-04-17,38.0,20.0,164.0
2016-04-18,42.0,16.0,233.0
2016-04-19,50.0,31.0,264.0
2016-04-20,28.0,12.0,205.0
2016-04-21,19.0,8.0,211.0


## MY CLASSIFICATION IS NOT GOOD I NEED A BETTER WAY TO CLASSIFY PEOPLE 

In [0]:
def _classify(v,m,l):
    if v > m: 
        return "VeryActive"
    if m>v:
        return "FairlyActive"
    if l > v+m:
        return "LightActive"
    if v + m + l == 0:
        return "Sedentary"
    
_classify = udf(_classify, StringType())

display(
    df_dailyActivity_merged.select(
        'Id',
        'VeryActiveMinutes',
        'FairlyActiveMinutes',
        'LightlyActiveMinutes', 
        'Calories',
        'TotalSteps',
        'TotalDistance',
        _classify('VeryActiveMinutes','FairlyActiveMinutes','LightlyActiveMinutes'))\
    .withColumnRenamed('_classify(VeryActiveMinutes, FairlyActiveMinutes, LightlyActiveMinutes)','day_Category')\
    
)

In [0]:
df_users = df_dailyActivity_merged.select(
        'Id',
        'ActivityDate',
        'VeryActiveMinutes',
        'FairlyActiveMinutes',
        'LightlyActiveMinutes', 
        'Calories',
        'TotalSteps',
        'TotalDistance',
        _classify('VeryActiveMinutes','FairlyActiveMinutes','LightlyActiveMinutes'))\
    .withColumnRenamed('_classify(VeryActiveMinutes, FairlyActiveMinutes, LightlyActiveMinutes)','day_Category')

In [0]:
df_cal = df_users.groupBy('day_Category').avg('Calories')
df_step = df_users.groupBy('day_Category').avg('TotalSteps')
df_dist = df_users.groupBy('day_Category').avg('TotalDistance')

df_agg_day_Category = df_cal.join(df_step, 'day_Category').join(df_dist, 'day_Category').sort(functions.asc('avg(Calories)'))

df_agg_day_Category.show()

+------------+------------------+------------------+------------------+
|day_Category|     avg(Calories)|   avg(TotalSteps)|avg(TotalDistance)|
+------------+------------------+------------------+------------------+
| lightActive|1995.4788732394366|   4461.1569416499|3.1115090519047417|
|fairlyActive| 2480.055944055944|  9505.65034965035| 6.800979010708682|
|  veryActive| 2766.864864864865|12172.746621621622| 8.923513504298958|
+------------+------------------+------------------+------------------+



In [0]:
figure, axis = plt.subplots(3, 1) 

x_1 = np.array(df_agg_day_Category.select('avg(Calories)').rdd.flatMap(lambda x: x).collect())
x_2 = np.array(df_agg_day_Category.select('avg(TotalSteps)').rdd.flatMap(lambda x: x).collect())
x_3 = np.array(df_agg_day_Category.select('avg(TotalDistance)').rdd.flatMap(lambda x: x).collect())


axis[2].scatter(x_2, x_3, edgecolor='black')
axis[1].scatter(x_1, x_3, edgecolor='black')
axis[0].scatter(x_1, x_2, c='yellow')

plt.show()


Nice, we aggree that the more time of activities / distance you walk, the more you burn calories 
Lets try bringing the information about sleeping and weight loss

In [0]:
df_sleepDay_merged.show(5)

+----------+-------------------+-----------------+------------------+--------------+
|        Id|           SleepDay|TotalSleepRecords|TotalMinutesAsleep|TotalTimeInBed|
+----------+-------------------+-----------------+------------------+--------------+
|1503960366|2016-04-12 00:00:00|              1.0|             327.0|         346.0|
|1503960366|2016-04-13 00:00:00|              2.0|             384.0|         407.0|
|1503960366|2016-04-15 00:00:00|              1.0|             412.0|         442.0|
|1503960366|2016-04-16 00:00:00|              2.0|             340.0|         367.0|
|1503960366|2016-04-17 00:00:00|              1.0|             700.0|         712.0|
+----------+-------------------+-----------------+------------------+--------------+
only showing top 5 rows



In [0]:
df_sleepDay_merged.groupBy('Id','SleepDay').count().sort(functions.desc('count')).head(5)

[Row(Id='8378563200', SleepDay=datetime.datetime(2016, 4, 25, 0, 0), count=2),
 Row(Id='4388161847', SleepDay=datetime.datetime(2016, 5, 5, 0, 0), count=2),
 Row(Id='4702921684', SleepDay=datetime.datetime(2016, 5, 7, 0, 0), count=2),
 Row(Id='4388161847', SleepDay=datetime.datetime(2016, 4, 27, 0, 0), count=1),
 Row(Id='3977333714', SleepDay=datetime.datetime(2016, 5, 2, 0, 0), count=1)]

So we have duplicate entries, i will average them so i can merge them with the user data

In [0]:
display(df_sleepDay_merged.groupBy('Id','SleepDay').agg(
    functions.avg('TotalMinutesAsleep').alias('TotalMinutesAsleep'), 
    functions.avg('TotalTimeInBed').alias('TotalTimeInBed')
    ))

lets make the date columns match the same format from the df_users

In [0]:
df_sleepDay_merged = df_sleepDay_merged.withColumn(
    "SleepDay",
    to_date("SleepDay", "M/d/y")
)

In [0]:
df_activity.select('Id').distinct().count()

33

In [0]:
df_users.select('Id').distinct().count()

33

In [0]:
df_sleepDay_merged.select('Id').distinct().count()

24

Lets see if we can keep the same Ids everywhere

In [0]:
df_sleepDay_merged.join(
    df_activity, 'Id'
).select('Id').distinct().count()

24

In [0]:
df_sleepDay_merged.join(
    df_users, 'Id'
).select('Id').distinct().count()

24

In [0]:
df_sleepDay_merged.select('Id').distinct().count()

24

Good lets keep only 24 Ids in everything

In [0]:
df_ids = df_sleepDay_merged.select('Id').distinct()
display(df_ids)

Id
4388161847
5553957443
1503960366
7007744171
5577150313
8378563200
3977333714
1644430081
1927972279
6962181067


In [0]:
df_activity = df_activity.join(df_ids,'Id')
df_users = df_users.join(df_ids,'Id')

In [0]:
df_cal = df_users.groupBy('day_Category').avg('Calories')
df_step = df_users.groupBy('day_Category').avg('TotalSteps')
df_dist = df_users.groupBy('day_Category').avg('TotalDistance')

df_agg_day_Category = df_cal.join(df_step, 'day_Category').join(df_dist, 'day_Category').sort(functions.asc('avg(Calories)'))

df_agg_day_Category.show()

+------------+------------------+------------------+-------------------+
|day_Category|     avg(Calories)|   avg(TotalSteps)| avg(TotalDistance)|
+------------+------------------+------------------+-------------------+
|   Sedentary|            1743.5|255.35714285714286|0.18542857170104984|
| LightActive| 1966.374449339207| 4394.638766519824| 3.0100440506450012|
|FairlyActive|2399.3602150537636|  8992.02688172043| 6.3541397803252755|
|  VeryActive|2711.1004366812226|11209.183406113538|  8.079563321244768|
+------------+------------------+------------------+-------------------+



Nice, now we have all data aggregated on df_users, and the activity log created. It may be important to create another dataframe were we classify our users based on the amount of activity before doing additional plots

In [0]:
df_activity.show(5)

+----------+------------+-------------+----------------+----------+
|        Id|ActivityDate|ActiveMinutes|  ActiveDistance|      Type|
+----------+------------+-------------+----------------+----------+
|1503960366|  2016-04-12|         25.0|1.87999999523163|VeryActive|
|1503960366|  2016-04-13|         21.0|1.57000005245209|VeryActive|
|1503960366|  2016-04-14|         30.0|2.44000005722046|VeryActive|
|1503960366|  2016-04-15|         29.0|2.14000010490417|VeryActive|
|1503960366|  2016-04-16|         36.0|2.71000003814697|VeryActive|
+----------+------------+-------------+----------------+----------+
only showing top 5 rows



In [0]:
df_sleepDay_merged.show(5)

+----------+----------+-----------------+------------------+--------------+
|        Id|  SleepDay|TotalSleepRecords|TotalMinutesAsleep|TotalTimeInBed|
+----------+----------+-----------------+------------------+--------------+
|1503960366|2016-04-12|              1.0|             327.0|         346.0|
|1503960366|2016-04-13|              2.0|             384.0|         407.0|
|1503960366|2016-04-15|              1.0|             412.0|         442.0|
|1503960366|2016-04-16|              2.0|             340.0|         367.0|
|1503960366|2016-04-17|              1.0|             700.0|         712.0|
+----------+----------+-----------------+------------------+--------------+
only showing top 5 rows



In [0]:
df_users.show(5)

+----------+-----------------+-------------------+--------------------+--------+----------+----------------+------------+
|        Id|VeryActiveMinutes|FairlyActiveMinutes|LightlyActiveMinutes|Calories|TotalSteps|   TotalDistance|day_Category|
+----------+-----------------+-------------------+--------------------+--------+----------+----------------+------------+
|1503960366|             25.0|               13.0|               328.0|  1985.0|   13162.0|             8.5|  VeryActive|
|1503960366|             21.0|               19.0|               217.0|  1797.0|   10735.0|6.96999979019165|  VeryActive|
|1503960366|             30.0|               11.0|               181.0|  1776.0|   10460.0|6.73999977111816|  VeryActive|
|1503960366|             29.0|               34.0|               209.0|  1745.0|    9762.0|6.28000020980835|FairlyActive|
|1503960366|             36.0|               10.0|               221.0|  1863.0|   12669.0|8.15999984741211|  VeryActive|
+----------+------------

In [0]:
df_heartrate_seconds_merged = df_heartrate_seconds_merged.withColumn(
    "Time",
    to_date("Time", "M/d/y")
)

In [0]:
df_heartrate_seconds_merged.select('Id').distinct().count()

14

df_heartrate_seconds_merged contains only 14 users. Im not sure if we should bring that data into any of the other df's

In [0]:
df_hr_agg_id_time = df_heartrate_seconds_merged.groupBy('Id','Time').agg(
    functions.avg('Value').alias('Avg_HR'), 
    functions.max('Value').alias('Max_HR'), 
    functions.min('Value').alias('Min_HR')
    )

In [0]:
display(df_hr_agg_id_time)

In [0]:
df_users_agg = df_users.groupBy('Id','day_Category').count().sort(functions.asc('Id'))
df_users_agg.show(5)


+----------+------------+-----+
|        Id|day_Category|count|
+----------+------------+-----+
|1503960366|  VeryActive|   27|
|1503960366|FairlyActive|    3|
|1644430081|FairlyActive|   14|
|1644430081|  VeryActive|    7|
|1644430081| LightActive|    9|
+----------+------------+-----+
only showing top 5 rows



Lets create a classification for each user

In [0]:
df_main = df_dailyActivity_merged.groupBy('Id').agg(
        functions.avg('VeryActiveMinutes').alias('avg_VeryActiveMinutes'),
        functions.avg('FairlyActiveMinutes').alias('avg_FairlyActiveMinutes'),
        functions.avg('LightlyActiveMinutes').alias('avg_LightlyActiveMinutes')
    )

In [0]:
df_summary_activity =  df_activity.groupBy('Type').agg(functions.avg('ActiveMinutes').alias('avg_ActiveMinutes'))

avg_very = df_summary_activity.filter("Type == 'VeryActive'").select('avg_ActiveMinutes').collect()[0][0]
avg_vairly = df_summary_activity.filter("Type == 'Fairly'").select('avg_ActiveMinutes').collect()[0][0]
avg_light = df_summary_activity.filter("Type == 'Light'").select('avg_ActiveMinutes').collect()[0][0]

In [0]:
avg_very

37.8391959798995

In [0]:
avg_vairly

24.660287081339714

In [0]:
avg_light

205.3135725429017

In [0]:
def _classify(x_m,y_m,z_m):

    if x_m > 21.151685393258425:
        return "veryActive" 
    if y_m > 14.487359550561798:
        return "fairlyActive"
    if z_m > 184.84129213483146 :
        return "lightActive"
    
    return "lightActive"

_classify= udf(_classify,StringType())

In [0]:
display(
    df_main.select('avg_VeryActiveMinutes','avg_FairlyActiveMinutes','avg_LightlyActiveMinutes',\
                   _classify('avg_VeryActiveMinutes','avg_FairlyActiveMinutes','avg_LightlyActiveMinutes')
    ).withColumnRenamed('_classify(avg_VeryActiveMinutes, avg_FairlyActiveMinutes, avg_LightlyActiveMinutes)','Type')\
        .groupby('Type').count()
)

Type,count
veryActive,12
lightActive,15
fairlyActive,6


In [0]:
df_main = df_main.select('Id','avg_VeryActiveMinutes','avg_FairlyActiveMinutes','avg_LightlyActiveMinutes',\
                   _classify('avg_VeryActiveMinutes','avg_FairlyActiveMinutes','avg_LightlyActiveMinutes')
    ).withColumnRenamed('_classify(avg_VeryActiveMinutes, avg_FairlyActiveMinutes, avg_LightlyActiveMinutes)','Type_user')

df_main.show(5)


+----------+---------------------+-----------------------+------------------------+-----------+
|        Id|avg_VeryActiveMinutes|avg_FairlyActiveMinutes|avg_LightlyActiveMinutes|  Type_user|
+----------+---------------------+-----------------------+------------------------+-----------+
|4388161847|   23.161290322580644|      20.35483870967742|       229.3548387096774| veryActive|
|5553957443|   23.419354838709676|                   13.0|      206.19354838709677| veryActive|
|2873212765|   14.096774193548388|      6.129032258064516|                   308.0|lightActive|
|1503960366|                 40.0|                   19.8|      227.26666666666668| veryActive|
|7007744171|    31.03846153846154|      16.26923076923077|       280.7307692307692| veryActive|
+----------+---------------------+-----------------------+------------------------+-----------+
only showing top 5 rows



In [0]:
df_main.groupBy('Type_user').count().show()

+------------+-----+
|   Type_user|count|
+------------+-----+
|  veryActive|   12|
| lightActive|   15|
|fairlyActive|    6|
+------------+-----+



In [0]:
df_activity.ActivityDate

Column<'ActivityDate'>

In [0]:
df_activity.groupby('Type')\
        .agg(
            functions.min('ActivityDate').alias('min'),
            functions.max('ActivityDate').alias('max')
        ).show()
        


+----------+----------+----------+
|      Type|       min|       max|
+----------+----------+----------+
|VeryActive|2016-04-12|2016-05-12|
|    Fairly|2016-04-12|2016-05-12|
|     Light|2016-04-12|2016-05-12|
| Sedentary|2016-04-12|2016-05-12|
|       Gym|2016-04-30|2016-05-12|
+----------+----------+----------+



## Ok now we can start telling the story

In [0]:
df_activity.groupBy('Type').count().show()

+----------+-----+
|      Type|count|
+----------+-----+
|VeryActive|  398|
|    Fairly|  418|
|     Light|  641|
| Sedentary|  630|
|       Gym|   10|
+----------+-----+



In [0]:
y_1 = np.array(df_activity.filter('Type == "VeryActive" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_1 = np.array(df_activity.filter('Type == "VeryActive" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y_2 = np.array(df_activity.filter('Type == "Fairly" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_2 = np.array(df_activity.filter('Type == "Fairly" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y_3 = np.array(df_activity.filter('Type == "Light" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_3 = np.array(df_activity.filter('Type == "Light" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y = np.array(df_activity.filter('Type == "Sedentary" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x = np.array(df_activity.filter('Type == "Sedentary" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

y_4 = np.array(df_activity.filter('Type == "Gym" ').select('ActiveMinutes').rdd.flatMap(lambda x: x).collect())
x_4 = np.array(df_activity.filter('Type == "Gym" ').select('ActiveDistance').rdd.flatMap(lambda x: x).collect())

plt.scatter(x_3, y_3, c='yellow')
plt.scatter(x_1, y_1, edgecolor='black')
plt.scatter(x_2, y_2, c='green')
plt.scatter(x, y, c='black')
plt.scatter(x_4, y_4, c='purple')

plt.xlabel('ActiveDistance')
plt.ylabel('ActiveMinutes')
plt.legend()

plt.show()


In [0]:
df_activity.show(5)

+----------+------------+-------------+----------------+----------+
|        Id|ActivityDate|ActiveMinutes|  ActiveDistance|      Type|
+----------+------------+-------------+----------------+----------+
|1503960366|  2016-04-12|         25.0|1.87999999523163|VeryActive|
|1503960366|  2016-04-13|         21.0|1.57000005245209|VeryActive|
|1503960366|  2016-04-14|         30.0|2.44000005722046|VeryActive|
|1503960366|  2016-04-15|         29.0|2.14000010490417|VeryActive|
|1503960366|  2016-04-16|         36.0|2.71000003814697|VeryActive|
+----------+------------+-------------+----------------+----------+
only showing top 5 rows



In [0]:
df_main.show(5)

+----------+---------------------+-----------------------+------------------------+-----------+
|        Id|avg_VeryActiveMinutes|avg_FairlyActiveMinutes|avg_LightlyActiveMinutes|  Type_user|
+----------+---------------------+-----------------------+------------------------+-----------+
|4388161847|   23.161290322580644|      20.35483870967742|       229.3548387096774| veryActive|
|5553957443|   23.419354838709676|                   13.0|      206.19354838709677| veryActive|
|2873212765|   14.096774193548388|      6.129032258064516|                   308.0|lightActive|
|1503960366|                 40.0|                   19.8|      227.26666666666668| veryActive|
|7007744171|    31.03846153846154|      16.26923076923077|       280.7307692307692| veryActive|
+----------+---------------------+-----------------------+------------------------+-----------+
only showing top 5 rows



In [0]:
display(
df_activity.join(
    df_main.filter('Type_user = "veryActive"'),
    'Id'
    ).groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_ActiveMinutes'),
        functions.std('ActiveMinutes').alias('std_ActiveMinutes')
    )

)

Type,avg_ActiveMinutes,std_ActiveMinutes
VeryActive,55.31718061674009,39.53989600500615
Fairly,21.489082969432317,14.593501123190402
Light,201.71052631578948,71.57544404797879
Sedentary,879.7280334728033,220.45519488167696
Gym,212.0,95.80187889597993


In [0]:
df_heartrate_seconds_merged.describe().show()

+-------+--------------------+------------------+
|summary|                  Id|             Value|
+-------+--------------------+------------------+
|  count|             2483658|           2483658|
|   mean| 5.513764629269958E9| 77.32842363964765|
| stddev|1.9502237609635072E9|19.404499669443858|
|    min|          2022484408|              36.0|
|    max|          8877689391|             203.0|
+-------+--------------------+------------------+



In [0]:

x = df_heartrate_seconds_merged.join(
    df_main.filter('Type_user = "lightActive"'),
    'Id'
    ).groupBy('Id','Time').agg(
        functions.avg('Value').alias('avg_Value'),
        functions.std('Value').alias('std_Value')
    ).select('Id','Time','avg_Value','std_Value')


y = df_users.withColumnRenamed('ActivityDate','Time')


join_cond = [
    y.Time == x.Time, 
    y.Id == x.Id]

display(
    x.join(y, ['Time' , 'Id'] ).sort(functions.asc('day_Category'))

)

In [0]:

x = df_heartrate_seconds_merged.join(
    df_main.filter('Type_user = "fairlyActive"'),
    'Id'
    ).groupBy('Id','Time').agg(
        functions.avg('Value').alias('avg_Value'),
        functions.std('Value').alias('std_Value')
    ).select('Id','Time','avg_Value','std_Value')


y = df_users.withColumnRenamed('ActivityDate','Time')


join_cond = [
    y.Time == x.Time, 
    y.Id == x.Id]

display(
    x.join(y, ['Time' , 'Id'] ).sort(functions.asc('day_Category'))

)

Time,Id,avg_Value,std_Value,VeryActiveMinutes,FairlyActiveMinutes,LightlyActiveMinutes,Calories,TotalSteps,TotalDistance,day_Category
2016-04-14,6775888955,79.26807673389081,15.151810199946668,14.0,24.0,105.0,2507.0,5162.0,3.70000004768372,fairlyActive
2016-04-15,2347167796,75.68525687977399,13.722963268782276,1.0,24.0,284.0,2133.0,10465.0,6.92000007629395,fairlyActive
2016-04-13,6775888955,82.71974376572867,16.012564332480675,17.0,18.0,85.0,2400.0,4053.0,2.91000008583069,fairlyActive
2016-04-24,6775888955,109.78962536023054,8.76963406597715,11.0,18.0,11.0,2053.0,2153.0,1.53999996185303,fairlyActive
2016-04-17,6775888955,102.36886993603412,6.890657849622816,5.0,24.0,19.0,2067.0,2497.0,1.78999996185303,fairlyActive
2016-04-20,2347167796,78.18208496366685,15.758510144349977,11.0,43.0,269.0,2198.0,10999.0,7.26999998092651,fairlyActive
2016-04-13,2347167796,73.81290461804058,13.014520601143056,19.0,32.0,195.0,2038.0,10352.0,7.01000022888184,fairlyActive
2016-04-14,2347167796,72.57948213700426,10.277766133677726,1.0,48.0,206.0,2010.0,10129.0,6.69999980926514,fairlyActive
2016-05-06,6775888955,98.85751072961374,12.125979150457706,12.0,35.0,75.0,2496.0,4697.0,3.36999988555908,fairlyActive
2016-05-01,6775888955,107.13917050691244,12.408706667349511,9.0,34.0,50.0,2319.0,2487.0,1.77999997138977,fairlyActive


Databricks visualization. Run in Databricks to view.

In [0]:

x = df_heartrate_seconds_merged.join(
    df_main.filter('Type_user = "veryActive"'),
    'Id'
    ).groupBy('Id','Time').agg(
        functions.avg('Value').alias('avg_Value'),
        functions.std('Value').alias('std_Value')
    ).select('Id','Time','avg_Value','std_Value')


y = df_users.withColumnRenamed('ActivityDate','Time')


join_cond = [
    y.Time == x.Time, 
    y.Id == x.Id]

display(
    x.join(y, ['Time' , 'Id'] ).sort(functions.asc('day_Category'))

)

In [0]:

x_1= df_activity.join(
    df_main.filter('Type_user = "veryActive"'),
    'Id'
    ).groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_ActiveMinutes'),
        functions.std('ActiveMinutes').alias('std_ActiveMinutes')
    ).filter("Type != 'Sedentary'")

y = np.array(
    x_1\
    .select('avg_ActiveMinutes')\
    .rdd.flatMap(lambda x: x)\
    .collect()
)


x = np.array(
    x_1\
    .select('Type')\
    .rdd.flatMap(lambda x: x)\
    .collect()
)

c = np.array(
    x_1\
    .select('std_ActiveMinutes')\
    .rdd.flatMap(lambda x: x)\
    .collect()
) 

plt.bar(x, y)
plt.errorbar(x, y, yerr=c, fmt="o", color="r")


plt.xlabel('ActiveDistance')
plt.ylabel('ActiveMinutes')
plt.ylim(0,400)

plt.show()


In [0]:
def _prct_veryactive(x):
    return x / 12

_prct_veryactive = udf(_prct_veryactive, DoubleType())

def _prct_veryactive_2(x):
    return x / 12

_prct_veryactive_2 = udf(_prct_veryactive_2, DoubleType())

In [0]:
display( 
    
    df_main.filter('Type_user = "veryActive"').join(
        df_activity.filter("ActiveDistance == 0"), 'Id').groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_active_minutes'),
        functions.count('*').alias('count')
    ).select('Type', _prct_veryactive('count'), _prct_veryactive_2('avg_active_minutes')
             
             ).withColumnRenamed('_prct_veryactive(count)', 'workout by person')\
                .withColumnRenamed('_prct_veryactive_2(avg_active_minutes)', 'Active Mins by person')
)

Type,workout by person,Active Mins by person
Sedentary,19.666666666666668,73.38735875706215
Gym,0.4166666666666667,17.666666666666668


In [0]:
display( 
    
    df_main.filter('Type_user = "veryActive"').join(
        df_activity.filter("ActiveDistance > 0"), 'Id').groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_active_minutes'),
        functions.count('*').alias('count')
    ).select('Type','count', _prct_veryactive('count'), _prct_veryactive_2('avg_active_minutes')
             
             ).withColumnRenamed('_prct_veryactive(count)', 'workout by person')\
                .withColumnRenamed('_prct_veryactive_2(avg_active_minutes)', 'Active Mins by person')
)

Type,count,workout by person,Active Mins by person
VeryActive,227,18.916666666666668,4.609765051395008
Fairly,229,19.08333333333333,1.7907569141193596
Light,266,22.166666666666668,16.80921052631579
Sedentary,3,0.25,67.27777777777779


In [0]:

x_1= df_activity.join(
    df_main.filter('Type_user = "fairlyActive"'),
    'Id'
    ).groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_ActiveMinutes'),
        functions.std('ActiveMinutes').alias('std_ActiveMinutes')
    ).filter("Type != 'Sedentary'")

y = np.array(
    x_1\
    .select('avg_ActiveMinutes')\
    .rdd.flatMap(lambda x: x)\
    .collect()
)


x = np.array(
    x_1\
    .select('Type')\
    .rdd.flatMap(lambda x: x)\
    .collect()
)


plt.bar(x, y)
#plt.errorbar(x, y, yerr=c, fmt="o", color="r")

plt.xlabel('ActiveDistance')
plt.ylabel('ActiveMinutes')
plt.ylim(0,400)

plt.show()


In [0]:
def _prct_fairlyActive(x):
    return x / 5

def _prct_fairlyActive_2(x):
    return x / 5

_prct_fairlyActive = udf(_prct_fairlyActive, DoubleType())
_prct_fairlyActive_2 = udf(_prct_fairlyActive_2, DoubleType())

In [0]:
display( 
    
    df_main.filter('Type_user = "fairlyActive"').join(
        df_activity.filter("ActiveDistance > 0"), 'Id').groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_active_minutes'),
        functions.count('*').alias('count')
    ).select('Type', _prct_fairlyActive('count'), _prct_fairlyActive_2('avg_active_minutes')
             
             ).withColumnRenamed('_prct_fairlyActive(count)', 'workout by person')\
                .withColumnRenamed('_prct_fairlyActive_2(avg_active_minutes)', 'Active Mins by person')
)

Type,workout by person,Active Mins by person
VeryActive,19.2,3.2125
Fairly,20.0,8.068000000000001
Light,25.0,37.6768
Sedentary,2.0,241.18


In [0]:
display( 
    
    df_main.filter('Type_user = "fairlyActive"').join(
        df_activity.filter("ActiveDistance == 0"), 'Id').groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_active_minutes'),
        functions.count('*').alias('count')
    ).select('Type', _prct_fairlyActive('count'), _prct_fairlyActive_2('avg_active_minutes')
             
             ).withColumnRenamed('_prct_fairlyActive(count)', 'workout by person')\
                .withColumnRenamed('_prct_fairlyActive_2(avg_active_minutes)', 'Active Mins by person')
)

Type,workout by person,Active Mins by person
Sedentary,22.6,190.59469026548672
Gym,0.2,48.2


In [0]:

x_1= df_activity.join(
    df_main.filter('Type_user = "lightActive"'),
    'Id'
    ).groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_ActiveMinutes'),
        functions.std('ActiveMinutes').alias('std_ActiveMinutes')
    ).filter("Type != 'Sedentary'")

y = np.array(
    x_1\
    .select('avg_ActiveMinutes')\
    .rdd.flatMap(lambda x: x)\
    .collect()
)


x = np.array(
    x_1\
    .select('Type')\
    .rdd.flatMap(lambda x: x)\
    .collect()
)

c = np.array(
    x_1\
    .select('std_ActiveMinutes')\
    .rdd.flatMap(lambda x: x)\
    .collect()
) 

plt.bar(x, y)
plt.errorbar(x, y, yerr=c, fmt="o", color="r")


plt.xlabel('ActiveDistance')
plt.ylabel('ActiveMinutes')
plt.ylim(0,400)
plt.legend()

plt.show()


In [0]:
def _prct_lightActive(x):
    return x / 15

def _prct_lightActive_2(x):
    return x / 15

_prct_lightActive = udf(_prct_lightActive, DoubleType())
_prct_lightActive_2 = udf(_prct_lightActive_2, DoubleType())

In [0]:
display( 
    
    df_main.filter('Type_user = "lightActive"').join(
        df_activity.filter("ActiveDistance == 0"), 'Id').groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_active_minutes'),
        functions.count('*').alias('count')
    ).select('Type', _prct_lightActive('count'), _prct_lightActive_2('avg_active_minutes')
             
             ).withColumnRenamed('_prct_lightActive(count)', 'workout by person')\
                .withColumnRenamed('_prct_lightActive_2(avg_active_minutes)', 'Active Mins by person')
)

Type,workout by person,Active Mins by person
Sedentary,17.6,73.09671717171717
Gym,0.2666666666666666,17.55


In [0]:
display( 
    
    df_main.filter('Type_user = "lightActive"').join(
        df_activity.filter("ActiveDistance > 0"), 'Id').groupBy('Type').agg(
        functions.avg('ActiveMinutes').alias('avg_active_minutes'),
        functions.count('*').alias('count')
    ).select('Type', _prct_lightActive('count'), _prct_lightActive_2('avg_active_minutes')
             
             ).withColumnRenamed('_prct_lightActive(count)', 'workout by person')\
                .withColumnRenamed('_prct_lightActive_2(avg_active_minutes)', 'Active Mins by person')
)

Type,workout by person,Active Mins by person
VeryActive,5.0,0.8542222222222222
Fairly,5.933333333333334,1.0134831460674156
Light,16.666666666666668,14.507466666666666
Sedentary,0.2666666666666666,58.25


Lets create an additional views relating Hearthrate and sleep

In [0]:
display( 
    
    df_main.filter('Type_user = "lightActive"').join(
        df_heartrate_seconds_merged, 'Id').describe().select('Summary','Value').withColumnRenamed('Value','lightActive').join(
            df_main.filter('Type_user = "fairlyActive"').join(
                df_heartrate_seconds_merged, 'Id').describe().select('Summary','Value').withColumnRenamed('Value','fairlyActive').join(
                    df_main.filter('Type_user = "veryActive"').join(
                        df_heartrate_seconds_merged, 'Id').describe().select('Summary','Value').withColumnRenamed('Value','veryActive'),"Summary"
        ),"Summary"
    )
)
    


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-581308150446167>, line 3[0m
[1;32m      1[0m display( 
[1;32m      2[0m     
[0;32m----> 3[0m     [43mdf_main[49m[38;5;241m.[39mfilter([38;5;124m'[39m[38;5;124mType_user = [39m[38;5;124m"[39m[38;5;124mlightActive[39m[38;5;124m"[39m[38;5;124m'[39m)[38;5;241m.[39mjoin(
[1;32m      4[0m         df_heartrate_seconds_merged, [38;5;124m'[39m[38;5;124mId[39m[38;5;124m'[39m)[38;5;241m.[39mdescribe()[38;5;241m.[39mselect([38;5;124m'[39m[38;5;124mSummary[39m[38;5;124m'[39m,[38;5;124m'[39m[38;5;124mValue[39m[38;5;124m'[39m)[38;5;241m.[39mwithColumnRenamed([38;5;124m'[39m[38;5;124mValue[39m[38;5;124m'[39m,[38;5;124m'[39m[38;5;124mlightActive[39m[38;5;124m'[39m)[38;5;241m.[39mjoin(
[1;32m      5[0m             df_main[38;5;241m.[39mfilter(

In [0]:
df_sleepDay_merged.describe().show()

+-------+--------------------+------------------+------------------+------------------+
|summary|                  Id| TotalSleepRecords|TotalMinutesAsleep|    TotalTimeInBed|
+-------+--------------------+------------------+------------------+------------------+
|  count|                 413|               413|               413|               413|
|   mean| 5.000979403213075E9|  1.11864406779661|419.46731234866826| 458.6392251815981|
| stddev|2.0603601737440448E9|0.3455207153736887|118.34467933943277|127.10160720753305|
|    min|          1503960366|               1.0|              58.0|              61.0|
|    max|          8792009665|               3.0|             796.0|             961.0|
+-------+--------------------+------------------+------------------+------------------+



In [0]:
def mins_to_hour(x):
    return x/60

mins_to_hour = udf(mins_to_hour, DoubleType())

In [0]:
display(
    df_main.filter('Type_user = "lightActive"').join(
        df_sleepDay_merged, 'Id').describe()
)

summary,Id,avg_VeryActiveMinutes,avg_FairlyActiveMinutes,avg_LightlyActiveMinutes,Type_user,TotalSleepRecords,TotalMinutesAsleep,TotalTimeInBed
count,137.0,137.0,137.0,137.0,137,137.0,137.0,137.0
mean,4437669488.160584,3.106386310359909,4.344940676757051,206.817536806624,,1.1386861313868613,438.2408759124088,471.86131386861314
stddev,2021924095.226104,2.7922802426002216,4.658198398921412,73.38767031835002,,0.3674731233034123,128.5782850083859,149.6436542984382
min,1844505072.0,0.0967741935483871,0.2580645161290322,38.58064516129032,lightActive,1.0,59.0,65.0
max,8792009665.0,10.387096774193548,13.709677419354838,288.35714285714283,lightActive,3.0,750.0,961.0


In [0]:
display(
    df_main.filter('Type_user = "lightActive"').join(
        df_sleepDay_merged.select('Id',mins_to_hour('TotalMinutesAsleep')), 'Id')\
            .withColumnRenamed('mins_to_hour(TotalMinutesAsleep)','lightActive').describe().select('Summary','lightActive').join(

            df_main.filter('Type_user = "fairlyActive"').join(
                df_sleepDay_merged.select('Id',mins_to_hour('TotalMinutesAsleep')), 'Id')\
                .withColumnRenamed('mins_to_hour(TotalMinutesAsleep)','fairlyActive').describe().select('Summary','fairlyActive').join(

                df_main.filter('Type_user = "veryActive"').join(
                    df_sleepDay_merged.select('Id',mins_to_hour('TotalMinutesAsleep')), 'Id')\
                    .withColumnRenamed('mins_to_hour(TotalMinutesAsleep)','fairlyActive').describe().select('Summary','fairlyActive'),"Summary" ),

                "Summary"
            )
)

Summary,lightActive,fairlyActive,fairlyActive.1
count,137.0,78.0,198.0
mean,7.304014598540147,6.183974358974359,7.0925925925925855
stddev,2.142971416806432,1.8676934450087903,1.807241999510688
min,0.9833333333333332,1.9833333333333332,0.9666666666666668
max,12.5,13.266666666666667,12.916666666666666
