## RDD - Resilient Distributed Datasets

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [3]:
df_green = spark.read.parquet('data/pq/green/*/*')

```
SELECT 
    -- Reveneue grouping 
    date_trunc('hour', lpep_pickup_datetime) AS hour,  
    PULocationID AS  zone,

    -- Revenue calculation  
    SUM(total_amount) AS revenue_monthly_total_amount,
    COUNT(1) AS number_records
 
FROM
    green
WHERE
    lpep_pickup_datetime >='2020-01-01 00:00:00'
GROUP BY
    1, 2 
ORDER BY
    1,2
```

In [4]:
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-14 13:28:09|  2020-01-14 13:35:19|                 N|         1|          74|          75|              1|         1.35|        7.0|  0.0|    0.

In [5]:
df_green.rdd

MapPartitionsRDD[11] at javaToPython at NativeMethodAccessorImpl.java:0

In [6]:
df_green.rdd.take(10)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 14, 13, 28, 9), lpep_dropoff_datetime=datetime.datetime(2020, 1, 14, 13, 35, 19), store_and_fwd_flag='N', RatecodeID=1, PULocationID=74, DOLocationID=75, passenger_count=1, trip_distance=1.35, fare_amount=7.0, extra=0.0, mta_tax=0.5, tip_amount=1.56, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=9.36, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 28, 12, 11, 44), lpep_dropoff_datetime=datetime.datetime(2020, 1, 28, 12, 25, 21), store_and_fwd_flag='N', RatecodeID=1, PULocationID=129, DOLocationID=179, passenger_count=1, trip_distance=3.73, fare_amount=13.5, extra=0.0, mta_tax=0.5, tip_amount=1.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=15.3, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 4, 11, 38), lpep_dropoff_

### SQL To Implement
```
SELECT 
    -- Reveneue grouping 
    date_trunc('hour', lpep_pickup_datetime) AS hour,  
    PULocationID AS  zone,

    -- Revenue calculation  
    SUM(total_amount) AS revenue_monthly_total_amount,
    COUNT(1) AS number_records
 
FROM
    green
WHERE
    lpep_pickup_datetime >='2020-01-01 00:00:00'
GROUP BY
    1, 2 
ORDER BY
    1,2
```

#### WHERE vs .filter()

In [16]:
rdd = df_green\
        .select( "lpep_pickup_datetime", 'PULocationID', 'total_amount')\
        .rdd 

In [7]:
df_green.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 14, 13, 28, 9), lpep_dropoff_datetime=datetime.datetime(2020, 1, 14, 13, 35, 19), store_and_fwd_flag='N', RatecodeID=1, PULocationID=74, DOLocationID=75, passenger_count=1, trip_distance=1.35, fare_amount=7.0, extra=0.0, mta_tax=0.5, tip_amount=1.56, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=9.36, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 28, 12, 11, 44), lpep_dropoff_datetime=datetime.datetime(2020, 1, 28, 12, 25, 21), store_and_fwd_flag='N', RatecodeID=1, PULocationID=129, DOLocationID=179, passenger_count=1, trip_distance=3.73, fare_amount=13.5, extra=0.0, mta_tax=0.5, tip_amount=1.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=15.3, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 4, 11, 38), lpep_dropoff_

In [8]:
from datetime import datetime

In [9]:
start = datetime(year=2020, month=1, day=1)

def filter_outlier(row):
    return row.lpep_pickup_datetime >= start

In [26]:
# Change to smaller dataset to save computation resource
rdd_first_20 = rdd.take(20)
rdd_20 = spark.sparkContext.parallelize(rdd_first_20) 

In [17]:
rdd_20\
    .filter(filter_outlier)\
    .take(1)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 14, 13, 28, 9), PULocationID=74, total_amount=9.36)]

#### GROUP BY vs .map()

In [18]:
rows = rdd.take(10)
row = rows[0]
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 14, 13, 28, 9), PULocationID=74, total_amount=9.36)

In [19]:
row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)

datetime.datetime(2020, 1, 14, 13, 0)

In [20]:
def prepare_for_grouping(row):
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)

    amount = row.total_amount
    count = 1
    value = (amount, count)
    
    return (key, value)

In [21]:
rdd_20\
    .filter(filter_outlier)\
    .map(prepare_for_grouping)\
    .take(10)

[((datetime.datetime(2020, 1, 14, 13, 0), 74), (9.36, 1)),
 ((datetime.datetime(2020, 1, 28, 12, 0), 129), (15.3, 1)),
 ((datetime.datetime(2020, 1, 4, 11, 0), 25), (28.88, 1)),
 ((datetime.datetime(2020, 1, 30, 10, 0), 182), (12.3, 1)),
 ((datetime.datetime(2020, 1, 30, 20, 0), 210), (7.0, 1)),
 ((datetime.datetime(2020, 1, 6, 14, 0), 33), (9.3, 1)),
 ((datetime.datetime(2020, 1, 9, 18, 0), 42), (6.3, 1)),
 ((datetime.datetime(2020, 1, 29, 4, 0), 116), (5.81, 1)),
 ((datetime.datetime(2020, 1, 9, 15, 0), 223), (21.11, 1)),
 ((datetime.datetime(2020, 1, 31, 8, 0), 55), (38.3, 1))]

