## Spark intro

In [1]:
import os
import sys

# have my file path set to python not anaconda
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.path.dirname(sys.executable)

'C:\\Users\\Abdulkadir\\anaconda3'

In [2]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

In [3]:
spark = SparkSession.builder \
.master("local[*]") \
.appName('DE-ZOOM').getOrCreate()

Spark jobs can be seen in port 4040

In [4]:
# get the high volume data set 
!curl -sS https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-01.parquet > data.parquet

In [5]:
!wc -l data.parquet # works on linux (gitbash)

'wc' is not recognized as an internal or external command,
operable program or batch file.


In [6]:
df = spark.read.parquet('data.parquet', header=True, inferSchema=True)

In [7]:
df.show(5)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [8]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

### manually changing schema

In [9]:
df.limit(10).toPandas().dtypes

hvfhs_license_num               object
dispatching_base_num            object
originating_base_num            object
request_datetime        datetime64[ns]
on_scene_datetime       datetime64[ns]
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     int64
DOLocationID                     int64
trip_miles                     float64
trip_time                        int64
base_passenger_fare            float64
tolls                          float64
bcf                            float64
sales_tax                      float64
congestion_surcharge           float64
airport_fee                    float64
tips                           float64
driver_pay                     float64
shared_request_flag             object
shared_match_flag               object
access_a_ride_flag              object
wav_request_flag                object
wav_match_flag                  object
dtype: object

In [10]:
spark.createDataFrame(df.limit(10).toPandas()).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampType(), True), StructField('on_scene_datetime', TimestampType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('shared_request_flag',

In [11]:
from pyspark.sql import types
from pyspark.sql.types import StructType, IntegerType, StringType, StructField, TimestampType, LongType, DoubleType

In [12]:
schema = types.StructType([
        types.StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True),
        types.StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampType(), True),
        types.StructField('on_scene_datetime', TimestampType(), True), StructField('pickup_datetime', TimestampType(), True),
        types.StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', LongType(), True),
        types.StructField('DOLocationID', LongType(), True), StructField('trip_miles', DoubleType(), True),
        types.StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True),
        types.StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True),
        types.StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True),
        types.StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True),
        types.StructField('driver_pay', DoubleType(), True), StructField('shared_request_flag', StringType(), True),
        types.StructField('shared_match_flag', StringType(), True), StructField('access_a_ride_flag', StringType(), True),
        types.StructField('wav_request_flag', StringType(), True),
        types.StructField('wav_match_flag', StringType(), True)
])

### Partitioning

In [13]:
# take a large fiel and split it into multiple partitions
df_repartitioned = df.repartition(50)

In [14]:
df_repartitioned

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, originating_base_num: string, request_datetime: timestamp, on_scene_datetime: timestamp, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: bigint, DOLocationID: bigint, trip_miles: double, trip_time: bigint, base_passenger_fare: double, tolls: double, bcf: double, sales_tax: double, congestion_surcharge: double, airport_fee: double, tips: double, driver_pay: double, shared_request_flag: string, shared_match_flag: string, access_a_ride_flag: string, wav_request_flag: string, wav_match_flag: string]

In [15]:
# creates a new folder
df_repartitioned.write.parquet('output_partitioned/', mode='overwrite')

In [16]:
# reading partitioned data
df = spark.read.parquet('output_partitioned/')

In [17]:
# shape of df
df.count(), len(df.columns)

(14751591, 24)

In [18]:
df.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'originating_base_num',
 'request_datetime',
 'on_scene_datetime',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'trip_miles',
 'trip_time',
 'base_passenger_fare',
 'tolls',
 'bcf',
 'sales_tax',
 'congestion_surcharge',
 'airport_fee',
 'tips',
 'driver_pay',
 'shared_request_flag',
 'shared_match_flag',
 'access_a_ride_flag',
 'wav_request_flag',
 'wav_match_flag']

In [19]:
df.select('hvfhs_license_num', 'pickup_datetime', 'dropoff_datetime', 'trip_miles', 'trip_time').show()

