<a href="https://colab.research.google.com/github/LeoVogiatzis/Decentralized_technologies/blob/main/Pyspark_trip_metrics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install pyspark and open-jdk

In [166]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark --no-cache-dir

In [167]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

SparkSession 

In [168]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [169]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, ntile
import sys

In [170]:
# Load the csv into a dataframe
from google.colab import files
uploaded = files.upload()
trip_df = spark.read.csv("/content/trip_data.csv", header=True, inferSchema=True)
trip_df

driver_id,trip_id,started_at,acc_3050_avg,acc_5090_avg,gyroscope_distance_avg,acc_highway_sq_of_diff
17,00FADF7F-2476-4AS...,2021-03-06 10:54:...,,,,
25,68D01DEE-0762-43F...,2020-06-24 22:56:...,1.248,0.0,0.0839204,0.0
25,B9787774-A4A5-4D7...,2020-06-24 22:56:...,2.42077,0.854118,0.0577057,0.0
25,B9FC3426-54D7-422...,2020-06-25 05:42:...,1.01929,0.645,0.0518536,0.0
29,003EBA72-BCD4-494...,2020-06-07 18:02:...,1.8231,1.52599,0.115597,0.0
29,005434FE-2A50-455...,2021-10-16 13:09:...,1.34385,1.10739,0.117654,0.0
29,006D71BD-924E-456...,2021-06-22 17:16:...,1.43,1.00235,0.179869,0.0
29,00FCDF7F-2476-4AB...,2021-03-06 10:54:...,,,,
29,0111FE7B-FD62-44A...,2021-08-19 17:31:...,1.44327,1.27143,0.0635855,1.60433
29,014FC193-93AA-4D3...,2021-03-18 17:52:...,,,,


In [171]:
trip_df.printSchema()