In [23]:
# reduce
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value

    output_amount = left_amount + right_amount
    output_count = left_count + right_count 

    return (output_amount, output_count)

In [31]:
# Reduce by key
rdd_20\
    .filter(filter_outlier)\
    .map(prepare_for_grouping)\
    .reduceByKey(calculate_revenue)\
    .take(10)

[((datetime.datetime(2020, 1, 30, 16, 0), 33), (6.3, 1)),
 ((datetime.datetime(2020, 1, 14, 18, 0), 75), (8.55, 1)),
 ((datetime.datetime(2020, 1, 3, 19, 0), 74), (15.3, 1)),
 ((datetime.datetime(2020, 1, 5, 0, 0), 42), (9.3, 1)),
 ((datetime.datetime(2020, 1, 28, 12, 0), 129), (15.3, 1)),
 ((datetime.datetime(2020, 1, 4, 11, 0), 25), (28.88, 1)),
 ((datetime.datetime(2020, 1, 3, 15, 0), 41), (6.8, 1)),
 ((datetime.datetime(2020, 1, 17, 22, 0), 83), (10.3, 1)),
 ((datetime.datetime(2020, 1, 9, 15, 0), 223), (21.11, 1)),
 ((datetime.datetime(2020, 1, 30, 20, 0), 210), (7.0, 1))]

In [37]:
# change back to dataframe
def unwrap(row):
    return(row[0][0],row[0][1],row[1][0],row[1][1])

In [32]:
# Reduce by key
rdd_20\
    .filter(filter_outlier)\
    .map(prepare_for_grouping)\
    .reduceByKey(calculate_revenue)\
    .map(unwrap)\
    .toDF()\
    .show()