+-----------------+-------------------+-------------------+----------+---------+
|hvfhs_license_num|    pickup_datetime|   dropoff_datetime|trip_miles|trip_time|
+-----------------+-------------------+-------------------+----------+---------+
|           HV0003|2022-01-02 14:29:56|2022-01-02 14:50:35|     12.35|     1239|
|           HV0003|2022-01-17 20:44:30|2022-01-17 20:53:03|      1.47|      514|
|           HV0003|2022-01-26 09:16:13|2022-01-26 09:23:22|      1.08|      429|
|           HV0005|2022-01-19 13:18:39|2022-01-19 13:45:24|     3.969|     1605|
|           HV0005|2022-01-17 13:20:19|2022-01-17 13:49:10|     8.987|     1731|
|           HV0003|2022-01-24 21:40:23|2022-01-24 21:53:22|      1.87|      779|
|           HV0003|2022-01-11 09:02:59|2022-01-11 09:19:16|       2.7|      977|
|           HV0005|2022-01-11 11:46:30|2022-01-11 12:04:54|     8.068|     1104|
|           HV0003|2022-01-10 08:25:14|2022-01-10 08:35:34|      1.28|      620|
|           HV0003|2022-01-0

In [20]:
df[['hvfhs_license_num', 'pickup_datetime', 'dropoff_datetime', 'trip_miles', 'trip_time']].show()

+-----------------+-------------------+-------------------+----------+---------+
|hvfhs_license_num|    pickup_datetime|   dropoff_datetime|trip_miles|trip_time|
+-----------------+-------------------+-------------------+----------+---------+
|           HV0003|2022-01-02 14:29:56|2022-01-02 14:50:35|     12.35|     1239|
|           HV0003|2022-01-17 20:44:30|2022-01-17 20:53:03|      1.47|      514|
|           HV0003|2022-01-26 09:16:13|2022-01-26 09:23:22|      1.08|      429|
|           HV0005|2022-01-19 13:18:39|2022-01-19 13:45:24|     3.969|     1605|
|           HV0005|2022-01-17 13:20:19|2022-01-17 13:49:10|     8.987|     1731|
|           HV0003|2022-01-24 21:40:23|2022-01-24 21:53:22|      1.87|      779|
|           HV0003|2022-01-11 09:02:59|2022-01-11 09:19:16|       2.7|      977|
|           HV0005|2022-01-11 11:46:30|2022-01-11 12:04:54|     8.068|     1104|
|           HV0003|2022-01-10 08:25:14|2022-01-10 08:35:34|      1.28|      620|
|           HV0003|2022-01-0

In [21]:
# where statement
df[df['trip_miles'] > 10]['hvfhs_license_num', 'pickup_datetime', 'dropoff_datetime', 'trip_miles', 'trip_time'].show()

+-----------------+-------------------+-------------------+----------+---------+
|hvfhs_license_num|    pickup_datetime|   dropoff_datetime|trip_miles|trip_time|
+-----------------+-------------------+-------------------+----------+---------+
|           HV0003|2022-01-02 14:29:56|2022-01-02 14:50:35|     12.35|     1239|
|           HV0003|2022-01-22 07:17:21|2022-01-22 10:06:21|     65.58|    10140|
|           HV0003|2022-01-23 09:38:40|2022-01-23 10:01:28|     13.02|     1368|
|           HV0005|2022-01-14 11:14:30|2022-01-14 11:52:54|    14.021|     2304|
|           HV0005|2022-01-15 19:22:02|2022-01-15 19:52:02|    11.574|     1800|
|           HV0003|2022-01-21 08:02:12|2022-01-21 08:31:15|     10.18|     1743|
|           HV0003|2022-01-13 17:53:22|2022-01-13 18:49:14|     15.65|     3352|
|           HV0003|2022-01-27 06:25:41|2022-01-27 06:58:25|     12.49|     1964|
|           HV0003|2022-01-05 10:45:08|2022-01-05 11:11:52|     10.77|     1605|
|           HV0003|2022-01-0

#### Why pyspark? Spark has user defined functions. There is a list of predefined functions, but you can also use your own functions like .apply() or .map() in pandas

In [22]:
from pyspark.sql import functions as F

In [23]:
df['hvfhs_license_num', 'pickup_datetime', 'dropoff_datetime', 'trip_miles', 'trip_time'] \
.withColumn('pickup_day', F.dayofweek('pickup_datetime')).show()

