In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

In [None]:
import pyspark
from pyspark.sql import SparkSession
import os
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType


In [None]:


# Local Spark session using every available core; reuses an existing session if one is running.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [None]:
# Read the gzipped CSV using its first line as column names (no schema given, so every column is a string).
df = spark.read.csv('fhv_tripdata_2019-10.csv.gz', header=True)


In [None]:
# Peek at the first row as a list of Row objects.
df.take(1)

In [None]:
# Print the first 20 rows, truncating long cell values.
df.show(n=20, truncate=True)

In [None]:
# Shuffle the trips into 6 partitions (one Parquet part file per partition when written below).
df_part = df.repartition(numPartitions=6)


In [None]:
# Print the first 20 rows of the repartitioned data, truncating long cell values.
df_part.show(n=20, truncate=True)

In [None]:
# Assuming 'df_part' is your repartitioned DataFrame
parquet_path = "file://" + os.getcwd() + "/your_file_name.parquet"

# Save DataFrame to Parquet file
df_part.write.parquet(parquet_path)


In [None]:


# Keep only trips whose pickup_datetime, cast to a date, is 15 October 2019.
is_oct_15 = F.col('pickup_datetime').cast('date') == '2019-10-15'
records_on_15th_oct = df.where(is_oct_15)

# Number of trips that started on that day.
count_records_on_15th_oct = records_on_15th_oct.count()

print(f"Number of records on 15th October: {count_records_on_15th_oct}")


In [None]:
# Inspect column names and types. The CSV was read without a schema, so at this point
# the columns should all be strings -- hence the explicit timestamp parsing below.
df.printSchema()


In [None]:
# Parse the raw CSV strings into timestamps.
# NOTE: this rebinds `df` in place, so the cell assumes `df` still holds the
# string-typed columns from the CSV read (re-running it reparses already-parsed columns).
df = df.withColumn("pickup_datetime", F.to_timestamp("pickup_datetime", "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("dropOff_datetime", F.to_timestamp("dropOff_datetime", "yyyy-MM-dd HH:mm:ss"))

# Trip duration in seconds: casting a timestamp to long gives epoch seconds.
# A null timestamp yields a null duration, which F.max below ignores.
df = df.withColumn("trip_duration_seconds", (F.col("dropOff_datetime").cast("long") - F.col("pickup_datetime").cast("long")))

# Longest trip for each pickup day
max_trip_duration_per_day = df.groupBy(F.date_format("pickup_datetime", "yyyy-MM-dd").alias("trip_date")).agg(
    F.max("trip_duration_seconds").alias("max_trip_duration_seconds")
)

# Convert seconds to hours
max_trip_duration_per_day = max_trip_duration_per_day.withColumn(
    "max_trip_duration_hours", F.col("max_trip_duration_seconds") / 3600.0
)

# groupBy output has no defined order and show() prints only 20 rows, while October alone
# has 31 days -- sort descending so the overall longest trip is always the first row shown.
max_trip_duration_per_day.orderBy(F.desc("max_trip_duration_seconds")).show(truncate=False)

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [None]:
# Zone lookup table; header row gives column names, no schema so every column is a string.
zone = spark.read.csv('taxi_zone_lookup.csv', header=True)


In [None]:
# Print the first 20 lookup rows, truncating long cell values.
zone.show(n=20, truncate=True)

In [None]:
# Attach zone names to each trip via the pickup location ID. Both sides were read from
# CSV without a schema, so the IDs are compared as strings.
joined_df = df.join(zone, df.PUlocationID == zone.LocationID, "left_outer")

# Count pickups per zone name. The left outer join gives trips with a missing or
# unmatched PUlocationID a null Zone; drop that bucket so "no zone" can never be
# reported as the least frequent pickup zone.
zone_counts = (
    joined_df
    .where(F.col("Zone").isNotNull())
    .groupBy("Zone")
    .agg(F.count("*").alias("pickup_count"))
)

# Least frequent zone; break ties by zone name so the answer is deterministic across runs.
least_frequent_zone = zone_counts.orderBy("pickup_count", "Zone").first()

# first() returns None on an empty result; fail with a clear message instead of a TypeError.
if least_frequent_zone is None:
    raise ValueError("No pickups matched any zone in the taxi zone lookup table")

# Extract the name of the least frequent pickup location zone
least_frequent_zone_name = least_frequent_zone["Zone"]

print(f"The name of the least frequent pickup location zone is: {least_frequent_zone_name}")