# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m41.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m61.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [None]:
# Sample dataset: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
# Read the raw parking records; the bare name on the last line renders the frame.
df = pd.read_csv(filepath_or_buffer="dataset.csv")
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [None]:
# Number of missing values in each column (isna is the same check as isnull).
df.isna().sum()

Unnamed: 0,0
ID,0
SystemCodeNumber,0
Capacity,0
Latitude,0
Longitude,0
Occupancy,0
VehicleType,0
TrafficConditionNearby,0
QueueLength,0
IsSpecialDay,0


In [None]:
# Print a summary of the frame: index range, per-column non-null counts and dtypes.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18368 entries, 0 to 18367
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   ID                      18368 non-null  int64  
 1   SystemCodeNumber        18368 non-null  object 
 2   Capacity                18368 non-null  int64  
 3   Latitude                18368 non-null  float64
 4   Longitude               18368 non-null  float64
 5   Occupancy               18368 non-null  int64  
 6   VehicleType             18368 non-null  object 
 7   TrafficConditionNearby  18368 non-null  object 
 8   QueueLength             18368 non-null  int64  
 9   IsSpecialDay            18368 non-null  int64  
 10  LastUpdatedDate         18368 non-null  object 
 11  LastUpdatedTime         18368 non-null  object 
dtypes: float64(2), int64(5), object(5)
memory usage: 1.7+ MB


In [None]:
# Join the separate date and time strings and parse them (day-month-year order)
# into one datetime column.
timestamp_text = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(timestamp_text, format='%d-%m-%Y %H:%M:%S')

# Put the rows in chronological order, then renumber the index from zero.
df = df.sort_values(by='Timestamp')
df = df.reset_index(drop=True)

In [None]:
# Order the rows chronologically and rebuild a 0..n-1 index in one call.
df = df.sort_values(by='Timestamp', ignore_index=True)

In [None]:
# TrafficConditionNearby ordinal encoding (encoding categorical data)
#
# LabelEncoder numbers classes in sorted (alphabetical) order, so 'low' would get the
# largest code. Traffic is an ordered category, so an explicit low < average < high
# mapping is used for this column instead.
# The import is kept because the VehicleType cell below still uses the alias.
from sklearn.preprocessing import LabelEncoder as LabelEncodeer

# Assumes the column only holds these three labels; anything else raises below.
TRAFFIC_LEVELS = {'low': 0, 'average': 1, 'high': 2}

# Only encode raw labels, so re-running the cell does not touch already-encoded values.
if not pd.api.types.is_numeric_dtype(df['TrafficConditionNearby']):
    unknown_levels = set(df['TrafficConditionNearby'].unique()) - set(TRAFFIC_LEVELS)
    if unknown_levels:
        raise ValueError(
            f"Unexpected TrafficConditionNearby labels: {sorted(map(str, unknown_levels))}"
        )
    df['TrafficConditionNearby'] = df['TrafficConditionNearby'].map(TRAFFIC_LEVELS).astype(int)
df['TrafficConditionNearby']

Unnamed: 0,TrafficConditionNearby
0,2
1,0
2,2
3,2
4,2
...,...
18363,0
18364,0
18365,2
18366,2


In [None]:
# Integer-encode VehicleType with a fresh encoder (classes are numbered in sorted order).
vehicle_encoder = LabelEncodeer()
df['VehicleType'] = vehicle_encoder.fit_transform(df['VehicleType'])
df['VehicleType']

Unnamed: 0,VehicleType
0,1
1,1
2,0
3,1
4,1
...,...
18363,1
18364,1
18365,1
18366,0


In [None]:
# Drop the raw date/time columns now that 'Timestamp' replaces them.
# Reassigning with errors='ignore' (instead of inplace=True) keeps the cell re-runnable:
# a second execution no longer raises KeyError for columns that are already gone.
df = df.drop(columns=['LastUpdatedDate', 'LastUpdatedTime'], errors='ignore')

