In [38]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

In [2]:
spark = SparkSession.builder.master('local[*]').appName('test').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/29 21:27:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.parquet('/home/ezzaldin/data/taxi_tripdata/all_data_part.parquet')
df.show(10)

[Stage 1:>                                                          (0 + 1) / 1]

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2022-05-01 20:57:59|2022-05-01 21:15:57|                 N|       1.0|         130|         179|            1.0|         10.1|       28.5|  0.5|    0.5|      5.96|         0.0|       

                                                                                

In [17]:
green_df = df.filter(df['service_type'] == 'green')\
             .select('pickup_datetime', 'PULocationID', 'total_amount')
green_df.show(5)

+-------------------+------------+------------+
|    pickup_datetime|PULocationID|total_amount|
+-------------------+------------+------------+
|2022-05-01 20:57:59|         130|       35.76|
|2022-05-16 09:54:51|          97|        20.0|
|2022-03-16 14:54:09|          74|        11.8|
|2022-03-23 18:12:08|          95|         8.3|
|2022-05-24 08:56:50|          41|       24.55|
+-------------------+------------+------------+
only showing top 5 rows



In [8]:
df.createOrReplaceTempView('df_view')

In [13]:
agg_green_df = spark.sql(
"""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    df_view
WHERE
    pickup_datetime >= '2020-01-01 00:00:00' AND
    service_type = 'green'
GROUP BY
    1, 2
"""
)
green_df.show(5)



+-------------------+----+------------------+--------------+
|               hour|zone|            amount|number_records|
+-------------------+----+------------------+--------------+
|2022-03-14 11:00:00|  75|440.38000000000005|            25|
|2022-05-12 10:00:00|  75|381.19999999999993|            20|
|2022-05-09 19:00:00|  92|123.19999999999999|             4|
|2022-03-02 09:00:00|  43|            109.55|             5|
|2022-05-10 18:00:00|  74|            561.62|            30|
+-------------------+----+------------------+--------------+
only showing top 5 rows



                                                                                

In [15]:
agg_green_rdd = agg_green_df.rdd
agg_green_rdd .take(10)

[Row(hour=datetime.datetime(2022, 3, 14, 11, 0), zone=75, amount=440.38000000000005, number_records=25),
 Row(hour=datetime.datetime(2022, 5, 12, 10, 0), zone=75, amount=381.19999999999993, number_records=20),
 Row(hour=datetime.datetime(2022, 5, 9, 19, 0), zone=92, amount=123.19999999999999, number_records=4),
 Row(hour=datetime.datetime(2022, 3, 2, 9, 0), zone=43, amount=109.55, number_records=5),
 Row(hour=datetime.datetime(2022, 5, 10, 18, 0), zone=74, amount=561.62, number_records=30),
 Row(hour=datetime.datetime(2022, 3, 2, 8, 0), zone=40, amount=34.05, number_records=1),
 Row(hour=datetime.datetime(2022, 3, 21, 20, 0), zone=7, amount=37.31, number_records=1),
 Row(hour=datetime.datetime(2022, 3, 14, 19, 0), zone=97, amount=59.02, number_records=4),
 Row(hour=datetime.datetime(2022, 5, 10, 10, 0), zone=244, amount=205.22, number_records=8),
 Row(hour=datetime.datetime(2022, 3, 28, 11, 0), zone=217, amount=48.5, number_records=1)]

In [18]:
green_rdd = green_df.rdd
green_rdd.take(5)

[Row(pickup_datetime=datetime.datetime(2022, 5, 1, 20, 57, 59), PULocationID=130, total_amount=35.76),
 Row(pickup_datetime=datetime.datetime(2022, 5, 16, 9, 54, 51), PULocationID=97, total_amount=20.0),
 Row(pickup_datetime=datetime.datetime(2022, 3, 16, 14, 54, 9), PULocationID=74, total_amount=11.8),
 Row(pickup_datetime=datetime.datetime(2022, 3, 23, 18, 12, 8), PULocationID=95, total_amount=8.3),
 Row(pickup_datetime=datetime.datetime(2022, 5, 24, 8, 56, 50), PULocationID=41, total_amount=24.55)]

