# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [5]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [6]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [7]:
# Load the raw parking dataset into a DataFrame.
# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
dataset_path = '/content/dataset.csv'
df = pd.read_csv(dataset_path)

# Leaving the frame as the last expression makes the notebook render it as a table.
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [8]:
# Join the separate date and time text columns with a space, then parse the
# result (day-month-year order) into a single datetime 'Timestamp' column.
date_and_time = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(date_and_time, format='%d-%m-%Y %H:%M:%S')

# Put the rows in chronological order, then replace the shuffled index with a fresh 0..n-1 one.
df = df.sort_values('Timestamp')
df = df.reset_index(drop=True)

In [9]:
# Write only the columns consumed by the streaming pipeline to the CSV that is replayed later.
stream_columns = ["Timestamp", "Occupancy", "Capacity"]
df[stream_columns].to_csv("parking_stream.csv", index=False)

# Note: Only three features are used here for simplicity.
# Participants are expected to incorporate additional relevant features in their models.
# NOTE(review): 'SystemCodeNumber' is not exported, so if the loaded file holds several
# lots their rows become indistinguishable in the stream -- confirm this is intended.

In [10]:
# Schema of one row of the replayed parking stream.
# The field names match the header of "parking_stream.csv" written in the previous cell,
# and the annotations tell Pathway which type to expect for each column.

class ParkingSchema(pw.Schema):
    Timestamp: str   # Observation time, kept as text here; it is parsed into a datetime in a later cell
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location


In [11]:
# Simulate a live feed by replaying the exported CSV through Pathway's demo connector.
# Rows are typed according to ParkingSchema; input_rate=1000 means approximately
# 1000 rows per second will be ingested into the stream.
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [12]:
# Format of the 'Timestamp' text as it appears in the replayed CSV
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parsed-datetime expression once and reuse it for both derived columns.
parsed_timestamp = data.Timestamp.dt.strptime(fmt)

