# Urban Mobility Pattern Analysis

## Project: Analysis of Urban Mobility Patterns using Apache Spark

This notebook implements a complete data analysis pipeline for understanding urban mobility patterns from taxi trip data.

### Objectives:
- Apply Apache Spark for processing and analyzing urban mobility data
- Identify mobility patterns (peak hours, busiest zones, average trip duration, etc.)
- Develop a complete data analysis workflow: ingestion, cleaning, transformation, analysis, and visualization


## 1. Setup and Configuration


In [None]:
# Import required libraries
import sys
import os
from pathlib import Path
import logging

# Make the project root importable so that the `src` package resolves.
# The root is taken to be the parent of the current working directory.
# NOTE(review): this assumes the notebook is launched from a folder that sits
# directly under the project root -- confirm for your setup.
project_root = Path().resolve().parent
# Guard the insert so that re-running this cell does not pile up duplicate
# entries at the front of sys.path.
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Import project modules.
# The star imports supply the helpers and constants the later cells call
# without qualification (e.g. create_spark_session, load_taxi_data,
# TAXI_DATA_DIR, OUTPUT_DIR); which module provides which is not visible here.
from src.config import *
from src.data_processing import *
from src.zone_mapping import *
from src.analysis import *
from src.mongodb_operations import *

print("Setup complete!")


In [None]:
# MongoDB Configuration
# The connection string is read from the MONGODB_URI environment variable so
# that credentials are never stored in the notebook.
# Format: mongodb+srv://username:password@cluster.mongodb.net/
# SECURITY: an earlier revision of this cell embedded a real username and
# password as the default value. Treat that credential as leaked and rotate it.
MONGODB_URI_PLACEHOLDER = "mongodb+srv://username:password@cluster.mongodb.net/"

# `or` (rather than a getenv default) also falls back to the placeholder when
# the variable is set but empty.
MONGODB_URI = os.getenv("MONGODB_URI") or MONGODB_URI_PLACEHOLDER

# Update config if needed
if MONGODB_URI != MONGODB_URI_PLACEHOLDER:
    from src import config
    config.MONGODB_URI = MONGODB_URI
    print("MongoDB URI configured")
else:
    # The placeholder is the same sentinel the storage cell checks for, so
    # an unconfigured notebook skips the MongoDB write instead of attempting it.
    print("WARNING: Please update MongoDB URI before saving to MongoDB")


## 2. Initialize Spark Session


In [None]:
# Build the Spark session through the project helper.
spark = create_spark_session()

# Report which Spark build and master the session is attached to.
spark_version = spark.version
spark_master = spark.sparkContext.master
print(f"Spark Version: {spark_version}")
print(f"Spark Master: {spark_master}")
print("Spark session initialized successfully!")


## 3. Data Ingestion and Initial Exploration


In [None]:
# Resolve the raw-data location once; later cells reuse `taxi_data_path`.
taxi_data_path = str(TAXI_DATA_DIR)
print(f"Loading data from: {taxi_data_path}")

# Read the raw trips through the project loader.
df_raw = load_taxi_data(spark, taxi_data_path)

# Print the column names and types of the loaded DataFrame.
print("\n=== Data Schema ===")
df_raw.printSchema()


In [None]:
# Summarise the size and shape of the raw dataset.
raw_columns = df_raw.columns
raw_record_count = df_raw.count()  # triggers a Spark job over the raw data
print(f"Total records: {raw_record_count:,}")
print(f"Total columns: {len(raw_columns)}")
print(f"\nColumn names: {raw_columns}")

# Preview a handful of rows without truncating wide values.
print("\n=== Sample Data (first 5 rows) ===")
df_raw.show(5, truncate=False)


In [None]:
# Count missing values per column.
# `col` is imported here as well because a later cell (zone enrichment) uses it.
from pyspark.sql.functions import col, isnan, isnull, when, count

print("=== Null Value Analysis ===")

