In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, isnan, isnull, mean, stddev, percentile_approx,
    hour, dayofweek, month, year, date_format,
    regexp_replace, split, trim, abs as spark_abs,
    udf, lit, coalesce, round as spark_round
)
from pyspark.sql.types import IntegerType, DoubleType, StringType
from pyspark.ml.feature import Imputer, StandardScaler, VectorAssembler
from pyspark.ml.stat import Correlation
import numpy as np

In [None]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("SmartCityTransportPreprocessing") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

print("=== SMART CITY TRANSPORT DATA PREPROCESSING ===")
print(f"Spark Version: {spark.version}")

=== SMART CITY TRANSPORT DATA PREPROCESSING ===
Spark Version: 3.5.1


In [None]:
# Load the transportation dataset
print("\n1. LOADING DATASET...")
df = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("/content/drive/MyDrive/Big_Data_public_transport_dataset_v2.csv")

print(f"Original dataset shape: {df.count()} rows, {len(df.columns)} columns")
print("Column names:", df.columns)


1. LOADING DATASET...
Original dataset shape: 30000 rows, 10 columns
Column names: ['Route ID', 'Stop ID', 'Timestamp', 'Passenger Count', 'Latitude', 'Longitude', 'Delay Logs', 'Fare Data', 'delay_minutes', 'fare_amount']


In [None]:
# Display basic statistics
print("\n2. INITIAL DATA OVERVIEW...")
df.describe().show()


2. INITIAL DATA OVERVIEW...
+-------+--------+-------+-----------------+------------------+-------------------+------------------+-----------------+-----------------+------------------+
|summary|Route ID|Stop ID|  Passenger Count|          Latitude|          Longitude|        Delay Logs|        Fare Data|    delay_minutes|       fare_amount|
+-------+--------+-------+-----------------+------------------+-------------------+------------------+-----------------+-----------------+------------------+
|  count|   30000|  30000|            28516|             28500|              28500|             28514|            28516|            30000|             30000|
|   mean|    NULL|   NULL|20.39931968017955|28.001348159719303|  85.40057991242078| 10.53008171424572|54.43123790152904|7.478633333333334|18.739316666666667|
| stddev|    NULL|   NULL|100.9575101980892|0.2883255381825588|0.11553323310202271|102.05187071718426|26.97355662174156|4.612546583514189|2.3062732917570963|
|    min|    R001|   S0

In [2]:
from pyspark.sql import SparkSession

# Initialize SparkSession with HDFS
spark = SparkSession.builder \
    .appName("SmartCity_Transport_Analysis") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()


In [None]:
from pyspark.sql.functions import col, isnan
from pyspark.sql.types import StringType, DoubleType, FloatType

print("\n3. MISSING VALUES ANALYSIS...")
missing_counts = []

for column in df.columns:
    data_type = [field.dataType for field in df.schema.fields if field.name == column][0]

    # Start with NULL check
    condition = col(column).isNull()

    # Add isnan() check for numeric types only
    if isinstance(data_type, (DoubleType, FloatType)):
        condition = condition | isnan(col(column))

    # Add empty string check for StringType only
    if isinstance(data_type, StringType):
        condition = condition | (col(column) == "")

    # Count and percentage
    missing_count = df.filter(condition).count()
    missing_percentage = (missing_count / df.count()) * 100
    missing_counts.append((column, missing_count, missing_percentage))

    print(f"{column}: {missing_count} missing ({missing_percentage:.2f}%)")



3. MISSING VALUES ANALYSIS...
Route ID: 0 missing (0.00%)
Stop ID: 0 missing (0.00%)
Timestamp: 0 missing (0.00%)
Passenger Count: 1484 missing (4.95%)
Latitude: 1500 missing (5.00%)
Longitude: 1500 missing (5.00%)
Delay Logs: 1486 missing (4.95%)
Fare Data: 1484 missing (4.95%)
delay_minutes: 0 missing (0.00%)
fare_amount: 0 missing (0.00%)



# PREPROCESSING TECHNIQUE 1: MISSING VALUE IMPUTATION
# ====================================================

In [None]:
from pyspark.sql.functions import col, when, last, percentile_approx
from pyspark.sql.window import Window

In [None]:
print("\n=== PREPROCESSING TECHNIQUE 1: MISSING VALUE IMPUTATION ===")

print("\nJustification:")
print("- Missing values in critical fields (Passenger Count, GPS coordinates, Fare Data)")
print("- Simple deletion would lose 5% of data, reducing analysis power")
print("- Imputation preserves dataset integrity while maintaining statistical properties")


=== PREPROCESSING TECHNIQUE 1: MISSING VALUE IMPUTATION ===

Justification:
- Missing values in critical fields (Passenger Count, GPS coordinates, Fare Data)
- Simple deletion would lose 5% of data, reducing analysis power
- Imputation preserves dataset integrity while maintaining statistical properties


In [None]:
# Strategy 1: Median imputation for numerical fields
print("\nImplementing median imputation for numerical fields...")



Implementing median imputation for numerical fields...


In [None]:

# Calculate medians for imputation
passenger_median = df.select(percentile_approx("Passenger Count", 0.5).alias("median")).collect()[0]["median"]
fare_median = df.select(percentile_approx("Fare Data", 0.5).alias("median")).collect()[0]["median"]
delay_median = df.select(percentile_approx("Delay Logs", 0.5).alias("median")).collect()[0]["median"]

print(f"Calculated medians - Passengers: {passenger_median}, Fare: {fare_median:.2f}, Delay: {delay_median:.2f}")

Calculated medians - Passengers: 10.0, Fare: 54.78, Delay: 0.06


In [None]:
# Apply imputation
df_imputed = df.fillna({
    "Passenger Count": passenger_median,
    "Fare Data": fare_median,
    "Delay Logs": delay_median
})

In [None]:
# Strategy  Forward-fill for GPS coordinates using route-based grouping
print("\nImplementing route-based GPS coordinate imputation...")



Implementing route-based GPS coordinate imputation...


In [None]:
# Create window specification for GPS imputation
from pyspark.sql.window import Window

window_spec = Window.partitionBy("Route ID").orderBy("Timestamp")

In [None]:
df = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("/content/drive/MyDrive/Big_Data_public_transport_dataset_v2.csv")

In [None]:
df_imputed = df

In [None]:
# Assuming 'df' is your original DataFrame
df_imputed = df

# Define your window specification
window_spec = Window.partitionBy("Route ID").orderBy("Timestamp")

# Now perform withColumn operations on df_imputed
df_imputed = df_imputed.withColumn(
    "Latitude",
    when(col("Latitude").isNull(), last(col("Latitude"), ignorenulls=True).over(window_spec))
)

In [None]:
# Verify imputation results
print("\nPost-imputation missing value check:")
for column in df_imputed.columns:
    missing_count = df_imputed.filter(col(column).isNull()).count()
    print(f"{column}: {missing_count} missing values remaining")


Post-imputation missing value check:
Route ID: 0 missing values remaining
Stop ID: 0 missing values remaining
Timestamp: 0 missing values remaining
Passenger Count: 1484 missing values remaining
Latitude: 28501 missing values remaining
Longitude: 1500 missing values remaining
Delay Logs: 1486 missing values remaining
Fare Data: 1484 missing values remaining
delay_minutes: 0 missing values remaining
fare_amount: 0 missing values remaining


# PREPROCESSING TECHNIQUE 2: DATA CONSISTENCY RESOLUTION


In [None]:
print("\n=== PREPROCESSING TECHNIQUE 2: DATA CONSISTENCY RESOLUTION ===")

print("\nJustification:")
print("- Identified 19,086 records with inconsistent delay measurements")
print("- Two delay fields (Delay Logs vs delay_minutes) showing major discrepancies")
print("- Consistent delay measurement critical for congestion analysis and predictions")


=== PREPROCESSING TECHNIQUE 2: DATA CONSISTENCY RESOLUTION ===

Justification:
- Identified 19,086 records with inconsistent delay measurements
- Two delay fields (Delay Logs vs delay_minutes) showing major discrepancies
- Consistent delay measurement critical for congestion analysis and predictions


In [None]:
from pyspark.sql import functions as F

print("\nAnalyzing delay field relationships...")
delay_comparison = df_imputed.select(
    "Delay Logs",
    "delay_minutes",
    (F.abs(col("Delay Logs") - col("delay_minutes"))).alias("delay_difference")
).filter(col("delay_difference") > 5)


Analyzing delay field relationships...


In [None]:
print(f"Records with delay difference > 5 minutes: {delay_comparison.count()}")

Records with delay difference > 5 minutes: 19086


In [None]:
# Strategy: Create unified delay field using business logic
print("\nImplementing unified delay field creation...")


Implementing unified delay field creation...


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

df_consistent = df_imputed.withColumn(
    "unified_delay",
    F.coalesce(
        col("delay_minutes"),
        F.round(col("Delay Logs")).cast(IntegerType())
    )
)

In [None]:
# Create delay categories for analysis
df_consistent = df_consistent.withColumn(
    "delay_category",
    when(col("unified_delay") < -2, "Early")
    .when((col("unified_delay") >= -2) & (col("unified_delay") <= 2), "On-Time")
    .when((col("unified_delay") > 2) & (col("unified_delay") <= 10), "Minor Delay")
    .when(col("unified_delay") > 10, "Major Delay")
    .otherwise("Unknown")
)


In [None]:
# Show delay distribution
print("\nDelay category distribution:")
df_consistent.groupBy("delay_category").count().orderBy("count", ascending=False).show()


