In [11]:
#!conda install pyspark
#!conda install findspark
#!conda install kagglehub

In [12]:
import findspark
findspark.init()

from pyspark.sql import SparkSession


# Build the notebook's Spark session (getOrCreate reuses an existing one)
spark = (
    SparkSession.builder
    .appName("MySparkApp")
    .getOrCreate()
)

# Smoke test: create a tiny DataFrame and display it to confirm Spark works
data = [("John", 30), ("Alice", 25), ("Bob", 35)]
df = spark.createDataFrame(data, schema=["Name", "Age"])
df.show()

+-----+---+
| Name|Age|
+-----+---+
| John| 30|
|Alice| 25|
|  Bob| 35|
+-----+---+



In [13]:
# Download specific files from the dataset
import kagglehub
path = "/Users/rzhang/.cache/kagglehub/datasets/microize/newyork-yellow-taxi-trip-data-2020-2019/versions/25"
# Download the file package, size = 10GB
#path = kagglehub.dataset_download(
#    "microize/newyork-yellow-taxi-trip-data-2020-2019"
#)
print("Path to dataset files:", path)


Path to dataset files: /Users/rzhang/.cache/kagglehub/datasets/microize/newyork-yellow-taxi-trip-data-2020-2019/versions/25


In [14]:
# Load the June 2019 trip records: the first line is the header and the
# column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(path + "/yellow_tripdata_2019-06.csv")
)
df.head(5)

                                                                                

