In [0]:
# Load the three monthly yellow-taxi trip files (Jan-Mar 2016) into a single
# Spark DataFrame.
nyc_taxi_paths = [
    "/FileStore/tables/yellow_tripdata_2016_01.parquet",
    "/FileStore/tables/yellow_tripdata_2016_02.parquet",
    "/FileStore/tables/yellow_tripdata_2016_03.parquet",
]
df_nyc_taxi = spark.read.parquet(*nyc_taxi_paths)

In [0]:
# Preview the raw trip rows, then print the column names and data types.
df_nyc_taxi.display()
df_nyc_taxi.printSchema()

In [0]:
# Read the taxi-zone lookup CSV: the first row is the header and Spark infers
# the column types.
zone_lookup_reader = (
    spark.read.format('csv')
    .option("header", True)
    .option('inferSchema', True)
)
df_zone_lookup = zone_lookup_reader.load('/FileStore/tables/taxi_zone_lookup.csv')

In [0]:
# Preview the zone lookup rows, then print the inferred schema.
df_zone_lookup.display()
df_zone_lookup.printSchema()

In [0]:
# Normalise missing zone metadata in two steps: first the literal "N/A"
# placeholder, then any remaining NULLs, both become "Unknown".
zone_lookup_without_na = df_zone_lookup.replace("N/A", "Unknown")
df_zone_lookup_filled = zone_lookup_without_na.fillna("Unknown")

df_zone_lookup_filled.display()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
def _prefix_zone_columns(zone_df, prefix):
    """Return `zone_df` with every column renamed to `<prefix><column>`.

    `LocationID` follows the same rule (e.g. "PU" + "LocationID" ->
    "PULocationID"), so no special case is needed for the join key.
    """
    return zone_df.select([col(c).alias(f"{prefix}{c}") for c in zone_df.columns])


# Build the pickup (PU*) and dropoff (DO*) versions of the zone lookup from the
# cleaned table (df_zone_lookup_filled). Building them from the raw
# df_zone_lookup would discard the "N/A"/NULL -> "Unknown" normalisation done
# when df_zone_lookup_filled was created.
df_zone_lookup_PU = _prefix_zone_columns(df_zone_lookup_filled, "PU")
df_zone_lookup_DO = _prefix_zone_columns(df_zone_lookup_filled, "DO")

In [0]:
# Enrich every trip with zone metadata: first for the pickup location, then
# for the dropoff location. Left joins keep trips whose location id has no
# match in the lookup.
df_joined = (
    df_nyc_taxi
    .join(df_zone_lookup_PU, on="PULocationID", how="left")
    .join(df_zone_lookup_DO, on="DOLocationID", how="left")
)

In [0]:
# Report the number of joined rows (count() runs a Spark job), then preview them.
print("Total data:", df_joined.count())
df_joined.display()

In [0]:
%pip install missingno

In [0]:
import missingno as msno

import matplotlib.pyplot as plt
import seaborn as sns

In [0]:
# Bring at most 10,000 rows to the driver as a pandas DataFrame; the cap keeps
# the conversion from exhausting driver memory.
spark_preview = df_joined.limit(10000)
df_pd = spark_preview.toPandas()

# Missing-value matrix of the sampled rows.
msno.matrix(df_pd, figsize=(20, 6))
plt.show()

In [0]:
# Nullity-correlation heatmap for the sampled pandas frame (df_pd): shows how
# missingness in one column relates to missingness in the others.
msno.heatmap(df_pd, figsize=(10, 5))
plt.show()

In [0]:
# Columns removed from the joined data set before further cleaning.
columns_to_drop = ['congestion_surcharge', 'airport_fee']

df_joined = df_joined.drop(*columns_to_drop)

# Row count is unchanged by dropping columns; printed as a sanity check.
remaining_rows = df_joined.count()
print("Total data:", remaining_rows)

In [0]:
# Spearman correlation heatmap of the sampled pandas frame (df_pd).
#
# Only numeric columns are correlated. df_pd also holds string and datetime
# columns (flags, borough/zone names, timestamps); pandas >= 2.0 no longer
# drops those silently and DataFrame.corr() raises on them, so they are
# filtered out explicitly. On older pandas this matches the previous implicit
# behaviour.
numeric_sample = df_pd.select_dtypes(include="number")

plt.figure(figsize=(15, 8))
sns.heatmap(
    numeric_sample.corr(method='spearman'),
    vmin=-1,
    vmax=1,
    annot=True,
    cmap="coolwarm",
)
plt.title("Spearman Correlation Heatmap")
plt.show()

In [0]:
# Find fully duplicated rows: group on every column, keep the groups that
# occur more than once, and return one representative row per such group.
all_columns = df_joined.columns
group_sizes = df_joined.groupBy(all_columns).count()
duplicate_groups = group_sizes.filter(col("count") > 1).drop("count")

In [0]:
# Show one representative row for each group of fully identical rows.
duplicate_groups.display()

In [0]:
# Remove exact duplicates: with no column subset given, rows are compared on
# all columns and one row per identical group is kept.
df_joined = df_joined.dropDuplicates()

In [0]:
from pyspark.sql.functions import col, sum as spark_sum

# Select the rows of df_joined that belong to a duplicated group.
#
# The match is on ALL columns with null-safe equality (<=>). A plain
# `on=df_joined.columns` join compares with `=`, where NULL never equals NULL,
# so every duplicated row holding a NULL in any column would silently drop out
# of the result. left_semi returns only df_joined's own columns, so the schema
# matches the previous inner join on column names.
all_columns = df_joined.columns
current_rows = df_joined.alias("cur")
duplicated_keys = duplicate_groups.alias("dup")
null_safe_match = [
    col(f"cur.`{c}`").eqNullSafe(col(f"dup.`{c}`")) for c in all_columns
]
duplicate_rows = current_rows.join(duplicated_keys, on=null_safe_match, how="left_semi")

# Sum all numeric columns over those rows
numeric_cols = [field.name for field in df_joined.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType, LongType, FloatType))]

sum_exprs = [spark_sum(col(c)).alias(c) for c in numeric_cols]

duplicate_sums = duplicate_rows.agg(*sum_exprs)

duplicate_sums.display()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, lit

# Trips are considered "the same trip" when they agree on these columns.
subset_cols = [
    "tpep_pickup_datetime", "tpep_dropoff_datetime", "RatecodeID",
    "PULocationID", "DOLocationID", "passenger_count", "trip_distance"
]

# One window partition per distinct combination of the subset columns.
window_spec = Window.partitionBy(*subset_cols)

# dup_count = number of rows sharing the same subset-column values.
rows_in_group = count("*").over(window_spec)
df_with_dup_count = df_joined.withColumn("dup_count", rows_in_group)

# Keep every member of a group with more than one row (first occurrence included).
is_repeated = col("dup_count") > 1
duplicate_value = df_with_dup_count.filter(is_repeated).drop("dup_count")

duplicate_value.display()

In [0]:
# Among the subset-duplicate trips, isolate those with a negative fare.
has_negative_fare = col("fare_amount") < 0
dup_negatif_val = duplicate_value.where(has_negative_fare)

dup_negatif_val.display()

In [0]:
# Remove from df_joined every row that also appears in dup_negatif_val
# (the negative-fare members of the duplicate groups).
#
# Rows are matched on ALL columns with null-safe equality (<=>). A plain
# `on=df_joined.columns` join compares with `=`, where NULL never equals NULL,
# so a negative-fare duplicate holding a NULL in any column would never match
# and would silently survive the anti-join.
all_columns = df_joined.columns
rows_to_keep_from = df_joined.alias("cur")
rows_to_remove = dup_negatif_val.alias("neg")
null_safe_match = [
    col(f"cur.`{c}`").eqNullSafe(col(f"neg.`{c}`")) for c in all_columns
]

# left_anti returns only the left-hand columns, so the schema is unchanged.
df_joined = rows_to_keep_from.join(rows_to_remove, on=null_safe_match, how='left_anti')

df_joined.display()

In [0]:
# from pyspark.sql.window import Window
# from pyspark.sql.functions import row_number

# # Define subset columns for duplicate detection
# subset_cols = [
#     "tpep_pickup_datetime", "tpep_dropoff_datetime", "RatecodeID",
#     "PULocationID", "DOLocationID", "passenger_count", "trip_distance"
# ]

# # Define a window partitioned by the subset columns and ordered arbitrarily
# window_spec = Window.partitionBy(*subset_cols).orderBy("tpep_pickup_datetime")

