# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m13.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m7.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m48.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m75.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [None]:
# Read the raw parking observations from disk; the bare name on the last
# line renders a (truncated) preview of the frame.
df = pd.read_csv("dataset.csv")
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [None]:
# Count the missing values in every column (`isnull` is an alias of `isna`).
df.isnull().sum()

Unnamed: 0,0
ID,0
SystemCodeNumber,0
Capacity,0
Latitude,0
Longitude,0
Occupancy,0
VehicleType,0
TrafficConditionNearby,0
QueueLength,0
IsSpecialDay,0


In [None]:
# Join the date and time strings with a space and parse the result into one
# datetime column (day-first dates, 24-hour times).
df['Timestamp'] = pd.to_datetime(
    df['LastUpdatedDate'].str.cat(df['LastUpdatedTime'], sep=' '),
    format='%d-%m-%Y %H:%M:%S',
)

# Put the observations in chronological order and renumber the rows from 0.
df = df.sort_values(by='Timestamp', ignore_index=True)

In [None]:
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
1,5248,BHMNCPHST01,1200,26.140014,91.731000,237,bike,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
2,3936,BHMMBMMBX01,687,20.000035,78.000003,264,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
3,6560,BHMNCPNST01,485,26.140048,91.730972,249,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
4,17056,Shopping,1920,26.150504,91.733531,614,cycle,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...
18363,3935,BHMEURBRD01,470,26.149020,91.739503,373,car,low,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18364,2623,BHMBCCTHL01,387,26.144495,91.736205,387,car,low,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18365,1311,BHMBCCMKT01,577,26.144536,91.736172,193,cycle,low,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18366,17055,Others-CCCPS98,3103,26.147500,91.727978,1671,car,low,3,0,19-12-2016,16:30:00,2016-12-19 16:30:00


In [None]:
# Report how many distinct row IDs and distinct SystemCodeNumber values exist.
print(f"Number of unique IDs: {df['ID'].nunique()}")
print(f"Number of unique SystemCodeNumbers: {df['SystemCodeNumber'].nunique()}")


Number of unique IDs: 18368
Number of unique SystemCodeNumbers: 14


In [None]:
# Give every SystemCodeNumber an integer code so it can be carried through
# the numeric stream as a station identifier.
from sklearn.preprocessing import LabelEncoder

# `fit` returns the encoder itself, so it can be created and fitted in one step.
label_encoder = LabelEncoder().fit(df['SystemCodeNumber'])
df['SystemCodeNumber_Encoded'] = label_encoder.transform(df['SystemCodeNumber'])

print(df.loc[:, ['SystemCodeNumber', 'SystemCodeNumber_Encoded']])


      SystemCodeNumber  SystemCodeNumber_Encoded
0          BHMBCCMKT01                         0
1          BHMNCPHST01                         4
2          BHMMBMMBX01                         3
3          BHMNCPNST01                         5
4             Shopping                        13
...                ...                       ...
18363      BHMEURBRD01                         2
18364      BHMBCCTHL01                         1
18365      BHMBCCMKT01                         0
18366   Others-CCCPS98                        12
18367         Shopping                        13

[18368 rows x 2 columns]


In [None]:
# Integer-encode the TrafficConditionNearby and VehicleType columns.
#
# Each column gets its own encoder. Re-fitting one shared encoder would
# overwrite the classes learned for the previous column, which makes
# `inverse_transform` unusable for everything except the last column fitted.
#
# LabelEncoder numbers the classes in sorted order, so these codes carry no
# ordinal meaning (for example 'low' is encoded as 2).
from sklearn.preprocessing import LabelEncoder

traffic_encoder = LabelEncoder()
vehicle_encoder = LabelEncoder()

df['TrafficConditionNearby_Encoded'] = traffic_encoder.fit_transform(df['TrafficConditionNearby'])
df['VehicleType_Encoded'] = vehicle_encoder.fit_transform(df['VehicleType'])

print(df[['TrafficConditionNearby', 'TrafficConditionNearby_Encoded']])
print(df[['VehicleType', 'VehicleType_Encoded']])

      TrafficConditionNearby  TrafficConditionNearby_Encoded
