In [11]:
# Input files for the homework: June 2021 FHVHV trips plus the taxi-zone lookup.
FHVHV_FILE = "fhvhv_tripdata_2021-06.csv.gz"
ZONES_FILE = "taxi_zone_lookup.csv"

# Raw downloads live under INPUT_PATH, parquet conversions under OUTPUT_PATH.
INPUT_PATH = "data/raw"
OUTPUT_PATH = "data/pq"

# One URL per (release tag, file name). The download cell reuses the release
# tag (second-to-last URL segment) as the local sub-directory name.
URL_FILES = [
    f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{release}/{file_name}"
    for release, file_name in (("fhvhv", FHVHV_FILE), ("misc", ZONES_FILE))
]

In [12]:
from pathlib import Path
from urllib import request

# Download every file in URL_FILES into INPUT_PATH/<release>/<file>.
for url in URL_FILES:
    # URLs end in .../download/<release>/<file>; the release tag doubles as the
    # local sub-directory (e.g. data/raw/fhvhv/).
    *_, file_type, file_name = url.split("/")
    local_directory = Path(INPUT_PATH) / file_type
    local_file = local_directory / file_name

    # Already downloaded: skip, so Restart & Run All does not re-fetch the
    # (large) trip data every time.
    if local_file.exists():
        continue

    local_directory.mkdir(parents=True, exist_ok=True)

    # Download to a hidden temporary name and rename only once the transfer
    # succeeded. An interrupted download therefore never leaves a truncated
    # file that the existence check above would accept. The dot prefix keeps
    # a leftover partial file out of Spark's directory scan.
    partial_file = local_directory / f".{file_name}.part"
    request.urlretrieve(url, partial_file)
    partial_file.replace(local_file)

In [13]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F

# Single local Spark session for the whole notebook, running on all local cores.
# getOrCreate() hands back the existing session when this cell is re-run.
spark = SparkSession.builder.master("local[*]").appName("Homework_05").getOrCreate()

In [14]:
# Load the June 2021 FHVHV trips with an explicit schema (typed timestamps and
# location IDs), persist them as parquet, and expose the parquet copy to SQL
# as the `fhvhv_trips` view.
input_path = f"{INPUT_PATH}/fhvhv/"
output_path = f"{OUTPUT_PATH}/fhvhv/"

fhvhv_schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

fhvhv_csv_df = (
    spark.read
    .option("header", "true")
    .schema(fhvhv_schema)
    .csv(input_path)
)

(
    fhvhv_csv_df
    .repartition(12)
    .write.parquet(output_path, mode="overwrite")
)

# Query the parquet copy, not the CSV. A .gz file cannot be split, so every
# scan of the CSV runs as a single task. The parquet data is spread over
# 12 partitions and is columnar, so each query reads only the columns it needs.
fhvhv_df = spark.read.parquet(output_path)
fhvhv_df.createOrReplaceTempView("fhvhv_trips")

                                                                                

In [15]:
# Load the taxi-zone lookup table, persist it as parquet, and expose the
# parquet copy to SQL as the `zones` view.
input_path = f"{INPUT_PATH}/misc/"
output_path = f"{OUTPUT_PATH}/misc/"

zones_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])

zones_csv_df = (
    spark.read
    .option("header", "true")  # same spelling as the FHVHV reader
    .schema(zones_schema)
    .csv(input_path)
)

zones_csv_df.write.parquet(output_path, mode="overwrite")

# Downstream joins and SQL use the persisted parquet copy.
zones_df = spark.read.parquet(output_path)
zones_df.createOrReplaceTempView("zones")

In [16]:
# Number of trips whose pickup falls on 15 June 2021 (Spark SQL version).
trips_on_june_15_sql = """
select
    count(*) as trips_count
from
    fhvhv_trips
where
    cast(pickup_datetime as date) = '2021-06-15'
"""
spark.sql(trips_on_june_15_sql).show()

