In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


In [3]:
df_green = spark.read.parquet('data/pq/green/*/*')

In [4]:
df_green.registerTempTable('green')



In [5]:
df_green_revenue = spark.sql("""
SELECT
    -- Reveneue grouping 
    date_trunc("hour", "pickup_datetime") AS revenue_month, 
    PULocationID AS zone,
                      
    SUM(total_amount) AS revenue_monthly_total_amount,
    COUNT(1) as number_records
FROM green

WHERE lpep_pickup_datetime >='2020-01-01 00:00:00'
GROUP BY 1,2
ORDER BY 1,2
""")

In [6]:
df_green.rdd.take(2)

[Row(VendorID=1, lpep_pickup_datetime=datetime.datetime(2020, 1, 3, 19, 0, 1), lpep_dropoff_datetime=datetime.datetime(2020, 1, 3, 19, 5, 48), store_and_fwd_flag='N', RatecodeID=1, PULocationID=244, DOLocationID=116, passenger_count=1, trip_distance=1.0, fare_amount=6.0, extra=1.0, mta_tax=0.5, tip_amount=1.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=8.8, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 55, 1), lpep_dropoff_datetime=datetime.datetime(2020, 1, 29, 19, 59, 12), store_and_fwd_flag='N', RatecodeID=1, PULocationID=166, DOLocationID=24, passenger_count=1, trip_distance=0.82, fare_amount=5.0, extra=1.0, mta_tax=0.5, tip_amount=1.36, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=8.16, payment_type=1, trip_type=1, congestion_surcharge=0.0)]

In [8]:
rdd = df_green \
    .select("lpep_pickup_datetime","PULocationID","total_amount") \
    .rdd

In [9]:
rdd.take(2)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 3, 19, 0, 1), PULocationID=244, total_amount=8.8),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 55, 1), PULocationID=166, total_amount=8.16)]

In [11]:
from datetime import datetime

In [18]:
start = datetime(year=2020, month=1, day=1)
def filter_outliers(row):
    return row.lpep_pickup_datetime < start

In [27]:
def prepare_for_grouping(row):
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)

    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)


In [31]:
def caculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value

    ouput_amount = left_amount + right_amount
    output_count = left_count + right_count

    return (ouput_amount, output_count)

In [34]:
from collections import namedtuple
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [35]:
def unwrap(row):
    return RevenueRow(hour = row[0][0], 
                      zone = row[0][1], 
                      revenue = row[1][0],
                      count = row[1][1])

In [36]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(caculate_revenue) \
    .map(unwrap) \
    .take(10)

[RevenueRow(hour=datetime.datetime(2009, 1, 1, 0, 0), zone=75, revenue=98.39999999999999, count=2),
 RevenueRow(hour=datetime.datetime(2010, 9, 23, 0, 0), zone=95, revenue=38.56, count=3),
 RevenueRow(hour=datetime.datetime(2019, 12, 18, 15, 0), zone=264, revenue=4.81, count=1),
 RevenueRow(hour=datetime.datetime(2009, 1, 1, 0, 0), zone=193, revenue=12.350000000000001, count=9),
 RevenueRow(hour=datetime.datetime(2010, 9, 23, 0, 0), zone=257, revenue=45.3, count=1),
 RevenueRow(hour=datetime.datetime(2009, 1, 1, 0, 0), zone=264, revenue=0.0, count=1),
 RevenueRow(hour=datetime.datetime(2009, 1, 1, 0, 0), zone=42, revenue=8.3, count=1),
 RevenueRow(hour=datetime.datetime(2009, 1, 1, 1, 0), zone=264, revenue=0.0, count=1),
 RevenueRow(hour=datetime.datetime(2009, 1, 1, 1, 0), zone=75, revenue=6.3, count=1),
 RevenueRow(hour=datetime.datetime(2009, 1, 1, 7, 0), zone=130, revenue=10.3, count=1)]

In [42]:
from pyspark.sql import types

In [43]:
schema = types.StructType([
            types.StructField('hour', types.TimestampType(), True), 
            types.StructField('zone', types.LongType(), True), 
            types.StructField('revenue', types.DoubleType(), True), 
            types.StructField('count', types.LongType(), True)
            ])

In [44]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(caculate_revenue) \
    .map(unwrap) \
    .toDF(schema)

In [46]:
df_result.show()

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2009-01-01 00:00:00|  75| 98.39999999999999|    2|
|2010-09-23 00:00:00|  95|             38.56|    3|
|2019-12-18 15:00:00| 264|              4.81|    1|
|2009-01-01 00:00:00| 193|12.350000000000001|    9|
|2010-09-23 00:00:00| 257|              45.3|    1|
|2009-01-01 00:00:00| 264|               0.0|    1|
|2009-01-01 00:00:00|  42|               8.3|    1|
|2009-01-01 01:00:00| 264|               0.0|    1|
|2009-01-01 01:00:00|  75|               6.3|    1|
|2009-01-01 07:00:00| 130|              10.3|    1|
|2019-12-31 23:00:00| 116|              6.81|    1|
|2009-01-01 00:00:00|  41|21.650000000000002|    3|
|2019-12-31 23:00:00| 130|             29.96|    3|
|2019-12-31 23:00:00| 179|               7.8|    1|
|2009-01-01 01:00:00| 193|               0.0|    2|
|2009-01-01 00:00:00| 126|               6.3|    1|
|2008-12-31 

