### This notebook explains the concepts of RDDs in Spark and their internals, with code

## Set up Spark

### Set up the Python environment

In [1]:
# Make Spark use this notebook's virtual-environment interpreter. The kernel already
# runs inside the venv, so sys.executable is that interpreter; inject it into both the
# driver and the worker settings before (re)creating the SparkSession.
import os
import sys

venv_python = sys.executable
print(venv_python)

# 1) Point both the workers and the driver at exactly this Python executable
os.environ.update(PYSPARK_PYTHON=venv_python, PYSPARK_DRIVER_PYTHON=venv_python)

print(os.environ['PYSPARK_PYTHON'])
print(os.environ['PYSPARK_DRIVER_PYTHON'])


c:\Sandeep SSD\Programming SSD\Data Engineering Zoomcamp\data-engineering-zoomcamp\dataenginzoomvenv\Scripts\python.exe
c:\Sandeep SSD\Programming SSD\Data Engineering Zoomcamp\data-engineering-zoomcamp\dataenginzoomvenv\Scripts\python.exe
c:\Sandeep SSD\Programming SSD\Data Engineering Zoomcamp\data-engineering-zoomcamp\dataenginzoomvenv\Scripts\python.exe


### Launch Spark with those Python settings

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Local session using every core. The interpreter is also pinned through the builder
# config. NOTE: if a session already exists, getOrCreate() returns it, so changed
# interpreter settings may only take effect after a kernel restart.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .config("spark.pyspark.python", venv_python)
    .config("spark.pyspark.driver.python", venv_python)
    .getOrCreate()
)

In [3]:
# Print the Spark UI address (host and port) so the jobs/stages can be inspected in a browser
print(spark.sparkContext.uiWebUrl)

http://192.168.0.181:4040


In [4]:
# The parquet data is split into year subfolders, each containing month subfolders,
# so the path needs two wildcard levels: spark_parquet/<year>/<month>
df_green = spark.read.parquet('../../Data/data/csv/green/spark_parquet/*/*')

In [5]:
# Every DataFrame is backed by an RDD of Row objects, exposed as `.rdd`
df_green.rdd.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), lpep_dropoff_datetime=datetime.datetime(2020, 1, 12, 18, 19, 52), store_and_fwd_flag='N', RatecodeID=1, PULocationID=41, DOLocationID=41, passenger_count=1, trip_distance=0.78, fare_amount=5.5, extra=0.0, mta_tax=0.5, tip_amount=1.58, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=7.88, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 31, 20, 24, 10), lpep_dropoff_datetime=datetime.datetime(2020, 1, 31, 20, 31, 51), store_and_fwd_flag='N', RatecodeID=1, PULocationID=173, DOLocationID=70, passenger_count=1, trip_distance=0.98, fare_amount=7.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=8.3, payment_type=2, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 7, 8, 16, 53), lpep_dropoff_dat

### We are going to reproduce this SQL query using RDD operations
```
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
```

In [6]:
# Keep only the columns the query needs, then drop down to the underlying RDD of Rows
rdd = (
    df_green
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')
    .rdd
)

#### Implementing the WHERE filter using RDDs

In [7]:
from datetime import datetime

In [8]:
# Lower bound from the query's WHERE clause (inclusive)
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    """Return True for rows whose pickup time is on or after ``start``.

    RDD equivalent of ``WHERE lpep_pickup_datetime >= '2020-01-01 00:00:00'``.

    Args:
        row: Row-like object exposing ``lpep_pickup_datetime``.

    Returns:
        bool: False when the pickup time is missing (``None``). In SQL,
        ``NULL >= x`` is not true, so such rows are dropped. Comparing
        ``None >= datetime`` directly would raise ``TypeError`` and fail the job.
    """
    pickup = row.lpep_pickup_datetime
    return pickup is not None and pickup >= start

In [9]:
# Pull a small sample to the driver to see what a single RDD element looks like
rows = rdd.take(10)
row = rows[0]

In [10]:
# Each element is a Row exposing the selected columns as attributes
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 12, 18, 15, 4), PULocationID=41, total_amount=7.88)

### Implementing GROUP BY with RDDs

In [11]:
def prepare_for_grouping(row):
    """Turn a row into a ``((hour, zone), (amount, 1))`` key/value pair.

    The key emulates ``GROUP BY date_trunc('hour', lpep_pickup_datetime), PULocationID``.
    The value carries the amount to sum plus a count of 1, so a single
    ``reduceByKey`` can compute both ``SUM(total_amount)`` and ``COUNT(1)``.
    """
    # date_trunc('hour', ...): zero out everything below the hour
    truncated_hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    return (truncated_hour, row.PULocationID), (row.total_amount, 1)

In [12]:
def calculate_revenue(left_value, right_value):
    """Merge two partial ``(amount, count)`` aggregates; used as the ``reduceByKey`` function.

    Amounts are combined like SQL ``SUM``: a ``None`` (NULL) amount is ignored,
    and the result is ``None`` only when both sides are ``None``. Counts always
    add up, matching ``COUNT(1)``. The merge is associative and commutative, as
    ``reduceByKey`` requires.

    Args:
        left_value: ``(amount, count)`` tuple for one side of the merge.
        right_value: ``(amount, count)`` tuple for the other side.

    Returns:
        ``(amount, count)`` tuple with the combined totals.
    """
    left_amount, left_count = left_value
    right_amount, right_count = right_value

    # Skip NULL amounts instead of letting `None + float` raise TypeError and fail the job.
    if left_amount is None:
        output_amount = right_amount
    elif right_amount is None:
        output_amount = left_amount
    else:
        output_amount = left_amount + right_amount

    output_count = left_count + right_count

    return (output_amount, output_count)

