In [60]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

ModuleNotFoundError: No module named 'sedona.sql.functions'

In [2]:
from sedona.spark import *

# Get (or create) a Spark session through Sedona's builder, then wrap it with
# SedonaContext.create so Sedona's spatial functions (e.g. ST_Point) are usable.
# `sedona` is the session later cells use to build DataFrames.
config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [58]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import datetime, math

In [59]:
# Synthetic GPS fixes as (user_id, timestamp, lon, lat) rows on 2025-04-14,
# roughly one fix per minute. Each user's track is listed as (minute, lon, lat)
# and expanded below. u1 has a large first step (08:00 -> 08:01); u2 skips
# 09:01 and jumps sharply between 09:06 and 09:07.
data = [
    (user_id, datetime.datetime(2025, 4, 14, hour, minute), lon, lat)
    for user_id, hour, track in (
        # USER 1
        ("u1", 8, [
            (0, 21.00, 52.00),
            (1, 21.05, 52.05),
            (2, 21.06, 52.06),
            (3, 21.07, 52.07),
            (4, 21.10, 52.08),
            (5, 21.13, 52.09),
            (6, 21.14, 52.10),
            (7, 21.15, 52.10),
            (8, 21.16, 52.10),
            (9, 21.17, 52.11),
        ]),
        # USER 2
        ("u2", 9, [
            (0, 20.00, 51.00),
            (2, 20.01, 51.01),
            (3, 20.02, 51.01),
            (4, 20.03, 51.01),
            (5, 20.05, 51.02),
            (6, 20.10, 51.03),
            (7, 20.50, 51.10),
            (8, 20.51, 51.11),
            (9, 20.52, 51.12),
            (10, 20.53, 51.13),
        ]),
    )
    for minute, lon, lat in track
]


In [49]:
# Explicit schema for the GPS rows; every field is nullable.
schema = (
    StructType()
    .add("user_id", StringType(), True)
    .add("timestamp", TimestampType(), True)
    .add("lon", DoubleType(), True)
    .add("lat", DoubleType(), True)
)

# Create the DataFrame from the in-memory rows using that schema.
df = sedona.createDataFrame(data, schema=schema)

In [50]:
df.show(n=20, truncate=True)  # same as the defaults: first 20 rows, long values truncated

+-------+-------------------+-----+-----+
|user_id|          timestamp|  lon|  lat|
+-------+-------------------+-----+-----+
|     u1|2025-04-14 08:00:00| 21.0| 52.0|
|     u1|2025-04-14 08:01:00|21.05|52.05|
|     u1|2025-04-14 08:02:00|21.06|52.06|
|     u1|2025-04-14 08:03:00|21.07|52.07|
|     u1|2025-04-14 08:04:00| 21.1|52.08|
|     u1|2025-04-14 08:05:00|21.13|52.09|
|     u1|2025-04-14 08:06:00|21.14| 52.1|
|     u1|2025-04-14 08:07:00|21.15| 52.1|
|     u1|2025-04-14 08:08:00|21.16| 52.1|
|     u1|2025-04-14 08:09:00|21.17|52.11|
|     u2|2025-04-14 09:00:00| 20.0| 51.0|
|     u2|2025-04-14 09:02:00|20.01|51.01|
|     u2|2025-04-14 09:03:00|20.02|51.01|
|     u2|2025-04-14 09:04:00|20.03|51.01|
|     u2|2025-04-14 09:05:00|20.05|51.02|
|     u2|2025-04-14 09:06:00| 20.1|51.03|
|     u2|2025-04-14 09:07:00| 20.5| 51.1|
|     u2|2025-04-14 09:08:00|20.51|51.11|
|     u2|2025-04-14 09:09:00|20.52|51.12|
|     u2|2025-04-14 09:10:00|20.53|51.13|
+-------+-------------------+-----

In [51]:
# Print the column names/types of `df` (they come from the explicit schema used to build it).
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)



In [52]:
# Per-user window in chronological order, so lag() yields each user's previous fix.
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

# Add the previous fix's coordinates and the minutes elapsed since it.
# All three are NULL on each user's first row, where lag() has no predecessor.
df = (
    df
    .withColumn("prev_lat", F.lag("lat").over(window_spec))
    .withColumn("prev_lon", F.lag("lon").over(window_spec))
    .withColumn(
        "time_diff_min",
        (
            F.unix_timestamp("timestamp")
            - F.unix_timestamp(F.lag("timestamp").over(window_spec))
        ) / 60,
    )
)

