In [None]:
import argparse
from datetime import datetime, date, time, timedelta, timezone
from pathlib import Path
from zoneinfo import ZoneInfo

import pandas as pd
import pyarrow.parquet as pa
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException
from pyspark.sql.window import Window



# Root directory of the supporting data set; every input file below is resolved against it.
data_path = '/Users/athulparvelikudy/Personal/ACRTA/tech-test-data/supporting-data'
data_root = Path(data_path)

# Relative names of the five drive part files (`*.snappy.parquet`) under `drive/`.
file_name_0, file_name_1, file_name_2, file_name_3, file_name_4 = (
    'drive/part-00000-tid-4109877695252048813-a3139a95-1807-419c-af03-4877385b4c8c-11-1-c000.snappy.parquet',
    'drive/part-00001-tid-4109877695252048813-a3139a95-1807-419c-af03-4877385b4c8c-12-1-c000.snappy.parquet',
    'drive/part-00002-tid-4109877695252048813-a3139a95-1807-419c-af03-4877385b4c8c-13-1-c000.snappy.parquet',
    'drive/part-00003-tid-4109877695252048813-a3139a95-1807-419c-af03-4877385b4c8c-14-1-c000.snappy.parquet',
    'drive/part-00004-tid-4109877695252048813-a3139a95-1807-419c-af03-4877385b4c8c-15-1-c000.snappy.parquet',
)
# Relative name of the vehicle reference CSV.
vehicle_file = 'vehicle.csv'

# Absolute path (as a plain string) of the vehicle CSV.
vehicle_csv = str(data_root / vehicle_file)

# Absolute paths (as plain strings) of the drive part files, in the same order as above.
(
    full_file_path0,
    full_file_path1,
    full_file_path2,
    full_file_path3,
    full_file_path4,
) = (
    str(data_root / part_name)
    for part_name in (file_name_0, file_name_1, file_name_2, file_name_3, file_name_4)
)

# Reuse the active Spark session if one exists, otherwise start one for this notebook.
session_builder = SparkSession.builder.appName("ReadLocalParquet")
spark = session_builder.getOrCreate()

# "dynamic" makes an overwrite of a partitioned output replace only the partitions
# that the written DataFrame actually contains.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")



In [None]:
# Directory holding the merged daily trip summary as Parquet. Later cells read any
# existing data from here and overwrite it with the merged result.
output_path = "/Users/athulparvelikudy/Personal/ACRTA/output/daily_trip_parquet"

In [None]:
# Processing date as a YYYY-MM-DD string. The drive records are filtered down to this
# date, and previously written rows whose `date_pst` equals it are replaced on write.
input_date = "2017-01-07"

In [None]:
# Load the drive records from every part file, not only the first one. The five paths
# are all prepared in the setup cell, and a trip's first and last record can only be
# found reliably when all of the data is present.
drive_part_paths = [
    full_file_path0,
    full_file_path1,
    full_file_path2,
    full_file_path3,
    full_file_path4,
]

df = (
    spark.read.parquet(*drive_part_paths)
    # Calendar date and wall-clock time of each record, split out of `datetime`.
    .withColumn("date", F.to_date("datetime"))
    .withColumn("time", F.date_format("datetime", "HH:mm:ss"))
)


In [None]:
# Number of drive records per calendar date, oldest first; shows which dates are loaded.
rows_per_date = df.groupBy("date").count()
rows_per_date.orderBy("date").show(truncate=False)


In [None]:
# Keep only the records whose Pacific-time calendar date equals `input_date`.
#
# The output column `date_pst` is derived in America/Los_Angeles, and the merge step
# decides which stored rows to replace by comparing `date_pst` with `input_date`.
# The input therefore has to be sliced on that same calendar. Filtering on the
# un-shifted `date` column lets records through whose `date_pst` is a different day;
# those rows are never replaced and get appended again on every re-run.
# NOTE(review): assumes `datetime` holds UTC, as the `from_utc_timestamp` call that
# builds `date_pst` already does - confirm against the data source.
df = df.filter(
    F.to_date(F.from_utc_timestamp("datetime", "America/Los_Angeles")) == F.lit(input_date)
)

In [None]:
# Vehicle reference data: the first CSV line is the header and column types are inferred.
df_vehicle = spark.read.csv(vehicle_csv, header=True, inferSchema=True)

In [None]:


# One window per trip, ordered by record time: ascending to pick the first record,
# descending to pick the last one.
per_trip = Window.partitionBy("trip_id")
w_asc = per_trip.orderBy(F.asc("datetime"))
w_desc = per_trip.orderBy(F.desc("datetime"))

# First record of every trip: rank records by ascending time and keep rank 1.
# `vehicle_spec_id` is carried along here so the trip can be joined to vehicle data later.
rank_from_start = F.row_number().over(w_asc)

