In [4]:
# Setup
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
import warnings
# pandas >= 2.0 no longer provides DataFrame.iteritems; alias it back to DataFrame.items.
# Presumably required by spark.createDataFrame(<pandas frame>) on this PySpark version -- TODO confirm.
setattr(pd.DataFrame, "iteritems", pd.DataFrame.items)

# Create (or reuse) a Spark session running locally with master "local[*]"
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("HW_solutions")
    .getOrCreate()
)

# Silence all Python warnings for the rest of the notebook
warnings.filterwarnings("ignore")

# Important: I use the conda base environment with Python 3.10.13, because Python 3.12 was incompatible with Spark 3.3.2.
# With the base environment activated, I had to run "pip install pyspark==3.3.2".

### Question 1

In [5]:
# Answer to Question 1: version string reported by the running Spark session
spark.version

'3.3.2'

### Question 2

In [6]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

# move then to "data" folder

--2024-03-16 18:52:42--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 20.201.28.151
Connecting to github.com (github.com)|20.201.28.151|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240316%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240316T215242Z&X-Amz-Expires=300&X-Amz-Signature=3078368dc0c0407dbe11f5314f270d02d9baca2df600a1081c8a3bdad5181b25&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-16 18:52:42--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5

In [23]:
# First pass: read the gzipped CSV with its header row and no explicit schema
df = spark.read.csv('data/fhv_tripdata_2019-10.csv.gz', header=True)

In [24]:
# Print a preview of the first rows of the freshly loaded DataFrame
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [25]:
# Inspect the schema Spark assigned when no explicit schema was supplied
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [35]:
# Load the same file with pandas. keep_default_na=False stops pandas from
# turning empty fields into NaN. Add nrows=100 for a quick test run.
df_pandas = pd.read_csv(
    'data/fhv_tripdata_2019-10.csv.gz',
    compression="gzip",
    keep_default_na=False,
)

In [36]:
# Column dtypes as read by pandas
df_pandas.dtypes

dispatching_base_num      object
pickup_datetime           object
dropOff_datetime          object
PUlocationID              object
DOlocationID              object
SR_Flag                   object
Affiliated_base_number    object
dtype: object

In [37]:
# Convert the pandas frame to a Spark DataFrame and show the schema Spark derives from it
spark.createDataFrame(df_pandas).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [38]:
from pyspark.sql import types

In [30]:
# Explicit schema for the FHV trip data, built from (column name, Spark type)
# pairs; every field is declared nullable.
schema = types.StructType([
    types.StructField(column_name, spark_type, True)
    for column_name, spark_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [39]:
# Re-read the CSV with its header row, this time applying the explicit schema
df = spark.read.csv(
    'data/fhv_tripdata_2019-10.csv.gz',
    header=True,
    schema=schema,
)

In [40]:
# Redistribute the rows into 6 partitions
df = df.repartition(6)

In [41]:
# Write the DataFrame as Parquet under ./data/parquet, overwriting any previous output
df.write.mode('overwrite').parquet('./data/parquet')

# The average size of the resulting Parquet files is 6.35 MB

### Question 3

In [43]:
# Expose the trips DataFrame to Spark SQL as the temporary view "trips_data".
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; re-running the cell simply replaces the existing view.
df.createOrReplaceTempView('trips_data')

In [45]:
# Question 3: count the trips whose pickup timestamp falls on 2019-10-15
spark.sql("""
SELECT
    count(*) AS total_trips
FROM
    trips_data
WHERE 
    to_date(pickup_datetime) ='2019-10-15'
""").show()

+-----------+
|total_trips|
+-----------+
|      62610|
+-----------+



In [47]:
# Another option: the same count through the DataFrame API
from pyspark.sql import functions as F

(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))  # strip the time part
    .filter("pickup_date = '2019-10-15'")
    .count()
)

62610

### Question 4