# # Add a row number to identify duplicates
# df_with_row_num = df_joined.withColumn("row_num", row_number().over(window_spec))

# # Filter to get only the duplicate entries (i.e., row number > 1)
# df_duplicates_only = df_with_row_num.filter("row_num > 1")

# # df_duplicates_only.display()

## Negative Values

In [0]:
import builtins  # the star-import of pyspark.sql.functions shadows the built-in round()

from pyspark.sql.functions import col, when, count, lit
from pyspark.sql.types import NumericType

# Get numeric columns
numeric_cols = [field.name for field in df_joined.schema.fields if isinstance(field.dataType, NumericType)]

# Total number of rows
total_rows = df_joined.count()

# Count negatives for every numeric column in ONE aggregation pass instead of
# launching a separate filter().count() job per column. when() yields NULL for
# rows that do not satisfy the condition and count() skips NULLs, so each
# aggregate is the number of rows with a negative value in that column.
neg_counts = df_joined.agg(
    *[count(when(col(c) < 0, 1)).alias(c) for c in numeric_cols]
).first()

# Percentage of negative values per column, rounded to 3 decimals.
# An empty DataFrame reports 0.0 instead of raising ZeroDivisionError.
neg_percent_list = [
    (c, builtins.round((neg_counts[c] / total_rows) * 100, 3) if total_rows else 0.0)
    for c in numeric_cols
]

# Create a DataFrame from the result
neg_percent_df = spark.createDataFrame(neg_percent_list, ["feature", "neg_value(%)"])

neg_percent_df.display()

In [0]:
# Flip the sign of negative fares; non-negative (and NULL) fares pass through
# unchanged. `abs` here is the Spark column function brought in by the
# pyspark.sql.functions star-import, not Python's built-in.
fare = col("fare_amount")
fare_without_negatives = when(fare < 0, abs(fare)).otherwise(fare)

df_joined = df_joined.withColumn("fare_amount", fare_without_negatives)

## Zero Value

In [0]:
import builtins  # the star-import of pyspark.sql.functions shadows the built-in round()

from pyspark.sql.functions import col, count, when
from pyspark.sql.types import NumericType

# Get numeric columns with their data types
numeric_fields = [(f.name, f.dataType.simpleString()) for f in df_joined.schema.fields if isinstance(f.dataType, NumericType)]

# Total number of rows
total_rows = df_joined.count()

# Count zeros for every numeric column in ONE aggregation pass instead of one
# filter().count() job per column. when() yields NULL for non-matching rows
# and count() skips NULLs, so each aggregate is the number of zero values.
zero_counts = df_joined.agg(
    *[count(when(col(name) == 0, 1)).alias(name) for name, _ in numeric_fields]
).first()

# Percentage of zero values per column, rounded to 3 decimals.
# An empty DataFrame reports 0.0 instead of raising ZeroDivisionError.
zero_value_stats = [
    (
        name,
        dtype,
        builtins.round((zero_counts[name] / total_rows) * 100, 3) if total_rows else 0.0,
    )
    for name, dtype in numeric_fields
]

# Create a DataFrame from results
zero_value_df = spark.createDataFrame(zero_value_stats, ["feature", "data_type", "0_value(%)"])

# Show result
zero_value_df.display()

In [0]:
from pyspark.sql.functions import col, expr

# Approximate median of passenger_count (relative error 0.01).
passenger_quantiles = df_joined.approxQuantile("passenger_count", [0.5], 0.01)
median_passenger_count = passenger_quantiles[0]

# A passenger count of 0 is replaced by that median; other values are kept.
passenger_case_sql = f"CASE WHEN passenger_count = 0 THEN {median_passenger_count} ELSE passenger_count END"
df_joined = df_joined.withColumn("passenger_count", expr(passenger_case_sql))

# Keep only trips with a strictly positive distance and a strictly positive fare.
has_positive_distance = col("trip_distance") > 0
has_positive_fare = col("fare_amount") > 0
df_joined = df_joined.filter(has_positive_distance & has_positive_fare)

## Missing Values

In [0]:
import builtins  # the star-import of pyspark.sql.functions shadows the built-in round()

from pyspark.sql.functions import col, count, when, isnan
from pyspark.sql.types import DoubleType, FloatType, NumericType