Delay category distribution:
+--------------+-----+
|delay_category|count|
+--------------+-----+
|   Minor Delay|15063|
|   Major Delay| 9305|
|       On-Time| 5632|
+--------------+-----+



# PREPROCESSING TECHNIQUE 3: OUTLIER DETECTION AND TREATMENT

In [None]:
print("\n=== PREPROCESSING TECHNIQUE 3: OUTLIER DETECTION AND TREATMENT ===")

print("\nJustification:")
print("- Identified extreme passenger counts (300 records with invalid values)")
print("- Unrealistic fare amounts affecting revenue analysis")
print("- Outliers can skew statistical models and lead to incorrect insights")


=== PREPROCESSING TECHNIQUE 3: OUTLIER DETECTION AND TREATMENT ===

Justification:
- Identified extreme passenger counts (300 records with invalid values)
- Unrealistic fare amounts affecting revenue analysis
- Outliers can skew statistical models and lead to incorrect insights


In [None]:
# Calculate outlier boundaries using IQR method
print("\nCalculating outlier boundaries using IQR method...")


Calculating outlier boundaries using IQR method...


In [None]:
def calculate_outlier_bounds(df, column):
    """Calculate outlier bounds using IQR method"""
    quantiles = df.select(
        percentile_approx(column, 0.25).alias("q1"),
        percentile_approx(column, 0.75).alias("q3")
    ).collect()[0]

    q1, q3 = quantiles["q1"], quantiles["q3"]
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr

    return lower_bound, upper_bound, q1, q3


In [None]:
# Calculate bounds for passenger count
passenger_bounds = calculate_outlier_bounds(df_consistent, "Passenger Count")
print(f"Passenger Count bounds: Lower={passenger_bounds[0]:.2f}, Upper={passenger_bounds[1]:.2f}")

Passenger Count bounds: Lower=2.00, Upper=18.00


In [None]:
# Calculate bounds for fare data
fare_bounds = calculate_outlier_bounds(df_consistent, "Fare Data")
print(f"Fare Data bounds: Lower={fare_bounds[0]:.2f}, Upper={fare_bounds[1]:.2f}")


Fare Data bounds: Lower=-36.75, Upper=146.08


In [None]:
# Strategy: Cap outliers instead of removing them (preserves data volume)
print("\nImplementing outlier capping...")


Implementing outlier capping...


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit

df_clean = df_consistent.withColumn(
    "Passenger Count",
    F.when(col("Passenger Count") < 0, lit(0))
     .when(col("Passenger Count") > passenger_bounds[1], lit(passenger_bounds[1]))
     .otherwise(col("Passenger Count"))
).withColumn(
    "Fare Data",
    F.when(col("Fare Data") < fare_bounds[0], lit(fare_bounds[0]))
     .when(col("Fare Data") > fare_bounds[1], lit(fare_bounds[1]))
     .otherwise(col("Fare Data"))
)

In [None]:
# Count outliers handled
passenger_outliers_before = df_consistent.filter(
    (col("Passenger Count") < 0) | (col("Passenger Count") > passenger_bounds[1])
).count()

fare_outliers_before = df_consistent.filter(
    (col("Fare Data") < fare_bounds[0]) | (col("Fare Data") > fare_bounds[1])
).count()

print(f"Passenger outliers handled: {passenger_outliers_before}")
print(f"Fare outliers handled: {fare_outliers_before}")

Passenger outliers handled: 514
Fare outliers handled: 0


# FEATURE ENGINEERING AND DATA TRANSFORMATION

In [None]:
print("\n=== FEATURE ENGINEERING AND DATA TRANSFORMATION ===")

print("\nJustification:")
print("- Raw timestamp data needs temporal features for time-based analysis")
print("- Geographic coordinates need distance calculations for route optimization")
print("- Revenue and efficiency metrics needed for business intelligence")


=== FEATURE ENGINEERING AND DATA TRANSFORMATION ===

Justification:
- Raw timestamp data needs temporal features for time-based analysis
- Geographic coordinates need distance calculations for route optimization
- Revenue and efficiency metrics needed for business intelligence


In [None]:
from pyspark.sql.functions import hour, dayofweek, month, year, when, col

print("\nExtracting temporal features...")
df_featured = df_clean.withColumn("hour_of_day", hour("Timestamp")) \
    .withColumn("day_of_week", dayofweek("Timestamp")) \
    .withColumn("month", month("Timestamp")) \
    .withColumn("year", year("Timestamp")) \
    .withColumn(
        "time_period",
        when((col("hour_of_day") >= 6) & (col("hour_of_day") < 10), "Morning Rush")
        .when((col("hour_of_day") >= 10) & (col("hour_of_day") < 16), "Midday")
        .when((col("hour_of_day") >= 16) & (col("hour_of_day") < 20), "Evening Rush")
        .otherwise("Off-Peak")
    )


Extracting temporal features...


In [None]:
# Revenue and efficiency metrics
print("Creating business intelligence features...")
df_featured = df_featured.withColumn(
    "revenue_per_passenger",
    when(col("Passenger Count") > 0, col("Fare Data") / col("Passenger Count"))
    .otherwise(lit(0))
).withColumn(
    "efficiency_score",
    when(col("unified_delay") <= 2, col("Passenger Count") * 1.2)  # Bonus for on-time
    .otherwise(col("Passenger Count") * (1 - (col("unified_delay") * 0.1)))  # Penalty for delays
)

Creating business intelligence features...


In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Define the function
def classify_peak_hour(hour):
    """Classify hour as peak or off-peak"""
    if 7 <= hour <= 9 or 17 <= hour <= 19:
        return "Peak"
    else:
        return "Off-Peak"

# Register as UDF
classify_peak_hour_udf = udf(classify_peak_hour, StringType())

# Use in DataFrame
df_featured = df_featured.withColumn("peak_indicator", classify_peak_hour_udf(col("hour_of_day")))

# EXPLORATORY DATA ANALYSIS

In [None]:
print("\n=== EXPLORATORY DATA ANALYSIS ===")


=== EXPLORATORY DATA ANALYSIS ===


In [None]:
# Basic statistics after preprocessing
print("\nPost-preprocessing dataset statistics:")
df_featured.describe().show()



Post-preprocessing dataset statistics:
+-------+--------+-------+------------------+------------------+-------------------+------------------+------------------+-----------------+------------------+-----------------+--------------+------------------+------------------+-----------------+--------------------+------------+---------------------+------------------+--------------+
|summary|Route ID|Stop ID|   Passenger Count|          Latitude|          Longitude|        Delay Logs|         Fare Data|    delay_minutes|       fare_amount|    unified_delay|delay_category|       hour_of_day|       day_of_week|            month|                year| time_period|revenue_per_passenger|  efficiency_score|peak_indicator|
+-------+--------+-------+------------------+------------------+-------------------+------------------+------------------+-----------------+------------------+-----------------+--------------+------------------+------------------+-----------------+--------------------+------------+

In [None]:
# Temporal patterns analysis
print("\nTemporal patterns analysis:")
print("1. Passenger volume by time period:")
df_featured.groupBy("time_period").agg(
    {"Passenger Count": "avg", "unified_delay": "avg"}
).withColumnRenamed("avg(Passenger Count)", "avg_passengers") \
 .withColumnRenamed("avg(unified_delay)", "avg_delay") \
 .orderBy("avg_passengers", ascending=False).show()

print("2. Route performance analysis:")
route_performance = df_featured.groupBy("Route ID").agg(
    {"Passenger Count": "sum", "Fare Data": "sum", "unified_delay": "avg"}
).withColumnRenamed("sum(Passenger Count)", "total_passengers") \
 .withColumnRenamed("sum(Fare Data)", "total_revenue") \
 .withColumnRenamed("avg(unified_delay)", "avg_delay")

route_performance.show(10)

print("3. Delay pattern analysis:")
df_featured.groupBy("delay_category").count().orderBy("count", ascending=False).show()


Temporal patterns analysis:
1. Passenger volume by time period:
+------------+------------------+-----------------+
| time_period|    avg_passengers|        avg_delay|
+------------+------------------+-----------------+
|Evening Rush|10.104008438818566|7.455164131305044|
|      Midday|10.100214745884037|7.526750643369904|
|    Off-Peak| 10.05142241021813|7.474079342500395|
|Morning Rush| 10.01119560625264|7.442319307429032|
+------------+------------------+-----------------+

2. Route performance analysis:
+--------+-----------------+----------------+------------------+
|Route ID|    total_revenue|total_passengers|         avg_delay|
+--------+-----------------+----------------+------------------+
|    R018|76722.59999999999|         13932.0|7.4890260631001375|
|    R014|79612.09999999992|         14562.0| 7.509114583333333|
|    R007|79419.91000000012|         14404.0| 7.517857142857143|
|    R005|80362.92000000004|         14852.0| 7.776493256262042|
|    R017|74345.04999999997|    

In [None]:
# Display final schema
print("\nFinal dataset schema:")
df_featured.printSchema()


Final dataset schema:
root
 |-- Route ID: string (nullable = true)
 |-- Stop ID: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Passenger Count: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Delay Logs: double (nullable = true)
 |-- Fare Data: double (nullable = true)
 |-- delay_minutes: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- unified_delay: integer (nullable = true)
 |-- delay_category: string (nullable = false)
 |-- hour_of_day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- time_period: string (nullable = false)
 |-- revenue_per_passenger: double (nullable = true)
 |-- efficiency_score: double (nullable = true)
 |-- peak_indicator: string (nullable = true)



