In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
import time
import datetime
import pandas as pd
import numpy as np
from pyspark.context import SparkContext
from datetime import datetime, timedelta

In [4]:
data = spark \
      .read \
      .option ("inferSchema" , "true") \
      .option ("header" , "true") \
      .csv ("gs://personalized-recommendation-system-files/fhvhv_tripdata_2021-06.csv.gz")

In [5]:
data.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



# Question 3: How many taxi trips were started on June 15th?

In [7]:
count_trips = data.filter(col("pickup_datetime").like("2021-06-15%")).count()

# print the number of trips made on June 15
print("Number of trips made on June 15:", count_trips)

Number of trips made on June 15: 452470


# Question 4: How long is the longest trip in the dataset? 

In [8]:
data = data.withColumn("distance", 
                   when(col("DOLocationID") == col("PULocationID"), 0)
                   .otherwise((col("DOLocationID") - col("PULocationID")).cast("float"))
                   )

# sort the DataFrame by distance in descending order and select the first row
longest_trip = data.orderBy(col("distance").desc()).first()

# print the length of the longest trip
print("Length of longest trip:", longest_trip["distance"])

Length of longest trip: 263.0


# Question 6: What is the name of the most frequent pickup location zone?

In [9]:
zones = spark \
      .read \
      .option ("inferSchema" , "true") \
      .option ("header" , "true") \
      .csv ("gs://personalized-recommendation-system-files/taxi_zone_lookup.csv")

In [10]:
zones.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [12]:
joined_df = data.join(zones, col("PULocationID") == col("LocationID"), "left")

In [13]:
joined_df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)
 |-- distance: float (nullable = true)
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [22]:
zone_counts = joined_df.groupBy("Zone").count()

In [23]:
most_frequent_zone = zone_counts.orderBy(col("count").desc()).first()

In [24]:
print("The most frequent pickup location zone is:", most_frequent_zone["Zone"])

The most frequent pickup location zone is: Crown Heights North
