In [28]:
import pyspark
from pyspark.sql import SparkSession

import pandas as pd

In [29]:
# Build (or reuse) the SparkSession for this notebook: master "local[*]", app name 'homework'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('homework')
    .getOrCreate()
)

print("\nSuccessfully created spark session!")


Successfully created spark session!


In [30]:
# Load the raw FHV CSV with pandas and display its first row.
df_pandas = pd.read_csv(filepath_or_buffer='data/fhv_tripdata_2019-10.csv')
df_pandas.head(n=1)

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009


In [31]:
from pyspark.sql import types

# Explicit schema for the FHV trip file: seven columns, every one nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropOff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

# Read the CSV (first line is a header) using the schema above rather than inferred types.
df_fhv = spark.read.csv(
    'data/fhv_tripdata_2019-10.csv',
    header=True,
    schema=schema,
)

df_fhv.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
+--------------------+------------------

In [32]:
# Split the trips into 6 partitions and write them as Parquet, replacing any earlier output.
(
    df_fhv
    .repartition(6)
    .write
    .mode("overwrite")
    .parquet("data/fhv/2019/10")
)

# Rebind df_fhv to the Parquet copy that was just written.
df_fhv = spark.read.parquet("data/fhv/2019/10/")

                                                                                

In [9]:
# Recursively list the Parquet output directory with human-readable file sizes.
!ls -lhR data/fhv/2019/10

total 74496
-rw-r--r--@ 1 sebastianwefers  staff     0B Feb 29 21:13 _SUCCESS
-rw-r--r--@ 1 sebastianwefers  staff   6.0M Feb 29 21:13 part-00000-d1b90f77-20bb-4916-87d4-da669d8992d9-c000.snappy.parquet
-rw-r--r--@ 1 sebastianwefers  staff   6.0M Feb 29 21:13 part-00001-d1b90f77-20bb-4916-87d4-da669d8992d9-c000.snappy.parquet
-rw-r--r--@ 1 sebastianwefers  staff   6.0M Feb 29 21:13 part-00002-d1b90f77-20bb-4916-87d4-da669d8992d9-c000.snappy.parquet
-rw-r--r--@ 1 sebastianwefers  staff   6.0M Feb 29 21:13 part-00003-d1b90f77-20bb-4916-87d4-da669d8992d9-c000.snappy.parquet
-rw-r--r--@ 1 sebastianwefers  staff   6.0M Feb 29 21:13 part-00004-d1b90f77-20bb-4916-87d4-da669d8992d9-c000.snappy.parquet
-rw-r--r--@ 1 sebastianwefers  staff   6.0M Feb 29 21:13 part-00005-d1b90f77-20bb-4916-87d4-da669d8992d9-c000.snappy.parquet


In [33]:
from pyspark.sql.functions import col, to_date

# Keep only the trips whose pickup timestamp falls on the calendar date 2019-10-15.
filtered_df = df_fhv.where(to_date(col("pickup_datetime")) == "2019-10-15")

# Count the matching rows and report the total.
count = filtered_df.count()
print(f"Number of records for 15th October: {count}")

[Stage 75:>                                                         (0 + 6) / 6]

Number of records for 15th October: 62610


                                                                                

In [34]:
# Import the functions module under a namespace instead of `from ... import max`,
# which would replace Python's builtin `max` for every later cell in this kernel.
from pyspark.sql import functions as F

# Trip duration in hours: difference of the two timestamps in epoch seconds, divided by 3600.
df_fhv = df_fhv.withColumn(
    "trip_duration_hours",
    (F.unix_timestamp("dropOff_datetime") - F.unix_timestamp("pickup_datetime")) / 3600,
)

# Longest trip per pickup calendar day.
daily_longest_trip = (
    df_fhv
    .groupBy(F.to_date(F.col("pickup_datetime")).alias("date"))
    .agg(F.max("trip_duration_hours").alias("longest_trip_hours"))
)

# Longest trip in the whole dataset = maximum of the per-day maxima.
longest_trip_overall = daily_longest_trip.agg(
    F.max("longest_trip_hours").alias("longest_trip_in_dataset_hours")
)

# Collect the single-row aggregate as a local Row object.
longest_trip_result = longest_trip_overall.collect()[0]

# Extract the longest trip duration in hours from the result.
longest_trip_in_hours = longest_trip_result["longest_trip_in_dataset_hours"]

# NOTE(review): the recorded run of this cell printed 631152.5 hours (roughly 72 years).
# That presumably reflects bad dropOff_datetime values in the raw data rather than a
# real trip - confirm, and consider filtering outliers before reporting this figure.
print(f"\n ---> The longest trip in the dataset is {longest_trip_in_hours} hours long.")




 ---> The longest trip in the dataset is 631152.5 hours long.


                                                                                

In [35]:
# Load the zone lookup CSV with pandas and display its first five rows.
# Bound to its own name (parallel to `df_pandas` for the trip file) so that `df_zones`
# only ever refers to the Spark DataFrame; otherwise running cells out of order could
# leave a pandas frame under `df_zones`, which has no `createOrReplaceTempView`.
df_zones_pandas = pd.read_csv('data/taxi_zone_lookup.csv')
df_zones_pandas.head(5)

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [36]:
# Zone lookup schema: an integer LocationID followed by three string columns, all nullable.
schema = types.StructType(
    [
        types.StructField(column_name, column_type, True)
        for column_name, column_type in (
            ('LocationID', types.IntegerType()),
            ('Borough', types.StringType()),
            ('Zone', types.StringType()),
            ('service_zone', types.StringType()),
        )
    ]
)

# Read the lookup CSV (first line is a header) with the schema above.
df_zones = spark.read.csv(
    'data/taxi_zone_lookup.csv',
    header=True,
    schema=schema,
)

df_zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [41]:
# Register both DataFrames as temp views so they can be queried with SQL.
df_fhv.createOrReplaceTempView("fhv_tripdata")
df_zones.createOrReplaceTempView("zones")

# SQL query: find the LEAST frequent pickup zone (ascending pickup count, first row).
# - The inner join means zones with no pickups at all never appear as candidates.
# - Zone is a secondary sort key so that ties on the count resolve deterministically
#   instead of depending on partition/shuffle order.
query = """
    SELECT
        z.Zone, COUNT(*) as total_pickups
    FROM
        fhv_tripdata f
    JOIN
        zones z ON f.PULocationID = z.LocationID
    GROUP BY
        z.Zone
    ORDER BY
        total_pickups ASC, Zone ASC
    LIMIT 1
"""

# Execute the query
least_frequent_pickup_zone = spark.sql(query)

# Pull the zone name out of the single result row.
location = least_frequent_pickup_zone.collect()[0]["Zone"]
print(f"\n ---> The least frequent pickup location zone is {location}.")

[Stage 102:>                                                        (0 + 6) / 6]


 ---> The least frequent pickup location zone is Jamaica Bay.


                                                                                

In [None]:
# Repeats the collect-and-print steps of the earlier longest-trip cell;
# relies on `longest_trip_overall` having been defined there.
# The aggregate has exactly one row, so take it directly.
longest_trip_result = longest_trip_overall.first()

# Read the hours value out of that row by its column alias.
longest_trip_in_hours = longest_trip_result["longest_trip_in_dataset_hours"]

# Report the longest trip duration in hours.
print(f"\n ---> The longest trip in the dataset is {longest_trip_in_hours} hours long.")