The purpose of this script is to convert time-series GPS data into LineString geometries. The conversion is performed in two ways:
1. with UDFs defined via the @udf decorator;
2. by explicitly wrapping a Python function with udf() in PySpark.

The input is a CSV file of GPS track data from the VED (Vehicle Energy Dataset). VED is a comprehensive dataset capturing GPS trajectories of 383 vehicles (gasoline vehicles, HEVs, and PHEV/EVs) in Ann Arbor, Michigan, USA, from Nov 2017 to Nov 2018. The data spans ~374,000 miles and includes fuel, energy, speed, and auxiliary power usage. Driving scenarios cover diverse conditions, from highways to traffic-dense downtown areas, across different seasons.

Source: "Vehicle Energy Dataset (VED), A Large-scale Dataset for Vehicle Energy Consumption Research" by Geunseob (GS) Oh, David J. LeBlanc, Huei Peng. Published in IEEE Transactions on Intelligent Transportation Systems (T-ITS), 2020.

GitHub: https://github.com/gsoh/VED

In [1]:
from shapely.geometry import LineString
from pyspark.sql.functions import udf, collect_list, struct
from pyspark.sql.types import ArrayType, StructType, StructField, LongType, DoubleType
from sedona.spark import SedonaContext
from sedona.sql.types import GeometryType

In [2]:
config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)


In [3]:
df = sedona.read.csv("s3://wherobots-examples/data/VED_171101_week.csv", header=True, inferSchema=True)


In [4]:
df = df.select(['VehId', 'Trip', 'Timestamp(ms)','Latitude[deg]', 'Longitude[deg]'])

In [5]:
df.createOrReplaceTempView("gps_data")

In [6]:
df.show()

+-----+----+-------------+-------------+--------------+
|VehId|Trip|Timestamp(ms)|Latitude[deg]|Longitude[deg]|
+-----+----+-------------+-------------+--------------+
|    8| 706|            0|42.2775583333|-83.6987497222|
|    8| 706|          200|42.2775583333|-83.6987497222|
|    8| 706|         1100|42.2775583333|-83.6987497222|
|    8| 706|         2100|42.2775583333|-83.6987497222|
|    8| 706|         4200|42.2775583333|-83.6987497222|
|    8| 706|         5200|42.2782552778|-83.6988030556|
|    8| 706|         6300|42.2782552778|-83.6988030556|
|    8| 706|         7400|42.2782552778|-83.6988030556|
|    8| 706|         8400|42.2782552778|-83.6988030556|
|    8| 706|        10600|   42.2790125|-83.6989011111|
|    8| 706|        11700|   42.2790125|-83.6989011111|
|    8| 706|        12800|   42.2790125|-83.6989011111|
|    8| 706|        13800|   42.2790125|-83.6989011111|
|    8| 706|        14900|   42.2790125|-83.6989011111|
|    8| 706|        16000|42.2798258333|   -83.6

In [9]:
df.printSchema()

root
 |-- VehId: integer (nullable = true)
 |-- Trip: integer (nullable = true)
 |-- Timestamp(ms): integer (nullable = true)
 |-- Latitude[deg]: double (nullable = true)
 |-- Longitude[deg]: double (nullable = true)



1. UDFs defined with decorators

In [10]:
result = sedona.sql("""
SELECT 
  VehId, 
  Trip,
  COLLECT_LIST(
    NAMED_STRUCT(
      'timestamp', `Timestamp(ms)`,
      'lat', `Latitude[deg]`,
      'lon', `Longitude[deg]`
    )
  ) AS gps_points
FROM gps_data
GROUP BY VehId, Trip
""")

In [11]:
result.printSchema()

root
 |-- VehId: integer (nullable = true)
 |-- Trip: integer (nullable = true)
 |-- gps_points: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- timestamp: integer (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)



COLLECT_LIST(...) AS gps_points:

Collects all rows in each group (VehId, Trip) into one list.

Each element of the list is:

NAMED_STRUCT('timestamp', ..., 'lat', ..., 'lon', ...)

That is, a structure (record) with three fields: timestamp, lat, lon.

NAMED_STRUCT(...):

Creates a structure (something like an object or dictionary) with named fields.

We use the data from the original columns:

Timestamp(ms) → as timestamp

Latitude[deg] → as lat

Longitude[deg] → as lon

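Each row represents one vehicle trip and contains a list of all GPS points from that trip. COLLECT_LIST does not guarantee the order of the collected elements (especially after a shuffle), so the points are sorted separately in a later step.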
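To make the semantics of GROUP BY plus COLLECT_LIST(NAMED_STRUCT(...)) concrete, here is a minimal plain-Python sketch that runs outside Spark. The helper name collect_gps_points and the in-memory rows are assumptions for illustration (coordinates are taken from the df.show() output above; trip 9999 is hypothetical). Unlike Spark, Python dicts and lists preserve insertion order, so this sketch does not reproduce Spark's lack of ordering guarantees.

```python
from collections import defaultdict

# Illustrative rows shaped like the selected VED columns (trip 9999 is hypothetical)
rows = [
    {"VehId": 8, "Trip": 706, "Timestamp(ms)": 1100, "Latitude[deg]": 42.2775583333, "Longitude[deg]": -83.6987497222},
    {"VehId": 8, "Trip": 706, "Timestamp(ms)": 0, "Latitude[deg]": 42.2775583333, "Longitude[deg]": -83.6987497222},
    {"VehId": 8, "Trip": 9999, "Timestamp(ms)": 0, "Latitude[deg]": 42.2790125, "Longitude[deg]": -83.6989011111},
]


def collect_gps_points(rows):
    """Hypothetical helper mimicking GROUP BY VehId, Trip + COLLECT_LIST(NAMED_STRUCT(...))."""
    groups = defaultdict(list)
    for r in rows:
        # The (VehId, Trip) key plays the role of GROUP BY VehId, Trip
        groups[(r["VehId"], r["Trip"])].append({
            # Each dict plays the role of NAMED_STRUCT('timestamp', ..., 'lat', ..., 'lon', ...)
            "timestamp": r["Timestamp(ms)"],
            "lat": r["Latitude[deg]"],
            "lon": r["Longitude[deg]"],
        })
    return dict(groups)


grouped = collect_gps_points(rows)
for key, points in grouped.items():
    print(key, [p["timestamp"] for p in points])
```

Note that the timestamps of trip 706 come out unsorted, which is exactly why a sorting step follows.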

In [12]:
result.show()



+-----+----+--------------------+
|VehId|Trip|          gps_points|
+-----+----+--------------------+
|    8| 707|[{0, 42.277681388...|
|   10|1558|[{0, 42.277065833...|
|   10|1585|[{0, 42.2507675, ...|
|  116|2471|[{0, 42.275163611...|
|  116|2480|[{0, 42.277565277...|
|  116|2506|[{0, 42.277501111...|
|  124| 773|[{0, 42.264340833...|
|  124| 776|[{0, 42.281764444...|
|  128| 603|[{0, 42.305195833...|
|  133|1398|[{0, 42.2641025, ...|
|  133|1399|[{0, 42.230767222...|
|  140|1222|[{0, 42.276283611...|
|  150| 504|[{0, 42.274460277...|
|  155|1516|[{0, 42.245740833...|
|  155|1533|[{0, 42.284663333...|
|  155|1534|[{363000, 42.2470...|
|  156| 912|[{0, 42.309303055...|
|  156| 913|[{0, 42.293580555...|
|  161| 360|[{0, 42.2355225, ...|
|  174|1145|[{0, 42.244144722...|
+-----+----+--------------------+
only showing top 20 rows




In the next steps, the gps_points arrays are sorted by timestamp and then transformed into LineStrings (i.e. route geometries).

The code below defines and uses a User Defined Function (UDF) in PySpark to sort each list of GPS points by time (timestamp).

@udf(...) decorator
1. Wraps the sort_coords function as a PySpark UDF (User Defined Function) that can be applied to DataFrame columns.

returnType=ArrayType(StructType(...)):

Specifies that this function returns a list of structures (i.e. Array of Structs), where each structure contains:

timestamp: 64-bit integer (LongType)

lat: floating point number (DoubleType)

lon: floating point number (DoubleType)

2. The sort_coords function
Takes as an argument the gps_points column, which is a list of structures generated earlier (in result).

Sorts this list by the timestamp field so that the GPS points are in time order.

Returns a sorted list of structures.
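The UDF declares timestamp as LongType, while inferSchema read Timestamp(ms) as a 32-bit integer; this is why coords_sorted shows timestamp: long in the final schema. A quick arithmetic check of the 32-bit range, assuming the values are milliseconds counted from the start of each trip (trip 706 above starts at 0), suggests either type is large enough for a single trip, and LongType simply leaves headroom:

```python
# Largest value of a signed 32-bit integer (Spark IntegerType)
INT32_MAX = 2**31 - 1
# Milliseconds in one day
MS_PER_DAY = 1000 * 60 * 60 * 24

max_days = INT32_MAX / MS_PER_DAY
print(f"IntegerType can hold about {max_days:.2f} days of milliseconds")  # about 24.86 days
```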

In [14]:
@udf(returnType=ArrayType(
    StructType([
        StructField("timestamp", LongType()),
        StructField("lat", DoubleType()),
        StructField("lon", DoubleType())
    ])
))
def sort_coords(gps_points):
    return sorted(gps_points, key=lambda x: x['timestamp'])
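The UDF body is ordinary Python, so its logic can be checked without Spark. Below is a minimal sketch of the same sort as a plain function (the name sort_coords_py is hypothetical), with an added None guard so a null array would pass through instead of raising; the sample points come from the df.show() output above. As an alternative worth testing, Spark's built-in pyspark.sql.functions.sort_array orders an array of structs field by field, so with timestamp as the first struct field it should sort by time without a Python UDF.

```python
from typing import Optional


def sort_coords_py(gps_points: Optional[list]) -> Optional[list]:
    """Plain-Python equivalent of the sort_coords UDF body, with a None guard."""
    if gps_points is None:
        return None
    # sorted() is stable: points sharing a timestamp keep their relative order
    return sorted(gps_points, key=lambda p: p["timestamp"])


points = [
    {"timestamp": 2100, "lat": 42.2775583333, "lon": -83.6987497222},
    {"timestamp": 0, "lat": 42.2775583333, "lon": -83.6987497222},
    {"timestamp": 1100, "lat": 42.2775583333, "lon": -83.6987497222},
]
print([p["timestamp"] for p in sort_coords_py(points)])  # [0, 1100, 2100]
```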

In [15]:
# Add a column with the GPS points sorted by timestamp
with_sorted = result.withColumn("coords_sorted", sort_coords("gps_points"))

In [16]:
with_sorted.show()


+-----+----+--------------------+--------------------+
|VehId|Trip|          gps_points|       coords_sorted|
+-----+----+--------------------+--------------------+
|    8| 707|[{0, 42.277681388...|[{0, 42.277681388...|
|   10|1558|[{0, 42.277065833...|[{0, 42.277065833...|
|   10|1585|[{0, 42.2507675, ...|[{0, 42.2507675, ...|
|  116|2471|[{0, 42.275163611...|[{0, 42.275163611...|
|  116|2480|[{0, 42.277565277...|[{0, 42.277565277...|
|  116|2506|[{0, 42.277501111...|[{0, 42.277501111...|
|  124| 773|[{0, 42.264340833...|[{0, 42.264340833...|
|  124| 776|[{0, 42.281764444...|[{0, 42.281764444...|
|  128| 603|[{0, 42.305195833...|[{0, 42.305195833...|
|  133|1398|[{0, 42.2641025, ...|[{0, 42.2641025, ...|
|  133|1399|[{0, 42.230767222...|[{0, 42.230767222...|
|  140|1222|[{0, 42.276283611...|[{0, 42.276283611...|
|  150| 504|[{0, 42.274460277...|[{0, 42.274460277...|
|  155|1516|[{0, 42.245740833...|[{0, 42.245740833...|
|  155|1533|[{0, 42.284663333...|[{0, 42.284663333...|
|  155|153


In GPS data the order of points matters: without sorting, the route can be illogical (e.g. it can jump back and forth in time or across the map).

Sorting before creating the geometry (LineString) ensures that the line visits the points in chronological order, so its shape follows the path actually driven.
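The effect of ordering can be quantified with a small sketch: the same three points from trip 706 (taken from the df.show() output above) produce a longer, zig-zagging path when they are not in time order. The helper names and the haversine distance (mean Earth radius 6,371 km) are illustrative assumptions, not part of the notebook.

```python
import math


def haversine_m(lat1, lon1, lat2, lon2, radius_m=6_371_000):
    """Great-circle distance in metres between two (lat, lon) points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * radius_m * math.asin(math.sqrt(a))


def path_length_m(points):
    """Sum of distances between consecutive points, in the order given."""
    return sum(
        haversine_m(a["lat"], a["lon"], b["lat"], b["lon"])
        for a, b in zip(points, points[1:])
    )


track = [
    {"timestamp": 0, "lat": 42.2775583333, "lon": -83.6987497222},
    {"timestamp": 5200, "lat": 42.2782552778, "lon": -83.6988030556},
    {"timestamp": 10600, "lat": 42.2790125, "lon": -83.6989011111},
]
shuffled = [track[0], track[2], track[1]]  # out of time order
ordered = sorted(shuffled, key=lambda p: p["timestamp"])

print(f"shuffled: {path_length_m(shuffled):.1f} m, ordered: {path_length_m(ordered):.1f} m")
```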

@udf(returnType=GeometryType()) decorator:

1. Wraps the function as a PySpark UDF that can be applied to DataFrame columns.

GeometryType() means that the function returns a spatial (geometric) object, here a LineString (a line connecting the points).

2. The make_linestring function:

Takes a list of GPS points (gps_points), which are already sorted by time.

Each point is a dictionary with lat and lon fields.

Creates a list of coordinates in (lon, lat) order. This is important because a LineString in spatial libraries such as Shapely and Sedona expects (x, y), i.e. (longitude, latitude).

Creates a LineString(points) object, which represents the route.

In [17]:
@udf(returnType=GeometryType())
def make_linestring(gps_points):
    # Build (lon, lat) pairs, i.e. (x, y) order
    points = [(p['lon'], p['lat']) for p in gps_points]
    return LineString(points)
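One edge case worth guarding: a LineString needs at least two coordinates, and Shapely raises an error when given a single point, so a trip with only one GPS record would make make_linestring fail. The stdlib-only sketch below (the helper linestring_wkt is hypothetical) shows the guard and the (lon, lat) ordering by building a WKT string instead of a Shapely object; in the real UDF the same check, `if gps_points is None or len(gps_points) < 2: return None`, placed before `LineString(points)` would yield a null geometry instead of an exception.

```python
def linestring_wkt(gps_points):
    """Build a LINESTRING WKT string from points with 'lat'/'lon' keys.

    Coordinates are written as "lon lat" (x y). Returns None for fewer than
    two points, because a LineString needs at least two coordinates.
    """
    if gps_points is None or len(gps_points) < 2:
        return None
    coords = ", ".join(f"{p['lon']} {p['lat']}" for p in gps_points)
    return f"LINESTRING ({coords})"


print(linestring_wkt([{"lat": 1.0, "lon": 2.0}, {"lat": 3.0, "lon": 4.0}]))
# LINESTRING (2.0 1.0, 4.0 3.0)
```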

In [18]:
final_df = with_sorted.withColumn("geometry", make_linestring("coords_sorted"))

In [19]:
final_df.show()


+-----+----+--------------------+--------------------+--------------------+
|VehId|Trip|          gps_points|       coords_sorted|            geometry|
+-----+----+--------------------+--------------------+--------------------+
|    8| 707|[{0, 42.277681388...|[{0, 42.277681388...|LINESTRING (-83.6...|
|   10|1558|[{0, 42.277065833...|[{0, 42.277065833...|LINESTRING (-83.7...|
|   10|1585|[{0, 42.2507675, ...|[{0, 42.2507675, ...|LINESTRING (-83.7...|
|  116|2471|[{0, 42.275163611...|[{0, 42.275163611...|LINESTRING (-83.7...|
|  116|2480|[{0, 42.277565277...|[{0, 42.277565277...|LINESTRING (-83.7...|
|  116|2506|[{0, 42.277501111...|[{0, 42.277501111...|LINESTRING (-83.7...|
|  124| 773|[{0, 42.264340833...|[{0, 42.264340833...|LINESTRING (-83.7...|
|  124| 776|[{0, 42.281764444...|[{0, 42.281764444...|LINESTRING (-83.7...|
|  128| 603|[{0, 42.305195833...|[{0, 42.305195833...|LINESTRING (-83.6...|
|  133|1398|[{0, 42.2641025, ...|[{0, 42.2641025, ...|LINESTRING (-83.7...|
|  133|1399|


In [20]:
final_df.printSchema()

root
 |-- VehId: integer (nullable = true)
 |-- Trip: integer (nullable = true)
 |-- gps_points: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- timestamp: integer (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |-- coords_sorted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- timestamp: long (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lon: double (nullable = true)
 |-- geometry: geometry (nullable = true)



This step turns GPS (point) data into spatial (geometric) data that can:

be visualized on a map,

analyzed (e.g. route length, intersections, proximity),

saved to GIS formats (e.g. GeoJSON, WKT, shapefile),

combined with other spatial data.
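As an illustration of the "saved to GIS formats" point, the sketch below turns one trip into a GeoJSON Feature using only the standard library. The helper name to_geojson_feature is hypothetical; GeoJSON (RFC 7946) stores positions as [longitude, latitude], the same (x, y) order used for the LineString. Within Sedona itself, the ST_AsGeoJSON SQL function is another option for serializing the geometry column.

```python
import json


def to_geojson_feature(veh_id, trip, gps_points):
    """Hypothetical helper: one trip as a GeoJSON Feature with a LineString geometry."""
    ordered = sorted(gps_points, key=lambda p: p["timestamp"])
    return {
        "type": "Feature",
        "geometry": {
            "type": "LineString",
            # GeoJSON positions are [longitude, latitude]
            "coordinates": [[p["lon"], p["lat"]] for p in ordered],
        },
        "properties": {"VehId": veh_id, "Trip": trip},
    }


points = [
    {"timestamp": 5200, "lat": 42.2782552778, "lon": -83.6988030556},
    {"timestamp": 0, "lat": 42.2775583333, "lon": -83.6987497222},
]
feature = to_geojson_feature(8, 706, points)
print(json.dumps(feature))
```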

2. Second method: explicitly wrapping a function with udf() in PySpark.

In [21]:
df.show()

+-----+----+-------------+-------------+--------------+
|VehId|Trip|Timestamp(ms)|Latitude[deg]|Longitude[deg]|
+-----+----+-------------+-------------+--------------+
|    8| 706|            0|42.2775583333|-83.6987497222|
|    8| 706|          200|42.2775583333|-83.6987497222|
|    8| 706|         1100|42.2775583333|-83.6987497222|
|    8| 706|         2100|42.2775583333|-83.6987497222|
|    8| 706|         4200|42.2775583333|-83.6987497222|
|    8| 706|         5200|42.2782552778|-83.6988030556|
|    8| 706|         6300|42.2782552778|-83.6988030556|
|    8| 706|         7400|42.2782552778|-83.6988030556|
|    8| 706|         8400|42.2782552778|-83.6988030556|
|    8| 706|        10600|   42.2790125|-83.6989011111|
|    8| 706|        11700|   42.2790125|-83.6989011111|
|    8| 706|        12800|   42.2790125|-83.6989011111|
|    8| 706|        13800|   42.2790125|-83.6989011111|
|    8| 706|        14900|   42.2790125|-83.6989011111|
|    8| 706|        16000|42.2798258333|   -83.6

Create LineString Geometries from GPS tracks
A groupBy on the 'VehId' and 'Trip' columns isolates individual trajectories.
Within each group, the rows are sorted by timestamp so that the LineString follows the chronological order of the GPS data points.
The resulting LineString captures the corresponding vehicle's trajectory over the trip.

The function below is wrapped as a Spark UDF with udf(...); it turns the collected rows of each trip into a LineString geometry. Unlike the first method, sorting and geometry construction happen in a single UDF.

In [22]:
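def rows_to_linestring(rows):
    # Sort the collected structs chronologically
    sorted_rows = sorted(rows, key=lambda x: x['Timestamp(ms)'])
    # Shapely expects (x, y), i.e. (longitude, latitude)
    coords = [(row['Longitude[deg]'], row['Latitude[deg]']) for row in sorted_rows]
    linestring = LineString(coords)
    return linestring

# Wrap the plain Python function as a UDF returning a Sedona geometry
linestring_udf = udf(rows_to_linestring, GeometryType())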
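The two methods differ mainly in syntax: in Python, `@decorator(args)` above a `def` is shorthand for `f = decorator(args)(f)`. The stdlib-only sketch below uses a hypothetical stand-in, fake_udf (not the real pyspark.sql.functions.udf), that supports both call styles, to show that they produce equivalent wrapped functions. Neither style makes the function callable by name inside sedona.sql(...) queries; for that, PySpark provides SparkSession.udf.register, e.g. `sedona.udf.register("rows_to_linestring", rows_to_linestring, GeometryType())`, which this notebook does not use.

```python
import functools


def fake_udf(f=None, returnType=None):
    """Hypothetical stand-in for a UDF factory, usable in both styles:

    fake_udf(func, returnType)   -> wrapped function (call form)
    @fake_udf(returnType=...)    -> decorator form
    """
    def wrap(func):
        @functools.wraps(func)
        def wrapper(*args):
            return func(*args)
        # Record the declared return type on the wrapper
        wrapper.returnType = returnType
        return wrapper

    if f is None:
        return wrap   # decorator form
    return wrap(f)    # call form


@fake_udf(returnType="geometry")
def count_points_decorated(points):
    return len(points)


def count_points(points):
    return len(points)


count_points_wrapped = fake_udf(count_points, "geometry")
print(count_points_decorated([1, 2, 3]), count_points_wrapped([1, 2, 3]))  # 3 3
```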

In [23]:
# Group by VehId and Trip and aggregate
dfPath = (df
          .groupBy("VehId", "Trip")
          .agg(collect_list(struct("Timestamp(ms)", "Latitude[deg]", "Longitude[deg]")).alias("coords"))
          .withColumn("geometry", linestring_udf("coords"))
         )

In [24]:
dfPath.show()

+-----+----+--------------------+--------------------+
|VehId|Trip|              coords|            geometry|
+-----+----+--------------------+--------------------+
|    8| 707|[{0, 42.277681388...|LINESTRING (-83.6...|
|   10|1558|[{0, 42.277065833...|LINESTRING (-83.7...|
|   10|1585|[{0, 42.2507675, ...|LINESTRING (-83.7...|
|  116|2471|[{0, 42.275163611...|LINESTRING (-83.7...|
|  116|2480|[{0, 42.277565277...|LINESTRING (-83.7...|
|  116|2506|[{0, 42.277501111...|LINESTRING (-83.7...|
|  124| 773|[{0, 42.264340833...|LINESTRING (-83.7...|
|  124| 776|[{0, 42.281764444...|LINESTRING (-83.7...|
|  128| 603|[{0, 42.305195833...|LINESTRING (-83.6...|
|  133|1398|[{0, 42.2641025, ...|LINESTRING (-83.7...|
|  133|1399|[{0, 42.230767222...|LINESTRING (-83.6...|
|  140|1222|[{0, 42.276283611...|LINESTRING (-83.7...|
|  150| 504|[{0, 42.274460277...|LINESTRING (-83.7...|
|  155|1516|[{0, 42.245740833...|LINESTRING (-83.6...|
|  155|1533|[{0, 42.284663333...|LINESTRING (-83.8...|
|  155|153
