In [15]:
from pathlib import Path

import pandas as pd

import pyspark
from pyspark.sql import SparkSession

#### Let's initialize Spark (`local[*]` uses all available CPU cores)

In [2]:
# Local Spark session; "local[*]" runs Spark in this process with one worker
# thread per available logical core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/23 13:29:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/23 13:29:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### Let's download the dataset and check the number of lines

#### Let's load the dataset into spark and try some basic commands 

In [3]:
# Only the header option is set: no schema inference, so every column is read as a string.
df = spark.read.csv(
    str(Path('data', 'fhvhv_tripdata_2021-01.csv.gz')),
    header=True,
)

In [4]:
# Print the first rows as a text table (show() defaults to 20 rows)
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   null|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|   null|
|           HV0003|              B02764|2021-01-01 00:23:56|2021-01-01 00:38:05|         233|         142|   null|
|           HV0003|              B02764|2021-01-01 00:42:51|2021-01-01 00:45:50|         142|         143|   null|
|           HV0003|              B02764|2021-01-01 00:48:14|2021-01-01 01:08:42|         143|          78|   null|
|           HV0005|              B02510|2021-01-01 00:06:59|2021-01-01 00:43:01|

In [5]:
# First 5 rows as a list of Row objects; note every value is a string
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID='152', DOLocationID='167', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:23:56', dropoff_datetime='2021-01-01 00:38:05', PULocationID='233', DOLocationID='142', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:42:51', dropoff_datetime='2021-01-01 00:45:50', PULocationID='142', DOLocationID='143', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:48:14', dropoff_datetime='2021-01-01 01:08:42', PULocationID='143', DOLocationID='78', SR_Flag=None)]

#### By default Spark doesn't try to infer the types: it uses STRING for every column

In [7]:
# Every column is StringType when the CSV is read without inferSchema
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True)])

#### So we will add an option to infer the types

In [29]:
# inferSchema asks Spark to guess each column's type, which costs an extra pass over the data.
df = spark.read.csv(
    str(Path('data', 'fhvhv_tripdata_2021-01.csv.gz')),
    header=True,
    inferSchema=True,
)

                                                                                

In [31]:
# Timestamps now come back as datetime objects and location IDs as ints
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 45, 50), PULocationID=142, DOLocationID=143, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_dat

In [32]:
# Inferred: TimestampType for the datetimes, IntegerType for the IDs and SR_Flag
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', IntegerType(), True)])

#### But we can also infer the column types using a small subset of the dataset and pandas

In [41]:
# Let's unzip the gzip file
!gunzip -k data/fhvhv_tripdata_2021-01.csv.gz

In [42]:
# Let's extract a subset of the whole (too large for pandas) CSV file
!head -n 10 data/fhvhv_tripdata_2021-01.csv > data/header.csv

In [43]:
# Let's delete the full extracted CSV file
!rm data/fhvhv_tripdata_2021-01.csv

In [44]:
# Load the small CSV sample with pandas (same data directory as the Spark reads)
df_pandas = pd.read_csv(Path('data', 'header.csv'))

In [45]:
# dtypes pandas inferred from the small sample; the datetime columns stay `object` (strings)
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [47]:
# Convert the pandas sample into a Spark DataFrame and look at the schema Spark derives from its dtypes
spark.createDataFrame(data=df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

#### Obviously, not all types are correctly detected by pandas (for example, the datetime columns stay strings), so we will have to modify them...

In [57]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType

In [59]:
# Explicit schema that fixes the types pandas got wrong; every field stays nullable.
new_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('hvfhs_license_num', StringType()),
        ('dispatching_base_num', StringType()),
        ('pickup_datetime', TimestampType()),
        ('dropoff_datetime', TimestampType()),
        ('PULocationID', IntegerType()),
        ('DOLocationID', IntegerType()),
        ('SR_Flag', DoubleType()),
    )
])

In [60]:
# An explicit schema skips type inference and fixes the column types up front.
df = spark.read.csv(
    str(Path('data', 'fhvhv_tripdata_2021-01.csv.gz')),
    schema=new_schema,
    header=True,
)

In [62]:
# Same rows as before, now typed according to new_schema
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 45, 50), PULocationID=142, DOLocationID=143, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_dat

In [63]:
# Matches new_schema exactly (note SR_Flag is DoubleType here)
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', DoubleType(), True)])

## Spark Partitions

A single gzip-compressed file cannot be split, so only one Spark task (one core of one Executor) can read it. So if we keep ONE large file, we don't really use the power of Spark and its multiple Executors...

#### To solve this problem, we can split the file into several files using Spark PARTITIONS.

In [64]:
# Request 24 partitions; this is lazy and only takes effect when an action (e.g. a write) runs.
df = df.repartition(numPartitions=24)

#### This command alone doesn't do anything: `repartition` is lazy and is only applied when an action (such as writing the data) is executed

In [67]:
# The write is the action that triggers the repartition; existing output at this path is replaced.
df.write.mode("overwrite").parquet('data/fhv/2021/01/')

                                                                                

