In [4]:
import pyspark
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

df_green = spark.read.parquet('data/pq/green/*/*')



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/10 11:41:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

In [6]:
print(type(df_green.rdd.take(1)[0]))
# <class 'pyspark.sql.types.Row'>

df_green.rdd.take(3)

                                                                                

<class 'pyspark.sql.types.Row'>


[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), lpep_dropoff_datetime=datetime.datetime(2020, 1, 23, 13, 38, 16), store_and_fwd_flag='N', RatecodeID=1, PULocationID=74, DOLocationID=130, passenger_count=1, trip_distance=12.77, fare_amount=36.0, extra=0.0, mta_tax=0.5, tip_amount=2.05, tolls_amount=6.12, ehail_fee=None, improvement_surcharge=0.3, total_amount=44.97, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), lpep_dropoff_datetime=datetime.datetime(2020, 1, 20, 15, 46), store_and_fwd_flag=None, RatecodeID=None, PULocationID=67, DOLocationID=39, passenger_count=None, trip_distance=8.0, fare_amount=29.9, extra=2.75, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=33.45, payment_type=None, trip_type=None, congestion_surcharge=None),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41)

####  SQL query -> RDD

```sql
SELECT
    date_trunc('hour', lpep_pickup_datetime) AS hour,
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
```

In [7]:
# keep relevant only columns 

rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

rdd.take(5)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), PULocationID=67, total_amount=33.45),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41), PULocationID=260, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 5, 16, 32, 26), PULocationID=82, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 22, 42), PULocationID=166, total_amount=12.74)]

In [8]:
# the first row.
rdd.filter(lambda row: True).take(1)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97)]

In [9]:
# filters the while dataset and returns an empty list.
rdd.filter(lambda row: False).take(1)

                                                                                

[]

In [10]:
# This returns even numbers.
# See https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.RDD.filter.html
# rdd = sc.parallelize([1, 2, 3, 4, 5])
# rdd.filter(lambda x: x % 2 == 0).collect()


In [12]:
from datetime import datetime

start = datetime(year=2020, month=1, day=1)

rdd.filter(lambda row: row.lpep_pickup_datetime >= start).take(5)

# Better, because with lambda we gets messy quiet fast.
def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

rdd.filter(filter_outliers).take(3)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, total_amount=44.97),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), PULocationID=67, total_amount=33.45),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41), PULocationID=260, total_amount=8.3)]

In [13]:
def prepare_for_grouping(row):
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)

    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)

rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .take(5)

[((datetime.datetime(2020, 1, 23, 13, 0), 74), (44.97, 1)),
 ((datetime.datetime(2020, 1, 20, 15, 0), 67), (33.45, 1)),
 ((datetime.datetime(2020, 1, 15, 20, 0), 260), (8.3, 1)),
 ((datetime.datetime(2020, 1, 5, 16, 0), 82), (8.3, 1)),
 ((datetime.datetime(2020, 1, 29, 19, 0), 166), (12.74, 1))]

In [15]:
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value

    output_amount = left_amount + right_amount
    output_count = left_count + right_count

    return (output_amount, output_count)



                                                                                

In [16]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .take(5)

                                                                                

In [17]:
df_result

[((datetime.datetime(2020, 1, 20, 15, 0), 67), (79.5, 3)),
 ((datetime.datetime(2020, 1, 5, 16, 0), 82), (500.4700000000002, 33)),
 ((datetime.datetime(2020, 1, 16, 8, 0), 41), (736.1399999999996, 54)),
 ((datetime.datetime(2020, 1, 20, 15, 0), 75), (608.9999999999999, 47)),
 ((datetime.datetime(2020, 1, 17, 21, 0), 74), (594.87, 39))]

In [18]:
from collections import namedtuple

RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

def unwrap(row):
    return RevenueRow(
        hour=row[0][0],
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1]
    )

In [19]:
rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .take(5)

                                                                                

[RevenueRow(hour=datetime.datetime(2020, 1, 20, 15, 0), zone=67, revenue=79.5, count=3),
 RevenueRow(hour=datetime.datetime(2020, 1, 5, 16, 0), zone=82, revenue=500.4700000000002, count=33),
 RevenueRow(hour=datetime.datetime(2020, 1, 16, 8, 0), zone=41, revenue=736.1399999999996, count=54),
 RevenueRow(hour=datetime.datetime(2020, 1, 20, 15, 0), zone=75, revenue=608.9999999999999, count=47),
 RevenueRow(hour=datetime.datetime(2020, 1, 17, 21, 0), zone=74, revenue=594.87, count=39)]

In [20]:
from pyspark.sql import types