In [13]:
from collections import namedtuple

In [14]:
# Flat record for one (hour, zone) group; field order matches result_schema
RevenueRow = namedtuple('RevenueRow', 'hour zone revenue count')

In [15]:
def unwrap(row):
    """Flatten a ``((hour, zone), (revenue, count))`` pair produced by ``reduceByKey``
    into a ``RevenueRow``, ready to be turned back into a DataFrame."""
    key, value = row[0], row[1]
    return RevenueRow(key[0], key[1], value[0], value[1])

In [16]:
from pyspark.sql import types

In [17]:
# Schema applied when converting the aggregated RDD back into a DataFrame (all nullable)
result_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('hour', types.TimestampType()),
        ('zone', types.IntegerType()),
        ('revenue', types.DoubleType()),
        ('count', types.IntegerType()),
    ]
])

In [18]:
# RDD equivalent of the SQL query:
#   filter(filter_outliers)        -> WHERE lpep_pickup_datetime >= '2020-01-01'
#   map(prepare_for_grouping)      -> build ((hour, zone), (amount, 1)) pairs
#   reduceByKey(calculate_revenue) -> GROUP BY 1, 2 with SUM(total_amount) / COUNT(1)
#   map(unwrap)                    -> flatten each pair back into a RevenueRow
df_result = (
    rdd
    .filter(filter_outliers)
    .map(prepare_for_grouping)
    .reduceByKey(calculate_revenue)
    .map(unwrap)
    .toDF(result_schema)
)

In [19]:
# The query has no ORDER BY, so the row order shown here is not meaningful
df_result.show()

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-07 08:00:00|  74|1497.2599999999989|  104|
|2020-01-08 20:00:00| 254|            126.62|    6|
|2020-01-03 14:00:00| 242|            163.62|   11|
|2020-01-31 21:00:00|  41| 588.1600000000001|   40|
|2020-01-06 14:00:00|  95|511.40000000000003|   27|
|2020-01-30 17:00:00| 247|              21.6|    2|
|2020-01-05 04:00:00|  74|109.21999999999998|    7|
|2020-01-05 13:00:00| 215|            515.66|   18|
|2020-01-19 16:00:00| 181|            159.35|   14|
|2020-01-18 21:00:00| 188|             52.26|    3|
|2020-01-20 15:00:00|  25|            134.59|   12|
|2020-01-04 12:00:00|  78|            146.93|    3|
|2020-01-31 18:00:00|  75|1244.8699999999994|   89|
|2020-01-20 01:00:00|  74|             42.32|    4|
|2020-01-10 19:00:00|  74|1331.4899999999996|   89|
|2020-01-31 23:00:00| 134|             57.32|    5|
|2020-01-16 

In [21]:
# Persist the aggregated result; overwrite mode lets the cell be re-run safely
df_result.write.mode('overwrite').parquet('../../Data/data/module-05-batch/08rdds/tmp/green-revenue')

### Explaining mapPartitions in RDDs

#### Imagine we want to build a machine-learning model that predicts the duration of a trip from its features (pickup time, locations, distance)

In [23]:
# Columns handed to the model, in the order used to build each pandas batch
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

duration_rdd = (
    df_green
    .select(columns)
    .rdd
)

In [24]:
import pandas as pd

In [25]:
# Small driver-side sample, used below to try the pandas conversion locally
rows = duration_rdd.take(10)

In [26]:
# Same Row -> pandas conversion that apply_model_in_batch performs on each partition
df = pd.DataFrame(rows, columns=columns)

In [27]:
# Column order used for the pandas batches
columns

['VendorID',
 'lpep_pickup_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_distance']

In [28]:
# model = ...  # placeholder: a trained model would be loaded here

def model_predict(df):
    """Return a dummy duration prediction for every row of ``df``.

    Stand-in for ``model.predict(df)``: it simply scales ``trip_distance`` by 5
    and returns the resulting pandas Series, aligned with ``df``.
    """
    return df.trip_distance * 5

In [29]:
def apply_model_in_batch(rows):
    """Score one RDD partition with the (dummy) model as a single pandas batch.

    Meant for ``rdd.mapPartitions``. ``rows`` is the partition's iterable of Rows.
    They are gathered into a pandas DataFrame with ``columns``, ``model_predict``
    is called once for the whole batch, and its output is stored as
    ``predicted_duration``.

    Yields:
        One namedtuple per input row, from ``DataFrame.itertuples()``. The first
        field is the pandas ``Index``, followed by ``columns`` and
        ``predicted_duration``.
    """
    batch = pd.DataFrame(rows, columns=columns)
    batch['predicted_duration'] = model_predict(batch)
    yield from batch.itertuples()

In [None]:
# mapPartitions passes each partition's iterator of Rows to apply_model_in_batch, so the
# model runs once per partition rather than once per row. itertuples() adds an 'Index'
# field (the per-partition pandas index), which carries no meaning and is dropped.
df_predicts = (
    duration_rdd
    .mapPartitions(apply_model_in_batch)
    .toDF()
    .drop('Index')
)

In [None]:
# Show the predicted durations (trip_distance * 5 from the dummy model)
df_predicts.select('predicted_duration').show()

+------------------+
|predicted_duration|
+------------------+
|3.9000000000000004|
|               4.9|
|              13.5|
|               4.0|
|             11.65|
|13.100000000000001|
|5.6499999999999995|
| 6.800000000000001|
|             55.75|
|               8.9|
|               5.0|
|             13.75|
|               5.5|
|             19.05|
|              9.25|
|              45.7|
|               5.2|
| 5.699999999999999|
|              5.75|
|4.6000000000000005|
+------------------+
only showing top 20 rows

