# Preprocessing steps

In [5]:
# Calculating the 15-minute partition used to clean the data.
# Adds "minute_avl" and a "15_min_partition" key of the form "<hour_avl>-<quarter>",
# where quarter = floor(minute / 15): minutes 0-14 -> 0, 15-29 -> 1, 30-44 -> 2, 45-59 -> 3.
import time

import pyspark.sql.functions as F

# measure execution time
start_time = time.time()

# October 1 - October 31
for day in range(1, 32):
    traces = spark.read.parquet(f"s3://mobility-traces-sp/processed-data-avl-date/8-map-matching/MO_1510{day}/")
    df_enriched = (
        traces
        .withColumn("minute_avl", F.minute(F.col("dt_avl")))
        .withColumn("15_min_partition",
                    F.concat(F.col("hour_avl"), F.lit("-"), F.floor(F.col("minute_avl") / 15)))
        # columns not kept in the enriched dataset
        .drop("hour_diff", "time_variation", "trip_id", "direction", "route_id", "trip_head")
    )
    # Repartition only once, right before the write: a repartition right after the
    # read would add a second, redundant full shuffle of the day's data.
    df_enriched.repartition(150).write.parquet(
        f"s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/8-map-matching-enriched/MO_1510{day}/")

print("--- %s seconds ---" % (time.time() - start_time))
# Cluster 1 Master Node - c5.9xlarge, Core Nodes - 8 c5.9xlarge
# 1800 seconds (measured when the data was also repartitioned right after the read)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

--- 1802.7558500766754 seconds ---

In [6]:
# Statistics shape distance: per-day summary (mean / min / max / stddev and
# selected quantiles) of "min_distance", the distance between each trace and
# its matched shape sequence.
import time

import pyspark.sql.functions as F

start_time = time.time()  # wall-clock timer for the whole cell

enriched_prefix = "s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/8-map-matching-enriched/MO_1510"
distance_col = "min_distance"
distance_probs = [0.25, 0.5, 0.55, 0.6, 0.75, 0.8]

# October 1 - October 31
for day in range(1, 32):
    traces = spark.read.parquet(f"{enriched_prefix}{day}/")

    print("Day", day)
    summary_exprs = [
        F.mean(distance_col).alias("mean"),
        F.min(distance_col).alias("min"),
        F.max(distance_col).alias("max"),
        F.stddev(distance_col).alias("stddev"),
    ]
    stats = traces.agg(*summary_exprs).collect()
    print(stats)

    # third argument of approxQuantile is the relative error
    quantiles = traces.approxQuantile(distance_col, distance_probs, 0.0001)
    print(quantiles)

