Tutorial from: https://www.youtube.com/watch?v=r_Sf6fCB40c&list=PL3MmuxUbc_hJed7dXYoJw8DoCuVHhGEQb&index=54m
Spark master UI: http://localhost:4040/jobs/

# Setting up the pyspark session

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/11/28 10:05:52 WARN Utils: Your hostname, Prashants-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.254.38 instead (on interface en0)
24/11/28 10:05:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/28 10:05:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df = spark.read.option("header", "true")\
    .csv('data/taxi_data.csv')

In [3]:
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime='2021-01-01 00:28:09', on_scene_datetime='2021-01-01 00:31:42', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', trip_miles='5.26', trip_time='923', base_passenger_fare='22.28', tolls='0.0', bcf='0.67', sales_tax='1.98', congestion_surcharge='2.75', airport_fee=None, tips='0.0', driver_pay='14.99', shared_request_flag='N', shared_match_flag='N', access_a_ride_flag=' ', wav_request_flag='N', wav_match_flag='N'),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime='2021-01-01 00:45:56', on_scene_datetime='2021-01-01 00:55:19', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID='152', DOLocationID='167', trip_miles='3.65', trip_time='1382', base_passenger_fare='18.36', tolls='0.0', bcf='0.55', sales_tax='1.63',

# How to read data and assign suitable datatypes?
### Defining the schema for spark dataframe 
### Since all of the data is currently in string format, we want to determine the exact datatypes too.

## Using pandas to determine the data types

In [7]:
!head -n 1000 data/taxi_data.csv > head.csv

In [8]:
import pandas as pd
df_pandas = pd.read_csv('head.csv')
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
originating_base_num     object
request_datetime         object
on_scene_datetime        object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
trip_miles              float64
trip_time                 int64
base_passenger_fare     float64
tolls                   float64
bcf                     float64
sales_tax               float64
congestion_surcharge    float64
airport_fee             float64
tips                    float64
driver_pay              float64
shared_request_flag      object
shared_match_flag        object
access_a_ride_flag       object
wav_request_flag         object
wav_match_flag           object
dtype: object

#### All data was in string format. It could identify numbers and floats but some are just 'objects'.
#### Let's see how spark reads the same datatypes.

In [9]:
#Converting pandas dataframe to spark data frame
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', StringType(), True), StructField('on_scene_datetime', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('shared_request_flag', StringType(

It can find almost all datatypes except some date time data types. We could also downsize some types (e.g. double, longType) to save memory. Let's modify the some of the above data types.

In [10]:
#taking in the current schema and modifying the datatypes of the fields as needed. e.g. pickup_datetime was string but I need it as date time.
from pyspark.sql import types
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True), 
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('originating_base_num', types.StringType(), True), 
    types.StructField('request_datetime', types.StringType(), True), 
    types.StructField('on_scene_datetime', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropoff_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('trip_miles', types.DoubleType(), True), 
    types.StructField('trip_time', types.IntegerType(), True), 
    types.StructField('base_passenger_fare', types.DoubleType(), True), 
    types.StructField('tolls', types.DoubleType(), True), 
    types.StructField('bcf', types.DoubleType(), True), 
    types.StructField('sales_tax', types.DoubleType(), True), 
    types.StructField('congestion_surcharge', types.DoubleType(), True), 
    types.StructField('airport_fee', types.DoubleType(), True), 
    types.StructField('tips', types.DoubleType(), True), 
    types.StructField('driver_pay', types.DoubleType(), True), 
    types.StructField('shared_request_flag', types.StringType(), True), 
    types.StructField('shared_match_flag', types.StringType(), True),
    types.StructField('access_a_ride_flag', types.StringType(), True), 
    types.StructField('wav_request_flag', types.StringType(), True), 
    types.StructField('wav_match_flag', types.StringType(), True)])

In [11]:
# Define the same data with the new schema
df = spark.read.option("header", "true")\
    .schema(schema)\
    .csv('data/taxi_data.csv')

In [12]:
#Let's check the data
df.head(10)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime='2021-01-01 00:28:09', on_scene_datetime='2021-01-01 00:31:42', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, trip_miles=5.26, trip_time=923, base_passenger_fare=22.28, tolls=0.0, bcf=0.67, sales_tax=1.98, congestion_surcharge=2.75, airport_fee=None, tips=0.0, driver_pay=14.99, shared_request_flag='N', shared_match_flag='N', access_a_ride_flag=' ', wav_request_flag='N', wav_match_flag='N'),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', originating_base_num='B02682', request_datetime='2021-01-01 00:45:56', on_scene_datetime='2021-01-01 00:55:19', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, trip_miles=3.65, trip_time=1382, base_passenger_fare=18.36,

### In the result, we can see that the data is parsed as integers and date time objects.

## Partitioning the data
In real scenarios, we want to make use of as much spark clusters as possible. So, we want to have the big csv file partitioned into many small chunks so that each cluster can pick up one chunk and work on that parallely. 
Partitioning the data in spark is lazy mode i.e. it will partition the data whenever the data is needed somewhere.

In [None]:
#random choice for 24.
df = df.repartition(24)

In [None]:
# Writing the partitioned data into parquet files
df.write.parquet('data/fhvhv/2021/01/')

In [14]:
# We can see the parquet files inside the directory: fhvhv/2021/01/
! ls data/fhvhv/2021/01/ | wc -l
# One extra file is a _SUCCESS file which denotes that the partitioning has been done successfully. Maybe it can be used as a trgger for other jobs.

      25


### Reading the parquet files

In [18]:
df = spark.read.parquet('data/fhvhv/2021/01/')
df.printSchema()
# Notice how the schema is preserved.

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: string (nullable = true)
 |-- on_scene_datetime: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: integer (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (

### Selecting columns and filtering by column values
If I did just df.select('pickup_datetime', 'dropoff_datetime').filter(df.hvfhs_license_num== 'HV0003'), spark would not run it right away (as seen in the Spark MasterUI) because it is executed lazily. But when I do .show() it has to run the code.

In [23]:
df.select('pickup_datetime', 'dropoff_datetime').filter(df.hvfhs_license_num== 'HV0003').show()

+-------------------+-------------------+
|    pickup_datetime|   dropoff_datetime|
+-------------------+-------------------+
|2021-01-02 18:51:52|2021-01-02 18:58:05|
|2021-01-02 17:54:29|2021-01-02 18:02:27|
|2021-01-02 14:44:27|2021-01-02 14:57:26|
|2021-01-02 15:28:32|2021-01-02 15:34:13|
|2021-01-01 18:07:11|2021-01-01 18:17:54|
|2021-01-01 05:50:20|2021-01-01 06:01:20|
|2021-01-01 09:48:42|2021-01-01 09:51:57|
|2021-01-02 14:37:08|2021-01-02 14:59:03|
|2021-01-01 17:15:42|2021-01-01 17:20:41|
|2021-01-02 20:22:08|2021-01-02 20:36:46|
|2021-01-01 21:55:19|2021-01-01 22:05:38|
|2021-01-02 10:39:57|2021-01-02 10:44:59|
|2021-01-01 10:43:01|2021-01-01 11:25:26|
|2021-01-01 00:40:58|2021-01-01 00:50:46|
|2021-01-02 12:37:19|2021-01-02 12:39:25|
|2021-01-02 15:34:47|2021-01-02 15:45:25|
|2021-01-02 02:31:36|2021-01-02 02:46:22|
|2021-01-02 12:37:12|2021-01-02 12:43:30|
|2021-01-01 17:39:10|2021-01-01 17:52:40|
|2021-01-03 04:25:50|2021-01-03 04:34:46|
+-------------------+-------------

### There are two types of commands in spark: Transformations and Actions.
- Transformations: Select, filtering, joins, groups etc. Lazily executed.
- Actions: Show, take(head), write etc. Eagerly executed (along with all previous transformations.)

### We can use functions like in SQL. e.g. to_date(), max(), sum(). They are in pyspark.sql.

In [31]:
from pyspark.sql import functions as F

In [42]:
# Let's change the datetime to dates
df\
.withColumn('pickup_date', F.to_date(df.pickup_datetime))\
.withColumn('dropoff_date', F.to_date(df.dropoff_datetime))\
.select('pickup_date', 'dropoff_date') \
.show()

+-----------+------------+
|pickup_date|dropoff_date|
+-----------+------------+
| 2021-01-02|  2021-01-02|
| 2021-01-02|  2021-01-02|
| 2021-01-02|  2021-01-02|
| 2021-01-01|  2021-01-01|
| 2021-01-02|  2021-01-02|
| 2021-01-01|  2021-01-01|
| 2021-01-01|  2021-01-01|
| 2021-01-01|  2021-01-01|
| 2021-01-02|  2021-01-02|
| 2021-01-01|  2021-01-01|
| 2021-01-02|  2021-01-02|
| 2021-01-02|  2021-01-02|
| 2021-01-01|  2021-01-01|
| 2021-01-02|  2021-01-02|
| 2021-01-01|  2021-01-01|
| 2021-01-01|  2021-01-01|
| 2021-01-01|  2021-01-01|
| 2021-01-01|  2021-01-01|
| 2021-01-03|  2021-01-03|
| 2021-01-02|  2021-01-02|
+-----------+------------+
only showing top 20 rows



#### What if we need some extra functions than provided? (Difference from SQL)
##### We can use user defined functions(udf) as transformations for the data. Some could be quite difficult to implement with SQL but here we can just use python functions and make them udfs. This makes Spark more powerful.
- For example: The business needs to convert dispatching_base_num(e.g.B02764) to integer by removing 'B' and check if the number is divisible by 2(Put result_yes afterwards) or not(result_no afterwards).

In [46]:
def transform_base_num(base_num):
    num = int(base_num[1:])
    if num%2==0:
        new_num = str(num/2) + '_yes'
    else:
        new_num  = str(num/2) + '_no'
    return new_num
        

In [49]:
# testing the function
print(transform_base_num('B02764'))
print(transform_base_num('B02765'))

1382.0_yes
1382.5_no


In [54]:
#defining udf for the function
transform_base_num_udf = F.udf(transform_base_num, returnType=types.StringType())
#using the udf
df\
.withColumn('modified_base_num', transform_base_num_udf(df.dispatching_base_num))\
.select('dispatching_base_num', 'modified_base_num') \
.show()

+--------------------+-----------------+
|dispatching_base_num|modified_base_num|
+--------------------+-----------------+
|              B02875|        1437.5_no|
|              B02836|       1418.0_yes|
|              B02876|       1438.0_yes|
|              B02510|       1255.0_yes|
|              B02764|       1382.0_yes|
|              B02866|       1433.0_yes|
|              B02866|       1433.0_yes|
|              B02888|       1444.0_yes|
|              B02764|       1382.0_yes|
|              B02870|       1435.0_yes|
|              B02510|       1255.0_yes|
|              B02883|        1441.5_no|
|              B02884|       1442.0_yes|
|              B02869|        1434.5_no|
|              B02764|       1382.0_yes|
|              B02764|       1382.0_yes|
|              B02510|       1255.0_yes|
|              B02510|       1255.0_yes|
|              B02800|       1400.0_yes|
|              B02510|       1255.0_yes|
+--------------------+-----------------+
only showing top