In [1]:
import pyspark
from pyspark.sql import SparkSession

# Build (or reuse an already running) Spark session in local mode;
# "local[*]" asks Spark to run locally with as many worker threads as there are cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/28 06:27:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/28 06:27:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Load every parquet file found two directory levels below data/pq/green into one DataFrame
# (presumably year/month partition folders — confirm against the data layout).
df_green = spark.read.parquet('data/pq/green/*/*')

                                                                                

In [3]:
# Reference query only: this string is a bare expression (it is echoed as the cell output)
# and is not passed to Spark here. The cells below rebuild the same aggregation with RDDs.
'''
    SELECT 
        DATE_TRUNC('HOUR', lpep_pickup_datetime) AS hour, 
        PULocationID AS zone,
        SUM(total_amount) AS amount,
        COUNT(*) AS number_records
    FROM green
    WHERE lpep_pickup_datetime >= '2020-01-01'
    GROUP BY 1,2
'''

"\n    SELECT \n        DATE_TRUNC('HOUR', lpep_pickup_datetime) AS hour, \n        PULocationID AS zone,\n        SUM(total_amount) AS amount,\n        COUNT(*) AS number_records\n    FROM green\n    WHERE lpep_pickup_datetime >= '2020-01-01'\n    GROUP BY 1,2\n"

In [4]:
# We will reproduce the SQL statement above using the RDD API.
# DataFrame.rdd exposes the underlying RDD of Row objects;
# take(10) returns the first ten of those Rows as a Python list.
df_green.rdd.take(10)

                                                                                

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 13, 38, 16), store_and_fwd_flag='N', RatecodeID=1, PULocationID=74, DOLocationID=130, passenger_count=1, trip_distance=12.77, fare_amount=36.0, extra=0.0, mta_tax=0.5, tip_amount=2.05, tolls_amount=6.12, ehail_fee=None, improvement_surcharge=0.3, total_amount=44.97, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), lpep_dropoff_datetime=datetime.datetime(2020, 1, 20, 15, 46), store_and_fwd_flag=None, RatecodeID=None, PULocationID=67, DOLocationID=39, passenger_count=None, trip_distance=8.0, fare_amount=29.9, extra=2.75, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=33.45, payment_type=None, trip_type=None, congestion_surcharge=None),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41)

In [6]:
# Keep only the three columns the aggregation needs. The projection is applied
# to the DataFrame first; .rdd then converts the narrowed frame to an RDD of Rows.
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

rdd.take(5)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), PULocationID=67, total_amount=33.45),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41), PULocationID=260, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 5, 16, 32, 26), PULocationID=82, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 22, 42), PULocationID=166, total_amount=12.74)]

In [8]:
# filter() plays the role of a SQL WHERE clause on an RDD; the predicate can be
# a lambda or a named function that returns True for rows to keep.
from datetime import datetime

# NOTE(review): the reference SQL earlier in this notebook filters on '2020-01-01',
# while this cutoff is 2021-01-01 — confirm which of the two is intended.
start = datetime(2021, 1, 1)
(
    rdd
    .filter(lambda trip: trip.lpep_pickup_datetime >= start)
    .take(1)
)

                                                                                

[Row(lpep_pickup_datetime=datetime.datetime(2021, 6, 18, 13, 31, 15), PULocationID=7, total_amount=3.3)]

In [9]:
# We can also use map() function on each row of RDD to apply some transformations
# I will use a defined func to do this instead of a lambda

def transform_row(row):
    """Convert one trip row into a ``((hour, zone), (amount, count))`` pair.

    The key combines the pickup timestamp truncated to the start of its hour
    with the pickup location id. The value carries the row's total amount and
    a count of 1, so that values can later be summed per key.
    """
    # Zeroing minute/second/microsecond truncates the timestamp to the hour
    pickup_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return (pickup_hour, row.PULocationID), (row.total_amount, 1)

# Keep rows on or after the cutoff, then turn each into a (key, value) pair
(
    rdd
    .filter(lambda trip: trip.lpep_pickup_datetime >= start)
    .map(transform_row)
    .take(10)
)

                                                                                

[((datetime.datetime(2021, 6, 18, 13, 0), 7), (3.3, 1)),
 ((datetime.datetime(2021, 6, 24, 22, 0), 41), (18.55, 1)),
 ((datetime.datetime(2021, 6, 29, 12, 0), 43), (7.55, 1)),
 ((datetime.datetime(2021, 6, 13, 11, 0), 134), (39.61, 1)),
 ((datetime.datetime(2021, 6, 27, 20, 0), 75), (19.3, 1)),
 ((datetime.datetime(2021, 6, 23, 19, 0), 22), (63.3, 1)),
 ((datetime.datetime(2021, 6, 12, 10, 0), 41), (16.31, 1)),
 ((datetime.datetime(2021, 6, 19, 15, 0), 41), (9.8, 1)),
 ((datetime.datetime(2021, 6, 12, 21, 0), 189), (26.83, 1)),
 ((datetime.datetime(2021, 6, 18, 19, 0), 41), (33.35, 1))]

In [12]:
# The first tuple is the key (hour, zone); the second is the value (amount, count)
# Now we want to reduce all values that share a key into a single value using reduceByKey(),
# summing amount and count pairwise: each previous value is combined with the next value