In [50]:
# Question 4: for each pickup date, take the longest trip duration in hours
# (drop-off minus pickup, both cast to LONG, divided by 3600) and show the
# 10 dates with the largest values.
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropOff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600) AS duration
FROM 
    trips_data
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 10;
""").show()

+-----------+------------------+
|pickup_date|          duration|
+-----------+------------------+
| 2019-10-28|          631152.5|
| 2019-10-11|          631152.5|
| 2019-10-31| 87672.44083333333|
| 2019-10-01| 70128.02805555555|
| 2019-10-17|            8794.0|
| 2019-10-26| 8784.166666666666|
| 2019-10-30|1464.5344444444445|
| 2019-10-25|1056.8266666666666|
| 2019-10-02| 769.2313888888889|
| 2019-10-23| 745.6166666666667|
+-----------+------------------+



In [52]:
# Another option: compute each trip's duration with the DataFrame API and show
# the 10 longest trips.
# Functions are referenced through the `F` alias instead of
# `from pyspark.sql.functions import *`, because the wildcard import shadows
# Python builtins such as max, min, sum and round for the rest of the notebook.
from pyspark.sql import functions as F

(
    df
    .withColumn(
        'DiffInSeconds',
        # column name spelled exactly as in the schema (dropOff_datetime)
        F.unix_timestamp('dropOff_datetime') - F.unix_timestamp('pickup_datetime'),
    )
    .withColumn('DiffInHours', F.col('DiffInSeconds') / 3600)
    .sort('DiffInSeconds', ascending=False)
    .limit(10)
    .show(truncate=False)
)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+------------------+
|dispatching_base_num|pickup_datetime    |dropOff_datetime   |PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|DiffInSeconds|DiffInHours       |
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+------------------+
|B02832              |2019-10-11 18:00:00|2091-10-11 18:30:00|264         |264         |null   |B02832                |2272149000   |631152.5          |
|B02832              |2019-10-28 09:00:00|2091-10-28 09:30:00|264         |264         |null   |B02832                |2272149000   |631152.5          |
|B02416              |2019-10-31 23:46:33|2029-11-01 00:13:00|null        |null        |null   |B02416                |315620787    |87672.44083333333 |
|B00746              |2019-10-01 21:43:42|2027-10-01 21:45:23|159         |264    

### Question 5

In [53]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

# move then to "data" folder

--2024-03-16 19:35:14--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.201.28.151
Connecting to github.com (github.com)|20.201.28.151|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240316%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240316T223515Z&X-Amz-Expires=300&X-Amz-Signature=38d3e701cbe7817ce0f0e77990193925d00d4deab263b58167258b8b61195276&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-16 19:35:15--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a

In [56]:
# Load the taxi zone lookup table (CSV with a header row, no explicit schema)
df_zones = spark.read.csv('data/taxi_zone_lookup.csv', header=True)

In [57]:
# Preview the zone lookup table without truncating long zone names
df_zones.show(truncate=False)

+----------+-------------+-----------------------+------------+
|LocationID|Borough      |Zone                   |service_zone|
+----------+-------------+-----------------------+------------+
|1         |EWR          |Newark Airport         |EWR         |
|2         |Queens       |Jamaica Bay            |Boro Zone   |
|3         |Bronx        |Allerton/Pelham Gardens|Boro Zone   |
|4         |Manhattan    |Alphabet City          |Yellow Zone |
|5         |Staten Island|Arden Heights          |Boro Zone   |
|6         |Staten Island|Arrochar/Fort Wadsworth|Boro Zone   |
|7         |Queens       |Astoria                |Boro Zone   |
|8         |Queens       |Astoria Park           |Boro Zone   |
|9         |Queens       |Auburndale             |Boro Zone   |
|10        |Queens       |Baisley Park           |Boro Zone   |
|11        |Brooklyn     |Bath Beach             |Boro Zone   |
|12        |Manhattan    |Battery Park           |Yellow Zone |
|13        |Manhattan    |Battery Park C

In [58]:
# Expose the zone lookup table to Spark SQL as the temporary view "zones"
df_zones.createOrReplaceTempView("zones")

In [60]:
# Question 5: count trips per pickup zone and list the 10 zones with the fewest
# trips (ascending order). The INNER JOIN drops trips whose pickup location ID
# has no match in the zones table.
spark.sql("""
SELECT
    zones.LocationID,
    zones.Zone,
    COUNT(1) AS total_trips
FROM
    trips_data
INNER JOIN zones
    ON trips_data.PULocationID = zones.LocationID
GROUP BY zones.LocationID,
    zones.Zone
ORDER BY total_trips
LIMIT 10
""").show()

+----------+--------------------+-----------+
|LocationID|                Zone|total_trips|
+----------+--------------------+-----------+
|         2|         Jamaica Bay|          1|
|       105|Governor's Island...|          2|
|       111| Green-Wood Cemetery|          5|
|        30|       Broad Channel|          8|
|       120|     Highbridge Park|         14|
|        12|        Battery Park|         15|
|       207|Saint Michaels Ce...|         23|
|        27|Breezy Point/Fort...|         25|
|       154|Marine Park/Floyd...|         26|
|         8|        Astoria Park|         29|
+----------+--------------------+-----------+

