In [58]:
from pyspark.sql import SparkSession, types

# Build (or reuse) a SparkSession running locally with the "local[*]" master
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)
spark

In [3]:
# Read every Parquet file matched by the two-level glob under pq/green into one DataFrame
df_green = spark.read.parquet("../datasets/pq/green/*/*")
# Print the column names and types Spark picked up from the files
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [5]:
# Number of partitions the DataFrame's underlying RDD is split into
df_green.rdd.getNumPartitions()

19

In [31]:
# Keep only the three columns the aggregation needs, then drop down to the
# RDD API so each record is handled as a Row object.
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)
rdd.take(10)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 11, 4, 5, 54), PULocationID=129, total_amount=8.51),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 7, 19, 34, 40), PULocationID=82, total_amount=8.8),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 19, 7, 7), PULocationID=75, total_amount=8.76),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 17, 26, 4), PULocationID=210, total_amount=11.44),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 19, 12, 18, 31), PULocationID=129, total_amount=5.8),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 22, 19, 43, 11), PULocationID=65, total_amount=26.55),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 7, 13, 39), PULocationID=82, total_amount=18.94),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 14, 17, 55, 4), PULocationID=260, total_amount=11.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 24, 8, 16), PULocationID=59, total_amount=23.58),
 Row(lpep_pickup_datetime=datetime.datetime(2020

In [17]:
from datetime import datetime

# Cutoff used by filter_outliers: midnight on 1 January 2021 (naive datetime)
start = datetime(2021, 1, 1)

In [16]:
def filter_outliers(row):
    """Keep a row only if its pickup time is on or after ``start``.

    Args:
        row: Record exposing a ``lpep_pickup_datetime`` attribute holding a
            ``datetime`` or ``None`` (the column is nullable).

    Returns:
        bool: True when the pickup time is present and ``>= start``,
        False otherwise.

    Note:
        ``start`` is a notebook-level name defined in a separate cell; that
        cell must have been run before this function is called.
    """
    pickup = row.lpep_pickup_datetime
    # Comparing None with a datetime raises TypeError, which would fail the
    # Spark task. Treat a missing pickup time as "not in range", the same way
    # a SQL `>=` comparison against NULL excludes the row.
    return pickup is not None and pickup >= start

In [20]:
# Apply the date filter and pull one surviving row to confirm it works
rdd.filter(filter_outliers).take(1)

                                                                                

[Row(lpep_pickup_datetime=datetime.datetime(2021, 10, 29, 13, 51, 40), PULocationID=166, total_amount=15.3)]

In [44]:
# Extract a single sample Row (first element of a one-item take) for local experiments
row = rdd.take(1)[0]
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 11, 4, 5, 54), PULocationID=129, total_amount=8.51)

In [None]:
# RDD-level equivalent of a DataFrame groupBy.
# Step 1: use map() to turn each row into a (key, value) pair.

def prepare_for_grouping(row):
    """Turn one trip row into a ``(key, value)`` pair for aggregation.

    Args:
        row: Record with ``lpep_pickup_datetime``, ``PULocationID`` and
            ``total_amount`` attributes.

    Returns:
        tuple: ``((hour, zone), (amount, 1))`` where ``hour`` is the pickup
        time truncated to the start of its hour, ``zone`` is the pickup
        location id, ``amount`` is the row's total amount, and ``1`` is this
        row's contribution to the trip count.
    """
    # Zero out everything below the hour so trips in the same hour share a key
    pickup_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return ((pickup_hour, row.PULocationID), (row.total_amount, 1))

# Try the mapper on the single sample row
prepare_for_grouping(row)

((datetime.datetime(2020, 1, 11, 4, 0), 129), (8.51, 1))

In [None]:
# Map Phase
"""
Creates key value pairs for aggregation via map()
"""
# Legend for the tuples displayed below
print("{key, value}")
print("((hour, zone), (amount, count))")
# Filter by pickup date, map each row to a (key, value) pair, preview ten
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .take(10)
)

{key, value}
((hour, zone), (amount, count))


                                                                                

[((datetime.datetime(2021, 10, 29, 13, 0), 166), (15.3, 1)),
 ((datetime.datetime(2021, 10, 19, 10, 0), 188), (18.5, 1)),
 ((datetime.datetime(2021, 10, 14, 8, 0), 75), (9.3, 1)),
 ((datetime.datetime(2021, 10, 14, 2, 0), 264), (18.36, 1)),
 ((datetime.datetime(2021, 10, 8, 23, 0), 82), (12.25, 1)),
 ((datetime.datetime(2021, 10, 26, 16, 0), 95), (12.3, 1)),
 ((datetime.datetime(2021, 10, 18, 9, 0), 7), (21.96, 1)),
 ((datetime.datetime(2021, 10, 18, 12, 0), 75), (7.8, 1)),
 ((datetime.datetime(2021, 10, 17, 10, 0), 35), (23.98, 1)),
 ((datetime.datetime(2021, 10, 9, 16, 0), 19), (48.11, 1))]

In [38]:
# RDD-level equivalent of a DataFrame groupBy.
# Step 2: use reduceByKey() to merge the values of records that share a key.

