In [2]:
import pandas
import dask.array as da
import numpy as np
import struct
from haversine import haversine, Unit
from math import sqrt
from scipy.signal import medfilt
from datetime import datetime, timedelta
from timezonefinder import TimezoneFinder
import pytz

In [3]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml import Pipeline

# Heap size requested for both the driver and the executors.
MAX_MEMORY = "5g"

# Build (or reuse) the notebook-wide Spark session.
spark = (
    SparkSession.builder
    .appName("FitRec")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

# Low-level context handle, kept for RDD-style operations.
sc = spark.sparkContext


In [4]:
# Load the workout records from JSON into a DataFrame and show the column
# layout that Spark derived for it.
data = spark.read.json("data/endomondoHR_proper.json")
print("Schema of the data set:")
data.printSchema()


Schema of the data set:
root
 |-- altitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- gender: string (nullable = true)
 |-- heart_rate: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- id: long (nullable = true)
 |-- latitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- longitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- speed: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- sport: string (nullable = true)
 |-- timestamp: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- url: string (nullable = true)
 |-- userId: long (nullable = true)



In [5]:
# Total number of workout records in the raw DataFrame (one row per workout).
num_rows = data.count()
print("Number of rows: " + str(num_rows))


Number of rows: 167783


In [6]:
# Probability that each workout is kept. When sampling WITHOUT replacement the
# fraction must lie in [0, 1]; the previous value of 200 was out of range and
# only worked because PySpark keeps every row once the fraction exceeds 1.
# 1.0 keeps the same rows; lower it (e.g. 0.1) for a faster, lighter run.
SAMPLE_FRACTION = 1.0
SAMPLE_SEED = 123

# Workouts containing any heart-rate reading at or below this value are dropped.
MIN_VALID_HEART_RATE = 40

sample = data.rdd.sample(False, SAMPLE_FRACTION, SAMPLE_SEED)
sample = sample.filter(
    lambda x: all(heart_rate > MIN_VALID_HEART_RATE for heart_rate in x.heart_rate)
)
# Keep the filtered RDD in memory because later cells reuse it.
sample.cache()


PythonRDD[15] at RDD at PythonRDD.scala:53

In [7]:
# Counting is an action, so this is the step that actually materialises the
# cached, filtered sample.
number_rows_sample = sample.count()
print("Number of rows of sample that has been cached: " + str(number_rows_sample))


Number of rows of sample that has been cached: 167010


In [8]:
def get_time(timestamp, long, lat, utc_offset_hours=-7):
    """
    Convert a Unix timestamp to a naive wall-clock datetime at a fixed UTC offset.

    The conversion is pure epoch arithmetic, so the result no longer depends on
    the timezone of the machine running the notebook (``datetime.fromtimestamp``
    returns the host's local time, not UTC).

    Parameters
    ----------
    timestamp : int or float
        Seconds since 1970-01-01 00:00:00 UTC.
    long, lat : float
        Position of the sample. Currently unused: the same offset is applied
        everywhere.
        TODO: derive the offset from the position (TimezoneFinder / pytz are
        already imported by the notebook) instead of using one fixed offset.
    utc_offset_hours : int or float, optional
        Offset added to UTC, in hours. Defaults to -7, the offset the notebook
        has always applied.

    Returns
    -------
    datetime
        Naive datetime equal to UTC time plus ``utc_offset_hours``.
    """
    utc_time = datetime(1970, 1, 1) + timedelta(seconds=timestamp)
    return utc_time + timedelta(hours=utc_offset_hours)


In [9]:
def derive_feature(row):
    """
    Add per-sample derived series to one workout record.

    For every pair of consecutive samples this computes the altitude change,
    elapsed time, heart-rate change, travelled distance and average speed.
    The first sample has no predecessor, so each derived series starts with 0.0.
    It also computes the hour and minute of every sample via ``get_time``.

    Parameters
    ----------
    row : pyspark.sql.Row
        A workout with the list-valued fields ``latitude``, ``longitude``,
        ``altitude``, ``timestamp`` and ``heart_rate`` plus the scalar
        metadata fields copied through below.

    Returns
    -------
    pyspark.sql.Row
        The original fields plus ``distance`` (miles), ``derive_speed`` (MPH),
        ``diff_time``, ``diff_heart_rate``, ``hours`` and ``minutes``.
    """
    # Median-filter the GPS track (window of 3) to suppress single-point spikes.
    lat = medfilt(row.latitude, 3).tolist()
    long = medfilt(row.longitude, 3).tolist()

    # Feet -> miles. NOTE(review): assumes altitude is recorded in feet; confirm.
    alt = [0.0001893939 * a for a in row.altitude]
    indices = range(1, len(lat))

    diff_alt = [0.0] + [alt[i] - alt[i - 1] for i in indices]
    diff_time = [0.0] + [row.timestamp[i] - row.timestamp[i - 1] for i in indices]
    # Change in heart rate between two consecutive samples.
    diff_heart = [0.0] + [row.heart_rate[i] - row.heart_rate[i - 1] for i in indices]

    # Ground distance (miles) between consecutive samples, then combined with
    # the altitude change as a straight-line approximation of the 3-D distance.
    distance = [0.0] + [
        haversine((lat[i - 1], long[i - 1]), (lat[i], long[i]), unit=Unit.MILES)
        for i in indices
    ]
    d_distance = [sqrt(d ** 2 + a ** 2) for d, a in zip(distance, diff_alt)]

    # Average speed (MPH) over each interval. An interval with zero duration
    # gets speed 0.0 on its own; previously a bare ``except`` zeroed the whole
    # series (and hid any unrelated error) when a single interval had
    # duration 0.
    d_speed = [0.0] + [
        dist / elapsed * 3600 if elapsed else 0.0
        for dist, elapsed in zip(d_distance[1:], diff_time[1:])
    ]

    # Hour and minute of each sample, taken from the unfiltered position.
    hours = []
    minutes = []
    for lg, lt, ts in zip(row.longitude, row.latitude, row.timestamp):
        local_time = get_time(ts, lg, lt)
        hours.append(local_time.hour)
        minutes.append(local_time.minute)

    return Row(
        altitude=row.altitude,
        gender=row.gender,
        heart_rate=row.heart_rate,
        id=row.id,
        latitude=row.latitude,
        longitude=row.longitude,
        speed=row.speed,
        sport=row.sport,
        timestamp=row.timestamp,
        url=row.url,
        userId=row.userId,
        distance=d_distance,
        derive_speed=d_speed,
        diff_time=diff_time,
        diff_heart_rate=diff_heart,
        hours=hours,
        minutes=minutes,
    )


In [10]:
def transform(row, lag=2):
    """
    Expand one workout into one flat feature row per sample (sliding window).

    For sample ``idx`` every series contributes the window of its last
    ``lag + 1`` values ending at ``idx``. Near the start of the workout, where
    fewer than ``lag`` earlier samples exist, the window is left-padded with 0.0.

    Layout of each output row, in order:
      * ``[id, url, userId, sport, gender]``
      * longitude, latitude, hours: the raw window only
      * speed, distance, diff_time: window + min/max/mean/std of the window
      * heart_rate, diff_heart_rate: window + min/max/mean/std of the window
        WITHOUT its last (current) value, so the statistics never include the
        value at ``idx`` itself.

    Parameters
    ----------
    row : object
        Workout with attributes ``id``, ``url``, ``userId``, ``sport``,
        ``gender``, ``timestamp``, ``longitude``, ``latitude``, ``hours``,
        ``speed``, ``derive_speed``, ``distance``, ``diff_time``,
        ``heart_rate`` and ``diff_heart_rate`` (list-valued series).
        ``derive_speed`` is used when ``speed`` is None.
    lag : int, optional
        Number of earlier samples in each window (default 2).

    Returns
    -------
    list of list
        One row per entry of ``row.timestamp``; all numeric entries are floats.
    """
    prefix = [row.id, row.url, row.userId, row.sport, row.gender]
    speed = row.speed if row.speed is not None else row.derive_speed

    context_series = [row.longitude, row.latitude, row.hours]
    motion_series = [speed, row.distance, row.diff_time]
    heart_series = [row.heart_rate, row.diff_heart_rate]

    def window_at(series, idx):
        # Last lag+1 values ending at idx, zero-padded on the left if needed.
        if idx < lag:
            return [0.0] * (lag - idx) + series[0:idx + 1]
        return series[idx - lag:idx + 1]

    def summarize(values):
        return [float(np.min(values)), float(np.max(values)),
                float(np.mean(values)), float(np.std(values))]

    flatted = []
    for idx in range(len(row.timestamp)):
        features = []
        for series in context_series:
            features += window_at(series, idx)
        for series in motion_series:
            window = window_at(series, idx)
            features += window + summarize(window)
        for series in heart_series:
            window = window_at(series, idx)
            # Exclude the current value from the statistics.
            features += window + summarize(window[:-1])
        flatted.append(prefix + [float(x) for x in features])
    return flatted


In [11]:
# Names of the summary statistics appended after each lagged window.
agg_name = ['min', 'max', 'mean', 'std']
# Series that contribute only their lagged values.
a_name = ['longitude', 'latitude', 'hours']
# Series that contribute lagged values followed by the summary statistics.
b_name = ['speed', 'distance', 'diff_time', 'heart_rate', 'diff_heart_rate']

# Lag suffixes from oldest (2) to current (0), matching the window order.
lag_steps = list(range(2, -1, -1))

# Identifier columns first, then the feature columns in window order.
column = ['id', 'url', 'userId', 'sport', 'gender']
column.extend(
    feature + '_{}'.format(step) for feature in a_name for step in lag_steps
)
for feature in b_name:
    column.extend(feature + '_{}'.format(tag) for tag in lag_steps + agg_name)

print("Columns in the dataset:")
print(column)

# Plan: export the feature importances and train in two rounds.
# Round 1: fit a basic model to obtain the feature importances.
# Round 2: retrain on only the important features for better performance.

# Candidate extra features: polynomial terms such as heart_rate^2, heart_rate^3, ...


Columns in the dataset:
['id', 'url', 'userId', 'sport', 'gender', 'longitude_2', 'longitude_1', 'longitude_0', 'latitude_2', 'latitude_1', 'latitude_0', 'hours_2', 'hours_1', 'hours_0', 'speed_2', 'speed_1', 'speed_0', 'speed_min', 'speed_max', 'speed_mean', 'speed_std', 'distance_2', 'distance_1', 'distance_0', 'distance_min', 'distance_max', 'distance_mean', 'distance_std', 'diff_time_2', 'diff_time_1', 'diff_time_0', 'diff_time_min', 'diff_time_max', 'diff_time_mean', 'diff_time_std', 'heart_rate_2', 'heart_rate_1', 'heart_rate_0', 'heart_rate_min', 'heart_rate_max', 'heart_rate_mean', 'heart_rate_std', 'diff_heart_rate_2', 'diff_heart_rate_1', 'diff_heart_rate_0', 'diff_heart_rate_min', 'diff_heart_rate_max', 'diff_heart_rate_mean', 'diff_heart_rate_std']


In [12]:
# Categorical columns that are indexed and one-hot encoded.
category_cols = ["sport", "gender"]

# One DataFrame row per (workout, sample) window.
df = sample.map(derive_feature).flatMap(transform).toDF(column)

# For each categorical column: string -> index -> one-hot vector.
stages = []
for category_col in category_cols:
    indexed_col = category_col + "_index"
    str_indexer = StringIndexer(inputCol=category_col, outputCol=indexed_col)
    encoder = OneHotEncoderEstimator(
        inputCols=[indexed_col],
        outputCols=[category_col + "_vec"],
    )
    stages.extend([str_indexer, encoder])

# Every numeric column except the current heart rate and its current change
# (the latter is used as the label when the model is trained).
numeric_cols = [
    x for x in column[5:] if x not in ("diff_heart_rate_0", "heart_rate_0")
]
assembler_input = [c + "_vec" for c in category_cols] + numeric_cols
assembler = VectorAssembler(
    inputCols=assembler_input,
    outputCol="features",
    handleInvalid='skip',
)
stages.append(assembler)

# Fit the preprocessing stages and produce the assembled "features" column.
partial_pipeline = Pipeline(stages=stages)
pipeline_model = partial_pipeline.fit(df)
prepared_df = pipeline_model.transform(df)


In [13]:
prepared_df.select("features").show()


+--------------------+
|            features|
+--------------------+
|(86,[0,42,46,49,5...|
|(86,[0,42,45,46,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
|(86,[0,42,44,45,4...|
+--------------------+
only showing top 20 rows



In [14]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import RandomForestRegressor

# Regression target: change in heart rate at the current sample.
LABEL_COL = "diff_heart_rate_0"

# Keep only what the model needs before splitting. randomSplit sorts every
# partition on the row's columns to make the split deterministic, and the
# previous run died with a Java heap OutOfMemoryError inside that sort while
# carrying all ~50 raw columns plus the index/one-hot/feature vectors.
# NOTE(review): this narrows the sorted rows but is not verified to remove the
# OOM on its own; if it persists, lower the sampling fraction or raise memory.
model_df = prepared_df.select("features", LABEL_COL)

train, test = model_df.randomSplit([0.8, 0.2], seed=123)
rf = RandomForestRegressor(labelCol=LABEL_COL, featuresCol="features")
rf_model = rf.fit(train)

# Adds a "prediction" column next to the label for every test row.
predict = rf_model.transform(test)


Py4JJavaError: An error occurred while calling o314.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 30 in stage 12.0 failed 1 times, most recent failure: Lost task 30.0 in stage 12.0 (TID 322, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:52)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:156)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:477)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:743)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:742)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:742)
	at org.apache.spark.ml.tree.impl.RandomForest$.findSplitsBySorting(RandomForest.scala:927)
	at org.apache.spark.ml.tree.impl.RandomForest$.findSplits(RandomForest.scala:904)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:121)
	at org.apache.spark.ml.regression.RandomForestRegressor$$anonfun$train$1.apply(RandomForestRegressor.scala:133)
	at org.apache.spark.ml.regression.RandomForestRegressor$$anonfun$train$1.apply(RandomForestRegressor.scala:119)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:185)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:185)
	at org.apache.spark.ml.regression.RandomForestRegressor.train(RandomForestRegressor.scala:119)
	at org.apache.spark.ml.regression.RandomForestRegressor.train(RandomForestRegressor.scala:46)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:52)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:156)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getSortedIterator(UnsafeExternalSorter.java:477)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 33794)
