In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

path = "/Volumes/workspace/default/metcs777termproject/Geolife Trajectories 1.3/Geolife Trajectories 1.3/Data/**/**/*.plt"

#Read in Raw Data
df_raw = (
    spark.read.format("text")
         .load(path)
         .select(
             F.col("value"),
             F.col("_metadata")["file_path"].alias("file_path")
         )
)

# 2. Skip the first 6 header lines per file
w = Window.partitionBy("file_path").orderBy(F.monotonically_increasing_id())

df_clean = (
    df_raw
      .withColumn("rn", F.row_number().over(w))
      .filter(F.col("rn") > 6)
      .drop("rn")
)

# 3. Split the lines into fields
df_split = (
    df_clean
      .withColumn("parts", F.split(F.col("value"), ","))

      .withColumn(
          "user_id",
          F.regexp_extract("file_path", r"/Data/([^/]+)/Trajectory/", 1)
      )

      .withColumn(
          "traj_id",
          F.regexp_extract("file_path", r"/Trajectory/([^/]+)\.plt$", 1)
      )

      .select(
          "user_id",
          "traj_id",
          F.col("parts")[0].cast("double").alias("lat"),
          F.col("parts")[1].cast("double").alias("lon"),
          F.col("parts")[4].cast("double").alias("altitude"),
          F.col("parts")[5].alias("date"),
          F.col("parts")[6].alias("time")
      )
)

# 4. Combine date + time into a timestamp
df_points = df_split.withColumn(
    "timestamp",
    F.to_timestamp(F.concat_ws(" ", F.col("date"), F.col("time")), "yyyy-MM-dd HH:mm:ss")
)

df_points.show(10)

+-------+--------------+---------+----------+----------------+----------+--------+-------------------+
|user_id|       traj_id|      lat|       lon|        altitude|      date|    time|          timestamp|
+-------+--------------+---------+----------+----------------+----------+--------+-------------------+
|    000|20081103101336|39.999976|116.326565|39755.4261111111|2008-11-03|10:13:36|2008-11-03 10:13:36|
|    000|20081103101336|40.000032|116.326175|39755.4261689815|2008-11-03|10:13:41|2008-11-03 10:13:41|
|    000|20081103101336|40.000038|116.326154|39755.4262268519|2008-11-03|10:13:46|2008-11-03 10:13:46|
|    000|20081103101336|40.000035|116.326081|39755.4262847222|2008-11-03|10:13:51|2008-11-03 10:13:51|
|    000|20081103101336|39.996877|116.326645|39755.4276736111|2008-11-03|10:15:51|2008-11-03 10:15:51|
|    000|20081103101336|39.996825|116.326536|39755.4277314815|2008-11-03|10:15:56|2008-11-03 10:15:56|
|    000|20081103101336|39.996785|116.326341|39755.4277893519|2008-11-03|

In [0]:

# Read in the label data and split lines into fields
labels_path = "/Volumes/workspace/default/metcs777termproject/Geolife Trajectories 1.3/Geolife Trajectories 1.3/Data/**/*.txt"
df_labels = (
    spark.read.format("csv")
        .option("header", "true")
        .option("sep", "\t")
        .load(labels_path)
        .select(
            F.col("_metadata")["file_path"].alias("label_file_path"),
            F.col("Start Time").alias("start_ts_str"),
            F.col("End Time").alias("end_ts_str"),
            F.col("Transportation Mode").alias("mode")
        )
        .withColumn(
            "user_id",
            F.regexp_extract("label_file_path", r"/Data/([^/]+)/", 1)
        )
        .withColumn("start_ts", F.to_timestamp("start_ts_str", "yyyy/MM/dd HH:mm:ss"))
        .withColumn("end_ts",   F.to_timestamp("end_ts_str",   "yyyy/MM/dd HH:mm:ss"))
        .drop("label_file_path") 
)

#Fix the taxi as car for classification purpose while both is driving behaviour
df_label_fixed = df_labels.replace({"taxi":"car"},subset=["mode"])
df_label_fixed.show(10)

