<a href="https://colab.research.google.com/github/GundalaJohnPaul/Gundala-JohnPaul/blob/main/smart_parking_ai_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install PySpark and MLflow for Colab environment
!pip install pyspark mlflow
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow

# Obtain the Spark session used by every later cell (an already-running
# session is reused instead of creating a second one).
spark = (
    SparkSession.builder
    .appName("SmartParkingAI")
    .getOrCreate()
)
print("Environment Ready")

Environment Ready


In [None]:
# 1. Bronze Layer: Simulate raw sensor and camera data ingestion
# A seeded generator keeps the simulated readings identical on every run, so
# the aggregates derived from them are reproducible.
rng = np.random.default_rng(42)
data = {
    # 5 sensor ids repeated 20 times -> 100 readings
    'sensor_id': [101, 102, 103, 104, 105] * 20,
    # Hourly timestamps; lowercase 'h' replaces the deprecated 'H' alias that
    # makes recent pandas emit a FutureWarning.
    'timestamp': pd.date_range(start='2026-01-30', periods=100, freq='h'),
    'raw_occupancy': rng.choice([0, 1], size=100), # 0: empty, 1: occupied
    'traffic_pattern_index': rng.uniform(0.1, 1.0, 100)
}
bronze_df = spark.createDataFrame(pd.DataFrame(data))

# 2. Silver Layer: Data cleaning and defining occupancy status
# Keeps every bronze column and appends two derived ones:
#   is_occupied - raw_occupancy cast to boolean
#   hour        - hour-of-day extracted from the reading timestamp
silver_df = bronze_df.select(
    "*",
    F.col("raw_occupancy").cast("boolean").alias("is_occupied"),
    F.hour("timestamp").alias("hour"),
)

# 3. Gold Layer: Aggregated data for revenue and availability prediction
# One row per hour-of-day: mean traffic index and number of occupied readings.
gold_df = (
    silver_df
    .groupBy("hour")
    .agg(
        F.mean("traffic_pattern_index").alias("avg_traffic"),
        F.sum("raw_occupancy").alias("total_occupied"),
    )
)
gold_df.show(n=5)

  'timestamp': pd.date_range(start='2026-01-30', periods=100, freq='H'),


+----+-------------------+--------------+
|hour|        avg_traffic|total_occupied|
+----+-------------------+--------------+
|  12| 0.5144836593345931|             2|
|  22|  0.743627233512804|             3|
|   1| 0.5105972515520023|             3|
|  13|0.45772307360050435|             1|
|   6| 0.6665756315364075|             3|
+----+-------------------+--------------+
only showing top 5 rows


In [None]:
# Prepare features for the predictive model: pack the two predictor columns
# into the single vector column Spark ML estimators expect.
assembler = VectorAssembler(inputCols=["hour", "avg_traffic"], outputCol="features")
model_data = assembler.transform(gold_df)

# Start MLflow Experiment
mlflow.set_experiment("Parking_Availability_Prediction")

with mlflow.start_run():
    # Model Training: Predicting occupied spaces.
    # A fixed seed makes the forest's random sampling repeatable, so runs on
    # the same input data can be compared with each other.
    rf = RandomForestRegressor(featuresCol="features", labelCol="total_occupied", seed=42)

    # Record the hyperparameters alongside the metric so the run is
    # interpretable in the MLflow UI.
    mlflow.log_params({
        "num_trees": rf.getNumTrees(),
        "max_depth": rf.getMaxDepth(),
        "seed": rf.getSeed(),
    })

    model = rf.fit(model_data)

    # Evaluation.
    # NOTE: predictions are made on the same rows the model was fitted on, so
    # this is a training-set RMSE, not an estimate of out-of-sample error.
    predictions = model.transform(model_data)
    evaluator = RegressionEvaluator(labelCol="total_occupied", metricName="rmse")
    rmse = evaluator.evaluate(predictions)

    # Log the metric (the fitted model itself is not persisted by this cell)
    mlflow.log_metric("rmse", rmse)
    print(f"Model Training Complete. RMSE: {rmse}")

Model Training Complete. RMSE: 0.3223546430865796


In [None]:
from pyspark.sql.types import DoubleType

def calculate_dynamic_pricing(occupancy_count, base_rate=5.0, peak_threshold=3, surge_multiplier=1.5):
    """
    Return the parking rate for a given demand level.

    Args:
        occupancy_count: Number of occupied readings for the period. ``None``
            (e.g. a null value handed to a UDF) is treated as "no peak demand".
        base_rate: Rate charged outside peak demand.
        peak_threshold: Occupancy strictly above this value counts as peak
            demand.
        surge_multiplier: Factor applied to ``base_rate`` during peak demand
            (1.5 means a 50% increase).

    Returns:
        float: ``base_rate * surge_multiplier`` when ``occupancy_count`` exceeds
        ``peak_threshold``, otherwise ``base_rate``.
    """
    # Comparing None with an int raises TypeError, so missing occupancy falls
    # back to the base rate instead of failing the whole job.
    if occupancy_count is None:
        return float(base_rate)
    # If occupancy is high, raise the fee to improve returns
    if occupancy_count > peak_threshold:
        return float(base_rate * surge_multiplier)
    return float(base_rate)

# Wrap the pricing function as a Spark UDF whose result column is a double
pricing_udf = F.udf(calculate_dynamic_pricing, returnType=DoubleType())

# Add the per-hour rate to the Gold Layer aggregates
occupied_col = gold_df["total_occupied"]
revenue_optimized_df = gold_df.withColumn("current_rate", pricing_udf(occupied_col))

print("Optimized Revenue Strategy (Live Rates):")
revenue_optimized_df.select("hour", "total_occupied", "current_rate").show(n=10)

Optimized Revenue Strategy (Live Rates):
+----+--------------+------------+
|hour|total_occupied|current_rate|
+----+--------------+------------+
|  12|             2|         5.0|
|  22|             3|         5.0|
|   1|             3|         5.0|
|  13|             1|         5.0|
|   6|             3|         5.0|
|  16|             1|         5.0|
|   3|             3|         5.0|
|  20|             1|         5.0|
|   5|             1|         5.0|
|  19|             2|         5.0|
+----+--------------+------------+
only showing top 10 rows
