In [3]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/03 15:22:50 WARN Utils: Your hostname, Hema resolves to a loopback address: 127.0.1.1; using 172.30.190.161 instead (on interface eth0)
24/03/03 15:22:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/03 15:22:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df_green = spark.read.parquet('data/pq/green/*/*')

                                                                                

In [None]:
'''
# Try to create the following SQL using RDD
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2

'''

In [5]:
rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

In [6]:
from datetime import datetime

In [7]:
# Will return all 10 records
rows = rdd.filter(lambda row: True).take(10)
row = rows[0]

                                                                                

In [9]:
rows

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 23, 23, 13), PULocationID=260, total_amount=16.0),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 30, 20, 30), PULocationID=238, total_amount=28.51),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 22, 18, 13), PULocationID=82, total_amount=22.11),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 4, 18, 12, 24), PULocationID=42, total_amount=8.8),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 11, 53), PULocationID=82, total_amount=19.02),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 15, 35), PULocationID=7, total_amount=47.42),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 2, 11, 32), PULocationID=66, total_amount=12.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 4, 9, 40, 17), PULocationID=42, total_amount=7.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 18, 21, 41, 43), PULocationID=129, total_amount=5.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 11, 1

In [11]:
# Will return 0 records
rows = rdd.filter(lambda row: False).take(10)
rows

                                                                                

[]

In [12]:
# Apply a function to a row to filter based on date
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

In [13]:
rows = rdd.filter(filter_outliers).take(10)
row = rows[0]

In [14]:
row

Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 23, 23, 13), PULocationID=260, total_amount=16.0)

In [15]:
# Create a group by function based on hour, LocationID as the key and amount and count as the value. The key and value are returned as tuples.
def prepare_for_grouping(row): 
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)
    
    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)

In [17]:
# The values are added using this logic
# v1 + v2 = v1* , v1* + v3 = v2*, v2* + v4 - v3* and so on so that the values are aggregated 
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

In [18]:

from collections import namedtuple

In [20]:
def unwrap(row):
    return (row[0][0], 
           row[0][1],
           row[1][0],
           row[1][1]
    )

In [22]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .take(10)

                                                                                

In [24]:
df_result

[(datetime.datetime(2020, 1, 27, 23, 0), 260, 154.26, 15),
 (datetime.datetime(2020, 1, 2, 11, 0), 66, 204.01999999999998, 10),
 (datetime.datetime(2020, 1, 4, 9, 0), 42, 426.55000000000024, 36),
 (datetime.datetime(2020, 1, 11, 18, 0), 41, 476.0100000000002, 45),
 (datetime.datetime(2020, 1, 22, 12, 0), 244, 352.26000000000016, 21),
 (datetime.datetime(2020, 1, 22, 15, 0), 177, 56.28, 3),
 (datetime.datetime(2020, 1, 17, 14, 0), 7, 350.2100000000001, 34),
 (datetime.datetime(2020, 1, 12, 12, 0), 200, 83.63, 2),
 (datetime.datetime(2020, 1, 17, 17, 0), 7, 597.0100000000001, 49),
 (datetime.datetime(2020, 1, 29, 13, 0), 74, 686.4699999999997, 61)]

In [26]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF() 

                                                                                

In [27]:
df_result

DataFrame[_1: timestamp, _2: bigint, _3: double, _4: bigint]

In [36]:
# This time we will give names to columns
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [37]:
def unwrap(row):
    return RevenueRow(
        hour=row[0][0], 
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1]
    )

In [38]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF() 

                                                                                

In [39]:
df_result

DataFrame[hour: timestamp, zone: bigint, revenue: double, count: bigint]

In [40]:
df_result.columns

['hour', 'zone', 'revenue', 'count']

In [41]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', LongType(), True), StructField('revenue', DoubleType(), True), StructField('count', LongType(), True)])

In [32]:
from pyspark.sql import types

In [42]:
# After adjusting the schema based on data
result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

In [43]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF(result_schema) 

In [44]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', IntegerType(), True), StructField('revenue', DoubleType(), True), StructField('count', IntegerType(), True)])

In [45]:
df_result.write.parquet('tmp/green-revenue')

                                                                                

In [47]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

duration_rdd = df_green \
    .select(columns) \
    .rdd

In [48]:
def apply_model_in_batch(partition):
    cnt = 0
    for row in partition:
        cnt = cnt +1
    return [cnt]

In [49]:
rdd.mapPartitions(apply_model_in_batch).collect()

                                                                                

[1164928, 456935, 425872, 256782]

In [50]:
import pandas as pd

In [51]:
rows = duration_rdd.take(10)

