In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
import urllib.request
import os
import gzip



In [9]:
pyspark.__file__

'/workspaces/zoomcampde_2024_aidi/spark/spark-3.3.2-bin-hadoop3/python/pyspark/__init__.py'

In [10]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

# Question 1

In [11]:
print(spark.version)


3.3.2


# Question 2

In [12]:
fhv_schema = StructType([
    StructField("dispatching_base_num", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropOff_datetime", TimestampType(), True),
    StructField("PUlocationID", IntegerType(), True),
    StructField("DOlocationID", IntegerType(), True),
    StructField("SR_Flag", StringType(), True),
    StructField("Affiliated_base_number", StringType(), True),
    StructField("Trip_type", IntegerType(), True),
    StructField("PULocation", StringType(), True),
    StructField("DOLocation", StringType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("fare_amount", FloatType(), True),
    StructField("extra", FloatType(), True),
    StructField("mta_tax", FloatType(), True),
    StructField("tip_amount", FloatType(), True),
    StructField("tolls_amount", FloatType(), True),
    StructField("improvement_surcharge", FloatType(), True),
    StructField("congestion_surcharge", FloatType(), True),
    StructField("total_amount", FloatType(), True),
    StructField("congestion_surcharge_flag", StringType(), True),
    StructField("record_type", StringType(), True)
])


In [13]:
url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz"
compressed_file_path = "fhv_tripdata_2019-10.csv.gz"
decompressed_file_path = "fhv_tripdata_2019-10.csv"

urllib.request.urlretrieve(url, compressed_file_path)

with gzip.open(compressed_file_path, 'rb') as f_in:
    with open(decompressed_file_path, 'wb') as f_out:
        f_out.write(f_in.read())

os.remove(compressed_file_path)


In [14]:
fhv_df = spark.read \
    .option("header", "true") \
    .schema(fhv_schema) \
    .csv(decompressed_file_path)


In [15]:
fhv_df_repartitioned = fhv_df.repartition(6)


In [16]:
fhv_df_repartitioned.write.parquet("path/to/output/fhv_october_2019_repartitioned")


[Stage 0:>                                                          (0 + 2) / 2]

24/03/05 21:51:17 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 22
CSV file: file:///workspaces/zoomcampde_2024_aidi/notebooks/fhv_tripdata_2019-10.csv


                                                                                

In [17]:
ls -lh path/to/output/fhv_october_2019_repartitioned/*.parquet


-rw-r--r-- 1 codespace codespace 6.3M Mar  5 21:51 path/to/output/fhv_october_2019_repartitioned/part-00000-3710d6c6-f34e-4651-8c5a-0ce55ec2e9e7-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  5 21:51 path/to/output/fhv_october_2019_repartitioned/part-00001-3710d6c6-f34e-4651-8c5a-0ce55ec2e9e7-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  5 21:51 path/to/output/fhv_october_2019_repartitioned/part-00002-3710d6c6-f34e-4651-8c5a-0ce55ec2e9e7-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  5 21:51 path/to/output/fhv_october_2019_repartitioned/part-00003-3710d6c6-f34e-4651-8c5a-0ce55ec2e9e7-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  5 21:51 path/to/output/fhv_october_2019_repartitioned/part-00004-3710d6c6-f34e-4651-8c5a-0ce55ec2e9e7-c000.snappy.parquet
-rw-r--r-- 1 codespace codespace 6.3M Mar  5 21:51 path/to/output/fhv_october_2019_repartitioned/part-00005-3710d6c6-f34e-4651-8c5a-0ce55ec2e9e7-c000.snappy.parquet


# Question 3

In [19]:
from pyspark.sql.functions import dayofmonth

fhv_df_filtered = fhv_df.filter(dayofmonth("pickup_datetime") == 15)
fhv_df_filtered_count = fhv_df_filtered.count()
print(fhv_df_filtered_count)



[Stage 3:>                                                          (0 + 2) / 2]

62610


                                                                                

In [20]:
fhv_df.show(100)


24/03/05 21:58:52 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 22
CSV file: file:///workspaces/zoomcampde_2024_aidi/notebooks/fhv_tripdata_2019-10.csv
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+---------+----------+----------+------------+-------------+-----------+-----+-------+----------+------------+---------------------+--------------------+------------+-------------------------+-----------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|Trip_type|PULocation|DOLocation|payment_type|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|congestion_surcharge|total_amount|congestion_surcharge_flag|record_type|
+--------------------+-------------------+-------------------+------------+------------+-------+------------------

In [21]:
fhv_df_filtered.show(100)


24/03/05 21:59:18 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 22
CSV file: file:///workspaces/zoomcampde_2024_aidi/notebooks/fhv_tripdata_2019-10.csv


[Stage 7:>                                                          (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+---------+----------+----------+------------+-------------+-----------+-----+-------+----------+------------+---------------------+--------------------+------------+-------------------------+-----------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|Trip_type|PULocation|DOLocation|payment_type|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|congestion_surcharge|total_amount|congestion_surcharge_flag|record_type|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+---------+----------+----------+------------+-------------+-----------+-----+-------+----------+------------+---------------------+--------------------+------------+-------------------------+-----------+
|              B00009|2019-10-1

                                                                                

# Question 4

In [23]:
from pyspark.sql.functions import col, max

# Calculate the duration of each trip in seconds
fhv_df = fhv_df.withColumn("trip_duration_seconds", (col("dropOff_datetime").cast("long") - col("pickup_datetime").cast("long")) )

# Find the maximum trip duration in the dataset
max_trip_duration_seconds = fhv_df.agg(max("trip_duration_seconds")).collect()[0][0]

# Convert the maximum trip duration to hours
max_trip_duration_hours = max_trip_duration_seconds / 3600
print(f"The length of the longest trip in the dataset is: {max_trip_duration_hours:.2f} hours")


[Stage 11:>                                                         (0 + 2) / 2]

The length of the longest trip in the dataset is: 631152.50 hours


                                                                                

# Question 5

In [29]:
from pyspark.sql import functions as F


In [31]:
import urllib.request

# Download the zone lookup data
url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"
urllib.request.urlretrieve(url, "taxi_zone_lookup.csv")

# Read the zone lookup data into a DataFrame
zone_lookup_df = spark.read.csv("taxi_zone_lookup.csv", header=True, inferSchema=True)

# Register the zone lookup DataFrame as a temp view
zone_lookup_df.createOrReplaceTempView("zone_lookup")

# Read the FHV October 2019 data into a DataFrame (if not already done)
fhv_df = spark.read.parquet("/workspaces/zoomcampde_2024_aidi/notebooks/path/to/output/fhv_october_2019_repartitioned")

# Join the two DataFrames using the PUlocationID column in the FHV DataFrame and the LocationID column in the zone lookup DataFrame
joined_df = fhv_df.join(zone_lookup_df, fhv_df.PUlocationID == zone_lookup_df.LocationID, "left_outer")

# Count the number of occurrences for each pickup location zone
pickup_zone_counts = joined_df.groupBy("Zone").count()





In [32]:

# Find the least frequent pickup location zone
least_frequent_pickup_zone = pickup_zone_counts.orderBy(F.asc(pickup_zone_counts.count)).first()



# Print the name of the least frequent pickup location zone
print(f"The name of the least frequent pickup location Zone is: {least_frequent_pickup_zone.Zone}")

AttributeError: 'function' object has no attribute '_get_object_id'

In [6]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-05 21:45:31--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240305%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240305T214532Z&X-Amz-Expires=300&X-Amz-Signature=8057ea32ea95e972a6123672ebedf4a71784a23ecad779f5246185287b6ba86c&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-05 21:45:32--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [7]:
# read in a subset of the data
url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz'
df_csv = pd.read_csv(url, nrows=10000)

NameError: name 'pd' is not defined

In [None]:
!gzip -dc fhv_tripdata_2019-10.csv.gz

In [None]:
!wc -l fhv_tripdata_2019-10.csv

In [None]:
!head taxi+_zone_lookup.csv


In [None]:
df = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

In [None]:
df.show()

In [None]:
df.write.parquet('zones')


In [None]:
!ls -lh