In [None]:
# Write the columns consumed by the stream to disk, without the index column.
# Note: Only three features are used here for simplicity.
# Participants are expected to incorporate additional relevant features in their models.
stream_columns = ["Timestamp", "Occupancy", "Capacity"]
df[stream_columns].to_csv("parking_stream.csv", index=False)

In [None]:
# Show the header text followed by the first five preprocessed rows.
print(" Preprocessed DataFrame:", df.head(), sep="\n")

 Preprocessed DataFrame:
      ID  SystemCodeNumber  Capacity   Latitude  Longitude  Occupancy  \
0      0       BHMBCCMKT01       577  26.144536  91.736172         61   
1  15744    Others-CCCPS98      3103  26.147500  91.727978        588   
2  13120   Others-CCCPS202      2937  26.147491  91.727997        547   
3  11808  Others-CCCPS135a      3883  26.147499  91.728005       1081   
4  10496  Others-CCCPS119a      2803  26.147541  91.727970        195   

   VehicleType  TrafficConditionNearby  QueueLength  IsSpecialDay  \
0            1                       2            1             0   
1            1                       0            2             0   
2            0                       2            2             0   
3            1                       2            2             0   
4            1                       2            1             0   

            Timestamp  
0 2016-10-04 07:59:00  
1 2016-10-04 07:59:00  
2 2016-10-04 07:59:00  
3 2016-10-04 07:59:00  
4

In [None]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    # One field per column of parking_stream.csv, typed as Pathway should read it.
    Timestamp: str   # Observation time kept as text; parsed later with strptime
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location


In [None]:
def demand_based_price(base_price, occupancy, capacity, queue, traffic, special_day, vehicle_type):
    """Return a demand-adjusted price.

    The demand score is a weighted sum of the occupancy rate, queue length,
    traffic level (subtracted), special-day flag and a per-vehicle weight.
    It is scaled by 1/10 and clamped to [0, 1], so for a non-negative
    ``base_price`` the result lies between ``base_price`` and ``1.5 * base_price``.

    Args:
        base_price: Price charged when normalised demand is zero.
        occupancy: Number of occupied spots.
        capacity: Total number of spots; a non-positive value is treated as
            an occupancy rate of 0 instead of raising ZeroDivisionError.
        queue: Queue length.
        traffic: Numeric traffic level.
        special_day: 1 on a special day, otherwise 0.
        vehicle_type: Vehicle label ("car", "bike" or "truck"); any other
            value uses a weight of 1.0.

    Returns:
        The price as a float.
    """
    alpha, beta, gamma, delta, epsilon = 1.0, 0.8, 0.5, 1.2, 1.0
    vehicle_weights = {"car": 1.0, "bike": 0.8, "truck": 1.5}

    # A lot reporting no capacity has no meaningful occupancy rate.
    occupancy_rate = occupancy / capacity if capacity > 0 else 0.0

    demand = (
        alpha * occupancy_rate
        + beta * queue
        - gamma * traffic
        + delta * special_day
        + epsilon * vehicle_weights.get(vehicle_type, 1.0)
    )
    # Scale, then clamp so the value really stays in 0–1: long queues would
    # otherwise push it above 1 and heavy traffic could push it below 0.
    normalized_demand = min(max(demand / 10, 0.0), 1.0)
    # Final price
    return base_price * (1 + 0.5 * normalized_demand)


In [None]:
@pw.udf
def compute_price(base_price, occupancy, capacity, queue, traffic, special_day, vehicle_type):
    """Pathway UDF that forwards its arguments to ``demand_based_price``."""
    return demand_based_price(
        base_price=base_price,
        occupancy=occupancy,
        capacity=capacity,
        queue=queue,
        traffic=traffic,
        special_day=special_day,
        vehicle_type=vehicle_type,
    )

