# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m51.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m91.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [4]:
# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
# The path presumably assumes the CSV was uploaded to Colab's /content directory.
DATASET_PATH = '/content/dataset.csv'
df = pd.read_csv(DATASET_PATH)
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [5]:
# Merge the day-first date string (e.g. "04-10-2016") with the time string
# into a single datetime column so records can be ordered chronologically.
df['Timestamp'] = pd.to_datetime(
    df['LastUpdatedDate'].str.cat(df['LastUpdatedTime'], sep=' '),
    format='%d-%m-%Y %H:%M:%S',
)

# Chronological order with a fresh 0..n-1 index; this is the row order that is
# later written to CSV and replayed as a stream.
df = (
    df.sort_values(by='Timestamp')
      .reset_index(drop=True)
)
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
1,5248,BHMNCPHST01,1200,26.140014,91.731000,237,bike,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
2,3936,BHMMBMMBX01,687,20.000035,78.000003,264,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
3,6560,BHMNCPNST01,485,26.140048,91.730972,249,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
4,17056,Shopping,1920,26.150504,91.733531,614,cycle,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...
18363,3935,BHMEURBRD01,470,26.149020,91.739503,373,car,low,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18364,2623,BHMBCCTHL01,387,26.144495,91.736205,387,car,low,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18365,1311,BHMBCCMKT01,577,26.144536,91.736172,193,cycle,low,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18366,17055,Others-CCCPS98,3103,26.147500,91.727978,1671,car,low,3,0,19-12-2016,16:30:00,2016-12-19 16:30:00


In [6]:
# Summary of dtypes and non-null counts; Timestamp should now be datetime64.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 18368 entries, 0 to 18367
Data columns (total 13 columns):
 #   Column                  Non-Null Count  Dtype         
---  ------                  --------------  -----         
 0   ID                      18368 non-null  int64         
 1   SystemCodeNumber        18368 non-null  object        
 2   Capacity                18368 non-null  int64         
 3   Latitude                18368 non-null  float64       
 4   Longitude               18368 non-null  float64       
 5   Occupancy               18368 non-null  int64         
 6   VehicleType             18368 non-null  object        
 7   TrafficConditionNearby  18368 non-null  object        
 8   QueueLength             18368 non-null  int64         
 9   IsSpecialDay            18368 non-null  int64         
 10  LastUpdatedDate         18368 non-null  object        
 11  LastUpdatedTime         18368 non-null  object        
 12  Timestamp               18368 non-null  dateti

In [7]:
# Rename columns to the names ParkingSchema expects, then write the
# time-sorted frame to the CSV file that the Pathway replay reads.
# NOTE(review): "ID" appears to be a per-record counter (rows of the same lot
# carry different IDs), so the "LotID" name is misleading; SystemCodeNumber
# is what identifies a lot.
df = df.rename(columns={"ID": "LotID", "TrafficConditionNearby": "TrafficLevel"})
df.to_csv("parking_stream.csv", index=False)


In [10]:
# Pathway schema for rows of parking_stream.csv. Field names must match the
# CSV header written by the df.to_csv call above (after the column rename).
class ParkingSchema(pw.Schema):
    # Kept as a string here; parsed into a datetime later with dt.strptime.
    Timestamp: str
    Occupancy: int
    Capacity: int
    QueueLength: int
    VehicleType: str
    TrafficLevel: str  # renamed from TrafficConditionNearby
    IsSpecialDay: int
    Latitude: float
    Longitude: float
    # Renamed from the CSV's "ID" column. NOTE(review): in the data this looks like
    # a per-record counter rather than a lot identifier; SystemCodeNumber names the lot.
    LotID: int
    SystemCodeNumber: str


In [11]:
import pathway as pw

# Replay the CSV row by row as if it were a live feed.
# At input_rate=1000 rows/s, the 18,368-row file streams in roughly 18 seconds.
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)


