In [None]:
# Disabled: the next cell imports PySpark, so uninstalling it here breaks the
# notebook on Restart & Run All. Without `-y`, pip also asks for an interactive
# "Proceed (Y/n)?" confirmation that a notebook cell may block on.
# If you really need a clean reinstall, run this once, by hand, then restart the kernel:
#   %pip uninstall -y pyspark

: 

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    input_file_name, regexp_extract, split, col, row_number, monotonically_increasing_id,
    concat_ws, to_timestamp, unix_timestamp, lag, floor, round
)
from pyspark.sql.window import Window

# Step 1: Start Spark Session
spark = SparkSession.builder.appName("GeolifeTrajectoryProcessing").getOrCreate()
# Pin the session time zone: to_timestamp / unix_timestamp interpret wall-clock
# strings in this zone, so leaving it to the driver's local zone makes
# timestamp_sec machine-dependent. Geolife times are recorded in GMT (per the
# dataset user guide), so parse them as UTC.
spark.conf.set("spark.sql.session.timeZone", "UTC")

# Configuration — the knobs a reader is likely to tune.
DATA_GLOB = "Data/*/Trajectory/*.plt"  # relative to the notebook's working directory
grid_size = 0.01      # zone cell size in degrees (0.01° ≈ 1.1 km of latitude; narrower east–west away from the equator)
ZONE_DECIMALS = 2     # decimal places in grid_size; keep in sync if grid_size changes

# Step 2: Load all .plt files from the nested directory structure
# Example path: /path/to/Data/001/Trajectory/*.plt
raw = spark.read.text(DATA_GLOB).withColumn("file_path", input_file_name())

# Step 3: Drop the 6 header lines of each .plt file.
# spark.read.text exposes no line numbers, and ordering a window by
# monotonically_increasing_id() is not a documented guarantee of file line order
# (it also forces a shuffle + sort of the whole dataset just to skip 6 lines).
# Instead, keep only lines shaped like a Geolife data record:
#   latitude,longitude,0,altitude,days,yyyy-MM-dd,HH:mm:ss
# The Geolife header lines (free-text lines, the "0,2,255,My Track,..." line and a
# lone "0", per the dataset user guide) do not match this shape.
# Malformed or blank data lines are dropped here instead of becoming NULL rows.
DATA_LINE_PATTERN = (
    r"^-?\d+(\.\d+)?,-?\d+(\.\d+)?,[^,]*,[^,]*,[^,]*,"
    r"\d{4}-\d{2}-\d{2},\d{2}:\d{2}:\d{2}$"
)
filtered = raw.filter(col("value").rlike(DATA_LINE_PATTERN))

# Step 4: Extract user_id (from folder name) and trajectory_id (from file name).
# file_path is a URI such as file:///.../Data/000/Trajectory/20081027115449.plt;
# the user_id pattern takes exactly the 3-digit folder directly after '/Data/'.
filtered = (
    filtered
    .withColumn("user_id", regexp_extract(col("file_path"), r"/Data/(\d{3})/", 1))
    .withColumn("trajectory_id", regexp_extract(col("file_path"), r"Trajectory/([0-9]+)\.plt", 1))
)

# Step 5: Split each line into GPS fields
split_df = filtered.withColumn("fields", split(col("value"), ","))

# Step 6: Select and type the structured columns
# (field 2 is unused; field 4 — fractional days — is superseded by date/time).
df = split_df.select(
    col("user_id"),
    col("trajectory_id"),
    col("fields")[0].cast("double").alias("latitude"),
    col("fields")[1].cast("double").alias("longitude"),
    col("fields")[3].cast("double").alias("altitude"),
    col("fields")[5].alias("date"),
    col("fields")[6].alias("time"),
    col("file_path"),
)

# Step 7: Combine date and time into a proper timestamp column (parsed in the UTC session zone)
df = df.withColumn(
    "timestamp",
    to_timestamp(concat_ws(" ", col("date"), col("time")), "yyyy-MM-dd HH:mm:ss"),
)

