In [1]:
# Calculating speed for each vehicle in a day

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1609603797915_0004,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Pin the release so every cluster session installs the same haversine version.
sc.install_pypi_package("haversine==2.3.0")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting haversine
  Using cached https://files.pythonhosted.org/packages/f4/52/a13286844780c7b1740edbbee8a8f0524e2a6d51c068b59dda39a6a119f5/haversine-2.3.0-py2.py3-none-any.whl
Installing collected packages: haversine
Successfully installed haversine-2.3.0

In [5]:
from haversine import haversine, Unit
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# calculating speed for each record
def calculate_speed(lon1,lat1,lon2,lat2,time_variation):
    """Return the speed, in km/h, between two positions.

    The distance between (lat1, lon1) and (lat2, lon2) is computed with
    ``haversine`` in meters, divided by ``time_variation`` (treated as
    seconds) and converted from m/s to km/h.

    Args:
        lon1, lat1: coordinates of the current record.
        lon2, lat2: coordinates of the other record; may be None when
            there is no such record.
        time_variation: elapsed time between the two records.

    Returns:
        float: the speed in km/h; ``0.0`` when ``time_variation`` is 0;
        ``-1.0`` when any coordinate or the time variation is missing.

    The result is always a Python ``float``: this function is registered as
    a Spark UDF with ``FloatType()``, and PySpark turns a return value of a
    different Python type (such as ``int``) into null.
    """
    # no elapsed time between the two records: report a speed of zero
    if time_variation == 0:
        return 0.0

    # Explicit "missing" test instead of truthiness, so that a legitimate
    # coordinate equal to 0 is not mistaken for an absent value.
    inputs = (lon1, lat1, lon2, lat2, time_variation)
    if any(value is None or value == "" for value in inputs):
        return -1.0

    coord_1 = float(lat1), float(lon1)
    coord_2 = float(lat2), float(lon2)
    distance = haversine(coord_1, coord_2, unit=Unit.METERS)

    # converting the speed from m/s to km/h multiplying by 3.6
    return (distance / float(time_variation)) * 3.6
    
# Spark UDF wrapping calculate_speed; the resulting column is declared as float
get_speed_udf = F.udf(calculate_speed, FloatType())

# rows are grouped per (id_avl, line_id) and ordered by dt_avl, so that
# F.lag(...) yields the previous row of the same group
window = Window.partitionBy("id_avl","line_id").orderBy('dt_avl')

# S3 locations; the day number and a trailing "/" are appended per iteration
TRACES_INPUT_PREFIX = "s3://mobility-traces-sp/processed-data-avl-date/9-map-matching-filtered/MO_1510"
BUS_SPEED_OUTPUT_PREFIX = "s3://mobility-traces-sp/processed-data-avl-date/10-speed-calculation-based-bus-location/MO_1510"
SHAPE_SPEED_OUTPUT_PREFIX = "s3://mobility-traces-sp/processed-data-avl-date/11-speed-calculation-based-shape-location/MO_1510"


def _as_long_timestamp(column):
    """Parse ``column`` with ``to_timestamp`` and cast the result to ``LongType``."""
    return F.to_timestamp(column).cast(LongType())


def _with_speed(frame, lon_col, lat_col):
    """Return ``frame`` plus a "speed" column.

    The speed is computed by ``get_speed_udf`` from the given coordinate
    columns, their lagged values over ``window`` and "time_variation".
    """
    speed = get_speed_udf(
        F.col(lon_col),
        F.col(lat_col),
        F.lag(F.col(lon_col)).over(window),
        F.lag(F.col(lat_col)).over(window),
        F.col("time_variation"),
    )
    return frame.select("*", speed.alias("speed"))


# difference between the dt_avl of a row and the dt_avl of the previous row
time_variation_column = (
    _as_long_timestamp('dt_avl') - _as_long_timestamp(F.lag("dt_avl").over(window))
).alias("time_variation")

for day in range(1,32):
    day_suffix = str(day) + "/"

    # reading traces
    traces = spark.read.parquet(TRACES_INPUT_PREFIX + day_suffix)

    # getting time variation
    traces_time_variation = traces.select("*", time_variation_column)

    # getting speed based on bus location
    traces_speed_bus_location = _with_speed(traces_time_variation, "longitude", "latitude")
    traces_speed_bus_location.write.parquet(BUS_SPEED_OUTPUT_PREFIX + day_suffix)

    # getting speed based on shape location
    traces_speed_shape_location = _with_speed(
        traces_time_variation, "min_shape_coord_lon", "min_shape_coord_lat"
    )
    traces_speed_shape_location.write.parquet(SHAPE_SPEED_OUTPUT_PREFIX + day_suffix)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [1]:
