**Import libraries**

In [49]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
# Start a local SparkSession with 4 worker threads (or reuse the active one);
# the bare `spark` expression on the last line displays the session summary.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName('SparkSQL')
    .getOrCreate()
)
spark

**Import Data**

**Import green trips data**

In [27]:
# Green taxi trips stored as Parquet two directory levels deep
# (presumably year/month partitions — confirm against the ingest step).
green_pq_path = "data/pq/green/*/*/"
df_green = spark.read.parquet(green_pq_path)

In [28]:
# Drop the green-specific `lpep_` prefix so the timestamp columns share
# names with the yellow dataset (which uses a `tpep_` prefix).
green_renames = {
    'lpep_pickup_datetime': 'pickup_datetime',
    'lpep_dropoff_datetime': 'dropoff_datetime',
}
for old_name, new_name in green_renames.items():
    df_green = df_green.withColumnRenamed(old_name, new_name)

In [29]:
# Print the green schema after renaming, for comparison with the yellow
# schema when picking the columns both datasets share.
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



**Import yellow trips data**

In [30]:
# Yellow taxi trips stored as Parquet two directory levels deep
# (presumably year/month partitions — confirm against the ingest step).
yellow_pq_path = "data/pq/yellow/*/*/"
df_yellow = spark.read.parquet(yellow_pq_path)

In [31]:
# Drop the yellow-specific `tpep_` prefix so the timestamp columns share
# names with the green dataset (which uses an `lpep_` prefix).
yellow_renames = {
    'tpep_pickup_datetime': 'pickup_datetime',
    'tpep_dropoff_datetime': 'dropoff_datetime',
}
for old_name, new_name in yellow_renames.items():
    df_yellow = df_yellow.withColumnRenamed(old_name, new_name)

In [32]:
# Print the yellow schema after renaming. Compared with green, it has no
# `ehail_fee` or `trip_type` but adds `airport_fee`, and `payment_type` is
# long here versus double in green.
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



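**Union green and yellow trips data**

Keep only the columns both datasets share, tag each row with its `service_type`, and stack the two frames into a single `trips_data` DataFrame.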

In [47]:
# Columns present in both datasets, kept in green's column order so both
# frames get the same column order when projected onto this list.
# A set gives O(1) membership checks, and the comprehension avoids leaking
# a loop variable named `col` into the notebook namespace.
yellow_columns = set(df_yellow.columns)
common_columns = [name for name in df_green.columns if name in yellow_columns]

In [52]:
def _tag_service(df, service_type):
    """Project `df` onto `common_columns` and add a constant `service_type` column."""
    return df.select(common_columns).withColumn('service_type', F.lit(service_type))


df_green = _tag_service(df_green, 'green')
df_yellow = _tag_service(df_yellow, 'yellow')

In [59]:
# Stack the two aligned frames. `unionByName` matches columns by name, not by
# position, so the result stays correct even if an edit to an earlier cell
# changes one frame's column order. Duplicate rows are kept (no dedup), the
# same as with `unionAll`.
trips_data = df_green.unionByName(df_yellow)
trips_data.count()

                                                                                

99588780

In [60]:
# Row count per service type. `groupBy` gives no ordering guarantee, so sort
# the result to keep the displayed table stable across re-runs.
trips_data.groupBy('service_type').count().orderBy('service_type').show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 6291456|
|      yellow|93297324|
+------------+--------+





**Querying Spark DataFrames with SQL**

In [62]:
# Expose the DataFrame to Spark SQL under the name `trips_data`.
# `registerTempTable` has been deprecated since Spark 2.0 in favour of
# `createOrReplaceTempView`, which registers the same session-scoped view.
trips_data.createOrReplaceTempView('trips_data')

**SELECT statement**

In [71]:
# Project five trip columns from the `trips_data` view with plain Spark SQL,
# then preview the first 5 rows.
select_query = """
    SELECT
        VendorID,
        pickup_datetime,
        dropoff_datetime,
        trip_distance,
        total_amount
        
    FROM 
        trips_data
"""
results = spark.sql(select_query)
results.show(5)

+--------+-------------------+-------------------+-------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|trip_distance|total_amount|
+--------+-------------------+-------------------+-------------+------------+
|       2|2020-01-22 21:34:16|2020-01-22 21:46:45|         4.22|       18.96|
|       2|2020-01-30 08:48:21|2020-01-30 09:02:11|         1.55|        13.5|
|       2|2020-01-08 20:36:00|2020-01-08 21:13:00|         9.88|       22.64|
|       2|2020-01-31 11:22:00|2020-01-31 11:43:45|         7.04|       31.86|
|       2|2020-01-27 16:56:00|2020-01-27 17:27:00|        10.53|       42.75|
+--------+-------------------+-------------------+-------------+------------+
only showing top 5 rows



In [79]:
# Monthly revenue report: one row per (pickup zone, calendar month of pickup,
# service type), holding the sum of each fare component plus the average
# passenger count and trip distance.
# The GROUP BY refers to SELECT aliases, which Spark SQL accepts by default
# (spark.sql.groupByAliases).
# NOTE(review): the `avg_montly_*` aliases are misspelled, but they become the
# column names of the Parquet report exported from this DataFrame, so
# renaming them would change the output schema — left as-is.
revenue_query = """
    SELECT 
        -- Reveneue grouping
        PULocationID as revenue_zone,
        date_trunc('month', pickup_datetime) AS revenue_month, 

        service_type, 

        -- Revenue calculation 
        SUM(fare_amount) AS revenue_monthly_fare,
        SUM(extra) AS revenue_monthly_extra,
        SUM(mta_tax) AS revenue_monthly_mta_tax,
        SUM(tip_amount) AS revenue_monthly_tip_amount,
        SUM(tolls_amount) AS revenue_monthly_tolls_amount,
        SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
        SUM(total_amount) AS revenue_monthly_total_amount,
        SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

        -- Additional calculations
        AVG(passenger_count) AS avg_montly_passenger_count,
        AVG(trip_distance) AS avg_montly_trip_distance

    FROM 
        trips_data
    GROUP BY 
        revenue_zone, revenue_month, service_type
"""
df_revenue_report = spark.sql(revenue_query)

**Export the revenue report to Parquet**

In [80]:
# Persist the monthly revenue report. Spark's default save mode
# ('errorifexists') makes a re-run fail once the output directory exists, so
# overwrite explicitly to keep Restart Kernel -> Run All working.
df_revenue_report.write.mode('overwrite').parquet("data/report/revenue/")

                                                                                