In [None]:
# Importing the libraries we're going to use.
# findspark must run BEFORE any pyspark import: it helps the Python session within the
# current Jupyter kernel find the Spark installation in the container (it puts Spark's
# Python bindings on sys.path).
import findspark
findspark.init()

import gpxpy
import matplotlib.pyplot as plt
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    acos, asin, avg, coalesce, col, cos, lag, lit, radians, round, sin, sqrt, sum, when,
)
from pyspark.sql.types import DoubleType, StructField, StructType, TimestampType
from pyspark.sql.window import Window

In [None]:
# Initialize (or reuse, if one already exists) a Hive-enabled Spark session and keep a
# handle to its SparkContext, then show both in the notebook output.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .getOrCreate()
)
sc = spark.sparkContext

display(spark, sc)

In [None]:
# GPX track analysed by this notebook. Change this single constant to analyse another ride.
GPX_PATH = '/opt/etl/data/vehicles/motorcycle/AAA_11B/recovery.05-Mar-2022.1025.gpx'

# Schema of the raw geo-point DataFrame: one row per recorded track point.
columns = StructType([
    StructField('time', TimestampType(), True),
    StructField('latitude', DoubleType(), True),
    StructField('longitude', DoubleType(), True)
])

# Open and parse the GPS track stored in the *.gpx file (GPX is XML, read it as UTF-8).
with open(GPX_PATH, encoding='utf-8') as fr:
    gpx_parser = gpxpy.parse(fr)

# Only the first segment of the first track is loaded; other tracks/segments are ignored.
track_points = [
    (point.time, point.latitude, point.longitude)
    for point in gpx_parser.tracks[0].segments[0].points
]

# Load the geo points into a Spark DataFrame sorted chronologically.
# Note the keyword is `ascending` (a misspelled keyword is silently ignored by PySpark).
raw_geo_df = spark.createDataFrame(track_points, columns).orderBy(col('time'), ascending=True)

raw_geo_df.show(10)