# Take 2 tuples with like keys
# (K1, V1), (K1, V2)
# left_value = V1, right_value = V2
def calculate_revenue(left_value, right_value):
    """Merge two ``(amount, count)`` values that belong to the same key.

    Args:
        left_value: ``(amount, count)`` pair for one record or partial sum.
        right_value: ``(amount, count)`` pair for another record or partial sum.

    Returns:
        tuple: ``(amount, count)`` holding the element-wise sums.
    """
    (amount_a, trips_a), (amount_b, trips_b) = left_value, right_value
    return (amount_a + amount_b, trips_a + trips_b)

In [41]:
# Reduce Phase
"""
Perform local aggregation (sum) on keys via reduceByKey()
"""
# Legend for the tuples displayed below
print("{key, value}")
print("((hour, zone), (amount, count))")

# Filter, map to (key, value) pairs, then sum amounts and counts per key
(
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .take(10)
)

{key, value}
((hour, zone), (amount, count))


                                                                                

[((datetime.datetime(2021, 10, 16, 7, 0), 174), (162.04000000000002, 3)),
 ((datetime.datetime(2021, 10, 29, 14, 0), 7), (312.9, 14)),
 ((datetime.datetime(2021, 10, 5, 8, 0), 152), (120.87, 5)),
 ((datetime.datetime(2021, 10, 14, 15, 0), 166), (271.59000000000003, 12)),
 ((datetime.datetime(2021, 10, 13, 14, 0), 42), (109.07, 6)),
 ((datetime.datetime(2021, 10, 20, 21, 0), 95), (37.739999999999995, 3)),
 ((datetime.datetime(2021, 10, 28, 10, 0), 75), (515.07, 28)),
 ((datetime.datetime(2021, 10, 29, 20, 0), 74), (244.71999999999997, 15)),
 ((datetime.datetime(2021, 10, 10, 7, 0), 69), (58.04, 1)),
 ((datetime.datetime(2021, 10, 3, 18, 0), 42), (131.51999999999998, 5))]

In [42]:
# Create an immutable named tuple instead of using a fully fledged class
from collections import namedtuple

# Flat record type for one aggregated (hour, zone) result
RevenueRow = namedtuple('RevenueRow', ('hour', 'zone', 'revenue', 'count'))

# Utility function
def unwrap(row):
    """Flatten a nested ``((hour, zone), (revenue, count))`` pair.

    Args:
        row: Two-element sequence whose first item is the ``(hour, zone)``
            key and whose second item is the ``(revenue, count)`` value.

    Returns:
        RevenueRow: The four values as a single flat named tuple.
    """
    key, value = row[0], row[1]
    return RevenueRow(key[0], key[1], value[0], value[1])

In [55]:
# Run the full filter -> map -> reduceByKey pipeline, flatten each result into
# a RevenueRow, and let toDF() infer the DataFrame schema from those records.
results_df = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF()
)
results_df.show(10)



+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2021-10-16 07:00:00| 174|162.04000000000002|    3|
|2021-10-29 14:00:00|   7|             312.9|   14|
|2021-10-05 08:00:00| 152|            120.87|    5|
|2021-10-14 15:00:00| 166|271.59000000000003|   12|
|2021-10-13 14:00:00|  42|            109.07|    6|
|2021-10-20 21:00:00|  95|37.739999999999995|    3|
|2021-10-28 10:00:00|  75|            515.07|   28|
|2021-10-29 20:00:00|  74|244.71999999999997|   15|
|2021-10-10 07:00:00|  69|             58.04|    1|
|2021-10-03 18:00:00|  42|131.51999999999998|    5|
+-------------------+----+------------------+-----+
only showing top 10 rows



                                                                                

In [None]:
# Schema that toDF() inferred automatically (no explicit schema was supplied)
results_df.printSchema()

root
 |-- hour: timestamp (nullable = true)
 |-- zone: long (nullable = true)
 |-- revenue: double (nullable = true)
 |-- count: long (nullable = true)



In [59]:
# Explicit schema for the aggregated result, so toDF() does not have to infer
# column types from the data.
result_schema = types.StructType(
    fields=[
        types.StructField(name='hour', dataType=types.TimestampType(), nullable=True),
        types.StructField(name='zone', dataType=types.IntegerType(), nullable=True),
        types.StructField(name='revenue', dataType=types.DoubleType(), nullable=True),
        types.StructField(name='count', dataType=types.IntegerType(), nullable=True),
    ]
)

In [None]:
# Re-run the pipeline, this time passing the explicit schema to toDF()
results_df = (
    rdd
    .filter(filter_outliers)           # Keep only rows whose pickup time is on/after `start`
    .map(prepare_for_grouping)         # Convert row into (key, value) for aggregation
    .reduceByKey(calculate_revenue)    # Sum amounts and counts for each (hour, zone) key
    .map(unwrap)                       # Flatten (key, value) into a RevenueRow
    .toDF(result_schema)               # Build the DataFrame with the supplied schema
)

results_df.printSchema()

root
 |-- hour: timestamp (nullable = true)
 |-- zone: integer (nullable = true)
 |-- revenue: double (nullable = true)
 |-- count: integer (nullable = true)



In [61]:
# Save as parquet file.
# mode='overwrite' replaces any output left by a previous run; with the default
# mode the write raises because the target path already exists, so the notebook
# could not be re-run from a fresh kernel without deleting the directory by hand.
results_df.write.parquet('../datasets/tmp/green-revenue', mode='overwrite')

25/04/05 04:14:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/04/05 04:14:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/04/05 04:14:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/04/05 04:14:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/04/05 04:14:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/04/05 04:14:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
25/04/05 04:14:12 WARN MemoryManager: Total allocation exceeds 95.