print("--- %s seconds ---" % (time.time() - start_time))
# Cluster 1 Master Node - c5.9xlarge, Core Nodes - 8 c5.9xlarge
# 60 seconds

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Day 1
[Row(mean=436.36319595663747, min=0.0, max=43217.784192039915, stddev=1366.5488256293363)]
[9.602056040873755, 20.891104443831036, 24.37105313880389, 28.67433231896753, 53.26550776259943, 77.95274141254562]
Day 2
[Row(mean=441.88437239728944, min=0.0, max=54529.50900703268, stddev=1382.2144781286675)]
[9.848074753554256, 21.07382093061351, 24.55635231142709, 28.880025548690814, 53.76824384577271, 79.45578282938614]
Day 3
[Row(mean=1024.170590014718, min=0.0, max=42763.82291702403, stddev=1923.7103492875638)]
[12.707925599866817, 38.388459189277725, 53.23005741874364, 95.58318178783884, 1385.362561469755, 1887.9414857372176]
Day 4
[Row(mean=1404.9278918485875, min=0.0, max=47059.875464543045, stddev=2134.757835324653)]
[18.070756268892563, 205.84379574592904, 429.1106460696577, 929.9500850366549, 2089.3772051961523, 2627.9150612196154]
Day 5
[Row(mean=456.1455057304434, min=0.0, max=37157.14456906165, stddev=1426.9824488544034)]
[9.636552342924771, 21.042093583851045, 24.599655240

In [3]:
# Filtering data based on shape matching: keep only traces whose min_distance
# (distance between the trace and the matched shape sequence) is at most 450
# (presumably meters - confirm against the map-matching step).
import time

start_time = time.time()  # measure execution time

source_prefix = "s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/8-map-matching-enriched/MO_1510"
target_prefix = "s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/9a-filtered-traces-based-shape-distance/MO_1510"

for day in range(1, 32):
    traces = spark.read.parquet(f"{source_prefix}{day}/")
    traces_filtered = traces.filter("min_distance <= 450")
    traces_filtered.write.parquet(f"{target_prefix}{day}/")

print("--- %s seconds ---" % (time.time() - start_time))
# Cluster 1 Master Node - c5.9xlarge, Core Nodes - 8 c5.9xlarge
# 128 seconds

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

--- 128.23097229003906 seconds ---

In [7]:
# Counting, after the shape-distance filter:
#  - the distinct shape sequences matched by each bus in each 15-minute partition
#  - the number of traces of each bus over the day
import time

import pyspark.sql.functions as F

start_time = time.time()  # measure execution time

stats_root = "s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/statistics"

# Oct 1 - Oct 31
for day in range(1, 32):
    traces = spark.read.parquet(f"s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/9a-filtered-traces-based-shape-distance/MO_1510{day}/")

    # distinct shapes per (bus, 15-minute partition)
    count_shape_per_15 = (
        traces
        .groupby("id_avl", "15_min_partition")
        .agg(F.countDistinct("min_shape_sequence").alias("count_shape"))
    )
    count_shape_per_15.write.parquet(f"{stats_root}/count-shape-per-15-min/MO_1510{day}/")

    # traces per bus for the whole day
    count_traces_per_bus = (
        traces
        .groupby("id_avl")
        .agg(F.count("id_avl").alias("count_traces"))
    )
    count_traces_per_bus.write.parquet(f"{stats_root}/count-traces-per-bus/MO_1510{day}/")

print("--- %s seconds ---" % (time.time() - start_time))
# Cluster 1 Master Node - c5.9xlarge, Core Nodes - 8 c5.9xlarge
# 194 seconds

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

--- 194.5266535282135 seconds ---

In [9]:
# Statistics shape count: per-day distribution of "count_shape", the number of
# distinct shape sequences a bus matched inside one 15-minute partition.
# F is imported here (not inherited from an earlier cell) so this cell also
# runs on a fresh kernel.
import pyspark.sql.functions as F

for day in range(1, 32):

    count_shape = spark.read.parquet(f"s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/statistics/count-shape-per-15-min/MO_1510{day}/")

    print("Day", day)
    stats = count_shape.agg(F.mean('count_shape').alias('mean'),
                            F.min('count_shape').alias('min'),
                            F.max('count_shape').alias('max'),
                            F.stddev('count_shape').alias("stddev")).collect()

    print(stats)

    # third argument of approxQuantile is the relative error
    quantiles = count_shape.approxQuantile("count_shape", [0.0625, 0.125, 0.25, 0.5, 0.75], 0.0001)

    print(quantiles)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Day 1
[Row(mean=13.095092878064778, min=1, max=118, stddev=7.316025382543044)]
[1.0, 2.0, 7.0, 14.0, 19.0]
Day 2
[Row(mean=13.22835431693853, min=1, max=142, stddev=7.245230970352681)]
[1.0, 3.0, 8.0, 15.0, 19.0]
Day 3
[Row(mean=12.28124938291108, min=1, max=139, stddev=7.978452832646986)]
[1.0, 1.0, 5.0, 14.0, 19.0]
Day 4
[Row(mean=10.717148531410055, min=1, max=132, stddev=8.552036228283928)]
[1.0, 1.0, 2.0, 10.0, 19.0]
Day 5
[Row(mean=13.234250047295275, min=1, max=142, stddev=7.3439870342518265)]
[1.0, 2.0, 7.0, 15.0, 19.0]
Day 6
[Row(mean=13.268405719352364, min=1, max=133, stddev=7.319598599612858)]
[1.0, 3.0, 8.0, 15.0, 19.0]
Day 7
[Row(mean=13.329297435674386, min=1, max=141, stddev=7.239826742567762)]
[1.0, 3.0, 8.0, 15.0, 19.0]
Day 8
[Row(mean=13.280173448584263, min=1, max=134, stddev=7.292365174355095)]
[1.0, 2.0, 8.0, 15.0, 19.0]
Day 9
[Row(mean=13.36215483580825, min=1, max=152, stddev=7.201804037456983)]
[1.0, 3.0, 8.0, 15.0, 19.0]
Day 10
[Row(mean=12.241808592426572, mi

In [10]:
# Statistics traces count per bus per day: distribution of "count_traces",
# the number of traces each bus produced during the day.
# F is imported here (not inherited from an earlier cell) so this cell also
# runs on a fresh kernel.
import pyspark.sql.functions as F

for day in range(1, 32):

    count_traces = spark.read.parquet(f"s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/statistics/count-traces-per-bus/MO_1510{day}/")

    print("Day", day)
    stats = count_traces.agg(F.mean('count_traces').alias('mean'),
                             F.min('count_traces').alias('min'),
                             F.max('count_traces').alias('max'),
                             F.stddev('count_traces').alias("stddev")).collect()

    print(stats)

    # third argument of approxQuantile is the relative error
    quantiles = count_traces.approxQuantile("count_traces", [0.0625, 0.125, 0.25, 0.5, 0.75], 0.0001)

    print(quantiles)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Day 1
[Row(mean=1122.1033356990774, min=1, max=25956, stddev=755.7302778034827)]
[372.0, 539.0, 741.0, 1232.0, 1403.0]
Day 2
[Row(mean=1129.6661931818182, min=1, max=24974, stddev=704.9896598980055)]
[387.0, 551.0, 775.0, 1243.0, 1403.0]
Day 3
[Row(mean=1081.5757823129252, min=1, max=28270, stddev=741.9633661413093)]
[217.0, 448.0, 696.0, 1225.0, 1390.0]
Day 4
[Row(mean=1052.7197056794666, min=1, max=30499, stddev=676.3675794440411)]
[161.0, 407.0, 651.0, 1203.0, 1376.0]
Day 5
[Row(mean=1134.1929649672459, min=1, max=27104, stddev=786.9617648107253)]
[379.0, 547.0, 781.0, 1244.0, 1406.0]
Day 6
[Row(mean=1145.5489219383762, min=1, max=29889, stddev=834.4138551176568)]
[401.0, 564.0, 808.0, 1248.0, 1407.0]
Day 7
[Row(mean=1142.036148865011, min=1, max=30216, stddev=755.4333933870798)]
[405.0, 560.0, 809.0, 1252.0, 1406.0]
Day 8
[Row(mean=1147.0251885584175, min=1, max=30193, stddev=775.8133065138508)]
[412.0, 568.0, 816.0, 1253.0, 1410.0]
Day 9
[Row(mean=1152.2321390412424, min=1, max=31

Exception in thread cell_monitor-10:
Traceback (most recent call last):
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/notebook-env/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 1297



In [3]:
# Filtering data based on the number of shapes:
# drop every (bus, 15-minute partition) group whose traces matched fewer than
# 3 distinct shape sequences, i.e. only 1 or 2 shapes.
import time

import pyspark.sql.functions as F

# measure execution time
start_time = time.time()

# (bus, 15-minute partition) key shared by the traces and the shape-count statistics
key_columns = ["id_avl", "15_min_partition"]

# Oct 1 - Oct 31
for day in range(1, 32):
    count_shape = spark.read.parquet(f"s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/statistics/count-shape-per-15-min/MO_1510{day}/")

    # keys of the groups to remove
    keys_to_exclude = count_shape.filter("count_shape < 3").select(*key_columns)

    traces = spark.read.parquet(f"s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/9a-filtered-traces-based-shape-distance/MO_1510{day}/")

    # A left-anti join keeps the exclusion keys distributed. Nothing is collected to the
    # driver, no huge isin() literal list ends up in the query plan, and the key columns
    # are compared directly instead of through a concatenated "id-hour-quarter" string.
    # Rows with a NULL key belong to no group and are dropped explicitly.
    # The final select restores the input column order, because a join on column
    # names moves the key columns to the front.
    traces_filtered = (
        traces
        .filter(F.col("id_avl").isNotNull() & F.col("15_min_partition").isNotNull())
        .join(keys_to_exclude, on=key_columns, how="left_anti")
        .select(*traces.columns)
    )

    traces_filtered.write.parquet(f"s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/9b-filtered-stopped-buses-noise-based-number-shapes/MO_1510{day}/")

print("--- %s seconds ---" % (time.time() - start_time))
# Cluster 1 Master Node - c5.9xlarge, Core Nodes - 8 c5.9xlarge
# 478 seconds with the earlier collect() + isin() implementation -- TODO: re-measure

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

--- 478.6115417480469 seconds ---

In [2]:
# Install the haversine package (used by the speed calculation) into the notebook session.
# Pinned to the release this notebook was run with (2.3.0) so re-runs resolve the same version.
sc.install_pypi_package("haversine==2.3.0")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting haversine
  Downloading https://files.pythonhosted.org/packages/f4/52/a13286844780c7b1740edbbee8a8f0524e2a6d51c068b59dda39a6a119f5/haversine-2.3.0-py2.py3-none-any.whl
Installing collected packages: haversine
Successfully installed haversine-2.3.0

In [3]:
# Calculating instantaneous speed = speed between consecutive traces of the same bus

from haversine import haversine, Unit
import pyspark.sql.functions as F
# explicit names instead of `import *`: only FloatType and LongType are used in this cell
from pyspark.sql.types import FloatType, LongType
from pyspark.sql.window import Window

import time
# measure execution time
start_time = time.time()

# calculating speed for each register
def calculate_speed(lon1, lat1, lon2, lat2, time_variation):
    """Instantaneous speed (km/h) between a trace and the previous trace of the same bus.

    Args:
        lon1, lat1: coordinates of the current trace.
        lon2, lat2: coordinates of the previous trace (None when there is no previous trace).
        time_variation: time elapsed since the previous trace, presumably in seconds
            (the caller derives it from timestamps cast to long); None when unknown.

    Returns:
        float: haversine distance (meters) / time_variation, converted from m/s to km/h;
        0.0 when time_variation == 0;
        -1.0 when a coordinate or time_variation is missing.

    Always returns a float. The function is wrapped in a FloatType UDF, and PySpark
    converts a Python int returned from such a UDF to NULL. Returning the ints 0 / -1
    would therefore silently replace those sentinels with NULL.
    """
    # two traces at the same instant: speed is defined as 0
    if time_variation == 0:
        return 0.0

    # First trace of a bus (no previous point) or incomplete data. `is None` is used
    # instead of truthiness so a legitimate 0.0 coordinate is not treated as missing.
    if any(value is None for value in (lon1, lat1, lon2, lat2, time_variation)):
        return -1.0

    coord_1 = float(lat1), float(lon1)
    coord_2 = float(lat2), float(lon2)
    distance = haversine(coord_1, coord_2, unit=Unit.METERS)

    # converting the speed from m/s to km/h by multiplying by 3.6
    return (distance / float(time_variation)) * 3.6
    
# Spark UDF wrapping calculate_speed, declared as FloatType.
get_speed_udf = F.udf(calculate_speed, FloatType())

# Traces of one bus ordered by time, so lag() returns that bus's previous trace.
window = Window.partitionBy("id_avl").orderBy('dt_avl')

for day in range(1, 32):
    input_path = f"s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/9b-filtered-stopped-buses-noise-based-number-shapes/MO_1510{day}/"
    output_path = f"s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter/10-speed-calculation/MO_1510{day}/"

    # reading traces; trace_x / trace_y become longitude / latitude
    traces = (
        spark.read.parquet(input_path)
        .withColumnRenamed("trace_x", "longitude")
        .withColumnRenamed("trace_y", "latitude")
    )

    # time since the previous trace of the same bus (NULL for the bus's first trace)
    current_epoch = F.to_timestamp('dt_avl').cast(LongType())
    previous_epoch = F.to_timestamp(F.lag("dt_avl").over(window)).cast(LongType())
    traces_time_variation = traces.select("*", (current_epoch - previous_epoch).alias("time_variation"))

    # speed from current position, previous position and elapsed time
    speed_col = get_speed_udf(
        F.col("longitude"),
        F.col("latitude"),
        F.lag(F.col("longitude")).over(window),
        F.lag(F.col("latitude")).over(window),
        F.col("time_variation"),
    )
    traces_speed_bus_location = traces_time_variation.select("*", speed_col.alias("speed"))

    traces_speed_bus_location.write.parquet(output_path)

print("--- %s seconds ---" % (time.time() - start_time))
# Cluster 1 Master Node - c5.9xlarge, Core Nodes - 8 c5.9xlarge
# 435 seconds

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

--- 435.7866313457489 seconds ---

In [4]:
# Speed Filter
# Keep only traces whose speed lies between 0.1 km/h and 80 km/h (inclusive).
# Speeds below 0.1 (including the 0 and -1 values produced by calculate_speed)
# and NULL speeds fail the predicates and are dropped.
import time

start_time = time.time()  # measure execution time

speed_root = "s3://mobility-traces-sp/processed-data-avl-date/using-15-min-filter"

# October 1 - October 31
for day in range(1, 32):
    traces = spark.read.parquet(f"{speed_root}/10-speed-calculation/MO_1510{day}/")
    traces_new = traces.filter("speed >= 0.1").filter("speed <= 80")
    traces_new.write.parquet(f"{speed_root}/11-speed-calculation-filtered/MO_1510{day}/")

print("--- %s seconds ---" % (time.time() - start_time))
# Cluster 1 Master Node - c5.9xlarge, Core Nodes - 8 c5.9xlarge
# 269 seconds

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

--- 269.15069794654846 seconds ---