# NYC Taxi Analysis - Pro Databricks Project

This notebook demonstrates ETL, Delta Lake, SQL queries, MLlib modeling, and visualizations in Databricks using NYC taxi data.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, hour
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Reuse the active Spark session if there is one; otherwise create it.
spark = (
    SparkSession.builder
    .appName('NYC Taxi Analysis')
    .getOrCreate()
)

# Relative location of the sample trips CSV; the file must already be present.
# NOTE(review): Spark and pandas may resolve a relative path against different
# filesystems depending on the runtime -- confirm both reads hit the same file.
sample_path = 'data/taxi_trips.csv'

# Spark copy: the first row is the header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(sample_path)
)

print('Sample Data:')
df.show(5)

# pandas copy of the same file for local charting. Both timestamp columns are
# parsed as dates, with ambiguous day/month order read day-first.
# NOTE(review): confirm day-first matches the CSV's actual date format.
datetime_columns = ['pickup_datetime', 'dropoff_datetime']
pdf = pd.read_csv(sample_path, parse_dates=datetime_columns, dayfirst=True)
print(pdf.head())

In [None]:
# OPTIONAL: download one month of TLC yellow-taxi trips (Parquet) and load it.
# Disabled by default so a normal top-to-bottom run makes no network request;
# set the flag to True to opt in. This is written as real, flag-guarded code
# rather than a bare string literal, which would be echoed as the cell's output
# and would hide the code from linters and syntax highlighting.
DOWNLOAD_FULL_DATASET = False

if DOWNLOAD_FULL_DATASET:
    import urllib.request

    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
    # The target directory "data/" must already exist; urlretrieve does not create it.
    urllib.request.urlretrieve(url, "data/yellow_tripdata_2024-01.parquet")
    df_big = spark.read.parquet("data/yellow_tripdata_2024-01.parquet")
    df_big.show(5)

In [None]:
# Parse both trip timestamp columns into Spark timestamps, replacing the originals.
for ts_column in ('pickup_datetime', 'dropoff_datetime'):
    df = df.withColumn(ts_column, to_timestamp(col(ts_column)))

# Trip duration in minutes: both timestamps are cast to long, subtracted,
# and the difference is divided by 60.
pickup_as_long = col('pickup_datetime').cast('long')
dropoff_as_long = col('dropoff_datetime').cast('long')
df = df.withColumn('trip_duration_min', (dropoff_as_long - pickup_as_long) / 60)

df.printSchema()
df.show(5)

In [None]:
# Persist the prepared trips in Delta format, overwriting whatever is at the path.
delta_path = 'data/delta/nyc_taxi'
df.write.save(delta_path, format='delta', mode='overwrite')

# Load the table back from the same location and preview the first rows.
df_delta = spark.read.load(delta_path, format='delta')
df_delta.show(5)

In [None]:
# Expose the Delta-backed DataFrame to Spark SQL under the view name `nyc_taxi`.
df_delta.createOrReplaceTempView('nyc_taxi')

# Average fare per pickup location, rounded to 2 decimals, highest average first.
avg_fare_query = '''
SELECT pickup_location, ROUND(AVG(fare_amount), 2) AS avg_fare
FROM nyc_taxi
GROUP BY pickup_location
ORDER BY avg_fare DESC
'''
avg_fare_by_location = spark.sql(avg_fare_query)

avg_fare_by_location.show()

In [None]:
# Collect the aggregated result into a pandas DataFrame and draw it as a
# seaborn bar chart on an explicitly created 8x5 figure.
pdf_chart = avg_fare_by_location.toPandas()
fig, ax = plt.subplots(figsize=(8, 5))
sns.barplot(data=pdf_chart, x='avg_fare', y='pickup_location', ax=ax)
ax.set_title('Average Fare by Pickup Location')
plt.show()

In [None]:
# MLlib - Predict fare_amount
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Predictor columns and target column for the fare regression.
features = ['passenger_count', 'trip_distance', 'trip_duration_min']
label_col = 'fare_amount'

# VectorAssembler's default handleInvalid='error' raises when it meets a null
# in any input column, and the regression cannot train on a null label.
# trip_duration_min in particular is null whenever a timestamp failed to parse.
# Drop rows missing any predictor or the target before assembling.
df_complete = df_delta.dropna(subset=features + [label_col])

# Pack the predictor columns into the single vector column MLlib expects.
assembler = VectorAssembler(inputCols=features, outputCol='features')
df_ml = assembler.transform(df_complete).select('features', label_col)

# Seeded 80/20 train/test split.
train, test = df_ml.randomSplit([0.8, 0.2], seed=42)
lr = LinearRegression(featuresCol='features', labelCol=label_col)
model = lr.fit(train)
predictions = model.transform(test)

# Preview actual vs. predicted fares on the held-out rows.
predictions.select('features', label_col, 'prediction').show(5)