VIDEO 5.4.2 - GroupBy in Spark

In [3]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session using all available cores (master "local[*]").
# getOrCreate() hands back an already-active session if one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [4]:
# Load every green-taxi parquet file two directory levels below data/pq/green
# (presumably year/month partitions - confirm against the ingestion step).
df_green = spark.read.format('parquet').load('data/pq/green/*/*')

In [5]:
# Expose df_green to spark.sql() under the name `green`.
# registerTempTable() has been deprecated since Spark 2.0 (it only warns and then
# calls createOrReplaceTempView); call the replacement directly. An existing view
# with the same name is replaced, so re-running the cell is safe.
df_green.createOrReplaceTempView('green')



In [6]:
# Hourly revenue and trip count per pickup zone for green taxis, keeping only
# pickups on or after 2020-01-01 (presumably to discard mis-dated records).
# GROUP BY repeats the expressions instead of the ordinals `1, 2`: ordinal
# grouping depends on spark.sql.groupByOrdinal and silently changes meaning if
# the SELECT list is ever reordered.
df_green_revenue = spark.sql("""
SELECT
    date_trunc('hour', lpep_pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    date_trunc('hour', lpep_pickup_datetime),
    PULocationID
""")

In [7]:
# Preview 20 rows (show()'s defaults, spelled out); rows come back in no
# particular order after the aggregation.
df_green_revenue.show(n=20, truncate=True)

+-------------------+----+------------------+--------------+
|               hour|zone|            amount|number_records|
+-------------------+----+------------------+--------------+
|2020-01-02 13:00:00|  74|1037.6299999999997|            71|
|2020-01-05 08:00:00|  42|            338.27|            18|
|2020-01-01 01:00:00|  17|            598.15|            18|
|2020-01-04 20:00:00|  25|            369.57|            23|
|2020-01-03 12:00:00|  74| 770.0400000000002|            62|
|2020-01-02 05:00:00|   7|             54.22|             2|
|2020-01-04 22:00:00|  83|            298.16|            15|
|2020-01-05 02:00:00| 130|             41.61|             4|
|2020-01-02 15:00:00| 236|            209.32|             8|
|2020-01-02 09:00:00|  66|229.40000000000003|            12|
|2020-01-02 15:00:00|  66|            241.25|            10|
|2020-01-04 15:00:00| 112|             21.55|             1|
|2020-01-05 00:00:00| 244|181.98999999999998|             9|
|2020-01-01 10:00:00| 18

In [9]:
# Persist the green aggregate as 20 partitions of parquet, replacing any
# output left by a previous run.
(
    df_green_revenue
    .repartition(20)
    .write
    .mode('overwrite')
    .parquet('data/report/revenue/green')
)

In [10]:
# Load every yellow-taxi parquet file two directory levels below data/pq/yellow
# and expose it to spark.sql() under the name `yellow`.
# createOrReplaceTempView() replaces registerTempTable(), which has been
# deprecated since Spark 2.0; re-running the cell simply replaces the view.
df_yellow = spark.read.parquet('data/pq/yellow/*/*')
df_yellow.createOrReplaceTempView('yellow')



In [11]:
# Hourly revenue and trip count per pickup zone for yellow taxis, keeping only
# pickups on or after 2020-01-01 (presumably to discard mis-dated records).
# GROUP BY repeats the expressions instead of the ordinals `1, 2`: ordinal
# grouping depends on spark.sql.groupByOrdinal and silently changes meaning if
# the SELECT list is ever reordered.
df_yellow_revenue = spark.sql("""
SELECT
    date_trunc('hour', tpep_pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    yellow
WHERE
    tpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    date_trunc('hour', tpep_pickup_datetime),
    PULocationID
""")

In [12]:
# Persist the yellow aggregate as 20 partitions of parquet, replacing any
# output left by a previous run.
(
    df_yellow_revenue
    .repartition(20)
    .write
    .mode('overwrite')
    .parquet('data/report/revenue/yellow')
)

VIDEO 5.4.3 - Joins in Spark

In [13]:
# Reload both hourly revenue reports from their parquet directories; the
# variables now point at the stored data rather than the SQL query plans.
df_green_revenue = spark.read.format('parquet').load('data/report/revenue/green')
df_yellow_revenue = spark.read.format('parquet').load('data/report/revenue/yellow')

In [14]:
# Prefix the measure columns with the fleet name so they stay distinguishable
# after the two reports are joined on (hour, zone).
df_green_revenue_tmp = (
    df_green_revenue
    .withColumnRenamed('amount', 'green_amount')
    .withColumnRenamed('number_records', 'green_number_records')
)

