In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [None]:
bucket_name = 'dezoomcamp2024_project'
spark_sql = (
    SparkSession.builder.master("spark://dezoomcamp2024.us-east1-b.c.dezoomcamp2024-week5.internal:7077").appName("App_local_cluster")
    .config(
        "spark.jars.packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta,com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.1.6",
    )
    .config(
        "spark.jars",
        "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
    )
    .getOrCreate()
)

spark_sql._jsc.hadoopConfiguration().set(
    "fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
spark_sql.conf.set("temporaryGcsBucket", bucket_name)

In [None]:
df_green = spark_sql.read.parquet("gs://dezoomcamp2024_project/processed/green/2020/*")

In [7]:
df_yellow = spark_sql.read.parquet("gs://dezoomcamp2024_project/processed/yellow/2020/*")

In [8]:
df_yellow = df_yellow \
            .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime') \
            .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')

In [9]:
df_green = df_green \
            .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime') \
            .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')

In [10]:
#  create a common column list which is present in both green and yellow dataframes and order them same as green dataframe
common_columns = []
yellow_columns = df_yellow.columns
for column in df_green.columns:
    if column in yellow_columns:
        common_columns.append(column)

In [12]:
# add service type column to green dataframe and yellow dataframe
from pyspark.sql import functions as F
df_green = df_green.withColumn('service_type', F.lit('green'))    

In [13]:
df_yellow = df_yellow.withColumn('service_type', F.lit('yellow'))

In [14]:
df_yellow_select = df_yellow.select(common_columns + ['service_type'])

In [15]:
df_green_select = df_green.select(common_columns + ['service_type'])

In [16]:
# union both green and yellow dataframes
df_trip_data = df_green_select.unionAll(df_yellow_select)

In [None]:
# create temporary table from the dataframe
df_trip_data.registerTempTable('trip_data')

In [53]:
df_result = spark_sql.sql('''
select 
    -- Reveneue grouping 
    PULocationID as revenue_zone,
    date_trunc("month", "pickup_datetime") as revenue_month, 
    service_type, 

    -- Revenue calculation 
    sum(fare_amount) as revenue_monthly_fare,
    sum(extra) as revenue_monthly_extra,
    sum(mta_tax) as revenue_monthly_mta_tax,
    sum(tip_amount) as revenue_monthly_tip_amount,
    sum(tolls_amount) as revenue_monthly_tolls_amount,
    sum(improvement_surcharge) as revenue_monthly_improvement_surcharge,
    sum(total_amount) as revenue_monthly_total_amount,

    -- Additional calculations
    avg(passenger_count) as avg_monthly_passenger_count,
    avg(trip_distance) as avg_monthly_trip_distance

from trip_data
group by 1,2,3
''')


In [56]:
df_result.repartition(4).write.parquet('data/report/revenue', mode='overwrite')

                                                                                