# Prerequisites
Sign up to https://ngrok.com/ to be able to reach Spark UI

In [1]:
%%capture
!pip install pyspark
!pip install findspark
!pip install pyngrok

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
        .appName('testColab') \
        .getOrCreate()



In [6]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-03 14:27:29--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T142729Z&X-Amz-Expires=300&X-Amz-Signature=32440c93b26d30b9e5b8570e2f6dcf1b7c7e7b05f3450067671d3d5d08901917&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-03 14:27:29--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [7]:
df = spark.read \
    .option("header", "true") \
    .csv('/content/fhv_tripdata_2019-10.csv.gz')

In [8]:
df

DataFrame[dispatching_base_num: string, pickup_datetime: string, dropOff_datetime: string, PUlocationID: string, DOlocationID: string, SR_Flag: string, Affiliated_base_number: string]

In [12]:
from pyspark.sql import types

In [15]:
schema= types.StructType(
    [types.StructField("dispatching_base_num",types.StringType(),True),
    types.StructField("pickup_datetime",types.TimestampType(),True),
    types.StructField("dropOff_datetime",types.TimestampType(),True),
    types.StructField("PUlocationID",types.IntegerType(),True),
    types.StructField("DOlocationID",types.IntegerType(),True),
    types.StructField("Affiliated_base_number",types.StringType(),True)]

)

In [16]:
df = spark.read \
    .option("header", "true") \
    .schema(schema)\
    .csv('/content/fhv_tripdata_2019-10.csv.gz')

df = df.repartition(6)

df.write.parquet('test1')

In [27]:
import os
import glob

# Specify the path to the folder containing Parquet files
folder_path = "/content/test1"

# Use glob to get a list of Parquet files in the folder
parquet_files = glob.glob(os.path.join(folder_path, "*.parquet"))

# Calculate the total size of Parquet files in bytes
total_size_bytes = sum(os.path.getsize(file) for file in parquet_files)

# Calculate the average size in megabytes
average_size_mb = total_size_bytes / (1024 * 1024 * len(parquet_files))

# Print the result
print(f"Total size of Parquet files: {total_size_bytes} bytes")
print(f"Number of Parquet files: {len(parquet_files)}")
print(f"Average size of Parquet files: {average_size_mb:.2f} MB")

Total size of Parquet files: 37531730 bytes
Number of Parquet files: 6
Average size of Parquet files: 5.97 MB


In [30]:
parquet_df = spark.read.parquet('test1')

# Filter the DataFrame for trips on the 15th of October
october_15_trips = parquet_df.filter(parquet_df.pickup_datetime.cast("date") == "2019-10-15")

# Count the number of trips on the 15th of October
number_of_trips = october_15_trips.count()

# Print the result
print(f"The number of taxi trips on the 15th of October is: {number_of_trips}")

The number of taxi trips on the 15th of October is: 62610


In [35]:
from pyspark.sql.functions import col, unix_timestamp
# Read the Parquet files
parquet_df = spark.read.parquet('test1')

# Calculate the trip duration in seconds
duration_seconds = (unix_timestamp(col("dropoff_datetime")) - unix_timestamp(col("pickup_datetime")))

# Create a new DataFrame with the calculated duration column
df_with_duration = parquet_df.withColumn("trip_duration_seconds", duration_seconds)

# Find the maximum duration
max_duration_seconds = df_with_duration.agg({"trip_duration_seconds": "max"}).collect()[0][0]

# Convert the maximum duration to hours
max_duration_hours = max_duration_seconds / 3600

# Print the result
print(f"The length of the longest trip in the dataset is: {max_duration_hours:.2f} Hours")

The length of the longest trip in the dataset is: 631152.50 Hours


In [41]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
# Read the Taxi Zone Lookup data from the provided URL
taxi_zone_lookup_schema = types.StructType([
    types.StructField("LocationID", types.IntegerType(), True),
    types.StructField("Borough", types.StringType(), True),
    types.StructField("Zone", types.StringType(), True),
    types.StructField("service_zone", types.StringType(), True)
])

taxi_zone_lookup_df = spark.read.option("header", "true").schema(taxi_zone_lookup_schema).csv('/content/taxi_zone_lookup.csv')

# Create or replace a temporary view for the Taxi Zone Lookup data
taxi_zone_lookup_df.createOrReplaceTempView("taxi_zone_lookup_temp_view")

# Example query to show the data in the temporary view
result = spark.sql("SELECT * FROM taxi_zone_lookup_temp_view LIMIT 5")
result.show()



--2024-03-03 14:56:57--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240303%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240303T145657Z&X-Amz-Expires=300&X-Amz-Signature=fa40e8430b2f3bd2d2f642fa1c664903c4a89d1a0313ab8bac27006fea6ef827&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-03 14:56:57--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [48]:
# Perform the join with the Taxi Zone Lookup data
merged_df = parquet_df.join(taxi_zone_lookup_df, parquet_df["PULocationID"] == taxi_zone_lookup_df["LocationID"], "inner")

# Calculate the frequency of each pickup location zone
zone_frequencies = merged_df.groupBy("Zone").count().orderBy("count")

# Extract the name of the least frequent pickup location zone
least_frequent_zone_name = zone_frequencies.first()["Zone"]

# Print the result
print(f"The name of the least frequent pickup location zone is: {least_frequent_zone_name}")


The name of the least frequent pickup location zone is: Jamaica Bay


In [49]:
zone_frequencies.show()

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows



# Start a tunnel to access SparkUI

Open a ngrok tunnel to the HTTP server

In [4]:
from pyngrok import ngrok, conf
import getpass

print("Enter your authtoken, which can be copied "
"from https://dashboard.ngrok.com/get-started/your-authtoken")
conf.get_default().auth_token = getpass.getpass()

ui_port = 4040
public_url = ngrok.connect(ui_port).public_url
print(f" * ngrok tunnel \"{public_url}\" -> \"http://127.0.0.1:{ui_port}\"")

Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken


KeyboardInterrupt: Interrupted by user

## Download Yellow Taxi Trip records data, read it in with Spark, and count the number of rows

Data source: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [None]:
from pyspark import SparkFiles

file_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-07.parquet'
spark.sparkContext.addFile(file_url)

df = spark.read.csv(SparkFiles.get('yellow_tripdata_2023-07.parquet'), header=True)

df.count()

377471

In [None]:
# Optionally put the tunnel down
# ngrok.disconnect(public_url)