# Step 8: Assign each point to a spatial zone by snapping to the lower-left grid corner.
# `round` here is pyspark.sql.functions.round (imported above; it shadows the builtin);
# rounding removes floating-point noise from floor(...) * grid_size.
df = (
    df
    .withColumn("zone_lat", round(floor(col("latitude") / grid_size) * grid_size, ZONE_DECIMALS))
    .withColumn("zone_lon", round(floor(col("longitude") / grid_size) * grid_size, ZONE_DECIMALS))
)

# Step 9: Add Unix timestamp (seconds since epoch) for time-based calculations
df = df.withColumn("timestamp_sec", unix_timestamp("timestamp"))

# Step 10: For resampling prep — previous point's timestamp within the same trajectory
# (NULL for the first point of each trajectory).
trajectory_window = Window.partitionBy("user_id", "trajectory_id").orderBy("timestamp")
df = df.withColumn("prev_timestamp_sec", lag("timestamp_sec", 1).over(trajectory_window))

# Resample — intended: keep only points at least 10 seconds apart.
# NOTE(review): this compares against the previous *raw* point, not the previous
# *kept* point, so it is not a true >=10 s resample (e.g. points every 5 s would all
# be dropped except the first). Revisit before enabling.
# df = df.filter((col("timestamp_sec") - col("prev_timestamp_sec") >= 10) | col("prev_timestamp_sec").isNull())

# Step 11: Show the processed results
df.show(truncate=False)


25/05/03 14:12:30 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
                                                                                

+-------+--------------+---------+----------+--------+----------+--------+-------------------------------------------------------------------+-------------------+--------+--------+-------------+------------------+
|user_id|trajectory_id |latitude |longitude |altitude|date      |time    |file_path                                                          |timestamp          |zone_lat|zone_lon|timestamp_sec|prev_timestamp_sec|
+-------+--------------+---------+----------+--------+----------+--------+-------------------------------------------------------------------+-------------------+--------+--------+-------------+------------------+
|000    |20081027115449|39.994622|116.326757|492.0   |2008-10-27|11:54:49|file:///Users/ydub/Downloads/Data/000/Trajectory/20081027115449.plt|2008-10-27 11:54:49|39.99   |116.32  |1225108489   |NULL              |
|000    |20081027115449|39.994622|116.326757|492.0   |2008-10-27|11:54:52|file:///Users/ydub/Downloads/Data/000/Trajectory/20081027115449.plt|20

In [6]:
# Drop file_path now that user_id / trajectory_id have been extracted from it, and
# build one string key per grid cell (e.g. "39.99_116.32") from zone_lat/zone_lon.
# zone_id is created explicitly here: a previous run of this cell showed a zone_id
# column that no visible cell defines (it came from a since-deleted cell), so it
# would otherwise be missing after Restart & Run All. withColumn/drop make this
# cell safe to re-run.
df = (
    df.drop("file_path")
      .withColumn(
          "zone_id",
          concat_ws("_", col("zone_lat").cast("string"), col("zone_lon").cast("string")),
      )
)

df.show(truncate=False)



+-------+--------------+---------+----------+--------+----------+--------+-------------------+--------+--------+-------------+------------------+------------+
|user_id|trajectory_id |latitude |longitude |altitude|date      |time    |timestamp          |zone_lat|zone_lon|timestamp_sec|prev_timestamp_sec|zone_id     |
+-------+--------------+---------+----------+--------+----------+--------+-------------------+--------+--------+-------------+------------------+------------+
|000    |20081027115449|39.994622|116.326757|492.0   |2008-10-27|11:54:49|2008-10-27 11:54:49|39.99   |116.32  |1225108489   |NULL              |39.99_116.32|
|000    |20081027115449|39.994622|116.326757|492.0   |2008-10-27|11:54:52|2008-10-27 11:54:52|39.99   |116.32  |1225108492   |1225108489        |39.99_116.32|
|000    |20081027115449|39.994614|116.326751|492.0   |2008-10-27|11:54:54|2008-10-27 11:54:54|39.99   |116.32  |1225108494   |1225108492        |39.99_116.32|
|000    |20081027115449|39.994602|116.326769|4


                                                                                