# Total number of rows
total_rows = df_joined.count()

# Build one "is missing" counter per column so that all columns are measured
# in a single aggregation pass rather than one filter().count() job each.
missing_exprs = []
for column, dtype in df_joined.dtypes:
    is_missing = col(column).isNull()
    # isnan() is only applied to floating-point columns, as before.
    if dtype in ['double', 'float']:
        is_missing = is_missing | isnan(col(column))
    # when() yields NULL for non-matching rows and count() skips NULLs.
    missing_exprs.append(count(when(is_missing, 1)).alias(column))

missing_counts = df_joined.agg(*missing_exprs).first()

# Percentage of missing values per column, rounded to 3 decimals.
# An empty DataFrame reports 0.0 instead of raising ZeroDivisionError.
null_summary = [
    (
        column,
        dtype,
        builtins.round((missing_counts[column] / total_rows) * 100, 3) if total_rows else 0.0,
    )
    for column, dtype in df_joined.dtypes
]

# Create summary DataFrame
null_summary_df = spark.createDataFrame(null_summary, ["feature", "data_type", "null_value(%)"])
null_summary_df.display()

### Handling RateCodeID

In [0]:
# Frequency of each RatecodeID value, most common first.
df_joined.groupBy("RatecodeID").count().orderBy("count", ascending=False).display()


In [0]:
# Replace the literal "N/A" placeholder with "Unknown" across the DataFrame.
# The RatecodeID rules and the borough filter below compare against "Unknown".
df_joined = df_joined.replace("N/A", "Unknown")

In [0]:
# For trips recorded with RatecodeID 4, count the pickup/dropoff borough
# combinations, most frequent first.
ratecode_4_trips = df_joined.filter(col("RatecodeID") == 4)
ratecode_4_routes = (
    ratecode_4_trips
    .groupBy("PUBorough", "DOBorough")
    .count()
    .orderBy("count", ascending=False)
)
ratecode_4_routes.display()

In [0]:
# Re-code RatecodeID 4 using the trip's boroughs:
#   * known pickup borough and dropoff borough "EWR" -> 3
#   * known pickup borough and known dropoff borough -> 1
# Every other row keeps its current RatecodeID.
is_ratecode_4 = col("RatecodeID") == 4
pickup_known = col("PUBorough") != "Unknown"
dropoff_known = col("DOBorough") != "Unknown"
dropoff_is_ewr = col("DOBorough") == "EWR"

recoded_ratecode = (
    when(is_ratecode_4 & pickup_known & dropoff_is_ewr, 3)
    .when(is_ratecode_4 & pickup_known & dropoff_known, 1)
    .otherwise(col("RatecodeID"))
)

df_joined = df_joined.withColumn("RatecodeID", recoded_ratecode)

In [0]:

# Keep only trips whose pickup AND dropoff borough are both different from
# 'Unknown'.
pickup_known = col("PUBorough") != "Unknown"
dropoff_known = col("DOBorough") != "Unknown"

df_joined = df_joined.filter(pickup_known & dropoff_known)

In [0]:
ratecode = col("RatecodeID")

# Step 1: treat the code 99 as missing by turning it into NULL.
df_joined = df_joined.withColumn(
    "RatecodeID",
    when(ratecode == 99, None).otherwise(ratecode),
)

# Step 2: fill missing codes from the route where a rule applies:
#   * Manhattan -> JFK Airport, or JFK Airport -> Manhattan : 2
#   * dropoff zone is Newark Airport                        : 3
# Rows matching no rule keep their current value (possibly still NULL).
ratecode_missing = ratecode.isNull()
manhattan_to_jfk = (col("PUBorough") == "Manhattan") & (col("DOZone") == "JFK Airport")
jfk_to_manhattan = (col("PUZone") == "JFK Airport") & (col("DOBorough") == "Manhattan")
to_newark = col("DOZone") == "Newark Airport"

route_based_ratecode = (
    when(ratecode_missing & manhattan_to_jfk, 2)
    .when(ratecode_missing & jfk_to_manhattan, 2)
    .when(ratecode_missing & to_newark, 3)
    .otherwise(ratecode)
)

df_joined = df_joined.withColumn("RatecodeID", route_based_ratecode)