In [None]:
# Replay the saved CSV through Pathway's demo connector to imitate a live feed.
# input_rate=1000 means approximately 1000 rows per second will be ingested into the stream.
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [None]:
# Layout of the 'Timestamp' strings in the CSV
fmt = "%Y-%m-%d %H:%M:%S"
base_price=10

# Parse the timestamp once and reuse the expression for both derived columns:
# - 't'   : the full parsed datetime
# - 'day' : the same moment formatted as text with the time reset to midnight
#           (useful for day-level aggregations)
parsed_time = data.Timestamp.dt.strptime(fmt)
data_with_time = data.with_columns(
    t=parsed_time,
    day=parsed_time.dt.strftime("%Y-%m-%dT00:00:00"),
)


In [None]:
# Report the column names and types after the time columns were added.
print("\n Data stream schema after adding time columns:", data_with_time.schema, sep="\n")


 Data stream schema after adding time columns:
id          | Timestamp | Occupancy | Capacity | t               | day
ANY_POINTER | STR       | INT       | INT      | DATE_TIME_NAIVE | STR


# Step 2: Making a simple pricing function

In [None]:
# Daily aggregation of the stream and a simple price derived from it.
# The module is re-imported here because the earlier `from datetime import datetime`
# rebinds the name `datetime` to the class.
import datetime

# Step 1: one fixed, non-overlapping window per calendar day.
daily_windows = data_with_time.windowby(
    pw.this.t,                                                # event-time column (parsed datetime)
    instance=pw.this.day,                                     # partition key: one instance per day
    window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # fixed-size daily window
    behavior=pw.temporal.exactly_once_behavior(),             # each window reports its result once
)

# Step 2: per-window statistics.
daily_stats = daily_windows.reduce(
    t=pw.this._pw_window_end,                     # end timestamp of the window
    occ_max=pw.reducers.max(pw.this.Occupancy),   # highest occupancy seen in the window
    occ_min=pw.reducers.min(pw.this.Occupancy),   # lowest occupancy seen in the window
    cap=pw.reducers.max(pw.this.Capacity),        # largest capacity seen in the window
)