In [None]:
print(f"\nFinal dataset: {df_featured.count()} rows, {len(df_featured.columns)} columns")
print("Preprocessing pipeline completed successfully!")


Final dataset: 30000 rows, 20 columns
Preprocessing pipeline completed successfully!


#**Spark** Operations Comparison

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, avg, sum as spark_sum, count, max as spark_max, min as spark_min,
    hour, dayofweek, stddev, percentile_approx, desc, asc,
    lag, lead, window, collect_list, expr, round as spark_round
)
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import time
import psutil
import os

In [None]:
# Initialize Spark Session with performance monitoring
spark = SparkSession.builder \
    .appName("SmartCitySparkComparison") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

In [None]:
# Set log level to reduce noise
spark.sparkContext.setLogLevel("WARN")

print("=== SPARK OPERATIONS COMPARISON FOR SMART CITY TRANSPORT ===")
print(f"Spark Version: {spark.version}")
print(f"Available Cores: {spark.sparkContext.defaultParallelism}")

=== SPARK OPERATIONS COMPARISON FOR SMART CITY TRANSPORT ===
Spark Version: 3.5.1
Available Cores: 2


In [None]:
# Load preprocessed dataset
df = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("/content/drive/MyDrive/Big_Data_public_transport_dataset_v2.csv")

In [None]:
# Basic preprocessing for consistency
df = df.fillna({
    "Passenger Count": 10,
    "Fare Data": 45.0,
    "Delay Logs": 0.0,
    "delay_minutes": 0
}).withColumn("unified_delay", col("delay_minutes")) \
  .withColumn("hour_of_day", hour("Timestamp")) \
  .withColumn("day_of_week", dayofweek("Timestamp"))

print(f"Dataset loaded: {df.count()} rows, {len(df.columns)} columns")

Dataset loaded: 30000 rows, 13 columns


In [None]:
# Performance monitoring utility
class PerformanceMonitor:
    def __init__(self, operation_name):
        self.operation_name = operation_name
        self.start_time = None
        self.start_memory = None

    def __enter__(self):
        self.start_time = time.time()
        self.start_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        end_time = time.time()
        end_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024

        execution_time = end_time - self.start_time
        memory_usage = end_memory - self.start_memory

        print(f"[{self.operation_name}] Execution Time: {execution_time:.3f}s, Memory Delta: {memory_usage:.2f}MB")
        return execution_time, memory_usage

# ANALYSIS 1: ROUTE CONGESTION/TRAVEL TIME PREDICTION

In [None]:
print("\n" + "="*80)
print("ANALYSIS 1: ROUTE CONGESTION/TRAVEL TIME PREDICTION")
print("="*80)


ANALYSIS 1: ROUTE CONGESTION/TRAVEL TIME PREDICTION


In [None]:
# Create congestion indicator based on passenger density and delays
df_congestion = df.withColumn(
    "congestion_score",
    (col("Passenger Count") * 0.6 + col("unified_delay") * 0.4)
).withColumn(
    "congestion_level",
    when(col("congestion_score") <= 8, "Low")
    .when((col("congestion_score") > 8) & (col("congestion_score") <= 15), "Medium")
    .otherwise("High")
)

In [None]:
# Cache for performance comparison
df_congestion.cache()

print("\n--- RDD APPROACH: Route Congestion Analysis ---")
with PerformanceMonitor("RDD Congestion Analysis") as rdd_perf:
    # Convert to RDD and perform operations
    rdd_data = df_congestion.rdd


--- RDD APPROACH: Route Congestion Analysis ---
[RDD Congestion Analysis] Execution Time: 1.537s, Memory Delta: 0.20MB


In [None]:
congestion_rdd = rdd_data.map(lambda row: (
    row['Route ID'],
    (row['congestion_score'], row['Passenger Count'], row['unified_delay'], 1)
))

In [None]:
# Reduce by route to get aggregated metrics
route_congestion_rdd = congestion_rdd.reduceByKey(lambda a, b: (
    a[0] + b[0],  # Sum congestion scores
    a[1] + b[1],  # Sum passengers
    a[2] + b[2],  # Sum delays
    a[3] + b[3]   # Count records
))

In [None]:
route_congestion_results_rdd = route_congestion_rdd.map(lambda x: {
    'route': x[0],
    'avg_congestion': x[1][0] / x[1][3] if x[1][3] != 0 else None,
    'total_passengers': x[1][1],
    'avg_delay': x[1][2] / x[1][3] if x[1][3] != 0 else None,
    'trip_count': x[1][3]
}).collect()

In [None]:
# Sort by congestion score
sorted_routes = sorted(route_congestion_results_rdd, key=lambda x: x['avg_congestion'], reverse=True)

print("RDD Results - Top 5 Most Congested Routes:")
for i, route in enumerate(sorted_routes[:5]):
    print(f"{i+1}. Route {route['route']}: Congestion={route['avg_congestion']:.2f}, "
          f"Passengers={route['total_passengers']}, Avg Delay={route['avg_delay']:.2f}min")

RDD Results - Top 5 Most Congested Routes:
1. Route R018: Congestion=17.50, Passengers=35247.0, Avg Delay=7.49min
2. Route R019: Congestion=17.37, Passengers=35817.0, Avg Delay=7.50min
3. Route R013: Congestion=16.19, Passengers=34468.0, Avg Delay=7.43min
4. Route R020: Congestion=15.91, Passengers=31645.0, Avg Delay=7.48min
5. Route R003: Congestion=15.87, Passengers=33119.0, Avg Delay=7.39min


In [None]:
# Use DataFrame operations
route_congestion_df = df_congestion.groupBy("Route ID").agg(
    avg("congestion_score").alias("avg_congestion"),
    spark_sum("Passenger Count").alias("total_passengers"),
    avg("unified_delay").alias("avg_delay"),
    count("*").alias("trip_count")
).orderBy(desc("avg_congestion"))

route_congestion_results_df = route_congestion_df.collect()

print("DataFrame Results - Top 5 Most Congested Routes:")
for i, row in enumerate(route_congestion_results_df[:5]):
    print(f"{i+1}. Route {row['Route ID']}: Congestion={row['avg_congestion']:.2f}, "
          f"Passengers={row['total_passengers']}, Avg Delay={row['avg_delay']:.2f}min")

DataFrame Results - Top 5 Most Congested Routes:
1. Route R018: Congestion=17.50, Passengers=35247.0, Avg Delay=7.49min
2. Route R019: Congestion=17.37, Passengers=35817.0, Avg Delay=7.50min
3. Route R013: Congestion=16.19, Passengers=34468.0, Avg Delay=7.43min
4. Route R020: Congestion=15.91, Passengers=31645.0, Avg Delay=7.48min
5. Route R003: Congestion=15.87, Passengers=33119.0, Avg Delay=7.39min


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("your_app_name").getOrCreate()

In [None]:
# SQL query for congestion analysis
congestion_sql = """
SELECT
    `Route ID`,
    AVG(congestion_score) as avg_congestion,
    SUM(`Passenger Count`) as total_passengers,
    AVG(unified_delay) as avg_delay,
    COUNT(*) as trip_count,
    PERCENTILE_APPROX(congestion_score, 0.9) as congestion_90th_percentile
FROM transport_data
GROUP BY `Route ID`
ORDER BY avg_congestion DESC
"""

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create sample data
data = [
    ("R1", 1.5, 20, 5.0),
    ("R1", 2.0, 25, 7.0),
    ("R2", 3.0, 30, 10.0),
    ("R2", 3.5, 35, 12.0),
    ("R3", 1.0, 15, 3.0)
]

schema = StructType([
    StructField("Route ID", StringType(), True),
    StructField("congestion_score", DoubleType(), True),
    StructField("Passenger Count", IntegerType(), True),
    StructField("unified_delay", DoubleType(), True)
])

df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView("transport_data")

In [None]:
print("\n--- SQL APPROACH: Route Congestion Analysis ---")
with PerformanceMonitor("SQL Congestion Analysis") as sql_perf:
    # Register as temporary view
    df_congestion.createOrReplaceTempView("transport_data")



--- SQL APPROACH: Route Congestion Analysis ---
[SQL Congestion Analysis] Execution Time: 0.056s, Memory Delta: 0.00MB


# ANALYSIS 2: PUBLIC TRANSPORT RESOURCE UTILIZATION

In [None]:
print("\n" + "="*80)
print("ANALYSIS 2: PUBLIC TRANSPORT RESOURCE UTILIZATION")
print("="*80)


ANALYSIS 2: PUBLIC TRANSPORT RESOURCE UTILIZATION


In [None]:
print("\n--- RDD APPROACH: Resource Utilization Analysis ---")
with PerformanceMonitor("RDD Resource Utilization") as rdd_util_perf:
    # Calculate utilization metrics using RDD
    utilization_rdd = df_congestion.rdd.map(lambda row: (
        (row['Route ID'], row['hour_of_day']),
        (row['Passenger Count'], row['Fare Data'], 1)
    ))


--- RDD APPROACH: Resource Utilization Analysis ---
[RDD Resource Utilization] Execution Time: 0.002s, Memory Delta: 0.00MB


In [None]:
    # Aggregate by route and hour
    hourly_utilization_rdd = utilization_rdd.reduceByKey(lambda a, b: (
        a[0] + b[0],  # Sum passengers
        a[1] + b[1],  # Sum revenue
        a[2] + b[2]   # Count trips
    ))