+-----------------+-------------------+-------------------+----------+---------+----------+
|hvfhs_license_num|    pickup_datetime|   dropoff_datetime|trip_miles|trip_time|pickup_day|
+-----------------+-------------------+-------------------+----------+---------+----------+
|           HV0003|2022-01-02 14:29:56|2022-01-02 14:50:35|     12.35|     1239|         1|
|           HV0003|2022-01-17 20:44:30|2022-01-17 20:53:03|      1.47|      514|         2|
|           HV0003|2022-01-26 09:16:13|2022-01-26 09:23:22|      1.08|      429|         4|
|           HV0005|2022-01-19 13:18:39|2022-01-19 13:45:24|     3.969|     1605|         4|
|           HV0005|2022-01-17 13:20:19|2022-01-17 13:49:10|     8.987|     1731|         2|
|           HV0003|2022-01-24 21:40:23|2022-01-24 21:53:22|      1.87|      779|         2|
|           HV0003|2022-01-11 09:02:59|2022-01-11 09:19:16|       2.7|      977|         3|
|           HV0005|2022-01-11 11:46:30|2022-01-11 12:04:54|     8.068|     1104|

In [24]:
def divisible(base_num):
    num = int(base_num[2:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [25]:
divisible('HV0003')

'a/003'

In [26]:
my_udf = F.udf(divisible)

In [27]:
df['hvfhs_license_num', 'pickup_datetime', 'dropoff_datetime', 'trip_miles', 'trip_time'] \
.withColumn('base_id', my_udf(df['hvfhs_license_num'])).show()

+-----------------+-------------------+-------------------+----------+---------+-------+
|hvfhs_license_num|    pickup_datetime|   dropoff_datetime|trip_miles|trip_time|base_id|
+-----------------+-------------------+-------------------+----------+---------+-------+
|           HV0003|2022-01-02 14:29:56|2022-01-02 14:50:35|     12.35|     1239|  a/003|
|           HV0003|2022-01-17 20:44:30|2022-01-17 20:53:03|      1.47|      514|  a/003|
|           HV0003|2022-01-26 09:16:13|2022-01-26 09:23:22|      1.08|      429|  a/003|
|           HV0005|2022-01-19 13:18:39|2022-01-19 13:45:24|     3.969|     1605|  e/005|
|           HV0005|2022-01-17 13:20:19|2022-01-17 13:49:10|     8.987|     1731|  e/005|
|           HV0003|2022-01-24 21:40:23|2022-01-24 21:53:22|      1.87|      779|  a/003|
|           HV0003|2022-01-11 09:02:59|2022-01-11 09:19:16|       2.7|      977|  a/003|
|           HV0005|2022-01-11 11:46:30|2022-01-11 12:04:54|     8.068|     1104|  e/005|
|           HV0003|20

### Yellow and green taxi data

In [28]:
# stars indicate all files in that folder
df_yellow = spark.read.parquet('data/raw/yellow/*/*', header=True, inferSchema=True)
df_green = spark.read.parquet('data/raw/green/*/*', header=True, inferSchema=True)

In [29]:
df_yellow.printSchema(), df_yellow.printSchema() 

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (

(None, None)

In [30]:
print(df_green.columns)

['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge']


In [31]:
print(df_yellow.columns)

['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']


Only take matching columns

In [32]:
set(df_green.columns) & set(df_yellow.columns) 

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

Also include pick up and drop off times as they have different names

In [33]:
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [34]:
set(df_green.columns) & set(df_yellow.columns) 

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

Preserve order

In [35]:
df_green.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [36]:
common_columns = []
for col in df_green.columns:
    if col in df_yellow.columns:
        common_columns.append(col)

In [37]:
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

When joining the data together, need to identify whether green or yellow

In [38]:
df_green = df_green[common_columns]
df_green = df_green.withColumn('service_type', F.lit('green'))

In [39]:
df_yellow = df_yellow[common_columns]
df_yellow = df_yellow.withColumn('service_type', F.lit('yellow'))

Combine

In [40]:
df_trips_data = df_green.unionAll(df_yellow)

In [41]:
df_trips_data.groupBy('service_type').count().show()

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2802931|
|      yellow|55553400|
+------------+--------+



Using SQL queries

In [42]:
df_trips_data.createOrReplaceTempView('trips_data')

In [43]:
spark.sql("""
    SELECT 
        service_type,
        COUNT(service_type)
    FROM 
        trips_data
    GROUP BY
        service_type
""").show()

+------------+-------------------+
|service_type|count(service_type)|
+------------+-------------------+
|       green|            2802931|
|      yellow|           55553400|
+------------+-------------------+



In [44]:
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 
    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,
    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

Save file

In [45]:
# coalesce writes it to one file
df_result.coalesce(1) \
    .write.parquet('data/report/revenue/', mode='overwrite')

### GroupBy

In [46]:
df_green.createOrReplaceTempView('green')

In [47]:
df_green_revenue = spark.sql("""
SELECT
    date_trunc('hour', pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM 
    green
WHERE 
    pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
ORDER BY 
    1, 2
""")

In [48]:
df_green_revenue.show()

+-------------------+----+------------------+--------------+
|               hour|zone|            amount|number_records|
+-------------------+----+------------------+--------------+
|2020-01-01 00:00:00|   7| 769.7299999999997|            45|
|2020-01-01 00:00:00|  17|            195.03|             9|
|2020-01-01 00:00:00|  18|               7.8|             1|
|2020-01-01 00:00:00|  22|              15.8|             1|
|2020-01-01 00:00:00|  24|              87.6|             3|
|2020-01-01 00:00:00|  25|             531.0|            26|
|2020-01-01 00:00:00|  29|              61.3|             1|
|2020-01-01 00:00:00|  32| 68.94999999999999|             2|
|2020-01-01 00:00:00|  33|317.27000000000004|            11|
|2020-01-01 00:00:00|  35|            129.96|             5|
|2020-01-01 00:00:00|  36|295.34000000000003|            11|
|2020-01-01 00:00:00|  37|            175.67|             6|
|2020-01-01 00:00:00|  38| 98.78999999999999|             2|
|2020-01-01 00:00:00|  4

In [49]:
# repartition writes the data to 20 different files
df_green_revenue \
    .repartition(20) \
    .write.parquet('data/report/revenue/green', mode='overwrite')

In [50]:
df_yellow.createOrReplaceTempView('yellow')

In [51]:
df_yellow_revenue = spark.sql("""
SELECT
    date_trunc('hour', pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM 
    yellow
WHERE 
    pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
ORDER BY 
    1, 2
""")

In [52]:
df_yellow_revenue.show()

+-------------------+----+------------------+--------------+
|               hour|zone|            amount|number_records|
+-------------------+----+------------------+--------------+
|2020-01-01 00:00:00|   3|              25.0|             1|
|2020-01-01 00:00:00|   4|1004.2999999999995|            57|
|2020-01-01 00:00:00|   7| 455.1700000000002|            38|
|2020-01-01 00:00:00|  10|             42.41|             2|
|2020-01-01 00:00:00|  12|             107.0|             6|
|2020-01-01 00:00:00|  13|1214.7999999999993|            56|
|2020-01-01 00:00:00|  14|               8.8|             1|
|2020-01-01 00:00:00|  15|             34.09|             1|
|2020-01-01 00:00:00|  17|220.20999999999998|             8|
|2020-01-01 00:00:00|  18|               5.8|             1|
|2020-01-01 00:00:00|  24| 754.9499999999997|            45|
|2020-01-01 00:00:00|  25| 324.3500000000001|            16|
|2020-01-01 00:00:00|  32|              18.0|             1|
|2020-01-01 00:00:00|  3

In [53]:
df_yellow_revenue \
    .repartition(20) \
    .write.parquet('data/report/revenue/yellow', mode='overwrite')

### Joins

In [54]:
df_green_revenue = df_green_revenue \
    .withColumnRenamed('amount', 'green_amount') \
    .withColumnRenamed('number_records', 'green_number_records') 

In [55]:
df_yellow_revenue = df_yellow_revenue \
    .withColumnRenamed('amount', 'yellow_amount') \
    .withColumnRenamed('number_records', 'yellow_number_records') 

In [56]:
df_join = df_green_revenue.join(df_yellow_revenue, on=['hour', 'zone'], how='outer') # left outer --> everything in green to match in yellow

In [57]:
df_join.show()

+-------------------+----+------------------+--------------------+------------------+---------------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|
+-------------------+----+------------------+--------------------+------------------+---------------------+
|2020-01-01 00:00:00|  36|295.34000000000003|                  11|            109.17|                    3|
|2020-01-01 00:00:00|  68|              null|                null| 7825.070000000024|                  396|
|2020-01-01 00:00:00|  70|54.900000000000006|                   3|               9.3|                    1|
|2020-01-01 00:00:00|  73|              null|                null|              17.3|                    1|
|2020-01-01 00:00:00|  76|            143.78|                   4|             35.51|                    2|
|2020-01-01 00:00:00| 136|            111.68|                   2|             168.0|                    4|
|2020-01-01 00:00:00| 151|  

In [58]:
df_join.write.parquet('data/report/total', mode='overwrite')

### RDD

In [59]:
df_green.columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'service_type']

In [60]:
# implement this using rdd
"""
SELECT
    date_trunc('hour', pickup_datetime) AS hour,
    PULocationID AS zone,
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM 
    green
WHERE 
    pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
ORDER BY 
    1, 2
"""

"\nSELECT\n    date_trunc('hour', pickup_datetime) AS hour,\n    PULocationID AS zone,\n    SUM(total_amount) AS amount,\n    COUNT(1) AS number_records\nFROM \n    green\nWHERE \n    pickup_datetime >= '2020-01-01 00:00:00'\nGROUP BY\n    1, 2\nORDER BY \n    1, 2\n"

In [61]:
# SELECT
rdd_result = df_green['pickup_datetime', 'PULocationID', 'total_amount'].rdd

In [62]:
rdd_result.take(5)

[Row(pickup_datetime=datetime.datetime(2019, 12, 18, 15, 52, 30), PULocationID=264, total_amount=4.81),
 Row(pickup_datetime=datetime.datetime(2020, 1, 1, 0, 45, 58), PULocationID=66, total_amount=24.36),
 Row(pickup_datetime=datetime.datetime(2020, 1, 1, 0, 41, 38), PULocationID=181, total_amount=15.34),
 Row(pickup_datetime=datetime.datetime(2020, 1, 1, 0, 52, 46), PULocationID=129, total_amount=25.05),
 Row(pickup_datetime=datetime.datetime(2020, 1, 1, 0, 19, 57), PULocationID=210, total_amount=11.3)]

In [63]:
# WHERE
from datetime import datetime

start_date = datetime(2020, 1, 1)

rdd_result = rdd_result.filter(lambda row: row['pickup_datetime'] >= start_date)

In [64]:
rdd_result.take(5)

[Row(pickup_datetime=datetime.datetime(2020, 1, 1, 0, 45, 58), PULocationID=66, total_amount=24.36),
 Row(pickup_datetime=datetime.datetime(2020, 1, 1, 0, 41, 38), PULocationID=181, total_amount=15.34),
 Row(pickup_datetime=datetime.datetime(2020, 1, 1, 0, 52, 46), PULocationID=129, total_amount=25.05),
 Row(pickup_datetime=datetime.datetime(2020, 1, 1, 0, 19, 57), PULocationID=210, total_amount=11.3),
 Row(pickup_datetime=datetime.datetime(2020, 1, 1, 0, 52, 33), PULocationID=35, total_amount=14.8)]

In [65]:
def prepare_for_grouping(row):
    hour = row['pickup_datetime'].replace(minute=0, second=0, microsecond=0)
    zone = row['PULocationID']
    key = (hour, zone)
    
    amount = row['total_amount']
    count = 1
    value = (amount, count)
    
    return (key, value)

In [66]:
# map
rdd_result = rdd_result.map(prepare_for_grouping)

In [67]:
rdd_result = rdd_result.toDF()

In [68]:
rdd_result.repartition(50) \
    .write.parquet('data/report/rdd/', mode='overwrite')

In [69]:
rdd_result = spark.read.parquet('data/report/rdd/', inferSchema=True, header=True)

In [70]:
rdd_result = rdd_result.rdd

In [71]:
# reduce by key
def calculate_revenue(left, right):
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

In [72]:
rdd_result = rdd_result.reduceByKey(calculate_revenue)

In [73]:
from collections import namedtuple

In [74]:
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [75]:
def unwrap(row):
    return RevenueRow(
        hour=row[0][0], 
        zone=row[0][1], 
        revenue=row[1][0], 
        count=row[1][1])

In [76]:
rdd_result.map(unwrap)

PythonRDD[200] at RDD at PythonRDD.scala:53

In [81]:
# rdd_result.toDF().show()
# job cannot handle capacity