<a href="https://colab.research.google.com/github/Ashish-Rawat7/Dynamic-Pricing-for-Urban-Parking-Lots/blob/main/Pricing_for_Urban_Parking.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Dynamic Pricing for Urban Parking Lots

**Capstone Project – Summer Analytics 2025**

This notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking lot. You are expected to implement your model across all parking lots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [5]:
!pip install pathway bokeh --quiet

In [6]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [7]:
# Load the raw parking observations into a DataFrame.
# NOTE(review): the absolute '/content/...' path looks Colab-specific — adjust it when running elsewhere.
df = pd.read_csv('/content/dataset.csv')
# Preview the first rows to check the available columns before preprocessing.
df.head()

# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00


In [9]:
# Join the date text and the time text of every row, then parse the result as a
# day-first date with a 24-hour time into a single datetime column.
date_and_time = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(date_and_time, format='%d-%m-%Y %H:%M:%S')

# Order the observations chronologically and renumber the index from 0.
df = df.sort_values(by='Timestamp', ignore_index=True)

In [10]:
# Export only the columns declared in ParkingSchema to the CSV file that is replayed as a stream later.
# The Timestamp column is written using pandas' default datetime text format, which the streaming
# cell parses back with "%Y-%m-%d %H:%M:%S".
df[["Timestamp", "Occupancy", "Capacity", "QueueLength", "TrafficConditionNearby", "IsSpecialDay", "VehicleType", "Latitude", "Longitude"]].to_csv("parking_stream.csv", index=False)

In [11]:
# Reference price used by the pricing models defined in this notebook.
base_price = 10.0


def baseline_linear_model(prev_price, occupancy, capacity, alpha=0.1):
    """Return the next price after a linear, occupancy-proportional increase.

    The update rule is ``prev_price + alpha * (occupancy / capacity)``.

    Args:
        prev_price: Price from the previous step.
        occupancy: Number of occupied spaces.
        capacity: Total number of spaces; must be non-zero.
        alpha: Price increase factor applied to the occupancy rate.
            Defaults to 0.1, the value that used to be hard-coded.

    Returns:
        The updated price.

    Raises:
        ZeroDivisionError: If ``capacity`` is 0.
    """
    price_increase = alpha * (occupancy / capacity)
    return prev_price + price_increase

In [12]:
def demand_based_model(occupancy, capacity, queue_length, traffic_condition_nearby, is_special_day, vehicle_type):
    """Price a parking lot from a weighted demand score.

    The demand score is
    ``alpha * occupancy_rate + beta * queue_length - gamma * traffic
    + delta * is_special_day + epsilon * vehicle_weight``,
    clipped to [0, 1] and turned into a price as
    ``base_price * (1 + lambda_factor * demand)``.

    Args:
        occupancy: Number of occupied spaces.
        capacity: Total number of spaces; must be non-zero.
        queue_length: Number of vehicles waiting (used unnormalized).
        traffic_condition_nearby: Either a number (used as-is) or one of the
            text labels 'low', 'average'/'medium', 'high' (case-insensitive),
            which are mapped to 0.0, 0.5 and 1.0 respectively.
        is_special_day: Special-day indicator (0 or 1 in the sample data).
        vehicle_type: 'car', 'bike' or 'truck'; any other value is weighted 1.0.

    Returns:
        The price, bounded to [0.5 * base_price, 2 * base_price]. With the
        current lambda_factor of 0.5 the value always lies in
        [base_price, 1.5 * base_price], so the outer bound is only a safety net.

    Raises:
        ValueError: If ``traffic_condition_nearby`` is an unrecognized text label.
        ZeroDivisionError: If ``capacity`` is 0.
    """
    # Weights of the individual demand components.
    alpha = 0.5    # occupancy rate
    beta = 0.3     # queue length
    gamma = 0.2    # nearby traffic (subtracted from demand)
    delta = 1.0    # special day
    epsilon = 0.1  # vehicle type

    # The dataset stores traffic as text ('low', ...). Multiplying a float by a
    # string raises TypeError, so convert labels to a numeric level first.
    traffic_levels = {'low': 0.0, 'average': 0.5, 'medium': 0.5, 'high': 1.0}
    if isinstance(traffic_condition_nearby, str):
        traffic_key = traffic_condition_nearby.strip().lower()
        if traffic_key not in traffic_levels:
            raise ValueError(f"Unknown traffic condition: {traffic_condition_nearby!r}")
        traffic_level = traffic_levels[traffic_key]
    else:
        # Numeric input keeps its original meaning for existing callers.
        traffic_level = traffic_condition_nearby

    # Calculate demand
    occupancy_rate = occupancy / capacity
    vehicle_weight = {'car': 1.0, 'bike': 0.5, 'truck': 1.5}.get(vehicle_type, 1.0)
    demand = (alpha * occupancy_rate) + (beta * queue_length) - (gamma * traffic_level) + (delta * is_special_day) + (epsilon * vehicle_weight)

    # Normalize demand (between 0 and 1)
    normalized_demand = max(0, min(1, demand))

    # Adjust price with smoothing (bounded between 0.5x and 2x base price)
    lambda_factor = 0.5
    price = base_price * (1 + lambda_factor * normalized_demand)
    return max(base_price * 0.5, min(base_price * 2, price))