result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])



In [21]:
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF(result_schema)

df_result.show(5)



+-------------------+----+-----------------+-----+
|               hour|zone|          revenue|count|
+-------------------+----+-----------------+-----+
|2020-01-20 15:00:00|  67|             79.5|    3|
|2020-01-05 16:00:00|  82|500.4700000000002|   33|
|2020-01-16 08:00:00|  41|736.1399999999996|   54|
|2020-01-20 15:00:00|  75|608.9999999999999|   47|
|2020-01-17 21:00:00|  74|           594.87|   39|
+-------------------+----+-----------------+-----+
only showing top 5 rows



                                                                                

In [22]:
df_result.printSchema()

root
 |-- hour: timestamp (nullable = true)
 |-- zone: integer (nullable = true)
 |-- revenue: double (nullable = true)
 |-- count: integer (nullable = true)



In [23]:
df_result.write.parquet('tmp/green-revenue')

                                                                                

#### Spark RDD: mapPartitions

`mapPartitions()` returns a new RDD by applying a function to each partition of this RDD.

`mapPartitions()` similar to `map()`; the difference being, Spark `mapPartitions()` provides a facility to do heavy initializations (for example Database connection) once for each partition instead of doing it on every DataFrame row. This helps the performance of the job when you dealing with heavy-weighted initialization on larger datasets.


In [24]:
columns = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

duration_rdd = df_green \
    .select(columns) \
    .rdd

duration_rdd.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 10, 15), PULocationID=74, DOLocationID=130, trip_distance=12.77),
 Row(VendorID=None, lpep_pickup_datetime=datetime.datetime(2020, 1, 20, 15, 9), PULocationID=67, DOLocationID=39, trip_distance=8.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 15, 20, 23, 41), PULocationID=260, DOLocationID=157, trip_distance=1.27),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 5, 16, 32, 26), PULocationID=82, DOLocationID=83, trip_distance=1.25),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 19, 22, 42), PULocationID=166, DOLocationID=42, trip_distance=1.84)]

In [25]:
def apply_model_in_batch(partition):
    return [1]

rdd.mapPartitions(apply_model_in_batch).collect()


[1, 1, 1, 1]

In [26]:
def apply_model_in_batch(partition):
    cnt = 0

    for row in partition:
        cnt = cnt + 1

    return [cnt]

rdd.mapPartitions(apply_model_in_batch).collect()


                                                                                

[1141148, 438057, 432402, 292910]

In [27]:
def apply_model_in_batch(partition):
    cnt = 0

    for row in partition:
        cnt = cnt + 1

    return [cnt]

rdd.mapPartitions(apply_model_in_batch).collect()


                                                                                

[1141148, 438057, 432402, 292910]

In [29]:
import pandas as pd

# model = ...

def model_predict(df):
    # y_pred = model.predict(df)
    y_pred = df.trip_distance * 5
    return y_pred

def apply_model_in_batch(rows):
    df = pd.DataFrame(rows, columns=columns)
    predictions = model_predict(df)
    df['predicted_duration'] = predictions

    for row in df.itertuples():
        yield row

df_predicts = duration_rdd \
    .mapPartitions(apply_model_in_batch) \
    .toDF() \
    .drop('Index')

df_predicts.select('predicted_duration').show(10)

[Stage 29:>                                                         (0 + 1) / 1]

+------------------+
|predicted_duration|
+------------------+
|63.849999999999994|
|              40.0|
|              6.35|
|              6.25|
| 9.200000000000001|
|               3.8|
|16.599999999999998|
|             11.05|
|               4.5|
|              30.5|
+------------------+
only showing top 10 rows



                                                                                

In [30]:
rows = duration_rdd.take(5)
df = pd.DataFrame(rows, columns=columns)
list(df.itertuples())

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-23 13:10:15'), PULocationID=74, DOLocationID=130, trip_distance=12.77),
 Pandas(Index=1, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-20 15:09:00'), PULocationID=67, DOLocationID=39, trip_distance=8.0),
 Pandas(Index=2, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-15 20:23:41'), PULocationID=260, DOLocationID=157, trip_distance=1.27),
 Pandas(Index=3, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-05 16:32:26'), PULocationID=82, DOLocationID=83, trip_distance=1.25),
 Pandas(Index=4, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-29 19:22:42'), PULocationID=166, DOLocationID=42, trip_distance=1.84)]

In [31]:
def infinite_seq(flag: bool):
  i = 0
  while True:
      yield i
      i = i + 1

      if flag and i > 10:
          break

In [34]:
seqq = infinite_seq(False)

In [None]:
list(seqq)