+-------------------+-------------------+----+-------+-------------------+-------------------+
|       start_ts_str|         end_ts_str|mode|user_id|           start_ts|             end_ts|
+-------------------+-------------------+----+-------+-------------------+-------------------+
|2007/04/12 05:03:18|2007/04/12 05:34:45|bike|    163|2007-04-12 05:03:18|2007-04-12 05:34:45|
|2007/04/12 10:21:16|2007/04/12 14:56:56|bike|    163|2007-04-12 10:21:16|2007-04-12 14:56:56|
|2007/04/13 00:53:06|2007/04/13 01:06:15|bike|    163|2007-04-13 00:53:06|2007-04-13 01:06:15|
|2007/04/13 05:19:33|2007/04/13 05:27:34| car|    163|2007-04-13 05:19:33|2007-04-13 05:27:34|
|2007/04/13 07:01:26|2007/04/13 07:18:02| car|    163|2007-04-13 07:01:26|2007-04-13 07:18:02|
|2007/04/13 14:50:00|2007/04/13 15:05:00|walk|    163|2007-04-13 14:50:00|2007-04-13 15:05:00|
|2007/04/14 04:30:12|2007/04/14 04:33:22|walk|    163|2007-04-14 04:30:12|2007-04-14 04:33:22|
|2007/04/15 04:09:53|2007/04/15 04:33:58|walk|    

In [0]:
df_label_fixed.select("mode").distinct().show()

+----------+
|      mode|
+----------+
|      boat|
|  airplane|
|      bike|
|      walk|
|       car|
|motorcycle|
|       bus|
|    subway|
|       run|
|     train|
+----------+



In [0]:
from pyspark.sql.functions import radians, sin, cos, sqrt, atan2

w_feat = Window.partitionBy("user_id","traj_id").orderBy("timestamp")

df_feat = (
    df_points
      .withColumn("lat_prev",  F.lag("lat").over(w_feat))
      .withColumn("lon_prev",  F.lag("lon").over(w_feat))
      .withColumn("t_prev",    F.lag("timestamp").over(w_feat))
)

# Î”t
df_feat = df_feat.withColumn(
    "dt",
    F.unix_timestamp("timestamp") - F.unix_timestamp("t_prev")
).withColumn(
    "dt",
    F.when(F.col("dt") <= 0, None).otherwise(F.col("dt"))
)

# Haversine distance
R = 6371000.0  # Earth Radius in meters
df_feat = (
    df_feat
      .withColumn("lat_r",      radians("lat"))
      .withColumn("lon_r",      radians("lon"))
      .withColumn("lat_prev_r", radians("lat_prev"))
      .withColumn("lon_prev_r", radians("lon_prev"))
      .withColumn("dlat", F.col("lat_r") - F.col("lat_prev_r"))
      .withColumn("dlon", F.col("lon_r") - F.col("lon_prev_r"))
      .withColumn(
          "a",
          sin(F.col("dlat")/2)**2 +
          cos("lat_prev_r") * cos("lat_r") * sin(F.col("dlon")/2)**2
      )
      .withColumn("c", 2 * atan2(sqrt("a"), sqrt(1 - F.col("a"))))
      .withColumn("distance", R * F.col("c"))
)
# Speed and Acceleration
df_feat = df_feat.withColumn(
    "speed",
    F.try_divide(F.col("distance"), F.col("dt"))
).withColumn(
    "speed_prev", 
    F.coalesce(F.lag("speed").over(w_feat), F.lit(0))
).withColumn(
    "accel",
    F.try_divide(F.col("speed") - F.col("speed_prev"), F.col("dt"))
)
# Stop duration per step
df_feat = df_feat.withColumn(
    "stop_time",
    F.when(F.col("speed") < 0.5, F.col("dt")).otherwise(0.0)
)
df_feat.show(10)

+-------+--------------+---------+----------+----------------+----------+--------+-------------------+---------+----------+-------------------+----+------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+---------+
|user_id|       traj_id|      lat|       lon|        altitude|      date|    time|          timestamp| lat_prev|  lon_prev|             t_prev|  dt|             lat_r|             lon_r|        lat_prev_r|        lon_prev_r|                dlat|                dlon|                   a|                   c|          distance|             speed|        speed_prev|               accel|stop_time|
+-------+--------------+---------+----------+----------------+----------+--------+-------------------+---------+----------+-------------------+----+------------------+------------------+------------------+-

In [0]:
#Join the labels into the feature dataframe and keeps the required columns only
df_feat_labeled = (
    df_feat.alias("p")
      .join(
          df_label_fixed.alias("l"),
          (F.col("p.user_id") == F.col("l.user_id")) &
          (F.col("p.timestamp") >= F.col("l.start_ts")) &
          (F.col("p.timestamp") < F.col("l.end_ts")),
          how="left"
      )
      .select(
          "p.user_id",
          "p.traj_id",
          "p.timestamp",
          "p.speed",
          "p.accel",
          "p.stop_time",
          "p.distance",
          "l.mode"
      )
)

In [0]:
#Seperated the labeled and unlabeled data into two different dataframes
df_feat_labeled_only = df_feat_labeled.filter(F.col("mode").isNotNull())
df_feat_labeled_only.show(10)

+-------+--------------+-------------------+------------------+--------------------+---------+------------------+-----+
|user_id|       traj_id|          timestamp|             speed|               accel|stop_time|          distance| mode|
+-------+--------------+-------------------+------------------+--------------------+---------+------------------+-----+
|    010|20080328160001|2008-03-29 01:32:52|  0.21469416856395|-5.32441555508695...|  12693.0| 2725.113081582217|train|
|    010|20080328160001|2008-03-29 01:33:13|17.285932499027563|  0.8129161109744578|      0.0|363.00458247957886|train|
|    010|20080328160001|2008-03-29 01:34:13|15.259862563459313|-0.03376783225947084|      0.0| 915.5917538075588|train|
|    010|20080328160001|2008-03-29 01:35:12|16.848716376175968| 0.02692972563926534|      0.0| 994.0742661943822|train|
|    010|20080328160001|2008-03-29 01:36:11|19.439675933384724| 0.04391456876625009|      0.0|1146.9408800696988|train|
|    010|20080328160001|2008-03-29 01:37

In [0]:
#Seperated the labeled and unlabeled data into two different dataframes
df_feat_unlabeled_only = df_feat_labeled.filter(F.col("mode").isNull())
df_feat_unlabeled_only.show(10)

+-------+--------------+-------------------+------------------+--------------------+---------+------------------+----+
|user_id|       traj_id|          timestamp|             speed|               accel|stop_time|          distance|mode|
+-------+--------------+-------------------+------------------+--------------------+---------+------------------+----+
|    000|20081110013637|2008-11-10 01:36:37|              NULL|                NULL|      0.0|              NULL|NULL|
|    000|20081110013637|2008-11-10 01:36:42|1.5830595206607072| 0.31661190413214146|      0.0| 7.915297603303536|NULL|
|    000|20081110013637|2008-11-10 01:36:47|0.9275780424581349|-0.13109629564051445|      0.0| 4.637890212290674|NULL|
|    000|20081110013637|2008-11-10 01:36:52|0.9640095051297042|0.007286292534313854|      0.0| 4.820047525648521|NULL|
|    000|20081110013637|2008-11-10 01:36:57|1.9541359652474106|  0.1980252920235413|      0.0| 9.770679826237053|NULL|
|    000|20081110013637|2008-11-10 01:37:02|2.43

In [0]:
# Aggregate per-trajectory descriptive statistics for labeled data
# For each (user_id, traj_id, mode), compute distance, speed, acceleration, stop duration, start/end timestamps, trip duration, and mean speed.
# Save the resulting trajectory-level dataset as a Parquet file.
df_traj_labeled = (
    df_feat_labeled_only.groupBy("user_id", "traj_id", "mode")
    .agg(
          F.sum("distance").alias("total_distance_m"),
          F.max("speed").alias("max_speed"),
          F.expr("percentile(speed, 0.5)").alias("median_speed"),
          F.variance("speed").alias("var_speed"),
          F.mean("accel").alias("mean_accel"),
          F.max("accel").alias("max_accel"),
          F.sum("stop_time").alias("stop_duration_seconds"),
          F.min("timestamp").alias("start_time"),
          F.max("timestamp").alias("end_time")
      )
    .withColumn(
        "duration_seconds",
        F.col("end_time").cast("long") - F.col("start_time").cast("long")
    )
    .filter(F.col("duration_seconds") > 0)
    .withColumn(
        "mean_speed_calculated",
        F.try_divide(F.col("total_distance_m"), F.col("duration_seconds"))
    )
)
df_traj_labeled.write.mode("overwrite").parquet("/Volumes/workspace/default/metcs777termproject/df_traj_labeled_parquet")