# Step 3: price = base price + demand fluctuation, where
#     base price         = 10
#     demand fluctuation = (occ_max - occ_min) / cap
# A large gap between the day's peak and trough occupancy signals volatile demand;
# dividing by capacity normalises that gap.
# Example: occ_max = 90, occ_min = 30, cap = 100  =>  price = 10 + 60/100 = 10.6
delta_window = daily_stats.with_columns(
    price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [None]:
# Load the Panel extension so Panel objects can render in the notebook
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    """Build the daily-price figure for a data source supplied by Pathway's ``.plot()``.

    Args:
        source: Data source exposing the columns ``t`` and ``price``.

    Returns:
        A Bokeh figure with a navy price line and red markers on each point.
    """
    # Create a Bokeh figure with datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    # Plot a line graph showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Overlay red markers at each data point for better visibility.
    # scatter() replaces circle(): recent Bokeh releases deprecate circle() with a
    # `size` argument, and the other plotters in this notebook already use scatter().
    fig.scatter("t", "price", source=source, size=6, color="red")

    return fig

# Hand the aggregated table to Pathway's .plot():
# - price_plotter draws the figure from the data source it receives
# - sorting_col="t" keeps the points in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column and mark it servable; leaving the call as the
# last expression also renders it as the cell output.
dashboard = pn.Column(viz)
dashboard.servable()



In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()



**Model2**

In [None]:
#  Install dependencies (in Google Colab)
!pip install pathway panel bokeh scikit-learn --quiet

#  Imports
import numpy as np
import pandas as pd
import datetime
import pathway as pw
import bokeh.plotting
import panel as pn
from sklearn.preprocessing import LabelEncoder

#  Read & preprocess dataset
df = pd.read_csv("dataset.csv")

# Build one datetime column from the day-month-year date and the time strings
df['Timestamp'] = pd.to_datetime(
    df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
    format='%d-%m-%Y %H:%M:%S'
)
df = df.sort_values('Timestamp').reset_index(drop=True)

# Traffic is an ordered category. LabelEncoder numbers classes alphabetically, which
# would give 'low' the largest code and make the traffic term of the pricing formula
# non-monotonic, so an explicit low < average < high mapping is used instead.
# Assumes the column only holds these three labels; anything else raises below.
TRAFFIC_LEVELS = {'low': 0, 'average': 1, 'high': 2}
unknown_levels = set(df['TrafficConditionNearby'].unique()) - set(TRAFFIC_LEVELS)
if unknown_levels:
    raise ValueError(
        f"Unexpected TrafficConditionNearby labels: {sorted(map(str, unknown_levels))}"
    )
df['TrafficConditionNearby'] = df['TrafficConditionNearby'].map(TRAFFIC_LEVELS).astype(int)

# VehicleType keeps the LabelEncoder codes (classes numbered in sorted order)
df['VehicleType'] = LabelEncoder().fit_transform(df['VehicleType'])

# Reassign instead of mutating in place so the frame's lineage stays explicit
df = df.drop(columns=['LastUpdatedDate', 'LastUpdatedTime'])

#  Save clean CSV for streaming
df[[
    "SystemCodeNumber", "Timestamp", "Occupancy", "Capacity", "QueueLength",
    "TrafficConditionNearby", "IsSpecialDay", "VehicleType"
]].to_csv("parking_stream.csv", index=False)

#  Pathway Schema
class ParkingSchema(pw.Schema):
    # Field names match the columns written to parking_stream.csv in this cell.
    SystemCodeNumber: str        # Parking-lot identifier
    Timestamp: str               # Observation time kept as text; parsed later with strptime
    Occupancy: int               # Number of occupied spots
    Capacity: int                # Total number of spots in the lot
    QueueLength: int             # Queue length reported with the observation
    TrafficConditionNearby: int  # Traffic level, integer-encoded before export
    IsSpecialDay: int            # Special-day flag taken from the dataset
    VehicleType: int             # Vehicle type, integer-encoded before export

#  Pricing functions
def demand_based_price(base_price, occupancy, capacity, queue, traffic, special_day, vehicle_type):
    """Return a demand-adjusted price for integer-encoded inputs.

    The demand score is a weighted sum of the occupancy rate, queue length,
    traffic level (subtracted), special-day flag and a per-vehicle weight.
    It is scaled by 1/10 and clamped to [0, 1], so for a non-negative
    ``base_price`` the result lies between ``base_price`` and ``1.5 * base_price``.

    Args:
        base_price: Price charged when normalised demand is zero.
        occupancy: Number of occupied spots.
        capacity: Total number of spots; a non-positive value is treated as
            an occupancy rate of 0 instead of raising ZeroDivisionError.
        queue: Queue length.
        traffic: Integer traffic level.
        special_day: 1 on a special day, otherwise 0.
        vehicle_type: LabelEncoder code of the vehicle type.

    Returns:
        The price as a float.
    """
    alpha, beta, gamma, delta, epsilon = 1.0, 0.8, 0.5, 1.2, 1.0
    # LabelEncoder numbers classes in sorted order, so for the dataset's labels
    # bike=0, car=1, cycle=2, truck=3. The weights follow that coding
    # (car 1.0, bike 0.8, truck 1.5); cycle has no dedicated weight and uses
    # the 1.0 default.
    vehicle_weights = {0: 0.8, 1: 1.0, 3: 1.5}

    # A lot reporting no capacity has no meaningful occupancy rate.
    occupancy_rate = occupancy / capacity if capacity > 0 else 0.0

    demand = (
        alpha * occupancy_rate +
        beta * queue -
        gamma * traffic +
        delta * special_day +
        epsilon * vehicle_weights.get(vehicle_type, 1.0)
    )

    # Scale, then clamp to [0, 1]: long queues would otherwise push the value
    # above 1 and heavy traffic could push it below 0.
    normalized_demand = min(max(demand / 10, 0.0), 1.0)
    return base_price * (1 + 0.5 * normalized_demand)

@pw.udf
def compute_price(base_price, occupancy, capacity, queue, traffic, special_day, vehicle_type):
    """Pathway UDF that forwards its arguments to ``demand_based_price``."""
    return demand_based_price(
        base_price=base_price,
        occupancy=occupancy,
        capacity=capacity,
        queue=queue,
        traffic=traffic,
        special_day=special_day,
        vehicle_type=vehicle_type,
    )

#  Stream the data: replay the saved CSV at roughly 10 rows per second
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=10,
)