In [None]:
# Defines the haversine great-circle distance as a composition of native Spark column
# functions (not a Python UDF), so no per-row Python round-trip is needed.
# Refer to https://en.wikipedia.org/wiki/Haversine_formula for deeper knowledge of this formula.
def haversine(lat1, lon1, lat2, lon2, radius_km=6371.01):
    """
    Build the haversine distance between (lat1, lon1) and (lat2, lon2).

    Parameters
    ----------
    lat1, lon1, lat2, lon2 : Column or column name
        Coordinates in decimal degrees.
    radius_km : float
        Radius of the sphere approximating the Earth, in kilometers
        (6371.01 km by default).

    Returns
    -------
    Column
        Distance between both points in kilometers. A null input (e.g. the
        lagged coordinates of the very first point) yields a null distance.

    The haversine form is used rather than the spherical law of cosines
    (acos(...)): it stays well-conditioned for the very short hops between
    consecutive GPS fixes, where acos can receive an argument slightly above 1
    because of rounding and return NaN.
    """
    # Convert decimal degrees to radians.
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    delta_lat = lat2 - lat1
    delta_lon = lon2 - lon1
    # a = sin²(Δlat/2) + cos(lat1)·cos(lat2)·sin²(Δlon/2);  central angle c = 2·asin(√a)
    a = sin(delta_lat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(delta_lon / 2) ** 2
    return 2 * asin(sqrt(a)) * radius_km

# Window spanning the whole track, ordered by timestamp. It has no partitioning, so Spark
# evaluates it over a single partition, which is acceptable for a single ride.
w = Window.orderBy(col('time'))

# For every point, pull the previous point's time and coordinates (lag of one row) into
# old_time / old_latitude / old_longitude. The first point has no predecessor, so its
# old_* values are null.
previous_point_cols = [
    lag(col(name)).over(w).alias('old_' + name)
    for name in ('time', 'latitude', 'longitude')
]
windowed_df = raw_geo_df.select('*', *previous_point_cols)

# Adds the columns:
# distance_km: double - great-circle distance between point P(n) and P(n-1); 0 for the first point.
# time_elapsed_hours: double - time elapsed between measurements T(n) and T(n-1), in hours
#                             (whole-second resolution); 0 for the first point.
# speed_kmh: double - speed over that interval in km/h, rounded to 2 decimals; 0 when no time elapsed.
consolidated_df = (
    windowed_df
    .withColumn(
        'distance_km',
        coalesce(haversine(col('old_latitude'), col('old_longitude'), col('latitude'), col('longitude')), lit(0)),
    )
    .withColumn(
        'time_elapsed_hours',
        coalesce(col('time').cast('long') - col('old_time').cast('long'), lit(0)) / 3600,
    )
    # Guard the division explicitly: the first point (and any duplicated timestamp) has zero
    # elapsed time, and dividing by zero raises DIVIDE_BY_ZERO when Spark's ANSI mode is on.
    .withColumn(
        'speed_kmh',
        round(
            when(col('time_elapsed_hours') > 0, col('distance_km') / col('time_elapsed_hours')).otherwise(lit(0)),
            2,
        ),
    )
    .na.drop()
)

consolidated_df.show(10)

In [None]:
# Creates a Pandas DataFrame (indexed by timestamp) after deleting any NA found in the DataFrame.
pandas_df = consolidated_df.na.drop().toPandas().set_index('time')

# Gets a summary of:
# total_time_ride_hours: float - total time taken within the ride from start to end in hours.
# total_distance_ride_km: float - total distance ridden from start to end.
# avg_speed_ride_kmh: float - average speed over the entire ride (total distance / total time);
#                             None when no time elapsed at all.
# avg_measurement_interval_seconds: float - average time between consecutive measurements in
#                             seconds, ignoring zero-length intervals (e.g. the first point).
summary_df = consolidated_df.na.drop().select(
    sum('time_elapsed_hours').alias('total_time_ride_hours'),
    sum('distance_km').alias('total_distance_ride_km'),
    # A plain avg('speed_kmh') weights every sample equally regardless of how long its interval
    # lasted (and counts the first point's artificial 0 km/h), so it is not the ride's average.
    when(
        sum('time_elapsed_hours') > 0,
        sum('distance_km') / sum('time_elapsed_hours'),
    ).alias('avg_speed_ride_kmh'),
    # avg() skips the nulls produced by when() for zero-length intervals.
    (avg(when(col('time_elapsed_hours') > 0, col('time_elapsed_hours'))) * 3600)
    .alias('avg_measurement_interval_seconds'),
)

# Converts the single-row summary DataFrame into a Python dictionary.
summary = summary_df.first().asDict()

summary

In [None]:
# Plots the speed during the entire ride.
# Several peaks in the diagram look abnormal, which probably means the GPS tracking app
# recorded some erroneous geolocations. This can be solved by normalizing the time series,
# cleaning the raw data, replacing the points by the average of P(n-1) and P(n+1),
# removing those points, or ignoring speeds outside a plausible range.
fig, ax = plt.subplots()
pandas_df['speed_kmh'].plot(ax=ax)
ax.set(title='Speed during the ride (raw)', xlabel='Time', ylabel='Speed (km/h)');

In [None]:
# For this example, speeds outside the 0-120 km/h range are treated as GPS glitches and
# ignored. First, list the measurements above that upper bound.
pandas_df.query('speed_kmh > 120')

In [None]:
# Ride summary computed only over intervals with a plausible speed (<= 120 km/h);
# faster readings are treated as GPS glitches and excluded (distance and time alike).
plausible_df = consolidated_df.na.drop().filter(col('speed_kmh') <= 120)
summary_df = plausible_df.select(
    sum('time_elapsed_hours').alias('total_time_ride_hours'),
    sum('distance_km').alias('total_distance_ride_km'),
    # Ride average speed = total distance / total time (a plain avg of per-interval speeds
    # ignores interval lengths). Null when no time elapsed at all.
    when(
        sum('time_elapsed_hours') > 0,
        sum('distance_km') / sum('time_elapsed_hours'),
    ).alias('avg_speed_ride_kmh'),
    # Average gap between measurements; avg() skips the nulls produced by when() for
    # zero-length intervals such as the first point.
    (avg(when(col('time_elapsed_hours') > 0, col('time_elapsed_hours'))) * 3600)
    .alias('avg_measurement_interval_seconds'),
)

summary = summary_df.first().asDict()

summary

In [None]:
# Speed over the ride after discarding implausible readings (> 120 km/h), with the ride's
# average speed from `summary` drawn as a horizontal red line.
fig, ax = plt.subplots()
pandas_df.loc[pandas_df['speed_kmh'] <= 120, 'speed_kmh'].plot(ax=ax, label='speed')
if summary['avg_speed_ride_kmh'] is not None:
    ax.axhline(y=summary['avg_speed_ride_kmh'], color='red', label='average speed')
ax.set(title='Speed during the ride (<= 120 km/h)', xlabel='Time', ylabel='Speed (km/h)')
ax.legend();