root
 |-- driver_id: integer (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- acc_3050_avg: double (nullable = true)
 |-- acc_5090_avg: double (nullable = true)
 |-- gyroscope_distance_avg: double (nullable = true)
 |-- acc_highway_sq_of_diff: double (nullable = true)



In [172]:
trip_df.limit(5)

driver_id,trip_id,started_at,acc_3050_avg,acc_5090_avg,gyroscope_distance_avg,acc_highway_sq_of_diff
17,00FADF7F-2476-4AS...,2021-03-06 10:54:...,,,,
25,68D01DEE-0762-43F...,2020-06-24 22:56:...,1.248,0.0,0.0839204,0.0
25,B9787774-A4A5-4D7...,2020-06-24 22:56:...,2.42077,0.854118,0.0577057,0.0
25,B9FC3426-54D7-422...,2020-06-25 05:42:...,1.01929,0.645,0.0518536,0.0
29,003EBA72-BCD4-494...,2020-06-07 18:02:...,1.8231,1.52599,0.115597,0.0


Cast the columns into the proper type


In [173]:
trip_df.selectExpr("cast(acc_3050_avg as float) acc_3050_avg")
trip_df.selectExpr("cast(acc_5090_avg as float) acc_5090_avg")
trip_df.selectExpr("cast(gyroscope_distance_avg as float) gyroscope_distance_avg")
trip_df.selectExpr("cast(acc_highway_sq_of_diff as float) acc_highway_sq_of_diff")
trip_df.selectExpr("cast(started_at as timestamp) started_at")

started_at
2021-03-06 10:54:54
2020-06-24 22:56:36
2020-06-24 22:56:38
2020-06-25 05:42:47
2020-06-07 18:02:00
2021-10-16 13:09:25
2021-06-22 17:16:49
2021-03-06 10:54:54
2021-08-19 17:31:29
2021-03-18 17:52:11


In [174]:
trip_df.printSchema()

root
 |-- driver_id: integer (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- acc_3050_avg: double (nullable = true)
 |-- acc_5090_avg: double (nullable = true)
 |-- gyroscope_distance_avg: double (nullable = true)
 |-- acc_highway_sq_of_diff: double (nullable = true)



In [175]:
trip_df.dropDuplicates().show()

+---------+--------------------+--------------------+------------+------------+----------------------+----------------------+
|driver_id|             trip_id|          started_at|acc_3050_avg|acc_5090_avg|gyroscope_distance_avg|acc_highway_sq_of_diff|
+---------+--------------------+--------------------+------------+------------+----------------------+----------------------+
|       29|1951302F-5B1C-46E...|2021-04-29 13:29:...|     1.49032|         0.0|                0.1072|                   0.0|
|       29|54CE41D7-ED04-40B...|2021-08-18 15:11:...|     2.14824|     1.45594|             0.0763101|                   0.0|
|       29|9BEFD512-2CCA-48C...|2021-08-18 17:13:...|        null|        null|                  null|                  null|
|       29|B18D6BE3-D55B-4C4...|2021-07-12 10:12:...|     1.35283|     1.03636|               0.38295|                   0.0|
|       29|EF1CDA67-0E73-406...|2021-09-26 18:16:...|    0.771148|         0.0|             0.0512305|                

In [176]:
trip_df.describe()

summary,driver_id,trip_id,started_at,acc_3050_avg,acc_5090_avg,gyroscope_distance_avg,acc_highway_sq_of_diff
count,1329.0,1329,1329,1154.0,1154.0,1154.0,981.0
mean,42.12716328066215,,,1.6466904757365664,1.0337048396880402,0.1230781030329288,0.3103469480122326
stddev,22.818520362642648,,,0.6946612782267817,0.8469978320510017,0.0920971289381675,0.8690187539767591
min,17.0,003EBA72-BCD4-494...,2020-04-23 12:45:...,0.0,0.0,0.0189641,0.0
max,89.0,TEST_P20210619083703,2021-11-19 08:29:...,5.10462,12.6,0.835739,6.98592


In [177]:
trip_df.count()

1329

In [178]:
trip_df.dropDuplicates().count()

1329

In [179]:
trip_df = trip_df.na.drop(subset=["acc_3050_avg", "acc_5090_avg", "gyroscope_distance_avg","acc_highway_sq_of_diff"])
  #  .show(truncate=False)

In [180]:
trip_df.count()

981

Groupy and get the sum per driver

In [181]:
exprs = {x: "sum" for x in trip_df.columns[3:]}
trip_df.groupBy("trip_id","driver_id").agg(exprs).show()

+--------------------+---------+---------------------------+---------------------------+-----------------+-----------------+
|             trip_id|driver_id|sum(gyroscope_distance_avg)|sum(acc_highway_sq_of_diff)|sum(acc_5090_avg)|sum(acc_3050_avg)|
+--------------------+---------+---------------------------+---------------------------+-----------------+-----------------+
|7B0B4DB8-9BFA-4CA...|       29|                   0.311806|                        0.0|              0.0|             1.32|
|8CC77450-6399-464...|       29|                   0.340049|                        0.0|             0.57|          1.45846|
|7C3BDC1F-213C-401...|       29|                   0.100504|                    1.66102|          2.23756|          2.42118|
|B5078781-D63D-411...|       29|                   0.234943|                   0.825221|            1.404|              1.6|
|TEST_D20210619182959|       64|                   0.122699|                   0.740295|         0.727679|          1.34972|


In [182]:
trip_df.orderBy(trip_df.driver_id.desc(),trip_df.started_at.desc())

driver_id,trip_id,started_at,acc_3050_avg,acc_5090_avg,gyroscope_distance_avg,acc_highway_sq_of_diff
89,423E8537-DAA1-42E...,2021-11-19 08:29:...,1.5216,1.22923,0.0894158,0.9045
89,2186F7E8-6859-44E...,2021-11-11 21:02:...,1.57233,1.03448,0.0755129,0.0
89,5EE71E65-7590-40D...,2021-11-11 17:27:...,1.55613,2.337,0.0779973,0.0
89,830D844D-E39C-441...,2021-11-11 07:45:...,1.26323,1.31321,0.0852725,3.01935
89,454BE33C-84DB-424...,2021-11-04 19:46:...,1.19023,0.78895,0.0782507,0.545582
89,D8132ED3-8B6A-43D...,2021-11-04 17:32:...,1.50696,2.64,0.0889523,0.0
89,9AA8842C-6339-4D2...,2021-11-04 17:28:...,1.48174,0.0,0.10475,0.0
89,30755727-254F-4C9...,2021-11-04 17:28:...,1.6906,1.62323,0.0912253,0.0
89,0A40A18D-8911-4EC...,2021-10-15 06:45:...,1.33071,0.806846,0.0649742,0.0
89,4E682BA3-1500-458...,2021-10-15 06:45:...,1.49938,1.16118,0.0994467,0.0


2 rolling windows
1) windowDept => partitionBy("driver_id") => yield the 50 rows per driver 
2) w => get the previous instances of the last trip and calc avg and std

In [190]:
windowDept = Window.partitionBy("driver_id").orderBy(col("started_at").desc())

w = (Window()
     .partitionBy("driver_id")
    #  .orderBy(F.col("started_at").cast('long').desc())
     .orderBy(F.col("rnk").desc())
     .rangeBetween(-sys.maxsize, 0)
     )