0                        low                               2
1                        low                               2
2                        low                               2
3                        low                               2
4                        low                               2
...                      ...                             ...
18363                    low                               2
18364                    low                               2
18365                    low                               2
18366                    low                               2
18367                    low                               2

[18368 rows x 2 columns]
      VehicleType  VehicleType_Encoded
0             car                    1
1            bike                    0
2             car                    1
3             car                    1
4           cycle                    2
...     

In [None]:
# Show each distinct (VehicleType, VehicleType_Encoded) pair once.
print(df.loc[:, ['VehicleType', 'VehicleType_Encoded']].drop_duplicates())

   VehicleType  VehicleType_Encoded
0          car                    1
1         bike                    0
4        cycle                    2
14       truck                    3


In [None]:
# Map the traffic condition onto an ordinal 0..1 scale:
# low = 0, average = 0.5, high = 1.
traffic = {'low': 0, 'average': 0.5, 'high': 1}
df['Traffic'] = df['TrafficConditionNearby'].map(traffic)

# Series.map yields NaN for any label missing from the mapping; fail loudly
# here instead of letting NaN flow into the demand index later on.
assert df['Traffic'].notna().all(), "unmapped TrafficConditionNearby value"

print(df[['TrafficConditionNearby', 'Traffic']].drop_duplicates())

   TrafficConditionNearby  Traffic
0                     low      0.0
7                 average      0.5
86                   high      1.0


In [None]:
# Number of distinct traffic-condition labels present in the data.
print(f"Number of unique TrafficConditionNearby: {df['TrafficConditionNearby'].nunique()}")

Number of unique TrafficConditionNearby: 3


In [None]:
# List the distinct values taken by the IsSpecialDay flag.
print(f"Unique values of special day: {df['IsSpecialDay'].unique()}")

Unique values of special day: [0 1]


In [None]:
# Weight each vehicle type on a 0..1 scale (cycle lowest, truck highest).
VType = {'cycle': 0, 'bike': 0.33, 'car': 0.66, 'truck': 1}
df['VehicleTypeWeight'] = df['VehicleType'].map(VType)

# Series.map yields NaN for any label missing from the mapping; fail loudly
# here instead of letting NaN flow into the demand index later on.
assert df['VehicleTypeWeight'].notna().all(), "unmapped VehicleType value"

print(df[['VehicleType', 'VehicleTypeWeight']].drop_duplicates())

   VehicleType  VehicleTypeWeight
0          car               0.66
1         bike               0.33
4        cycle               0.00
14       truck               1.00


In [None]:
# Min-max scale QueueLength so the smallest observed value maps to 0 and the
# largest to 1.
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
# Fit and transform on the same single-column frame (same as fit_transform).
df['QueueLength_Normalized'] = scaler.fit(df[['QueueLength']]).transform(df[['QueueLength']])

print(df.loc[:, ['QueueLength', 'QueueLength_Normalized']])

       QueueLength  QueueLength_Normalized
0                1                0.066667
1                2                0.133333
2                2                0.133333
3                2                0.133333
4                2                0.133333
...            ...                     ...
18363            2                0.133333
18364            2                0.133333
18365            2                0.133333
18366            3                0.200000
18367            2                0.133333

[18368 rows x 2 columns]


In [None]:
# Occupancy expressed as a fraction of the lot's capacity.
df['Occ/Cap'] = df['Occupancy'].div(df['Capacity'])

print(df.loc[:, ['Occupancy', 'Capacity', 'Occ/Cap']])

       Occupancy  Capacity   Occ/Cap
0             61       577  0.105719
1            237      1200  0.197500
2            264       687  0.384279
3            249       485  0.513402
4            614      1920  0.319792
...          ...       ...       ...
18363        373       470  0.793617
18364        387       387  1.000000
18365        193       577  0.334489
18366       1671      3103  0.538511
18367       1180      1920  0.614583

[18368 rows x 3 columns]


In [None]:
# Min-max scale the occupancy ratio to the 0..1 range. The printed output
# shows a ratio of 1.0 scaling to about 0.96, so some raw ratios exceed 1.
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
# Fit and transform on the same single-column frame (same as fit_transform).
df['Occ_Cap_Norm'] = scaler.fit(df[['Occ/Cap']]).transform(df[['Occ/Cap']])

