In [1]:
from datetime import datetime
import pyspark
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/02 16:28:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_green = spark.read.parquet('data/pq/green/*/*')

                                                                                

In [4]:
df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .head(10)

                                                                                

[Row(lpep_pickup_datetime=datetime.datetime(2019, 1, 27, 18, 7, 53), PULocationID=82, total_amount=12.3),
 Row(lpep_pickup_datetime=datetime.datetime(2019, 1, 17, 7, 38, 57), PULocationID=41, total_amount=7.8),
 Row(lpep_pickup_datetime=datetime.datetime(2019, 1, 30, 0, 40, 5), PULocationID=82, total_amount=7.8),
 Row(lpep_pickup_datetime=datetime.datetime(2019, 1, 29, 18, 59, 48), PULocationID=130, total_amount=16.64),
 Row(lpep_pickup_datetime=datetime.datetime(2019, 1, 11, 11, 56, 18), PULocationID=119, total_amount=39.56),
 Row(lpep_pickup_datetime=datetime.datetime(2019, 1, 22, 13, 0, 15), PULocationID=75, total_amount=10.79),
 Row(lpep_pickup_datetime=datetime.datetime(2019, 1, 19, 23, 14, 34), PULocationID=41, total_amount=6.3),
 Row(lpep_pickup_datetime=datetime.datetime(2019, 1, 22, 10, 48, 42), PULocationID=183, total_amount=14.3),
 Row(lpep_pickup_datetime=datetime.datetime(2019, 1, 15, 16, 34, 38), PULocationID=72, total_amount=16.3),
 Row(lpep_pickup_datetime=datetime.date

In [5]:
rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

In [6]:
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

In [7]:
rows = rdd.take(10)
row = rows[0]

                                                                                

In [8]:
"""
We need to generate intermediate results in a very similar way to the original SQL query, 
so we will need to create the composite key (hour, zone) and a composite value (amount, count),
which are the 2 halves of each record that the executors will generate. 
Once we have a function that generates the record, we will use the map() method, which takes an RDD, 
transforms it with a function (our key-value function) and returns a new RDD. 
"""
def prepare_for_grouping(row): 
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)
    
    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)

In [9]:
"""
We now need to use the reduceByKey() method, which will take all records with the same key and put them together in a single record
by transforming all the different values according to some rules which we can define with a custom function. 
Since we want to count the total amount and the total number of records, we just need to add the values: 
"""
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

In [10]:
from collections import namedtuple

In [11]:
# will help with schema creation
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [12]:
"""
The output we have is already usable but not very nice, so we map the output again in order to unwrap it. 
"""
def unwrap(row):
    return RevenueRow(
        hour=row[0][0], 
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1]
    )

In [13]:
from pyspark.sql import types

In [14]:
"""
Finally, we can take the resulting RDD and convert it to a dataframe with toDF(). 
We will need to generate a schema first because we lost it when converting RDDs:
"""
result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

In [15]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF(result_schema) 

"""
We can use toDF() without any schema as an input parameter, 
but Spark will have to figure out the schema by itself which may take a substantial amount of time. 
Using namedtuple in the previous step allows Spark to infer the column names but Spark will still need to figure out the data types;
by passing a schema as a parameter we skip this step and get the output much faster.

As you can see, manipulating RDDs to perform SQL-like queries is complex and time-consuming.
Ever since Spark added support for dataframes and SQL, manipulating RDDs in this fashion has become obsolete,
but since dataframes are built on top of RDDs, knowing how they work can help us understand how to make better use of Spark.
"""

'\nWe can use toDF() without any schema as an input parameter, \nbut Spark will have to figure out the schema by itself which may take a substantial amount of time. \nUsing namedtuple in the previous step allows Spark to infer the column names but Spark will still need to figure out the data types;\nby passing a schema as a parameter we skip this step and get the output much faster.\n\nAs you can see, manipulating RDDs to perform SQL-like queries is complex and time-consuming.\nEver since Spark added support for dataframes and SQL, manipulating RDDs in this fashion has become obsolete,\nbut since dataframes are built on top of RDDs, knowing how they work can help us understand how to make better use of Spark.\n'

In [16]:
df_result.write.parquet('tmp/green-revenue')

                                                                                

## mapPartitions()

The mapPartitions() function behaves similarly to map() in how it receives an RDD as input and transforms it into another RDD 
with a function that we define but it transforms partitions rather than elements. 
In other words: map() creates a new RDD by transforming every single element, whereas mapPartitions() transforms every partition to create a new RDD.

mapPartitions() is a convenient method for dealing with large datasets because it allows us to separate it into chunks 
that we can process more easily, which is handy for workflows such as Machine Learning.


Example: service that predicts the duration of a trip.

In [17]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

duration_rdd = df_green \
    .select(columns) \
    .rdd

In [18]:
import pandas as pd

In [19]:
rows = duration_rdd.take(10)

In [20]:
#model = ...

def model_predict(df):
#     y_pred = model.predict(df)
    y_pred = df.trip_distance * 5
    return y_pred

In [21]:
# partitions can be unbalanced (some with more rows than others) and it may be necessary to handle but it's an expensive operation
def apply_model_in_batch(rows):
    df = pd.DataFrame(rows, columns=columns)
    predictions = model_predict(df)
    df['predicted_duration'] = predictions

    for row in df.itertuples():
        yield row

"""
df.itertuples() is an iterable that returns a tuple containing all the values in a single row, for all rows.
Thus, row will contain a tuple with all the values for a single row.

yield is a Python keyword that behaves similarly to return but returns a generator object instead of a value.
This means that a function that uses yield can be iterated on. Spark makes use of the generator object in mapPartitions() to build the output RDD.
"""

'\ndf.itertuples() is an iterable that returns a tuple containing all the values in a single row, for all rows.\nThus, row will contain a tuple with all the values for a single row.\n\nyield is a Python keyword that behaves similarly to return but returns a generator object instead of a value.\nThis means that a function that uses yield can be iterated on. Spark makes use of the generator object in mapPartitions() to build the output RDD.\n'

In [22]:

df_predicts = duration_rdd \
    .mapPartitions(apply_model_in_batch)\
    .toDF() \
    .drop('Index')

                                                                                

In [23]:
df_predicts.select('predicted_duration').show()

                                                                                

+------------------+
|predicted_duration|
+------------------+
|             14.55|
|               5.4|
|               6.2|
|              12.5|
|              43.6|
|               7.9|
|               4.1|
|16.299999999999997|
|             12.85|
|               4.0|
|              10.1|
|              45.0|
|               8.7|
|               9.7|
|17.450000000000003|
|              4.45|
|23.900000000000002|
|             13.45|
|              19.4|
|               2.9|
+------------------+
only showing top 20 rows



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 51286)
Traceback (most recent call last):
  File "/home/carolina/anaconda3/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/carolina/anaconda3/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/home/carolina/anaconda3/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/carolina/anaconda3/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/home/carolina/spark/spark-3.4.0-bin-hadoop3/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/carolina/spark/spark-3.4.0-bin-hadoop3/python/pyspark/accumulators.py", line 253, in poll
    if func():
       ^^^^^^
  File "/home/carolina/spark/s