## Importing libraries

In [5]:
import os
import sys
from datetime import datetime

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
import pandas as pd

# Question 1

Starting Spark session:

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/22 09:40:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Checking Spark version:

In [3]:
spark.version

'3.3.2'

Using environment with Python 3.10:

In [7]:
sys.version

'3.10.14 (main, Mar 21 2024, 16:24:04) [GCC 11.2.0]'

# Question 2

Extracting data:

In [4]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-21 12:34:35--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 192.30.255.112
Connecting to github.com (github.com)|192.30.255.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240321%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240321T123435Z&X-Amz-Expires=300&X-Amz-Signature=6daf8a5b6d13b9bb9bf47a27630e16e498c4e17b7d8c044bb5d3d67ed67266cf&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-21 12:34:36--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6

In [None]:
!gzip -d fhv_tripdata_2019-10.csv.gz

In [1]:
!wc -l fhv_tripdata_2019-10.csv

1897494 fhv_tripdata_2019-10.csv


Reading data into Spark:

In [5]:
df = spark.read \
    .option("header", "true") \
    .csv('fhv_tripdata_2019-10.csv')

                                                                                

Checking schema:

In [6]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

Using Pandas to infer datatypes:

In [8]:
!head -n 1001 fhv_tripdata_2019-10.csv > head.csv

In [9]:
df_pandas = pd.read_csv('head.csv')

In [10]:
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

Defining the schema and using it to read the dataset:

In [13]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [14]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

Repartitioning and writing data to `.parquet` files:

In [15]:
df = df.repartition(6)

In [16]:
df.write.parquet('fhv/2019/10/')

                                                                                

Obtaining the average `.parquet` file size:

In [26]:
sum(os.path.getsize('fhv/2019/10/' + f) for f in os.listdir('fhv/2019/10/') if not f.startswith('.')) / 6

6542913.5

# Question 3

Re-reading data from `.parquet` files:

In [4]:
df = spark.read.parquet('fhv/2019/10/')

                                                                                

In [28]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [29]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02784|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|   null|                  null|
|              B01315|2019-10-05 15:13:04|2019-10-05 15:19:48|         264|          74|   null|                B01315|
|              B01984|2019-10-12 17:13:00|2019-10-12 17:40:00|         264|          75|   null|                B01984|
|              B00310|2019-10-15 10:55:04|2019-10-15 11:00:45|         264|         247|   null|                B03047|
|              B00932|2019-10-08 06:58:42|2019-10-08 07:11:11|         264|          37|   null|                B00932|
|              B01029|2019-10-10 14:45:0

Counting taxi trips on the 15th of October:

In [32]:
df.filter(df.pickup_datetime.startswith('2019-10-15')).count()

                                                                                

62610

# Question 4

UDF to find the difference between two timestamps in hours:

In [20]:
def dt_diff(*dt):
    tstamp1 = dt[0]
    tstamp2 = dt[1]
    
    if tstamp1 > tstamp2:
        td = tstamp1 - tstamp2
    else:
        td = tstamp2 - tstamp1
        
    td_hours = float(round(td.total_seconds() / 3600, 2))
    return td_hours

In [47]:
dt_diff('2019-10-01 09:55:38', '2019-10-01 10:05:43')

0.17

In [14]:
dt_diff_udf = F.udf(dt_diff, returnType=types.FloatType())

Applying the UDF to find the maximum trip duration:

In [19]:
df \
    .withColumn('duration_hours', dt_diff_udf(df.pickup_datetime, df.dropoff_datetime)) \
    .select(F.max('duration_hours')) \
    .show() 



+-------------------+
|max(duration_hours)|
+-------------------+
|           631152.5|
+-------------------+



                                                                                

# Question 6

Extracting zone data:

In [5]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-22 09:43:11--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 192.30.255.112
Connecting to github.com (github.com)|192.30.255.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240322%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240322T094311Z&X-Amz-Expires=300&X-Amz-Signature=5b4a4fc32010a6ec796849bafa0bafbd59adf03040cf9ad13fd5dea91c9e3635&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-22 09:43:11--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62

Reading zone data with Spark:

In [6]:
df_zones = spark.read.csv('taxi_zone_lookup.csv', header=True, inferSchema=True)

In [7]:
df_zones.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



Registering temporary tables:

In [8]:
df.registerTempTable('fhv')
df_zones.registerTempTable('zones')



Applying SQL query to find the least frequent pickup zone:

In [10]:
spark.sql("""
SELECT 
    zones.Zone, COUNT(1) AS pickup_zone_freq
FROM
    fhv
LEFT JOIN
    zones
ON
    fhv.PULocationID = zones.LocationID
GROUP BY
    zones.Zone
ORDER BY pickup_zone_freq
""").show()

[Stage 8:>                                                          (0 + 2) / 2]

+--------------------+----------------+
|                Zone|pickup_zone_freq|
+--------------------+----------------+
|         Jamaica Bay|               1|
|Governor's Island...|               2|
| Green-Wood Cemetery|               5|
|       Broad Channel|               8|
|     Highbridge Park|              14|
|        Battery Park|              15|
|Saint Michaels Ce...|              23|
|Breezy Point/Fort...|              25|
|Marine Park/Floyd...|              26|
|        Astoria Park|              29|
|    Inwood Hill Park|              39|
|       Willets Point|              47|
|Forest Park/Highl...|              53|
|  Brooklyn Navy Yard|              57|
|        Crotona Park|              62|
|        Country Club|              77|
|     Freshkills Park|              89|
|       Prospect Park|              98|
|     Columbia Street|             105|
|  South Williamsburg|             110|
+--------------------+----------------+
only showing top 20 rows



                                                                                