# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m34.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m67.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [164]:
# Read the raw parking observations into a DataFrame and preview them.
# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
dataset_path = '/content/dataset.csv'
df = pd.read_csv(dataset_path)
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [165]:
# Inspect the distinct traffic labels present in the raw data before they are encoded numerically

df['TrafficConditionNearby'].unique()

array(['low', 'high', 'average'], dtype=object)

In [166]:

# Define mapping from traffic categories to numeric values
traffic_map = {
    "low": 1.0,
    "average": 2.0,
    "high": 3.0
}

# Encode the labels only while the column still holds text.
# Without this guard the cell is not re-runnable: on a second execution the
# already-encoded floats would be stringified ("1.0", "2.0", ...), fail to
# match any key of traffic_map, become NaN, and every row would be dropped below.
if not pd.api.types.is_numeric_dtype(df["TrafficConditionNearby"]):
    # Normalize case and map traffic values
    df["TrafficConditionNearby"] = (
        df["TrafficConditionNearby"]
        .astype(str)
        .str.lower()
        .map(traffic_map)
    )

# Drop rows where mapping failed (invalid category)
df = df.dropna(subset=["TrafficConditionNearby"])

# Ensure numeric types for all relevant columns
df = df.astype({
    "TrafficConditionNearby": float,
    "QueueLength": int,
    "Capacity": int
})


In [167]:
# Flag every row in which any column still contains the text "high" (case-insensitive)
mask = df.apply(lambda col: col.astype(str).str.contains("high", case=False)).any(axis=1)

# Show the flagged rows; the saved output is empty, i.e. no text label "high" remains after the numeric mapping
df[mask]


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime


In [168]:
# Join the date and time text columns and parse the result as a day-first datetime
raw_stamp = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(raw_stamp, format='%d-%m-%Y %H:%M:%S')

# Order the rows chronologically and renumber the index from zero
df = df.sort_values(by='Timestamp').reset_index(drop=True)

In [169]:
# Re-parse the original date and time columns into datetimes
parsed_stamps = pd.to_datetime(
    df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
    format='%d-%m-%Y %H:%M:%S'
)

# Store the timestamps as ISO-style text, the form the Pathway stage parses later
df['Timestamp'] = parsed_stamps.dt.strftime("%Y-%m-%d %H:%M:%S")

# Year-first, zero-padded text sorts in the same order as the underlying datetimes
df = df.sort_values(by='Timestamp').reset_index(drop=True)


In [170]:
# Columns handed to the streaming stage, in the order they are written to disk
stream_columns = [
    "Timestamp",
    "SystemCodeNumber",
    "Occupancy",
    "Capacity",
    "QueueLength",
    "TrafficConditionNearby",
    "IsSpecialDay",
    "VehicleType",
]

# Write only those columns (without the index) for replay as a simulated stream
df[stream_columns].to_csv("parking_stream.csv", index=False)


In [171]:
# Enforce the expectation instead of only stating it in a comment:
# after the mapping step nothing but the three numeric codes may remain.
traffic_codes = set(df["TrafficConditionNearby"].unique())
assert traffic_codes <= {1.0, 2.0, 3.0}, f"unexpected traffic codes: {traffic_codes}"
print(df["TrafficConditionNearby"].unique())  # should show only: [1.0, 2.0, 3.0]


[1. 2. 3.]


In [172]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    # One streamed row; the fields mirror the columns written to parking_stream.csv.
    Timestamp: str  # text in "%Y-%m-%d %H:%M:%S" form; parsed into a datetime downstream
    SystemCodeNumber: str  # parking-lot identifier, e.g. "BHMBCCMKT01"
    Occupancy: int
    Capacity: int
    QueueLength: int
    TrafficConditionNearby: float  # numeric code from traffic_map: 1.0 low, 2.0 average, 3.0 high
    IsSpecialDay: int  # 0/1 flag — presumably 1 marks a special day; TODO confirm
    VehicleType: str  # e.g. "car", "bike", "truck", "cycle"


In [173]:
# Replay the exported CSV as a simulated live stream with Pathway's demo connector.
# Rows are typed according to ParkingSchema; input_rate=1000 asks for roughly
# 1000 rows per second to be fed into the stream.

data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [174]:
# Format of the text in the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Parse the timestamp once and derive both new columns from that expression:
# - 'parsed_time' is the full datetime
# - 'day' is a text key holding the date with the time part fixed to 00:00:00
event_time = data.Timestamp.dt.strptime(fmt)
data_with_time = data.with_columns(
    parsed_time=event_time,
    day=event_time.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Making a simple pricing function

Model 0: Daily Window (Baseline)
Based on daily fluctuation:

(occ_max - occ_min)/capacity

Simple, interpretable, but not responsive

In [175]:
# Model 0: aggregate the stream into one-day tumbling windows and price each day.
# The module import is repeated here because the import cell rebinds the name
# `datetime` to the class (`from datetime import datetime`), which has no `timedelta`.
import datetime

# Fixed-size, non-overlapping windows of one day each
daily_window = pw.temporal.tumbling(datetime.timedelta(days=1))

# Per-day occupancy statistics.
# NOTE(review): windows are keyed by `day` only, not by SystemCodeNumber, so when
# the stream holds several lots the max/min/capacity below mix all of them —
# confirm this is intended.
daily_stats = data_with_time.windowby(
    pw.this.parsed_time,                           # event time that assigns rows to windows
    instance=pw.this.day,                          # partition key: one instance per calendar day
    window=daily_window,
    behavior=pw.temporal.exactly_once_behavior(),  # per Pathway docs: one result per window, emitted when it closes
).reduce(
    t=pw.this._pw_window_end,                      # end timestamp of the window
    occ_max=pw.reducers.max(pw.this.Occupancy),    # highest occupancy seen in the window
    occ_min=pw.reducers.min(pw.this.Occupancy),    # lowest occupancy seen in the window
    cap=pw.reducers.max(pw.this.Capacity),         # largest capacity seen in the window
)

# Pricing formula:
#     price = base_price + demand_fluctuation
#         base_price         = 10 (fixed minimum price)
#         demand_fluctuation = (occ_max - occ_min) / cap
#
# A wide gap between the daily peak and trough signals volatile demand; dividing
# by capacity scales that gap (it lies in [0, 1] when the window covers one lot).
# Example: occ_max = 90, occ_min = 30, cap = 100  =>  price = 10 + 60/100 = 10.6
delta_window = daily_stats.with_columns(
    price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
)


Model 1: Row-wise Price Function

$$\text{Price} = 10 + \alpha \cdot \left(\frac{\text{Occupancy}}{\text{Capacity}}\right) + \beta \cdot \text{QueueLength}^{1.5}$$

Responds to live occupancy and queue

Bounded output: $5 ≤ \text{Price} ≤ 20$

In [176]:
@pw.udf
def model_1_price(occupancy: int, capacity: int, queue: int) -> float:
    """Row-wise price from the current occupancy ratio and queue length.

    price = 10 + 0.6 * (occupancy / capacity) + 0.4 * queue_term, clamped to [5, 20].

    Args:
        occupancy: Occupancy value of the row.
        capacity: Capacity value of the row. A non-positive capacity
            contributes an occupancy ratio of 0 instead of raising
            ZeroDivisionError.
        queue: Queue length of the row.

    Returns:
        The price as a plain float within [5, 20].
    """
    base_price = 10
    alpha = 0.6
    beta = 0.4

    # A single zero-capacity row must not abort the whole stream.
    occupancy_ratio = occupancy / capacity if capacity > 0 else 0.0

    # The queue term is capped at 5. Because 4 ** 1.5 = 8 already exceeds the
    # cap, every queue of 4 or more contributes exactly 5.
    queue_weighted = min(queue**1.5 if queue >= 4 else queue, 5)

    price = base_price + alpha * occupancy_ratio + beta * queue_weighted
    # float(): the clamp bounds are ints, so a clamped result would otherwise
    # be returned as an int despite the annotation.
    return float(min(max(price, 5), 20))


In [177]:
# Price every streamed row with Model 1, keeping the lot id and event time alongside
model_1_price_table = data_with_time.select(
    lot_id=pw.this.SystemCodeNumber,
    timestamp=pw.this.parsed_time,
    price=model_1_price(
        pw.this.Occupancy,
        pw.this.Capacity,
        pw.this.QueueLength,
    ),
)


Model 2: Demand-Based Price Function
$$\text{Demand} = f(\text{Occupancy}, \text{Queue}, \text{Traffic}, \text{IsSpecialDay}, \text{VehicleType})$$

$$\text{Price} = 10 \cdot \left(1 + \lambda \cdot \text{Sigmoid}(\text{Demand})\right)$$

Smooth, bounded variation

Incorporates real-time features using a custom sigmoid demand model

In [178]:
@pw.udf
def model_2_price(
    occupancy: int,
    capacity: int,
    queue: int,
    traffic: float,
    is_special_day: int,
    vehicle_type: str
) -> float:
    """Demand-based price: 10 * (1 + sigmoid(demand score)), clamped to [5, 20].

    The demand score is a weighted sum of the occupancy ratio, a capped queue
    term, the traffic code, the traffic code on special days, and a
    vehicle-type term. It is passed through a sigmoid centred at 1.5 with
    steepness 4, so the price lies between 10 and 20.

    Args:
        occupancy: Occupancy value of the row.
        capacity: Capacity value of the row. A non-positive capacity makes
            both capacity-normalised terms 0 instead of raising
            ZeroDivisionError.
        queue: Queue length of the row.
        traffic: Numeric traffic code of the row.
        is_special_day: Flag multiplied into the traffic code.
        vehicle_type: Vehicle label; unknown labels use weight 1.0.

    Returns:
        The price as a plain float.
    """
    # Uses the notebook-level `np`; the former function-local numpy import was redundant.
    vehicle_weights = {'car': 1.0, 'bike': 0.5, 'truck': 1.5, 'cycle': 0.2}
    weight = vehicle_weights.get(vehicle_type, 1.0)

    if capacity > 0:
        occ_ratio = occupancy / capacity
        # NOTE(review): weight / capacity is very small for large lots, so the
        # vehicle type barely moves the price — confirm this scaling is intended.
        vehicle_risk = weight / capacity
    else:
        # A single zero-capacity row must not abort the whole stream.
        occ_ratio = 0.0
        vehicle_risk = 0.0

    event_traffic = is_special_day * traffic
    # Capped at 5; since 4 ** 1.5 = 8 exceeds the cap, queues of 4+ contribute exactly 5.
    queue_weighted = min(queue**1.5 if queue >= 4 else queue, 5)

    x = (
        0.7 * occ_ratio +
        0.3 * queue_weighted +
        0.3 * traffic +
        0.3 * event_traffic +
        0.2 * vehicle_risk
    )

    demand = 1 / (1 + np.exp(-4 * (x - 1.5)))
    price = 10 * (1 + demand)
    # float(): return a plain Python float rather than np.float64 or an int clamp bound.
    return float(min(max(price, 5), 20))


In [179]:
# Price every streamed row with Model 2, keeping the lot id and event time alongside
model_2_price_table = data_with_time.select(
    lot_id=pw.this.SystemCodeNumber,
    timestamp=pw.this.parsed_time,
    price=model_2_price(
        pw.this.Occupancy,
        pw.this.Capacity,
        pw.this.QueueLength,
        pw.this.TrafficConditionNearby,
        pw.this.IsSpecialDay,
        pw.this.VehicleType,
    ),
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [180]:
# Load the Panel notebook extension before building the pn.Column layout in this cell
pn.extension()

# Generic plotter for any price table
def price_plotter(source, title=""):
    """Build an 800x400 datetime figure of ``price`` against ``timestamp``.

    Args:
        source: Data source handed over by ``Table.plot``; it must expose
            ``timestamp`` and ``price`` columns.
        title: Text shown as the figure title.

    Returns:
        The Bokeh figure with a green line and blue point markers.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title=title,
        x_axis_type="datetime",
    )
    fig.line("timestamp", "price", source=source, line_width=2, color="green")
    # scatter() draws circle markers by default; circle(size=...) is deprecated since Bokeh 3.4.
    fig.scatter("timestamp", "price", source=source, size=6, color="blue")
    return fig

# Plot for Model 0 (delta_window uses 't' instead of 'timestamp')
def baseline_plot(source):
    """Build the Model 0 figure: ``price`` against the window-end column ``t``.

    Args:
        source: Data source handed over by ``Table.plot``; it must expose
            ``t`` and ``price`` columns.

    Returns:
        The Bokeh figure with a navy line and red point markers.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Model 0 (Daily Window): Price",
        x_axis_type="datetime"
    )
    fig.line("t", "price", source=source, line_width=2, color="navy")
    # scatter() draws circle markers by default; circle(size=...) is deprecated since Bokeh 3.4.
    fig.scatter("t", "price", source=source, size=6, color="red")
    return fig

# One streaming plot per model; the two row-wise models share price_plotter
plot_model_0 = delta_window.plot(baseline_plot, sorting_col="t")
plot_model_1 = model_1_price_table.plot(
    lambda src: price_plotter(src, "Model 1: Row-wise Price"),
    sorting_col="timestamp",
)
plot_model_2 = model_2_price_table.plot(
    lambda src: price_plotter(src, "Model 2: Demand-Based Price"),
    sorting_col="timestamp",
)

# Stack headings and plots vertically and mark the layout as servable
dashboard_sections = [
    "# 🧭 Dynamic Pricing Models – Pathway Streaming",
    "## Model 0: Daily Tumbling Window",
    plot_model_0,
    "## Model 1: Row-wise Occupancy + Queue",
    plot_model_1,
    "## Model 2: Demand-Based (with Traffic, VehicleType)",
    plot_model_2,
]
pn.Column(*dashboard_sections).servable()




In [181]:
# Load the Panel notebook extension (already done in the previous cell; repeated here)
pn.extension()

# Shared plotting function
def price_plotter(source, title=""):
    """Build an 800x400 datetime figure of ``price`` against ``timestamp``.

    NOTE(review): this re-defines the ``price_plotter`` of the previous cell;
    consider keeping a single definition.

    Args:
        source: Data source handed over by ``Table.plot``; it must expose
            ``timestamp`` and ``price`` columns.
        title: Text shown as the figure title.

    Returns:
        The Bokeh figure with a green line and blue point markers.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title=title,
        x_axis_type="datetime",
    )
    fig.line("timestamp", "price", source=source, line_width=2, color="green")
    # scatter() draws circle markers by default; circle(size=...) is deprecated since Bokeh 3.4.
    fig.scatter("timestamp", "price", source=source, size=6, color="blue")
    return fig

# Special plotter for Model 0 (uses 't')
def baseline_plot(source):
    """Build the Model 0 figure: ``price`` against the window-end column ``t``.

    NOTE(review): this re-defines the ``baseline_plot`` of the previous cell
    with a slightly different title; consider keeping a single definition.

    Args:
        source: Data source handed over by ``Table.plot``; it must expose
            ``t`` and ``price`` columns.

    Returns:
        The Bokeh figure with a navy line and red point markers.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Model 0: Daily Window Price",
        x_axis_type="datetime",
    )
    fig.line("t", "price", source=source, line_width=2, color="navy")
    # scatter() draws circle markers by default; circle(size=...) is deprecated since Bokeh 3.4.
    fig.scatter("t", "price", source=source, size=6, color="red")
    return fig

