In [2]:
import os
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import math
from IPython.core.interactiveshell import InteractiveShell
from datetime import *
import statistics as stats

InteractiveShell.ast_node_interactivity = 'all'
%matplotlib inline


## pyspark
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row, DataFrame, SparkSession, SQLContext, functions
from pyspark.sql.functions import lit, desc, col, size, array_contains, isnan, udf, hour, array_min, array_max, countDistinct
from pyspark.sql.types import *

MAX_MEMORY = '15G'
# initialize a spark session
conf = pyspark.SparkConf().setMaster('local[*]')\
                .set('spark.executor.heartbeatInterval', 10000)\
                .set('spark.network.timeout', 10000)\
                .set('spark.core.connection.ack.wait.timeout', '3600')\
                .set('spark.executor.memory', MAX_MEMORY)\
                .set('spark.driver.memory', MAX_MEMORY)

def init_spark():
    spark = SparkSession\
                .builder\
                .appName('FitRec Dataset')\
                .config(conf=conf)\
                .getOrCreate()
    return spark

spark = init_spark()
df = spark.read.json('endomondoHR_proper.json', mode='DROPMALFORMED')

                                                                                

In [3]:
print(df.printSchema())

root
 |-- altitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- gender: string (nullable = true)
 |-- heart_rate: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- id: long (nullable = true)
 |-- latitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- longitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- speed: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- sport: string (nullable = true)
 |-- timestamp: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- url: string (nullable = true)
 |-- userId: long (nullable = true)

None


In [6]:
df.count()

                                                                                

55635

# Detect missing values and abnormal zeros

In [8]:
string_columns = ['gender', 'sport', 'url']
numeric_columns = ['id', 'userId']
array_columns = ['altitude', 'heart_rate', 'latitude', 'longitude', 'speed', 'timestamp']
missing_values = {}
for index,column in enumerate(df.columns):
    if column in string_columns:
        missing_count = df.filter(col(column).eqNullSafe(None) | col(column).isNull()).count()
        missing_values.update({column: missing_count})
    if column in numeric_columns:
        missing_count = df.where(col(column).isin([0, None, np.nan])).count()
        missing_values.update({column: missing_count})
    if column in array_columns:
        missing_count = df.filter(array_contains(df[column], 0) | array_contains(df[column], np.nan)).count()
        missing_values.update({column: missing_count})

missing_df = pd.DataFrame.from_dict([missing_count])
missing_df

                                                                                

Unnamed: 0,0
0,0


In [None]:
# create a new column to count the number of timestamps recorded per row/workout
df = df.withColumn('perWorkoutRecordCount', size(col('timestamp')))

def user_activity_workout_summarize(df):
    user_count = format(df.select)