In [0]:

# Any RatecodeID that is still NULL defaults to 1.
ratecode = col("RatecodeID")
df_joined = df_joined.withColumn(
    "RatecodeID",
    when(ratecode.isNull(), 1).otherwise(ratecode),
)

# Distribution of RatecodeID after imputation, ordered by code.
ratecode_distribution = df_joined.groupBy("RatecodeID").count().orderBy("RatecodeID")
ratecode_distribution.show()

# Verify that no NULL RatecodeID remains (expected: 0).
null_count = df_joined.filter(ratecode.isNull()).count()
print("Total NaN values: ", null_count)

#### Filling missing values in the passenger count with the median value of that feature.

In [0]:
# Median of passenger_count (relativeError 0.0 requests the exact quantile).
median_value = df_joined.approxQuantile("passenger_count", [0.5], 0.0)[0]

# Missing passenger counts take the median; present values are kept.
passengers = col("passenger_count")
df_joined = df_joined.withColumn(
    "passenger_count",
    when(passengers.isNull(), median_value).otherwise(passengers),
)

# Distribution of passenger_count after imputation, ordered by value.
passenger_distribution = (
    df_joined.groupBy("passenger_count").count().orderBy("passenger_count")
)
passenger_distribution.display()

# Verify that no NULL passenger_count remains.
null_count = df_joined.filter(passengers.isNull()).count()
print("Total NaN values: ", null_count)

#### Handling Payment Type

In [0]:
# Frequency of each payment_type value, most common first.
payment_type_counts = (
    df_joined.groupBy("payment_type").count().orderBy("count", ascending=False)
)
payment_type_counts.show()

# Number of rows where payment_type is NULL.
payment_type_missing = col("payment_type").isNull()
null_count = df_joined.filter(payment_type_missing).count()
print("Total NaN values:", null_count)

In [0]:
# Among trips with a positive tip, count rows per payment_type (most common first).
tipped_trips = df_joined.filter(col("tip_amount") > 0)
tipped_by_payment_type = (
    tipped_trips
    .groupBy("payment_type")
    .count()
    .orderBy("count", ascending=False)
)
tipped_by_payment_type.display()

In [0]:
# Step 1: a trip with a positive tip that is recorded as payment_type 2 is
# re-coded to payment_type 1; all other rows are left as they are.
has_tip = col("tip_amount") > 0
is_payment_type_2 = col("payment_type") == 2

df_joined = df_joined.withColumn(
    "payment_type",
    when(has_tip & is_payment_type_2, 1).otherwise(col("payment_type")),
)

# Step 2: re-check the payment_type distribution of tipped trips.
tipped_after_recode = (
    df_joined.filter(has_tip)
    .groupBy("payment_type")
    .count()
    .orderBy("count", ascending=False)
)
tipped_after_recode.show()

In [0]:
# Fill missing payment_type values:
#   * tip_amount > 0 -> 1
#   * otherwise      -> 5
# Integer literals are used on purpose. The previous 1.0 / 5.0 float literals
# made Spark widen an integral payment_type column to double (codes then show
# as 1.0, 2.0, ...) and disagreed with the integer 1 used when re-coding
# tipped payment_type 2 trips. If the column is already double, the result is
# unchanged.
payment_type = col("payment_type")
imputed_payment_type = when(col("tip_amount") > 0, 1).otherwise(5)

df_joined = df_joined.withColumn(
    "payment_type",
    when(payment_type.isNull(), imputed_payment_type).otherwise(payment_type),
)

df_joined.groupBy("payment_type").count().orderBy("count", ascending=False).display()

In [0]:
# Row count after the missing-value handling above, followed by a preview.
print("Total data:", df_joined.count())
df_joined.display()

In [0]:
# Re-sample at most 10,000 rows into pandas (df_pd is overwritten) and redraw
# the missing-value matrix to check the result of the imputation steps.
# The row cap keeps the driver-side conversion small.
df_pd = df_joined.limit(10000).toPandas()
msno.matrix(df_pd, figsize=(20, 6))
plt.show()

### Formatting and Outlier Handling


#### Pickup and Dropoff Time

In [0]:

# Make sure both trip time columns are Spark timestamps.
for timestamp_column in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    df_joined = df_joined.withColumn(timestamp_column, to_timestamp(timestamp_column))

