In [1]:
# import library and create spark session

import pyspark
from pyspark.sql import SparkSession

In [2]:
# create the spark session
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("test")\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/26 05:15:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_green = spark.read.parquet('data/pq/green/*/*')

                                                                                

In [4]:
df_green.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

To compare with the Spark SQL we used in previous lecture as below to show the revenue of greeen taxi, we can do it in RDD as wel by implementing its own strategy

"""
SELECT
   
    date_trunc('hour', pickup_datetime) AS hour,
    PULocationID AS zone,
    
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
    
FROM
    green
WHERE
    pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1,2

"""

Within the dataframe, we have the RDD as the architeture inside. For example, we can use the df_green to access the RDD as follow. The `take` command is used to access the rdd

In [5]:
# take the rdd 
df_green.rdd.take(2)

                                                                                

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 13, 38, 16), store_and_fwd_flag='N', RatecodeID=1, PULocationID=74, DOLocationID=130, passenger_count=1, trip_distance=12.77, fare_amount=36.0, extra=0.0, mta_tax=0.5, tip_amount=2.05, tolls_amount=6.12, ehail_fee=None, improvement_surcharge=0.3, total_amount=44.97, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), lpep_dropoff_datetime=datetime.datetime(2020, 1, 20, 15, 46), store_and_fwd_flag=None, RatecodeID=None, PULocationID=67, DOLocationID=39, passenger_count=None, trip_distance=8.0, fare_amount=29.9, extra=2.75, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=33.45, payment_type=None, trip_type=None, congestion_surcharge=None)]

In [6]:
# to create something similar to the query, we first extract the columns that will be used 
rdd = df_green.select('lpep_pickup_datetime', 'PULocationID','total_amount').rdd

To achieve the "WHERE pickup_datetime >= '2020-01-01 00:00:00'" clause in the SQL query, we need to use the filter the filter take in function to transform the data. In such case, we will be creating a customize fucntion 




In [7]:
from datetime import datetime

start_date = datetime(year=2020, month=1,day=1)

def filter_outliers(row):
    return row['lpep_pickup_datetime']>=start_date
    


In [8]:
# to access the individual row
for row in rdd.take(2):
    print(row['lpep_pickup_datetime'])

2020-01-23 13:10:15
2020-01-20 15:09:00


In [41]:
rdd.filter(filter_outliers).take(5)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), PULocationID=67, total_amount=33.45),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41), PULocationID=260, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 5, 16, 32, 26), PULocationID=82, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 22, 42), PULocationID=166, total_amount=12.74)]

Next, to achieve the Group By in the SQL query, it can be achieve by map() function in RDD which take in rows as input and output as rows as well. It prepare the 'key' and 'value' used for reshuffling.

In [11]:
def prepare_for_grouping(row):
    hour = row.lpep_pickup_datetime.replace(minute=0,second=0,microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)
    
    amount = row.total_amount
    count = 1
    value = (amount, count)

    
    return (key, value)

In [42]:
example = rdd.filter(filter_outliers)\
    .map(prepare_for_grouping)\
    .take(5)
example

[((datetime.datetime(2020, 1, 23, 13, 0), 74), (44.97, 1)),
 ((datetime.datetime(2020, 1, 20, 15, 0), 67), (33.45, 1)),
 ((datetime.datetime(2020, 1, 15, 20, 0), 260), (8.3, 1)),
 ((datetime.datetime(2020, 1, 5, 16, 0), 82), (8.3, 1)),
 ((datetime.datetime(2020, 1, 29, 19, 0), 166), (12.74, 1))]

Using the reduce_by_key() function, we then can group them accordingly to the key, which the reshuffling occur. Similar to map(), it take in RDD and output RDD as well but replication of key is eliminated by reducing them into one. The reduce_by_key() function will automatically sort the similar key together. Hence, the reduce action will be focusing on the value aggregration of two rows.


In [20]:
def reduce(left_value,right_value): # calculate revenue
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

In [43]:
rdd.filter(filter_outliers)\
    .map(prepare_for_grouping)\
    .reduceByKey(reduce)\
    .take(5)

                                                                                

[((datetime.datetime(2020, 1, 15, 20, 0), 260), (163.90000000000003, 14)),
 ((datetime.datetime(2020, 1, 29, 19, 0), 166), (695.0099999999999, 45)),
 ((datetime.datetime(2020, 1, 16, 8, 0), 41), (736.1399999999996, 54)),
 ((datetime.datetime(2020, 1, 4, 20, 0), 129), (583.27, 38)),
 ((datetime.datetime(2020, 1, 2, 8, 0), 66), (197.69, 10))]

It might not be in the right format we want, we can unnest it to suit our use case

In [32]:
def unwrap(row):
    return RevenueRow(
        hour= row[0][0],
        zone = row[0][1],
        revenue = row[1][0],
        count = row[1][1]
    )

In [27]:
rdd.filter(filter_outliers)\
    .map(prepare_for_grouping)\
    .reduceByKey(reduce)\
    .map(unwrap)\
    .toDF()\
    .show()

                                                                                

+-------------------+---+------------------+---+
|                 _1| _2|                _3| _4|
+-------------------+---+------------------+---+
|2020-01-15 20:00:00|260|163.90000000000003| 14|
|2020-01-29 19:00:00|166| 695.0099999999999| 45|
|2020-01-16 08:00:00| 41| 736.1399999999996| 54|
|2020-01-04 20:00:00|129|            583.27| 38|
|2020-01-02 08:00:00| 66|            197.69| 10|
|2020-01-03 09:00:00| 61|            142.21|  9|
|2020-01-17 21:00:00|236|              33.6|  4|
|2020-01-12 12:00:00| 82|            290.41| 14|
|2020-01-28 16:00:00|197| 831.4399999999998| 18|
|2020-01-10 22:00:00| 95| 407.7100000000002| 37|
|2020-01-10 01:00:00|215|            109.69|  2|
|2020-01-07 18:00:00| 25| 554.2900000000001| 37|
|2020-01-18 07:00:00| 55|              48.3|  1|
|2020-01-28 09:00:00|166| 473.0200000000002| 36|
|2020-01-12 15:00:00| 82| 265.7900000000001| 29|
|2020-01-10 20:00:00| 66|            405.88| 21|
|2020-01-31 15:00:00| 43|345.58000000000004| 19|
|2020-01-31 21:00:00

You might notice that is not column name we can explicitly define it


In [28]:
# edit the map function - unwrap
from collections import namedtuple
RevenueRow = namedtuple('RevenueRow',['hour','zone','revenue','count'])

In [34]:
rdd.filter(filter_outliers)\
    .map(prepare_for_grouping)\
    .reduceByKey(reduce)\
    .map(unwrap)\
    .toDF()\
    .show()

                                                                                

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-15 20:00:00| 260|163.90000000000003|   14|
|2020-01-29 19:00:00| 166| 695.0099999999999|   45|
|2020-01-16 08:00:00|  41| 736.1399999999996|   54|
|2020-01-04 20:00:00| 129|            583.27|   38|
|2020-01-02 08:00:00|  66|            197.69|   10|
|2020-01-03 09:00:00|  61|            142.21|    9|
|2020-01-17 21:00:00| 236|              33.6|    4|
|2020-01-12 12:00:00|  82|            290.41|   14|
|2020-01-28 16:00:00| 197| 831.4399999999998|   18|
|2020-01-10 22:00:00|  95| 407.7100000000002|   37|
|2020-01-10 01:00:00| 215|            109.69|    2|
|2020-01-07 18:00:00|  25| 554.2900000000001|   37|
|2020-01-18 07:00:00|  55|              48.3|    1|
|2020-01-28 09:00:00| 166| 473.0200000000002|   36|
|2020-01-12 15:00:00|  82| 265.7900000000001|   29|
|2020-01-10 20:00:00|  66|            405.88|   21|
|2020-01-31 

We can also define the schema we want 


In [36]:
from pyspark.sql import types

result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

In [37]:
df_result = rdd.filter(filter_outliers)\
    .map(prepare_for_grouping)\
    .reduceByKey(reduce)\
    .map(unwrap)\
    .toDF(result_schema)

In [39]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', IntegerType(), True), StructField('revenue', DoubleType(), True), StructField('count', IntegerType(), True)])

In [40]:
# save the file

df_result.write.parquet('tmp/green-revenue')

                                                                                

In [44]:
df_result.show()

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-15 20:00:00| 260|163.90000000000003|   14|
|2020-01-29 19:00:00| 166| 695.0099999999999|   45|
|2020-01-16 08:00:00|  41| 736.1399999999996|   54|
|2020-01-04 20:00:00| 129|            583.27|   38|
|2020-01-02 08:00:00|  66|            197.69|   10|
|2020-01-03 09:00:00|  61|            142.21|    9|
|2020-01-17 21:00:00| 236|              33.6|    4|
|2020-01-12 12:00:00|  82|            290.41|   14|
|2020-01-28 16:00:00| 197| 831.4399999999998|   18|
|2020-01-10 22:00:00|  95| 407.7100000000002|   37|
|2020-01-10 01:00:00| 215|            109.69|    2|
|2020-01-07 18:00:00|  25| 554.2900000000001|   37|
|2020-01-18 07:00:00|  55|              48.3|    1|
|2020-01-28 09:00:00| 166| 473.0200000000002|   36|
|2020-01-12 15:00:00|  82| 265.7900000000001|   29|
|2020-01-10 20:00:00|  66|            405.88|   21|
|2020-01-31 

### mapPartitions

In [45]:
# let's say we have columns to predict 'trip_distance' with a well-trained model. We can slice data into chunk to
# make prediction 


columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

duration_rdd = df_green \
    .select(columns) \
    .rdd

In [46]:
# check number of partition trick

def apply_model_in_batch(partition):
    return [1]


In [47]:
duration_rdd.mapPartitions(apply_model_in_batch).collect()

# 4 partitions are returned


[1, 1, 1, 1]

In [48]:
# check chunk size
def apply_model_in_batch(partition):
    cnt = 0
    for row in partition:
        cnt = cnt+1
    return[cnt]

In [49]:
duration_rdd.mapPartitions(apply_model_in_batch).collect()


                                                                                

[1164928, 456935, 425873, 256781]

In [50]:
# put everything into dataframe only count
import pandas as pd

def apply_model_in_batch(partition):
    df = pd.DataFrame(partition,columns=columns)
    cnt = len(df)
    return[cnt]

In [51]:
duration_rdd.mapPartitions(apply_model_in_batch).collect()


                                                                                

[1164928, 456935, 425873, 256781]

In [53]:
## Assuming we have a pretrained model as follow
#model = ...

def model_predict(df):
#     y_pred = model.predict(df)
    y_pred = df.trip_distance * 5
    return y_pred

In [54]:
def apply_model_in_batch(rows):
    df = pd.DataFrame(rows, columns=columns)
    predictions = model_predict(df)
    df['predicted_duration'] = predictions

    for row in df.itertuples():
        yield row

In [58]:
df_predicts = duration_rdd\
        .mapPartitions(apply_model_in_batch)\
        .toDF()\
        .drop('Index')


                                                                                

In [60]:
df_predicts.select('predicted_duration').show()

[Stage 39:>                                                         (0 + 1) / 1]

+------------------+
|predicted_duration|
+------------------+
|63.849999999999994|
|              40.0|
|              6.35|
|              6.25|
| 9.200000000000001|
|               3.8|
|16.599999999999998|
|             11.05|
|               4.5|
|              30.5|
|               8.7|
|5.8999999999999995|
|              11.0|
|              15.2|
|              4.25|
|25.299999999999997|
|7.8500000000000005|
|              34.0|
| 5.300000000000001|
|              6.15|
+------------------+
only showing top 20 rows



                                                                                