In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Local Spark session using all available cores; getOrCreate() hands back an
# already-running session if this kernel has one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

24/03/04 16:59:41 WARN Utils: Your hostname, Davids-MacBook-Pro-3.local resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)
24/03/04 16:59:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/04 16:59:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load every trip file for each taxi colour; the `*/*` glob matches two directory
# levels under each colour (presumably year/month partitions — confirm on disk).
df_yellow = spark.read.format("parquet").load("../data/pq/yellow/*/*")
df_green = spark.read.format("parquet").load("../data/pq/green/*/*")

In [38]:
# Inspect the yellow-trip schema: the revenue query below relies on
# tpep_pickup_datetime, PULocationID and total_amount.
df_yellow.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)


In [42]:
# Make both trip DataFrames queryable from spark.sql under these (session-scoped) view names.
for view_name, frame in (("green_data", df_green), ("yellow_data", df_yellow)):
    frame.createOrReplaceTempView(view_name)

In [43]:
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    date_trunc('hour', tpep_pickup_datetime) AS revenue_month,
    PULocationID AS revenue_zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS revenue_monthly_total_trips
FROM
    yellow_data
WHERE
    tpep_pickup_datetime >= '2020-01-01' AND tpep_pickup_datetime < '2022-01-01'
GROUP BY
    1, 2
