# Inroduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
# from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [10]:
df = pd.read_csv('dataset.csv')
df

# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [None]:
group = df.groupby('Latitude')
group.head()

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
17056,17056,Shopping,1920,26.150504,91.733531,614,cycle,low,2,0,04-10-2016,07:59:00
17057,17057,Shopping,1920,26.150504,91.733531,761,car,low,2,0,04-10-2016,08:25:00
17058,17058,Shopping,1920,26.150504,91.733531,958,car,low,2,0,04-10-2016,08:59:00
17059,17059,Shopping,1920,26.150504,91.733531,1121,cycle,low,3,0,04-10-2016,09:32:00


In [11]:
# Combine the 'LastUpdatedDate' and 'LastUpdatedTime' columns into a single datetime column
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                  format='%d-%m-%Y %H:%M:%S')

# Sort the DataFrame by the new 'Timestamp' column and reset the index
df = df.sort_values('Timestamp').reset_index(drop=True)

In [None]:
# Save the selected columns to a CSV file for streaming or downstream processing
df[["Timestamp", "Occupancy", "Capacity"]].to_csv("parking_stream.csv", index=False)

# Note: Only three features are used here for simplicity.
# Participants are expected to incorporate additional relevant features in their models.

In [None]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    Timestamp: str   # Timestamp of the observation (should ideally be in ISO format)
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location


In [None]:
# Load the data as a simulated stream using Pathway's replay_csv function
# This replays the CSV data at a controlled input rate to mimic real-time streaming
# input_rate=1000 means approximately 1000 rows per second will be ingested into the stream.

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=100)

In [None]:
# Define the datetime format to parse the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Add new columns to the data stream:
# - 't' contains the parsed full datetime
# - 'day' extracts the date part and resets the time to midnight (useful for day-level aggregations)
data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)

# data_with_time = data_with_time.sort(key=pw.this.t)

In [None]:
data_with_time

In [None]:
pw.run()

# Step 2: Making a simple pricing function

In [None]:
# Define a daily tumbling window over the data stream using Pathway
# This block performs temporal aggregation and computes a dynamic price for each day
import datetime

delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.day,  # Logical partitioning key: one instance per calendar day
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # Fixed-size daily window
        behavior=pw.temporal.exactly_once_behavior()  # Guarantees exactly-once processing semantics
    )
    .reduce(
        t=pw.this._pw_window_end,                        # Assign the end timestamp of each window
        occ_max=pw.reducers.max(pw.this.Occupancy),      # Highest occupancy observed in the window
        occ_min=pw.reducers.min(pw.this.Occupancy),      # Lowest occupancy observed in the window
        cap=pw.reducers.max(pw.this.Capacity),           # Maximum capacity observed (typically constant per spot)
    )
    .with_columns(
        # Compute the price using a simple dynamic pricing formula:
        #
        # Pricing Formula:
        #     price = base_price + demand_fluctuation
        #     where:
        #         base_price = 10 (fixed minimum price)
        #         demand_fluctuation = (occ_max - occ_min) / cap
        #
        # Intuition:
        # - The greater the difference between peak and low occupancy in a day,
        #   the more volatile the demand is, indicating potential scarcity.
        # - Dividing by capacity normalizes the fluctuation (to stay in [0,1] range).
        # - This fluctuation is added to the base price of 10 to set the final price.
        # - Example: If occ_max = 90, occ_min = 30, cap = 100
        #            => price = 10 + (90 - 30)/100 = 10 + 0.6 = 10.6

        price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
    )
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [None]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    # Create a Bokeh figure with datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    # Plot a line graph showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Overlay red circles at each data point for better visibility
    fig.circle("t", "price", source=source, size=6, color="red")

    return fig

# Use Pathway's built-in .plot() method to bind the data stream (delta_window) to the Bokeh plot
# - 'price_plotter' is the rendering function
# - 'sorting_col="t"' ensures the data is plotted in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Create a Panel layout and make it servable as a web app
# This line enables the interactive plot to be displayed when the app is served
pn.Column(viz).servable()

In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()

In [None]:
df.columns

Index(['ID', 'SystemCodeNumber', 'Capacity', 'Latitude', 'Longitude',
       'Occupancy', 'VehicleType', 'TrafficConditionNearby', 'QueueLength',
       'IsSpecialDay', 'LastUpdatedDate', 'LastUpdatedTime', 'Timestamp',
       'LocationID'],
      dtype='object')

## Basic Data and Schema Preparation

In [12]:
# group according to same lat and long so that we can compare different locations

df['LocationID'] = df.groupby(['Latitude', 'Longitude']).ngroup()

location_mapping = df.groupby('LocationID')[['Latitude', 'Longitude']].first()

In [13]:
#save the necessary columns

class PriceModelSchema(pw.Schema):
  Occupancy: int
  Timestamp: str
  Capacity: int
  LocationID: int
  QueueLength: int
  IsSpecialDay: bool
  VehicleType: str
  TrafficConditionNearby: str


basic_price_model_columns = "Occupancy,Timestamp,Capacity,LocationID,QueueLength,IsSpecialDay,VehicleType,TrafficConditionNearby".split(',')
df[basic_price_model_columns].to_csv("basic_price_model.csv", index=False)

In [15]:
#load the saved data and add datetime

data = pw.demo.replay_csv("basic_price_model.csv", schema=PriceModelSchema, input_rate=1000)

fmt = "%Y-%m-%d %H:%M:%S"

data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)

# data_with_time = data_with_time.groupby(pw.this.LocationID).sort(key=pw.this.t)

## Bokeh Plotting Function

In [None]:
from bokeh.transform import factor_cmap
from bokeh.palettes import Category10, Category20
from bokeh.models import ColumnDataSource
import datetime

# basic scatter plotter
def plotter(source):
    unique_locations = list([str(i) for i in range(14)])

    palette = Category20[20] if len(unique_locations) > 10 else Category10[10]

    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Parking Price by Location",
        x_axis_type="datetime",
        tools="pan,wheel_zoom,box_zoom,reset,hover"
    )

    color_map = factor_cmap(
        'LocationID',
        palette=palette,
        factors=unique_locations
    )

    print(color_map)

    fig.scatter(
        't', "price",
        source=source,
        size=4,
        color=color_map,
        legend_field="LocationID"
    )

    # for i, loc in enumerate(unique_locations):
    #     loc_source = ColumnDataSource({
    #         't': [t for t, lid in zip(source.data['t'], source.data['LocationID']) if lid == loc],
    #         'price': [p for p, lid in zip(source.data['price'], source.data['LocationID']) if lid == loc]
    #     })
    #     fig.line(
    #         't', "price",
    #         source=loc_source,
    #         line_width=2,
    #         color=palette[i % len(palette)]
    #     )

    fig.xaxis.axis_label = "Time"
    fig.yaxis.axis_label = "Price"

    return fig

## Baseline Linear Model
$$price_{t+1}=price_t + \alpha\frac{Occupancy}{Capacity}$$

- We can use a stateful reducer in order to model this recurrence for individual locations

- We can compare the prices between competitors by plotting them for individual locations

In [9]:
#stateful reducer for calculation of price based on previous price

alpha=0.05
@pw.reducers.stateful_many
def update_price(state: int|None, vals:list[tuple[list[float], int]]) -> float:
  if state is None:
    return 10.0
  acc = 0.0
  for row, col in vals:
    acc += alpha*row[0]/row[1]
  return state + acc

In [None]:
import datetime

# use a custom stateful reducer to calculate price based on current values and previous price
first = data_with_time.with_columns(
    occ_ratio=alpha*pw.this.Occupancy / pw.this.Capacity
).groupby(pw.this.LocationID).reduce(
    pw.this.LocationID,
    t=pw.reducers.latest(pw.this.t)
    price=update_price(pw.this.Occupancy, pw.this.Capacity)
)

In [None]:
pw.io.csv.write(first, 'Output/output_first.csv')

pw.run()

Output()





In [None]:
import datetime
pn.extension()

class ResultSchema(pw.Schema):
  price: float
  t: str
  diff: int
  LocationID: str
  # t: str

#reload the file and convert to datetime
result_csv = pw.demo.replay_csv('Output/output_first.csv', schema=ResultSchema, input_rate=1000)
result_csv_file = result_csv.with_columns(
    t=pw.this.t.dt.strptime("%Y-%m-%d %H:%M:%S")
)

#plot the results
viz = result_csv_file.plot(plotter, sorting_col="t")
pn.Column(viz).servable()

Field(field='LocationID', transform=CategoricalColorMapper(id='34de5557-57b4-49e4-a4a7-984b2c31df8c', ...), units=Unspecified)


In [22]:
pw.run()

Output()

ERROR:pathway_engine.connectors:Parse error: cannot create a field "price" with type str from value None. Original error: TypeError: cannot create an object of type String from value None
ERROR:pathway_engine.connectors:Parse error: cannot create a field "diff" with type str from value None. Original error: TypeError: cannot create an object of type String from value None
ERROR:pathway_engine.connectors:Parse error: cannot create a field "diff" with type str from value None. Original error: TypeError: cannot create an object of type String from value None
ERROR:pathway_engine.connectors:Parse error: cannot create a field "diff" with type str from value None. Original error: TypeError: cannot create an object of type String from value None
ERROR:pathway_engine.connectors:Parse error: cannot create a field "diff" with type str from value None. Original error: TypeError: cannot create an object of type String from value None
ERROR:pathway_engine.connectors:Parse error: cannot create a fie