----------------------------------------


Traceback (most recent call last):
  File "/home/datle/workspace/Tools/miniconda3/envs/bigdata-lab/lib/python3.5/socketserver.py", line 313, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/datle/workspace/Tools/miniconda3/envs/bigdata-lab/lib/python3.5/socketserver.py", line 341, in process_request
    self.finish_request(request, client_address)
  File "/home/datle/workspace/Tools/miniconda3/envs/bigdata-lab/lib/python3.5/socketserver.py", line 354, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/datle/workspace/Tools/miniconda3/envs/bigdata-lab/lib/python3.5/socketserver.py", line 681, in __init__
    self.handle()
  File "/home/datle/workspace/Tools/miniconda3/envs/bigdata-lab/lib/python3.5/site-packages/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/home/datle/workspace/Tools/miniconda3/envs/bigdata-lab/lib/python3.5/site-packages/pyspark/accumulators.py", line 241

In [None]:
# print(assembler_input)


In [None]:
# Number of rows in the prediction DataFrame produced from the test split.
predict.count()


In [None]:
# Importance weight of each entry of the assembled "features" vector, as
# estimated by the fitted random forest.
rf_model.featureImportances


In [None]:
# RegressionMetrics expects (prediction, observation) pairs, in that order.
# The pairs were previously built as (observation, prediction). MSE and RMSE
# are symmetric, so the numbers printed below are unchanged, but
# order-sensitive metrics such as r2 or explainedVariance would have been wrong.
prediction_and_label = predict.rdd.map(
    lambda x: (float(x.prediction), float(x.diff_heart_rate_0))
)
metrics = RegressionMetrics(prediction_and_label)

# For scale: heart rate itself ranges roughly from 60 to 200; the label here
# is the change in heart rate between consecutive samples.
print("MSE = %s" % metrics.meanSquaredError)
print("RMSE = %s" % metrics.rootMeanSquaredError)


In [None]:
# Show a single predicted value as a quick sanity check.
predict.select("prediction").show(1)