# Extend the stream with:
# - 't':   the full parsed datetime
# - 'day': the same moment rendered as text with the time part fixed to midnight,
#          so every row of a calendar day shares one value (useful for day-level grouping)
data_with_time = data.with_columns(
    t=parsed_timestamp,
    day=parsed_timestamp.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Making a simple pricing function

In [15]:
import datetime

# Model 1: Baseline Linear Pricing Model wrapped as UDF
@pw.udf
def baseline_price(occ_max, cap, base_price=10, alpha=0.5):
    """Price that grows linearly with the occupancy ratio.

    Args:
        occ_max: occupancy figure used as the numerator of the ratio.
        cap: capacity used as the denominator; a non-positive value yields a ratio of 0.
        base_price: price charged at zero occupancy.
        alpha: amount added to the price per unit of occupancy ratio.

    Returns:
        ``base_price + alpha * ratio`` clamped to ``[0.5 * base_price, 2 * base_price]``.
    """
    ratio = 0
    if cap > 0:
        ratio = occ_max / cap

    unclamped = base_price + alpha * ratio

    # Keep the result between half and double the base price.
    floor, ceiling = base_price * 0.5, base_price * 2
    capped = min(unclamped, ceiling)
    return max(floor, capped)

# Model 2: Demand-Based Model (optional — add more features if available)
@pw.udf
def demand_based_price(occ_max, cap, occ_min, base_price=10, λ=0.5):
    """Price driven by a weighted mix of occupancy ratio and occupancy fluctuation.

    Args:
        occ_max: highest occupancy in the period.
        cap: capacity; a non-positive value makes both demand terms 0.
        occ_min: lowest occupancy in the period.
        base_price: price charged when the normalised demand is 0.
        λ: sensitivity of the price to the normalised demand.

    Returns:
        ``base_price * (1 + λ * demand_norm)`` clamped to
        ``[0.5 * base_price, 2 * base_price]``.
    """
    α = 0.6  # weight of the occupancy ratio
    β = 0.4  # weight of the occupancy fluctuation

    # Guard BOTH divisions: previously only the fluctuation term was protected,
    # so a capacity of 0 raised ZeroDivisionError on the occupancy ratio.
    if cap > 0:
        occupancy_ratio = occ_max / cap
        fluctuation = (occ_max - occ_min) / cap
    else:
        occupancy_ratio = 0
        fluctuation = 0

    demand = α * occupancy_ratio + β * fluctuation

    # NOTE(review): dividing by 10 before clamping to [0, 1] keeps demand_norm small
    # whenever occupancy does not exceed capacity -- confirm this scale is intended.
    demand_norm = min(1.0, max(0.0, demand / 10))

    price = base_price * (1 + λ * demand_norm)
    return max(base_price * 0.5, min(price, base_price * 2))

# Stage 1: group the stream into tumbling one-day windows over 't', one instance per 'day' value.
daily_windows = data_with_time.windowby(
    pw.this.t,
    instance=pw.this.day,
    window=pw.temporal.tumbling(datetime.timedelta(days=1)),
    behavior=pw.temporal.exactly_once_behavior(),
)

# Stage 2: collapse each window to its end time plus occupancy/capacity extremes.
daily_stats = daily_windows.reduce(
    t=pw.this._pw_window_end,
    occ_max=pw.reducers.max(pw.this.Occupancy),
    occ_min=pw.reducers.min(pw.this.Occupancy),
    cap=pw.reducers.max(pw.this.Capacity),
)

# Stage 3: attach one price column per pricing model.
delta_window = daily_stats.with_columns(
    # Model 1 (baseline linear)
    price=baseline_price(pw.this.occ_max, pw.this.cap),
    # Model 2 (demand-based), optional
    price2=demand_based_price(pw.this.occ_max, pw.this.cap, pw.this.occ_min),
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [23]:
# Activate the Panel extension to enable interactive visualizations in the notebook.
# This is called before any Panel object is created in this cell.
pn.extension()

# Plotting callback for Pathway's .plot(): it receives the data source and returns a Bokeh figure
def price_plotter(source):
    """Draw the Model 1 price (column "price") against time (column "t")."""
    figure_options = dict(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # render the x-axis as dates/times
    )
    fig = bokeh.plotting.figure(**figure_options)

    # Both glyphs read the same pair of columns from the same source.
    x_col, y_col = "t", "price"

    # Green line showing how the price evolves over time
    fig.line(x_col, y_col, source=source, line_width=2, color="green")

    # Cyan markers on top of the line so individual data points stand out
    fig.scatter(x_col, y_col, source=source, size=6, color="cyan")

    return fig

# Bind the windowed table to the plotting callback:
# - price_plotter builds the figure
# - sorting_col="t" keeps the plotted rows in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column and mark it servable so it is shown
# here and when the notebook is served as a web app.
price_panel = pn.Column(viz)
price_panel.servable()

In [24]:
# Activate the Panel extension to enable interactive visualizations in the notebook.
# NOTE(review): an earlier cell already makes this call; it is repeated here as written.
pn.extension()

# Plotting callback for Pathway's .plot(): it receives the data source and returns a Bokeh figure.
# NOTE(review): this reuses the name of the Model 1 callback defined in an earlier cell,
# so after this cell runs `price_plotter` refers to the Model 2 version below.
def price_plotter(source):
    """Draw the Model 2 price (column "price2") against time (column "t")."""
    figure_options = dict(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # render the x-axis as dates/times
    )
    fig = bokeh.plotting.figure(**figure_options)

    # Both glyphs read the same pair of columns from the same source.
    x_col, y_col = "t", "price2"

    # Navy line showing how the price evolves over time
    fig.line(x_col, y_col, source=source, line_width=2, color="navy")

    # Red markers on top of the line so individual data points stand out
    fig.scatter(x_col, y_col, source=source, size=6, color="red")

    return fig

# Bind the windowed table to the plotting callback:
# - price_plotter builds the figure
# - sorting_col="t" keeps the plotted rows in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column and mark it servable so it is shown
# here and when the notebook is served as a web app.
price2_panel = pn.Column(viz)
price2_panel.servable()

In [32]:
def price_comparison_plot(source):
    """Overlay both pricing models on one time axis.

    Args:
        source: data source supplied by Pathway's ``.plot()``; the glyphs read its
            "t", "price" and "price2" columns.

    Returns:
        A Bokeh figure with one line plus markers per model and a top-left legend.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Model 1 vs Model 2 Pricing",
        x_axis_type="datetime",
    )

    # Markers go through fig.scatter(marker=...), matching the other plots in this
    # notebook; fig.circle(size=...) and fig.square are deprecated in recent Bokeh.
    fig.line("t", "price", source=source, line_width=2, color="green", legend_label="Model 1 (Linear)")
    fig.scatter("t", "price", source=source, size=6, color="green", marker="circle")

    fig.line("t", "price2", source=source, line_width=2, color="blue", legend_label="Model 2 (Demand-Based)")
    fig.scatter("t", "price2", source=source, size=6, color="blue", marker="square")

    fig.legend.location = "top_left"
    return fig


In [27]:
def price_vs_occupancy_plot(source):
    """Scatter both model prices against the maximum occupancy.

    Args:
        source: data source supplied by Pathway's ``.plot()``; the glyphs read its
            "occ_max", "price" and "price2" columns.

    Returns:
        A Bokeh figure with one marker series per model and a top-left legend.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Price vs Occupancy",
        x_axis_label="Occupancy (Max)",
        y_axis_label="Price",
    )

    # fig.scatter(marker=...) replaces fig.circle(size=...) and fig.triangle,
    # which are deprecated in recent Bokeh releases.
    fig.scatter("occ_max", "price", source=source, size=8, color="orange",
                marker="circle", legend_label="Model 1")
    fig.scatter("occ_max", "price2", source=source, size=8, color="red",
                marker="triangle", legend_label="Model 2")

    fig.legend.location = "top_left"
    return fig


In [28]:
@pw.udf
def fluctuation(occ_max, occ_min):
    """Return the spread between the highest and lowest occupancy."""
    spread = occ_max - occ_min
    return spread

# Extend delta_window with a 'demand_fluctuation' column computed by the UDF above
delta_window = delta_window.with_columns(
    demand_fluctuation=fluctuation(pw.this.occ_max, pw.this.occ_min),
)

def fluctuation_plot(source):
    """Plot the occupancy fluctuation over time.

    Args:
        source: data source supplied by Pathway's ``.plot()``; the glyphs read its
            "t" and "demand_fluctuation" columns.

    Returns:
        A Bokeh figure with a crimson line and black markers.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Demand Fluctuation Over Time",
        x_axis_type="datetime",
        y_axis_label="Occupancy Fluctuation",
    )

    fig.line("t", "demand_fluctuation", source=source, line_width=2, color="crimson")
    # fig.scatter replaces fig.circle(size=...), which is deprecated in recent Bokeh.
    fig.scatter("t", "demand_fluctuation", source=source, size=6, color="black", marker="circle")

    return fig


In [29]:
def competitor_price_plot(source):
    """Compare this lot's price with a competitor average over time.

    The glyphs read the "t", "price" and "competitor_avg_price" columns of
    ``source``, so the table being plotted must provide all three.
    """
    figure_options = dict(
        height=400,
        width=800,
        title="Your Price vs Competitor Avg Price",
        x_axis_type="datetime",
    )
    fig = bokeh.plotting.figure(**figure_options)

    # Solid green line: this lot's own price
    fig.line(
        "t", "price",
        source=source,
        color="green",
        line_width=2,
        legend_label="Your Lot Price",
    )
    # Dashed gray line: the competitor average
    fig.line(
        "t", "competitor_avg_price",
        source=source,
        color="gray",
        line_dash="dashed",
        line_width=2,
        legend_label="Nearby Avg Price",
    )

    fig.legend.location = "top_left"
    return fig


In [36]:
# Bind each plotting callback to the windowed table, keeping rows in time order.
viz1 = delta_window.plot(price_comparison_plot, sorting_col="t")
viz2 = delta_window.plot(price_vs_occupancy_plot, sorting_col="t")
viz3 = delta_window.plot(fluctuation_plot, sorting_col="t")

# Sections that can always be rendered from the columns delta_window already has.
dashboard_items = [
    "# Parking Lot Pricing Dashboard",
    pn.Spacer(height=10),
    pn.pane.Markdown("### Model Comparison"),
    viz1,
    pn.pane.Markdown("### Price vs Occupancy"),
    viz2,
    pn.pane.Markdown("### Demand Fluctuation"),
    viz3,
]

# competitor_price_plot reads a "competitor_avg_price" column. Only add that
# section once the table actually has the column; otherwise the plot would
# reference data that does not exist.
viz4 = None
if "competitor_avg_price" in delta_window.schema.column_names():
    viz4 = delta_window.plot(competitor_price_plot, sorting_col="t")
    dashboard_items += [
        pn.pane.Markdown("### Competitor Price Comparison"),
        viz4,
    ]

pn.Column(*dashboard_items).servable()




In [37]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()