In [61]:
# One point geometry per row; ST_Point takes (x, y), i.e. (lon, lat).
df = df.withColumn("geom", ST_Point("lon", "lat"))

In [62]:
df.show(n=20, truncate=True)  # same as the defaults: first 20 rows, long values truncated

+-------+-------------------+-----+-----+--------+--------+-------------+------------------+------------------+-------------------+
|user_id|          timestamp|  lon|  lat|prev_lat|prev_lon|time_diff_min|       distance_km|         speed_kmh|               geom|
+-------+-------------------+-----+-----+--------+--------+-------------+------------------+------------------+-------------------+
|     u1|2025-04-14 08:00:00| 21.0| 52.0|    NULL|    NULL|         NULL|              NULL|              NULL|      POINT (21 52)|
|     u1|2025-04-14 08:01:00|21.05|52.05|    52.0|    21.0|          1.0|6.5279461995612955| 391.6767719736777|POINT (21.05 52.05)|
|     u1|2025-04-14 08:02:00|21.06|52.06|   52.05|   21.05|          1.0| 1.305348806210964| 78.32092837265783|POINT (21.06 52.06)|
|     u1|2025-04-14 08:03:00|21.07|52.07|   52.06|   21.06|          1.0|1.3052686420224018| 78.31611852134411|POINT (21.07 52.07)|
|     u1|2025-04-14 08:04:00| 21.1|52.08|   52.07|   21.07|          1.0| 2.

In [79]:
# NOTE(review): each user's first row has NULL prev_*/time_diff_min/distance_km/speed_kmh.
# Those NULLs look like the source of the "Out of range float values are not JSON
# compliant" warning shown for this map. Consider dropping or filling them first.
SedonaKepler.create_map(df=df, name="speed points")

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


Out of range float values are not JSON compliant
Supporting this message is deprecated in jupyter-client 7, please make sure your message is JSON-compliant
  content = self.pack(content)