# isnan() is only defined for floating-point input. Applying it to timestamp,
# date or boolean columns makes Spark reject the whole query, so NaN is checked
# only on float/double columns and every other column gets the null check alone.
float_columns = {name for name, dtype in df_raw.dtypes if dtype in ("float", "double")}


def _missing_condition(column_name):
    """Return the 'value is missing' predicate appropriate for the column's type."""
    if column_name in float_columns:
        return isnull(column_name) | isnan(column_name)
    return isnull(column_name)


# count() skips nulls and when() yields null for non-matching rows, so each
# aggregate is the number of rows where the column is missing.
null_counts = df_raw.select(
    [count(when(_missing_condition(c), c)).alias(c) for c in df_raw.columns]
)
null_counts.show(vertical=True)


## 4. Data Cleaning and Preprocessing


In [None]:
# Run the project's preprocessing step.
# Note that the helper is given the data path, not `df_raw`.
print("Starting data preprocessing...")

df_cleaned = preprocess_taxi_data(spark, taxi_data_path)

print("\nPreprocessing complete!")
cleaned_record_count = df_cleaned.count()
print(f"Final record count: {cleaned_record_count:,}")


In [None]:
# Show the schema produced by preprocessing.
print("=== Cleaned Data Schema ===")
df_cleaned.printSchema()

# Columns worth eyeballing after cleaning: timestamps, derived time
# features, and the main trip measures.
cleaned_preview_columns = [
    "pickup_datetime", "dropoff_datetime", "trip_duration_min",
    "hour_of_day", "day_of_week", "day_name",
    "trip_distance_km", "fare_amount", "passenger_count",
]

print("\n=== Sample Cleaned Data ===")
cleaned_preview = df_cleaned.select(*cleaned_preview_columns)
cleaned_preview.show(10, truncate=False)


## 5. Data Enrichment - Zone Assignment


In [None]:
# Locate the zones reference dataset.
zones_path = str(ZONES_DATA_PATH)
print(f"Loading zones from: {zones_path}")

# Best-effort load: any failure while reading, counting or displaying the
# zones file falls back to zones derived from the cleaned trips.
try:
    zones_df = load_zones(spark, zones_path)
    loaded_zone_count = zones_df.count()
    print(f"Loaded {loaded_zone_count} zones")
    zones_df.show(truncate=False)
except Exception as load_error:
    print(f"Error loading zones file: {load_error}")
    print("Creating zones from data...")
    zones_df = create_zones_from_data(df_cleaned, num_zones=20)
    zones_df.show(truncate=False)


In [None]:
# Attach zone information to every cleaned trip.
print("Enriching trips with zone information...")

df_enriched = enrich_with_zones(df_cleaned, zones_df)

print("Zone enrichment complete!")

# Report how many trips received a zone name on each side of the trip.
# `col` is the function imported in the null-analysis cell.
pickup_matched = df_enriched.filter(col('pickup_zone_name').isNotNull()).count()
print(f"Records with pickup zone: {pickup_matched:,}")
dropoff_matched = df_enriched.filter(col('dropoff_zone_name').isNotNull()).count()
print(f"Records with dropoff zone: {dropoff_matched:,}")


In [None]:
# Preview the enriched trips: time features, zone names and trip measures.
enriched_preview_columns = [
    "pickup_datetime", "hour_of_day", "day_name",
    "pickup_zone_name", "dropoff_zone_name",
    "trip_duration_min", "trip_distance_km", "fare_amount",
]

print("=== Sample Enriched Data ===")
enriched_preview = df_enriched.select(*enriched_preview_columns)
enriched_preview.show(10, truncate=False)


## 6. Exploratory Data Analysis

### 6.1 Demand by Hour of Day


In [None]:
# Aggregate trips per hour of day; a falsy result means no analysis was produced.
hourly_demand = analyze_demand_by_hour(df_enriched)

if not hourly_demand:
    print("Hourly analysis not available (missing hour_of_day column)")