df_start = (
    df.withColumn("rn", rank_from_start)
    .where(F.col("rn") == 1)
    .select(
        "trip_id",
        "vehicle_spec_id",
        F.col("datetime").alias("start_datetime"),
        F.col("lat").alias("start_lat"),
        F.col("long").alias("start_long"),
    )
)

# Last record of every trip: rank records by descending time and keep rank 1.
rank_from_end = F.row_number().over(w_desc)

df_end = (
    df.withColumn("rn", rank_from_end)
    .where(F.col("rn") == 1)
    .select(
        "trip_id",
        F.col("datetime").alias("end_datetime"),
        F.col("lat").alias("end_lat"),
        F.col("long").alias("end_long"),
    )
)

def _haversine_km(lat_from, lon_from, lat_to, lon_to):
    """Build a column expression for the haversine distance in kilometres.

    All four arguments are Spark columns holding coordinates in degrees. The result
    is the great-circle distance between the two points on a sphere of radius 6371 km.
    """
    half_dlat = F.radians(lat_to - lat_from) / 2
    half_dlon = F.radians(lon_to - lon_from) / 2
    hav = (
        F.pow(F.sin(half_dlat), 2)
        + F.cos(F.radians(lat_from)) * F.cos(F.radians(lat_to)) * F.pow(F.sin(half_dlon), 2)
    )
    central_angle = 2 * F.atan2(F.sqrt(hav), F.sqrt(1 - hav))
    return F.lit(6371.0) * central_angle


# Seconds elapsed between the first and the last record of a trip.
elapsed_seconds = F.unix_timestamp("end_datetime") - F.unix_timestamp("start_datetime")

# One row per trip: start and end records side by side, plus duration and distance.
# The distance is the straight start-to-end haversine distance, not the length of the
# path driven in between.
df_trip_summary = (
    df_start.alias("s")
    .join(df_end.alias("e"), on="trip_id", how="inner")
    .withColumn("trip_duration_seconds", elapsed_seconds)
    .withColumn(
        "distance_km",
        _haversine_km(
            F.col("start_lat"),
            F.col("start_long"),
            F.col("end_lat"),
            F.col("end_long"),
        ),
    )
)

# Only the vehicle attributes needed in the output, keyed by `vehicle_spec_id`.
vehicle_lookup = df_vehicle.select("vehicle_spec_id", "make", "model")

# Final per-trip output. The left join keeps trips that have no matching vehicle row
# (their `make` / `model` stay null).
df_final = (
    df_trip_summary
    .join(vehicle_lookup, on="vehicle_spec_id", how="left")
    .select(
        "trip_id",
        # Calendar date of the trip's end, shifted from UTC to America/Los_Angeles.
        F.to_date(F.from_utc_timestamp("end_datetime", "America/Los_Angeles")).alias("date_pst"),
        (F.col("trip_duration_seconds") / F.lit(60)).alias("trip_duration_minutes"),
        F.col("distance_km").alias("distance_travelled"),
        "make",
        "model",
    )
)



# Merge this run's trips into the stored summary and publish the result.
#
# Two safety properties matter here:
#   * Only "there is no readable output yet" may be treated as a first run. Any other
#     read failure must surface, because falling back to `df_final` alone and then
#     overwriting would silently discard every previously stored day.
#   * The merged DataFrame lazily reads from `output_path`, and an overwrite removes
#     the old files before the new ones are complete. The result is therefore written
#     to a staging directory first and promoted from there, so the job never
#     overwrites the path it is still reading and a failed write cannot destroy the
#     stored history.
staging_path = output_path.rstrip("/") + "__staging"

try:
    df_existing = spark.read.parquet(output_path)
except AnalysisException:
    # Spark reports a missing path, or a directory without readable Parquet, as an
    # AnalysisException; that is the expected state before the first successful run.
    df_existing = None

if df_existing is None:
    df_merged = df_final
else:
    # Replace any rows already stored for `input_date` with the freshly computed ones.
    df_merged = (
        df_existing
        .filter(F.col("date_pst") != F.lit(input_date))
        .unionByName(df_final)
    )

# Step 1: materialise the merged result away from the directory it reads from.
(
    df_merged
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(staging_path)
)

# Step 2: promote the staged copy. The staging directory is kept as a backup of the
# last published result and is overwritten by the next run.
(
    spark.read.parquet(staging_path)
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(output_path)
)



In [None]:
# Read the whole output directory back instead of one pinned part file. Spark gives
# part files a new UUID-based name on every write, so a hard-coded file name stops
# existing as soon as the output is overwritten.
output_file_path = output_path
df_out = spark.read.parquet(output_file_path)

# Sanity check: number of stored trip rows per `date_pst`, oldest first.
df_out.groupBy("date_pst").count().orderBy("date_pst").show(truncate=False)