""")

In [41]:
# show() is an action, so this is where the aggregation query actually runs.
df_result.show(n=5, truncate=True)



+-------------------+------------+------------------+---------------------------+
|      revenue_month|revenue_zone|            amount|revenue_monthly_total_trips|
+-------------------+------------+------------------+---------------------------+
|2020-01-20 15:00:00|         262|2161.0599999999986|                        140|
|2020-01-15 04:00:00|         238|237.42000000000002|                         14|
|2020-01-12 13:00:00|         246| 6737.700000000007|                        390|
|2020-01-04 01:00:00|          79| 9696.750000000018|                        572|
|2020-01-17 09:00:00|          50|1518.9599999999991|                         97|
+-------------------+------------+------------------+---------------------------+


                                                                                

In [44]:
# Persist the yellow revenue report, replacing output from any previous run.
df_result.write.mode("overwrite").parquet("../data/report/revenue/yellow")

24/03/03 20:44:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/03/03 20:44:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/03/03 20:44:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/03/03 20:44:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/03/03 20:44:33 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

# Next day: joining the green and yellow revenue reports

In [3]:
# Load both per-colour revenue reports for joining.
# NOTE(review): this notebook only writes the yellow report; the green one at
# ../data/report/revenue/green must already exist from another run — confirm.
df_green_revenue = spark.read.format("parquet").load("../data/report/revenue/green")
df_yellow_revenue = spark.read.format("parquet").load("../data/report/revenue/yellow")

In [10]:
# Prefix each side's measure columns with its taxi colour so the join below
# yields distinct names; the join keys (revenue_month, revenue_zone) stay shared.
df_green_revenue = (
    df_green_revenue
    .withColumnRenamed("amount", "green_amount")
    .withColumnRenamed("revenue_monthly_total_trips", "green_trips")
)

df_yellow_revenue = (
    df_yellow_revenue
    .withColumnRenamed("amount", "yellow_amount")
    .withColumnRenamed("revenue_monthly_total_trips", "yellow_trips")
)

In [11]:
# Full outer join: keep every (revenue_month, revenue_zone) pair seen by either
# colour; columns from the side with no matching row come out NULL.
df_join = df_green_revenue.join(
    df_yellow_revenue,
    ["revenue_month", "revenue_zone"],
    "outer",
)

In [12]:
# Preview the joined rows (runs the join).
df_join.show(n=5, truncate=True)

[Stage 13:>                                                       (0 + 10) / 11]

+-------------------+------------+------------------+-----------+------------------+------------+
|      revenue_month|revenue_zone|      green_amount|green_trips|     yellow_amount|yellow_trips|
+-------------------+------------+------------------+-----------+------------------+------------+
|2020-01-01 00:00:00|          24|              87.6|          3|            754.95|          45|
|2020-01-01 00:00:00|          34|              NULL|       NULL|              19.3|           1|
|2020-01-01 00:00:00|          41|1363.9599999999987|         84|1256.5299999999997|          80|
|2020-01-01 00:00:00|          71|              23.8|          1|              NULL|        NULL|
|2020-01-01 00:00:00|          87|              NULL|       NULL| 2456.669999999999|         112|
+-------------------+------------+------------------+-----------+------------------+------------+


                                                                                

In [13]:
# Materialise the joined report, replacing output from any previous run.
df_join.write.mode("overwrite").parquet("../data/report/revenue/total")

24/03/04 17:18:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/03/04 17:18:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/03/04 17:18:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/03/04 17:18:03 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/03/04 17:18:03 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [14]:
# Re-read the joined report from disk so later cells work from the written data
# instead of recomputing the join.
df_join = spark.read.format("parquet").load("../data/report/revenue/total")

In [15]:
# Sanity check: the reloaded rows match the preview taken before writing.
df_join.show(n=5, truncate=True)

+-------------------+------------+------------------+-----------+------------------+------------+
|      revenue_month|revenue_zone|      green_amount|green_trips|     yellow_amount|yellow_trips|
+-------------------+------------+------------------+-----------+------------------+------------+
|2020-01-01 00:00:00|          24|              87.6|          3|            754.95|          45|
|2020-01-01 00:00:00|          34|              NULL|       NULL|              19.3|           1|
|2020-01-01 00:00:00|          41|1363.9599999999987|         84|1256.5299999999997|          80|
|2020-01-01 00:00:00|          71|              23.8|          1|              NULL|        NULL|
|2020-01-01 00:00:00|          87|              NULL|       NULL| 2456.669999999999|         112|
+-------------------+------------+------------------+-----------+------------------+------------+


In [16]:
# Zone lookup table (LocationID -> Borough / Zone / service_zone).
df_zones = spark.read.format("parquet").load("../data/pq/zones")

In [17]:
# Preview the zone lookup.
df_zones.show(n=5, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+


In [19]:
# Left join keeps every revenue row even when its zone ID has no lookup entry.
# Both revenue_zone and LocationID survive in the result; LocationID is NULL
# for unmatched zones and equal to revenue_zone otherwise.
df_result = df_join.join(
    df_zones,
    on=df_join["revenue_zone"] == df_zones["LocationID"],
    how="left",
)

In [20]:
# Preview revenue rows enriched with borough / zone names.
df_result.show(n=5, truncate=True)

+-------------------+------------+------------------+-----------+------------------+------------+----------+---------+--------------------+------------+
|      revenue_month|revenue_zone|      green_amount|green_trips|     yellow_amount|yellow_trips|LocationID|  Borough|                Zone|service_zone|
+-------------------+------------+------------------+-----------+------------------+------------+----------+---------+--------------------+------------+
|2020-01-01 00:00:00|          24|              87.6|          3|            754.95|          45|        24|Manhattan|        Bloomingdale| Yellow Zone|
|2020-01-01 00:00:00|          34|              NULL|       NULL|              19.3|           1|        34| Brooklyn|  Brooklyn Navy Yard|   Boro Zone|
|2020-01-01 00:00:00|          41|1363.9599999999987|         84|1256.5299999999997|          80|        41|Manhattan|      Central Harlem|   Boro Zone|
|2020-01-01 00:00:00|          71|              23.8|          1|              NUL

In [ ]:
# Persist the zone-enriched report, replacing output from any previous run.
df_result.write.mode("overwrite").parquet("../data/report/revenue/total_with_zones")

In [21]:
# Drop down to the DataFrame's underlying RDD (of Row objects) to inspect partitioning.
rdd_test = df_result.rdd

In [23]:
# Number of partitions backing the zone-enriched result.
rdd_test.getNumPartitions()

11