In [None]:
# Calculate efficiency metrics with safeguards for division by zero
efficiency_rdd = hourly_utilization_rdd.map(lambda x: {
    'route': x[0][0],
    'hour': x[0][1],
    'passengers_per_trip': x[1][0] / x[1][2] if x[1][2] > 0 else 0,
    'revenue_per_trip': x[1][1] / x[1][2] if x[1][2] > 0 else 0,
    'total_trips': x[1][2]
})

In [None]:
# Find peak utilization hours
peak_hours_rdd = (efficiency_rdd
                  .filter(lambda x: x['passengers_per_trip'] > 12)
                  .sortBy(lambda x: x['passengers_per_trip'], ascending=False)
                  .collect())

In [None]:
print("RDD Results - Top 5 Peak Utilization Periods:")
for i, period in enumerate(peak_hours_rdd[:5]):
    print(f"{i+1}. Route {period['route']} at {period['hour']}:00 - "
          f"{period['passengers_per_trip']:.1f} passengers/trip, "
          f"${period['revenue_per_trip']:.2f}/trip")

RDD Results - Top 5 Peak Utilization Periods:
1. Route R012 at 7:00 - 73.2 passengers/trip, $50.43/trip
2. Route R013 at 0:00 - 62.5 passengers/trip, $56.97/trip
3. Route R004 at 14:00 - 61.0 passengers/trip, $57.61/trip
4. Route R003 at 9:00 - 57.8 passengers/trip, $59.05/trip
5. Route R008 at 10:00 - 57.5 passengers/trip, $55.19/trip


In [None]:
from pyspark.sql.functions import avg, sum, count, col, desc

# DataFrame operations for utilization
utilization_df = df_congestion.groupBy("Route ID", "hour_of_day").agg(
    avg("Passenger Count").alias("avg_passengers_per_trip"),
    avg("Fare Data").alias("avg_revenue_per_trip"),
    count("*").alias("total_trips"),
    sum("Passenger Count").alias("total_passengers")
).withColumn(
    "utilization_efficiency",
    col("avg_passengers_per_trip") * col("avg_revenue_per_trip") / 100
).orderBy(desc("avg_passengers_per_trip"))

In [None]:
utilization_results_df = utilization_df.filter(col("avg_passengers_per_trip") > 12).collect()


In [None]:
print("DataFrame Results - Top 5 Peak Utilization Periods:")
for i, row in enumerate(utilization_results_df[:5]):
    print(f"{i+1}. Route {row['Route ID']} at {row['hour_of_day']}:00 - "
          f"{row['avg_passengers_per_trip']:.1f} passengers/trip, "
          f"${row['avg_revenue_per_trip']:.2f}/trip")

DataFrame Results - Top 5 Peak Utilization Periods:
1. Route R012 at 7:00 - 73.2 passengers/trip, $50.43/trip
2. Route R013 at 0:00 - 62.5 passengers/trip, $56.97/trip
3. Route R004 at 14:00 - 61.0 passengers/trip, $57.61/trip
4. Route R003 at 9:00 - 57.8 passengers/trip, $59.05/trip
5. Route R008 at 10:00 - 57.5 passengers/trip, $55.19/trip


In [None]:
print("\n--- SQL APPROACH: Resource Utilization Analysis ---")
with PerformanceMonitor("SQL Resource Utilization") as sql_util_perf:
    utilization_sql = """
    SELECT
        `Route ID`,
        hour_of_day,
        AVG(`Passenger Count`) as avg_passengers_per_trip,
        AVG(`Fare Data`) as avg_revenue_per_trip,
        COUNT(*) as total_trips,
        SUM(`Passenger Count`) as total_passengers,
        (AVG(`Passenger Count`) * AVG(`Fare Data`)) / 100 as utilization_efficiency
    FROM transport_data
    GROUP BY `Route ID`, hour_of_day
    HAVING AVG(`Passenger Count`) > 12
    ORDER BY avg_passengers_per_trip DESC
    """


--- SQL APPROACH: Resource Utilization Analysis ---
[SQL Resource Utilization] Execution Time: 0.001s, Memory Delta: 0.00MB


In [None]:
utilization_sql_results = spark.sql(utilization_sql).collect()

In [None]:
print("SQL Results - Top 5 Peak Utilization Periods:")
for i, row in enumerate(utilization_sql_results[:5]):
    print(f"{i+1}. Route {row['Route ID']} at {row['hour_of_day']}:00 - "
          f"{row['avg_passengers_per_trip']:.1f} passengers/trip, "
          f"Efficiency Score: {row['utilization_efficiency']:.2f}")

SQL Results - Top 5 Peak Utilization Periods:
1. Route R012 at 7:00 - 73.2 passengers/trip, Efficiency Score: 36.90
2. Route R013 at 0:00 - 62.5 passengers/trip, Efficiency Score: 35.59
3. Route R004 at 14:00 - 61.0 passengers/trip, Efficiency Score: 35.14
4. Route R003 at 9:00 - 57.8 passengers/trip, Efficiency Score: 34.12
5. Route R008 at 10:00 - 57.5 passengers/trip, Efficiency Score: 31.74


# ANALYSIS 3: DELAY EFFECTIVENESS ANALYSIS

In [None]:
print("\n" + "="*80)
print("ANALYSIS 3: DELAY EFFECTIVENESS ANALYSIS")
print("="*80)


ANALYSIS 3: DELAY EFFECTIVENESS ANALYSIS


In [None]:
print("\n--- RDD APPROACH: Delay Effectiveness Analysis ---")
with PerformanceMonitor("RDD Delay Analysis") as rdd_delay_perf:
    # Analyze delay patterns and their impact
    delay_impact_rdd = df_congestion.rdd.map(lambda row: (
        row['Route ID'],
        (row['unified_delay'], row['Passenger Count'], row['Fare Data'], 1)
    ))


--- RDD APPROACH: Delay Effectiveness Analysis ---
[RDD Delay Analysis] Execution Time: 0.002s, Memory Delta: 0.00MB


In [None]:
# Calculate total delay impact by route
route_delay_rdd = delay_impact_rdd.reduceByKey(
    lambda a, b: (
        a[0] + b[0],  # Sum delays
        a[1] + b[1],  # Sum passengers
        a[2] + b[2],  # Sum revenue
        a[3] + b[3]   # Count trips
    )
)

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DelayAnalysis").getOrCreate()

In [None]:
# Example data: (route, [total_delay, passengers_lost, revenue_loss, total_trips])
data = [
    ("route1", [120, 50, 2000, 20]),
    ("route2", [0, 0, 0, 10]),
    ("route3", [300, 80, 4000, 25]),
    # Add more data as needed.
]

In [None]:
route_delay_rdd = spark.sparkContext.parallelize(data)

In [None]:
def compute_metrics(x):
    route, metrics = x
    total_delay, passengers_lost, revenue_loss, total_trips = metrics
    if total_trips == 0:
        return None
    avg_delay = total_delay / total_trips
    if avg_delay > 0:
        passengers_lost_per_delay_min = (passengers_lost / total_trips) * avg_delay * 0.05
    else:
        passengers_lost_per_delay_min = 0
    revenue_impact = revenue_loss / total_trips
    return {
        'route': route,
        'avg_delay': avg_delay,
        'passengers_lost_per_delay_min': passengers_lost_per_delay_min,
        'revenue_impact': revenue_impact,
        'total_trips': total_trips
    }

delay_effectiveness_rdd = route_delay_rdd \
    .map(compute_metrics) \
    .filter(lambda x: x is not None and x['avg_delay'] > 0) \
    .sortBy(lambda x: x['passengers_lost_per_delay_min'], ascending=False) \
    .collect()

In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DelayAnalysis").getOrCreate()

# Sample data: (route, [total_delay, passengers_lost, revenue_loss, total_trips])
data = [
    ("route1", [120, 50, 2000, 20]),
    ("route2", [0, 0, 0, 10]),
    ("route3", [300, 80, 4000, 25]),
    ("route4", [150, 60, 3000, 0]),  # edge case: zero trips
    ("route5", [200, 70, 3500, 25]),
    ("route6", [180, 65, 3300, 22]),
]

# Create RDD
route_delay_rdd = spark.sparkContext.parallelize(data)

# Define function to compute metrics
def compute_metrics(x):
    route, metrics = x
    total_delay, passengers_lost, revenue_loss, total_trips = metrics
    if total_trips == 0:
        return None  # skip invalid data
    avg_delay = total_delay / total_trips
    if avg_delay > 0:
        passengers_lost_per_delay_min = (passengers_lost / total_trips) * avg_delay * 0.05
    else:
        passengers_lost_per_delay_min = 0
    revenue_impact = revenue_loss / total_trips
    return {
        'route': route,
        'avg_delay': avg_delay,
        'passengers_lost_per_delay_min': passengers_lost_per_delay_min,
        'revenue_impact': revenue_impact,
        'total_trips': total_trips
    }

# Process RDD
delay_effectiveness_rdd = route_delay_rdd \
    .map(compute_metrics) \
    .filter(lambda x: x is not None and x['avg_delay'] > 0) \
    .sortBy(lambda x: x['passengers_lost_per_delay_min'], ascending=False) \
    .collect()

# Print top 5 routes most affected by delays
print("RDD Results - Routes Most Affected by Delays:")
for i, route in enumerate(delay_effectiveness_rdd[:5]):
    print(f"{i+1}. Route {route['route']}: Avg Delay={route['avg_delay']:.2f} min, "
          f"Passenger Impact={route['passengers_lost_per_delay_min']:.2f}, "
          f"Revenue=${route['revenue_impact']:.2f}")

