Matching noisy GPS trajectory data to underlying road segments using OpenStreetMap road network data.

In [1]:
import json
from shapely.geometry import LineString
from pyspark.sql.window import Window
from pyspark.sql.functions import col, expr, udf, collect_list, struct, row_number, lit
from sedona.spark import *

In [3]:
from pyspark.sql.functions import udf
from sedona.sql.types import GeometryType
from pyspark.sql.types import ArrayType, StructType, StructField, LongType, DoubleType

Define Sedona context

In [2]:
config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

Map Matching
Map matching is a crucial step in many transportation analyses. It involves aligning a sequence of observed user positions (usually from GPS) onto a digital map, identifying the most likely path or sequence of roads that a user has traversed.

Load Road Network Data from OSM File into Spatial Dataframe
Utilizing the OpenStreetMap (OSM) data specific to the region to provide the foundational road network for analysis. OpenStreetMap offers detailed and open-sourced road network data, making it a prime choice for transportation studies.

In [4]:
from wherobots import matcher
dfEdge = matcher.load_osm("s3://wherobots-examples/data/osm_AnnArbor_large.xml", "[car]")
dfEdge.show(5)



+--------------------+----------+--------+----------+-----------+----------+-----------+
|            geometry|       src|     dst|   src_lat|    src_lon|   dst_lat|    dst_lon|
+--------------------+----------+--------+----------+-----------+----------+-----------+
|LINESTRING (-83.7...|  68133325|27254523| 42.238819|-83.7390142|42.2386159|-83.7390153|
|LINESTRING (-83.7...|9405840276|27254523|42.2386058|-83.7388915|42.2386159|-83.7390153|
|LINESTRING (-83.7...|  68133353|27254523|42.2385675|-83.7390856|42.2386159|-83.7390153|
|LINESTRING (-83.7...|2262917109|27254523|42.2384552|-83.7390313|42.2386159|-83.7390153|
|LINESTRING (-83.7...|9979197063|27489080|42.3200426|-83.7272283|42.3200887|-83.7273003|
+--------------------+----------+--------+----------+-----------+----------+-----------+
only showing top 5 rows



                                                                                

In [6]:
type(dfEdge)

pyspark.sql.dataframe.DataFrame

In [7]:
dfEdge.count()

                                                                                

157000

In [8]:
SedonaKepler.create_map(dfEdge, "roads")

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


                                                                                

KeplerGl(data={'roads': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 2…

Load GPS Tracks Data from VED Dataset
For this analysis Vehicle Energy Dataset (VED) is used. VED is a comprehensive dataset capturing GPS trajectories of 383 vehicles (including gasoline vehicles, HEVs, and PHEV/EVs) in Ann Arbor, Michigan, USA, from Nov 2017 to Nov 2018. The data spans ~374,000 miles and includes details on fuel, energy, speed, and auxiliary power usage. Driving scenarios cover diverse conditions, from highways to traffic-dense downtown areas, across different seasons.

Source: "Vehicle Energy Dataset (VED), A Large-scale Dataset for Vehicle Energy Consumption Research" by Geunseob (GS) Oh, David J. LeBlanc, Huei Peng. Published in IEEE Transactions on Intelligent Transportation Systems (T-ITS), 2020.

GitHub: https://github.com/gsoh/VED

In [9]:
df = sedona.read.csv("s3://wherobots-examples/data/VED_171101_week.csv", header=True, inferSchema=True)

                                                                                

In [10]:
type(df)

pyspark.sql.dataframe.DataFrame

In [11]:
df.count()

489414

In [12]:
df.printSchema()

root
 |-- DayNum: double (nullable = true)
 |-- VehId: integer (nullable = true)
 |-- Trip: integer (nullable = true)
 |-- Timestamp(ms): integer (nullable = true)
 |-- Latitude[deg]: double (nullable = true)
 |-- Longitude[deg]: double (nullable = true)
 |-- Vehicle Speed[km/h]: double (nullable = true)
 |-- MAF[g/sec]: double (nullable = true)
 |-- Engine RPM[RPM]: double (nullable = true)
 |-- Absolute Load[%]: double (nullable = true)
 |-- OAT[DegC]: double (nullable = true)
 |-- Fuel Rate[L/hr]: double (nullable = true)
 |-- Air Conditioning Power[kW]: double (nullable = true)
 |-- Air Conditioning Power[Watts]: double (nullable = true)
 |-- Heater Power[Watts]: double (nullable = true)
 |-- HV Battery Current[A]: double (nullable = true)
 |-- HV Battery SOC[%]: double (nullable = true)
 |-- HV Battery Voltage[V]: double (nullable = true)
 |-- Short Term Fuel Trim Bank 1[%]: double (nullable = true)
 |-- Short Term Fuel Trim Bank 2[%]: double (nullable = true)
 |-- Long Term Fuel 

Extracting the columns representing the vehicle id, trip id, timestamp, latitude, and longitude. Each row in the dataset represents a spatial-temporal point of a vehicle's journey, with columns detailing:

VehId: Vehicle Identifier.
Trip: Trip Identifier for a vehicle. It helps distinguish between different journeys of the same vehicle.
Timestamp(ms): Timestamp of the data point, typically represented in milliseconds.
Latitude[deg]: Latitude coordinate of the vehicle at the given timestamp.
Longitude[deg]: Longitude coordinate of the vehicle at the given timestamp.

In [13]:
df = df.select(['VehId', 'Trip', 'Timestamp(ms)','Latitude[deg]', 'Longitude[deg]'])

In [16]:
df.createOrReplaceTempView("gps_data")

In [17]:
df.show(10)

+-----+----+-------------+-------------+--------------+
|VehId|Trip|Timestamp(ms)|Latitude[deg]|Longitude[deg]|
+-----+----+-------------+-------------+--------------+
|    8| 706|            0|42.2775583333|-83.6987497222|
|    8| 706|          200|42.2775583333|-83.6987497222|
|    8| 706|         1100|42.2775583333|-83.6987497222|
|    8| 706|         2100|42.2775583333|-83.6987497222|
|    8| 706|         4200|42.2775583333|-83.6987497222|
|    8| 706|         5200|42.2782552778|-83.6988030556|
|    8| 706|         6300|42.2782552778|-83.6988030556|
|    8| 706|         7400|42.2782552778|-83.6988030556|
|    8| 706|         8400|42.2782552778|-83.6988030556|
|    8| 706|        10600|   42.2790125|-83.6989011111|
+-----+----+-------------+-------------+--------------+
only showing top 10 rows



The combination of VehId and Trip together form a unique key for our dataset. This combination allows to isolate individual vehicle trajectories. Every unique pair signifies a specific trajectory of a vehicle. Raw GPS points, while valuable, can be scattered, redundant, and lack context when viewed independently. By organizing these individual points into coherent trajectories represented by Linestrings, we enhance our ability to interpret, analyze, and apply the data in meaningful ways.

Create LineString Geometries from GPS tracks
A groupBy operation is performed on 'VehId' and 'Trip' columns to isolate individual trajectories. 
The resulting LineString essentially captures the responding vehicle's trajectory over time. 
The rows are first sorted by their timestamps to ensure the LineString follows the chronological order of the GPS data points.

A User Defined Function (UDF) is created for Spark that utilizes the function below to process Spatial DataFrame rows into LineString geometries.

In [18]:
result = sedona.sql("""
SELECT 
  VehId, 
  Trip,
  COLLECT_LIST(
    NAMED_STRUCT(
      'timestamp', `Timestamp(ms)`,
      'lat', `Latitude[deg]`,
      'lon', `Longitude[deg]`
    )
  ) AS gps_points
FROM gps_data
GROUP BY VehId, Trip
""")

In [19]:
result.printSchema()

root
 |-- VehId: integer (nullable = true)
 |-- Trip: integer (nullable = true)
 |-- gps_points: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- timestamp: integer (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)



In [20]:
@udf(returnType=ArrayType(
    StructType([
        StructField("timestamp", LongType()),
        StructField("lat", DoubleType()),
        StructField("lon", DoubleType())
    ])
))
def sort_coords(gps_points):
    return sorted(gps_points, key=lambda x: x['timestamp'])

In [23]:
# Dodajemy posortowane punkty i geometrie
with_sorted = result.withColumn("coords_sorted", sort_coords("gps_points"))

In [24]:
with_sorted.show()

[Stage 39:>                                                         (0 + 1) / 1]

+-----+----+--------------------+--------------------+
|VehId|Trip|          gps_points|       coords_sorted|
+-----+----+--------------------+--------------------+
|    8| 707|[{0, 42.277681388...|[{0, 42.277681388...|
|   10|1558|[{0, 42.277065833...|[{0, 42.277065833...|
|   10|1585|[{0, 42.2507675, ...|[{0, 42.2507675, ...|
|  116|2471|[{0, 42.275163611...|[{0, 42.275163611...|
|  116|2480|[{0, 42.277565277...|[{0, 42.277565277...|
|  116|2506|[{0, 42.277501111...|[{0, 42.277501111...|
|  124| 773|[{0, 42.264340833...|[{0, 42.264340833...|
|  124| 776|[{0, 42.281764444...|[{0, 42.281764444...|
|  128| 603|[{0, 42.305195833...|[{0, 42.305195833...|
|  133|1398|[{0, 42.2641025, ...|[{0, 42.2641025, ...|
|  133|1399|[{0, 42.230767222...|[{0, 42.230767222...|
|  140|1222|[{0, 42.276283611...|[{0, 42.276283611...|
|  150| 504|[{0, 42.274460277...|[{0, 42.274460277...|
|  155|1516|[{0, 42.245740833...|[{0, 42.245740833...|
|  155|1533|[{0, 42.284663333...|[{0, 42.284663333...|
|  155|153

                                                                                

In [25]:
@udf(returnType=GeometryType())
def make_linestring(gps_points):
    # Tworzymy pary (lon, lat)
    points = [(p['lon'], p['lat']) for p in gps_points]
    return LineString(points)

In [26]:
final_df = with_sorted.withColumn("geometry", make_linestring("coords_sorted"))

In [27]:
final_df.show()

[Stage 42:>                                                         (0 + 1) / 1]

+-----+----+--------------------+--------------------+--------------------+
|VehId|Trip|          gps_points|       coords_sorted|            geometry|
+-----+----+--------------------+--------------------+--------------------+
|    8| 707|[{0, 42.277681388...|[{0, 42.277681388...|LINESTRING (-83.6...|
|   10|1558|[{0, 42.277065833...|[{0, 42.277065833...|LINESTRING (-83.7...|
|   10|1585|[{0, 42.2507675, ...|[{0, 42.2507675, ...|LINESTRING (-83.7...|
|  116|2471|[{0, 42.275163611...|[{0, 42.275163611...|LINESTRING (-83.7...|
|  116|2480|[{0, 42.277565277...|[{0, 42.277565277...|LINESTRING (-83.7...|
|  116|2506|[{0, 42.277501111...|[{0, 42.277501111...|LINESTRING (-83.7...|
|  124| 773|[{0, 42.264340833...|[{0, 42.264340833...|LINESTRING (-83.7...|
|  124| 776|[{0, 42.281764444...|[{0, 42.281764444...|LINESTRING (-83.7...|
|  128| 603|[{0, 42.305195833...|[{0, 42.305195833...|LINESTRING (-83.6...|
|  133|1398|[{0, 42.2641025, ...|[{0, 42.2641025, ...|LINESTRING (-83.7...|
|  133|1399|

                                                                                

In [29]:
final_df.printSchema()

root
 |-- VehId: integer (nullable = true)
 |-- Trip: integer (nullable = true)
 |-- gps_points: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- timestamp: integer (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |-- coords_sorted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- timestamp: long (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |-- geometry: geometry (nullable = true)



Create a Spatial DataFrame of GPS Tracks

In [32]:
# Using row_number to generate unique IDs
window_spec = Window.partitionBy(lit(5)).orderBy("VehId", "Trip")  # Ordering by existing columns to provide some deterministic order
final_df = final_df.withColumn("ids", row_number().over(window_spec) - 1)
final_df = final_df.filter(final_df['ids'] < 10)
final_df = final_df.select("ids", "VehId", "Trip", "coords_sorted", "geometry")
final_df.show()



+---+-----+----+--------------------+--------------------+
|ids|VehId|Trip|       coords_sorted|            geometry|
+---+-----+----+--------------------+--------------------+
|  0|    8| 706|[{0, 42.277558333...|LINESTRING (-83.6...|
|  1|    8| 707|[{0, 42.277681388...|LINESTRING (-83.6...|
|  2|    8| 708|[{0, 42.261997222...|LINESTRING (-83.7...|
|  3|   10|1558|[{0, 42.277065833...|LINESTRING (-83.7...|
|  4|   10|1561|[{0, 42.286599722...|LINESTRING (-83.7...|
|  5|   10|1567|[{0, 42.281418333...|LINESTRING (-83.7...|
|  6|   10|1568|[{0, 42.281196666...|LINESTRING (-83.7...|
|  7|   10|1572|[{0, 42.286538611...|LINESTRING (-83.7...|
|  8|   10|1578|[{0, 42.275795277...|LINESTRING (-83.7...|
|  9|   10|1582|[{0, 42.275791944...|LINESTRING (-83.7...|
+---+-----+----+--------------------+--------------------+



                                                                                

In [34]:
SedonaKepler.create_map(final_df, "roads")

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


                                                                                

KeplerGl(data={'roads': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'columns': ['ids', 'VehId', 'Trip', 'coords_…

Perform Map Matching

In [35]:
sedona.conf.set("wherobots.tools.mm.maxdist", "100")
sedona.conf.set("wherobots.tools.mm.maxdistinit", "100")
sedona.conf.set("wherobots.tools.mm.obsnoise", "40")

dfMmResult = matcher.match(dfEdge, final_df, "geometry", "geometry")

                                                                                

results of a map matching process on GPS trajectories:

ids: A unique identifier for each trajectory, representing a distinct vehicle journey.
observed_points: Represents the original GPS trajectories. These are the linestrings formed from the raw GPS points collected during each vehicle journey.
matched_points: The processed trajectories post map-matching. These linestrings are aligned onto the actual road network, correcting for any GPS inaccuracies.
matched_nodes: A list of node identifiers from the road network that the matched trajectory passes through. 
These nodes correspond to intersections, turns, or other significant points in the road network.

In [36]:
dfMmResult.show()

                                                                                

+---+--------------------+--------------------+--------------------+
|ids|     observed_points|      matched_points|       matched_nodes|
+---+--------------------+--------------------+--------------------+
|  3|LINESTRING (-83.7...|LINESTRING (-83.7...|[62486100, 535279...|
|  4|LINESTRING (-83.7...|LINESTRING (-83.7...|[8358943871, 6255...|
|  5|LINESTRING (-83.7...|LINESTRING (-83.7...|[6801984236, 2329...|
|  6|LINESTRING (-83.7...|LINESTRING (-83.7...|[62542029, 680179...|
|  7|LINESTRING (-83.7...|LINESTRING (-83.7...|[4939334651, 6248...|
|  8|LINESTRING (-83.7...|LINESTRING (-83.7...|[4609790945, 6250...|
|  9|LINESTRING (-83.7...|LINESTRING (-83.7...|[4609790927, 4949...|
|  0|LINESTRING (-83.6...|LINESTRING (-83.6...|[2375582419, 6250...|
|  1|LINESTRING (-83.6...|LINESTRING (-83.6...|[5366563446, 6254...|
|  2|LINESTRING (-83.7...|LINESTRING (-83.7...|[62540699, 625473...|
+---+--------------------+--------------------+--------------------+



Visualize the result using SedonaKepler

In [40]:
mapAll = SedonaKepler.create_map()

SedonaKepler.add_df(mapAll, dfEdge, name="Road Network")
SedonaKepler.add_df(mapAll, dfMmResult.selectExpr("observed_points AS geometry"), name="Observed Points")
SedonaKepler.add_df(mapAll, dfMmResult.selectExpr("matched_points AS geometry"), name="Matched Points")
# mapAll.config = map_config

mapAll

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


KeplerGl(data={'Road Network': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19…

displaying the data corresponding to 'id' value 2

In [None]:
mapFil = SedonaKepler.create_map()

SedonaKepler.add_df(mapFil, dfEdge, name="Road Network")
SedonaKepler.add_df(mapFil, dfMmResult.filter(dfMmResult['ids']==2).selectExpr("observed_points AS geometry"), name="Observed Points")
SedonaKepler.add_df(mapFil, dfMmResult.filter(dfMmResult['ids']==2).selectExpr("matched_points AS geometry"), name="Matched Points")
# mapFil.config = map_config

mapFil

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter
