In [1]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder \
        .master("local[*]") \
        .appName('test') \
        .config("spark.executor.memory", "4g") \
    	.config("spark.driver.memory", "4g") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/12 16:05:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_green = spark.read.parquet('data/pq/green/*/*')
df_green.printSchema()

                                                                                

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [5]:
df_green

DataFrame[VendorID: int, lpep_pickup_datetime: timestamp, lpep_dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: int, PULocationID: int, DOLocationID: int, passenger_count: int, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, ehail_fee: double, improvement_surcharge: double, total_amount: double, payment_type: int, trip_type: int, congestion_surcharge: double]

In [7]:
df_green.rdd

MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:0

In [6]:
df_green.rdd.take(5)

                                                                                

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 13, 38, 16), store_and_fwd_flag='N', RatecodeID=1, PULocationID=74, DOLocationID=130, passenger_count=1, trip_distance=12.77, fare_amount=36.0, extra=0.0, mta_tax=0.5, tip_amount=2.05, tolls_amount=6.12, ehail_fee=None, improvement_surcharge=0.3, total_amount=44.97, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), lpep_dropoff_datetime=datetime.datetime(2020, 1, 20, 15, 46), store_and_fwd_flag=None, RatecodeID=None, PULocationID=67, DOLocationID=39, passenger_count=None, trip_distance=8.0, fare_amount=29.9, extra=2.75, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=33.45, payment_type=None, trip_type=None, congestion_surcharge=None),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41)

In [8]:
rdd = \
    df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

In [9]:
rdd.take(5)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), PULocationID=67, total_amount=33.45),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41), PULocationID=260, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 5, 16, 32, 26), PULocationID=82, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 22, 42), PULocationID=166, total_amount=12.74)]

In [10]:
rdd.filter(lambda row:True).take(1)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97)]

In [11]:
from datetime import datetime

In [15]:
start = datetime(year=2021, month=1, day=1)

In [16]:
rdd.filter(lambda row: row.lpep_pickup_datetime >= start).take(1)

                                                                                

[Row(lpep_pickup_datetime=datetime.datetime(2021, 6, 18, 13, 31, 15), PULocationID=7, total_amount=3.3)]

In [17]:
start = datetime(year=2021, month=1, day=1)

def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

In [19]:
rows = rdd.take(10)
row = rows[0]

In [21]:
row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)

datetime.datetime(2020, 1, 23, 13, 0)

In [22]:
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97)

In [23]:
def prepare_for_grouping(row):
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)

    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)

In [24]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .take(10)

                                                                                

[((datetime.datetime(2021, 6, 18, 13, 0), 7), (3.3, 1)),
 ((datetime.datetime(2021, 6, 24, 22, 0), 41), (18.55, 1)),
 ((datetime.datetime(2021, 6, 29, 12, 0), 43), (7.55, 1)),
 ((datetime.datetime(2021, 6, 13, 11, 0), 134), (39.61, 1)),
 ((datetime.datetime(2021, 6, 27, 20, 0), 75), (19.3, 1)),
 ((datetime.datetime(2021, 6, 23, 19, 0), 22), (63.3, 1)),
 ((datetime.datetime(2021, 6, 12, 10, 0), 41), (16.31, 1)),
 ((datetime.datetime(2021, 6, 19, 15, 0), 41), (9.8, 1)),
 ((datetime.datetime(2021, 6, 12, 21, 0), 189), (26.83, 1)),
 ((datetime.datetime(2021, 6, 18, 19, 0), 41), (33.35, 1))]

In [27]:
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value

    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

In [28]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .take(10)

                                                                                

[((datetime.datetime(2021, 6, 29, 12, 0), 43), (63.55, 6)),
 ((datetime.datetime(2021, 6, 19, 15, 0), 41), (280.36, 16)),
 ((datetime.datetime(2021, 6, 23, 18, 0), 41), (228.83000000000004, 13)),
 ((datetime.datetime(2021, 6, 11, 14, 0), 167), (37.1, 2)),
 ((datetime.datetime(2021, 6, 14, 21, 0), 93), (140.39999999999998, 3)),
 ((datetime.datetime(2021, 6, 9, 13, 0), 159), (81.45, 4)),
 ((datetime.datetime(2021, 6, 28, 9, 0), 197), (54.8, 1)),
 ((datetime.datetime(2021, 6, 15, 14, 0), 37), (23.11, 1)),
 ((datetime.datetime(2021, 6, 1, 11, 0), 42), (83.4, 6)),
 ((datetime.datetime(2021, 6, 11, 16, 0), 95), (185.06, 12))]

In [29]:
def unwrap(row):
    return(row[0][0], row[0][1], row[1][0], row[1][1])

In [30]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .take(10)

                                                                                

[(datetime.datetime(2021, 6, 29, 12, 0), 43, 63.55, 6),
 (datetime.datetime(2021, 6, 19, 15, 0), 41, 280.36, 16),
 (datetime.datetime(2021, 6, 23, 18, 0), 41, 228.83000000000004, 13),
 (datetime.datetime(2021, 6, 11, 14, 0), 167, 37.1, 2),
 (datetime.datetime(2021, 6, 14, 21, 0), 93, 140.39999999999998, 3),
 (datetime.datetime(2021, 6, 9, 13, 0), 159, 81.45, 4),
 (datetime.datetime(2021, 6, 28, 9, 0), 197, 54.8, 1),
 (datetime.datetime(2021, 6, 15, 14, 0), 37, 23.11, 1),
 (datetime.datetime(2021, 6, 1, 11, 0), 42, 83.4, 6),
 (datetime.datetime(2021, 6, 11, 16, 0), 95, 185.06, 12)]

In [32]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF() \
    .show()

                                                                                

+-------------------+---+------------------+---+
|                 _1| _2|                _3| _4|
+-------------------+---+------------------+---+
|2021-06-29 12:00:00| 43|             63.55|  6|
|2021-06-19 15:00:00| 41|            280.36| 16|
|2021-06-23 18:00:00| 41|228.83000000000004| 13|
|2021-06-11 14:00:00|167|              37.1|  2|
|2021-06-14 21:00:00| 93|140.39999999999998|  3|
|2021-06-09 13:00:00|159|             81.45|  4|
|2021-06-28 09:00:00|197|              54.8|  1|
|2021-06-15 14:00:00| 37|             23.11|  1|
|2021-06-01 11:00:00| 42|              83.4|  6|
|2021-06-11 16:00:00| 95|            185.06| 12|
|2021-06-21 18:00:00|181|             14.44|  1|
|2021-06-22 15:00:00|134|             59.62|  4|
|2021-06-29 13:00:00|145|             27.85|  1|
|2021-06-23 18:00:00|193|             22.47|  2|
|2021-06-22 06:00:00| 11|              10.3|  1|
|2021-06-10 21:00:00| 42|              91.0|  5|
|2021-06-02 14:00:00|119|             70.89|  4|
|2021-06-11 08:00:00

In [33]:
from collections import namedtuple

In [34]:
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [35]:
def unwrap(row):
    return RevenueRow(
                    hour = row[0][0],
                    zone = row[0][1],
                    revenue = row[1][0],
                    count = row[1][1]
            )

In [36]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF() \
    .show()

                                                                                

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2021-06-29 12:00:00|  43|             63.55|    6|
|2021-06-19 15:00:00|  41|            280.36|   16|
|2021-06-23 18:00:00|  41|228.83000000000004|   13|
|2021-06-11 14:00:00| 167|              37.1|    2|
|2021-06-14 21:00:00|  93|140.39999999999998|    3|
|2021-06-09 13:00:00| 159|             81.45|    4|
|2021-06-28 09:00:00| 197|              54.8|    1|
|2021-06-15 14:00:00|  37|             23.11|    1|
|2021-06-01 11:00:00|  42|              83.4|    6|
|2021-06-11 16:00:00|  95|            185.06|   12|
|2021-06-21 18:00:00| 181|             14.44|    1|
|2021-06-22 15:00:00| 134|             59.62|    4|
|2021-06-29 13:00:00| 145|             27.85|    1|
|2021-06-23 18:00:00| 193|             22.47|    2|
|2021-06-22 06:00:00|  11|              10.3|    1|
|2021-06-10 21:00:00|  42|              91.0|    5|
|2021-06-02 

In [39]:
df_result = \
    rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF() 

                                                                                

In [40]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', LongType(), True), StructField('revenue', DoubleType(), True), StructField('count', LongType(), True)])

In [43]:
result_schema = \
            types.StructType([
                types.StructField('hour', types.TimestampType(), True), 
                types.StructField('zone', types.IntegerType(), True), 
                types.StructField('revenue', types.DoubleType(), True), 
                types.StructField('count', types.IntegerType(), True)
                ])

In [44]:
df_result = \
    rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF(result_schema) 

In [45]:
df_result.show()



+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2021-06-29 12:00:00|  43|             63.55|    6|
|2021-06-19 15:00:00|  41|            280.36|   16|
|2021-06-23 18:00:00|  41|228.83000000000004|   13|
|2021-06-11 14:00:00| 167|              37.1|    2|
|2021-06-14 21:00:00|  93|140.39999999999998|    3|
|2021-06-09 13:00:00| 159|             81.45|    4|
|2021-06-28 09:00:00| 197|              54.8|    1|
|2021-06-15 14:00:00|  37|             23.11|    1|
|2021-06-01 11:00:00|  42|              83.4|    6|
|2021-06-11 16:00:00|  95|            185.06|   12|
|2021-06-21 18:00:00| 181|             14.44|    1|
|2021-06-22 15:00:00| 134|             59.62|    4|
|2021-06-29 13:00:00| 145|             27.85|    1|
|2021-06-23 18:00:00| 193|             22.47|    2|
|2021-06-22 06:00:00|  11|              10.3|    1|
|2021-06-10 21:00:00|  42|              91.0|    5|
|2021-06-02 

                                                                                

In [46]:
df_result.write.parquet('tmp/green-revenue')

                                                                                

In [None]:
""" 
select
    date_trunc('hour', lpep_pickup_datetime)  as hour,
    PULocationID as zones,
    sum(total_amount) as revenue_monthly_total_amount,
    count(*) as number_records
from green_trips
where lpep_pickup_datetime >= '2020-01-01 00:00:00'
and lpep_pickup_datetime is not null
group by 1, 2

;
"""

In [None]:
## map partition

In [49]:
df_green.show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-23 13:10:15|  2020-01-23 13:38:16|                 N|         1|          74|         130|              1|        12.77|       36.0|  0.0|    0.

In [51]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'PULocationID', 'trip_distance']

df_green \
    .select(columns) \
    .show()

+--------+--------------------+------------+------------+-------------+
|VendorID|lpep_pickup_datetime|PULocationID|PULocationID|trip_distance|
+--------+--------------------+------------+------------+-------------+
|       2| 2020-01-23 13:10:15|          74|          74|        12.77|
|    null| 2020-01-20 15:09:00|          67|          67|          8.0|
|       2| 2020-01-15 20:23:41|         260|         260|         1.27|
|       2| 2020-01-05 16:32:26|          82|          82|         1.25|
|       2| 2020-01-29 19:22:42|         166|         166|         1.84|
|       2| 2020-01-15 11:07:42|         179|         179|         0.76|
|       2| 2020-01-16 08:22:29|          41|          41|         3.32|
|       2| 2020-01-28 17:05:28|          75|          75|         2.21|
|       1| 2020-01-22 14:51:37|         152|         152|          0.9|
|       2| 2020-01-31 10:25:04|          75|          75|          6.1|
|       2| 2020-01-20 15:50:54|          75|          75|       

In [52]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'PULocationID', 'trip_distance']

duration_rdd = \
    df_green \
    .select(columns) \
    .rdd

In [53]:
duration_rdd.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, PULocationID=74, trip_distance=12.77),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), PULocationID=67, PULocationID=67, trip_distance=8.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41), PULocationID=260, PULocationID=260, trip_distance=1.27),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 5, 16, 32, 26), PULocationID=82, PULocationID=82, trip_distance=1.25),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 22, 42), PULocationID=166, PULocationID=166, trip_distance=1.84)]

In [54]:
def apply_model_in_batch(partition):
    return [1]

In [55]:
rdd.mapPartitions(apply_model_in_batch).collect()

[1, 1, 1, 1]

In [56]:
def apply_model_in_batch(partition):
    cnt =0
    for row in partition:
        cnt = cnt + 1
    return [cnt]

In [57]:
rdd.mapPartitions(apply_model_in_batch).collect()

                                                                                

[1141148, 438057, 432402, 292910]

In [58]:
rows = duration_rdd.take(10)

In [60]:
pd.DataFrame(rows, columns=columns)

Unnamed: 0,VendorID,lpep_pickup_datetime,PULocationID,PULocationID.1,trip_distance
0,2.0,2020-01-23 13:10:15,74,74,12.77
1,,2020-01-20 15:09:00,67,67,8.0
2,2.0,2020-01-15 20:23:41,260,260,1.27
3,2.0,2020-01-05 16:32:26,82,82,1.25
4,2.0,2020-01-29 19:22:42,166,166,1.84
5,2.0,2020-01-15 11:07:42,179,179,0.76
6,2.0,2020-01-16 08:22:29,41,41,3.32
7,2.0,2020-01-28 17:05:28,75,75,2.21
8,1.0,2020-01-22 14:51:37,152,152,0.9
9,2.0,2020-01-31 10:25:04,75,75,6.1


In [63]:
def apply_model_in_batch(rows):
    df = pd.DataFrame(rows, columns=columns)
    cnt = len(df)
    return [cnt]

In [64]:
duration_rdd.mapPartitions(apply_model_in_batch).collect()

                                                                                

[1141148, 438057, 432402, 292910]

In [65]:
# mode = ... 

def model_predict(df):
    # y_pred = model.predit(df)
    y_pred = df.trip_distance * 5
    return y_pred

In [68]:
## segway 
df = pd.DataFrame(rows, columns=columns)