In [0]:

# Trips whose pickup time falls outside January-March 2016. They are only
# displayed here; this cell does not remove them from df_joined.
pickup_ts = col("tpep_pickup_datetime")
picked_up_in_2016 = year(pickup_ts) == 2016
picked_up_in_q1 = month(pickup_ts).isin([1, 2, 3])

outliers = df_joined.filter(~(picked_up_in_2016 & picked_up_in_q1))

outliers.display()

##### Adding columns for time of day (Midnight, Morning, Noon, Evening, Night) and Weekdays/Weekend

In [0]:
# Half-open hour ranges [start, end) and the label assigned to each.
pickup_time_bins = [
    (0, 5, "Midnight"),
    (5, 11, "Morning"),
    (11, 15, "Noon"),
    (15, 20, "Evening"),
    (20, 24, "Night"),
]

# Build the chained when(...).when(...) expression from the table above.
pickup_hour = col("pickup_hour")
pickup_time_label = None
for start_h, end_h, label in pickup_time_bins:
    in_bin = (pickup_hour >= start_h) & (pickup_hour < end_h)
    if pickup_time_label is None:
        pickup_time_label = when(in_bin, label)
    else:
        pickup_time_label = pickup_time_label.when(in_bin, label)
# Hours matching no range (e.g. a NULL pickup time) are labelled "Unknown".
pickup_time_label = pickup_time_label.otherwise("Unknown")

# Saturday/Sunday pickups are "Weekend"; everything else is "Weekdays".
day_category_label = (
    when(col("day_name").isin("Saturday", "Sunday"), "Weekend").otherwise("Weekdays")
)

df_joined = (
    df_joined
    # helper column: hour of day of the pickup
    .withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))
    .withColumn("pickup_time", pickup_time_label)
    # helper column: full day name such as Monday, Tuesday
    .withColumn("day_name", date_format(col("tpep_pickup_datetime"), "EEEE"))
    .withColumn("day_category", day_category_label)
    # the helper columns are not kept in the result
    .drop("pickup_hour", "day_name")
)

df_joined.display()

##### RateCodeID 

In [0]:
# Human-readable label for each numeric RatecodeID.
ratecode_labels = {
    1: "Standard rate",
    2: "JFK Airport",
    3: "Newark Airport",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride",
}

# Chain one when() branch per code, in the order listed above.
ratecode = col("RatecodeID")
ratecode_as_label = None
for code, label in ratecode_labels.items():
    if ratecode_as_label is None:
        ratecode_as_label = when(ratecode == code, label)
    else:
        ratecode_as_label = ratecode_as_label.when(ratecode == code, label)
# Codes without a label keep their original value.
ratecode_as_label = ratecode_as_label.otherwise(ratecode)

df_joined = df_joined.withColumn("RatecodeID", ratecode_as_label)

# Frequency of each label, most common first.
df_joined.groupBy("RatecodeID").count().orderBy("count", ascending=False).display()

In [0]:
# Frequency of each passenger_count value, most common first.
df_joined.groupBy("passenger_count").count().orderBy("count", ascending=False).display()

In [0]:
# Bring a 1% random sample of passenger_count to the driver for plotting.
# Calling toPandas() on the full column would pull every trip row into driver
# memory. The trip_distance plot already samples the same way; the fixed seed
# keeps the figure reproducible between runs.
pdf = (
    df_joined.select("passenger_count")
    .sample(fraction=0.01, seed=42)
    .toPandas()
)

plt.figure(figsize=(15, 3))
sns.set_style("darkgrid")

sns.boxplot(data=pdf, x='passenger_count', color='skyblue')
plt.xlabel('Total Passenger')
plt.title('Passenger Distribution', fontsize=16)

plt.show()

###### By law, the maximum number of passengers allowed in a yellow taxicab is four (4) in a four-passenger taxicab or five (5) in a five-passenger taxicab. All passengers must wear seat belts, and children under the age of 4 must ride in child safety seats. Children under the age of 8 must ride in a child restraint system, such as a federally approved harness, vest, or booster seat.

In [0]:
# Turn passenger_count into a string category: values above 5 collapse into
# the single bucket ">5"; all other values are kept as their string form.
passengers = col("passenger_count")
passenger_bucket = when(passengers > 5, ">5").otherwise(passengers.cast("string"))