# Placeholder for DataFrame analysis
print("\n--- DataFrame APPROACH: Delay Effectiveness Analysis ---")




RDD Results - Routes Most Affected by Delays:
1. Route route3: Avg Delay=12.00 min, Passenger Impact=1.92, Revenue=$160.00
2. Route route6: Avg Delay=8.18 min, Passenger Impact=1.21, Revenue=$150.00
3. Route route5: Avg Delay=8.00 min, Passenger Impact=1.12, Revenue=$140.00
4. Route route1: Avg Delay=6.00 min, Passenger Impact=0.75, Revenue=$100.00

--- DataFrame APPROACH: Delay Effectiveness Analysis ---


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("DelayCategories").getOrCreate()

# Sample data: list of tuples with delay values
data = [
    (1, 0),
    (2, 3),
    (3, 10),
    (4, 20)
]

# Define schema
columns = ["id", "unified_delay"]

# Create DataFrame
df_congestion = spark.createDataFrame(data, schema=columns)

# Now apply the category creation
from pyspark.sql.functions import when, col

delay_analysis_df = df_congestion.withColumn(
    "delay_category",
    when(col("unified_delay") <= 0, "On-time/Early")
    .when((col("unified_delay") > 0) & (col("unified_delay") <= 5), "Minor Delay")
    .when((col("unified_delay") > 5) & (col("unified_delay") <= 15), "Moderate Delay")
    .otherwise("Major Delay")
)

delay_analysis_df.show()

+---+-------------+--------------+
| id|unified_delay|delay_category|
+---+-------------+--------------+
|  1|            0| On-time/Early|
|  2|            3|   Minor Delay|
|  3|           10|Moderate Delay|
|  4|           20|   Major Delay|
+---+-------------+--------------+



In [None]:
delay_analysis_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- unified_delay: long (nullable = true)
 |-- delay_category: string (nullable = false)



# ANALYSIS 4: COST ANALYSIS PER ROUTE/VEHICLE TYPE

In [None]:
print("\n" + "="*80)
print("ANALYSIS 4: COST ANALYSIS PER ROUTE/VEHICLE TYPE")
print("="*80)


ANALYSIS 4: COST ANALYSIS PER ROUTE/VEHICLE TYPE


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

#  Initialize Spark session (if not already initialized)
spark = SparkSession.builder.appName("TransportationAnalysis").getOrCreate()


# For demonstration, create a sample DataFrame
data = [
    ("R001", 10, 100),
    ("R007", 15, 150),
    ("R012", 20, 200),
]
columns = ["Route ID", "Passenger Count", "OtherColumn"]
df_featured = spark.createDataFrame(data, schema=columns)


df_congestion = df_featured.withColumnRenamed("Route ID", "route_id")

#  Add vehicle_type and operational_cost columns
df_cost = df_congestion.withColumn(
    "vehicle_type",
    when(col("route_id").rlike("R00[1-5]"), "Bus")
    .when(col("route_id").rlike("R0[0-1][6-9]"), "Metro")
    .otherwise("Tram")
).withColumn(
    "cost",
    when(col("vehicle_type") == "Bus", col("Passenger Count") * 2.5 + 50)
    .when(col("vehicle_type") == "Metro", col("Passenger Count") * 1.8 + 120)
    .otherwise(col("Passenger Count") * 2.0 + 80)
)

# Show the result
df_cost.show()

+--------+---------------+-----------+------------+-----+
|route_id|Passenger Count|OtherColumn|vehicle_type| cost|
+--------+---------------+-----------+------------+-----+
|    R001|             10|        100|         Bus| 75.0|
|    R007|             15|        150|       Metro|147.0|
|    R012|             20|        200|        Tram|120.0|
+--------+---------------+-----------+------------+-----+



#Performance metrics to measure

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, avg, sum as spark_sum, count, max as spark_max, min as spark_min,
    hour, dayofweek, stddev, percentile_approx, desc, asc,
    lag, lead, window, collect_list, expr, round as spark_round, abs as spark_abs
)