else:
    from pyspark.sql.functions import desc

    print("=== Hourly Demand Analysis ===")
    hourly_demand.show(24, truncate=False)

    # The busiest hour is the first row once sorted by trip count, descending.
    hours_by_volume = hourly_demand.orderBy(desc("total_trips"))
    peak_hour = hours_by_volume.first()
    print(f"\nPeak hour: {peak_hour['hour_of_day']}:00 with {peak_hour['total_trips']:,} trips")


### 6.2 Demand by Day of Week


In [None]:
# Aggregate trips per day of week; a falsy result means no analysis was produced.
daily_demand = analyze_demand_by_day(df_enriched)

if not daily_demand:
    print("Daily analysis not available (missing day_of_week column)")
else:
    from pyspark.sql.functions import sum as spark_sum

    print("=== Daily Demand Analysis ===")
    daily_demand.show(truncate=False)

    # Roll the per-day figures up into weekday vs weekend totals.
    weekend_totals = [
        spark_sum("total_trips").alias("total_trips"),
        spark_sum("total_revenue").alias("total_revenue"),
    ]
    weekday_weekend = daily_demand.groupBy("is_weekend").agg(*weekend_totals)
    print("\n=== Weekday vs Weekend Comparison ===")
    weekday_weekend.show(truncate=False)


### 6.3 Zone Activity Analysis


In [None]:
# Run the zone activity analysis; the result is a mapping keyed by report name.
zone_results = analyze_zone_activity(df_enriched)

# Each report is optional: print its heading and rows only when present.
zone_reports = (
    ("top_origin_zones", "=== Top 10 Origin Zones ==="),
    ("top_destination_zones", "\n=== Top 10 Destination Zones ==="),
    ("combined_zone_activity", "\n=== Top 20 Zones by Total Activity ==="),
)
for report_key, report_heading in zone_reports:
    zone_report = zone_results.get(report_key)
    if zone_report:
        print(report_heading)
        zone_report.show(truncate=False)


### 6.4 Trip Duration and Distance Analysis


In [None]:
# Analyze trip duration and distance
duration_distance_stats = analyze_trip_duration_distance(df_enriched)

# NOTE(review): the helper's return type is not visible from this notebook.
# Its sibling analyze_zone_activity returns a mapping, and iterating a mapping
# directly yields keys only, which cannot be unpacked into
# (stat_type, stats_df). Accept either a mapping or an iterable of pairs, and
# treat a missing result as "nothing to show" -- confirm against the helper.
if isinstance(duration_distance_stats, dict):
    stat_pairs = duration_distance_stats.items()
else:
    stat_pairs = duration_distance_stats or ()

for stat_type, stats_df in stat_pairs:
    print(f"\n=== Duration and Distance Statistics ({stat_type}) ===")
    stats_df.show(truncate=False)


### 6.5 Revenue and Payment Type Analysis


In [None]:
# Break revenue down by payment type; a falsy result means no analysis was produced.
revenue_analysis = analyze_revenue_payment(df_enriched)

if not revenue_analysis:
    print("Revenue analysis not available (missing payment_type column)")
else:
    from pyspark.sql.functions import sum as spark_sum

    print("=== Revenue Analysis by Payment Type ===")
    revenue_analysis.show(truncate=False)

    # Sum the per-payment-type revenue into a single grand total.
    revenue_totals = revenue_analysis.agg(spark_sum("total_revenue").alias("total_revenue"))
    total_revenue = revenue_totals.collect()[0]["total_revenue"]
    print(f"\nTotal Revenue: ${total_revenue:,.2f}")


## 7. Data Storage - MongoDB


In [None]:
# Collect the analysis outputs under the names passed on to the save helper.
# Entries may be None when the corresponding analysis was unavailable.
analysis_results = {
    "trips_by_hour": hourly_demand,
    "trips_by_day": daily_demand,
    "zones_activity": zone_results.get("combined_zone_activity"),
    "revenue_analysis": revenue_analysis
}