In [13]:
def competitive_pricing_model(occupancy, capacity, queue_length, traffic_condition_nearby, vehicle_type, competitor_prices, proximity):
    """Adjust the demand-based price using nearby competitors' prices.

    Competitor prices are averaged with inverse-distance weights, so closer
    competitors count more. The result only steers the demand-based price
    up or down by 10%; it is never returned directly.

    Args:
        occupancy: Number of occupied spaces.
        capacity: Total number of spaces; must be non-zero.
        queue_length: Number of vehicles waiting.
        traffic_condition_nearby: Forwarded unchanged to ``demand_based_model``.
        vehicle_type: Forwarded unchanged to ``demand_based_model``.
        competitor_prices: Sequence of competitor prices (may be empty or None).
        proximity: Sequence of distances, paired by position with
            ``competitor_prices`` (may be empty or None). Entries with a
            distance <= 0 and entries without a partner are ignored.

    Returns:
        - 0.9 * demand price (not below 0.5 * base_price) when the lot is at
          least 90% full and competitors are cheaper;
        - 1.1 * demand price (not above 2 * base_price) when competitors are
          more expensive;
        - the demand price otherwise.
    """
    # The special-day effect is deliberately left out of the competitive context.
    demand_price = demand_based_model(occupancy, capacity, queue_length, traffic_condition_nearby, 0, vehicle_type)

    # Materialize the inputs so None, lists and array-likes are handled uniformly.
    prices = [] if competitor_prices is None else list(competitor_prices)
    distances = [] if proximity is None else list(proximity)

    # Build the weights from the same (price, distance) pairs that feed the
    # numerator; otherwise unequal input lengths would skew the average.
    weighted_pairs = [(price, 1 / distance) for price, distance in zip(prices, distances) if distance > 0]
    total_weight = sum(weight for _, weight in weighted_pairs)

    if total_weight > 0:
        weighted_price = sum(price * weight for price, weight in weighted_pairs) / total_weight
    elif prices:
        # No usable distances: fall back to a plain average of the prices.
        weighted_price = np.mean(prices)
    else:
        # No competitors at all: compare the demand price with itself (no adjustment).
        weighted_price = demand_price

    # Adjust price based on competition
    if occupancy / capacity >= 0.9 and weighted_price < demand_price:
        return max(base_price * 0.5, demand_price * 0.9)  # Reduce if full and competitors are cheaper
    elif weighted_price > demand_price:
        return min(base_price * 2, demand_price * 1.1)  # Increase if competitors are expensive
    return demand_price

In [14]:
class ParkingSchema(pw.Schema):
    # Column layout of parking_stream.csv; names and types mirror the columns exported to that file.
    Timestamp: str   # Observation time kept as text; parsed downstream with "%Y-%m-%d %H:%M:%S"
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location
    QueueLength: int  # Queue length as an integer (1 and 2 in the sample rows)
    TrafficConditionNearby: str  # Traffic condition as a text label (e.g. 'low' in the sample rows), not a number
    IsSpecialDay: int  # Special-day indicator stored as an integer (0 in the sample rows)
    VehicleType: str  # Vehicle category as text (e.g. 'car', 'bike' in the sample rows)
    Latitude: float  # Latitude of the parking location
    Longitude: float  # Longitude of the parking location

In [15]:
# Replay the exported CSV as a simulated live stream whose rows are typed by ParkingSchema.
# NOTE(review): input_rate is presumably the number of rows emitted per second — confirm in the Pathway docs.
data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=100)

In [16]:
# Text layout of the Timestamp column in the replayed CSV.
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parsing expression once and reuse it for both derived columns.
parsed_timestamp = data.Timestamp.dt.strptime(fmt)

data_with_time = data.with_columns(
    # Parsed datetime of each observation.
    t=parsed_timestamp,
    # Text label for the calendar day of the observation (time part fixed to midnight).
    day=parsed_timestamp.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Making a simple pricing function

In [17]:
# Re-import the module: the imports cell ends with `from datetime import datetime`, which rebinds
# the name `datetime` to the class, while `datetime.timedelta` below needs the module.
import datetime

# Daily aggregation of the stream: one output row per calendar day carrying that day's
# occupancy extremes, the capacity, a few carried-over attributes and the derived price.
delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.day,  # Logical partitioning key: one instance per calendar day
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # Fixed-size, non-overlapping one-day windows
        behavior=pw.temporal.exactly_once_behavior()  # Each window is meant to emit a single result; NOTE(review): confirm emission timing in the Pathway temporal-behavior docs
    )
    .reduce(
        t=pw.this._pw_window_end,                        # Assign the end timestamp of each window
        occ_max=pw.reducers.max(pw.this.Occupancy),      # Highest occupancy observed in the window
        occ_min=pw.reducers.min(pw.this.Occupancy),      # Lowest occupancy observed in the window
        cap=pw.reducers.max(pw.this.Capacity),           # Maximum capacity observed (typically constant per spot)
        # The columns below are carried along but are not used by the price formula.
        # NOTE(review): the two argmax calls look intended to pick the value from the row with the
        # highest occupancy — verify that pw.reducers.argmax accepts a second argument in the
        # installed Pathway version and what it returns.
        VehicleType=pw.reducers.argmax(pw.this.Occupancy, pw.this.VehicleType),
        TrafficConditionNearby=pw.reducers.argmax(pw.this.Occupancy, pw.this.TrafficConditionNearby),
        QueueLength=pw.reducers.max(pw.this.QueueLength),    # Longest queue observed in the window
        IsSpecialDay=pw.reducers.max(pw.this.IsSpecialDay),  # Largest flag value in the window (1 if any row has 1, given 0/1 flags)
        Latitude=pw.reducers.max(pw.this.Latitude),          # Largest latitude in the window (a single value if the location is constant)
        Longitude=pw.reducers.max(pw.this.Longitude)         # Largest longitude in the window (a single value if the location is constant)
    )
    .with_columns(
        # Compute the price using a simple dynamic pricing formula:
        #
        # Pricing Formula:
        #     price = base_price + demand_fluctuation
        #     where:
        #         base_price = 10 (written as a literal below, not read from the `base_price` variable)
        #         demand_fluctuation = (occ_max - occ_min) / cap
        #
        # Intuition:
        # - The greater the difference between peak and low occupancy in a day,
        #   the more volatile the demand is, indicating potential scarcity.
        # - Dividing by capacity normalizes the fluctuation; it stays in the [0,1] range
        #   as long as occupancy never exceeds capacity.
        # - This fluctuation is added to the base price of 10 to set the final price.
        # - Example: If occ_max = 90, occ_min = 30, cap = 100
        #            => price = 10 + (90 - 30)/100 = 10 + 0.6 = 10.6

        price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
    )
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

In [25]:
# Activate the Panel extension so Panel objects (such as the pn.Column created below)
# can be rendered as interactive output in the notebook
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    """Build the Bokeh figure that shows the daily parking price.

    Args:
        source: Data source handed in by the caller; it must provide the
            columns "t" (plotted on a datetime x-axis) and "price" (y-axis).

    Returns:
        The configured Bokeh figure: a navy line through the price points
        with red circular markers drawn on top of it.
    """
    # Create a Bokeh figure with datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    # Label the axes so the figure can be read on its own.
    fig.xaxis.axis_label = "Day (window end)"
    fig.yaxis.axis_label = "Price"

    # Plot a line graph showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Overlay red circular markers at each data point for better visibility.
    # scatter() is used because circle(size=...) is deprecated in recent Bokeh releases.
    fig.scatter("t", "price", source=source, size=6, color="red", marker="circle")

    return fig

# Bind the windowed table (delta_window) to the Bokeh plot through the table's .plot() method:
# - price_plotter is the rendering function; it is handed the data source to draw from
# - sorting_col="t" asks for the rows to be ordered by the window-end time before plotting
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column layout and mark it as servable,
# so the same layout can be shown here and served as a web app
pn.Column(viz).servable()



In [26]:
# Start the Pathway computation: the pipeline defined in the cells above is only executed
# (and the plot only starts receiving data) once this call runs.
pw.run()

Output()