# One streaming plot per model
plot_model_0 = delta_window.plot(baseline_plot, sorting_col="t")
plot_model_1 = model_1_price_table.plot(
    lambda src: price_plotter(src, "Model 1: Row-wise Price"),
    sorting_col="timestamp",
)
plot_model_2 = model_2_price_table.plot(
    lambda src: price_plotter(src, "Model 2: Demand-Based Price"),
    sorting_col="timestamp",
)

# Vertical layout: a heading above each model's plot, marked as servable
layout_items = [
    "# 📈 Parking Lot Pricing Models",
    "### Model 0: Daily Window",
    plot_model_0,
    "### Model 1: Row-wise Queue & Occupancy",
    plot_model_1,
    "### Model 2: Demand Function (Traffic, Vehicle Type, etc.)",
    plot_model_2,
]
pn.Column(*layout_items).servable()




In [182]:
%%capture --no-display
# Execute the Pathway pipeline; the plots declared in the earlier cells are only generated once this runs
pw.run()


Output()



In [183]:
# Lot whose Model 1 / Model 2 prices are compared in the cells below
selected_lot = "BHMBCCMKT01"

In [184]:
# Keep only the rows of the chosen lot in the two row-wise model outputs.
# Model 0 is left untouched: its daily windows are not keyed by lot.
model_1_filtered = model_1_price_table.filter(
    model_1_price_table.lot_id == selected_lot
)
model_2_filtered = model_2_price_table.filter(
    model_2_price_table.lot_id == selected_lot
)


