In [20]:
import os
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
# from Ipython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats
%matplotlib inline

In [21]:
import pyspark
from pyspark.sql import Row, DataFrame, SparkSession, SQLContext, functions
from pyspark.sql.functions import lit, desc, col, size, array_contains, isnan, udf, hour,\
    array_min, array_max, countDistinct
from pyspark.sql.types import *

MAX_MEMORY = '15G'

# Spark settings for the session created below: local master, relaxed
# heartbeat/network/ack timeouts, and the same memory cap for executor and driver.
spark_settings = [
    ('spark.executor.heartbeatInterval', 10000),
    ('spark.network.timeout', 10000),
    ('spark.core.connection.ack.wait.timeout', '3600'),
    ('spark.executor.memory', MAX_MEMORY),
    ('spark.driver.memory', MAX_MEMORY),
]
conf = pyspark.SparkConf().setMaster('local[*]').setAll(spark_settings)


def init_spark():
    """Return the notebook's SparkSession, obtained via ``getOrCreate``.

    The session is named 'EDA Pyspark' and takes its settings from the
    module-level ``conf`` object.
    """
    builder = SparkSession.builder
    builder = builder.appName('EDA Pyspark')
    builder = builder.config(conf=conf)
    return builder.getOrCreate()

# Obtain the session, then read the workout records from the JSON file.
spark = init_spark()
# mode=DROPMALFORMED: records that cannot be parsed are dropped rather than raising.
json_reader = spark.read.option('mode', 'DROPMALFORMED').format('json')
df = json_reader.load('endomondoHR.json')

                                                                                

# Overview of the Dataset

In [22]:
# Preview the first five rows; long array values are truncated in the printed table.
df.show(5)