def calculate_revenue(prev_value, next_value):
    """Merge two ``(amount, count)`` pairs by adding them element-wise.

    Both arguments must be 2-item iterables; the result is a new
    ``(amount, count)`` tuple holding the summed amount and summed count.
    """
    (amount_a, count_a), (amount_b, count_b) = prev_value, next_value
    return amount_a + amount_b, count_a + count_b

In [13]:
# Filter, map to (key, value) pairs, then merge every value that shares a key
(
    rdd
    .filter(lambda trip: trip.lpep_pickup_datetime >= start)
    .map(transform_row)
    .reduceByKey(calculate_revenue)
    .take(10)
)

                                                                                

[((datetime.datetime(2021, 6, 24, 22, 0), 41), (122.05, 8)),
 ((datetime.datetime(2021, 6, 27, 20, 0), 75), (118.32, 7)),
 ((datetime.datetime(2021, 6, 23, 19, 0), 22), (63.3, 1)),
 ((datetime.datetime(2021, 6, 29, 12, 0), 24), (23.97, 2)),
 ((datetime.datetime(2021, 6, 18, 17, 0), 52), (90.79, 1)),
 ((datetime.datetime(2021, 6, 11, 14, 0), 167), (37.1, 2)),
 ((datetime.datetime(2021, 6, 7, 16, 0), 213), (44.3, 2)),
 ((datetime.datetime(2021, 6, 14, 21, 0), 93), (140.39999999999998, 3)),
 ((datetime.datetime(2021, 6, 18, 11, 0), 213), (67.66, 3)),
 ((datetime.datetime(2021, 6, 28, 9, 0), 197), (54.8, 1))]

In [16]:
# This structure is unreadable because it's nested, we now want to unnest this list
# and turn it back into a dataframe
def unnest(row):
    """Flatten a ``((hour, zone), (amount, count))`` pair into a flat 4-tuple."""
    key, value = row[0], row[1]
    # Order of the flattened fields: datetime, zone, amount, count
    return (key[0], key[1], value[0], value[1])

# Aggregate per (hour, zone), flatten each result and display it as a DataFrame
(
    rdd
    .filter(lambda trip: trip.lpep_pickup_datetime >= start)
    .map(transform_row)
    .reduceByKey(calculate_revenue)
    .map(unnest)
    .toDF()
    .show()
)

                                                                                

+-------------------+---+------------------+---+
|                 _1| _2|                _3| _4|
+-------------------+---+------------------+---+
|2021-06-24 22:00:00| 41|            122.05|  8|
|2021-06-27 20:00:00| 75|            118.32|  7|
|2021-06-23 19:00:00| 22|              63.3|  1|
|2021-06-29 12:00:00| 24|             23.97|  2|
|2021-06-18 17:00:00| 52|             90.79|  1|
|2021-06-11 14:00:00|167|              37.1|  2|
|2021-06-07 16:00:00|213|              44.3|  2|
|2021-06-14 21:00:00| 93|140.39999999999998|  3|
|2021-06-18 11:00:00|213|             67.66|  3|
|2021-06-28 09:00:00|197|              54.8|  1|
|2021-06-01 11:00:00| 42|              83.4|  6|
|2021-06-22 10:00:00| 17|             49.68|  2|
|2021-06-17 07:00:00| 47|             46.68|  1|
|2021-06-22 15:00:00|134|             59.62|  4|
|2021-06-29 13:00:00|145|             27.85|  1|
|2021-06-06 19:00:00| 74| 422.0400000000001| 29|
|2021-06-08 12:00:00|116|17.900000000000002|  3|
|2021-06-11 13:00:00

In [17]:
# Columns don't have names, we can fix it by adding a named tuple into unnest()
from collections import namedtuple

# Record type for one aggregated row; its field names show up as the column headers
# in the toDF() output. Note that the 'count' field shadows the built-in tuple.count method.
RevenueRow = namedtuple('RevenueRow', 'hour zone revenue count')


def unnest(row):
    """Flatten a ``((hour, zone), (revenue, count))`` pair into a ``RevenueRow``."""
    key, value = row[0], row[1]
    return RevenueRow(key[0], key[1], value[0], value[1])

In [18]:
# Same aggregation as before; unnest now yields RevenueRow records, so the
# resulting DataFrame has named columns
(
    rdd
    .filter(lambda trip: trip.lpep_pickup_datetime >= start)
    .map(transform_row)
    .reduceByKey(calculate_revenue)
    .map(unnest)
    .toDF()
    .show()
)

                                                                                

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2021-06-24 22:00:00|  41|            122.05|    8|
|2021-06-27 20:00:00|  75|            118.32|    7|
|2021-06-23 19:00:00|  22|              63.3|    1|
|2021-06-29 12:00:00|  24|             23.97|    2|
|2021-06-18 17:00:00|  52|             90.79|    1|
|2021-06-11 14:00:00| 167|              37.1|    2|
|2021-06-07 16:00:00| 213|              44.3|    2|
|2021-06-14 21:00:00|  93|140.39999999999998|    3|
|2021-06-18 11:00:00| 213|             67.66|    3|
|2021-06-28 09:00:00| 197|              54.8|    1|
|2021-06-01 11:00:00|  42|              83.4|    6|
|2021-06-22 10:00:00|  17|             49.68|    2|
|2021-06-17 07:00:00|  47|             46.68|    1|
|2021-06-22 15:00:00| 134|             59.62|    4|
|2021-06-29 13:00:00| 145|             27.85|    1|
|2021-06-06 19:00:00|  74| 422.0400000000001|   29|
|2021-06-08 