In [185]:
def plot_model(source, title, x_col="timestamp", color="green"):
    """Build a 600x300 datetime figure of ``price`` against ``x_col``.

    Args:
        source: Data source handed over by ``Table.plot``; it must expose
            ``price`` and the column named by ``x_col``.
        title: Text shown as the figure title.
        x_col: Name of the column plotted on the x axis.
        color: Colour used for both the line and the point markers.

    Returns:
        The Bokeh figure.
    """
    fig = bokeh.plotting.figure(
        width=600,
        height=300,
        title=title,
        x_axis_type="datetime",
    )
    fig.line(x_col, "price", source=source, line_width=2, color=color)
    # scatter() draws circle markers by default; circle(size=...) is deprecated since Bokeh 3.4.
    fig.scatter(x_col, "price", source=source, size=6, color=color)
    return fig


In [186]:
# One plot per model: the daily baseline plus the two per-lot filtered tables
plot0 = delta_window.plot(
    lambda src: plot_model(src, "Model 0 (Daily)", x_col="t", color="navy"),
    sorting_col="t",
)
plot1 = model_1_filtered.plot(
    lambda src: plot_model(src, f"Model 1: {selected_lot}", color="orange"),
    sorting_col="timestamp",
)
plot2 = model_2_filtered.plot(
    lambda src: plot_model(src, f"Model 2: {selected_lot}", color="green"),
    sorting_col="timestamp",
)

# Vertical layout with a heading above each plot, marked as servable
comparison_items = [
    f"# 📍 Price Comparison for {selected_lot}",
    "### 🔵 Model 0: Daily Tumbling Window",
    plot0,
    "### 🟠 Model 1: Occupancy + Queue",
    plot1,
    "### 🟢 Model 2: Demand-Based Pricing",
    plot2,
]
pn.Column(*comparison_items).servable()




In [187]:
%%capture --no-display
# Execute the Pathway pipeline again so the per-lot comparison plots defined after the first run are generated
pw.run()


Output()