[Stage 15:>                                                         (0 + 1) / 1]

+-----------+
|trips_count|
+-----------+
|     452470|
+-----------+



                                                                                

In [50]:
# DataFrame-API version of the trips-on-2021-06-15 count.
# The DataFrame is kept and displayed separately: `.show()` returns None, so
# chaining it into the assignment would bind `fhvhv_count_df` to None.
fhvhv_count_df = (
    fhvhv_df
    .filter(F.col("pickup_datetime").cast("date") == "2021-06-15")
    .select(F.count("*").alias("trips_count"))
)
fhvhv_count_df.show()

[Stage 64:>                                                         (0 + 1) / 1]

+-----------+
|trips_count|
+-----------+
|     452470|
+-----------+



                                                                                

In [17]:
# Longest trip duration in hours, rounded to 2 decimals (Spark SQL version).
# The duration is a difference of epoch seconds (unix_timestamp). This avoids
# casting `dropoff - pickup` to bigint: the result type of a timestamp
# difference changed across Spark 3.x releases (calendar interval vs. day-time
# interval), and whether that interval may be cast to a number depends on the
# Spark version and ANSI settings.
spark.sql("""
select
    round(max(
        (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) / 3600
    ), 2) as max_trip_duration_hours
from
    fhvhv_trips
""").show()

[Stage 18:>                                                         (0 + 1) / 1]

+-----------------------+
|max_trip_duration_hours|
+-----------------------+
|                  66.88|
+-----------------------+



                                                                                

In [41]:
# DataFrame-API version of the longest-trip query, in hours rounded to 2 decimals.
# The duration uses epoch-second differences rather than casting a timestamp
# difference (an interval, whose type is Spark-version dependent) to bigint.
# The DataFrame is displayed separately because `.show()` returns None.
max_duration_fhvhv_df = (
    fhvhv_df
    .withColumn(
        "trip_duration_hours",
        (F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")) / 3600,
    )
    .select(
        F.round(F.max("trip_duration_hours"), 2).alias("max_trip_duration_hours")
    )
)
max_duration_fhvhv_df.show(truncate=False)

[Stage 47:>                                                         (0 + 1) / 1]

+-----------------------+
|max_trip_duration_hours|
+-----------------------+
|66.88                  |
+-----------------------+



                                                                                

In [18]:
# Pickup zone with the most trips (Spark SQL version). Trips are grouped by
# zone name, so LocationIDs that share a Zone name are counted together.
# Ties on the count are broken arbitrarily, since there is no secondary sort key.
most_frequent_zone_sql = """
select 
    z.Zone as zone_name
from 
    fhvhv_trips ft
    join zones z on ft.PULocationID = z.LocationID
group by
    z.Zone
order by
    count(*) desc
limit
    1
"""
spark.sql(most_frequent_zone_sql).show()

[Stage 22:>                                                         (0 + 1) / 1]

+-------------------+
|          zone_name|
+-------------------+
|Crown Heights North|
+-------------------+



                                                                                

In [46]:
# DataFrame-API version of the most-frequent pickup zone query.
# The DataFrame is displayed separately because `.show()` returns None.
frequent_zone_df = (
    fhvhv_df
    .join(zones_df, fhvhv_df.PULocationID == zones_df.LocationID, how="inner")
    .groupBy("Zone")
    .agg(F.count("*").alias("trips_count"))
    # Sort while `trips_count` is still a column, i.e. before the final
    # projection drops it, instead of relying on Spark to re-resolve a
    # column that was already projected away.
    .orderBy(F.col("trips_count").desc())
    .limit(1)
    .select(F.col("Zone").alias("zone_name"))
)
frequent_zone_df.show(truncate=False)

[Stage 55:>                                                         (0 + 1) / 1]

+-------------------+
|zone_name          |
+-------------------+
|Crown Heights North|
+-------------------+



                                                                                