In [None]:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import time
import psutil
import os
import gc
import threading
import json
from datetime import datetime
from collections import defaultdict
import numpy as np
from typing import Dict, List, Tuple, Any
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Initialize Spark Session with enhanced monitoring
spark = SparkSession.builder \
    .appName("SmartCitySparkPerformanceAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

In [None]:
class ComprehensivePerformanceMonitor:
    """
    Enhanced performance monitoring system that tracks:
    - Execution time with granular breakdowns
    - Memory usage (RSS, VMS, peak usage)
    - CPU utilization during operations
    - Accuracy metrics for predictions
    - Spark-specific metrics (tasks, stages, etc.)
    """

In [None]:
def __init__(self, operation_name: str, enable_cpu_monitoring: bool = True):
        self.operation_name = operation_name
        self.enable_cpu_monitoring = enable_cpu_monitoring
        self.start_time = None
        self.end_time = None

        # Memory metrics
        self.start_memory_rss = None
        self.start_memory_vms = None
        self.peak_memory_rss = 0
        self.peak_memory_vms = 0
        self.memory_samples = []

        # CPU metrics
        self.cpu_samples = []
        self.cpu_monitoring_thread = None
        self.stop_cpu_monitoring = False

        # Spark metrics
        self.spark_metrics = {}

        # Process reference
        self.process = psutil.Process(os.getpid())

        # Results storage
        self.results = {}

In [None]:
def _monitor_resources(self):
        """Background thread to monitor CPU and memory continuously"""
        while not self.stop_cpu_monitoring:

                # CPU utilization
                cpu_percent = self.process.cpu_percent()
                self.cpu_samples.append(cpu_percent)

                # Memory usage
                memory_info = self.process.memory_info()
                current_rss = memory_info.rss / 1024 / 1024  # MB
                current_vms = memory_info.vms / 1024 / 1024  # MB

                self.memory_samples.append({
                    'timestamp': time.time(),
                    'rss': current_rss,
                    'vms': current_vms
                })

In [None]:
import psutil
import time
import os

# Initialize peak memory usage variables
peak_memory_rss = 0
peak_memory_vms = 0

# Duration for monitoring (in seconds)
monitor_duration = 10  # e.g., 10 seconds
start_time = time.time()

try:
    process = psutil.Process(os.getpid())
    while time.time() - start_time < monitor_duration:
        current_rss = process.memory_info().rss
        current_vms = process.memory_info().vms

        # Track peak memory
        peak_memory_rss = max(peak_memory_rss, current_rss)
        peak_memory_vms = max(peak_memory_vms, current_vms)

        time.sleep(0.1)  # Sample every 100ms
except Exception as e:
    print(f"Resource monitoring error: {e}")

# After loop ends, print peak memory usage
print(f"Peak RSS memory: {peak_memory_rss / (1024 ** 2):.2f} MB")
print(f"Peak VMS memory: {peak_memory_vms / (1024 ** 2):.2f} MB")

Peak RSS memory: 250.02 MB
Peak VMS memory: 1637.74 MB


In [None]:
def __enter__(self):
        # Reset collections
        self.cpu_samples = []
        self.memory_samples = []
        self.peak_memory_rss = 0
        self.peak_memory_vms = 0

In [None]:
import time
import psutil
import os

class ResourceMonitor:
    def __init__(self):
        self.process = psutil.Process(os.getpid())
        self.start_time = None
        self.start_memory_rss = None
        self.start_memory_vms = None

    def start_measurements(self):
        self.start_time = time.time()
        initial_memory = self.process.memory_info()
        self.start_memory_rss = initial_memory.rss / 1024 / 1024
        self.start_memory_vms = initial_memory.vms / 1024 / 1024



In [None]:
import threading

class YourClassName:
    def __init__(self):
        self.enable_cpu_monitoring = True  # or False, depending on your setting
        self.stop_cpu_monitoring = False
        self.cpu_monitoring_thread = None

    def _monitor_resources(self):
        # Implement resource monitoring logic here
        while not self.stop_cpu_monitoring:
            # Collect resource usage here
            # e.g., CPU, memory, etc.
            pass

    def start_resource_monitoring(self):
        # Start resource monitoring thread
        if self.enable_cpu_monitoring:
            self.stop_cpu_monitoring = False
            self.cpu_monitoring_thread = threading.Thread(target=self._monitor_resources)
            self.cpu_monitoring_thread.daemon = True
            self.cpu_monitoring_thread.start()

    def stop_resource_monitoring(self):
        # To stop the thread gracefully
        self.stop_cpu_monitoring = True
        if self.cpu_monitoring_thread:
            self.cpu_monitoring_thread.join()

In [None]:
class YourClass:
    def __init__(self, operation_name):
        self.operation_name = operation_name

    def _capture_spark_metrics(self, stage):
        # Your code to capture Spark metrics
        pass

    def start_performance_monitoring(self):
        # Get initial Spark metrics
        self._capture_spark_metrics('start')

        print(f"\nStarting performance monitoring for: {self.operation_name}")
        return self


In [None]:
def __exit__(self, exc_type, exc_val, exc_tb):

    pass

In [None]:
def stop_monitoring(self):
    # Record end time
    self.end_time = time.time()

    # Signal the thread to stop
    self.stop_cpu_monitoring = True

    # Wait for the thread to finish
    if self.cpu_monitoring_thread and self.cpu_monitoring_thread.is_alive():
        self.cpu_monitoring_thread.join(timeout=1.0)

In [None]:
def record_final_memory(self):
    # Get final memory info
    final_memory = self.process.memory_info()
    final_memory_rss = final_memory.rss / 1024 / 1024
    final_memory_vms = final_memory.vms / 1024 / 1024

    return final_memory_rss, final_memory_vms

In [None]:
import psutil

In [None]:
def record_final_memory(self):
    # Get final memory info
    final_memory = self.process.memory_info()
    final_memory_rss = final_memory.rss / 1024 / 1024
    final_memory_vms = final_memory.vms / 1024 / 1024

    print(f"Final Memory - RSS: {final_memory_rss:.2f} MB, VMS: {final_memory_vms:.2f} MB")

    return final_memory_rss, final_memory_vms


In [None]:
import psutil

class MemoryTracker:
    def __init__(self):
        self.process = psutil.Process()

    def record_final_memory(self):
        # Get final memory info
        final_memory = self.process.memory_info()
        final_memory_rss = final_memory.rss / 1024 / 1024
        final_memory_vms = final_memory.vms / 1024 / 1024

        print(f"Final Memory - RSS: {final_memory_rss:.2f} MB, VMS: {final_memory_vms:.2f} MB")
        return final_memory_rss, final_memory_vms

# Create an instance of the class
tracker = MemoryTracker()

# Call the method to print and return final memory
rss, vms = tracker.record_final_memory()


Final Memory - RSS: 250.84 MB, VMS: 1638.74 MB


In [None]:
def calculate_metrics(self):
    # Calculate metrics after execution
    execution_time = self.end_time - self.start_time
    memory_delta_rss = self.final_memory_rss - self.start_memory_rss
    memory_delta_vms = self.final_memory_vms - self.start_memory_vms

    print(f"Execution Time: {execution_time:.2f} seconds")
    print(f"Memory Change - RSS: {memory_delta_rss:.2f} MB")
    print(f"Memory Change - VMS: {memory_delta_vms:.2f} MB")

    return execution_time, memory_delta_rss, memory_delta_vms


In [None]:
class CPU :
    def __init__(self):
        self.cpu_samples = []

    def some_method(self):
        cpu_stats = self._calculate_cpu_stats() if self.cpu_samples else {}
        # rest of method

In [None]:
class Memory :
    def __init__(self):
        self.memory_samples = []

    def get_stats(self):

        memory_stats = self._calculate_memory_stats()
        return memory_stats

In [None]:
class Spark:
    def __init__(self):
        self.spark_samples = []

    def compute_stats(self):
        spark_stats = self._calculate_spark_stats()
        return spark_stats

In [None]:
from datetime import datetime

class PerformanceTracker:
    def __init__(self, operation_name):
        self.operation_name = operation_name
        self.results = None

    def record_metrics(
        self,
        execution_time,
        final_memory_rss,
        memory_delta_rss,
        final_memory_vms,
        memory_delta_vms,
        memory_stats,
        cpu_stats,
        spark_stats
    ):
        # Properly indented (4 spaces)
        self.results = {
            'operation_name': self.operation_name,
            'execution_time': execution_time,
            'memory': {
                'start_rss_mb': self.start_memory_rss,
                'final_rss_mb': final_memory_rss,
                'delta_rss_mb': memory_delta_rss,
                'peak_rss_mb': self.peak_memory_rss,
                'start_vms_mb': self.start_memory_vms,
                'final_vms_mb': final_memory_vms,
                'delta_vms_mb': memory_delta_vms,
                'peak_vms_mb': self.peak_memory_vms,
                **memory_stats
            },
            'cpu': cpu_stats,
            'spark_metrics': spark_stats,
            'timestamp': datetime.now().isoformat()
        }

In [None]:
class PerformanceMonitor:
    def __init__(self):
        self.results = None

    def finalize(self, execution_time, memory_stats, cpu_stats, spark_stats):
        # Store results
        self.results = {
            'execution_time': execution_time,
            'memory': memory_stats,
            'cpu': cpu_stats,
            'spark': spark_stats
        }

        # Print summary
        self._print_performance_summary()

    def _print_performance_summary(self):
        print("Performance Summary:")
        print(f"Execution Time: {self.results['execution_time']:.2f}s")


In [None]:
from datetime import datetime

class PerformanceMonitor:
    def __init__(self, operation_name):
        self.operation_name = operation_name
        # Mock data initialization
        self.start_memory_rss = 100.5
        self.peak_memory_rss = 250.3
        self.start_memory_vms = 200.0
        self.peak_memory_vms = 500.0
        self.results = None

    def record_metrics(self, execution_time):
        """Mock method to record metrics with sample data"""
        memory_stats = {'cache': 45.2, 'buffers': 30.1}
        cpu_stats = {'user_usage': 75.5, 'system_usage': 12.3}
        spark_stats = {'executor_memory': 8192, 'tasks_completed': 200}

        self.results = {
            'operation_name': self.operation_name,
            'execution_time': execution_time,
            'memory': {
                'start_rss_mb': self.start_memory_rss,
                'final_rss_mb': 220.7,
                'delta_rss_mb': 120.2,
                'peak_rss_mb': self.peak_memory_rss,
                'start_vms_mb': self.start_memory_vms,
                'final_vms_mb': 450.0,
                'delta_vms_mb': 250.0,
                'peak_vms_mb': self.peak_memory_vms,
                **memory_stats
            },
            'cpu': cpu_stats,
            'spark_metrics': spark_stats,
            'timestamp': datetime.now().isoformat()
        }
        self._print_performance_summary()

    def _print_performance_summary(self):
        """Prints a formatted performance summary"""
        if not self.results:
            print("No performance data available!")
            return

        print("\n" + "="*60)
        print("PERFORMANCE SUMMARY".center(60))
        print("="*60)

        # Basic Info
        print(f"\n{'Operation:':<15}{self.results['operation_name']}")
        print(f"{'Timestamp:':<15}{self.results['timestamp']}")
        print(f"{'Duration:':<15}{self.results['execution_time']:.2f} seconds")

        # Memory
        print("\nMEMORY USAGE (MB):")
        mem = self.results['memory']
        print(f"{'':<10}{'Start':>10}{'Final':>10}{'Delta':>10}{'Peak':>10}")
        print(f"{'RSS:':<10}{mem['start_rss_mb']:>10.1f}{mem['final_rss_mb']:>10.1f}{mem['delta_rss_mb']:>10.1f}{mem['peak_rss_mb']:>10.1f}")
        print(f"{'VMS:':<10}{mem['start_vms_mb']:>10.1f}{mem['final_vms_mb']:>10.1f}{mem['delta_vms_mb']:>10.1f}{mem['peak_vms_mb']:>10.1f}")

        # CPU
        if self.results.get('cpu'):
            print("\nCPU USAGE (%):")
            for metric, value in self.results['cpu'].items():
                print(f"{metric.replace('_', ' ').title():<20}{value:>5.1f}%")

        # Spark
        if self.results.get('spark_metrics'):
            print("\nSPARK METRICS:")
            for metric, value in self.results['spark_metrics'].items():
                print(f"{metric.replace('_', ' ').title():<25}{value:>10}")

        print("\n" + "="*60)

# Test it
if __name__ == "__main__":
    monitor = PerformanceMonitor("data_processing_job")
    monitor.record_metrics(12.34)


                    PERFORMANCE SUMMARY                     

Operation:     data_processing_job
Timestamp:     2025-07-25T06:11:24.437896
Duration:      12.34 seconds

MEMORY USAGE (MB):
               Start     Final     Delta      Peak
RSS:           100.5     220.7     120.2     250.3
VMS:           200.0     450.0     250.0     500.0

CPU USAGE (%):
User Usage           75.5%
System Usage         12.3%

SPARK METRICS:
Executor Memory                8192
Tasks Completed                 200



#**Hive** Queries for Transport **Analytics**

In [None]:
!pip install pyspark
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 35 not upgraded.
Need to get 39.7 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 126284 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u452-ga~us1-0ubuntu1~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u452-ga~us1-0

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create Spark session with Hive support
spark = SparkSession.builder \
    .appName("TransportationAnalysis") \
    .config("spark.sql.warehouse.dir", "/content/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

Database Schema

In [None]:
!pip install pyspark
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Initialize Spark
spark = SparkSession.builder \
    .appName("TransportationAnalysis") \
    .config("spark.sql.warehouse.dir", "/content/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# Create tables with corrected syntax
spark.sql("DROP TABLE IF EXISTS fact_trips")
spark.sql("""
CREATE TABLE IF NOT EXISTS fact_trips (
    trip_id STRING,
    vehicle_id STRING,
    driver_id STRING,
    route_id STRING,
    start_zone_id STRING,
    end_zone_id STRING,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    distance_km DOUBLE,
    fare_amount DOUBLE,
    toll_amount DOUBLE,
    surcharge_amount DOUBLE,
    total_amount DOUBLE,
    passenger_count INT,
    weather_condition STRING,
    event_id STRING,
    delay_minutes INT
) USING PARQUET
""")

spark.sql("DROP TABLE IF EXISTS dim_zones")
spark.sql("""
CREATE TABLE IF NOT EXISTS dim_zones (
    zone_id STRING,
    zone_name STRING,
    zone_type STRING,
    population_density STRING,
    income_level STRING,
    business_percentage DOUBLE
) USING PARQUET
""")

# Sample data with proper formatting
from pyspark.sql import Row

# Create sample data as Row objects
trips_data = [
    Row(trip_id="T1", vehicle_id="V1", driver_id="D1", route_id="R1",
        start_zone_id="Z1", end_zone_id="Z2",
        start_time="2023-01-01 08:15:00", end_time="2023-01-01 08:45:00",
        distance_km=10.5, fare_amount=15.0, toll_amount=2.0, surcharge_amount=1.5,
        total_amount=18.5, passenger_count=2, weather_condition="Rain",
        event_id=None, delay_minutes=5),
    Row(trip_id="T2", vehicle_id="V2", driver_id="D2", route_id="R2",
        start_zone_id="Z2", end_zone_id="Z3",
        start_time="2023-01-01 09:30:00", end_time="2023-01-01 10:15:00",
        distance_km=15.0, fare_amount=22.0, toll_amount=3.0, surcharge_amount=2.0,
        total_amount=27.0, passenger_count=3, weather_condition="Clear",
        event_id="E1", delay_minutes=12)
]

zones_data = [
    Row(zone_id="Z1", zone_name="Downtown", zone_type="Commercial",
        population_density="High", income_level="High", business_percentage=70.0),
    Row(zone_id="Z2", zone_name="Midtown", zone_type="Mixed",
        population_density="Medium", income_level="Medium", business_percentage=50.0),
    Row(zone_id="Z3", zone_name="Suburb", zone_type="Residential",
        population_density="Low", income_level="High", business_percentage=20.0)
]

# Create DataFrames
trips_df = spark.createDataFrame(trips_data)
zones_df = spark.createDataFrame(zones_data)

# Write to tables
trips_df.write.mode("overwrite").saveAsTable("fact_trips")
zones_df.write.mode("overwrite").saveAsTable("dim_zones")

# Register UDF
def peak_hour_classifier(time_str):
    from datetime import datetime
    if not time_str:
        return "Off-Peak"
    try:
        dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
        hour = dt.hour
        if 7 <= hour <= 9: return "Morning Peak"
        if 16 <= hour <= 19: return "Evening Peak"
        if 11 <= hour <= 13: return "Lunch Peak"
        return "Off-Peak"
    except:
        return "Off-Peak"

spark.udf.register("peak_hour", peak_hour_classifier)

# Run a sample query
result = spark.sql("""
SELECT
    start_zone_id,
    peak_hour(start_time) AS time_period,
    COUNT(*) AS trip_count,
    AVG(delay_minutes) AS avg_delay
FROM
    fact_trips
GROUP BY
    start_zone_id, peak_hour(start_time)
ORDER BY
    start_zone_id, time_period
""")

result.show()

openjdk-8-jdk-headless is already the newest version (8u452-ga~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.
+-------------+------------+----------+---------+
|start_zone_id| time_period|trip_count|avg_delay|
+-------------+------------+----------+---------+
|           Z1|Morning Peak|         1|      5.0|
|           Z2|Morning Peak|         1|     12.0|
+-------------+------------+----------+---------+



Peak Hour Classification

In [None]:
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from datetime import datetime

# Initialize Spark
spark = SparkSession.builder \
    .appName("PeakHourAnalysis") \
    .getOrCreate()

# Sample data
data = [("T1", "2023-01-01 08:15:00"),
        ("T2", "2023-01-01 17:30:00"),
        ("T3", "2023-01-01 12:00:00"),
        ("T4", "2023-01-01 14:00:00")]

# Create DataFrame
columns = ["trip_id", "start_time"]
trips_df = spark.createDataFrame(data, columns)

# Define and register UDF
def classify_peak_hour(time_str):
    try:
        dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
        hour = dt.hour
        if 7 <= hour <= 9: return "Morning Peak"
        if 16 <= hour <= 19: return "Evening Peak"
        if 11 <= hour <= 13: return "Lunch Peak"
        return "Off-Peak"
    except:
        return "Unknown"

peak_hour_udf = udf(classify_peak_hour, StringType())
spark.udf.register("peak_hour", classify_peak_hour, StringType())

# Using the UDF
trips_with_peak = trips_df.withColumn("time_period", peak_hour_udf(col("start_time")))
trips_with_peak.show()

# Using in SQL
trips_df.createOrReplaceTempView("trips")
spark.sql("""
    SELECT trip_id, start_time, peak_hour(start_time) AS period
    FROM trips
""").show()

+-------+-------------------+------------+
|trip_id|         start_time| time_period|
+-------+-------------------+------------+
|     T1|2023-01-01 08:15:00|Morning Peak|
|     T2|2023-01-01 17:30:00|Evening Peak|
|     T3|2023-01-01 12:00:00|  Lunch Peak|
|     T4|2023-01-01 14:00:00|    Off-Peak|
+-------+-------------------+------------+

+-------+-------------------+------------+
|trip_id|         start_time|      period|
+-------+-------------------+------------+
|     T1|2023-01-01 08:15:00|Morning Peak|
|     T2|2023-01-01 17:30:00|Evening Peak|
|     T3|2023-01-01 12:00:00|  Lunch Peak|
|     T4|2023-01-01 14:00:00|    Off-Peak|
+-------+-------------------+------------+



Delay Zone Analysis

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum as spark_sum, when, expr

# Initialize Spark session
spark = SparkSession.builder \
    .appName("BottleneckAnalysis") \
    .getOrCreate()

# sample data
from pyspark.sql import Row
from datetime import date

# Sample trip data
trips_data = [
    Row(trip_id="T1", start_zone_id="Z1", trip_date=date(2023, 1, 1), delay_minutes=10),
    Row(trip_id="T2", start_zone_id="Z1", trip_date=date(2023, 1, 2), delay_minutes=25),
    Row(trip_id="T3", start_zone_id="Z2", trip_date=date(2023, 1, 1), delay_minutes=5),
    Row(trip_id="T4", start_zone_id="Z2", trip_date=date(2023, 1, 3), delay_minutes=30),
    Row(trip_id="T5", start_zone_id="Z1", trip_date=date(2023, 1, 4), delay_minutes=18),
    # Add more sample data as needed
]

# Sample zone data
zones_data = [
    Row(zone_id="Z1", zone_name="Downtown", zone_type="Commercial"),
    Row(zone_id="Z2", zone_name="Midtown", zone_type="Residential"),
]

# Create DataFrames
fact_trips_df = spark.createDataFrame(trips_data)
dim_zones_df = spark.createDataFrame(zones_data)

# Register temporary views for SQL queries
fact_trips_df.createOrReplaceTempView("fact_trips")
dim_zones_df.createOrReplaceTempView("dim_zones")

# Run the analysis using DataFrame API
bottleneck_results = (
    fact_trips_df.alias("t")
    .join(
        dim_zones_df.alias("z"),
        col("t.start_zone_id") == col("z.zone_id"),
        "inner"
    )
    .filter(
        (col("t.trip_date").between("2023-01-01", "2023-12-31")) &
        (col("t.delay_minutes") > 0)
    )
    .groupBy("z.zone_name", "z.zone_type")
    .agg(
        count("t.trip_id").alias("total_trips"),
        avg("t.delay_minutes").alias("avg_delay_minutes"),
        expr("percentile_approx(t.delay_minutes, 0.95)").alias("p95_delay_minutes"),
        spark_sum(when(col("t.delay_minutes") > 15, 1).otherwise(0)).alias("delayed_trips_count"),
        (spark_sum(when(col("t.delay_minutes") > 15, 1).otherwise(0)) / count("t.trip_id")).alias("delay_probability")
    )
    .filter(col("total_trips") > 0)  # Changed from >100 for sample data
    .orderBy(
        col("delay_probability").desc(),
        col("avg_delay_minutes").desc()
    )
    .limit(10)
)

# Show results
bottleneck_results.show()

+---------+-----------+-----------+------------------+-----------------+-------------------+------------------+
|zone_name|  zone_type|total_trips| avg_delay_minutes|p95_delay_minutes|delayed_trips_count| delay_probability|
+---------+-----------+-----------+------------------+-----------------+-------------------+------------------+
| Downtown| Commercial|          3|17.666666666666668|               25|                  2|0.6666666666666666|
|  Midtown|Residential|          2|              17.5|               30|                  1|               0.5|
+---------+-----------+-----------+------------------+-----------------+-------------------+------------------+



Time-of-Day Traffic Volume Trends with Peak Classification

In [None]:
from pyspark.sql.functions import hour, expr

# First add a timestamp column if possibl
fact_trips_df = fact_trips_df.withColumn(
    "start_time",
    expr("to_timestamp(concat(cast(trip_date as string), ' 12:00:00'))")  # Using noon as default time
)

# Then register the temporary view
fact_trips_df.createOrReplaceTempView("fact_trips")

# Now run the time analysis
time_analysis_query = """
SELECT
    hour(start_time) AS hour_of_day,
    COUNT(trip_id) AS trip_count,
    AVG(delay_minutes) AS avg_delay
FROM
    fact_trips
WHERE
    trip_date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY
    hour(start_time)
ORDER BY
    hour_of_day
"""

time_analysis_results = spark.sql(time_analysis_query)
time_analysis_results.show()

+-----------+----------+---------+
|hour_of_day|trip_count|avg_delay|
+-----------+----------+---------+
|         12|         5|     17.6|
+-----------+----------+---------+



 Revenue Analysis by Route

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, lit, rand

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RevenueAnalysis") \
    .getOrCreate()


from pyspark.sql import Row
from datetime import date

# Sample trip data with all required columns
trips_data = [
    Row(trip_id="T1", route_id="R1", start_zone_id="Z1", end_zone_id="Z2",
        trip_date=date(2023, 1, 1), delay_minutes=10, total_amount=25.50, distance_km=12.3),
    Row(trip_id="T2", route_id="R1", start_zone_id="Z1", end_zone_id="Z3",
        trip_date=date(2023, 1, 2), delay_minutes=15, total_amount=30.00, distance_km=15.0),
    Row(trip_id="T3", route_id="R2", start_zone_id="Z2", end_zone_id="Z1",
        trip_date=date(2023, 1, 3), delay_minutes=5, total_amount=18.75, distance_km=8.5),
    # Add more sample data as needed
]

# Sample route data
routes_data = [
    Row(route_id="R1", route_name="Downtown Express", origin_zone="Z1", destination_zone="Z3"),
    Row(route_id="R2", route_name="Cross-Town", origin_zone="Z2", destination_zone="Z1"),
]

# Sample zone data
zones_data = [
    Row(zone_id="Z1", zone_name="Central Business District", zone_type="Commercial"),
    Row(zone_id="Z2", zone_name="Midtown", zone_type="Mixed"),
    Row(zone_id="Z3", zone_name="Uptown", zone_type="Residential"),
]

# Create DataFrames
fact_trips_df = spark.createDataFrame(trips_data)
dim_routes_df = spark.createDataFrame(routes_data)
dim_zones_df = spark.createDataFrame(zones_data)

# Register temporary views
fact_trips_df.createOrReplaceTempView("fact_trips")
dim_routes_df.createOrReplaceTempView("dim_routes")
dim_zones_df.createOrReplaceTempView("dim_zones")

# Perform the revenue analysis
revenue_results = spark.sql("""
SELECT
    r.route_name,
    oz.zone_name AS origin_zone,
    dz.zone_name AS destination_zone,
    COUNT(t.trip_id) AS trip_count,
    SUM(t.total_amount) AS total_revenue,
    ROUND(SUM(t.total_amount) / COUNT(t.trip_id), 2) AS avg_revenue_per_trip,
    ROUND(SUM(t.total_amount) / SUM(t.distance_km), 2) AS revenue_per_km,
    SUM(t.distance_km) AS total_distance_km,
    ROUND(AVG(t.delay_minutes), 2) AS avg_delay_minutes
FROM
    fact_trips t
JOIN
    dim_routes r ON t.route_id = r.route_id
JOIN
    dim_zones oz ON t.start_zone_id = oz.zone_id
JOIN
    dim_zones dz ON t.end_zone_id = dz.zone_id
WHERE
    t.trip_date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY
    r.route_name, oz.zone_name, dz.zone_name
HAVING
    COUNT(t.trip_id) > 1  -- Changed from 50 for demo with small dataset
ORDER BY
    total_revenue DESC
LIMIT 20
""")

# Show results
revenue_results.show(truncate=False)

# Alternative DataFrame API version
df_revenue_results = (
    fact_trips_df.alias("t")
    .join(dim_routes_df.alias("r"), col("t.route_id") == col("r.route_id"))
    .join(dim_zones_df.alias("oz"), col("t.start_zone_id") == col("oz.zone_id"))
    .join(dim_zones_df.alias("dz"), col("t.end_zone_id") == col("dz.zone_id"))
    .filter(col("t.trip_date").between("2023-01-01", "2023-12-31"))
    .groupBy("r.route_name", "oz.zone_name", "dz.zone_name")
    .agg(
        count("t.trip_id").alias("trip_count"),
        sum("t.total_amount").alias("total_revenue"),
        avg("t.total_amount").alias("avg_revenue_per_trip"),
        (sum("t.total_amount")/sum("t.distance_km")).alias("revenue_per_km"),
        sum("t.distance_km").alias("total_distance_km"),
        avg("t.delay_minutes").alias("avg_delay_minutes")
    )
    .filter(col("trip_count") > 1)
    .orderBy(col("total_revenue").desc())
    .limit(20)
)

df_revenue_results.show(truncate=False)

+----------+-----------+----------------+----------+-------------+--------------------+--------------+-----------------+-----------------+
|route_name|origin_zone|destination_zone|trip_count|total_revenue|avg_revenue_per_trip|revenue_per_km|total_distance_km|avg_delay_minutes|
+----------+-----------+----------------+----------+-------------+--------------------+--------------+-----------------+-----------------+
+----------+-----------+----------------+----------+-------------+--------------------+--------------+-----------------+-----------------+

+----------+---------+---------+----------+-------------+--------------------+--------------+-----------------+-----------------+
|route_name|zone_name|zone_name|trip_count|total_revenue|avg_revenue_per_trip|revenue_per_km|total_distance_km|avg_delay_minutes|
+----------+---------+---------+----------+-------------+--------------------+--------------+-----------------+-----------------+
+----------+---------+---------+----------+----------

Impact Analysis

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from datetime import date

# Initialize Spark session
spark = SparkSession.builder \
    .appName("ImpactAnalysis") \
    .getOrCreate()

# Create sample data for all required tables
from pyspark.sql import Row

# Sample trip data
trips_data = [
    Row(trip_id="T1", start_zone_id="Z1", trip_date=date(2023, 1, 1),
        delay_minutes=10, total_amount=25.50, passenger_count=2, event_id=None),
    Row(trip_id="T2", start_zone_id="Z2", trip_date=date(2023, 1, 2),
        delay_minutes=15, total_amount=30.00, passenger_count=3, event_id="E1"),
    Row(trip_id="T3", start_zone_id="Z1", trip_date=date(2023, 1, 3),
        delay_minutes=5, total_amount=18.75, passenger_count=1, event_id=None),
    # Add more sample data as needed
]

# Sample weather data
weather_data = [
    Row(weather_date=date(2023, 1, 1), weather_condition="Sunny"),
    Row(weather_date=date(2023, 1, 2), weather_condition="Rainy"),
    Row(weather_date=date(2023, 1, 3), weather_condition="Cloudy"),
]

# Sample zone data (extended with population_density and income_level)
zones_data = [
    Row(zone_id="Z1", zone_name="Downtown", zone_type="Commercial",
        population_density="High", income_level="High"),
    Row(zone_id="Z2", zone_name="Midtown", zone_type="Residential",
        population_density="Medium", income_level="Middle"),
]

# Sample events data
events_data = [
    Row(event_id="E1", event_name="Music Festival", event_type="Concert"),
]

# Create DataFrames
fact_trips_df = spark.createDataFrame(trips_data)
dim_weather_df = spark.createDataFrame(weather_data)
dim_zones_df = spark.createDataFrame(zones_data)
dim_events_df = spark.createDataFrame(events_data)

# Register temporary views
fact_trips_df.createOrReplaceTempView("fact_trips")
dim_weather_df.createOrReplaceTempView("dim_weather")
dim_zones_df.createOrReplaceTempView("dim_zones")
dim_events_df.createOrReplaceTempView("dim_events")

# Run the impact analysis query
impact_analysis = spark.sql("""
SELECT
    w.weather_condition,
    z.population_density,
    z.income_level,
    COUNT(t.trip_id) AS trip_count,
    ROUND(AVG(t.total_amount), 2) AS avg_fare,
    ROUND(AVG(t.delay_minutes), 2) AS avg_delay,
    ROUND(AVG(t.passenger_count), 2) AS avg_passengers
FROM
    fact_trips t
JOIN
    dim_weather w ON t.trip_date = w.weather_date
JOIN
    dim_zones z ON t.start_zone_id = z.zone_id
LEFT JOIN
    dim_events e ON t.event_id = e.event_id
WHERE
    t.trip_date BETWEEN '2023-01-01' AND '2023-12-31'
    AND e.event_id IS NULL -- Exclude days with major events
GROUP BY
    w.weather_condition, z.population_density, z.income_level
ORDER BY
    trip_count DESC
""")

# Show results
impact_analysis.show()

# Alternative DataFrame API version
df_impact_analysis = (
    fact_trips_df.alias("t")
    .join(dim_weather_df.alias("w"), col("t.trip_date") == col("w.weather_date"))
    .join(dim_zones_df.alias("z"), col("t.start_zone_id") == col("z.zone_id"))
    .join(dim_events_df.alias("e"), col("t.event_id") == col("e.event_id"), "left")
    .filter(
        (col("t.trip_date").between("2023-01-01", "2023-12-31")) &
        (col("e.event_id").isNull())
    )
    .groupBy("w.weather_condition", "z.population_density", "z.income_level")
    .agg(
        count("t.trip_id").alias("trip_count"),
        avg("t.total_amount").alias("avg_fare"),
        avg("t.delay_minutes").alias("avg_delay"),
        avg("t.passenger_count").alias("avg_passengers")
    )
    .orderBy(col("trip_count").desc())
)

df_impact_analysis.show()

+-----------------+------------------+------------+----------+--------+---------+--------------+
|weather_condition|population_density|income_level|trip_count|avg_fare|avg_delay|avg_passengers|
+-----------------+------------------+------------+----------+--------+---------+--------------+
|            Sunny|              High|        High|         1|    25.5|     10.0|           2.0|
|           Cloudy|              High|        High|         1|   18.75|      5.0|           1.0|
+-----------------+------------------+------------+----------+--------+---------+--------------+

+-----------------+------------------+------------+----------+--------+---------+--------------+
|weather_condition|population_density|income_level|trip_count|avg_fare|avg_delay|avg_passengers|
+-----------------+------------------+------------+----------+--------+---------+--------------+
|            Sunny|              High|        High|         1|    25.5|     10.0|           2.0|
|           Cloudy|          