#### Now we can read the parquet files

In [68]:
# Read back every parquet part file written to this directory
df = spark.read.format('parquet').load('data/fhv/2021/01/')

In [69]:
# Parquet stores the schema with the data, so the types chosen earlier are preserved
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: double (nullable = true)



# Basic commands (similar to pandas)

### Select some columns

In [74]:
# Keep a few columns and only the trips of the HV0004 license (where is an alias of filter)
(
    df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID', 'hvfhs_license_num')
    .where(df.hvfhs_license_num == 'HV0004')
    .show()
)

+-------------------+-------------------+------------+------------+-----------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|hvfhs_license_num|
+-------------------+-------------------+------------+------------+-----------------+
|2021-01-29 17:18:59|2021-01-29 17:35:54|         263|         162|           HV0004|
|2021-01-08 13:45:06|2021-01-08 14:32:53|          95|          22|           HV0004|
|2021-01-20 10:57:31|2021-01-20 11:20:45|         244|         262|           HV0004|
|2021-01-26 19:29:44|2021-01-26 19:55:30|         137|         129|           HV0004|
|2021-01-30 16:55:04|2021-01-30 17:00:33|         229|         233|           HV0004|
|2021-01-12 10:51:03|2021-01-12 11:18:48|          76|         130|           HV0004|
|2021-01-29 07:41:40|2021-01-29 08:03:07|         263|          13|           HV0004|
|2021-01-24 23:30:27|2021-01-24 23:39:11|         132|          10|           HV0004|
|2021-01-28 07:38:46|2021-01-28 08:15:34|         196|

### Add new columns and use SQL functions

In [75]:
from pyspark.sql import functions as F

In [84]:
# Derive date-only columns from the pickup and dropoff timestamps
df_tmp = (
    df
    .withColumn('new_pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('new_dropoff_date', F.to_date(df.dropoff_datetime))
)

In [85]:
df_tmp.select(
    'new_pickup_date', 'new_dropoff_date', 'PULocationID', 'DOLocationID'
).show(n=3)

+---------------+----------------+------------+------------+
|new_pickup_date|new_dropoff_date|PULocationID|DOLocationID|
+---------------+----------------+------------+------------+
|     2021-01-20|      2021-01-20|         148|         188|
|     2021-01-09|      2021-01-09|          15|         265|
|     2021-01-24|      2021-01-24|          72|          97|
+---------------+----------------+------------+------------+
only showing top 3 rows



### Create our own Python function and use it as a Spark user-defined function (UDF) (it is not converted to SQL: Spark runs it in Python worker processes)

In [86]:
def crazy_stuff(base_num):
    """Encode a dispatching base number such as 'B02682' into a short tag.

    The first character is dropped and the remaining characters are parsed as
    an integer ``num``. The tag prefix is chosen by divisibility:
    's' if ``num`` is divisible by 7, else 'a' if divisible by 3, else 'e'.
    The suffix is ``num`` in lowercase hexadecimal, zero-padded to at least
    3 digits, e.g. 'B02682' -> 'a/a7a'.

    Args:
        base_num: base number string, or None.

    Returns:
        The tag string, or None when ``base_num`` is None.

    Raises:
        ValueError: if the characters after the first are not an integer.
    """
    if base_num is None:
        # The column is nullable and Spark passes SQL NULLs to Python UDFs as
        # None; propagate NULL instead of failing the job with a TypeError.
        return None
    num = int(base_num[1:])
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'

In [87]:
# Quick local check of the plain Python function before wrapping it as a UDF
crazy_stuff('B02682')

'a/a7a'

In [88]:
# Wrap the plain Python function as a Spark UDF whose results are strings
crazy_stuff_udf = F.udf(crazy_stuff, StringType())

#### Let's create a new dataframe column with this crazy_stuff function

In [90]:
# Date-only columns plus a column computed row by row by the Python UDF
df_tmp = (
    df
    .withColumn('new_pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('new_dropoff_date', F.to_date(df.dropoff_datetime))
    .withColumn('new_dispatching_base_num', crazy_stuff_udf(df.dispatching_base_num))
)

In [91]:
df_tmp.select(
    'new_pickup_date', 'new_dropoff_date', 'PULocationID', 'DOLocationID', 'new_dispatching_base_num'
).show(n=3)

[Stage 24:>                                                         (0 + 1) / 1]

+---------------+----------------+------------+------------+------------------------+
|new_pickup_date|new_dropoff_date|PULocationID|DOLocationID|new_dispatching_base_num|
+---------------+----------------+------------+------------+------------------------+
|     2021-01-20|      2021-01-20|         148|         188|                   e/b35|
|     2021-01-09|      2021-01-09|          15|         265|                   e/b14|
|     2021-01-24|      2021-01-24|          72|          97|                   s/b3d|
+---------------+----------------+------------+------------+------------------------+
only showing top 3 rows



                                                                                

#### The advantage over using SQL directly is that we can write arbitrarily complex logic in Python.
#### Furthermore, we can unit-test these functions, which is hard to do with SQL.