KeplerGl(data={'speed points': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19…

In [64]:
def haversine(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle (haversine) distance between two points given in decimal degrees.

    Args:
        lat1, lon1: latitude/longitude of the first point, in degrees.
        lat2, lon2: latitude/longitude of the second point, in degrees.
        radius_km: radius of the sphere. The default 6371 is the mean Earth
            radius in km, so the result is in kilometres.

    Returns:
        The distance in the unit of ``radius_km``, or ``None`` if any coordinate
        is missing (e.g. the first row of a lag() window).
    """
    if any(v is None for v in (lat1, lon1, lat2, lon2)):
        return None
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    # Floating-point rounding can push `a` slightly outside [0, 1] (near-antipodal
    # points, or out-of-range latitudes). math.sqrt would then raise ValueError and
    # fail the whole Spark UDF, so clamp it.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return radius_km * c

# Wrap the pure-Python haversine as a Spark UDF returning a double (km).
haversine_udf = F.udf(haversine, returnType=DoubleType())

In [72]:
# Distance to the previous fix (km) and the implied average speed (km/h).
# Guard the division: duplicate timestamps give time_diff_min == 0. That yields
# NULL under Spark's non-ANSI mode but raises DIVIDE_BY_ZERO when
# spark.sql.ansi.enabled is true. F.when without otherwise() returns NULL, which
# also covers the first row per user (NULL time_diff_min).
df = df.withColumn("distance_km", haversine_udf("lat", "lon", "prev_lat", "prev_lon")) \
       .withColumn("speed_kmh",
                   F.when(F.col("time_diff_min") > 0,
                          (F.col("distance_km") / F.col("time_diff_min")) * 60))

In [74]:
# Per-point speed table, ordered by user and time, without truncating values.
(
    df.select("user_id", "timestamp", "lon", "lat",
              "time_diff_min", "distance_km", "speed_kmh", "geom")
      .sort("user_id", "timestamp")
      .show(n=30, truncate=False)
)

+-------+-------------------+-----+-----+-------------+------------------+------------------+-------------------+
|user_id|timestamp          |lon  |lat  |time_diff_min|distance_km       |speed_kmh         |geom               |
+-------+-------------------+-----+-----+-------------+------------------+------------------+-------------------+
|u1     |2025-04-14 08:00:00|21.0 |52.0 |NULL         |NULL              |NULL              |POINT (21 52)      |
|u1     |2025-04-14 08:01:00|21.05|52.05|1.0          |6.5279461995612955|391.6767719736777 |POINT (21.05 52.05)|
|u1     |2025-04-14 08:02:00|21.06|52.06|1.0          |1.305348806210964 |78.32092837265783 |POINT (21.06 52.06)|
|u1     |2025-04-14 08:03:00|21.07|52.07|1.0          |1.3052686420224018|78.31611852134411 |POINT (21.07 52.07)|
|u1     |2025-04-14 08:04:00|21.1 |52.08|1.0          |2.332424337801054 |139.94546026806324|POINT (21.1 52.08) |
|u1     |2025-04-14 08:05:00|21.13|52.09|1.0          |2.332020608948471 |139.9212365369

In [75]:
# Quick look at rows whose implied speed exceeds 100 km/h (NULL speeds are excluded).
(
    df.where(F.col("speed_kmh") > 100)
      .select("user_id", "timestamp", "prev_lon", "prev_lat", "lon", "lat", "geom",
              "time_diff_min", "distance_km", "speed_kmh")
      .sort("user_id", "timestamp")
      .show(truncate=False)
)

+-------+-------------------+--------+--------+-----+-----+-------------------+-------------+------------------+------------------+
|user_id|timestamp          |prev_lon|prev_lat|lon  |lat  |geom               |time_diff_min|distance_km       |speed_kmh         |
+-------+-------------------+--------+--------+-----+-----+-------------------+-------------+------------------+------------------+
|u1     |2025-04-14 08:01:00|21.0    |52.0    |21.05|52.05|POINT (21.05 52.05)|1.0          |6.5279461995612955|391.6767719736777 |
|u1     |2025-04-14 08:04:00|21.07   |52.07   |21.1 |52.08|POINT (21.1 52.08) |1.0          |2.332424337801054 |139.94546026806324|
|u1     |2025-04-14 08:05:00|21.1    |52.08   |21.13|52.09|POINT (21.13 52.09)|1.0          |2.332020608948471 |139.92123653690828|
|u2     |2025-04-14 09:05:00|20.03   |51.01   |20.05|51.02|POINT (20.05 51.02)|1.0          |1.7871457858897948|107.22874715338769|
|u2     |2025-04-14 09:06:00|20.05   |51.02   |20.1 |51.03|POINT (20.1 51.03

In [76]:
# Speed threshold (km/h) above which a step between consecutive fixes is flagged.
SPEED_LIMIT_KMH = 100

# Flagged steps with both endpoints and the derived metrics. Rows with NULL speed
# (each user's first fix) never satisfy the comparison and are dropped.
df_speeding = (
    df.filter(F.col("speed_kmh") > SPEED_LIMIT_KMH)
      .select(
          "user_id",
          "timestamp",
          "prev_lon",
          "prev_lat",
          "geom",
          "lon",
          "lat",
          "time_diff_min",
          "distance_km",
          "speed_kmh",
      )
      .orderBy("user_id", "timestamp")
)

df_speeding.show(truncate=False)

+-------+-------------------+--------+--------+-------------------+-----+-----+-------------+------------------+------------------+
|user_id|timestamp          |prev_lon|prev_lat|geom               |lon  |lat  |time_diff_min|distance_km       |speed_kmh         |
+-------+-------------------+--------+--------+-------------------+-----+-----+-------------+------------------+------------------+
|u1     |2025-04-14 08:01:00|21.0    |52.0    |POINT (21.05 52.05)|21.05|52.05|1.0          |6.5279461995612955|391.6767719736777 |
|u1     |2025-04-14 08:04:00|21.07   |52.07   |POINT (21.1 52.08) |21.1 |52.08|1.0          |2.332424337801054 |139.94546026806324|
|u1     |2025-04-14 08:05:00|21.1    |52.08   |POINT (21.13 52.09)|21.13|52.09|1.0          |2.332020608948471 |139.92123653690828|
|u2     |2025-04-14 09:05:00|20.03   |51.01   |POINT (20.05 51.02)|20.05|51.02|1.0          |1.7871457858897948|107.22874715338769|
|u2     |2025-04-14 09:06:00|20.05   |51.02   |POINT (20.1 51.03) |20.1 |51.

In [78]:
# Number of flagged rows (count() is a Spark action, so this executes the query).
df_speeding.count()

6

In [77]:
# Kepler map containing only the flagged rows.
SedonaKepler.create_map(df=df_speeding, name="speeding points")

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


KeplerGl(data={'speeding points': {'index': [0, 1, 2, 3, 4, 5], 'columns': ['user_id', 'timestamp', 'prev_lon'…