trip_df = trip_df.withColumn("row",row_number().over(windowDept)).filter(col("row") <= 50).withColumn(
  "rnk",
  F.dense_rank().over(Window.partitionBy().orderBy(trip_df.driver_id.desc(),trip_df.started_at.desc()))
).withColumn('acc_3050_avg_rolling_avg',   F.avg("acc_3050_avg").over(w)).withColumn('acc_5090_rolling_avg',   F.avg("acc_5090_avg").over(w)).withColumn('gyroscope_distance_rolling_avg',   F.avg("gyroscope_distance_avg").over(w)).withColumn('acc_highway_sq_of_diff_avg',   F.avg("acc_highway_sq_of_diff").over(w)).withColumn('acc_3050_avg_rolling_std',   F.stddev("acc_3050_avg").over(w)).withColumn('acc_5090_rolling_std',   F.stddev("acc_5090_avg").over(w)).withColumn('gyroscope_distance_rolling_std',   F.stddev("gyroscope_distance_avg").over(w)).withColumn('acc_highway_sq_of_diff_std',   F.stddev("acc_highway_sq_of_diff").over(w))


In [191]:
trip_df.show(900)

+---------+--------------------+--------------------+------------+------------+----------------------+----------------------+---+---+------------------------+--------------------+------------------------------+--------------------------+------------------------+--------------------+------------------------------+--------------------------+
|driver_id|             trip_id|          started_at|acc_3050_avg|acc_5090_avg|gyroscope_distance_avg|acc_highway_sq_of_diff|rnk|row|acc_3050_avg_rolling_avg|acc_5090_rolling_avg|gyroscope_distance_rolling_avg|acc_highway_sq_of_diff_avg|acc_3050_avg_rolling_std|acc_5090_rolling_std|gyroscope_distance_rolling_std|acc_highway_sq_of_diff_std|
+---------+--------------------+--------------------+------------+------------+----------------------+----------------------+---+---+------------------------+--------------------+------------------------------+--------------------------+------------------------+--------------------+------------------------------+--

In [185]:
cols = ('started_at','acc_3050_avg', 'acc_5090_avg', 'gyroscope_distance_avg', 'acc_highway_sq_of_diff', 'rnk','row')
trip_df.drop(*cols).printSchema()

root
 |-- driver_id: integer (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- acc_3050_avg_rolling_avg: double (nullable = true)
 |-- acc_5090_rolling_avg: double (nullable = true)
 |-- gyroscope_distance_rolling_avg: double (nullable = true)
 |-- acc_highway_sq_of_diff_avg: double (nullable = true)
 |-- acc_3050_avg_rolling_std: double (nullable = true)
 |-- acc_5090_rolling_std: double (nullable = true)
 |-- gyroscope_distance_rolling_std: double (nullable = true)
 |-- acc_highway_sq_of_diff_std: double (nullable = true)



In [186]:
trip_df.drop(*cols).write.parquet("rolling_metrics.parquet") 

In [188]:
parDF1=spark.read.parquet("rolling_metrics.parquet")


In [189]:
parDF1

driver_id,trip_id,acc_3050_avg_rolling_avg,acc_5090_rolling_avg,gyroscope_distance_rolling_avg,acc_highway_sq_of_diff_avg,acc_3050_avg_rolling_std,acc_5090_rolling_std,gyroscope_distance_rolling_std,acc_highway_sq_of_diff_std
25,68D01DEE-0762-43F...,1.248,0.0,0.0839204,0.0,,,,
25,B9787774-A4A5-4D7...,1.834385,0.427059,0.07081305,0.0,0.8292736197721474,0.6039526297334916,0.0185365921367709,0.0
25,B9FC3426-54D7-422...,1.5626866666666668,0.499706,0.0644932333333333,0.0,0.7518692181711747,0.4452102304619695,0.0170769690555242,0.0
29,897A78C8-EF6C-4D2...,1.37095,0.892606,0.199229,0.0,,,,
29,0F5548E4-81BD-467...,1.6907299999999998,1.163063,0.2154329999999999,0.0,0.4522372129756684,0.3824839574387401,0.0229159165646936,0.0
29,FCB8C9D5-B8BA-4D1...,1.7632266666666665,1.264602,0.1862623333333333,0.0,0.3435499213117846,0.3226104375434868,0.0530599000878566,0.0
29,9CAA9BC1-388F-47B...,1.8593525,1.2739515,0.159790625,0.0,0.3400662714212238,0.2640731884831678,0.0684098484617712,0.0
29,B6E32C47-0844-4AD...,1.981082,1.6840032,0.1493077,0.0,0.4010288737360442,0.9449936349104155,0.063713335399271,0.0
29,81117A1F-3BB3-415...,1.8989016666666665,1.518336,0.1333032833333333,0.0,0.4113161426161958,0.9375948501095768,0.0691690639173443,0.0
29,9BB54228-4899-43F...,2.07013,1.676125142857143,0.1291695285714285,0.0,0.5884030386846532,0.9522877616658088,0.0640826159606153,0.0