# Only attempt the write when a URI is set and is not the documentation
# placeholder.
uri_is_placeholder = MONGODB_URI == "mongodb+srv://username:password@cluster.mongodb.net/"
if not MONGODB_URI or uri_is_placeholder:
    print("MongoDB URI not configured. Skipping MongoDB storage.")
    print("Please update MONGODB_URI in the configuration cell to save to MongoDB.")
else:
    # Storage is best-effort: a failure is reported but does not stop the notebook.
    try:
        print("Saving analysis results to MongoDB...")
        save_analysis_results(
            analysis_results,
            MONGODB_URI,
            MONGODB_DATABASE,
            MONGODB_COLLECTIONS
        )
        print("\nSuccessfully saved all results to MongoDB!")
    except Exception as save_error:
        print(f"\nError saving to MongoDB: {save_error}")
        print("Please check your MongoDB connection string and network access.")


## 8. Export Data for Power BI


In [None]:
# Ensure output directory exists.
# parents=True also creates missing intermediate directories, so the export
# works on a fresh checkout where the parent of OUTPUT_DIR is absent.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Export all analysis results to CSV for Power BI
print("Exporting data for Power BI...")

# (result, file name) pairs; a result is skipped when its analysis was
# unavailable (falsy), matching the checks used in the analysis cells.
powerbi_exports = [
    (hourly_demand, "hourly_demand.csv"),
    (daily_demand, "daily_demand.csv"),
    (zone_results.get("combined_zone_activity"), "zone_activity.csv"),
    (revenue_analysis, "revenue_analysis.csv"),
]

for export_df, export_name in powerbi_exports:
    if export_df:
        export_for_powerbi(export_df, str(OUTPUT_DIR / export_name))
        print(f"✓ Exported {export_name}")

print("\nAll exports complete! Files are ready for Power BI import.")


## 9. Summary and Key Findings


In [None]:
# Generate summary statistics
# Import the Spark functions up front. `desc` is needed by both the hourly
# and the daily branch, so importing it inside only the hourly branch raised
# NameError whenever the hourly analysis was unavailable.
from pyspark.sql.functions import desc
from pyspark.sql.functions import sum as spark_sum

print("=== PROJECT SUMMARY ===\n")

print(f"Total trips analyzed: {df_enriched.count():,}")

if hourly_demand:
    # Busiest hour: first row after sorting by trip count, descending.
    peak = hourly_demand.orderBy(desc("total_trips")).first()
    print(f"Peak hour: {peak['hour_of_day']}:00 ({peak['total_trips']:,} trips)")

if daily_demand:
    # Busiest day: first row after sorting by trip count, descending.
    busiest_day = daily_demand.orderBy(desc("total_trips")).first()
    print(f"Busiest day: {busiest_day['day_name']} ({busiest_day['total_trips']:,} trips)")

if zone_results.get("combined_zone_activity"):
    # NOTE(review): takes the first row as the most active zone, which assumes
    # the analysis already sorted by total_activity descending -- confirm.
    top_zone = zone_results["combined_zone_activity"].first()
    print(f"Most active zone: {top_zone['zone_name']} ({top_zone['total_activity']:,} trips)")

if revenue_analysis:
    # Grand total across all payment types.
    total_rev = revenue_analysis.agg(spark_sum("total_revenue").alias("total")).collect()[0]["total"]
    print(f"Total revenue: ${total_rev:,.2f}")

print("\n=== Analysis Complete ===")
print("\nNext steps:")
print("1. Review exported CSV files in the output/ directory")
print("2. Import data into Power BI for visualization")
print("3. Connect to MongoDB collections for real-time dashboards")
print("4. Review project documentation in docs/ folder")


## 10. Cleanup


In [None]:
# Shut the Spark session down now that the analysis is finished.
# DataFrames created from this session are unusable after this point.
spark.stop()
print("Spark session stopped.")