fmt = "%Y-%m-%d %H:%M:%S"
base_price = 10

# Parse the timestamp once; 't' is the datetime and 'day' its calendar date as text
parsed_time = data.Timestamp.dt.strptime(fmt)
data_with_time = data.with_columns(
    t=parsed_time,
    day=parsed_time.dt.strftime("%Y-%m-%d"),
)

#  Window & reduce
# One daily tumbling window per (lot, day) pair.
lot_day_windows = data_with_time.windowby(
    pw.this.t,
    instance=pw.this.SystemCodeNumber + "_" + pw.this.day,
    window=pw.temporal.tumbling(datetime.timedelta(days=1)),
    behavior=pw.temporal.exactly_once_behavior(),
)

# Collapse each window to the values the pricing function needs.
lot_day_stats = lot_day_windows.reduce(
    SystemCodeNumber=pw.reducers.any(pw.this.SystemCodeNumber),
    t=pw.this._pw_window_end,
    occ=pw.reducers.max(pw.this.Occupancy),
    cap=pw.reducers.max(pw.this.Capacity),
    queue=pw.reducers.max(pw.this.QueueLength),
    traffic=pw.reducers.max(pw.this.TrafficConditionNearby),
    special=pw.reducers.max(pw.this.IsSpecialDay),
    veh=pw.reducers.any(pw.this.VehicleType),
    instance=pw.reducers.any(pw.this._pw_instance),
)

# Price each (lot, day) row with the demand-based UDF.
daily_price = compute_price(
    base_price,
    pw.this.occ,
    pw.this.cap,
    pw.this.queue,
    pw.this.traffic,
    pw.this.special,
    pw.this.veh,
)
delta_window = lot_day_stats.with_columns(price=daily_price)

#  Visualization: load the Panel extension before building any Panel objects
pn.extension()

def price_plotter(source):
    """Return a Bokeh figure of price over time for the given data source.

    The figure has a datetime x-axis, a navy line through the ``t``/``price``
    columns of ``source`` and a red marker on every point.
    """
    figure_options = {
        "height": 400,
        "width": 800,
        "title": "Pathway: Daily Parking Price",
        "x_axis_type": "datetime",
    }
    fig = bokeh.plotting.figure(**figure_options)

    glyph_columns = ("t", "price")
    fig.line(*glyph_columns, source=source, line_width=2, color="navy")
    fig.scatter(*glyph_columns, source=source, size=6, color="red")
    return fig

#  unique parking lots, in order of first appearance in the preprocessed frame
lot_ids = df['SystemCodeNumber'].unique()

# One (title, plot) pair per lot: restrict the daily table to that lot and plot it.
tabs_content = [
    (
        f"Lot {lot}",
        delta_window.filter(pw.this.SystemCodeNumber == lot).plot(price_plotter, sorting_col="t"),
    )
    for lot in lot_ids
]

pn.Tabs(*tabs_content).servable()

# #  Run Pathway
# %%capture --no-display
# pw.run()


In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

**Model1**

In [None]:

# !pip install pathway panel bokeh

import pathway as pw
import panel as pn
import datetime
import bokeh.plotting

# Load the Panel extension before any Panel objects are created in this cell
pn.extension()