df_yellow_revenue_tmp = (
    df_yellow_revenue
    .withColumnRenamed('amount', 'yellow_amount')
    .withColumnRenamed('number_records', 'yellow_number_records')
)

In [15]:
# Outer join on (hour, zone): an hour/zone served by only one fleet is kept,
# with NULLs in the other fleet's columns.
df_join = df_green_revenue_tmp.join(
    df_yellow_revenue_tmp,
    on=['hour', 'zone'],
    how='outer',
)

In [16]:
# Persist the combined green/yellow report, replacing any previous output.
df_join.write.mode('overwrite').parquet('data/report/revenue/total')

In [17]:
# Reload the combined report from disk so later cells start from stored data.
df_join = spark.read.format('parquet').load('data/report/revenue/total')

In [19]:
# Preview 20 rows of the joined report (show()'s defaults, spelled out).
df_join.show(n=20, truncate=True)

+-------------------+----+------------------+--------------------+------------------+---------------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|
+-------------------+----+------------------+--------------------+------------------+---------------------+
|2020-01-01 00:00:00|   3|              NULL|                NULL|              25.0|                    1|
|2020-01-01 00:00:00|  40|            168.98|                   8| 89.97000000000001|                    5|
|2020-01-01 00:00:00|  45|              NULL|                NULL|            732.48|                   42|
|2020-01-01 00:00:00|  47|              13.3|                   1|               8.3|                    1|
|2020-01-01 00:00:00|  51|              17.8|                   2|              31.0|                    1|
|2020-01-01 00:00:00|  77| 75.99000000000001|                   2|            134.24|                    3|
|2020-01-01 00:00:00|  95|  

In [21]:
#download zones
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2024-02-27 02:54:45--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 54.231.201.8, 52.217.204.0, 54.231.193.64, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|54.231.201.8|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2024-02-27 02:54:46 (1.35 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [24]:
# Read the zone lookup CSV (header row, no schema inference, so every column
# is a string) and store it as parquet.
# mode='overwrite' keeps the cell re-runnable: the default save mode raises
# an error once the `zones` directory already exists.
df_zones = spark.read.option("header", "true").csv('taxi+_zone_lookup.csv')
df_zones.write.parquet('zones', mode='overwrite')

In [25]:
# Load the zone lookup table back from its parquet copy.
df_zones = spark.read.format('parquet').load('zones/')

In [26]:
# Preview 20 rows of the zone lookup table (show()'s defaults, spelled out).
df_zones.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [27]:
# Attach borough/zone names to every (hour, zone) row. This is an inner join,
# so rows whose zone id has no entry in the lookup table are dropped.
# LocationID came from a CSV read without schema inference and is a string;
# cast it explicitly rather than relying on Spark's implicit string/number
# coercion inside the comparison.
# NOTE(review): assumes `zone` (PULocationID) is an integer column - confirm.
df_result = df_join.join(
    df_zones,
    df_join.zone == df_zones.LocationID.cast('int'),
    how='inner',
)

In [28]:
# Preview 20 rows of the zone-annotated report (show()'s defaults, spelled out).
df_result.show(n=20, truncate=True)

+-------------------+----+------------------+--------------------+------------------+---------------------+----------+---------+--------------------+------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|LocationID|  Borough|                Zone|service_zone|
+-------------------+----+------------------+--------------------+------------------+---------------------+----------+---------+--------------------+------------+
|2020-01-01 00:00:00|   3|              NULL|                NULL|              25.0|                    1|         3|    Bronx|Allerton/Pelham G...|   Boro Zone|
|2020-01-01 00:00:00|  40|            168.98|                   8| 89.97000000000001|                    5|        40| Brooklyn|     Carroll Gardens|   Boro Zone|
|2020-01-01 00:00:00|  45|              NULL|                NULL|            732.48|                   42|        45|Manhattan|           Chinatown| Yellow Zone|
|2020-01-01 00:00:00| 

In [29]:
# Write the zone-annotated revenue report without the two redundant id columns.
# Drop by Column reference, not by name: name-based drop() matches columns
# case-insensitively (default spark.sql.caseSensitive=false), so drop('zone')
# would also remove the `Zone` name column that the join was meant to add.
# mode='overwrite' keeps the cell re-runnable once the output directory exists.
(
    df_result
    .drop(df_join.zone)
    .drop(df_zones.LocationID)
    .write
    .parquet('tmp/revenue-zones', mode='overwrite')
)