+--------------------+------+--------------------+---------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------+
|            altitude|gender|          heart_rate|       id|            latitude|           longitude|               speed|           sport|           timestamp|                 url|  userId|
+--------------------+------+--------------------+---------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------+
|[41.6, 40.6, 40.6...|  male|[100, 111, 120, 1...|396826535|[60.1733487658202...|[24.6497704088687...|[6.8652, 16.4736,...|            bike|[1408898746, 1408...|https://www.endom...|10921915|
|[38.4, 39.0, 39.0...|  male|[100, 105, 111, 1...|392337038|[60.1732475962489...|[24.6498552337288...|[9.0792, 13.284, ...|            bike|[1408221682, 1408...|https://www.endom...|10921915|
|[76.4, 73.2, 72.4...|  male|[99, 105, 1

In [23]:
# Print the schema inferred from the JSON file (column names, types, nullability).
df.printSchema()

root
 |-- altitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- gender: string (nullable = true)
 |-- heart_rate: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- id: long (nullable = true)
 |-- latitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- longitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- speed: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- sport: string (nullable = true)
 |-- timestamp: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- url: string (nullable = true)
 |-- userId: long (nullable = true)



In [24]:
# Summary statistics; in the output below only the scalar columns
# (gender, id, sport, url, userId) are summarised - the array columns are not.
df.describe().show()



+-------+-------+-------------------+--------+--------------------+------------------+
|summary| gender|                 id|   sport|                 url|            userId|
+-------+-------+-------------------+--------+--------------------+------------------+
|  count| 253020|             253020|  253020|              253020|            253020|
|   mean|   null|3.566244412926132E8|    null|                null| 4619648.939783417|
| stddev|   null|1.574845634895318E8|    null|                null|3932877.7296880507|
|    min| female|              99296|aerobics|https://www.endom...|                69|
|    max|unknown|          674008008|    yoga|https://www.endom...|          15481421|
+-------+-------+-------------------+--------+--------------------+------------------+



                                                                                

# Detect missing values and abnormal zeroes

In [25]:
string_cols = ['gender', 'sport', 'url']
numeric_cols = ['id', 'userId']
array_cols = ['altitude', 'heart_rate', 'latitude', 'longitude', 'speed', 'timestamp']


def missing_condition(column):
    """Return a boolean Column that is true when `column` holds a missing/abnormal value.

    * string columns  : the value is null
    * numeric columns : the value is null, NaN or 0
    * array columns   : the array is null, or contains a 0 or a NaN element

    Returns None for a column that belongs to none of the three groups, so
    that extra columns added to `df` later are simply skipped.
    """
    field = col(column)
    if column in string_cols:
        return field.isNull()
    if column in numeric_cols:
        # Nulls must be tested explicitly: `isin([0, None, nan])` never matches
        # a null because `NULL IN (...)` evaluates to NULL, not true.
        return field.isNull() | isnan(field) | (field == 0)
    if column in array_cols:
        # array_contains() yields NULL for a null array, hence the explicit isNull().
        return field.isNull() | array_contains(field, 0) | array_contains(field, np.nan)
    return None


# Count the flagged rows of every column in ONE aggregation (a single pass over
# the data) instead of launching a separate filter/count job per column.
# count(when(cond, True)) counts only the rows where cond is true.
missing_exprs = [
    functions.count(functions.when(condition, True)).alias(column)
    for column, condition in ((name, missing_condition(name)) for name in df.columns)
    if condition is not None
]
missing_values = df.agg(*missing_exprs).first().asDict()

missing_df = pd.DataFrame([missing_values])
missing_df


                                                                                

Unnamed: 0,altitude,gender,heart_rate,id,latitude,longitude,speed,sport,timestamp,url,userId
0,40848,0,1280,0,113,113,7741,0,0,0,0


In [26]:
# Total number of rows loaded (one row per workout).
df.count()

                                                                                

253020

In [30]:
# Add a 'PerWorkoutRecordCount' column: the number of timestamps recorded in each row (one row = one workout).
# Note that `df` is rebound here, so every later cell sees the extra column.
df = df.withColumn('PerWorkoutRecordCount', size(col('timestamp')))


# Written as a function so the same summary can be produced again later
def user_activity_workout_summarize(df):
    """Summarise users, activity types, workouts and records of a workout DataFrame.

    Parameters
    ----------
    df : Spark DataFrame
        Must expose the columns 'userId', 'id', 'sport', 'gender' and
        'PerWorkoutRecordCount'.

    Returns
    -------
    tuple of two pandas DataFrames
        * a one-row overall summary whose cells are formatted strings:
          'Users count', 'Activity types count', 'Workouts count',
          'Total records count';
        * one row per gender with the columns 'Gender', '# of users' and
          'Activities (workouts) count'.
    """
    def distinct_total(field):
        # Number of distinct values found in a single column.
        return df.select(field).distinct().count()

    n_users = distinct_total('userId')
    n_workouts = distinct_total('id')
    n_activities = distinct_total('sport')

    # Total number of records = sum of the per-workout record counts.
    records_pdf = df.agg(functions.sum('PerWorkoutRecordCount')).toPandas()
    n_records = records_pdf['sum(PerWorkoutRecordCount)'][0]

    # Counts are stored as display strings (thousands separators where applicable).
    sum_df = pd.DataFrame({
        'Users count': [format(n_users, ',d')],
        'Activity types count': [str(n_activities)],
        'Workouts count': [format(n_workouts, ',d')],
        'Total records count': [format(n_records, ',d')],
    })

    # Distinct users per gender, and workouts (rows) per gender.
    users_by_gender = (
        df.select('gender', 'userId')
          .distinct()
          .groupBy('gender')
          .count()
          .toPandas()
    )
    workouts_by_gender = df.groupBy('gender').count().toPandas().set_index('gender')

    # Both frames carry a 'count' column; the left (users) one gets the '_gu' suffix.
    gender_user_activity_count = users_by_gender.join(
        workouts_by_gender, on='gender', how='inner', lsuffix='_gu'
    )
    gender_user_activity_count.columns = ['Gender', '# of users', 'Activities (workouts) count']

    return sum_df, gender_user_activity_count

# Compute both summary tables once: index 0 is the overall summary,
# index 1 the per-gender breakdown. Only the overall summary is displayed here.
sum_dfs= user_activity_workout_summarize(df)
print('\nOverall data set summary on users, workouts and number of records (pre-filtering):')
sum_dfs[0]





Overall data set summary on users, workouts and number of records (pre-filtering):


                                                                                

Unnamed: 0,Users count,Activity types count,Workouts count,Total records count
0,1104,49,253020,111541956


In [31]:
print('Number of workouts that have less than 50 records and statistic summary:')

# Record counts of the workouts that have fewer than 50 records.
short_workout_counts = (
    df.where(df.PerWorkoutRecordCount < 50)
      .select('PerWorkoutRecordCount')
      .toPandas()
)

# describe() returns floats; the int cast truncates mean/std for a compact display.
removed_df = (
    short_workout_counts
    .describe()
    .astype(int)
    .rename(columns={'PerWorkoutRecordCount': 'PerWorkoutRecordCount <50'})
)
removed_df.T

Number of workouts that have less than 50 records and statistic summary:


                                                                                

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
PerWorkoutRecordCount <50,5541,23,14,1,11,22,36,49


# Pyspark Lazy Evaluation
Lazy evaluation is one of the main reasons Apache Spark is fast. Transformations (such as `select`, `filter` or `groupBy`) are not executed when they are called; Spark only records them in a lineage graph. When we finally ask for a result, we are telling Spark which answer we are interested in, and it works out the best way to get there. As a result, Spark can optimize the whole chain of operations at once, and it can use the lineage graph to recompute lost data, which gives it fault tolerance. <br>
Nothing is computed until an action is called, so in order to see the result we have to call an action such as `df.collect()` (`show()`, `count()`, `take()` and `toPandas()` are actions as well).<br>
Normally, we would use `df.take(k)` or `df.limit(k)` to look at the first k rows. In our experience, when k becomes a large number these two calls can take a long time to complete. For a result that is small enough to fit in the driver's memory, `df.collect()[:k]` is an alternative. Keep in mind that `collect()` brings every row of the DataFrame to the driver before the slice is taken, so it should only be used when the full result is known to be small (here: one row per sport).

In [32]:
# One row per distinct (sport, user) pair ...
sport_user_pairs = df.select('sport', 'userId').distinct()

# ... then the number of users per sport, most popular sport first.
ranked_sport_users_df = (
    sport_user_pairs
    .groupBy('sport')
    .count()
    .orderBy(desc('count'))
)

# Top 5 workout types, with the 'count' column renamed to 'Users count'
highest_sport_users_df = (
    ranked_sport_users_df
    .limit(5)
    .toPandas()
    .rename(columns={'count': 'Users count'})
)

# Sum of the per-sport user counts (a user is counted once for every sport they practise)
total_sports_users = ranked_sport_users_df.agg(functions.sum('count')).collect()[0][0]


                                                                                

In [33]:
# collect() brings every row of the ranking to the driver and the slice is taken
# afterwards in Python; the ranking holds one row per sport.
ranked_sport_users_df.collect()[:5]

                                                                                

[Row(sport='run', count=865),
 Row(sport='bike', count=794),
 Row(sport='mountain bike', count=336),
 Row(sport='bike (transport)', count=252),
 Row(sport='walk', count=209)]

# Exploratory Data Analysis