In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


In [4]:

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [5]:
df_green=spark.read.parquet('C:/data_engineering_project/06_Batch_processing_with_spark/data/pq/green/*/*')

In [6]:
df_green.createOrReplaceTempView('green')

In [7]:
df_green_revenue=spark.sql("""
SELECT
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS green_amount,
    COUNT(1) AS green_number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1,2
""")

In [8]:
df_green_revenue.show()

+-------------------+----+------------------+--------------------+
|               hour|zone|      green_amount|green_number_records|
+-------------------+----+------------------+--------------------+
|2020-01-03 09:00:00| 226|108.33999999999999|                   6|
|2020-01-02 14:00:00|  26| 327.4800000000001|                  11|
|2020-01-06 15:00:00| 241|             184.7|                   3|
|2020-01-22 13:00:00|  74| 666.2399999999999|                  44|
|2020-01-14 23:00:00| 223|             48.31|                   4|
|2020-01-09 23:00:00| 260|202.33000000000004|                  16|
|2020-01-30 09:00:00|  75|1010.3499999999995|                  67|
|2020-01-11 21:00:00|  41| 700.5999999999997|                  50|
|2020-01-07 18:00:00|  45|112.39000000000001|                   4|
|2020-01-02 09:00:00|  42| 555.7500000000001|                  33|
|2020-01-04 23:00:00|  25|             376.4|                  26|
|2020-01-25 11:00:00| 259|            161.21|                 

In [9]:
df_green_revenue \
    .repartition(20) \
    .write.parquet('C:/data_engineering_project/06_Batch_processing_with_spark/data/report/revenue/green', mode='overwrite')

In [10]:
df_yellow=spark.read.parquet('C:/data_engineering_project/06_Batch_processing_with_spark/data/pq/yellow/*/*')

In [11]:
df_yellow.createOrReplaceTempView('yellow')

In [12]:
df_yellow_revenue=spark.sql("""
SELECT
    date_trunc('hour', tpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS yellow_amount,
    COUNT(1) AS yellow_number_records
FROM
    yellow
WHERE
    tpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1,2
""")

In [13]:
df_yellow_revenue.show()

+-------------------+----+------------------+---------------------+
|               hour|zone|     yellow_amount|yellow_number_records|
+-------------------+----+------------------+---------------------+
|2088-01-24 00:00:00|  41|               5.3|                    1|
|2033-04-27 13:00:00| 263|               8.3|                    1|
|2033-04-27 12:00:00| 229|              10.8|                    1|
|2020-01-02 00:00:00| 100| 840.5600000000002|                   43|
|2020-01-02 00:00:00| 163| 756.5000000000002|                   55|
|2020-01-05 02:00:00| 163|1444.6499999999992|                   92|
|2020-01-15 17:00:00| 233| 4806.989999999999|                  236|
|2020-01-16 16:00:00| 211|1691.4799999999996|                   88|
|2020-02-28 18:00:00|  48| 7228.720000000013|                  433|
|2020-02-14 14:00:00| 113| 3181.679999999998|                  197|
|2020-01-02 04:00:00| 100|383.14000000000004|                   27|
|2020-02-16 05:00:00| 107| 553.3600000000001|   

In [14]:
df_yellow_revenue \
    .repartition(20) \
    .write.parquet('C:/data_engineering_project/06_Batch_processing_with_spark/data/report/revenue/yellow', mode='overwrite')

### Joins

In [15]:
df_yellow_revenue.show()

+-------------------+----+------------------+---------------------+
|               hour|zone|     yellow_amount|yellow_number_records|
+-------------------+----+------------------+---------------------+
|2088-01-24 00:00:00|  41|               5.3|                    1|
|2033-04-27 13:00:00| 263|               8.3|                    1|
|2033-04-27 12:00:00| 229|              10.8|                    1|
|2020-01-02 00:00:00| 100| 840.5600000000002|                   43|
|2020-01-02 00:00:00| 163| 756.5000000000002|                   55|
|2020-01-05 02:00:00| 163|1444.6499999999992|                   92|
|2020-01-15 17:00:00| 233| 4806.989999999999|                  236|
|2020-01-16 16:00:00| 211|1691.4799999999996|                   88|
|2020-02-28 18:00:00|  48| 7228.720000000013|                  433|
|2020-02-14 14:00:00| 113| 3181.679999999998|                  197|
|2020-01-02 04:00:00| 100|383.14000000000004|                   27|
|2020-02-16 05:00:00| 107| 553.3600000000001|   

In [16]:
df_join=df_green_revenue.join(df_yellow_revenue,on=['hour','zone'],how='outer')
df_join.show()

+-------------------+----+------------------+--------------------+------------------+---------------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|
+-------------------+----+------------------+--------------------+------------------+---------------------+
|2020-01-01 00:00:00|  15|              NULL|                NULL|             34.09|                    1|
|2020-01-01 00:00:00|  25|             531.0|                  26|            324.35|                   16|
|2020-01-01 00:00:00|  29|              61.3|                   1|              NULL|                 NULL|
|2020-01-01 00:00:00|  33| 317.2700000000001|                  11|            255.56|                    8|
|2020-01-01 00:00:00|  34|              NULL|                NULL|              19.3|                    1|
|2020-01-01 00:00:00|  41|1363.9599999999982|                  84|1256.5300000000002|                   80|
|2020-01-01 00:00:00|  42| 7

In [19]:
df_join.write.parquet('C:/data_engineering_project/06_Batch_processing_with_spark/data/report/revenue/total',mode='overwrite')

In [20]:
df_join=spark.read.parquet('C:/data_engineering_project/06_Batch_processing_with_spark/data/report/revenue/total')

In [22]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2023-10-31 16:01:34--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 16.182.97.128, 52.216.209.136, 54.231.227.16, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|16.182.97.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: 'taxi+_zone_lookup.csv'

     0K .......... ..                                         100% 1013K=0.01s

2023-10-31 16:01:37 (1013 KB/s) - 'taxi+_zone_lookup.csv' saved [12322/12322]



In [30]:
df=spark.read \
    .option('header','True') \
    .csv('./taxi+_zone_lookup.csv')

In [32]:
df.write.parquet('zones')

In [33]:
df_zones= spark.read.parquet('zones/')
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [34]:
df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)

In [35]:
df_result.show()

+-------------------+----+------------------+--------------------+------------------+---------------------+----------+---------+-------------------+------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|LocationID|  Borough|               Zone|service_zone|
+-------------------+----+------------------+--------------------+------------------+---------------------+----------+---------+-------------------+------------+
|2020-01-01 00:00:00|  12|              NULL|                NULL|             107.0|                    6|        12|Manhattan|       Battery Park| Yellow Zone|
|2020-01-01 00:00:00|  13|              NULL|                NULL|            1214.8|                   56|        13|Manhattan|  Battery Park City| Yellow Zone|
|2020-01-01 00:00:00|  18|               7.8|                   1|               5.8|                    1|        18|    Bronx|       Bedford Park|   Boro Zone|
|2020-01-01 00:00:00|  24|  

In [37]:
df_result.drop('LocationID', 'zone').write.parquet('C:/data_engineering_project/06_Batch_processing_with_spark/data/report/revenue/revenue-zones')