In [None]:
import kagglehub

DATASET_HANDLE = "usmanshams/nyc-yellow-taxi-dataset-2024"

# Fetch the latest dataset version (kagglehub caches it locally) and keep its directory.
path = kagglehub.dataset_download(DATASET_HANDLE)
print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/usmanshams/nyc-yellow-taxi-dataset-2024?dataset_version_number=1...


100%|██████████| 537M/537M [00:05<00:00, 98.4MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/usmanshams/nyc-yellow-taxi-dataset-2024/versions/1


Batch Data Ingestion

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) the Spark session used by every Spark cell below.
spark = (
    SparkSession.builder
    .appName("ACADP-Ingestion")
    .getOrCreate()
)

In [None]:
import os

# Zone lookup table (LocationID -> Borough / Zone / service_zone, per the schema printed below).
csv_file_path = os.path.join(path, "nyc_yellow_taxi_dataset", "taxi_zone_lookup.csv")

# header/inferSchema given as keyword arguments instead of chained .option() calls.
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

print("DataFrame loaded successfully.")
df.show(5)
df.printSchema()

DataFrame loaded successfully.
+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows
root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



Schema Inference & Validation

In [None]:
# Re-display the zone-lookup schema (identical to the one printed right after the CSV was loaded).
df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [None]:
# No casts are needed in this cell: `df` currently holds taxi_zone_lookup.csv,
# whose only columns are LocationID, Borough, Zone and service_zone, so
# `fare_amount` and `trip_distance` do not exist here.
# Those columns come from the yellow_tripdata_*.parquet files loaded further
# down, and Spark already reads them as double there (see the trip schema),
# so no explicit cast is required for them either.

Let's explore the distribution of `Borough` in the `taxi_zone_lookup` DataFrame.

In [None]:
# Number of zones per borough, largest first.
borough_counts = df.groupBy("Borough").count()
borough_counts.sort("count", ascending=False).show()

+-------------+-----+
|      Borough|count|
+-------------+-----+
|       Queens|   69|
|    Manhattan|   69|
|     Brooklyn|   61|
|        Bronx|   43|
|Staten Island|   20|
|          EWR|    1|
|      Unknown|    1|
|          N/A|    1|
+-------------+-----+



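Handle Missing Values

Load the trip records (every `yellow_tripdata_*.parquet` file in the dataset), then drop or fill missing values in the key trip fields.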

In [None]:
import os

# Show the top-level entries of the downloaded dataset directory.
for entry_name in os.listdir(path):
    print(entry_name)


nyc_yellow_taxi_dataset


In [None]:
# Read every yellow_tripdata_*.parquet file into one trip DataFrame
# (this replaces the zone-lookup frame previously held in `df`).
trip_files_glob = f"{path}/nyc_yellow_taxi_dataset/yellow_tripdata_*.parquet"
df = spark.read.parquet(trip_files_glob)

In [None]:
# Inspect the schema Spark reads from the yellow_tripdata_*.parquet files.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [None]:
# Discard trips missing any of the core numeric features.
REQUIRED_TRIP_COLUMNS = ["fare_amount", "trip_distance", "passenger_count"]
df = df.na.drop(subset=REQUIRED_TRIP_COLUMNS)


In [None]:
# `payment_type` is a long column (see the trip schema). A string replacement such
# as "UNKNOWN" cannot populate a numeric column in Spark's fillna, so the nulls
# would survive (or the fill errors under ANSI casting). Fill with a numeric code.
# NOTE(review): 5 is "Unknown" in the NYC TLC yellow-taxi data dictionary; confirm.
df = df.fillna({"payment_type": 5})

Encoding & Normalization

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
# Encode payment_type as a numeric index column, payment_type_idx (StringIndexer defaults).
indexer = StringIndexer(inputCol="payment_type", outputCol="payment_type_idx")

payment_type_model = indexer.fit(df)
df = payment_type_model.transform(df)

Normalization

In [None]:
from pyspark.sql.functions import col

In [None]:
from pyspark.sql.functions import col, when

# Scale the fare into [0, 1] using the same upper bound (500) as the
# "Feature Bounding" step. That step runs *after* this cell, so the fare is
# clipped to [0, 500] here first. Dividing the raw fare would let negative or
# >500 fares produce normalized values outside [0, 1].
FARE_UPPER_BOUND = 500.0

bounded_fare = (
    when(col("fare_amount") < 0, 0.0)
    .when(col("fare_amount") > FARE_UPPER_BOUND, FARE_UPPER_BOUND)
    .otherwise(col("fare_amount"))
)

df = df.withColumn("fare_amount_norm", bounded_fare / FARE_UPPER_BOUND)

Feature Bounding

In [None]:
from pyspark.sql.functions import when

In [None]:
# Clip each feature into a fixed [lower, upper] range; nulls pass through unchanged.
FEATURE_BOUNDS = {
    "fare_amount": (0, 500),
    "trip_distance": (0, 200),
    "passenger_count": (1, 8),
}

for feature_name, (lower, upper) in FEATURE_BOUNDS.items():
    current = df[feature_name]
    df = df.withColumn(
        feature_name,
        when(current < lower, lower)
        .when(current > upper, upper)
        .otherwise(current),
    )

Feature Statistics Generation

In [None]:
# count / mean / stddev / min / max of the bounded core features.
df.describe("fare_amount", "trip_distance", "passenger_count").show()

+-------+------------------+------------------+------------------+
|summary|       fare_amount|     trip_distance|   passenger_count|
+-------+------------------+------------------+------------------+
|  count|          30463713|          30463713|          30463713|
|   mean|19.530663900347527|3.4160863207972207|1.3426768759277636|
| stddev|18.778822761373913| 4.637725511074635|0.8068186061991545|
|    min|               0.0|               0.0|                 1|
|    max|             500.0|             200.0|                 8|
+-------+------------------+------------------+------------------+



In [None]:
# Number of trips per payment_type value.
payment_type_counts = df.groupBy("payment_type").count()
payment_type_counts.show()

+------------+--------+
|payment_type|   count|
+------------+--------+
|           1|24999870|
|           3|  235960|
|           2| 4599611|
|           4|  628268|
|           5|       4|
+------------+--------+



Save Outputs

In [None]:
# Persist the bounded, encoded trip data as parquet (relative path; replaced on every run).
df.write.parquet("bounded_nyc_taxi", mode="overwrite")

Step 0 Load Required Libraries

In [None]:
import pandas as pd
import numpy as np
from itertools import combinations
import networkx as nx
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import KBinsDiscretizer


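Step 1: Load a Sample (Scalability-Safe)

All dependency analysis below runs on a fixed-size, seeded random sample. Note that the full bounded dataset is still read into pandas before the sample is drawn.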

In [None]:
# Spark wrote the bounded dataset to the relative path "bounded_nyc_taxi".
# Read from that same relative path instead of a hard-coded Colab "/content/..."
# path, so the notebook also runs outside Colab (when Spark runs locally, both
# presumably resolve against the notebook's working directory).
BOUNDED_DATA_PATH = "bounded_nyc_taxi"
SAMPLE_ROWS = 200_000
SEED = 42

# NOTE(review): this still loads the full bounded dataset (30M+ rows per the
# statistics above) into pandas before sampling; consider sampling in Spark
# first if memory is tight.
df = pd.read_parquet(BOUNDED_DATA_PATH)

# Fixed-size, seeded sample; never request more rows than exist
# (DataFrame.sample raises if n > len(df) without replacement).
sample_df = df.sample(n=min(SAMPLE_ROWS, len(df)), random_state=SEED)


Step 2 Initial Feature Filtering (Pruning)
 2.1 Remove constant features


In [None]:
# A column with at most one distinct non-null value carries no information.
nunique = sample_df.nunique()
valid_features = [name for name, count in nunique.items() if count > 1]
sample_df = sample_df[valid_features]


2.2 Remove low-variance numerical features

In [None]:
# Keep numeric columns whose sample variance exceeds 1e-4 (all-NaN variance is excluded).
numeric_columns = sample_df.select_dtypes(include=np.number).columns
variances = sample_df[numeric_columns].var()
num_cols = [name for name, spread in variances.items() if spread > 1e-4]


Step 3 Approximate Pearson Correlation (Numerical)
Compute only on filtered numeric features

In [None]:
# Pairwise Pearson correlation (pandas' default method) over the retained numeric columns.
# NOTE(review): num_cols likely also holds integer code columns (e.g. VendorID,
# PULocationID, payment_type); Pearson r on non-ordinal codes is hard to interpret.
pearson_corr = sample_df.loc[:, num_cols].corr()


Keep only strong correlations

In [None]:
THRESH = 0.4

# Keep (feature_a, feature_b, r) for every unordered pair with |r| >= THRESH.
pearson_pairs = []
for left, right in combinations(num_cols, 2):
    r = pearson_corr.loc[left, right]
    if abs(r) >= THRESH:
        pearson_pairs.append((left, right, r))


Step 4 Approximate Mutual Information (Categorical)
4.1 Select categorical features

In [None]:
# Non-numeric columns are treated as categorical labels for mutual information.
# Datetime/timedelta columns (e.g. the pickup/dropoff timestamps) are excluded.
# Their values are typically near-unique per row, so their MI with any column
# approaches that column's full entropy and would show up as a spurious strong
# dependency.
cat_cols = sample_df.select_dtypes(
    exclude=[np.number, "datetime", "datetimetz", "timedelta"]
).columns.tolist()


4.2 Discretize (for MI feasibility)

In [None]:
# Equal-width binning (10 bins) of each retained numeric column into ordinal codes.
# NOTE(review): disc_df is not referenced by the MI step that follows, which
# scores only cat_cols on sample_df; confirm whether discretized numeric
# columns were meant to be included there.
disc = KBinsDiscretizer(strategy="uniform", encode="ordinal", n_bins=10)

numeric_view = sample_df[num_cols]
disc_df = numeric_view.copy()
disc_df[num_cols] = disc.fit_transform(numeric_view)


4.3 MI on sampled & pruned pairs only

In [None]:
MI_THRESH = 0.05
MI_MAX_COLS = 10  # cap on categorical columns considered, for scalability

# Keep (feature_a, feature_b, mi) for every pair of categorical columns with MI >= MI_THRESH.
mi_pairs = []
for c1, c2 in combinations(cat_cols[:MI_MAX_COLS], 2):
    # Cast labels to str so missing values become an explicit label ("None"/"nan").
    # Otherwise sklearn rejects NaN labels, and None mixed with strings cannot be sorted.
    mi = mutual_info_score(sample_df[c1].astype(str), sample_df[c2].astype(str))
    if mi >= MI_THRESH:
        mi_pairs.append((c1, c2, mi))


Step 5 Build Dependency Graph

In [None]:
G = nx.Graph()

# Register every analysed feature as a node first, so features with no strong
# dependency still appear (as singleton blocks) in the grouping step instead of
# silently disappearing from the output.
G.add_nodes_from(num_cols)
G.add_nodes_from(cat_cols)

# Pearson edges: weight is |r|, so strong negative correlations count too.
for f1, f2, w in pearson_pairs:
    G.add_edge(f1, f2, weight=abs(w))

# MI edges: already filtered to mi >= MI_THRESH, so the weight is used as-is.
for f1, f2, w in mi_pairs:
    G.add_edge(f1, f2, weight=w)


Step 6 Graph-Based Feature Grouping
Connected components = feature blocks

In [None]:
# Each connected component of the dependency graph (a set of feature names) is one feature block.
feature_blocks = [component for component in nx.connected_components(G)]


Step 7 Save Outputs (Deliverables)

In [None]:
# Save dependency edges
edges_df = pd.DataFrame(
    [(u, v, d["weight"]) for u, v, d in G.edges(data=True)],
    columns=["feature_1", "feature_2", "strength"]
)
edges_df.to_csv("reduced_dependency_matrix.csv", index=False)

# Save blocks
blocks_df = pd.DataFrame({
    "block_id": range(len(feature_blocks)),
    "features": [list(b) for b in feature_blocks]
})
blocks_df.to_csv("feature_blocks.csv", index=False)