In [52]:
rows

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 23, 23, 13), PULocationID=260, DOLocationID=223, trip_distance=2.98),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 30, 20, 30), PULocationID=238, DOLocationID=47, trip_distance=7.65),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 22, 18, 13), PULocationID=82, DOLocationID=95, trip_distance=1.83),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 4, 18, 12, 24), PULocationID=42, DOLocationID=244, trip_distance=1.65),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 27, 11, 53), PULocationID=82, DOLocationID=197, trip_distance=6.38),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 15, 35), PULocationID=7, DOLocationID=174, trip_distance=10.4),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 2, 11, 32), PULocationID=66, DOLocationID=217, trip_distance=2.95),
 Row(VendorID=1, lpep_pickup_datetime=datetime.d

In [53]:
df = pd.DataFrame(rows, columns=columns) # if we don't assign columns names it will take it as 1,2,3,4 ...

In [54]:
df.columns

Index(['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID',
       'trip_distance'],
      dtype='object')

In [55]:
def apply_model_in_batch(rows):
    cnt = 0
    df = pd.DataFrame(rows, columns=columns)
    cnt = len(df)
    return [cnt]

In [57]:
duration_rdd.mapPartitions(apply_model_in_batch).collect()

                                                                                

[1164928, 456935, 425872, 256782]

In [59]:
rows = duration_rdd.take(10)

In [60]:
df = pd.DataFrame(rows) # convert rdd to DF

In [61]:
df.itertuples()

<map at 0x7f7d7d85ffd0>

In [62]:
list(df.itertuples())

[Pandas(Index=0, _1=2.0, _2=Timestamp('2020-01-27 23:23:13'), _3=260, _4=223, _5=2.98),
 Pandas(Index=1, _1=nan, _2=Timestamp('2020-01-30 20:30:00'), _3=238, _4=47, _5=7.65),
 Pandas(Index=2, _1=nan, _2=Timestamp('2020-01-22 18:13:00'), _3=82, _4=95, _5=1.83),
 Pandas(Index=3, _1=2.0, _2=Timestamp('2020-01-04 18:12:24'), _3=42, _4=244, _5=1.65),
 Pandas(Index=4, _1=2.0, _2=Timestamp('2020-01-27 11:53:00'), _3=82, _4=197, _5=6.38),
 Pandas(Index=5, _1=2.0, _2=Timestamp('2020-01-29 15:35:00'), _3=7, _4=174, _5=10.4),
 Pandas(Index=6, _1=nan, _2=Timestamp('2020-01-02 11:32:00'), _3=66, _4=217, _5=2.95),
 Pandas(Index=7, _1=1.0, _2=Timestamp('2020-01-04 09:40:17'), _3=42, _4=41, _5=1.0),
 Pandas(Index=8, _1=1.0, _2=Timestamp('2020-01-18 21:41:43'), _3=129, _4=129, _5=0.4),
 Pandas(Index=9, _1=2.0, _2=Timestamp('2020-01-11 18:43:11'), _3=41, _4=41, _5=0.91)]

In [70]:
def infinite_seq():
    seq = 0
    while True:
        yield seq
        seq = seq + 1
        if seq > 15:
            break


In [71]:
seq = infinite_seq()

In [73]:
next(seq)

1

In [74]:
list(seq) 

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

In [75]:
# In case if we have an ML model, we could use it to predict a dependent variable. But
# since there is no model here, we will just hard code it to some value using dummy logic
#model = ...

def model_predict(df):
#     y_pred = model.predict(df)
    y_pred = df.trip_distance * 5
    return y_pred

In [76]:
def apply_model_in_batch(rows):
    df = pd.DataFrame(rows, columns=columns)
    predictions = model_predict(df)
    df['predicted_duration'] = predictions

    for row in df.itertuples():
        yield row

In [77]:
df_predicts = duration_rdd \
    .mapPartitions(apply_model_in_batch)\
    .toDF() \
    .drop('Index')

                                                                                

In [79]:
df_predicts.show()

[Stage 22:>                                                         (0 + 1) / 1]

+--------+--------------------+------------+------------+-------------+------------------+
|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|predicted_duration|
+--------+--------------------+------------+------------+-------------+------------------+
|     2.0|                  {}|         260|         223|         2.98|              14.9|
|     NaN|                  {}|         238|          47|         7.65|             38.25|
|     NaN|                  {}|          82|          95|         1.83|              9.15|
|     2.0|                  {}|          42|         244|         1.65|              8.25|
|     2.0|                  {}|          82|         197|         6.38|              31.9|
|     2.0|                  {}|           7|         174|         10.4|              52.0|
|     NaN|                  {}|          66|         217|         2.95|             14.75|
|     1.0|                  {}|          42|          41|          1.0|               5.0|

                                                                                

In [78]:
df_predicts.select('predicted_duration').show()

[Stage 21:>                                                         (0 + 1) / 1]

+------------------+
|predicted_duration|
+------------------+
|              14.9|
|             38.25|
|              9.15|
|              8.25|
|              31.9|
|              52.0|
|             14.75|
|               5.0|
|               2.0|
|              4.55|
|              10.1|
|              18.0|
|3.3000000000000003|
|               9.0|
|              13.5|
|              12.9|
|24.950000000000003|
|22.450000000000003|
|10.149999999999999|
|              18.5|
+------------------+
only showing top 20 rows



                                                                                