df_joined = df_joined.withColumn("passenger_count", passenger_bucket)

# Frequency of each bucket, most common first.
bucket_counts = (
    df_joined.groupBy("passenger_count").count().orderBy("count", ascending=False)
)
bucket_counts.display()

#### Trip distance

In [0]:
# 1% random sample of trip_distance, converted to pandas for plotting.
# The fixed seed makes the sample, and therefore the boxplot drawn from it,
# reproducible between runs.
df_sample = df_joined.select("trip_distance").sample(fraction=0.01, seed=42).toPandas()

In [0]:
# Boxplot of the sampled trip distances.
fig, ax = plt.subplots(figsize=(15, 3))
sns.boxplot(data=df_sample, x='trip_distance', color='#1f77b4', ax=ax)
ax.set_xlabel('Distance (mile)')
ax.set_title('Trip Distance Distribution', fontsize=16)
plt.show()

##### Based on the assumption that taxi trips covering a distance of less than 1 km (0.62 miles) are considered abnormal records or failed trips — such as possible cancellations by passengers or drivers — the dataset entries with such values will be dropped.

In [0]:
# Count the trips shorter than 0.62 miles (about 1 km) ...
count_short_trips = df_joined.filter(col("trip_distance") < 0.62).count()
print("Number of records with trip distance < 1 km (0.62 miles):", count_short_trips)

# ... and drop exactly those trips. The keep-condition is the complement of
# the counted condition (>= 0.62). The previous `> 0.62` also discarded trips
# of exactly 0.62 miles, which the count above did not report.
df_joined = df_joined.filter(col("trip_distance") >= 0.62)

##### There are extreme outliers in the taxi trip distance data.
- First, handle those extreme outliers before continuing.

In [0]:
# Inspect trips longer than 50 miles, shortest first, with the columns needed
# to judge whether the distance is plausible.
long_trips = df_joined.filter(col('trip_distance') > 50)
long_trip_details = (
    long_trips
    .select('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID', 'trip_distance', 'fare_amount')
    .orderBy(col('trip_distance').asc())
)
long_trip_details.display()

#### It can be seen from the extreme outlier data above that there are distances greater than 120 miles.


In [0]:
# Distance outliers: trips longer than 50 miles, reduced to the route ids and
# the recorded distance.
is_distance_outlier = col("trip_distance") > 50
outliers = df_joined.filter(is_distance_outlier).select(
    "PULocationID", "DOLocationID", "trip_distance"
)

# How many outlier trips there are.
total_outliers = outliers.count()
print("Total outliers:", total_outliers)

# Preview the outlier rows.
outliers.display()

#### We will try to replace the extreme outlier data above with the central tendency value (median) of trip_distance for each combination of pickup and dropoff locations in the existing dataset.

In [0]:

# Reference data: trips strictly shorter than 50 miles.
df_filtered = df_joined.filter(col('trip_distance') < 50)

# Per pickup/dropoff location pair: number of trips, mean distance and
# approximate median distance.
route_distance_stats = [
    count('trip_distance').alias('count'),
    mean('trip_distance').alias('avg_distance'),
    expr('percentile_approx(trip_distance, 0.5)').alias('median_distance'),
]
ct_distance_byid = df_filtered.groupBy('PULocationID', 'DOLocationID').agg(*route_distance_stats)

# Show the per-route statistics in random order.
ct_distance_byid.orderBy(rand()).display()

In [0]:
# Attach the per-route average and median distance to every outlier trip.
# The left join keeps outliers whose route has no reference statistics.
route_stats_without_count = ct_distance_byid.drop('count')
distance_byid = outliers.join(
    route_stats_without_count,
    on=['PULocationID', 'DOLocationID'],
    how='left',
)

In [0]:
# Lookup of the median distance for every route that has at least one outlier.
#
# distance_byid holds ONE ROW PER OUTLIER TRIP, so a route with k outliers
# appears k times. Joining df_joined against it directly would emit every trip
# on such a route k times (non-outlier trips included) and silently inflate
# the data set. distinct() collapses the lookup to one row per route; the
# median is a per-route value, so the repeated rows are identical.
route_median_lookup = (
    distance_byid
    .select('PULocationID', 'DOLocationID', 'median_distance')
    .distinct()
)

