In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/04 10:38:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import pandas as pd

df = pd.read_csv('https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz')

In [3]:
print(len(df))

1897493


In [7]:
schema = df.dtypes
print(schema)

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object


In [10]:
df.head()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014


In [12]:
from pyspark.sql import types

fhv_schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropOff_datetime", types.TimestampType(), True),
    types.StructField("PUlocationID", types.IntegerType(), True),
    types.StructField("DOlocationID", types.IntegerType(), True),
    types.StructField("SR_Flag", types.StringType(), True),
    types.StructField("Affiliated_base_number", types.StringType(), True)
])

In [14]:
!wget 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz'
!cp fhv_tripdata_2019-10.csv.gz /home/cris/data-engineering-zoomcamp/05-batch/code/data/raw/fhv

--2024-03-03 19:16:36--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T191636Z&X-Amz-Expires=300&X-Amz-Signature=92c1a8f2f312b587a1f0ad0e6dd9392ce3d5a637effeb07eebc086747bfbca72&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-03 19:16:36--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [16]:
input_path = 'data/raw/fhv/'
output_path = 'data/pq/fhv/'

df_fhv = spark.read \
        .option("header", "true") \
        .schema(fhv_schema) \
        .csv(input_path)

df_fhv \
        .repartition(6) \
        .write.parquet(output_path)

                                                                                

In [4]:
df_fhv = spark.read.parquet('data/pq/fhv/')

                                                                                

In [5]:
df_fhv.registerTempTable('trips_data')



In [6]:
spark.sql("""
SELECT
    count(*)
FROM
    trips_data
WHERE
    pickup_datetime > "2019-10-14 23:59:59" AND pickup_datetime < "2019-10-16 00:00:00"
""").show()



+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

In [20]:
from datetime import datetime

def sane_date(start_date, end_date):
    # start_date = datetime.strptime(start_date_str, '%Y-%m-%d %H:%M:%S')
    # end_date = datetime.strptime(end_date_str, '%Y-%m-%d %H:%M:%S')
    if (end_date.year != start_date.year):
        return 0
    return (end_date - start_date)

In [21]:
from pyspark.sql import functions as F
sane_date_udf = F.udf(sane_date)

In [24]:
df_fhv = df_fhv \
    .withColumn('trip_duration', sane_date_udf(df_fhv.pickup_datetime, df_fhv.dropOff_datetime)) \
    .select('trip_duration', 'pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
df_fhv.show()

+--------------------+-------------------+-------------------+------------+------------+
|       trip_duration|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+--------------------+-------------------+-------------------+------------+------------+
|Timedelta: 0 days...|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|
|Timedelta: 0 days...|2019-10-21 04:15:47|2019-10-21 04:36:04|         264|         264|
|Timedelta: 0 days...|2019-10-19 12:00:00|2019-10-19 12:20:00|         264|         264|
|Timedelta: 0 days...|2019-10-11 14:28:00|2019-10-11 14:32:44|         264|         216|
|Timedelta: 0 days...|2019-10-21 18:00:26|2019-10-21 18:07:21|         264|          80|
|Timedelta: 0 days...|2019-10-03 19:30:35|2019-10-03 20:27:57|         161|          14|
|Timedelta: 0 days...|2019-10-25 06:10:40|2019-10-25 06:29:43|         264|         208|
|Timedelta: 0 days...|2019-10-30 06:18:02|2019-10-30 06:35:12|         260|         260|
|Timedelta: 0 days...

In [29]:
# Add a new column for trip length
from pyspark.sql.functions import col
df_fhv = df_fhv.withColumn("trip_duration", (col("dropoff_datetime").cast("long") - col("pickup_datetime").cast("long")) / 60) # Duration in minutes

# Display the DataFrame with the new column
df_fhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|     trip_duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|              12.0|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|1.8833333333333333|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|25.616666666666667|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|               1.3|
|              B00014|2019-

In [25]:
max_trip_duration = df_fhv.agg({"trip_duration": "max"}).collect()[0][0]

                                                                                

In [27]:
print(max_trip_duration)

Timedelta: 9 days, 995 seconds, 0 microseconds (total: 778595 seconds)


In [30]:
print(778595/3600)

216.2763888888889


In [42]:
# Try this using sql
df_fhv.registerTempTable('trips_data_with_duration')

In [43]:
spark.sql("""
    SELECT MAX(trip_duration)
    FROM trips_data_with_duration
""").show()

[Stage 19:>                                                         (0 + 1) / 1]

+------------------+
|max(trip_duration)|
+------------------+
|        3.786915E7|
+------------------+



                                                                                

In [44]:
print(3.786915E7/60)

631152.5


In [9]:
# moving on
df_zones = spark.read \
        .option("header", "true") \
        .csv('zones.csv')

df_zones.write.parquet('data/zones')

In [12]:
df_zones.head(10)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID='4', Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID='5', Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone'),
 Row(LocationID='6', Borough='Staten Island', Zone='Arrochar/Fort Wadsworth', service_zone='Boro Zone'),
 Row(LocationID='7', Borough='Queens', Zone='Astoria', service_zone='Boro Zone'),
 Row(LocationID='8', Borough='Queens', Zone='Astoria Park', service_zone='Boro Zone'),
 Row(LocationID='9', Borough='Queens', Zone='Auburndale', service_zone='Boro Zone'),
 Row(LocationID='10', Borough='Queens', Zone='Baisley Park', service_zone='Boro Zone')]

In [15]:
df_zones = spark.read \
    .option("header", "true") \
    .parquet('data/zones')

In [16]:
df_zones

DataFrame[LocationID: string, Borough: string, Zone: string, service_zone: string]

In [17]:
df_fhv.head(2)

[Row(dispatching_base_num='B02784', pickup_datetime=datetime.datetime(2019, 10, 1, 9, 55, 38), dropOff_datetime=datetime.datetime(2019, 10, 1, 10, 5, 43), PUlocationID=89, DOlocationID=85, SR_Flag=None, Affiliated_base_number=None),
 Row(dispatching_base_num='B02429', pickup_datetime=datetime.datetime(2019, 10, 21, 4, 15, 47), dropOff_datetime=datetime.datetime(2019, 10, 21, 4, 36, 4), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B02429')]

In [18]:
df_result = df_fhv.join(df_zones, df_fhv.PUlocationID == df_zones.LocationID)

In [21]:
df_result

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropOff_datetime: timestamp, PUlocationID: int, DOlocationID: int, SR_Flag: string, Affiliated_base_number: string, LocationID: string, Borough: string, Zone: string, service_zone: string]

In [22]:
df_result.registerTempTable('trips')

In [28]:
spark.sql("""
SELECT
    Zone, COUNT(*) AS pickup_count
FROM
    trips
GROUP BY Zone
ORDER BY pickup_count
LIMIT 10;
""").show()

+--------------------+------------+
|                Zone|pickup_count|
+--------------------+------------+
|         Jamaica Bay|           1|
|Governor's Island...|           2|
| Green-Wood Cemetery|           5|
|       Broad Channel|           8|
|     Highbridge Park|          14|
|        Battery Park|          15|
|Saint Michaels Ce...|          23|
|Breezy Point/Fort...|          25|
|Marine Park/Floyd...|          26|
|        Astoria Park|          29|
+--------------------+------------+