[Row(VendorID=1, tpep_pickup_datetime='2019-06-01 00:55:13', tpep_dropoff_datetime='2019-06-01 00:56:17', passenger_count=1, trip_distance=0.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=145, DOLocationID=145, payment_type=2, fare_amount=3.0, extra=0.5, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=4.3, congestion_surcharge=0.0),
 Row(VendorID=1, tpep_pickup_datetime='2019-06-01 00:06:31', tpep_dropoff_datetime='2019-06-01 00:06:52', passenger_count=1, trip_distance=0.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=262, DOLocationID=263, payment_type=2, fare_amount=2.5, extra=3.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=6.3, congestion_surcharge=2.5),
 Row(VendorID=1, tpep_pickup_datetime='2019-06-01 00:17:05', tpep_dropoff_datetime='2019-06-01 00:36:38', passenger_count=1, trip_distance=4.4, RatecodeID=1, store_and_fwd_flag='N', PULocationID=74, DOLocationID=7, payment_type=2, fare_am

In [15]:
# List the names of all columns in the loaded trip DataFrame
df.columns



['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

 Column descriptions:
 
 1. 'VendorID'  TPEP Provider ID 1 or 2, not used in this project
 2. 'tpep_pickup_datetime'  Pickup date and time
 3. 'tpep_dropoff_datetime'  Dropoff date and time
 4. 'passenger_count'  Number of passengers
 5. 'trip_distance'  Trip distance in miles
 6. 'RatecodeID'  Rate code ID: 1=Standard rate, 2=JFK, 3=Newark, 4=Nassau or Westchester, 5=Negotiated fare, 6=Group ride
 7. 'store_and_fwd_flag'  Store and forward flag, not used in this project
 8. 'PULocationID'  Pickup location ID
 9. 'DOLocationID'  Dropoff location ID
 10. 'payment_type'  Payment type: 1=Credit card, 2=Cash, 3=No charge, 4=Dispute, 5=Unknown, 6=Voided trip
 11. 'fare_amount'  Fare amount
 12. 'extra'  Extra charges
 13. 'mta_tax'  MTA tax
 14. 'tip_amount'  Tip amount
 15. 'tolls_amount'  Tolls amount
 16. 'improvement_surcharge'  Improvement surcharge
 17. 'total_amount'  Total amount
 18. 'congestion_surcharge'  Congestion surcharge

In [16]:
# Schema overview: column names, types and nullability
df.printSchema()

# Total number of trip records
row_count = df.count()
print("Number of rows:", row_count)

# Summary statistics per column (string columns are summarised too)
summary_stats = df.describe()
summary_stats.show()

# Preview the first five rows
df.show(5)

# Column names as a Python list
print("Columns:", df.columns)

# Spark data type of every column, read straight from the schema fields
for field in df.schema.fields:
    print(f"{field.name}: {field.dataType}")


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

Number of rows: 6941024




+-------+------------------+--------------------+---------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+---------------------+------------------+--------------------+
|summary|          VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|   passenger_count|     trip_distance|        RatecodeID|store_and_fwd_flag|      PULocationID|      DOLocationID|      payment_type|       fare_amount|             extra|            mta_tax|        tip_amount|       tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|
+-------+------------------+--------------------+---------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+----------

                                                                                

In [17]:
# Columns that are not needed for the rest of this project
columns_to_drop = [
    'VendorID',
    'store_and_fwd_flag',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'congestion_surcharge',
]

# Remove them in a single call; `df` itself is left untouched
df_cleaned = df.drop(*columns_to_drop)

# Confirm which columns are left
print("Remaining columns:", df_cleaned.columns)

# Preview the trimmed dataset
df_cleaned.show(5)

Remaining columns: ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'PULocationID', 'DOLocationID', 'payment_type', 'total_amount']
+--------------------+---------------------+---------------+-------------+----------+------------+------------+------------+------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|PULocationID|DOLocationID|payment_type|total_amount|
+--------------------+---------------------+---------------+-------------+----------+------------+------------+------------+------------+
| 2019-06-01 00:55:13|  2019-06-01 00:56:17|              1|          0.0|         1|         145|         145|           2|         4.3|
| 2019-06-01 00:06:31|  2019-06-01 00:06:52|              1|          0.0|         1|         262|         263|           2|         6.3|
| 2019-06-01 00:17:05|  2019-06-01 00:36:38|              1|          4.4|         1|          74|           7|           2|   

In [18]:
# For better machine-learning results, PULocationID and DOLocationID are to be
# replaced by borough names. Load the zone lookup table and keep only the
# LocationID -> Borough columns.

zone_lookup_raw = spark.read.csv('taxi+_zone_lookup.csv', header=True, inferSchema=True)
df_zone_name = zone_lookup_raw.drop('Zone', 'service_zone')
df_zone_name.show(5)


+----------+-------------+
|LocationID|      Borough|
+----------+-------------+
|         1|          EWR|
|         2|       Queens|
|         3|        Bronx|
|         4|    Manhattan|
|         5|Staten Island|
+----------+-------------+
only showing top 5 rows



In [19]:
# First, let's rename the join columns in df_zone_name to avoid confusion
#df_zone_lookup_pu = df_zone_name.withColumnRenamed("LocationID", "PULocationID") \
#                            .withColumnRenamed("Borough", "PULocation")  
# Join for pickup locations
#df_with_pu = df_cleaned.join(df_zone_lookup_pu, on="PULocationID", how="left")

# Prepare for dropoff location join
#df_zone_lookup_do = df_zone_name.withColumnRenamed("LocationID", "DOLocationID") \
#                                .withColumnRenamed("Borough", "DOLocation")

# Join for dropoff locations
#df_final = df_with_pu.join(df_zone_lookup_do, on="DOLocationID", how="left")

# Drop the original ID columns if you don't need them anymore
#df_final = df_final.drop("PULocationID", "DOLocationID")

# Show the result
#df_final.show(5)

In [20]:


# 1. First approach: RDD operations (closer to traditional MapReduce)
# Collect the small lookup table to the driver as {LocationID: Borough}
location_dict = {
    zone_row[0]: zone_row[1]
    for zone_row in df_zone_name.select("LocationID", "Borough").rdd.collect()
}

# Wrap the mapping in a broadcast variable for use inside RDD functions
location_broadcast = spark.sparkContext.broadcast(location_dict)

# Row-level mapper used by the RDD transformation
def map_locations(row):
    """Return the trip row as a tuple with location IDs replaced by boroughs.

    The pickup and drop-off IDs are looked up in the broadcast mapping; an
    ID that is missing from the mapping becomes "Unknown". All other fields
    are passed through unchanged, in their original order.
    """
    borough_of = location_broadcast.value.get
    return (
        row.tpep_pickup_datetime,
        row.tpep_dropoff_datetime,
        row.passenger_count,
        row.trip_distance,
        row.RatecodeID,
        borough_of(row.PULocationID, "Unknown"),  # takes the place of PULocationID
        borough_of(row.DOLocationID, "Unknown"),  # takes the place of DOLocationID
        row.payment_type,
        row.total_amount,
    )

# Apply the transformation and rebuild a DataFrame from the mapped tuples

# Column names for the mapped tuples, in the order map_locations emits them
new_column_names = [
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'passenger_count',
    'trip_distance',
    'RatecodeID',
    'pickup_location',
    'dropoff_location',
    'payment_type',
    'total_amount',
]

# DataFrame -> RDD of rows -> RDD of mapped tuples
mapped_rdd = df_cleaned.rdd.map(map_locations)

# RDD of tuples -> DataFrame carrying the new column names
df_mapped = mapped_rdd.toDF(new_column_names)

# Preview the result
df_mapped.show(5)

+--------------------+---------------------+---------------+-------------+----------+---------------+----------------+------------+------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|pickup_location|dropoff_location|payment_type|total_amount|
+--------------------+---------------------+---------------+-------------+----------+---------------+----------------+------------+------------+
| 2019-06-01 00:55:13|  2019-06-01 00:56:17|              1|          0.0|         1|         Queens|          Queens|           2|         4.3|
| 2019-06-01 00:06:31|  2019-06-01 00:06:52|              1|          0.0|         1|      Manhattan|       Manhattan|           2|         6.3|
| 2019-06-01 00:17:05|  2019-06-01 00:36:38|              1|          4.4|         1|      Manhattan|          Queens|           2|        18.8|
| 2019-06-01 00:59:02|  2019-06-01 00:59:12|              0|          0.8|         1|         Queens|          Queens|           2

In [21]:
from pyspark.sql.functions import hour, to_date

# Split each timestamp into a date and an hour-of-day column. A single select
# both derives the new columns and fixes the final column order; the original
# timestamp columns are simply not selected.
df_time_processed = df_mapped.select(
    to_date('tpep_pickup_datetime').alias('pickup_date'),
    hour('tpep_pickup_datetime').alias('pickup_hour'),
    to_date('tpep_dropoff_datetime').alias('dropoff_date'),
    hour('tpep_dropoff_datetime').alias('dropoff_hour'),
    'passenger_count',
    'trip_distance',
    'RatecodeID',
    'pickup_location',
    'dropoff_location',
    'payment_type',
    'total_amount',
)

# Preview the result
df_time_processed.show(5)

+-----------+-----------+------------+------------+---------------+-------------+----------+---------------+----------------+------------+------------+
|pickup_date|pickup_hour|dropoff_date|dropoff_hour|passenger_count|trip_distance|RatecodeID|pickup_location|dropoff_location|payment_type|total_amount|
+-----------+-----------+------------+------------+---------------+-------------+----------+---------------+----------------+------------+------------+
| 2019-06-01|          0|  2019-06-01|           0|              1|          0.0|         1|         Queens|          Queens|           2|         4.3|
| 2019-06-01|          0|  2019-06-01|           0|              1|          0.0|         1|      Manhattan|       Manhattan|           2|         6.3|
| 2019-06-01|          0|  2019-06-01|           0|              1|          4.4|         1|      Manhattan|          Queens|           2|        18.8|
| 2019-06-01|          0|  2019-06-01|           0|              0|          0.8|       

In [22]:
from pyspark.sql.functions import date_format, dayofweek

# Add the weekday name (the 'EEEE' pattern yields names such as "Saturday")
# for both the pickup and the drop-off date. A single select derives the new
# columns and places each one right after the date it is computed from.
df_with_dow = df_time_processed.select(
    'pickup_date',
    date_format('pickup_date', 'EEEE').alias('pickup_day_of_week'),
    'pickup_hour',
    'dropoff_date',
    date_format('dropoff_date', 'EEEE').alias('dropoff_day_of_week'),
    'dropoff_hour',
    'passenger_count',
    'trip_distance',
    'RatecodeID',
    'pickup_location',
    'dropoff_location',
    'payment_type',
    'total_amount',
)

# Preview the result
df_with_dow.show(5)

+-----------+------------------+-----------+------------+-------------------+------------+---------------+-------------+----------+---------------+----------------+------------+------------+
|pickup_date|pickup_day_of_week|pickup_hour|dropoff_date|dropoff_day_of_week|dropoff_hour|passenger_count|trip_distance|RatecodeID|pickup_location|dropoff_location|payment_type|total_amount|
+-----------+------------------+-----------+------------+-------------------+------------+---------------+-------------+----------+---------------+----------------+------------+------------+
| 2019-06-01|          Saturday|          0|  2019-06-01|           Saturday|           0|              1|          0.0|         1|         Queens|          Queens|           2|         4.3|
| 2019-06-01|          Saturday|          0|  2019-06-01|           Saturday|           0|              1|          0.0|         1|      Manhattan|       Manhattan|           2|         6.3|
| 2019-06-01|          Saturday|          0| 

In [23]:
# Count trips per (pickup location, drop-off location) pair, MapReduce style.

# Map: emit ((pickup, dropoff), 1) for every trip
location_pairs_rdd = df_with_dow.rdd.map(
    lambda trip: ((trip.pickup_location, trip.dropoff_location), 1)
)

# Reduce: add up the ones that share the same location pair
location_pairs_count = location_pairs_rdd.reduceByKey(
    lambda left, right: left + right
)

# Order by the count (second tuple element), most frequent pair first
sorted_pairs = location_pairs_count.sortBy(
    lambda pair_count: pair_count[1],
    ascending=False,
)

# Back to a DataFrame for a readable tabular display
result_df = sorted_pairs.toDF(['location_pair', 'frequency'])

# Top 10 pairs, without truncating the cell contents
print("Most common pickup-dropoff location pairs:")
result_df.show(10, truncate=False)

# Number of distinct pairs (one row per pair)
unique_pair_total = result_df.count()
print("\nTotal unique location pairs:", unique_pair_total)

                                                                                

Most common pickup-dropoff location pairs:
+----------------------+---------+
|location_pair         |frequency|
+----------------------+---------+
|{Manhattan, Manhattan}|5808188  |
|{Queens, Manhattan}   |264493   |
|{Manhattan, Queens}   |223738   |
|{Manhattan, Brooklyn} |169540   |
|{Queens, Queens}      |159312   |
|{Queens, Brooklyn}    |64623    |
|{Brooklyn, Brooklyn}  |52725    |
|{Unknown, Unknown}    |51228    |
|{Manhattan, Bronx}    |33403    |
|{Brooklyn, Manhattan} |25142    |
+----------------------+---------+
only showing top 10 rows


Total unique location pairs: 46


In [24]:
# Stop the Spark session now that the analysis is complete
spark.stop()