# Join df_joined with the lookup to get median values where needed
df_with_median = df_joined.join(
    route_median_lookup,
    on=['PULocationID', 'DOLocationID'],
    how='left'
)

# Replace trip_distance with median_distance for outliers (where trip_distance > 50).
# Routes without a reference median end up with a NULL trip_distance.
df_imputed = df_with_median.withColumn(
    'trip_distance',
    when(col('trip_distance') > 50, col('median_distance')).otherwise(col('trip_distance'))
).drop('median_distance')

In [0]:
# Preview the route ids and distances of imputed-frame rows whose
# trip_distance is at most 50.
df_imputed.select('PULocationID', 'DOLocationID', 'trip_distance').filter(col('trip_distance') <= 50).display()


In [0]:
# Continue the pipeline with the location-level imputed frame and preview it.
df_joined = df_imputed
df_joined.display()

#### Several entries have no matching Location ID records in the dataset, which results in NaN values during imputation.
- To handle this, a method similar to the previous one is used, but the median is calculated at the borough (region) level instead of by Location ID.

In [0]:
# Trips whose trip_distance is still NULL, reduced to the borough pair.
distance_missing = col("trip_distance").isNull()
isna = df_joined.filter(distance_missing).select(
    "PUBorough", "DOBorough", "trip_distance"
)
isna.display()

In [0]:
# Reference data: trips with trip_distance below 70 (NULL distances are
# excluded by the comparison).
df_filtered = df_joined.filter(col("trip_distance") < 70)

# Per pickup/dropoff borough pair: number of trips, mean distance and
# approximate median distance.
borough_distance_stats = [
    count("trip_distance").alias("count"),
    avg("trip_distance").alias("avg_distance"),
    expr("percentile_approx(trip_distance, 0.5)").alias("median_distance"),
]
ct_distance_byborough = df_filtered.groupBy("PUBorough", "DOBorough").agg(*borough_distance_stats)

In [0]:
# One median per borough pair; used as the lookup for both joins below.
borough_median_lookup = ct_distance_byborough.select(
    "PUBorough", "DOBorough", "median_distance"
)
borough_keys = ["PUBorough", "DOBorough"]

# The NULL-distance trips together with the median of their borough pair
# (kept for inspection).
distance_byborough = isna.join(borough_median_lookup, on=borough_keys, how="left")

# Attach the borough-level median to every trip ...
df_with_median = df_joined.join(borough_median_lookup, on=borough_keys, how="left")

# ... and use it, rounded to 2 decimals, wherever trip_distance is NULL.
# The helper column is dropped afterwards.
borough_filled_distance = (
    when(col("trip_distance").isNull(), expr("round(median_distance, 2)"))
    .otherwise(col("trip_distance"))
)
df_imputed = (
    df_with_median
    .withColumn("trip_distance", borough_filled_distance)
    .drop("median_distance")
)

In [0]:
# Continue the pipeline with the borough-level imputed frame and preview it.
df_joined = df_imputed
df_joined.display()

### Feature creation: adding driving speed

In [0]:

from pyspark.sql.functions import col, round as spark_round, unix_timestamp, when

# The speed formula needs a `trip_duration` column, but none of the preceding
# steps creates one, so the original expression fails with an unresolved
# column. Derive it from the pickup/dropoff timestamps when it is missing.
# The unit is minutes, because the formula below divides it by 60 to get hours
# (NOTE(review): confirm minutes is the intended unit).
if "trip_duration" not in df_joined.columns:
    pickup_seconds = unix_timestamp(col("tpep_pickup_datetime"))
    dropoff_seconds = unix_timestamp(col("tpep_dropoff_datetime"))
    df_joined = df_joined.withColumn(
        "trip_duration",
        (dropoff_seconds - pickup_seconds) / 60,
    )

# speed = distance / duration in hours, rounded to 2 decimals.
# Trips with a zero or negative duration get a NULL speed instead of a
# division by zero or a negative speed.
df_joined = df_joined.withColumn(
    "speed",
    when(
        col("trip_duration") > 0,
        spark_round(col("trip_distance") / (col("trip_duration") / 60), 2),
    ),
)

In [0]:
# Final row count after all cleaning and feature steps (runs a Spark job).
df_joined.count()