# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [None]:
# Sample dataset: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
# Read the raw observations and collect the distinct parking-lot codes
# (SystemCodeNumber) for use in later cells.
df = pd.read_csv('/content/dataset.csv')
places = pd.unique(df['SystemCodeNumber'])

df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [None]:
# Combine the 'LastUpdatedDate' and 'LastUpdatedTime' text columns into a single
# datetime column. The explicit day-first format avoids ambiguous date parsing.
df['Timestamp'] = pd.to_datetime(
    df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
    format='%d-%m-%Y %H:%M:%S',
)

# Order the observations chronologically, renumber the index, and drop the
# redundant 'ID' column. errors='ignore' keeps the cell re-runnable: on a second
# execution 'ID' is already gone and a plain drop would raise KeyError.
df = (
    df.sort_values('Timestamp')
      .reset_index(drop=True)
      .drop(columns='ID', errors='ignore')
)


In [None]:
# Display the time-sorted frame (now including the 'Timestamp' column).
df

Unnamed: 0,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
1,BHMNCPHST01,1200,26.140014,91.731000,237,bike,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
2,BHMMBMMBX01,687,20.000035,78.000003,264,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
3,BHMNCPNST01,485,26.140048,91.730972,249,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
4,Shopping,1920,26.150504,91.733531,614,cycle,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,BHMEURBRD01,470,26.149020,91.739503,373,car,low,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18364,BHMBCCTHL01,387,26.144495,91.736205,387,car,low,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18365,BHMBCCMKT01,577,26.144536,91.736172,193,cycle,low,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18366,Others-CCCPS98,3103,26.147500,91.727978,1671,car,low,3,0,19-12-2016,16:30:00,2016-12-19 16:30:00


In [None]:
# Observed value ranges, as recorded by the notebook author:
#   TrafficConditionNearby: low / average / high
#   VehicleType:            car / bike / truck / cycle
#   IsSpecialDay:           0 / 1
#   QueueLength:            0-15

# Ordinal codes for the two categorical columns.
TRAFFIC_CODES = {'low': 1, 'average': 2, 'high': 3}
VEHICLE_CODES = {'cycle': 1, 'bike': 2, 'car': 3, 'truck': 4}


def encode_labels(column, codes):
    """Return ``column`` with its labels replaced by the integer codes in ``codes``.

    Values that are already one of the integer codes pass through unchanged, so
    re-running this cell does not wipe the column (a plain ``.map(codes)`` turns
    every already-encoded value into NaN on a second run).

    Raises:
        ValueError: if the column holds a value that is neither a known label
            nor a known code, instead of silently producing NaN.
    """
    encoded = column.map(lambda value: codes.get(value, value))
    unknown = ~encoded.isin(list(codes.values()))
    if unknown.any():
        bad_values = sorted(map(str, encoded[unknown].unique()))
        raise ValueError(
            f"Unexpected values in column {column.name!r}: {bad_values}"
        )
    return encoded.astype('int64')


# Label encoding for TrafficConditionNearby
df['TrafficConditionNearby'] = encode_labels(df['TrafficConditionNearby'], TRAFFIC_CODES)

# Label encoding for VehicleType
df['VehicleType'] = encode_labels(df['VehicleType'], VEHICLE_CODES)

df

Unnamed: 0,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,BHMBCCMKT01,577,26.144536,91.736172,61,3,1,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
1,BHMNCPHST01,1200,26.140014,91.731000,237,2,1,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
2,BHMMBMMBX01,687,20.000035,78.000003,264,3,1,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
3,BHMNCPNST01,485,26.140048,91.730972,249,3,1,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
4,Shopping,1920,26.150504,91.733531,614,1,1,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,BHMEURBRD01,470,26.149020,91.739503,373,3,1,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18364,BHMBCCTHL01,387,26.144495,91.736205,387,3,1,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18365,BHMBCCMKT01,577,26.144536,91.736172,193,1,1,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18366,Others-CCCPS98,3103,26.147500,91.727978,1671,3,1,3,0,19-12-2016,16:30:00,2016-12-19 16:30:00


In [None]:
# Write the columns consumed by the streaming pipeline to a CSV file.
# The pandas index is left out so the file contains only the listed fields.
df.to_csv(
    "parking_stream.csv",
    columns=[
        "SystemCodeNumber",
        "Timestamp",
        "Occupancy",
        "Capacity",
        "VehicleType",
        "TrafficConditionNearby",
        "QueueLength",
        "IsSpecialDay",
    ],
    index=False,
)

# Note: this baseline exports only the eight columns listed above.
# Participants are expected to incorporate additional relevant features in their models.

In [None]:
# Define the schema for the streaming data using Pathway.
# Each field below corresponds to one column written to "parking_stream.csv";
# the schema is passed to pw.demo.replay_csv when the stream is created.

class ParkingSchema(pw.Schema):
    SystemCodeNumber : str  # Parking-lot identifier
    Timestamp: str   # Observation time, read as text; parsed later with "%Y-%m-%d %H:%M:%S"
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location
    VehicleType: int # Encoded vehicle type (1=cycle, 2=bike, 3=car, 4=truck)
    TrafficConditionNearby: int # Encoded traffic level (1=low, 2=average, 3=high)
    QueueLength: int # Queue length; author notes an observed range of 0-15
    IsSpecialDay: int # Special-day flag, 0 or 1


In [None]:
# Replay the exported CSV as a simulated live stream with Pathway.
# replay_csv feeds the file's rows into the pipeline at a controlled pace;
# input_rate=1000 means approximately 1000 rows per second are ingested.

data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)


In [None]:
# Format of the text in the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parsed-timestamp expression once and reuse it for both new columns:
# - 't'   is the full parsed datetime
# - 'day' is that datetime rendered as a string with the time part fixed to
#         midnight (used as a day-level key)
parsed_timestamp = data.Timestamp.dt.strptime(fmt)

data_with_time = data.with_columns(
    t=parsed_timestamp,
    day=parsed_timestamp.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Making a simple pricing function

In [None]:
# Daily aggregation and baseline pricing.
# The stream is windowed into one-day tumbling windows, grouped by parking lot,
# reduced to one row per group, and then a demand score and a price are derived.
# The module is re-imported here because the import cell's
# `from datetime import datetime` rebinds the name `datetime` to the class,
# and `datetime.timedelta` below needs the module.
import datetime

delta_window = (
    data_with_time
    .windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.day,  # Partitioning key: the midnight-normalised day string
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # Fixed-size daily window
        behavior=pw.temporal.exactly_once_behavior()  # NOTE(review): presumably emits each window's result once, when the window closes - confirm in Pathway docs
    )
    # NOTE(review): grouping by lot is chained onto the windowed result here;
    # confirm this yields one row per (window, lot) in the Pathway version in use.
    .groupby(pw.this.SystemCodeNumber)
    .reduce(
        # Every reducer is keyed on pw.this.t - presumably it returns the second
        # argument taken from the row with the largest t, i.e. the latest
        # observation in the group. TODO confirm two-argument argmax semantics.
        SystemCodeNumber = pw.reducers.argmax(pw.this.t, pw.this.SystemCodeNumber),
        occupancy = pw.reducers.argmax(pw.this.t, pw.this.Occupancy),
        capacity = pw.reducers.argmax(pw.this.t, pw.this.Capacity),
        t = pw.reducers.argmax(pw.this.t, pw.this.t),
        day = pw.reducers.argmax(pw.this.t, pw.this.day),
        VehicleType = pw.reducers.argmax(pw.this.t, pw.this.VehicleType),
        TrafficConditionNearby = pw.reducers.argmax(pw.this.t, pw.this.TrafficConditionNearby),
        QueueLength = pw.reducers.argmax(pw.this.t, pw.this.QueueLength),
        IsSpecialDay = pw.reducers.argmax(pw.this.t, pw.this.IsSpecialDay),

    )
    .with_columns(
        # Weights hard-coded in the numerator below:
        #   alpha   = 2.0  on the occupancy ratio (occupancy / capacity)
        #   beta    = 1.5  on QueueLength
        #   gamma   = 1.0  on TrafficConditionNearby (subtracted)
        #   delta   = 3.0  on IsSpecialDay
        #   epsilon = 0.8  on VehicleType
        # Base price is 10 (applied in the next with_columns step).
        # normalized_demand = weighted sum of the features / unweighted sum of the same features.
        normalized_demand = (
                                2*(pw.this.occupancy/pw.this.capacity)
                             + 1.5*(pw.this.QueueLength)
                                  -(pw.this.TrafficConditionNearby)
                                +3*(pw.this.IsSpecialDay)
                             + 0.8*(pw.this.VehicleType)
                             )
                             /

                           (
                                  (pw.this.occupancy/pw.this.capacity)
                                +(pw.this.QueueLength)
                                +(pw.this.TrafficConditionNearby)
                                +(pw.this.IsSpecialDay)
                               + (pw.this.VehicleType)
                               )

    )
    .with_columns(

                   # price = base price (10) scaled by (1 + normalized_demand)
                   price = 10 *( 1 + pw.this.normalized_demand)
    )
)

# Display the resulting Pathway table object
delta_window


In [None]:
# Register a CSV sink for the priced table; the cell that reads
# '/content/output.csv' back with pandas comes after the pw.run() cell.
pw.io.csv.write(delta_window, '/content/output.csv')

    https://beartype.readthedocs.io/en/latest/api_roar/#pep-585-deprecations
  warn(


In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()



In [None]:
# Load the pipeline's CSV output, parse 't' as datetimes and round the price
# to two decimals, then display the frame.
df_1 = pd.read_csv('/content/output.csv').assign(
    t=lambda frame: pd.to_datetime(frame['t']),
    price=lambda frame: frame['price'].round(2),
)
df_1

Unnamed: 0,SystemCodeNumber,occupancy,capacity,t,day,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,normalized_demand,price,time,diff
0,BHMEURBRD01,325,470,2016-10-04 16:31:00,2016-10-04T00:00:00,3,1,2,0,0.864229,18.64,1751807452212,1
1,BHMNCPNST01,304,485,2016-10-04 16:31:00,2016-10-04T00:00:00,3,1,2,0,0.853143,18.53,1751807452212,1
2,Others-CCCPS98,936,3103,2016-10-04 16:31:00,2016-10-04T00:00:00,2,1,2,0,0.792827,17.93,1751807452212,1
3,BHMBCCTHL01,245,387,2016-10-04 16:31:00,2016-10-04T00:00:00,3,1,2,0,0.854227,18.54,1751807452212,1
4,Others-CCCPS105a,1089,2009,2016-10-04 16:31:00,2016-10-04T00:00:00,3,1,2,0,0.838287,18.38,1751807452212,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025,Others-CCCPS8,806,1322,2016-12-19 16:30:00,2016-12-19T00:00:00,3,2,3,0,0.710754,17.11,18446744073709551614,1
2026,BHMNCPNST01,257,485,2016-12-19 16:30:00,2016-12-19T00:00:00,3,1,2,0,0.836123,18.36,18446744073709551614,1
2027,Others-CCCPS135a,2533,3883,2016-12-19 16:30:00,2016-12-19T00:00:00,3,1,3,0,0.941499,19.41,18446744073709551614,1
2028,BHMEURBRD01,373,470,2016-12-19 16:30:00,2016-12-19T00:00:00,3,1,2,0,0.881303,18.81,18446744073709551614,1


In [26]:
# Re-price every lot relative to its first row.
#
# The original row-by-row rule was
#     price[i] = price[i-1] + alpha * demand_diff
# with (alpha, demand_diff) = (5, demand - prev) when demand rises and
# (-5, prev - demand) otherwise. Both branches equal 5 * (demand - prev), so the
# recurrence telescopes to
#     price[i] = price[0] + 5 * (demand[i] - demand[0])
# per lot. Computing this closed form with a groupby avoids the slow scalar
# `.loc` loop and no longer overwrites the global `data` (the Pathway stream
# table) with a DataFrame.
#
# NOTE(review): "first row" means first in df_1's current row order for each
# lot, exactly as in the loop; confirm the output file is chronological per lot.
PRICE_SENSITIVITY = 5  # price change per unit change in normalized demand

per_lot = df_1.groupby('SystemCodeNumber', sort=False)
first_price = per_lot['price'].transform('first')
first_demand = per_lot['normalized_demand'].transform('first')

df_1['price'] = (
    first_price + PRICE_SENSITIVITY * (df_1['normalized_demand'] - first_demand)
).round(2)
df_1

Unnamed: 0,SystemCodeNumber,occupancy,capacity,t,day,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,normalized_demand,price,time,diff
0,BHMEURBRD01,325,470,2016-10-04 16:31:00,2016-10-04T00:00:00,3,1,2,0,0.864229,18.64,1751807452212,1
1,BHMNCPNST01,304,485,2016-10-04 16:31:00,2016-10-04T00:00:00,3,1,2,0,0.853143,18.53,1751807452212,1
2,Others-CCCPS98,936,3103,2016-10-04 16:31:00,2016-10-04T00:00:00,2,1,2,0,0.792827,17.93,1751807452212,1
3,BHMBCCTHL01,245,387,2016-10-04 16:31:00,2016-10-04T00:00:00,3,1,2,0,0.854227,18.54,1751807452212,1
4,Others-CCCPS105a,1089,2009,2016-10-04 16:31:00,2016-10-04T00:00:00,3,1,2,0,0.838287,18.38,1751807452212,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025,Others-CCCPS8,806,1322,2016-12-19 16:30:00,2016-12-19T00:00:00,3,2,3,0,0.710754,17.67,18446744073709551614,1
2026,BHMNCPNST01,257,485,2016-12-19 16:30:00,2016-12-19T00:00:00,3,1,2,0,0.836123,18.44,18446744073709551614,1
2027,Others-CCCPS135a,2533,3883,2016-12-19 16:30:00,2016-12-19T00:00:00,3,1,3,0,0.941499,19.38,18446744073709551614,1
2028,BHMEURBRD01,373,470,2016-12-19 16:30:00,2016-12-19T00:00:00,3,1,2,0,0.881303,18.73,18446744073709551614,1


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell can only be generated after you have run the `pw.run()` cell above and loaded its output into `df_1`, because the plot is built from the pipeline's results.


In [35]:
# from bokeh.io import output_file, show
# from bokeh.plotting import figure
# from bokeh.io import curdoc
# from bokeh.plotting import ColumnDataSource, figure, output_file, show


# # print(df)

# curdoc().theme = 'dark_minimal'


# def plot_price_fluctuations(df, place):
#    p = figure(title=place ,width=1000, height=400, x_axis_type="datetime")
#    p.line(df.t, df.price)
#    p.scatter(df.t, df.price, fill_color="red", size=5)
#    show(p)


# for place in places:
#    plot_price_fluctuations(df_1[df_1['SystemCodeNumber'] == place], place)


In [37]:
# Interactive price chart: one line per parking lot, switched with a Select widget.
from bokeh.io import show, curdoc, output_notebook
from bokeh.plotting import figure, ColumnDataSource
from bokeh.models import Dropdown, CustomJS, Select
from bokeh.layouts import column

# Render Bokeh output inline in the notebook; without this, show() writes a
# standalone HTML file instead of displaying below the cell.
output_notebook()

places = df_1['SystemCodeNumber'].unique().tolist()
# Bokeh datetime axes use milliseconds since the epoch.
df_1['t_ms'] = pd.to_datetime(df_1['t']).astype('int64') // 10**6  # to milliseconds

# Per-lot series, sorted by time so the line is drawn chronologically rather
# than in file order. This dict is embedded in the page for the JS callback.
data_dict = {
    place: (
        df_1.loc[df_1['SystemCodeNumber'] == place, ['t_ms', 'price']]
        .sort_values('t_ms', kind='stable')
        .to_dict('list')
    )
    for place in places
}

# Initialize with the first place
initial_place = places[0]
source = ColumnDataSource(data={
    't_ms': data_dict[initial_place]['t_ms'],
    'price': data_dict[initial_place]['price'],
})

# Create plot
curdoc().theme = 'dark_minimal'

p = figure(title=f"{initial_place}", x_axis_type="datetime", width=1000, height=400)
p.line('t_ms', 'price', source=source)
p.scatter('t_ms', 'price', source=source, fill_color='red', size=5)

# Select menu: the callback runs in the browser and swaps the plotted series
# and the title, so no Python kernel round-trip is needed.
select = Select(title="Select Place", value=initial_place, options=places)
select.js_on_change("value", CustomJS(args=dict(source=source, full_data=data_dict, plot=p), code="""
    const selected = this.value;
    const data = full_data[selected];
    source.data = { t_ms: data.t_ms, price: data.price };
    plot.title.text = selected;
    source.change.emit();
"""))


layout = column(select, p)
show(layout)