## Trip Data Aggregation 
### Group By Columns
1. Year
2. Month
3. Borough (of the pickup zone)
4. Trip Date (pickup date)
5. Trip Day (day name)

### Aggregated Columns
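1. Card Trip Count (trips paid by credit card)
2. Cash Trip Count (trips paid in cash)
3. Weekend Indicator (`Y` when the trip day is Saturday or Sunday, otherwise `N`)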

### Purpose of the notebook

Demonstrate the integration between a Spark pool and a serverless SQL pool:

1. Create the aggregated table in the Spark pool.
2. Query the same data from the serverless SQL pool.

In [5]:
# Root of the `nyc-taxi-data` container in the data lake. Every layer folder below is
# derived from it, so the storage account / container name is defined in one place.
lake_root_path = 'abfss://nyc-taxi-data@synapsecoursedlnikolas.dfs.core.windows.net'

# Folder paths for the raw (bronze), silver and gold layers, referenced by later cells.
bronze_folder_path = f'{lake_root_path}/raw'
silver_folder_path = f'{lake_root_path}/silver'
gold_folder_path = f'{lake_root_path}/gold'

In [6]:
# Turn off Spark's type inference for partition columns discovered from folder names.
# The partition columns `year` and `month` are then read as strings rather than
# integers, so e.g. a zero-padded value such as 01 is not turned into 1.
spark.conf.set(
    "spark.sql.sources.partitionColumnTypeInference.enabled",
    "false",
)

In [7]:
%%sql

-- Spark database that holds the aggregated tables. Its LOCATION is the gold folder,
-- so tables created in it without an explicit path are stored under that folder.
CREATE DATABASE IF NOT EXISTS nyc_taxi_ldw_spark
    LOCATION 'abfss://nyc-taxi-data@synapsecoursedlnikolas.dfs.core.windows.net/gold';

In [8]:
from pyspark.sql.functions import col, when, sum, to_date
# NOTE: importing `sum` from pyspark shadows Python's builtin sum() for the rest of the
# notebook session; kept as-is because later cells may rely on these imported names.


# Load the silver-layer parquet datasets: the green trips plus the taxi zone,
# calendar and payment type lookup tables.
trip_data_green_df = spark.read.parquet(f"{silver_folder_path}/trip_data_green")
taxi_zone_df = spark.read.parquet(f"{silver_folder_path}/taxi_zone")
calendar_df = spark.read.parquet(f"{silver_folder_path}/calendar")
payment_type_df = spark.read.parquet(f"{silver_folder_path}/payment_type")

# Pickup date of each trip. It is used both to join the calendar and as a grouping
# key, so the expression is defined once and reused.
trip_date_col = to_date(col("td.lpep_pickup_datetime"))

# Join each trip to its pickup zone, calendar day and payment type, then count card
# and cash trips per year / month / borough / trip date / day name.
# These are inner joins: trips without a matching row in any lookup table are dropped.
result_df = (
    trip_data_green_df.alias("td")
    .join(taxi_zone_df.alias("tz"), col("td.pu_location_id") == col("tz.location_id"))
    .join(calendar_df.alias("cal"), trip_date_col == col("cal.date"))
    .join(payment_type_df.alias("pt"), col("td.payment_type") == col("pt.payment_type"))
    .groupBy(
        col("td.year"),
        col("td.month"),
        col("tz.borough"),
        trip_date_col.alias("trip_date"),
        col("cal.day_name"),
    )
    .agg(
        sum(when(col("pt.description") == 'Credit card', 1).otherwise(0)).alias("card_trip_count"),
        sum(when(col("pt.description") == 'Cash', 1).otherwise(0)).alias("cash_trip_count"),
    )
    # The weekend flag depends only on the grouping key `day_name`, so it is derived
    # after aggregation instead of being placed as a non-aggregate expression in agg().
    # withColumn appends it last, matching the original column order of result_df.
    .withColumn(
        "trip_day_weekend_ind",
        when(col("day_name").isin('Saturday', 'Sunday'), 'Y').otherwise('N'),
    )
)

# Final column set and order for the output table; `day_name` is exposed as `trip_day`.
trip_data_green_agg_df = result_df.select(
    col("year"),
    col("month"),
    col("borough"),
    col("trip_date"),
    col("day_name").alias("trip_day"),
    col("trip_day_weekend_ind"),
    col("card_trip_count"),
    col("cash_trip_count")
)

# Save as a parquet table in the nyc_taxi_ldw_spark database, partitioned by year and
# month. mode("overwrite") overwrites any existing data in the table on each run.
trip_data_green_agg_df.write.mode("overwrite") \
    .partitionBy("year", "month") \
    .format("parquet") \
    .saveAsTable("nyc_taxi_ldw_spark.trip_data_green_agg")