# Instantiating Spark
 
Import the PySpark library and the `SparkSession` class.

In [1]:
import pyspark

In [2]:
# Confirm which PySpark installation the kernel actually imported.
pyspark.__file__

'/home/USERNAME/spark/spark-3.4.2-bin-hadoop3/python/pyspark/__init__.py'

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Create (or reuse, via getOrCreate) a local Spark session named 'test'.
# "local[*]" runs Spark on this machine using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/23 09:27:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [21]:
# Use a shell command to preview the first lines (header + 9 rows) of the zone lookup CSV.
# NOTE(review): no cell above downloads taxi_zone_lookup.csv; it must already be in the
# working directory. Consider adding a download step so the notebook runs from scratch.

!head taxi_zone_lookup.csv

"LocationID","Borough","Zone","service_zone"
1,"EWR","Newark Airport","EWR"
2,"Queens","Jamaica Bay","Boro Zone"
3,"Bronx","Allerton/Pelham Gardens","Boro Zone"
4,"Manhattan","Alphabet City","Yellow Zone"
5,"Staten Island","Arden Heights","Boro Zone"
6,"Staten Island","Arrochar/Fort Wadsworth","Boro Zone"
7,"Queens","Astoria","Boro Zone"
8,"Queens","Astoria Park","Boro Zone"
9,"Queens","Auburndale","Boro Zone"


# Read Data

In [7]:
# Downloading some high volume data - as spark is meant to deal with big data (this still isn't that big a file)

!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

--2024-01-22 16:54:17--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/035746e8-4e24-47e8-a3ce-edcf6d1b11c7?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240122%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240122T165418Z&X-Amz-Expires=300&X-Amz-Signature=aa17347fc636afb6a8a9a88fa8f7cd9b14f3c1b5984317c71c0379324758ec61&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-01.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-01-22 16:54:18--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/035746e8

In [5]:
# First pass: read the CSV with only the header option and no schema,
# so Spark leaves every column as a string.
df = (
    spark.read
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-01.csv')
)

                                                                                

In [6]:
# Action: triggers a Spark job and prints the first rows of the DataFrame as a text table.
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   null|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|   null|
|           HV0003|              B02764|2021-01-01 00:23:56|2021-01-01 00:38:05|         233|         142|   null|
|           HV0003|              B02764|2021-01-01 00:42:51|2021-01-01 00:45:50|         142|         143|   null|
|           HV0003|              B02764|2021-01-01 00:48:14|2021-01-01 01:08:42|         143|          78|   null|
|           HV0005|              B02510|2021-01-01 00:06:59|2021-01-01 00:43:01|

# Creating a Schema with Pandas

In [7]:
# No schema (and no inferSchema option) was passed to the CSV reader,
# so everything is string type.
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True)])

In [10]:
# Keep the header line plus the first 1,000 data rows as a small sample for pandas.
!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

In [8]:
import pandas as pd

In [9]:
# Load the small sample with pandas, which infers column dtypes from the data.
df_pandas = pd.read_csv('head.csv')

In [13]:
# pandas is now inferring the types, but it didn't infer the dropoff and pickup columns as timestamps.
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [14]:
# Preview the first five rows of the pandas sample.
df_pandas.head()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
0,HV0003,B02682,2021-01-01 00:33:44,2021-01-01 00:49:07,230,166,
1,HV0003,B02682,2021-01-01 00:55:19,2021-01-01 01:18:21,152,167,
2,HV0003,B02764,2021-01-01 00:23:56,2021-01-01 00:38:05,233,142,
3,HV0003,B02764,2021-01-01 00:42:51,2021-01-01 00:45:50,142,143,
4,HV0003,B02764,2021-01-01 00:48:14,2021-01-01 01:08:42,143,78,


In [18]:
# Use the Spark session to create a Spark DataFrame from the pandas sample and inspect its schema.
# Because it is built from a pandas DataFrame, the pandas dtypes carry over, so not every field is
# a string anymore (int64 -> LongType, float64 -> DoubleType). The datetimes are still strings, though.
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

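This schema description (a `StructType` made of `StructField`s) reflects Spark's own type system; Spark itself is written in Scala. Let's clean up the output and use it to build an explicit schema. In that schema the pickup/dropoff columns become `TimestampType` and the location IDs become `IntegerType`.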

In [11]:
from pyspark.sql import types

In [12]:
# Explicit schema derived from the pandas-inferred one above. The pickup/dropoff columns are
# promoted to timestamps, and the location IDs use IntegerType instead of LongType.
# Every field is nullable (the third StructField argument).
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    )
])

In [13]:
# Re-read the full CSV, this time applying the explicit schema instead of defaulting every column to string.
df = spark.read.csv(
    'fhvhv_tripdata_2021-01.csv',
    header=True,
    schema=schema,
)

In [25]:
# Same preview as before, but the columns now carry the explicit schema's types
# (timestamps for the datetimes, integers for the location IDs).
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   null|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|   null|
|           HV0003|              B02764|2021-01-01 00:23:56|2021-01-01 00:38:05|         233|         142|   null|
|           HV0003|              B02764|2021-01-01 00:42:51|2021-01-01 00:45:50|         142|         143|   null|
|           HV0003|              B02764|2021-01-01 00:48:14|2021-01-01 01:08:42|         143|          78|   null|
|           HV0005|              B02510|2021-01-01 00:06:59|2021-01-01 00:43:01|

# Partitions 

We will now repartition our dataframe into 24 partitions.

In [14]:
# repartition is a transformation, which is lazily executed: it only runs when an action
# is run, like saving the df to parquet in the next cell.
df = df.repartition(24)

We can now write the dataframe to Parquet. Since it has 24 partitions, this will create 24 smaller Parquet files, one per partition.

This operation may take a while.

In [15]:
# Writing is an action, so this is also when the repartition above actually executes.
# mode='overwrite' keeps the cell re-runnable: the default save mode ('errorifexists')
# raises an error when fhvhv/2021/01/ already exists from a previous run.
df.write.parquet('fhvhv/2021/01/', mode='overwrite')

                                                                                

In [16]:
# Expect one part-*.snappy.parquet file per partition, plus an empty _SUCCESS marker file.
!ls -lh fhvhv/2021/01/ # 24 parquet files

total 215M
-rw-r--r-- 1 USERNAME USERNAME    0 Jan 23 09:31 _SUCCESS
-rw-r--r-- 1 USERNAME USERNAME 9.0M Jan 23 09:31 part-00000-b5ebaa4e-8fb5-420f-a48b-e369c274360e-c000.snappy.parquet
-rw-r--r-- 1 USERNAME USERNAME 9.0M Jan 23 09:31 part-00001-b5ebaa4e-8fb5-420f-a48b-e369c274360e-c000.snappy.parquet
-rw-r--r-- 1 USERNAME USERNAME 9.0M Jan 23 09:31 part-00002-b5ebaa4e-8fb5-420f-a48b-e369c274360e-c000.snappy.parquet
-rw-r--r-- 1 USERNAME USERNAME 9.0M Jan 23 09:31 part-00003-b5ebaa4e-8fb5-420f-a48b-e369c274360e-c000.snappy.parquet
-rw-r--r-- 1 USERNAME USERNAME 9.0M Jan 23 09:31 part-00004-b5ebaa4e-8fb5-420f-a48b-e369c274360e-c000.snappy.parquet
-rw-r--r-- 1 USERNAME USERNAME 9.0M Jan 23 09:31 part-00005-b5ebaa4e-8fb5-420f-a48b-e369c274360e-c000.snappy.parquet
-rw-r--r-- 1 USERNAME USERNAME 9.0M Jan 23 09:31 part-00006-b5ebaa4e-8fb5-420f-a48b-e369c274360e-c000.snappy.parquet
-rw-r--r-- 1 USERNAME USERNAME 9.0M Jan 23 09:31 part-00007-b5ebaa4e-8fb5-420f-a48b-e369c274360e-c000.s

# Spark DataFrames

In [None]:
# Transformations only: project four columns and keep the HV0003 trips.
# No action is called, so this just returns a new (lazy) DataFrame.
(
    df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
      .filter(df['hvfhs_license_num'] == 'HV0003')
)