In [69]:
list(df.itertuples())

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-23 13:10:15'), PULocationID=74, _4=74, trip_distance=12.77),
 Pandas(Index=1, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-20 15:09:00'), PULocationID=67, _4=67, trip_distance=8.0),
 Pandas(Index=2, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-15 20:23:41'), PULocationID=260, _4=260, trip_distance=1.27),
 Pandas(Index=3, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-05 16:32:26'), PULocationID=82, _4=82, trip_distance=1.25),
 Pandas(Index=4, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-29 19:22:42'), PULocationID=166, _4=166, trip_distance=1.84),
 Pandas(Index=5, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-15 11:07:42'), PULocationID=179, _4=179, trip_distance=0.76),
 Pandas(Index=6, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-16 08:22:29'), PULocationID=41, _4=41, trip_distance=3.32),
 Pandas(Index=7, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-28 17:

In [74]:
def apply_model_in_batch(rows):
    df = pd.DataFrame(rows, columns=columns)
    df['lpep_pickup_datetime'] = df['lpep_pickup_datetime'].astype(str)
    predictions = model_predict(df)
    df['predicted_duration'] = predictions

    for row in df.itertuples():
        yield row

In [75]:
duration_rdd.mapPartitions(apply_model_in_batch).take(10)

                                                                                

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime='2020-01-23 13:10:15', PULocationID=74, _4=74, trip_distance=12.77, predicted_duration=63.849999999999994),
 Pandas(Index=1, VendorID=nan, lpep_pickup_datetime='2020-01-20 15:09:00', PULocationID=67, _4=67, trip_distance=8.0, predicted_duration=40.0),
 Pandas(Index=2, VendorID=2.0, lpep_pickup_datetime='2020-01-15 20:23:41', PULocationID=260, _4=260, trip_distance=1.27, predicted_duration=6.35),
 Pandas(Index=3, VendorID=2.0, lpep_pickup_datetime='2020-01-05 16:32:26', PULocationID=82, _4=82, trip_distance=1.25, predicted_duration=6.25),
 Pandas(Index=4, VendorID=2.0, lpep_pickup_datetime='2020-01-29 19:22:42', PULocationID=166, _4=166, trip_distance=1.84, predicted_duration=9.200000000000001),
 Pandas(Index=5, VendorID=2.0, lpep_pickup_datetime='2020-01-15 11:07:42', PULocationID=179, _4=179, trip_distance=0.76, predicted_duration=3.8),
 Pandas(Index=6, VendorID=2.0, lpep_pickup_datetime='2020-01-16 08:22:29', PULocationID=41, _4=41,

In [76]:
duration_rdd.mapPartitions(apply_model_in_batch).toDF().show()

[Stage 55:>                                                         (0 + 1) / 1]

+-----+--------+--------------------+------------+---+-------------+------------------+
|Index|VendorID|lpep_pickup_datetime|PULocationID| _4|trip_distance|predicted_duration|
+-----+--------+--------------------+------------+---+-------------+------------------+
|    0|     2.0| 2020-01-23 13:10:15|          74| 74|        12.77|63.849999999999994|
|    1|     NaN| 2020-01-20 15:09:00|          67| 67|          8.0|              40.0|
|    2|     2.0| 2020-01-15 20:23:41|         260|260|         1.27|              6.35|
|    3|     2.0| 2020-01-05 16:32:26|          82| 82|         1.25|              6.25|
|    4|     2.0| 2020-01-29 19:22:42|         166|166|         1.84| 9.200000000000001|
|    5|     2.0| 2020-01-15 11:07:42|         179|179|         0.76|               3.8|
|    6|     2.0| 2020-01-16 08:22:29|          41| 41|         3.32|16.599999999999998|
|    7|     2.0| 2020-01-28 17:05:28|          75| 75|         2.21|             11.05|
|    8|     1.0| 2020-01-22 14:5

                                                                                

In [77]:
df_predicts = duration_rdd \
                    .mapPartitions(apply_model_in_batch) \
                    .toDF() \
                    .drop('Index')

                                                                                

In [78]:
df_predicts.show()

[Stage 57:>                                                         (0 + 1) / 1]

+--------+--------------------+------------+---+-------------+------------------+
|VendorID|lpep_pickup_datetime|PULocationID| _4|trip_distance|predicted_duration|
+--------+--------------------+------------+---+-------------+------------------+
|     2.0| 2020-01-23 13:10:15|          74| 74|        12.77|63.849999999999994|
|     NaN| 2020-01-20 15:09:00|          67| 67|          8.0|              40.0|
|     2.0| 2020-01-15 20:23:41|         260|260|         1.27|              6.35|
|     2.0| 2020-01-05 16:32:26|          82| 82|         1.25|              6.25|
|     2.0| 2020-01-29 19:22:42|         166|166|         1.84| 9.200000000000001|
|     2.0| 2020-01-15 11:07:42|         179|179|         0.76|               3.8|
|     2.0| 2020-01-16 08:22:29|          41| 41|         3.32|16.599999999999998|
|     2.0| 2020-01-28 17:05:28|          75| 75|         2.21|             11.05|
|     1.0| 2020-01-22 14:51:37|         152|152|          0.9|               4.5|
|     2.0| 2020-

                                                                                

In [79]:
df_predicts.select('predicted_duration').show()

[Stage 58:>                                                         (0 + 1) / 1]

+------------------+
|predicted_duration|
+------------------+
|63.849999999999994|
|              40.0|
|              6.35|
|              6.25|
| 9.200000000000001|
|               3.8|
|16.599999999999998|
|             11.05|
|               4.5|
|              30.5|
|               8.7|
|5.8999999999999995|
|              11.0|
|              15.2|
|              4.25|
|25.299999999999997|
|7.8500000000000005|
|              34.0|
| 5.300000000000001|
|              6.15|
+------------------+
only showing top 20 rows



----------------------------------------                                        
Exception occurred during processing of request from ('127.0.0.1', 34866)
Traceback (most recent call last):
  File "/home/tobi/anaconda3/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/tobi/anaconda3/lib/python3.12/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
  File "/home/tobi/anaconda3/lib/python3.12/socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/tobi/anaconda3/lib/python3.12/socketserver.py", line 761, in __init__
    self.handle()
  File "/home/tobi/spark/spark-3.4.3-bin-hadoop3/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/home/tobi/spark/spark-3.4.3-bin-hadoop3/python/pyspark/accumulators.py", line 253, in poll
    if func():
       ^^^^^^
  File "/home/