+--------------------+---+---+-----+---+
|                  _1| _2| _3|   _4| _5|
+--------------------+---+---+-----+---+
|{2020-01-30 16:00...|[0]| 33|  6.3|  1|
|{2020-01-14 18:00...|[0]| 75| 8.55|  1|
|{2020-01-03 19:00...|[0]| 74| 15.3|  1|
|{2020-01-05 00:00...|[0]| 42|  9.3|  1|
|{2020-01-28 12:00...|[0]|129| 15.3|  1|
|{2020-01-04 11:00...|[0]| 25|28.88|  1|
|{2020-01-03 15:00...|[0]| 41|  6.8|  1|
|{2020-01-17 22:00...|[0]| 83| 10.3|  1|
|{2020-01-09 15:00...|[0]|223|21.11|  1|
|{2020-01-30 20:00...|[0]|210|  7.0|  1|
|{2020-01-14 13:00...|[0]| 74| 9.36|  1|
|{2020-01-14 08:00...|[0]|166| 12.0|  1|
|{2020-01-06 14:00...|[0]| 33|  9.3|  1|
|{2020-01-29 04:00...|[0]|116| 5.81|  1|
|{2020-01-15 19:00...|[0]|  7|  7.8|  1|
|{2020-01-09 18:00...|[0]| 42|  6.3|  1|
|{2020-01-31 08:00...|[0]| 55| 38.3|  1|
|{2020-01-30 10:00...|[0]|182| 12.3|  1|
|{2020-01-15 16:00...|[0]| 38|  4.8|  1|
|{2020-01-01 16:00...|[0]|134|14.76|  1|
+--------------------+---+---+-----+---+



In [33]:
# rename columns
from collections import namedtuple

In [34]:
RevenueRow = namedtuple('RevenueRow',['hour','zone','revenue','count'])

In [38]:
# change back to dataframe
def unwrap(row):
    return RevenueRow(
        hour=row[0][0],
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1])

In [39]:
# Reduce by key
rdd_20\
    .filter(filter_outlier)\
    .map(prepare_for_grouping)\
    .reduceByKey(calculate_revenue)\
    .map(unwrap)\
    .toDF()\
    .show()

+-------------------+----+-------+-----+
|               hour|zone|revenue|count|
+-------------------+----+-------+-----+
|2020-01-30 16:00:00|  33|    6.3|    1|
|2020-01-14 18:00:00|  75|   8.55|    1|
|2020-01-03 19:00:00|  74|   15.3|    1|
|2020-01-05 00:00:00|  42|    9.3|    1|
|2020-01-28 12:00:00| 129|   15.3|    1|
|2020-01-04 11:00:00|  25|  28.88|    1|
|2020-01-03 15:00:00|  41|    6.8|    1|
|2020-01-17 22:00:00|  83|   10.3|    1|
|2020-01-09 15:00:00| 223|  21.11|    1|
|2020-01-30 20:00:00| 210|    7.0|    1|
|2020-01-14 13:00:00|  74|   9.36|    1|
|2020-01-14 08:00:00| 166|   12.0|    1|
|2020-01-06 14:00:00|  33|    9.3|    1|
|2020-01-29 04:00:00| 116|   5.81|    1|
|2020-01-15 19:00:00|   7|    7.8|    1|
|2020-01-09 18:00:00|  42|    6.3|    1|
|2020-01-31 08:00:00|  55|   38.3|    1|
|2020-01-30 10:00:00| 182|   12.3|    1|
|2020-01-15 16:00:00|  38|    4.8|    1|
|2020-01-01 16:00:00| 134|  14.76|    1|
+-------------------+----+-------+-----+



In [40]:
# save to variable
df_result = rdd_20\
    .filter(filter_outlier)\
    .map(prepare_for_grouping)\
    .reduceByKey(calculate_revenue)\
    .map(unwrap)\
    .toDF() 

In [41]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', LongType(), True), StructField('revenue', DoubleType(), True), StructField('count', LongType(), True)])

In [42]:
from pyspark.sql import types

In [43]:
result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True), 
    types.StructField('zone', types.IntegerType(), True), 
    types.StructField('revenue', types.DoubleType(), True), 
    types.StructField('count', types.IntegerType(), True)])

In [44]:
# save to variable
df_result = rdd_20\
    .filter(filter_outlier)\
    .map(prepare_for_grouping)\
    .reduceByKey(calculate_revenue)\
    .map(unwrap)\
    .toDF(result_schema) 

In [45]:
df_result.show()

+-------------------+----+-------+-----+
|               hour|zone|revenue|count|
+-------------------+----+-------+-----+
|2020-01-30 16:00:00|  33|    6.3|    1|
|2020-01-14 18:00:00|  75|   8.55|    1|
|2020-01-03 19:00:00|  74|   15.3|    1|
|2020-01-05 00:00:00|  42|    9.3|    1|
|2020-01-28 12:00:00| 129|   15.3|    1|
|2020-01-04 11:00:00|  25|  28.88|    1|
|2020-01-03 15:00:00|  41|    6.8|    1|
|2020-01-17 22:00:00|  83|   10.3|    1|
|2020-01-09 15:00:00| 223|  21.11|    1|
|2020-01-30 20:00:00| 210|    7.0|    1|
|2020-01-14 13:00:00|  74|   9.36|    1|
|2020-01-14 08:00:00| 166|   12.0|    1|
|2020-01-06 14:00:00|  33|    9.3|    1|
|2020-01-29 04:00:00| 116|   5.81|    1|
|2020-01-15 19:00:00|   7|    7.8|    1|
|2020-01-09 18:00:00|  42|    6.3|    1|
|2020-01-31 08:00:00|  55|   38.3|    1|
|2020-01-30 10:00:00| 182|   12.3|    1|
|2020-01-15 16:00:00|  38|    4.8|    1|
|2020-01-01 16:00:00| 134|  14.76|    1|
+-------------------+----+-------+-----+



In [47]:
df_result.write.parquet('tmp/green_revenue')

## Map Partition

In [7]:
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-14 13:28:09|  2020-01-14 13:35:19|                 N|         1|          74|          75|              1|         1.35|        7.0|  0.0|    0.

In [12]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID','trip_distance']

duration_rdd = df_green\
    .select(columns)\
    .rdd

In [19]:
# Change to smaller dataset to save computation resource
rdd_first_20 = rdd.take(20)
rdd_20 = spark.sparkContext.parallelize(rdd_first_20) 

In [18]:
def apply_model_batch(partition):
    cnt = 0

    for row in partition:
        cnt += 1
    
    return [cnt]

In [21]:
rdd_20.mapPartitions(apply_model_batch).collect()

[1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 2, 1, 1, 1, 2]

In [22]:
# turn partition to dataframe
import pandas as pd

In [23]:
rows = duration_rdd.take(10)

In [27]:
pd.DataFrame(rows, columns = columns)

Unnamed: 0,VendorID,lpep_pickup_datetime,PULocationID,DOLocationID,trip_distance
0,2.0,2020-01-14 13:28:09,74,75,1.35
1,2.0,2020-01-28 12:11:44,129,179,3.73
2,,2020-01-04 11:38:00,25,41,11.19
3,2.0,2020-01-30 10:03:36,182,242,1.32
4,1.0,2020-01-30 20:56:17,210,210,0.8
5,2.0,2020-01-06 14:40:15,33,66,1.24
6,2.0,2020-01-09 18:01:49,42,116,0.52
7,2.0,2020-01-29 04:39:21,116,116,0.58
8,,2020-01-09 15:01:00,223,92,6.79
9,2.0,2020-01-31 08:32:25,55,55,9.69


In [28]:
def apply_model_batch(rows):
    df = pd.DataFrame(rows, columns = columns)
    cnt = len(df)
    
    return [cnt]

In [31]:
# Change to smaller dataset to save computation resource
duration_rdd_first_30 = duration_rdd.take(30)
duration_rdd_30 = spark.sparkContext.parallelize(duration_rdd_first_30) 

In [33]:
duration_rdd_30.mapPartitions(apply_model_batch).collect()
# can use isclice iterator to get the chunks in same size

[1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2]

In [35]:
df = pd.DataFrame(rows,columns=columns)

In [36]:
list(df.itertuples())

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-14 13:28:09'), PULocationID=74, DOLocationID=75, trip_distance=1.35),
 Pandas(Index=1, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-28 12:11:44'), PULocationID=129, DOLocationID=179, trip_distance=3.73),
 Pandas(Index=2, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-04 11:38:00'), PULocationID=25, DOLocationID=41, trip_distance=11.19),
 Pandas(Index=3, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-30 10:03:36'), PULocationID=182, DOLocationID=242, trip_distance=1.32),
 Pandas(Index=4, VendorID=1.0, lpep_pickup_datetime=Timestamp('2020-01-30 20:56:17'), PULocationID=210, DOLocationID=210, trip_distance=0.8),
 Pandas(Index=5, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-06 14:40:15'), PULocationID=33, DOLocationID=66, trip_distance=1.24),
 Pandas(Index=6, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-09 18:01:49'), PULocationID=42, DOLocationID=116, trip_distance=0.52),
 Panda

In [48]:
def plus_seq():
    i = 0

    while True:
        yield i
        i = i+1

        if i > 15:
            break

In [49]:
seq = plus_seq()

In [50]:
list(seq)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

In [51]:
next(seq)

StopIteration: 

In [34]:
# model = ...

def model_predict(df):
    # y_pred = model_predict(df)
    y_pred = df.trip_distance * 5
    return y_pred

In [52]:
def apply_model_batch(rows):
    df = pd.DataFrame(rows, columns = columns)
    cnt = len(df)
    predictions = model_predict(df)
    df['predicted_duration'] = predictions 
    
    for row in df.itertuples():
        yield row 

In [54]:
duration_rdd_30.mapPartitions(apply_model_batch).toDF().show()

+-----+--------+--------------------+------------+------------+-------------+------------------+
|Index|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|predicted_duration|
+-----+--------+--------------------+------------+------------+-------------+------------------+
|    0|       2|                  {}|          74|          75|         1.35|              6.75|
|    0|    NULL|                  {}|         129|         179|         3.73|             18.65|
|    1|    NULL|                  {}|          25|          41|        11.19|55.949999999999996|
|    0|       2|                  {}|         182|         242|         1.32|6.6000000000000005|
|    1|       1|                  {}|         210|         210|          0.8|               4.0|
|    0|       2|                  {}|          33|          66|         1.24|               6.2|
|    1|       2|                  {}|          42|         116|         0.52|               2.6|
|    0|    NULL|              

In [59]:
# Drop index and convert Pandas Timestamp to string, spark does not reccognize Pandas Timestamp
def apply_model_batch(rows):
    df = pd.DataFrame(rows, columns = columns)
    df['lpep_pickup_datetime'] = df['lpep_pickup_datetime'].astype(str) # string conversion
    predictions = model_predict(df)
    df['predicted_duraction'] = predictions
    for row in df.itertuples():
        yield row

In [60]:
df_predict = duration_rdd_30\
                .mapPartitions(apply_model_batch)\
                .toDF()\
                .drop('Index')

In [61]:
df_predict.show()

+--------+--------------------+------------+------------+-------------+-------------------+
|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|predicted_duraction|
+--------+--------------------+------------+------------+-------------+-------------------+
|       2| 2020-01-14 13:28:09|          74|          75|         1.35|               6.75|
|    NULL| 2020-01-28 12:11:44|         129|         179|         3.73|              18.65|
|    NULL| 2020-01-04 11:38:00|          25|          41|        11.19| 55.949999999999996|
|       2| 2020-01-30 10:03:36|         182|         242|         1.32| 6.6000000000000005|
|       1| 2020-01-30 20:56:17|         210|         210|          0.8|                4.0|
|       2| 2020-01-06 14:40:15|          33|          66|         1.24|                6.2|
|       2| 2020-01-09 18:01:49|          42|         116|         0.52|                2.6|
|    NULL| 2020-01-29 04:39:21|         116|         116|         0.58|         