In [12]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master('local[*]')\
        .appName('test') \
        .getOrCreate()

In [None]:
df_green = spark.read.parquet('data/pq/green/*/*')
df_yellow = spark.read.parquet("data/pq/yellow/*/*")

In [None]:
common_cols = []
yellow_cols = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_cols:
        common_cols.append(col)

In [None]:
from pyspark.sql import functions as funcs

In [None]:
df_green_sel = df_green.select(common_cols)
df_yellow_sel = df_yellow.select(common_cols)

In [None]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [None]:
df_results = spark.sql("""
SELECT  
    -- Revenue grouping 
    date_trunc('month', 'pickup_datetime') as revenue_month, 

    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) as revenue_monthly_fare,
    SUM(extra) as revenue_monthly_extra,
    SUM(mta_tax) as revenue_monthly_mta_tax,
    SUM(tip_amount) as revenue_monthly_tip_amount,
    SUM(tolls_amount) as revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) as revenue_monthly_improvement_surcharge,
    SUM(total_amount) as revenue_monthly_total_amount,

    -- Additional calculations
    AVG(passenger_count) as avg_monthly_passenger_count,
    AVG(trip_distance) as avg_monthly_trip_distance

FROM trips_data
    GROUP BY revenue_month, service_type

        """)

In [None]:
df_results.show()

In [13]:
YELLOW_SCHEMA = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [14]:
df = spark.read.parquet('yellow_tripdata_2023-12.parquet', schema = YELLOW_SCHEMA)

In [24]:
from pyspark.sql import types 
from pyspark.sql.functions import md5, concat_ws, col, coalesce, concat, lit

In [25]:
df = df.withColumn(
    "unique_row_id",
    md5(
        concat(
            coalesce(col("VendorID").cast("string"), lit('')),
            coalesce(col("tpep_pickup_datetime").cast("string"), lit('')),
            coalesce(col("tpep_dropoff_datetime").cast("string"), lit('')),
            coalesce(col("PULocationID").cast("string"), lit('')),
            coalesce(col("DOLocationID").cast("string"), lit('')),
            coalesce(col("fare_amount").cast("string"), lit('')),
            coalesce(col("trip_distance").cast("string"), lit(''))
        )
    )
)

In [26]:
df

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: bigint, trip_distance: double, RatecodeID: bigint, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double, unique_row_id: string]