In [0]:
# Aggregate per-trajectory descriptive statistics for unlabeled data
# For each (user_id, traj_id, mode), compute distance, speed, acceleration, stop duration, start/end timestamps, trip duration, and mean speed.
# Save the resulting trajectory-level dataset as a Parquet file.
df_traj_unlabeled = (
    df_feat_unlabeled_only.groupBy("user_id", "traj_id")
    .agg(
          F.sum("distance").alias("total_distance_m"),
          F.max("speed").alias("max_speed"),
          F.expr("percentile(speed, 0.5)").alias("median_speed"),
          F.variance("speed").alias("var_speed"),
          F.mean("accel").alias("mean_accel"),
          F.max("accel").alias("max_accel"),
          F.sum("stop_time").alias("stop_duration_seconds"),
          F.min("timestamp").alias("start_time"),
          F.max("timestamp").alias("end_time")
      )
    .withColumn(
        "duration_seconds",
        F.col("end_time").cast("long") - F.col("start_time").cast("long")
    )
    .filter(F.col("duration_seconds") > 0)
    .withColumn(
        "mean_speed_calculated",
        F.try_divide(F.col("total_distance_m"), F.col("duration_seconds"))
    )
)
df_traj_unlabeled.write.mode("overwrite").parquet("/Volumes/workspace/default/metcs777termproject/df_traj_unlabeled_parquet")

In [0]:
# Load back the parquet files to check whether the file is correctly outputted
df_p = spark.read.parquet("/Volumes/workspace/default/metcs777termproject/df_traj_labeled_parquet/")
df_p.show(20)

+-------+--------------+------+------------------+------------------+------------------+-------------------+--------------------+-------------------+---------------------+-------------------+-------------------+----------------+---------------------+
|user_id|       traj_id|  mode|  total_distance_m|         max_speed|      median_speed|          var_speed|          mean_accel|          max_accel|stop_duration_seconds|         start_time|           end_time|duration_seconds|mean_speed_calculated|
+-------+--------------+------+------------------+------------------+------------------+-------------------+--------------------+-------------------+---------------------+-------------------+-------------------+----------------+---------------------+
|    010|20080329160048| train| 41009.78506013511| 31.31921360124908|23.831840067025766| 116.92387986602569| 0.00435165959825546| 0.3839747201051625|                 59.0|2008-03-29 16:00:48|2008-03-29 16:33:24|            1956|   20.9661477812551

In [0]:
# Load back the parquet files to check whether the file is correctly outputted
df_u = spark.read.parquet("/Volumes/workspace/default/metcs777termproject/df_traj_unlabeled_parquet/")
df_u.show(20)

+-------+--------------+------------------+------------------+------------------+------------------+--------------------+------------------+---------------------+-------------------+-------------------+----------------+---------------------+
|user_id|       traj_id|  total_distance_m|         max_speed|      median_speed|         var_speed|          mean_accel|         max_accel|stop_duration_seconds|         start_time|           end_time|duration_seconds|mean_speed_calculated|
+-------+--------------+------------------+------------------+------------------+------------------+--------------------+------------------+---------------------+-------------------+-------------------+----------------+---------------------+
|    000|20081029093038|3490.3734598028113| 21.71761817108005|1.4315351366822713|24.265368016061565|0.004532475327346371| 1.611890447605606|                225.0|2008-10-29 09:30:38|2008-10-29 09:46:43|             965|   3.6169673158578357|
|    000|20081103232153|14947.29