print(df.loc[:, ['Occ/Cap', 'Occ_Cap_Norm']])

        Occ/Cap  Occ_Cap_Norm
0      0.105719      0.098521
1      0.197500      0.186953
2      0.384279      0.366915
3      0.513402      0.491326
4      0.319792      0.304781
...         ...           ...
18363  0.793617      0.761314
18364  1.000000      0.960165
18365  0.334489      0.318942
18366  0.538511      0.515518
18367  0.614583      0.588814

[18368 rows x 2 columns]


In [None]:
# Preview the vehicle-type weight column.
df['VehicleTypeWeight']

Unnamed: 0,VehicleTypeWeight
0,0.66
1,0.33
2,0.66
3,0.66
4,0.00
...,...
18363,0.66
18364,0.66
18365,0.00
18366,0.66


In [None]:
# Write the engineered feature columns of `df` to a CSV file (without the
# index) so that the file can be replayed as a stream further down.
df[
    [
        'SystemCodeNumber_Encoded',
        'QueueLength_Normalized',
        'VehicleTypeWeight',
        'Traffic',
        'IsSpecialDay',
        'Occ_Cap_Norm',
        'Timestamp',
    ]
].to_csv('Parking_stream.csv', index=False)


In [None]:
# main = pd.read_csv( "Parking_stream.csv")
# main

In [None]:
# prompt: rename SystemCodeNumber_Encoded as ID in main

# main = main.rename(columns={'SystemCodeNumber_Encoded': 'ID'})
# main

In [None]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

# Column layout of Parking_stream.csv as read by Pathway; the field names
# match the columns written to that file earlier in the notebook.
class ParkingSchema(pw.Schema):
    Timestamp: str   # Observation time as text, later parsed with "%Y-%m-%d %H:%M:%S"
    Occ_Cap_Norm: float   # Min-max normalized occupancy / capacity ratio
    IsSpecialDay: int    # Special-day flag (0 or 1)
    Traffic: float  # Nearby traffic level: low=0, average=0.5, high=1
    VehicleTypeWeight: float  # Vehicle-type weight: cycle=0, bike=0.33, car=0.66, truck=1
    QueueLength_Normalized: float  # Min-max normalized queue length
    SystemCodeNumber_Encoded: int  # Label-encoded SystemCodeNumber (station identifier)

In [None]:
# Replay Parking_stream.csv as a simulated live stream with Pathway.
# `input_rate=100` asks for roughly 100 rows per second to be ingested.
data = pw.demo.replay_csv(
    "Parking_stream.csv",
    schema=ParkingSchema,
    input_rate=100,
)

In [None]:
# Text layout of the 'Timestamp' column in the replayed CSV.
fmt = "%Y-%m-%d %H:%M:%S"

