<a href="https://colab.research.google.com/github/JRK-007/realtime-parking-pricing/blob/main/Dynamic_Pricing_for_Urban_Parking_Lots_(Google_Colab_Template).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# -*- coding: utf-8 -*-
"""
Dynamic Pricing for Urban Parking Lots

This notebook provides a template for implementing a dynamic pricing system
for urban parking lots, as outlined in the Summer Analytics 2025 Capstone Project.

It includes:
1.  Setup and necessary imports (pandas, numpy, bokeh; the Pathway import is left commented out).
2.  Helper functions for calculations like Haversine distance and data normalization.
3.  Implementation of three pricing models:
    -   Model 1: Baseline Linear Model
    -   Model 2: Demand-Based Price Function
    -   Model 3: Competitive Pricing Model (Simplified)
4.  A mock data generator to simulate real-time data streams.
5.  Bokeh visualization setup for real-time pricing plots.
6.  A simulated real-time loop to demonstrate the pricing updates and visualizations.

**IMPORTANT:**
-   This code simulates real-time data using a loop and mock data. You will need
    to replace the mock data generation and the simulation loop with actual
    Pathway integration (`pw.io.csv.read`, `pw.run`, etc.) once you have the
    provided Pathway sample notebook and `dataset.csv`.
-   The pricing model coefficients are placeholders and require tuning based on
    your analysis of the actual `dataset.csv`.
-   The competitive pricing model (Model 3) is simplified for demonstration.
    A full implementation would require more sophisticated logic for identifying
    nearby lots and fetching their real-time prices within the Pathway stream.
"""

# 1. Setup and Imports
import pandas as pd
import numpy as np
import time
import random
from math import radians, sin, cos, sqrt, atan2

# Bokeh for real-time visualizations
from bokeh.plotting import figure, show, curdoc
from bokeh.io import output_notebook, push_notebook
from bokeh.models import ColumnDataSource, LinearAxis, Range1d
from bokeh.layouts import column, row
from bokeh.palettes import Category10

# Pathway (simulated import - actual Pathway setup will differ)
# try:
#     import pathway as pw
# except ImportError:
#     print("Pathway not found. Please install it: pip install pathway")
#     print("Proceeding with simulated Pathway environment.")
#     # Mock Pathway objects for demonstration if Pathway is not installed
#     class MockPathway:
#         class io:
#             class csv:
#                 def read(self, *args, **kwargs):
#                     print("Mock Pathway CSV read called.")
#                     return self # Return self to allow chaining
#             class kafka:
#                 def read(self, *args, **kwargs):
#                     print("Mock Pathway Kafka read called.")
#                     return self # Return self to allow chaining
#         def run_until_exception(self, *args, **kwargs):
#             print("Mock Pathway run_until_exception called.")
#         def run(self, *args, **kwargs):
#             print("Mock Pathway run called.")
#         def select(self, *args, **kwargs):
#             print("Mock Pathway select called.")
#             return self
#         def transform(self, *args, **kwargs):
#             print("Mock Pathway transform called.")
#             return self
#         def stateful_map(self, *args, **kwargs):
#             print("Mock Pathway stateful_map called.")
#             return self
#         def map(self, *args, **kwargs):
#             print("Mock Pathway map called.")
#             return self
#         def groupby(self, *args, **kwargs):
#             print("Mock Pathway groupby called.")
#             return self
#         def reduce(self, *args, **kwargs):
#             print("Mock Pathway reduce called.")
#             return self
#         def debug_print(self, *args, **kwargs):
#             print("Mock Pathway debug_print called.")
#             return self
#         def from_dataframe(self, df):
#             print("Mock Pathway from_dataframe called.")
#             return self
#     pw = MockPathway()

# Ensure Bokeh plots appear in the notebook
output_notebook()

# 2. Global Constants
BASE_PRICE = 10.0  # Base price for parking
MIN_PRICE_FACTOR = 0.5 # Price should not be less than 0.5x base price
MAX_PRICE_FACTOR = 2.0 # Price should not be more than 2x base price

# Define a few mock parking spaces for simulation
# In a real scenario, these would come from your dataset.csv
MOCK_PARKING_SPACES = {
    'P1': {'lat': 11.0045, 'lon': 76.9616, 'capacity': 100, 'initial_price': BASE_PRICE},
    'P2': {'lat': 11.0050, 'lon': 76.9620, 'capacity': 80, 'initial_price': BASE_PRICE},
    'P3': {'lat': 11.0030, 'lon': 76.9600, 'capacity': 120, 'initial_price': BASE_PRICE},
    'P4': {'lat': 11.0060, 'lon': 76.9630, 'capacity': 90, 'initial_price': BASE_PRICE},
    'P5': {'lat': 11.0040, 'lon': 76.9590, 'capacity': 70, 'initial_price': BASE_PRICE}
}

# Initial state for each parking space
parking_lot_states = {
    pid: {
        'current_price': MOCK_PARKING_SPACES[pid]['initial_price'],
        'occupancy': random.randint(20, MOCK_PARKING_SPACES[pid]['capacity']),
        'queue_length': random.randint(0, 5),
        'traffic_congestion_level': random.uniform(0.1, 0.9),
        'is_special_day': random.choice([0, 1]),
        'incoming_vehicle_type': random.choice(['car', 'bike', 'truck']),
        'lat': MOCK_PARKING_SPACES[pid]['lat'],
        'lon': MOCK_PARKING_SPACES[pid]['lon'],
        'capacity': MOCK_PARKING_SPACES[pid]['capacity']
    } for pid in MOCK_PARKING_SPACES
}


# 3. Helper Functions
def haversine_distance(lat1, lon1, lat2, lon2):
    """
    Calculate the distance between two points on Earth using the Haversine formula.
    Args:
        lat1 (float): Latitude of point 1.
        lon1 (float): Longitude of point 1.
        lat2 (float): Latitude of point 2.
        lon2 (float): Longitude of point 2.
    Returns:
        float: Distance in kilometers.
    """
    R = 6371  # Radius of Earth in kilometers

    lat1_rad, lon1_rad, lat2_rad, lon2_rad = map(radians, [lat1, lon1, lat2, lon2])

    dlon = lon2_rad - lon1_rad
    dlat = lat2_rad - lat1_rad

    a = sin(dlat / 2)**2 + cos(lat1_rad) * cos(lat2_rad) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    distance = R * c
    return distance

def normalize_value(value, min_val, max_val):
    """
    Normalizes a value to a 0-1 range based on min/max.
    Args:
        value (float): The value to normalize.
        min_val (float): The minimum expected value.
        max_val (float): The maximum expected value.
    Returns:
        float: Normalized value, clamped to the range [0, 1].
    """
    if max_val == min_val:
        return 0.0
    # Clamp so inputs outside [min_val, max_val] cannot escape the 0-1 range
    normalized = (value - min_val) / (max_val - min_val)
    return max(0.0, min(1.0, normalized))

def get_vehicle_type_weight(vehicle_type):
    """Assigns a weight based on vehicle type."""
    weights = {'car': 1.0, 'bike': 0.7, 'truck': 1.3} # Example weights, adjust as needed
    return weights.get(vehicle_type, 1.0) # Default to 1.0 if type not found

# 4. Pricing Logic Implementation (Core Requirements)

# Model 1: Baseline Linear Model
def calculate_price_model1(prev_price, occupancy, capacity):
    """
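    Implements Model 1: Baseline Linear Model.
    Price_{t+1} = Price_t + alpha * (Occupancy / Capacity)

    Note: alpha and the occupancy rate are both non-negative, so this update
    never lowers the price. Repeated calls drift up to the upper bound
    (BASE_PRICE * MAX_PRICE_FACTOR) and stay there.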

    Args:
        prev_price (float): The price from the previous time step.
        occupancy (int): Current number of parked vehicles.
        capacity (int): Maximum number of vehicles that can be parked.
    Returns:
        float: The calculated new price.
    """
    alpha = 5.0  # Coefficient for occupancy rate impact. Tune this value.
    occupancy_rate = occupancy / capacity if capacity > 0 else 0.0
    new_price = prev_price + alpha * occupancy_rate

    # Ensure price stays within reasonable bounds
    new_price = max(BASE_PRICE * MIN_PRICE_FACTOR, min(BASE_PRICE * MAX_PRICE_FACTOR, new_price))
    return new_price

# Model 2: Demand-Based Price Function
def calculate_price_model2(base_price, occupancy, capacity, queue_length, traffic_level, is_special_day, incoming_vehicle_type):
    """
    Implements Model 2: Demand-Based Price Function.
    Constructs a mathematical demand function and adjusts prices based on it.

    Demand = alpha_occ * (Occupancy/Capacity) + beta_queue * QueueLength +
             gamma_traffic * Traffic + delta_special * IsSpecialDay +
             epsilon_vehicle * VehicleTypeWeight

    Price = Base Price * (1 + lambda * NormalizedDemand)

    Args:
        base_price (float): The base price for the parking lot.
        occupancy (int): Current number of parked vehicles.
        capacity (int): Maximum number of vehicles that can be parked.
        queue_length (int): Vehicles waiting for entry.
        traffic_level (float): Nearby traffic congestion level (e.g., 0-1).
        is_special_day (int): 1 if special day (holiday/event), 0 otherwise.
        incoming_vehicle_type (str): Type of incoming vehicle ('car', 'bike', 'truck').
    Returns:
        tuple: (Calculated new price, raw demand value)
    """
    occupancy_rate = occupancy / capacity if capacity > 0 else 0.0
    vehicle_type_weight = get_vehicle_type_weight(incoming_vehicle_type)

    # Define your demand function coefficients - these need to be tuned!
    # These coefficients determine the sensitivity of demand to each feature.
    alpha_occ = 3.0   # Impact of occupancy rate on demand
    beta_queue = 0.8  # Impact of queue length on demand
    gamma_traffic = 1.5 # Impact of traffic congestion on demand
    delta_special = 2.5 # Impact of special day on demand (higher weight)
    epsilon_vehicle = 0.5 # Impact of vehicle type weight on demand

    # Construct the demand value
    raw_demand = (alpha_occ * occupancy_rate +
                  beta_queue * queue_length +
                  gamma_traffic * traffic_level +
                  delta_special * is_special_day +
                  epsilon_vehicle * vehicle_type_weight)

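    # Normalize demand. These min/max values should ideally come from data analysis.
    # For simulation, we use assumed ranges (queue length capped at 10 here).
    # Note: the mock generator allows queue lengths up to 15, so raw demand can
    # exceed max_raw_demand.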
    min_raw_demand = 0.0 # Assuming minimum possible demand is 0
    max_raw_demand = (alpha_occ * 1.0 + beta_queue * 10 + gamma_traffic * 1.0 +
                      delta_special * 1 + epsilon_vehicle * 1.3) # Max assumed values for features
    normalized_demand = normalize_value(raw_demand, min_raw_demand, max_raw_demand)

    # Lambda coefficient for price adjustment based on normalized demand. Tune this.
    lambda_val = 0.4

    # Calculate new price
    new_price = base_price * (1 + lambda_val * normalized_demand)

    # Bound the price to [MIN_PRICE_FACTOR, MAX_PRICE_FACTOR] x base_price (this clamps but does not smooth)
    new_price = max(base_price * MIN_PRICE_FACTOR, min(base_price * MAX_PRICE_FACTOR, new_price))

    return new_price, raw_demand

# Model 3: Competitive Pricing Model
def calculate_price_model3(current_lot_id, current_lot_data, all_parking_lot_states, price_model2):
    """
    Implements Model 3: Competitive Pricing Model.
    Factors in prices of nearby competitors.

    Args:
        current_lot_id (str): ID of the current parking lot.
        current_lot_data (dict): Dictionary of current data for the current lot.
        all_parking_lot_states (dict): Dictionary containing states of all parking lots.
        price_model2 (float): The price calculated by Model 2 for the current lot.
    Returns:
        float: The final price after considering competitive factors.
    """
    final_price = price_model2
    nearby_competitors = []
    proximity_threshold_km = 0.5 # Define what "nearby" means (e.g., within 0.5 km)

    # 1. Calculate geographic proximity of nearby parking spaces
    for other_lot_id, other_lot_data in all_parking_lot_states.items():
        if other_lot_id != current_lot_id:
            dist = haversine_distance(
                current_lot_data['lat'], current_lot_data['lon'],
                other_lot_data['lat'], other_lot_data['lon']
            )
            if dist <= proximity_threshold_km:
                nearby_competitors.append({
                    'id': other_lot_id,
                    'distance': dist,
                    'price': other_lot_data['current_price'], # Use their current price
                    'occupancy_rate': (other_lot_data['occupancy'] / other_lot_data['capacity']
                                       if other_lot_data['capacity'] > 0 else 0.0)
                })

    if not nearby_competitors:
        return final_price # No competitors nearby, return Model 2 price

    # Sort competitors by distance (closest first)
    nearby_competitors.sort(key=lambda x: x['distance'])

    # 2. Implement competitive logic
    current_capacity = current_lot_data['capacity']
    current_occupancy_rate = current_lot_data['occupancy'] / current_capacity if current_capacity > 0 else 0.0

    # Scenario 1: Your lot is full/near full and nearby lots are cheaper
    if current_occupancy_rate > 0.90: # If lot is more than 90% full
        for comp in nearby_competitors:
            if comp['price'] < final_price * 0.95: # If a nearby lot is significantly cheaper
                # Suggest rerouting (conceptual, would be a message in a real app)
                # Or slightly reduce price to avoid losing customers entirely if they're willing to wait
                final_price *= 0.98 # Small price reduction
                # print(f"  {current_lot_id}: Near full, nearby {comp['id']} is cheaper. Reducing price.")
                break # Act on the first significantly cheaper competitor

    # Scenario 2: Nearby lots are expensive, so this lot can raise its price and still be attractive.
    # Uses the average price of all competitors within the proximity threshold.
    avg_comp_price = np.mean([comp['price'] for comp in nearby_competitors])
    if avg_comp_price > final_price * 1.1: # Average competitor price is more than 10% higher
        final_price *= 1.02 # Small price increase
        # print(f"  {current_lot_id}: Nearby competitors expensive. Slightly increasing price.")

    # Apply bounds again after competitive adjustments
    final_price = max(BASE_PRICE * MIN_PRICE_FACTOR, min(BASE_PRICE * MAX_PRICE_FACTOR, final_price))
    return final_price


# 5. Mock Data Generator (Simulates real-time stream)
def generate_mock_data_for_parking_lot(parking_lot_id, current_state):
    """
    Generates mock real-time data for a single parking lot.
    Simulates changes in occupancy, queue, traffic, etc.
    """
    capacity = current_state['capacity']

    # Simulate occupancy as a bounded random walk: a small random step each tick,
    # clamped to [0, capacity], with occasional large spikes.
    current_occupancy = current_state['occupancy']
    change = random.randint(-10, 10) # Random change in occupancy
    new_occupancy = max(0, min(capacity, current_occupancy + change))
    # Introduce some "events" that might cause spikes
    if random.random() < 0.1: # 10% chance of a large influx/outflux
        new_occupancy = max(0, min(capacity, new_occupancy + random.randint(-30, 30)))

    new_queue_length = max(0, min(15, current_state['queue_length'] + random.randint(-1, 2)))
    new_traffic_congestion_level = max(0.1, min(0.9, current_state['traffic_congestion_level'] + random.uniform(-0.1, 0.1)))
    new_is_special_day = random.choices([0, 1], weights=[0.95, 0.05], k=1)[0] # Mostly not a special day
    new_incoming_vehicle_type = random.choices(['car', 'bike', 'truck'], weights=[0.7, 0.2, 0.1], k=1)[0]

    return {
        'parking_lot_id': parking_lot_id,
        'latitude': current_state['lat'],
        'longitude': current_state['lon'],
        'capacity': capacity,
        'occupancy': new_occupancy,
        'queue_length': new_queue_length,
        'traffic_congestion_level': new_traffic_congestion_level,
        'is_special_day': new_is_special_day,
        'incoming_vehicle_type': new_incoming_vehicle_type,
        'timestamp': time.time() # Current timestamp
    }

# 6. Bokeh Visualization Setup
# Create a ColumnDataSource for each parking lot to hold real-time data
sources = {pid: ColumnDataSource(data=dict(time=[], price=[], occupancy=[], demand=[]))
           for pid in MOCK_PARKING_SPACES}

# Create a figure for each parking lot's price
plots = {}
handles = {} # To store plot handles for updating
colors = Category10[len(MOCK_PARKING_SPACES)] # Get distinct colors

for i, (pid, _) in enumerate(MOCK_PARKING_SPACES.items()):
    p = figure(
        x_axis_label='Time',
        y_axis_label='Price ($)',
        title=f'Real-time Price for Parking Lot {pid}',
        height=300,
        width=800,
        x_axis_type='datetime', # For better time display
        tools="pan,wheel_zoom,box_zoom,reset,save"
    )

    # Add a second y-axis for occupancy rate
    p.extra_y_ranges = {"occupancy_range": Range1d(start=0, end=1.1)}
    p.add_layout(LinearAxis(y_range_name="occupancy_range", axis_label="Occupancy Rate"), 'right')

    # Price line
    price_line = p.line(
        x='time',
        y='price',
        source=sources[pid],
        line_width=2,
        color=colors[i],
        legend_label=f'{pid} Price'
    )

    # Occupancy rate line
    occupancy_line = p.line(
        x='time',
        y='occupancy',
        source=sources[pid],
        line_width=1,
        color='gray',
        line_dash='dashed',
        legend_label=f'{pid} Occupancy Rate',
        y_range_name="occupancy_range"
    )

    p.legend.location = "top_left"
    p.legend.click_policy="hide" # Allows hiding lines by clicking legend
    plots[pid] = p
    handles[pid] = {'price': price_line, 'occupancy': occupancy_line} # Store handles if needed for dynamic updates

# Combine all plots into a column layout
layout = column(*plots.values())

# The plots are not rendered at this point.
# For true real-time updates in a browser, you'd typically run a Bokeh server
# or use push_notebook in a live Jupyter env.
# For Colab, `push_notebook()` is often used with `show(notebook_handle=True)`.
# For simplicity, this template renders the layout once, at the end of the
# simulation loop, after the data has been streamed into the sources.
# In a real Pathway setup, you'd feed data to the ColumnDataSource `stream` method.

# show(layout, notebook_handle=True) # Use this for live updates in Jupyter/Colab
# If using push_notebook, store the handle:
# plot_handle = show(layout, notebook_handle=True)


# 7. Simulated Real-Time Loop
print("Starting real-time pricing simulation...")
print("This simulation runs for a few iterations. In a real scenario, Pathway would handle continuous streaming.")

num_iterations = 50 # Number of simulated time steps
update_interval_seconds = 1 # How often to simulate a new data point

# Keep track of the start time for plotting x-axis
start_time = time.time()

for i in range(num_iterations):
    print(f"\n--- Iteration {i+1} ---")
    current_time_offset = time.time() - start_time # Time elapsed since start

    # Generate data for all parking lots and update their prices
    for pid in MOCK_PARKING_SPACES:
        # Generate new mock data for the current lot
        new_data_point = generate_mock_data_for_parking_lot(pid, parking_lot_states[pid])

        # Update the state of the current lot with new mock data
        parking_lot_states[pid].update({
            'occupancy': new_data_point['occupancy'],
            'queue_length': new_data_point['queue_length'],
            'traffic_congestion_level': new_data_point['traffic_congestion_level'],
            'is_special_day': new_data_point['is_special_day'],
            'incoming_vehicle_type': new_data_point['incoming_vehicle_type']
        })

        # Calculate prices using the models
        # Model 1 (computed for comparison only; it is not used in the final price)
        price_m1 = calculate_price_model1(
            parking_lot_states[pid]['current_price'], # Use the last calculated price as prev_price
            parking_lot_states[pid]['occupancy'],
            parking_lot_states[pid]['capacity']
        )

        # Model 2
        price_m2, raw_demand = calculate_price_model2(
            BASE_PRICE, # Model 2 uses base price, not previous price
            parking_lot_states[pid]['occupancy'],
            parking_lot_states[pid]['capacity'],
            parking_lot_states[pid]['queue_length'],
            parking_lot_states[pid]['traffic_congestion_level'],
            parking_lot_states[pid]['is_special_day'],
            parking_lot_states[pid]['incoming_vehicle_type']
        )

        # Model 3 (Optional) - uses Model 2 price as base
        price_m3 = calculate_price_model3(
            pid,
            parking_lot_states[pid],
            parking_lot_states, # Pass all states for competitive analysis
            price_m2
        )

        # Choose which model's price to use for the final output
        # For this simulation, let's use Model 3 if implemented, otherwise Model 2.
        final_price = price_m3 # Or price_m2 if Model 3 is not used/desired

        # Update the current price in the state
        parking_lot_states[pid]['current_price'] = final_price

        # Prepare data for Bokeh ColumnDataSource
        new_bokeh_data = dict(
            time=[(start_time + current_time_offset) * 1000], # Bokeh expects milliseconds
            price=[final_price],
            occupancy=[parking_lot_states[pid]['occupancy'] / parking_lot_states[pid]['capacity']],
            demand=[raw_demand] # You might want to plot normalized demand too
        )

        # Stream new data to Bokeh ColumnDataSource
        sources[pid].stream(new_bokeh_data, rollover=50) # Keep last 50 data points for visualization

        print(f"  Lot {pid}: Occupancy={parking_lot_states[pid]['occupancy']}/{parking_lot_states[pid]['capacity']}"
              f" ({parking_lot_states[pid]['occupancy'] / parking_lot_states[pid]['capacity']:.2f}), "
              f"Queue={parking_lot_states[pid]['queue_length']}, "
              f"Traffic={parking_lot_states[pid]['traffic_congestion_level']:.2f}, "
              f"SpecialDay={parking_lot_states[pid]['is_special_day']}, "
              f"Vehicle={parking_lot_states[pid]['incoming_vehicle_type']}, "
              f"RawDemand={raw_demand:.2f}, Price={final_price:.2f}")

    # Update the Bokeh plot.
    # For live updates, call `plot_handle = show(layout, notebook_handle=True)` before
    # the loop and `push_notebook(handle=plot_handle)` here.
    # This template instead renders the layout once, on the final iteration:
    # re-showing it on every iteration would be slow and would append a new copy
    # of the plots to the cell output each time.
    if i == num_iterations - 1:
        show(layout)

    time.sleep(update_interval_seconds)

print("\nSimulation finished.")
print("Remember to replace the mock data and simulation loop with your actual Pathway integration.")
print("Tune the model coefficients for optimal pricing behavior based on your dataset.")

Starting real-time pricing simulation...
This simulation runs for a few iterations. In a real scenario, Pathway would handle continuous streaming.

--- Iteration 1 ---
  Lot P1: Occupancy=82/100 (0.82), Queue=5, Traffic=0.14, SpecialDay=0, Vehicle=car, RawDemand=7.18, Price=11.83
  Lot P2: Occupancy=57/80 (0.71), Queue=0, Traffic=0.10, SpecialDay=0, Vehicle=truck, RawDemand=2.94, Price=10.75
  Lot P3: Occupancy=26/120 (0.22), Queue=3, Traffic=0.17, SpecialDay=0, Vehicle=car, RawDemand=3.81, Price=10.97
  Lot P4: Occupancy=67/90 (0.74), Queue=4, Traffic=0.72, SpecialDay=0, Vehicle=car, RawDemand=7.02, Price=11.79
  Lot P5: Occupancy=69/70 (0.99), Queue=3, Traffic=0.58, SpecialDay=0, Vehicle=car, RawDemand=6.73, Price=11.49

--- Iteration 2 ---
  Lot P1: Occupancy=86/100 (0.86), Queue=6, Traffic=0.21, SpecialDay=0, Vehicle=car, RawDemand=8.19, Price=12.09
  Lot P2: Occupancy=59/80 (0.74), Queue=2, Traffic=0.12, SpecialDay=0, Vehicle=car, RawDemand=4.49, Price=11.15
  Lot P3: Occupancy=30


Simulation finished.
Remember to replace the mock data and simulation loop with your actual Pathway integration.
Tune the model coefficients for optimal pricing behavior based on your dataset.
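
The proximity step in `calculate_price_model3` can be checked in isolation. The sketch below copies the five mock coordinates and the 0.5 km threshold from the notebook, re-implements the same Haversine formula so the block runs on its own, and prints the distance for every pair of lots. The names `LOTS`, `PROXIMITY_THRESHOLD_KM`, and `pair_distances` are introduced here for illustration only.

```python
from itertools import combinations
from math import radians, sin, cos, sqrt, atan2

# Coordinates copied from MOCK_PARKING_SPACES in the notebook.
LOTS = {
    'P1': (11.0045, 76.9616),
    'P2': (11.0050, 76.9620),
    'P3': (11.0030, 76.9600),
    'P4': (11.0060, 76.9630),
    'P5': (11.0040, 76.9590),
}
PROXIMITY_THRESHOLD_KM = 0.5  # same value as proximity_threshold_km in Model 3


def haversine_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers (same formula as the notebook helper)."""
    R = 6371  # Radius of Earth in kilometers
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return R * 2 * atan2(sqrt(a), sqrt(1 - a))


# Distance for every unordered pair of lots.
pair_distances = {
    (a, b): haversine_distance(*LOTS[a], *LOTS[b])
    for a, b in combinations(LOTS, 2)
}

# Print pairs from closest to farthest, flagged against the threshold.
for (a, b), d in sorted(pair_distances.items(), key=lambda kv: kv[1]):
    flag = "nearby" if d <= PROXIMITY_THRESHOLD_KM else "far"
    print(f"{a}-{b}: {d:.3f} km ({flag})")
```

With these mock coordinates all ten pairs fall inside the threshold (the farthest pair, P4 and P5, is roughly 0.49 km apart), so every lot treats the other four as competitors. Exercising the "no competitors nearby" branch would need a smaller threshold or more widely spaced lots.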

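The two pricing formulas can also be traced by hand. The sketch below is a worked example rather than part of the template. It plugs the rounded inputs logged for lot P1 in iteration 1 into the Model 2 formula, then applies the Model 1 update five times at a constant 50% occupancy. The coefficients and bounds are copied from the notebook; `clamp_price` and `model1_prices` are hypothetical names used only here.

```python
BASE_PRICE = 10.0
MIN_PRICE_FACTOR = 0.5
MAX_PRICE_FACTOR = 2.0


def clamp_price(price):
    """Bound a price to [0.5, 2.0] x BASE_PRICE, as the notebook models do."""
    return max(BASE_PRICE * MIN_PRICE_FACTOR, min(BASE_PRICE * MAX_PRICE_FACTOR, price))


# Model 2, using the rounded inputs logged for lot P1 in iteration 1.
occupancy_rate = 82 / 100
queue_length = 5
traffic_level = 0.14   # rounded in the log, so raw demand differs slightly from it
is_special_day = 0
vehicle_weight = 1.0   # weight for 'car'

raw_demand = (3.0 * occupancy_rate + 0.8 * queue_length + 1.5 * traffic_level
              + 2.5 * is_special_day + 0.5 * vehicle_weight)
# Assumed maximum: full lot, queue of 10, traffic 1.0, special day, truck weight.
max_raw_demand = 3.0 * 1.0 + 0.8 * 10 + 1.5 * 1.0 + 2.5 * 1 + 0.5 * 1.3
price_m2 = clamp_price(BASE_PRICE * (1 + 0.4 * raw_demand / max_raw_demand))
print(f"Model 2: raw_demand={raw_demand:.2f}, price={price_m2:.2f}")

# Model 1, applied repeatedly at a constant 50% occupancy (alpha = 5.0).
price = BASE_PRICE
model1_prices = []
for _ in range(5):
    price = clamp_price(price + 5.0 * 0.5)
    model1_prices.append(price)
print("Model 1:", model1_prices)
```

The Model 2 price rounds to 11.83, matching the logged value for P1. The raw demand comes out as 7.17 rather than the logged 7.18 because the traffic input here is the rounded one. The Model 1 run climbs by 2.5 per step, reaches the 20.0 cap on the fourth step, and stays there, which shows that the baseline update as written can only push prices up.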