In [12]:
# Timestamp strings were written by pandas as "YYYY-MM-DD HH:MM:SS".
fmt = "%Y-%m-%d %H:%M:%S"

# `t`: full event datetime; `day`: midnight of the same date as a string key.
# `day` is derived from the already-parsed `t` instead of re-parsing the string.
data_with_time = data.with_columns(
    t=data.Timestamp.dt.strptime(fmt),
).with_columns(
    day=pw.this.t.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Building a Simple Pricing Function

In [13]:
# Re-import the module: an earlier cell rebinds `datetime` to the datetime class,
# which has no `timedelta` attribute.
import datetime

# Daily baseline price, computed independently for every parking lot.
#
# The replayed CSV contains many lots (BHMBCCMKT01, Shopping, ...). Keying the
# window on the calendar day alone pooled readings from *all* lots into one
# window, so occ_max, occ_min and cap could each come from a different lot.
# Keying on SystemCodeNumber gives one window per (lot, day); the 1-day tumbling
# window itself already splits each lot's stream by day.
# NOTE(review): assumes Pathway aligns 1-day tumbling windows to midnight; pass
# an explicit origin to pw.temporal.tumbling if that turns out not to hold.
# The plot cell draws all lots as a single series; filter delta_window by
# SystemCodeNumber there to chart one lot at a time.
delta_window = (
    data_with_time.windowby(
        pw.this.t,                                                # event time (parsed datetime)
        instance=pw.this.SystemCodeNumber,                        # one window series per lot
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # fixed daily windows
        behavior=pw.temporal.exactly_once_behavior(),             # one result per window
    )
    .reduce(
        # Every row in a window shares the instance value, so `any` just carries the lot id through.
        SystemCodeNumber=pw.reducers.any(pw.this.SystemCodeNumber),
        t=pw.this._pw_window_end,                      # window end as the time anchor
        occ_max=pw.reducers.max(pw.this.Occupancy),    # highest occupancy in the window
        occ_min=pw.reducers.min(pw.this.Occupancy),    # lowest occupancy in the window
        cap=pw.reducers.max(pw.this.Capacity),         # capacity appears constant per lot
    )
    .with_columns(
        # Baseline: base price 10 plus the day's occupancy swing as a fraction of capacity.
        price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
    )
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [14]:
import panel as pn
import bokeh.plotting
# Load Panel's notebook extension so the Panel/Bokeh output below can render inline.
pn.extension()

# Define a Bokeh plotting function for dynamic pricing visualization
def price_plotter(source):
    """Build the Bokeh figure used to render the streaming price table.

    Parameters
    ----------
    source
        Data source handed in by ``Table.plot`` (presumably a Bokeh
        ColumnDataSource). It must expose the ``t`` (window end) and
        ``price`` columns of the plotted table.

    Returns
    -------
    bokeh.plotting.figure
        Line-plus-marker chart of price over time.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",
        x_axis_label="Time",
        y_axis_label="Price ($)",
    )

    # Line for price evolution
    fig.line("t", "price", source=source, line_width=2, color="navy", legend_label="Price")

    # Markers at each data point. `scatter` replaces `circle(..., size=...)`,
    # which Bokeh deprecated in 3.4; scatter's default marker is a circle.
    fig.scatter("t", "price", source=source, size=6, color="red")

    fig.legend.location = "top_left"
    fig.toolbar.logo = None
    fig.toolbar.autohide = True

    return fig

# Stream delta_window into the Bokeh figure, ordering points by window end `t`.
viz = delta_window.plot(
    price_plotter,
    sorting_col="t",
)

# Wrap the live plot in a Panel column; servable() also registers it for `panel serve`.
pn.Column(viz).servable()




In [16]:
# Start the Pathway pipeline in the background
# This will execute all streaming logic (windowing, UDFs, visualization, etc.)
# %%capture --no-display suppresses console output/logs

%%capture --no-display
pw.run()


Output()