#  Define schema
class ParkingSchema(pw.Schema):
    # Columns read from parking_stream.csv; only Timestamp, SystemCodeNumber,
    # Occupancy and Capacity are used by the rest of this cell.
    Timestamp: str               # Observation time kept as text; parsed later with strptime
    SystemCodeNumber: str        # Parking-lot identifier
    Occupancy: int               # Number of occupied spots
    Capacity: int                # Total number of spots in the lot
    QueueLength: int
    VehicleType: int
    TrafficConditionNearby: int
    IsSpecialDay: int

#  Linear price model (Model 1)
@pw.udf
def compute_linear_price(occupancy, capacity):
    """Linear price model: 10.0 plus 10.0 times the occupancy rate."""
    base_price, alpha = 10.0, 10.0
    occupancy_rate = occupancy / capacity
    return base_price + alpha * occupancy_rate

# Replay your dataset (make sure parking_stream.csv exists in your Colab)
replay_options = {"schema": ParkingSchema, "input_rate": 50}
data = pw.demo.replay_csv("parking_stream.csv", **replay_options)

# Parse timestamp into a datetime column named 't'
fmt = "%Y-%m-%d %H:%M:%S"
parsed_time = data.Timestamp.dt.strptime(fmt)
data_with_time = data.with_columns(t=parsed_time)

# Compute price: apply the linear UDF to every row's occupancy and capacity
row_price = compute_linear_price(data_with_time.Occupancy, data_with_time.Capacity)
data_with_price = data_with_time.with_columns(price=row_price)

# Windowed daily aggregation
# One daily tumbling window per parking lot.
lot_windows = data_with_price.windowby(
    pw.this.t,
    instance=pw.this.SystemCodeNumber,
    window=pw.temporal.tumbling(datetime.timedelta(days=1)),
    behavior=pw.temporal.exactly_once_behavior(),
)

# Keep the window end, one price from the window and the lot identifier.
delta_window = lot_windows.reduce(
    t=pw.this._pw_window_end,
    price=pw.reducers.any(pw.this.price),
    SystemCodeNumber=pw.reducers.any(pw.this.SystemCodeNumber),
)

#  Bokeh plot for a single lot
def price_plotter(source, lot_id):
    """Return a Bokeh figure of price over time for one lot.

    ``lot_id`` only appears in the title; the blue line and the red markers
    are drawn from the ``t`` and ``price`` columns of ``source``.
    """
    figure_options = {
        "height": 400,
        "width": 800,
        "title": f"Daily Price for Lot {lot_id}",
        "x_axis_type": "datetime",
    }
    fig = bokeh.plotting.figure(**figure_options)

    glyph_columns = ("t", "price")
    fig.line(*glyph_columns, source=source, line_width=2, color="blue")
    fig.scatter(*glyph_columns, source=source, size=6, color="red")
    return fig

# Tabs for each lot
import pandas as pd  # imported here so the cell does not rely on an earlier cell

# Take the lot identifiers from the streamed file itself. Hard-coded names such as
# "Lot-1" … "Lot-14" only work when the data uses exactly those identifiers; with any
# other naming every filter below would be empty and every tab blank.
lot_ids = sorted(
    str(code)
    for code in pd.read_csv("parking_stream.csv", usecols=["SystemCodeNumber"])[
        "SystemCodeNumber"
    ].unique()
)

tabs_content = []
for lot_id in lot_ids:
    lot_data = delta_window.filter(pw.this.SystemCodeNumber == lot_id)
    # Bind lot_id as a default argument: a plain closure looks the name up when it
    # is called, so a callback invoked after the loop would see the last lot only.
    viz = lot_data.plot(
        lambda source, lot_id=lot_id: price_plotter(source, lot_id),
        sorting_col="t",
    )
    tabs_content.append((f"{lot_id}", viz))

pn.Tabs(*tabs_content).servable()

# Run the Pathway pipeline
# pw.run()


In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()