In [19]:
from datetime import datetime

In [20]:
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    return row.pickup_datetime >= start

In [27]:
def mapping(row):
    hour = row.pickup_datetime.replace(minute = 0, second = 0, microsecond = 0)
    zone = row.PULocationID
    amount = row.total_amount
    count = 1
    key = (hour, zone)
    val = (amount, count)
    return (key, val)

In [29]:
def reduction(l, r):
    l_amt, l_cnt = l
    r_amt, r_cnt = r
    o_amt = l_amt + r_amt
    o_cnt = l_cnt + r_cnt
    o_val = (o_amt, o_cnt)
    return o_val

In [31]:
# apply map-reduce
green_rdd.filter(filter_outliers)\
         .map(mapping)\
         .reduceByKey(reduction)\
         .take(5)

                                                                                

[((datetime.datetime(2022, 5, 26, 9, 0), 129), (23.82, 2)),
 ((datetime.datetime(2022, 3, 18, 11, 0), 42), (141.52, 8)),
 ((datetime.datetime(2022, 3, 15, 8, 0), 75), (287.21000000000004, 17)),
 ((datetime.datetime(2022, 3, 19, 14, 0), 82), (97.92, 6)),
 ((datetime.datetime(2022, 3, 24, 14, 0), 116), (76.16000000000001, 3))]

In [44]:
duration_rdd = df.repartition(20)\
                 .select('VendorID', 'pickup_datetime',
                         'PULocationID', 'DOLocationID', 'trip_distance')\
                 .rdd



In [45]:
duration_rdd.take(5)

                                                                                

[Row(VendorID=2, pickup_datetime=datetime.datetime(2022, 8, 11, 23, 41, 22), PULocationID=144, DOLocationID=24, trip_distance=6.48),
 Row(VendorID=2, pickup_datetime=datetime.datetime(2022, 5, 3, 6, 54, 39), PULocationID=141, DOLocationID=144, trip_distance=6.02),
 Row(VendorID=2, pickup_datetime=datetime.datetime(2022, 4, 29, 15, 25, 13), PULocationID=239, DOLocationID=238, trip_distance=0.98),
 Row(VendorID=1, pickup_datetime=datetime.datetime(2022, 8, 15, 12, 47, 34), PULocationID=161, DOLocationID=237, trip_distance=1.0),
 Row(VendorID=2, pickup_datetime=datetime.datetime(2022, 8, 21, 20, 11, 18), PULocationID=132, DOLocationID=90, trip_distance=17.41)]

In [50]:
def naive_model(distance):
    return distance * 3

In [51]:
def apply_ML_in_batch(partition):
    df = pd.DataFrame(partition, columns = ['VendorID', 'pickup_datetime','PULocationID',
                                            'DOLocationID', 'trip_distance'])
    df['predicted_duration'] = df['trip_distance'].apply(naive_model)
    for row in df.itertuples():
        yield row

In [54]:
duration_rdd.mapPartitions(apply_ML_in_batch).toDF().show(5)

[Stage 50:>                                                         (0 + 1) / 1]

+-----+--------+---------------+------------+------------+-------------+------------------+
|Index|VendorID|pickup_datetime|PULocationID|DOLocationID|trip_distance|predicted_duration|
+-----+--------+---------------+------------+------------+-------------+------------------+
|    0|       2|             {}|         144|          24|         6.48|             19.44|
|    1|       2|             {}|         141|         144|         6.02|             18.06|
|    2|       2|             {}|         239|         238|         0.98|              2.94|
|    3|       1|             {}|         161|         237|          1.0|               3.0|
|    4|       2|             {}|         132|          90|        17.41|52.230000000000004|
+-----+--------+---------------+------------+------------+-------------+------------------+
only showing top 5 rows



                                                                                