## Demand-Based Pricing Model

$$Demand = \alpha \frac{Occupancy}{Capacity}+\beta \times QueueLength - \gamma \times Traffic + \delta \times IsSpecialDay + \epsilon \times VehicleTypeWeight - 0.5$$

$$Normalized-Demand = tanh(Demand)$$

$$price = Base-Price \times (1+Normalized-Demand)$$

- In order to model this price functions, we can calculate weights based on values for the parameters.

- After that, we plot the prices for various days.

- It can be noticed that for a given day, the price rises during the beginning of the day and tends to fall near the end of the day.

- The price is maximum during the middle of the day.

In [25]:
import datetime

# constants and functions

ALPHA = 0.5
BETA = 0.1
GAMMA = 0.2
DELTA = 0.3
EPSILON = 0.15
LAMBDA = 1
BASE_PRICE = 10.0

@pw.udf
def traffic_to_numeric(traffic_condition: str) -> float:
    mapping = {
        "low": 0.2,
        "average": 0.5,
        "high": 0.8,
    }
    return mapping.get(traffic_condition, 0.5)

@pw.udf
def vehicle_type_weight(vehicle_type: str) -> float:
    mapping = {
        "cycle": 0.7,
        "bike": 1.0,
        "car": 1.2,
        "truck": 1.5
    }
    return mapping.get(vehicle_type, 1.0)

@pw.udf
def calculate_price(demand: float, base_price: float = BASE_PRICE) -> float:
    normalized = np.tanh(demand)
    mult = 1 + LAMBDA * normalized
    mult = max(0.5, min(2.0, mult))

    return base_price * mult

# @pw.udf
# def calculate_demand(occupancy: int, capacity: int, queue_length: int,
#                      traffic: str, is_special_day: bool, vehicle_type: str) -> float:
#     occupancy_rate = occupancy / capacity if capacity > 0 else 0
#     traffic_value = traffic_to_numeric(traffic)
#     special_day = 1.0 if is_special_day else 0.0
#     vehicle_weight = vehicle_type_weight(vehicle_type)

#     demand = (ALPHA * occupancy_rate +
#               BETA * queue_length -
#               GAMMA * traffic_value +
#               DELTA * special_day +
#               EPSILON * vehicle_weight)

#     return demand

# have to make an intermediate because pw doesnt support direct calculation
intermediate = data_with_time.with_columns(
    occupancy_rate=pw.if_else(
        pw.this.Capacity > 0,
        pw.this.Occupancy / pw.this.Capacity,
        0.0
    ),
    traffic_numeric=traffic_to_numeric(pw.this.TrafficConditionNearby),
    vehicle_weight=vehicle_type_weight(pw.this.VehicleType),
    special_day_numeric=pw.if_else(pw.this.IsSpecialDay, 1.0, 0.0)
)

# add demand column
with_demand = intermediate.with_columns(
    demand=(ALPHA * pw.this.occupancy_rate +
            BETA * pw.this.QueueLength -
            GAMMA * pw.this.traffic_numeric +
            DELTA * pw.this.special_day_numeric +
            EPSILON * pw.this.vehicle_weight) - 0.5
)

#calculate price, only require price, time and location for plotting
demand_pricing = (
    with_demand.with_columns(
        t=pw.this.t,
        price=calculate_price(pw.this.demand)
    )
).select(
    price=pw.this.price,
    LocationID=pw.this.LocationID,
    t=pw.this.t
)

In [None]:
pw.io.csv.write(demand_pricing, 'Output/output_second.csv')
pw.run()

Output()



In [None]:
pn.extension()

# reload the output for plotting
class ResultSchema(pw.Schema):
  price: float
  time: str
  diff: int
  LocationID: str
  t: str

#reload the file and convert to datetime
result_csv = pw.demo.replay_csv('Output/output_second.csv', schema=ResultSchema, input_rate=1000)

result_csv = result_csv.with_columns(
    t=pw.this.t.dt.strptime("%Y-%m-%dT%H:%M:%S.000000000")
)

#plot the results
viz = result_csv.plot(plotter, sorting_col="t")
pn.Column(viz).servable()

Field(field='LocationID', transform=CategoricalColorMapper(id='ccbaedee-4410-47c8-8ca7-173acf2eb07c', ...), units=Unspecified)


In [6]:
pw.run()

Output()

