In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import functions as F

In [2]:
# Create the Spark session for this notebook (getOrCreate reuses an already
# running session if there is one), requesting 4g of memory for the driver
# and for each executor.
spark = (
    SparkSession.builder
    .appName("TaxiTrajectoryAnalysis")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Explicit schema for the GPS CSV, so Spark does not have to infer types.
# Every column is declared nullable.
schema = (
    StructType()
    .add("trip_id", StringType(), True)
    .add("taxi_id", LongType(), True)
    .add("timestamp", LongType(), True)
    .add("longitude", DoubleType(), True)
    .add("latitude", DoubleType(), True)
)

# Load the local CSV with the explicit schema; the first line of the file is
# treated as a header row rather than as data.
df = spark.read.csv(
    "/workspace/data/gps_cleaned.csv",
    schema=schema,
    header=True,
)

# Display the column names and types of the loaded frame
df.printSchema()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/07 17:14:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


root
 |-- trip_id: string (nullable = true)
 |-- taxi_id: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)



In [3]:
# Number of GPS rows recorded for each trip.
trip_counts = df.groupBy("trip_id").count()

# Trips with this many rows or more are treated as outliers and discarded.
MAX_POINTS_PER_TRIP = 10000

# Keep only the ids of trips below the threshold; the count itself is not
# needed downstream.
clean_trips = trip_counts.filter(F.col("count") < MAX_POINTS_PER_TRIP).drop("count")

# The inner join keeps only the GPS rows that belong to a retained trip.
df_clean = df.join(clean_trips, "trip_id", "inner")

# Collapse each trip into a single row holding its list of GPS points.
# collect_list gives no ordering guarantee after the groupBy shuffle, so the
# array is sorted explicitly. Structs are compared field by field and
# timestamp is the first field, so each trajectory comes out in chronological
# order (ties fall back to latitude, then longitude).
df_paths = df_clean.groupBy("trip_id").agg(
    F.sort_array(
        F.collect_list(F.struct("timestamp", "latitude", "longitude"))
    ).alias("trajectory")
)
