In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyarrow
import pandas as pd
from pyspark.sql import types
from pyspark.sql import functions as F

# Local Spark session on all available cores ('local[*]'), named 'test'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/21 13:42:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Raw green-taxi trips: parquet data two directory levels below data/raw/green
# (presumably year/month partitions — verify against the download layout).
df_green = spark.read.format('parquet').load('data/raw/green/*/*')

                                                                                

In [10]:
# Expose df_green to Spark SQL as the session-scoped view `green`.
# createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0.
df_green.createOrReplaceTempView('green')

In [4]:
# List the raw green-trip column names (same list as DataFrame.columns).
df_green.schema.names

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [44]:
# Hourly green revenue per pickup zone, for trips picked up on/after 2020-01-01.
# Reads the `green` temp view, exactly as the equivalent SQL `FROM green` would.
df_green_revenue = (
    spark.table('green')
    .where(F.col('lpep_pickup_datetime') >= '2020-01-01 00:00')
    # Revenue grouping: truncate pickup time to the hour, key by pickup zone.
    .groupBy(
        F.date_trunc('hour', 'lpep_pickup_datetime').alias('hour'),
        F.col('PULocationID').alias('zone'),
    )
    # Revenue calculation: total fare amount and number of trips per group.
    .agg(
        F.sum('total_amount').alias('amount'),
        F.count(F.lit(1)).alias('number_records'),
    )
)

In [45]:
# Persist the hourly green revenue report, replacing any previous output.
df_green_revenue.write.mode('overwrite').parquet('data/report/revenue/green')

                                                                                

In [27]:
# Raw yellow-taxi trips: parquet data two directory levels below data/raw/yellow.
df_yellow = spark.read.format('parquet').load('data/raw/yellow/*/*')

In [28]:
# Expose df_yellow to Spark SQL as the session-scoped view `yellow`.
# createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0.
df_yellow.createOrReplaceTempView('yellow')



In [42]:
# Hourly yellow revenue per pickup zone, for trips picked up on/after 2020-01-01.
# Reads the `yellow` temp view, exactly as the equivalent SQL `FROM yellow` would.
df_yellow_revenue = (
    spark.table('yellow')
    .where(F.col('tpep_pickup_datetime') >= '2020-01-01 00:00')
    # Revenue grouping: truncate pickup time to the hour, key by pickup zone.
    .groupBy(
        F.date_trunc('hour', 'tpep_pickup_datetime').alias('hour'),
        F.col('PULocationID').alias('zone'),
    )
    # Revenue calculation: total fare amount and number of trips per group.
    .agg(
        F.sum('total_amount').alias('amount'),
        F.count(F.lit(1)).alias('number_records'),
    )
)

In [55]:
# Persist the hourly yellow revenue report, redistributed into 20 partitions,
# replacing any previous output.
(
    df_yellow_revenue
    .repartition(20)
    .write
    .mode('overwrite')
    .parquet('data/report/revenue/yellow')
)

                                                                                

In [56]:
# Reload both hourly reports from the parquet written above, so the join below
# starts from the materialized output instead of the aggregation plans.
df_green_revenue = spark.read.format('parquet').load('data/report/revenue/green/')
df_yellow_revenue = spark.read.format('parquet').load('data/report/revenue/yellow/')

In [57]:
# Prefix the metric columns with the taxi colour so that, after joining on
# (hour, zone), green and yellow figures sit side by side without name clashes.
df_green_revenue_tmp = (
    df_green_revenue
    .withColumnRenamed('amount', 'green_amount')
    .withColumnRenamed('number_records', 'green_number_records')
)

df_yellow_revenue_tmp = (
    df_yellow_revenue
    .withColumnRenamed('amount', 'yellow_amount')
    .withColumnRenamed('number_records', 'yellow_number_records')
)


In [58]:
# Outer join on (hour, zone): keep every pair seen in either report; the side
# with no trips for that pair contributes nulls.
df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, ['hour', 'zone'], 'outer')

In [60]:
# Persist the combined green + yellow hourly report, replacing any previous output.
df_join.write.mode('overwrite').parquet('data/report/revenue/total')

                                                                                

In [3]:
# Reload the combined report from disk (no dependency on the earlier join cells).
df_join = spark.read.format('parquet').load('data/report/revenue/total/')

In [62]:
# Preview the first 20 rows, truncating long cell values (show()'s defaults).
df_join.show(n=20, truncate=True)

+-------------------+----+------------------+--------------------+------------------+---------------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|
+-------------------+----+------------------+--------------------+------------------+---------------------+
|2020-01-01 00:00:00|   4|              null|                null|1004.2999999999995|                   57|
|2020-01-01 00:00:00|   7| 769.7299999999997|                  45| 455.1700000000002|                   38|
|2020-01-01 00:00:00|  22|              15.8|                   1|              null|                 null|
|2020-01-01 00:00:00|  25|             531.0|                  26| 324.3500000000001|                   16|
|2020-01-01 00:00:00|  33|317.27000000000004|                  11|            255.56|                    8|
|2020-01-01 00:00:00|  34|              null|                null|              19.3|                    1|
|2020-01-01 00:00:00|  41| 1

In [2]:
# Taxi zone lookup: LocationID -> Borough / Zone / service_zone.
df_zones = spark.read.format('parquet').load('zones/')

                                                                                

In [68]:
# Preview the first 20 zone rows, truncating long cell values (show()'s defaults).
df_zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [69]:
# Display the DataFrame repr to inspect the joined schema (column names and types).
df_join

DataFrame[hour: timestamp, zone: bigint, green_amount: double, green_number_records: bigint, yellow_amount: double, yellow_number_records: bigint]

In [4]:
# Attach zone metadata to each (hour, zone) row. This is an inner join on the
# pickup zone id, so rows whose zone has no LocationID match are dropped.
# NOTE: the result carries both `zone` (numeric id, from df_join) and `Zone`
# (zone name, from df_zones); with Spark's default case-insensitive name
# resolution, referring to either by string is ambiguous.
df_result = df_join.join(
    df_zones,
    on=df_join['zone'] == df_zones['LocationID'],
    how='inner',
)

In [6]:
# Write revenue per hour enriched with zone metadata, without the duplicate
# numeric keys (LocationID from df_zones, zone from df_join).
# Spark resolves column names case-insensitively by default, so the string
# 'zone' would match BOTH df_join's `zone` id and df_zones' `Zone` name and
# silently drop the human-readable zone name. Dropping the id through its
# column reference removes only that attribute.
df_result \
    .drop('LocationID') \
    .drop(df_join.zone) \
    .write.parquet('tmp/revenue-zones', mode='overwrite')

                                                                                