In [2]:
from io import BytesIO, StringIO

import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType

# Build (or reuse, if one already exists) the notebook's Spark session,
# running against the in-process `local[*]` master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# Source file: FHV trip records for October 2019, shipped as a gzip-compressed CSV.
url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz"

# Download the archive exactly once. The timeout bounds how long a single
# connect/read may stall, so an unresponsive server cannot hang the kernel.
response = requests.get(url, timeout=60)
assert response.status_code == 200, 'Failed to download data'

# Parse the bytes that were just downloaded and status-checked. Passing `url`
# to pandas instead would fetch the same file a second time and would parse
# a response that the assertion above never looked at.
data = pd.read_csv(BytesIO(response.content), compression='gzip')



In [3]:
# Hand the pandas frame over to Spark; later cells work on this Spark DataFrame.
df = spark.createDataFrame(data=data)

In [4]:
# Display the version string of the Spark runtime backing this session.
spark.version

'3.5.0'

In [5]:
# Write the trips out as Parquet in 6 partitions, replacing any earlier
# output that already exists at the same path.
(
    df.repartition(6)
    .write
    .mode("overwrite")
    .parquet("fhv.parquet")
)

24/03/02 10:30:17 WARN TaskSetManager: Stage 0 contains a task of very large size (18148 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [7]:
# Peek at the first five rows to sanity-check the columns and values.
df.show(n=5)

24/03/02 10:30:46 WARN TaskSetManager: Stage 3 contains a task of very large size (18148 KiB). The maximum recommended task size is 1000 KiB.
24/03/02 10:30:50 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 3 (TID 14): Attempting to kill Python Worker
                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|       264.0|       264.0|    NaN|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|       264.0|       264.0|    NaN|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|       264.0|       264.0|    NaN|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|       264.0|       264.0|    NaN|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|       264.0|       264.0|    NaN|                B00014|
+--------------------+------------------

In [12]:
from pyspark.sql.functions import col, to_date

# Reduce each pickup timestamp to its calendar date, then keep only the
# trips that started on 15 October 2019.
pickup_date = col('pickup_datetime').cast('date')
filtered_df = df.where(pickup_date == '2019-10-15')

# Number of trips picked up on that day
print(filtered_df.count())


24/03/02 10:33:43 WARN TaskSetManager: Stage 7 contains a task of very large size (18148 KiB). The maximum recommended task size is 1000 KiB.


62610


                                                                                

In [14]:
from pyspark.sql.functions import col, unix_timestamp

# Trip duration in HOURS: the two timestamps are converted to epoch seconds,
# subtracted, and the difference is divided by the seconds in one hour.
SECONDS_PER_HOUR = 3600
df = df.withColumn(
    'trip_duration',
    # Use the column's exact name (`dropOff_datetime`) so the lookup does not
    # depend on Spark's case-insensitive column resolution being enabled.
    (unix_timestamp('dropOff_datetime') - unix_timestamp('pickup_datetime')) / SECONDS_PER_HOUR,
)

# Order the trips so the longest durations come first
longest_trips = df.orderBy(col('trip_duration').desc())

# Show the longest trips.
# NOTE(review): the top rows have drop-off dates decades after the pickup
# (e.g. year 2091) -- presumably bad source records; confirm before relying
# on the maximum.
longest_trips.show()


24/03/02 12:05:58 WARN TaskSetManager: Stage 11 contains a task of very large size (18148 KiB). The maximum recommended task size is 1000 KiB.
[Stage 11:>                                                         (0 + 8) / 8]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|     trip_duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|       264.0|       264.0|    NaN|                B02832|          631152.5|
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|       264.0|       264.0|    NaN|                B02832|          631152.5|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|         NaN|         NaN|    NaN|                B02416| 87672.44083333333|
|     B00746         |2019-10-01 21:43:42|2027-10-01 21:45:23|       159.0|       264.0|    NaN|       B00746         | 70128.02805555555|
|              B02921|2019-

                                                                                

In [16]:
# Fetch the taxi zone lookup table (a plain, uncompressed CSV)
url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"
response = requests.get(url)
assert response.status_code == 200, 'Failed to download data'

# Decode the payload as UTF-8 and let pandas parse it from an in-memory text buffer
zone_csv_text = response.content.decode('utf-8')
data_zone = pd.read_csv(StringIO(zone_csv_text))

# Mirror the lookup table into Spark so it can be joined against the trips
df_zone = spark.createDataFrame(data=data_zone)

# Register both DataFrames as temp views so the SQL below can refer to them by name
df_zone.createOrReplaceTempView("zones")
df.createOrReplaceTempView("trips")

# Count pickups per zone and list the rarest zones first. This is an inner
# join, so only zones matched by at least one trip appear in the result.
query = """
SELECT zones.Zone, COUNT(trips.pickup_datetime) as count
FROM trips
JOIN zones ON trips.PULocationID = zones.LocationID
GROUP BY zones.Zone
ORDER BY count ASC
"""

# Run the query through Spark SQL
least_frequent_pickup_location = spark.sql(sqlQuery=query)

# Print the first rows (the least frequent pickup zones)
least_frequent_pickup_location.show()


24/03/02 12:10:03 WARN TaskSetManager: Stage 21 contains a task of very large size (18148 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows

