Notebook Title: Real-Time Dynamic Parking Pricing with Pathway

In [54]:
!pip install pathway bokeh --quiet

Imports and Panel Initialization

In [55]:
import pathway as pw #pathway is used for real-time data processing with streaming and windowing support.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime as dt     #datetime helps in setting tumbling windows (e.g., 1 day).
import bokeh.plotting
import panel as pn
pn.extension()

In [56]:
# Load the raw parking readings and take a quick look at their shape and dtypes.
df = pd.read_csv('sample_data/dataset.csv')
print(df.head(3))
# DataFrame.info() writes its summary itself and returns None; wrapping it in print()
# would append a stray "None" line to the output.
df.info()

   ID SystemCodeNumber  Capacity   Latitude  Longitude  Occupancy VehicleType  \
0   0      BHMBCCMKT01       577  26.144536  91.736172         61         car   
1   1      BHMBCCMKT01       577  26.144536  91.736172         64         car   
2   2      BHMBCCMKT01       577  26.144536  91.736172         80         car   

  TrafficConditionNearby  QueueLength  IsSpecialDay LastUpdatedDate  \
0                    low            1             0      04-10-2016   
1                    low            1             0      04-10-2016   
2                    low            2             0      04-10-2016   

  LastUpdatedTime  
0        07:59:00  
1        08:25:00  
2        08:59:00  
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18368 entries, 0 to 18367
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   ID                      18368 non-null  int64  
 1   SystemCodeNumber        18368 non-nul

In [57]:
# Join the day-first date ('%d-%m-%Y') and the time-of-day text, then parse them into one datetime column.
timestamp_text = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(timestamp_text, format='%d-%m-%Y %H:%M:%S')


In [58]:
# Keep only the fields the streaming pipeline consumes, in the same order ParkingSchema declares them.
cols = [
    "SystemCodeNumber",
    "Latitude",
    "Longitude",
    "Capacity",
    "Occupancy",
    "QueueLength",
    "TrafficConditionNearby",
    "IsSpecialDay",
    "VehicleType",
    "Timestamp",
]
df = df.loc[:, cols]


In [59]:
# Write the trimmed frame to the CSV that pw.demo.replay_csv streams from;
# the pandas index is dropped so only the selected columns reach the file.
df.to_csv(path_or_buf="parking_stream.csv", index=False)


In [60]:
class ParkingSchema(pw.Schema):
    """Column names and types for rows replayed from parking_stream.csv.

    Field names mirror the columns exported to parking_stream.csv by the pandas step above.
    """
    SystemCodeNumber: str        # parking-lot identifier, e.g. "BHMBCCMKT01"; used to split windows per lot
    Latitude: float
    Longitude: float
    Capacity: int                # denominator of the occupancy ratio in the pricing models
    Occupancy: int               # numerator of the occupancy ratio in the pricing models
    QueueLength: int
    TrafficConditionNearby: str  # categorical level, e.g. "low"
    IsSpecialDay: int            # presumably a 0/1 flag — confirm against the dataset
    VehicleType: str             # e.g. "car"
    Timestamp: str               # "YYYY-MM-DD HH:MM:SS" text; parsed into a datetime downstream with strptime


In [61]:
# Stream the prepared CSV into Pathway as if it were live data;
# input_rate sets the replay speed (rows per second, per the pw.demo docs).
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)


In [62]:
# Text layout of the Timestamp column; presumably how pandas serialised the datetime64
# column into parking_stream.csv — confirm if the export step changes.
fmt = "%Y-%m-%d %H:%M:%S"

# Parse Timestamp once into the datetime column `t`, then derive the calendar-day label
# ("YYYY-MM-DDT00:00:00") from the already-parsed value instead of parsing the text again.
data_with_time = data.with_columns(
    t=data.Timestamp.dt.strptime(fmt),
).with_columns(
    day=pw.this.t.dt.strftime("%Y-%m-%dT00:00:00"),
)


Define UDFs for Pricing Models


In [63]:
@pw.udf
def model1_price(occ, cap) -> float:
    """Model 1: baseline linear pricing.

    Uses only occupancy and capacity: the price rises linearly from a base of ₹10,
    i.e. ``10 + 5 * occ / cap`` (₹10 when empty, ₹15 when occ == cap).

    Args:
        occ: occupancy value (a single reading or a window average).
        cap: lot capacity; assumed to be > 0 (a zero capacity raises ZeroDivisionError).

    Returns:
        The price as a float.
    """
    # The `-> float` annotation lets Pathway type the result column as float
    # instead of leaving it untyped.
    occupancy_ratio = occ / cap
    return 10 + 5 * occupancy_ratio

In [64]:
@pw.udf
def model2_price(occ, cap, queue, traffic, vehicle, special_day) -> float:
    """Model 2: demand-based pricing.

    Extends Model 1 with queue length, nearby traffic, vehicle type and special days.
    A demand score is clamped to [0.5, 2.0] and multiplied by the ₹10 base, so the
    price always lies between ₹5 and ₹20.

    Args:
        occ: occupancy value (a single reading or a window average).
        cap: lot capacity; assumed to be > 0.
        queue: queue length (or its window average).
        traffic: traffic level string; "low"/"medium"/"high" are weighted 1/2/3 and any
            other value falls back to weight 2.
        vehicle: vehicle type string; "bike"/"car"/"truck" are weighted 0.8/1.0/1.2 and any
            other value falls back to 1.0.
        special_day: special-day indicator (numeric, added with weight 0.5).

    Returns:
        The price as a float in [5.0, 20.0].
    """
    traffic_map = {"low": 1, "medium": 2, "high": 3}
    vehicle_map = {"bike": 0.8, "car": 1.0, "truck": 1.2}

    # NOTE(review): unrecognised labels silently get the middle weights; confirm the
    # dataset's traffic/vehicle vocabularies match these keys.
    traffic_weight = traffic_map.get(traffic, 2)
    vehicle_weight = vehicle_map.get(vehicle, 1.0)

    demand = (occ / cap) + 0.2 * queue + 0.3 * traffic_weight + 0.5 * special_day + 0.3 * vehicle_weight
    norm_demand = min(2.0, max(0.5, demand))  # bound between 0.5x and 2x of the ₹10 base
    return 10 * norm_demand

In [65]:
@pw.udf
def model3_price(occ_max, occ_min, cap, queue_avg, traffic, vehicle, special_day) -> float:
    """Model 3: volatility-based surge pricing.

    Treats the window's occupancy spread (max - min, relative to capacity) as a demand
    volatility signal and adds small surcharges for queue, traffic, vehicle type and
    special days on top of the ₹10 base: ``price = 10 + surge``.

    Args:
        occ_max: highest occupancy observed in the window.
        occ_min: lowest occupancy observed in the window.
        cap: lot capacity; assumed to be > 0.
        queue_avg: average queue length in the window (contributes 0.1 per vehicle).
        traffic: traffic level; "low"/"medium"/"high" add 0.0/0.2/0.4, anything else 0.2.
        vehicle: vehicle type; "bike"/"car"/"truck" add 0.0/0.1/0.2, anything else 0.1.
        special_day: truthy value adds 0.2.

    Returns:
        The price as a float.
    """
    traffic_map = {"low": 0.0, "medium": 0.2, "high": 0.4}
    vehicle_map = {"bike": 0.0, "car": 0.1, "truck": 0.2}

    traffic_factor = traffic_map.get(traffic, 0.2)
    vehicle_factor = vehicle_map.get(vehicle, 0.1)
    special_factor = 0.2 if special_day else 0.0

    # Occupancy spread within the window as a fraction of capacity.
    volatility = (occ_max - occ_min) / cap
    queue_factor = 0.1 * queue_avg

    surge = volatility + queue_factor + traffic_factor + vehicle_factor + special_factor
    return 10 + surge

Tumbling Window Aggregation for Daily Analysis

In [66]:
# Aggregate each parking lot's readings into 1-day tumbling windows, then price every window.
# The window instance combines the lot code and the calendar day, so lots never share a window.
final_window = (
    data_with_time.windowby(
        pw.this.t,
        instance=pw.this.SystemCodeNumber + "_" + pw.this.day,
        window=pw.temporal.tumbling(dt.timedelta(days=1)),
        # Per Pathway's temporal-behavior docs: each non-empty window produces exactly one output.
        behavior=pw.temporal.exactly_once_behavior(),
    )
    .reduce(
        SystemCodeNumber=pw.reducers.any(pw.this.SystemCodeNumber),
        day=pw.reducers.any(pw.this.day),
        t=pw.this._pw_window_end,  # window end; used as the x-axis value when plotting
        occ_max=pw.reducers.max(pw.this.Occupancy),
        occ_min=pw.reducers.min(pw.this.Occupancy),
        cap=pw.reducers.max(pw.this.Capacity),
        occ_sum=pw.reducers.sum(pw.this.Occupancy),
        occ_count=pw.reducers.count(),
        queue_sum=pw.reducers.sum(pw.this.QueueLength),
        # NOTE(review): any() keeps one arbitrary row's value; if traffic, vehicle type or the
        # special-day flag vary within a day, only one of them reaches the pricing models.
        traffic=pw.reducers.any(pw.this.TrafficConditionNearby),
        vehicle=pw.reducers.any(pw.this.VehicleType),
        special_day=pw.reducers.any(pw.this.IsSpecialDay),
    )
    # Daily means; a group only exists if it received at least one row, so occ_count >= 1.
    .with_columns(
        occ_avg=pw.this.occ_sum / pw.this.occ_count,
        queue_avg=pw.this.queue_sum / pw.this.occ_count,
    )
    # Price every daily window with all three models.
    .with_columns(
        # Model 1 goes through the model1_price UDF so the plotted price uses the same
        # formula as that UDF (10 + 5 * occupancy ratio) rather than a diverging inline copy.
        Price_Model1=model1_price(
            occ=pw.this.occ_avg,
            cap=pw.this.cap,
        ),
        Price_Model2=model2_price(
            occ=pw.this.occ_avg,
            cap=pw.this.cap,
            queue=pw.this.queue_avg,
            traffic=pw.this.traffic,
            vehicle=pw.this.vehicle,
            special_day=pw.this.special_day,
        ),
        Price_Model3=model3_price(
            occ_max=pw.this.occ_max,
            occ_min=pw.this.occ_min,
            cap=pw.this.cap,
            queue_avg=pw.this.queue_avg,
            traffic=pw.this.traffic,
            vehicle=pw.this.vehicle,
            special_day=pw.this.special_day,
        ),
    )
)


Plot Prices Using Bokeh + Panel

In [67]:
import bokeh.plotting
import panel as pn
pn.extension()

def multi_price_plotter(source):
    """Build a Bokeh chart comparing the three pricing models over time.

    Each model is drawn as a coloured line with circle markers; line and markers share
    one legend entry per model.

    Args:
        source: the data source handed in by ``Table.plot`` (presumably a Bokeh
            ColumnDataSource); it must expose the columns ``t``, ``Price_Model1``,
            ``Price_Model2`` and ``Price_Model3``.

    Returns:
        The configured ``bokeh.plotting.figure``.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Parking Price Comparison: Model 1 vs Model 2 vs Model 3",
        x_axis_type="datetime",
    )

    # (price column, legend label, colour) for each model.
    series = (
        ("Price_Model1", "Model 1", "blue"),
        ("Price_Model2", "Model 2", "green"),
        ("Price_Model3", "Model 3", "orange"),
    )
    for column, label, color in series:
        fig.line("t", column, source=source, legend_label=label, line_color=color, line_width=2)
        # scatter(marker="circle") replaces circle(size=...), which Bokeh 3.4+ deprecates;
        # reusing the legend label folds the markers into the line's legend entry.
        fig.scatter("t", column, source=source, marker="circle", legend_label=label, color=color, size=5)

    fig.legend.location = "top_left"
    fig.xaxis.axis_label = "Date"
    fig.yaxis.axis_label = "Price"
    return fig

# Attach the live Pathway table to the Bokeh figure factory; rows are ordered by the
# window-end column `t` so every line is drawn left to right.
# NOTE(review): windows from every parking lot feed this single chart, so points from
# different lots interleave; consider filtering final_window to one SystemCodeNumber.
viz = final_window.plot(
    multi_price_plotter,
    sorting_col="t",
)

# Wrap the live plot in a Panel layout; as the cell's last expression it renders inline.
dashboard = pn.Column(viz)
dashboard.servable()




Execute Pathway Pipeline

In [68]:
%%capture --no-display
# Launch the Pathway streaming engine: it replays parking_stream.csv through the windowing
# and pricing steps defined above, feeding the bound plot. %%capture hides the engine's
# stdout/stderr, while --no-display lets rich display output through.
pw.run()

Output()

