In [1]:
# Import
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Start Spark Session
spark = SparkSession \
    .builder \
    .appName("Aggregation Payment Per Year") \
    .getOrCreate()

24/12/25 14:38:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Path lists
zone_lookup = "hdfs://10.128.0.59:8020/raw_data/updated_zone_lookup.csv"
fact_trip = "hdfs://10.128.0.59:8020/data_warehouse/fact_trip"
dim_vendor = "hdfs://10.128.0.59:8020/data_warehouse/dim_vendor"
dim_datetime = "hdfs://10.128.0.59:8020/data_warehouse/dim_datetime"
dim_rate_code = "hdfs://10.128.0.59:8020/data_warehouse/dim_rate_code"
dim_pickup_location = "hdfs://10.128.0.59:8020/data_warehouse/dim_pickup_location"
dim_dropoff_location = "hdfs://10.128.0.59:8020/data_warehouse/dim_dropoff_location"
dim_payment = "hdfs://10.128.0.59:8020/data_warehouse/dim_payment"

# uber-analysis-439804.query_result. + the table's name
output = "uber-analysis-439804.query_result.payment_per_year"

In [4]:
# Read data into dataframe
df_fact = spark.read \
    .format("parquet") \
    .option("path", fact_trip) \
    .load() \
    .select("trip_id", "datetimestamp_id", "payment_id")

df_datetime = spark.read \
    .format("parquet") \
    .option("path", dim_datetime) \
    .load() \
    .select("datetime_id", "pick_year")

df_payment = spark.read \
    .format("parquet") \
    .option("path", dim_payment) \
    .load() \
    .filter(~col("payment_type").isin([4, 6]))

# df_payment.show()

                                                                                

In [5]:
# SELECT
#     d.pick_year AS year,
#     p.payment_type AS payment_id,
#     p.payment_type_name AS payment_name,
#     COUNT(f.trip_id) AS total_trips
# FROM df_fact f
#     INNER JOIN df_datetime d
#         ON f.datetimestamp_id = d.datetime_id
#     INNER JOIN df_payment p
#         ON f.payment_id = p.payment_type
# WHERE p.payment_type NOT IN (4, 6);

In [6]:
# Join
df_joined = df_fact.alias("fact_data") \
    .join(df_datetime.alias("dim_datetime"),
         col("fact_data.datetimestamp_id") == col("dim_datetime.datetime_id"), "inner") \
    .join(broadcast(df_payment.alias("dim_payment")),
         col("fact_data.payment_id") == col("dim_payment.payment_type"), "inner") \
    .select(
        col("fact_data.trip_id").alias("trip_id"),
        col("dim_datetime.pick_year").alias("year"),
        col("dim_payment.payment_type").alias("payment_id"),
        col("dim_payment.payment_type_name").alias("payment_name")
    )

# df_joined.show(5, truncate = False)

# Query
df_result = df_joined.groupBy("year", "payment_id", "payment_name") \
    .agg(count("trip_id").alias("total_trips")) \
    .select(
        col("year"),
        col("payment_id"),
        col("payment_name"),
        col("total_trips")
    )

# df_result.show()

In [7]:
# Save to BigQuery
df_result.write \
    .format("bigquery") \
    .option("table", output) \
    .option("temporaryGcsBucket", "uber-pyspark-jobs/temp") \
    .mode("overwrite") \
    .save()

24/12/25 14:39:12 WARN DAGScheduler: Broadcasting large task binary with size 1047.7 KiB
24/12/25 14:47:53 WARN DAGScheduler: Broadcasting large task binary with size 1129.6 KiB
24/12/25 15:05:21 WARN DAGScheduler: Broadcasting large task binary with size 1351.0 KiB
                                                                                

In [8]:
spark.stop()