## PySpark Intro

Import packages

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd

Create a local Spark session (`local[*]` runs Spark on this machine using all available cores).

In [3]:
# Start (or reuse) a Spark session. "local[*]" runs Spark inside this process
# with as many worker threads as there are logical cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/24 18:59:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Get data

In [22]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

--2023-02-24 18:07:17--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/035746e8-4e24-47e8-a3ce-edcf6d1b11c7?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230224%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230224T180718Z&X-Amz-Expires=300&X-Amz-Signature=4d642d340d4cac5b5a18cb7228b7a2b9bf78024562f70ed7fbf0896a71e4d0a0&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-01.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-02-24 18:07:18--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/035746e8

Unpack .gz

In [23]:
!gzip -d fhvhv_tripdata_2021-01.csv.gz

Check the line count (this includes the header row, so it is one more than the number of records).

In [25]:
!wc -l fhvhv_tripdata_2021-01.csv

11908469 fhvhv_tripdata_2021-01.csv


Read the CSV into a Spark DataFrame

In [30]:
df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-01.csv')

In [31]:
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID='152', DOLocationID='167', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:23:56', dropoff_datetime='2021-01-01 00:38:05', PULocationID='233', DOLocationID='142', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:42:51', dropoff_datetime='2021-01-01 00:45:50', PULocationID='142', DOLocationID='143', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:48:14', dropoff_datetime='2021-01-01 01:08:42', PULocationID='143', DOLocationID='78', SR_Flag=None)]

Note that every column is a string: unlike pandas, Spark's CSV reader does not infer types unless asked to (the `inferSchema` option).

Create a CSV with the first 1001 lines (the header plus 1000 records) so pandas can infer the data types quickly.

In [34]:
!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

In [35]:
df_pandas = pd.read_csv('head.csv')

In [36]:
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

`pickup_datetime` and `dropoff_datetime` should be timestamps; otherwise this is good.

Create a Spark DataFrame from the pandas DataFrame and show its schema. We will use this structure as the starting point for the desired schema.

In [39]:
# Let Spark translate the pandas dtypes into a StructType we can copy from.
spark.createDataFrame(data=df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

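Create the schema: parse the datetime columns as `TimestampType`, store the location IDs as `IntegerType` (smaller than `LongType`), and read `SR_Flag` as `StringType` (it is null in the rows shown above).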

In [41]:
# Explicit schema for the FHVHV CSV; every field is nullable.
schema = types.StructType([
    types.StructField(name, data_type, True)
    for name, data_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

Read the CSV again, this time with the explicit schema.

In [42]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')

In [44]:
df.head(10)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 45, 50), PULocationID=142, DOLocationID=143, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_dat

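Now we have one large file, which is inefficient because Spark processes data in parallel (MPP), one partition per task. We should repartition. Note that `repartition` is a lazy transformation that returns a new DataFrame; it does not change `df` in place.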

In [47]:
# Lazy: this only displays the planned DataFrame; `df` itself is unchanged.
df.repartition(numPartitions=24)

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string]

Turn the 6 partitions into 24 and write the result to Parquet.

In [48]:
# repartition() returns a new DataFrame instead of modifying df, so it must be
# chained into the write; otherwise the original partitioning is written.
# mode='overwrite' lets the notebook be re-run without failing on the existing
# output directory.
df.repartition(24).write.parquet('fhvhv/2021/01/', mode='overwrite')

                                                                                

## Dataframes

### Actions vs Transformations

Transformations - lazy:
* selecting columns 
* filtering
* joins
* group by

Actions - eager:
* show, take, head
* write

Create a DataFrame from the Parquet files written above.

In [4]:
# Parquet carries its own schema, so none is passed here.
df = spark.read.format('parquet').load('fhvhv/2021/01/')

                                                                                

In [7]:
# Confirm the column types survived the Parquet round trip.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



Note that Parquet files store their schema, so the column types survive the round trip; storing typed, columnar data is one reason Parquet files are smaller than CSV.

Selecting specific columns:

In [8]:
# A transformation only: the output is the new DataFrame's schema, no data is read.
df.select(['pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID'])

DataFrame[pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int]

Filtering:

In [9]:
# Keep HV0003 trips and project four columns; show() is the action that runs it.
(
    df
    .filter(df['hvfhs_license_num'] == 'HV0003')
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
    .show()
)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-18 09:26:34|2021-01-18 09:45:13|         144|         181|
|2021-01-18 09:01:10|2021-01-18 09:19:18|         145|         138|
|2021-01-18 09:23:03|2021-01-18 09:28:40|         138|          70|
|2021-01-18 09:34:22|2021-01-18 09:40:02|         138|         138|
|2021-01-18 09:03:11|2021-01-18 09:10:50|          92|         171|
|2021-01-18 09:39:57|2021-01-18 09:53:46|          53|         213|
|2021-01-18 09:50:14|2021-01-18 10:02:09|          42|         119|
|2021-01-18 09:59:00|2021-01-18 10:03:01|         153|         220|
|2021-01-18 09:14:15|2021-01-18 09:34:47|           7|         237|
|2021-01-18 09:05:00|2021-01-18 09:22:26|         119|         166|
|2021-01-18 09:02:52|2021-01-18 09:20:40|         256|          17|
|2021-01-18 09:30:05|2021-01-18 09:39:59|       

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

### Functions and UDFs

Why not just use SQL, since it is simpler than the DataFrame syntax above?
The DataFrame API is more flexible: for example, it makes it easy to apply Python user-defined functions (UDFs).

In [11]:
from pyspark.sql import functions as F 

In [14]:
# Derive calendar dates from the pickup/dropoff timestamps.
(
    df
    .withColumn('pickup_date', F.to_date('pickup_datetime'))
    .withColumn('dropoff_date', F.to_date('dropoff_datetime'))
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-18|  2021-01-18|         144|         181|
| 2021-01-18|  2021-01-18|         145|         138|
| 2021-01-18|  2021-01-18|         138|          70|
| 2021-01-18|  2021-01-18|         138|         138|
| 2021-01-18|  2021-01-18|          92|         171|
| 2021-01-18|  2021-01-18|          53|         213|
| 2021-01-18|  2021-01-18|          42|         119|
| 2021-01-18|  2021-01-18|         153|         220|
| 2021-01-18|  2021-01-18|           7|         237|
| 2021-01-18|  2021-01-18|         132|         263|
| 2021-01-18|  2021-01-18|         119|         166|
| 2021-01-18|  2021-01-18|         256|          17|
| 2021-01-18|  2021-01-18|         256|         112|
| 2021-01-18|  2021-01-18|         112|         113|
| 2021-01-18|  2021-01-18|          33|         265|
| 2021-01-18|  2021-01-18|         112|       

You can also create custom complex functions that can be stored in a codebase and appropriately tested.

In [16]:
def crazy_stuff(base_num):
    """Encode a dispatching base number such as 'B02884' as a short id.

    The first character is dropped and the remaining digits are parsed as an
    integer. The result is a prefix chosen by divisibility ('s' for multiples
    of 7, otherwise 'a' for multiples of 3, otherwise 'e'), then '/', then the
    number in lowercase hex zero-padded to at least 3 digits.

    Returns None when base_num is None: Spark passes SQL NULLs to Python UDFs
    as None, and the dispatching_base_num column is nullable.
    """
    if base_num is None:
        return None
    num = int(base_num[1:])
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'

In [18]:
crazy_stuff(base_num='B02884')

's/b44'

Turn the Python function into a UDF so it can be used in Spark.

In [20]:
# Wrap the plain Python function as a Spark UDF that returns strings.
crazy_stuff_udf = F.udf(crazy_stuff, types.StringType())

In [22]:
# Same date columns as before, plus the UDF-derived base_id.
(
    df
    .withColumn('pickup_date', F.to_date('pickup_datetime'))
    .withColumn('dropoff_date', F.to_date('dropoff_datetime'))
    .withColumn('base_id', crazy_stuff_udf(df['dispatching_base_num']))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

[Stage 4:>                                                          (0 + 1) / 1]

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  s/b3d| 2021-01-18|  2021-01-18|         144|         181|
|  e/b3b| 2021-01-18|  2021-01-18|         145|         138|
|  e/b3b| 2021-01-18|  2021-01-18|         138|          70|
|  e/b3b| 2021-01-18|  2021-01-18|         138|         138|
|  s/b44| 2021-01-18|  2021-01-18|          92|         171|
|  s/b44| 2021-01-18|  2021-01-18|          53|         213|
|  e/acc| 2021-01-18|  2021-01-18|          42|         119|
|  s/b44| 2021-01-18|  2021-01-18|         153|         220|
|  e/acc| 2021-01-18|  2021-01-18|           7|         237|
|  e/9ce| 2021-01-18|  2021-01-18|         132|         263|
|  e/b38| 2021-01-18|  2021-01-18|         119|         166|
|  s/b44| 2021-01-18|  2021-01-18|         256|          17|
|  s/b44| 2021-01-18|  2021-01-18|         256|         112|
|  s/b44| 2021-01-18|  2

                                                                                

## Spark SQL

### Setup

Make sure you have completed the prerequisites: https://github.com/TylerJSimpson/data_engineering_zoomcamp/blob/main/week_5/README.MD

Load the green and yellow data into DataFrames:

In [26]:
df_green = spark.read.parquet('data/pq/green/*/*')

In [28]:
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [29]:
df_yellow = spark.read.parquet('data/pq/yellow/*/*')

In [30]:
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



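Before combining the data:

Note the differences in schema: green uses `lpep_*` and yellow uses `tpep_*` names for the pickup and dropoff timestamps, so both are renamed to common names.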

In [44]:
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'pickup_dropoffe')

In [45]:
df_green.columns

['VendorID',
 'pickup_datetime',
 'pickup_dropoffe',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [46]:
df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'pickup_dropoffe')

In [47]:
df_yellow.columns

['VendorID',
 'pickup_datetime',
 'pickup_dropoffe',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [48]:
common_columns = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_columns.append(col)

In [52]:
common_columns

['VendorID',
 'pickup_datetime',
 'pickup_dropoffe',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

Add a `service_type` column so we know which dataset each record comes from:

In [54]:
from pyspark.sql import functions as F

In [57]:
df_green_sel = df_green \
    .select(common_columns) \
    .withColumn('service_type', F.lit('green'))

In [58]:
df_yellow_sel = df_yellow \
    .select(common_columns) \
    .withColumn('service_type', F.lit('yellow'))

Combine the data:

In [59]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [60]:
df_trips_data.groupBy('service_type').count().show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



                                                                                

### Spark SQL

First register the DataFrame as a temporary view:

In [63]:
# Expose the DataFrame to spark.sql() as `trips_data`. createOrReplaceTempView
# replaces registerTempTable, which is deprecated since Spark 2.0.
df_trips_data.createOrReplaceTempView('trips_data')



Query the table with Spark SQL:

In [66]:
# Per-service row counts via SQL; same result as the groupBy().count() above.
spark.sql("""
SELECT  service_type,
        count(service_type) AS count
FROM    trips_data 
GROUP   BY service_type
;
""").show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



                                                                                

In [67]:
# Monthly revenue report: one row per (pickup zone, month, service type) with
# summed charge columns plus average passenger count and trip distance.
# NOTE(review): 'montly' in the two avg_* aliases is a typo; fixing it changes
# the report's column names, so confirm downstream consumers first.
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 
    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,
    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [68]:
df_result.show()



+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|         250|2020-

                                                                                

Saving outputs:

In [69]:
# mode='overwrite' so re-running the notebook does not fail on the existing
# output directory (the coalesce(1) write further down uses the same path).
df_result.write.parquet('data/report/revenue', mode='overwrite')

                                                                                

This creates over 200 files. Use `coalesce(1)` to write a single file instead; because the path already exists, the write mode must be `overwrite`:

In [70]:
# Collapse to one partition so a single Parquet part file is written.
df_result.coalesce(1).write.mode('overwrite').parquet('data/report/revenue')

                                                                                