In [47]:
df_result.write.parquet('data/report/rdd_report_green')

# MAP PARTITION

In [7]:
columns = ["VendorID", "lpep_pickup_datetime", "lpep_dropoff_datetime", "PULocationID", "DOLocationID", "trip_distance"]
duration_rdd = df_green \
                .select(columns) \
                .rdd

In [16]:
import pandas as pd

In [17]:
rows = duration_rdd.take(10)

In [18]:
pd.DataFrame(rows, columns=columns)

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,trip_distance
0,1,2020-01-03 19:00:01,2020-01-03 19:05:48,244,116,1.0
1,2,2020-01-29 19:55:01,2020-01-29 19:59:12,166,24,0.82
2,1,2020-01-02 10:20:42,2020-01-02 10:21:07,145,145,0.0
3,2,2020-01-07 08:30:00,2020-01-07 08:44:00,191,131,3.06
4,2,2020-01-09 17:40:00,2020-01-09 18:28:00,112,72,6.62
5,2,2020-01-07 17:48:13,2020-01-07 18:00:44,41,151,1.56
6,2,2020-01-11 10:09:00,2020-01-11 10:18:00,77,76,1.32
7,2,2020-01-24 21:25:12,2020-01-24 21:27:41,42,42,0.74
8,2,2020-01-04 17:06:40,2020-01-04 17:23:26,82,223,4.34
9,2,2020-01-26 15:07:00,2020-01-26 15:23:00,41,127,4.17


In [22]:
model = "..."

def model_predict(df):
    # y_pred = model.predict(df)
    y_pred = df.trip_distance*5
    return y_pred

In [29]:
def apply_model_in_batch(partition):
    # cnt = 0
    # for row in partition:
    #     cnt +=1

    # cnt = len(df)

    # return [cnt]
    df = pd.DataFrame(partition, columns=columns)

    predictions = model_predict(df)
    df['predicted_duration'] = predictions

    for row in df.itertuples():
        yield row

 

In [33]:
duration_rdd \
            .mapPartitions(apply_model_in_batch) \
            .take(5)

[Pandas(Index=0, VendorID=1, lpep_pickup_datetime=Timestamp('2020-01-03 19:00:01'), lpep_dropoff_datetime=Timestamp('2020-01-03 19:05:48'), PULocationID=244, DOLocationID=116, trip_distance=1.0, predicted_duration=5.0),
 Pandas(Index=1, VendorID=2, lpep_pickup_datetime=Timestamp('2020-01-29 19:55:01'), lpep_dropoff_datetime=Timestamp('2020-01-29 19:59:12'), PULocationID=166, DOLocationID=24, trip_distance=0.82, predicted_duration=4.1),
 Pandas(Index=2, VendorID=1, lpep_pickup_datetime=Timestamp('2020-01-02 10:20:42'), lpep_dropoff_datetime=Timestamp('2020-01-02 10:21:07'), PULocationID=145, DOLocationID=145, trip_distance=0.0, predicted_duration=0.0),
 Pandas(Index=3, VendorID=2, lpep_pickup_datetime=Timestamp('2020-01-07 08:30:00'), lpep_dropoff_datetime=Timestamp('2020-01-07 08:44:00'), PULocationID=191, DOLocationID=131, trip_distance=3.06, predicted_duration=15.3),
 Pandas(Index=4, VendorID=2, lpep_pickup_datetime=Timestamp('2020-01-09 17:40:00'), lpep_dropoff_datetime=Timestamp('2

In [31]:
df_predicts = duration_rdd \
                .mapPartitions(apply_model_in_batch) \
                .toDF() \
                .drop("Index")

In [32]:
df_predicts.show()

+--------+--------------------+---------------------+------------+------------+-------------+------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|PULocationID|DOLocationID|trip_distance|predicted_duration|
+--------+--------------------+---------------------+------------+------------+-------------+------------------+
|       1|                  {}|                   {}|         244|         116|          1.0|               5.0|
|       2|                  {}|                   {}|         166|          24|         0.82|               4.1|
|       1|                  {}|                   {}|         145|         145|          0.0|               0.0|
|       2|                  {}|                   {}|         191|         131|         3.06|              15.3|
|       2|                  {}|                   {}|         112|          72|         6.62|              33.1|
|       2|                  {}|                   {}|          41|         151|         1.56| 7.