# Extend the stream with two derived columns:
# - 't'   : the timestamp parsed into a datetime
# - 'day' : the same moment formatted as text with the time set to midnight,
#           i.e. one value per calendar day
data_with_time = data.with_columns(
    t=pw.this.Timestamp.dt.strptime(fmt),
    day=pw.this.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Making a simple pricing function

In [None]:
# The module import is needed in this cell: the notebook's earlier
# `from datetime import datetime` rebinds the name `datetime` to the class,
# which has no `timedelta` attribute.
import datetime

import pathway as pw

# Daily demand and price per station.
#
# `windowby` is called on the table itself, with the station id as the window
# `instance`, so every station gets its own independent sequence of one-day
# tumbling windows. The tumbling window already splits the stream by day, so
# the day must not be used as the instance: `_pw_instance` would then hold
# the day rather than the station id.
demand_window = (
    data_with_time.windowby(
        pw.this.t,                                    # Event-time column
        instance=pw.this.SystemCodeNumber_Encoded,    # One set of windows per station
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior(),
    )
    .reduce(
        SystemCodeNumber_Encoded=pw.this._pw_instance,               # Station id of this window
        t=pw.this._pw_window_end,                                    # End of the daily window
        occ_norm=pw.reducers.avg(pw.this.Occ_Cap_Norm),              # Mean normalized occupancy
        queue_norm=pw.reducers.max(pw.this.QueueLength_Normalized),  # Peak normalized queue
        traffic_norm=pw.reducers.avg(pw.this.Traffic),               # Mean traffic level
        is_special_day=pw.reducers.max(pw.this.IsSpecialDay),        # 1 if any row was flagged
        vehicle_weight=pw.reducers.avg(pw.this.VehicleTypeWeight),   # Mean vehicle-type weight
    )
    .with_columns(
        base_price=10,
        # Weighted sum of the daily features.
        # NOTE(review): the weights add up to 1.05, not 1 - confirm this is intended.
        demand_index=(
            0.4 * pw.this.occ_norm +
            0.3 * pw.this.queue_norm +
            0.2 * pw.this.traffic_norm +
            0.1 * pw.this.is_special_day +
            0.05 * pw.this.vehicle_weight
        ),
    )
    .with_columns(
        # The price scales linearly with demand, starting from the base price.
        price=pw.this.base_price * (1 + pw.this.demand_index),
    )
    .with_columns(
        # Keep the published price inside the [5, 20] band.
        price_clamped=pw.apply(
            lambda x: max(5, min(20, x)),
            pw.this.price
        )
    )
)

In [None]:
# # Define a daily tumbling window over the data stream using Pathway
# # This block performs temporal aggregation and computes a dynamic price for each day
# import datetime

# delta_window = (
#     data_with_time.windowby(
#         pw.this.t,  # Event time column to use for windowing (parsed datetime)
#         instance=pw.this.day,  # Logical partitioning key: one instance per calendar day
#         window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # Fixed-size daily window
#         behavior=pw.temporal.exactly_once_behavior()  # Guarantees exactly-once processing semantics
#     )
#     .reduce(
#         t=pw.this._pw_window_end,                        # Assign the end timestamp of each window
#         occ_max=pw.reducers.max(pw.this.Occupancy),      # Highest occupancy observed in the window
#         occ_min=pw.reducers.min(pw.this.Occupancy),      # Lowest occupancy observed in the window
#         cap=pw.reducers.max(pw.this.Capacity),           # Maximum capacity observed (typically constant per spot)
#     )
#     .with_columns(
#         # Compute the price using a simple dynamic pricing formula:
#         #
#         # Pricing Formula:
#         #     price = base_price + demand_fluctuation
#         #     where:
#         #         base_price = 10 (fixed minimum price)
#         #         demand_fluctuation = (occ_max - occ_min) / cap
#         #
#         # Intuition:
#         # - The greater the difference between peak and low occupancy in a day,
#         #   the more volatile the demand is, indicating potential scarcity.
#         # - Dividing by capacity normalizes the fluctuation (to stay in [0,1] range).
#         # - This fluctuation is added to the base price of 10 to set the final price.
#         # - Example: If occ_max = 90, occ_min = 30, cap = 100
#         #            => price = 10 + (90 - 30)/100 = 10 + 0.6 = 10.6

#         price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
#     )
# )


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [None]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()


def price_plotter(source):
    """Build the Bokeh figure that shows the clamped price over time.

    Args:
        source: Data source handed over by Pathway's ``Table.plot``; it must
            provide the ``t`` and ``price_clamped`` columns.

    Returns:
        The configured Bokeh figure.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Render the 't' values as dates on the x-axis
    )
    # Line showing how price_clamped evolves over time
    fig.line("t", "price_clamped", source=source, line_width=2, color="navy")

    # Red markers on every data point. `scatter(marker="circle")` is used
    # because `circle(size=...)` is deprecated in recent Bokeh releases.
    fig.scatter("t", "price_clamped", source=source, size=6, color="red", marker="circle")

    return fig


# Bind the windowed price table (demand_window) to the Bokeh plot:
# - 'price_plotter' is the rendering function
# - 'sorting_col="t"' keeps the points in time order
# NOTE(review): demand_window is meant to hold rows for every station, so this
# single line joins points of different stations - consider filtering to one
# station or colouring by SystemCodeNumber_Encoded.
viz = demand_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel layout and mark it servable so that it is shown
# when the app is served
pn.Column(viz).servable()



In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()

KeyboardInterrupt: 

In [None]:
# Read the exported stream file back into pandas for the pandas-based
# price calculation and plot below.
main = pd.read_csv("Parking_stream.csv")
main

Unnamed: 0,SystemCodeNumber_Encoded,QueueLength_Normalized,VehicleTypeWeight,Traffic,IsSpecialDay,Occ_Cap_Norm,Timestamp
0,0,0.066667,0.66,0.0,0,0.098521,2016-10-04 07:59:00
1,4,0.133333,0.33,0.0,0,0.186953,2016-10-04 07:59:00
2,3,0.133333,0.66,0.0,0,0.366915,2016-10-04 07:59:00
3,5,0.133333,0.66,0.0,0,0.491326,2016-10-04 07:59:00
4,13,0.133333,0.00,0.0,0,0.304781,2016-10-04 07:59:00
...,...,...,...,...,...,...,...
18363,2,0.133333,0.66,0.0,0,0.761314,2016-12-19 16:30:00
18364,1,0.133333,0.66,0.0,0,0.960165,2016-12-19 16:30:00
18365,0,0.133333,0.00,0.0,0,0.318942,2016-12-19 16:30:00
18366,12,0.200000,0.66,0.0,0,0.515518,2016-12-19 16:30:00


In [None]:
# List the column names of the reloaded stream file.
print(main.columns)

Index(['SystemCodeNumber_Encoded', 'QueueLength_Normalized',
       'VehicleTypeWeight', 'Traffic', 'IsSpecialDay', 'Occ_Cap_Norm',
       'Timestamp'],
      dtype='object')


In [None]:
# Per-row demand index: the same weighted sum as in the Pathway pipeline,
# accumulated term by term in the same order.
main['demand_index'] = (
    main['Occ_Cap_Norm'].mul(0.4)
    .add(main['QueueLength_Normalized'].mul(0.3))
    .add(main['Traffic'].mul(0.2))
    .add(main['IsSpecialDay'].mul(0.1))
    .add(main['VehicleTypeWeight'].mul(0.05))
)

# Price = base price of 10 scaled by (1 + demand index), then limited to [5, 20].
main['price'] = main['demand_index'].add(1).mul(10)
main['price_clamped'] = main['price'].clip(lower=5, upper=20)


In [None]:
# Parse the Timestamp strings read back from the CSV into datetime values so
# they can be drawn on a datetime axis.
main['Timestamp'] = pd.to_datetime(main['Timestamp'])


In [None]:
import panel as pn
from bokeh.models import ColumnDataSource
from bokeh.plotting import figure

pn.extension('bokeh')

# One dropdown entry per encoded station id, in order of first appearance.
station_ids = main['SystemCodeNumber_Encoded'].unique().tolist()
station_selector = pn.widgets.Select(name='Station ID', options=station_ids)


@pn.depends(station_selector)
def price_plotter_interactive(station_id):
    """Plot the clamped price over time for the station picked in the dropdown.

    Args:
        station_id: Encoded station id currently selected in ``station_selector``.

    Returns:
        A Bokeh figure for the station, or a Markdown pane with a short
        message when ``main`` holds no rows for that station.
    """
    station_rows = main.loc[main['SystemCodeNumber_Encoded'].eq(station_id)]
    if station_rows.empty:
        return pn.pane.Markdown("No data for this station.", height=400)

    plot = figure(
        title=f"Daily Clamped Price for Station {station_id}",
        x_axis_type="datetime",
        width=800,
        height=400,
    )
    plot.xaxis.axis_label = "Date"
    plot.yaxis.axis_label = "Clamped Price"

    # Both glyphs read from the same data source.
    points = ColumnDataSource(station_rows)
    plot.line("Timestamp", "price_clamped", source=points, line_width=2, color="navy")
    plot.scatter("Timestamp", "price_clamped", source=points, size=6, color="red", marker="circle")
    return plot


pn.Column(station_selector, price_plotter_interactive).servable()






In [None]:
%%capture --no-display
# Run the Pathway pipeline again; stdout/stderr are captured while rich
# display output stays visible.
pw.run()

Output()

KeyboardInterrupt: 