# Pin the release so every cluster session installs the same boto3 version.
sc.install_pypi_package("boto3==1.16.47")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1609603797915_0006,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting boto3
  Using cached https://files.pythonhosted.org/packages/28/78/4067ce89180daf0b2027df4b3e4c4734d73b99c3a664d262a4c4d5ac1021/boto3-1.16.47-py2.py3-none-any.whl
Collecting botocore<1.20.0,>=1.19.47 (from boto3)
  Using cached https://files.pythonhosted.org/packages/8f/4a/16ffdfc33d93f02604ae9ed1ddb6369030b6f61b583f31dc84e0d0da05c1/botocore-1.19.47-py2.py3-none-any.whl
Collecting s3transfer<0.4.0,>=0.3.0 (from boto3)
  Using cached https://files.pythonhosted.org/packages/69/79/e6afb3d8b0b4e96cefbdc690f741d7dd24547ff1f94240c997a26fa908d3/s3transfer-0.3.3-py2.py3-none-any.whl
Collecting python-dateutil<3.0.0,>=2.1 (from botocore<1.20.0,>=1.19.47->boto3)
  Using cached https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl
Collecting urllib3<1.27,>=1.25.4; python_version != "3.4" (from botocore<1.20.0,>=1.19.47->boto3)
  Using cached https://files.pythonhosted.org/packages/f5/71/45d3

In [None]:
# boto3 is imported before the Spark loop so that a missing package fails
# immediately instead of after all the statistics have been computed.
import boto3
import pyspark.sql.functions as F

#Statistics from each method to calculate speed
# NOTE(review): any sentinel speed values written by the speed-calculation
# step (e.g. -1 for records without a previous position) are included in
# these statistics — confirm this is intended.

CSV_HEADER = "day,mean,min,max,stddev,0.0625,0.125,0.25,0.5,0.75\n"
QUANTILE_PROBABILITIES = [0.0625,0.125,0.25,0.5,0.75]
QUANTILE_RELATIVE_ERROR = 0.0001

BUS_SPEED_DIR = "s3://mobility-traces-sp/processed-data-avl-date/10-speed-calculation-based-bus-location/"
SHAPE_SPEED_DIR = "s3://mobility-traces-sp/processed-data-avl-date/11-speed-calculation-based-shape-location/"


def _speed_stats_csv_row(day_label, parquet_path):
    """Build one CSV line with statistics of the "speed" column of a dataset.

    Args:
        day_label: value written in the "day" column of the CSV line.
        parquet_path: location of the parquet dataset to read.

    Returns:
        str: ``day,mean,min,max,stddev,<quantiles...>`` terminated by a
        newline, in the column order of ``CSV_HEADER``. Statistics that
        cannot be computed are written as ``None``.
    """
    speeds = spark.read.parquet(parquet_path)

    stats = speeds.agg(
        F.mean('speed').alias('mean'),
        F.min('speed').alias('min'),
        F.max('speed').alias('max'),
        F.stddev('speed').alias("stddev"),
    ).collect()[0]

    quantiles = speeds.approxQuantile("speed", QUANTILE_PROBABILITIES, QUANTILE_RELATIVE_ERROR)
    # approxQuantile returns an empty list when the column holds only nulls;
    # pad with None so the line keeps its ten columns instead of raising
    # IndexError.
    if not quantiles:
        quantiles = [None] * len(QUANTILE_PROBABILITIES)

    fields = [day_label, stats["mean"], stats["min"], stats["max"], stats["stddev"]]
    fields += list(quantiles)
    return ",".join(str(field) for field in fields) + "\n"


bus_location_lines = [CSV_HEADER]
shape_location_lines = [CSV_HEADER]

for day in range(1,32):
    day_label = "MO_1510"+str(day)

    # getting statistics from the speed calculated with bus location
    bus_location_lines.append(
        _speed_stats_csv_row(day_label, BUS_SPEED_DIR + day_label + "/")
    )

    # getting statistics from the speed calculated with shape location
    shape_location_lines.append(
        _speed_stats_csv_row(day_label, SHAPE_SPEED_DIR + day_label + "/")
    )

csv_out_bus_location = "".join(bus_location_lines)
csv_out_shape_location = "".join(shape_location_lines)

s3 = boto3.client('s3')

# writing results in S3
s3.put_object(Body=bytes(csv_out_bus_location,"utf-8"), Bucket='mobility-traces-sp', Key='statistics/exploring-data/11-speed_analysis_bus_location.csv')
s3.put_object(Body=bytes(csv_out_shape_location,"utf-8"), Bucket='mobility-traces-sp', Key='statistics/exploring-data/12-speed_analysis_shape_location.csv')
