# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

In [None]:
# Colab-only step: opens a browser file picker and saves the chosen file(s)
# into the current working directory. The return value is kept in `uploaded`.
# Note: re-uploading a file whose name already exists makes Colab save it under
# a new name (the recorded output shows "dataset (1).csv").
from google.colab import files
uploaded = files.upload()



Saving dataset.csv to dataset (1).csv


In [None]:
# Load the raw dataset.
#
# files.upload() returns a dict keyed by the ORIGINAL file name with the file
# contents as bytes. When a file called 'dataset.csv' already exists, Colab
# stores the new upload under a different name (e.g. 'dataset (1).csv'), so
# reading 'dataset.csv' from disk could silently load an older copy.
# Prefer the bytes uploaded in this session; fall back to the file on disk when
# the upload cell was skipped or a differently named file was uploaded.
import io

_uploaded_bytes = globals().get('uploaded', {}).get('dataset.csv')
_dataset_source = io.BytesIO(_uploaded_bytes) if _uploaded_bytes is not None else 'dataset.csv'

df = pd.read_csv(_dataset_source)
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [None]:
# Join the separate date and time text columns into one "dd-mm-YYYY HH:MM:SS"
# string per row, then parse it into a proper datetime column.
date_and_time = df['LastUpdatedDate'].str.cat(df['LastUpdatedTime'], sep=' ')
df['Timestamp'] = pd.to_datetime(date_and_time, format='%d-%m-%Y %H:%M:%S')

# Put the rows in chronological order and renumber the index from 0.
df = df.sort_values(by='Timestamp').reset_index(drop=True)

In [None]:
# Save the selected columns to a CSV file for streaming or downstream processing
stream_columns = [
    "Timestamp",
    "SystemCodeNumber",
    "Latitude",
    "Longitude",
    "Occupancy",
    "Capacity",
    "QueueLength",
    "TrafficConditionNearby",
    "IsSpecialDay",
    "VehicleType"
]

# The raw dataset stores TrafficConditionNearby as text labels ('low',
# 'average', ...), but ParkingSchema reads the column as a float and the demand
# model multiplies it by a weight. Encode the labels on a 0..1 congestion scale
# before writing the stream file so the CSV matches the schema.
# NOTE: the numeric levels are a modelling assumption - tune them as needed.
traffic_levels = {"low": 0.0, "average": 0.5, "high": 1.0}

traffic_raw = df["TrafficConditionNearby"]
traffic_numeric = traffic_raw.astype(str).str.strip().str.lower().map(traffic_levels)
# Values that are already numeric pass through unchanged.
traffic_numeric = traffic_numeric.fillna(pd.to_numeric(traffic_raw, errors="coerce"))

# Fail loudly instead of writing blanks that the float column cannot parse.
unmapped = traffic_raw[traffic_numeric.isna()].unique()
if len(unmapped) > 0:
    raise ValueError(
        f"Unrecognised TrafficConditionNearby values: {sorted(map(str, unmapped))}"
    )

# `df` itself is left untouched; only the exported copy carries the encoding.
stream_df = df[stream_columns].assign(TrafficConditionNearby=traffic_numeric)
stream_df.to_csv("parking_stream.csv", index=False)

In [None]:
# Define the schema for the streaming data using Pathway.
# Field names match the column headers written to parking_stream.csv.

class ParkingSchema(pw.Schema):
    Timestamp: str                      # Combined date-time text ("YYYY-MM-DD HH:MM:SS"); parsed into a datetime later
    SystemCodeNumber: str              # Identifier of the parking lot
    Latitude: float                    # Latitude of the parking lot
    Longitude: float                   # Longitude of the parking lot
    Occupancy: int                     # Current number of parked vehicles
    Capacity: int                      # Total capacity of the lot
    QueueLength: int                   # Number of vehicles in queue
    # Congestion level, read as a float.
    # NOTE(review): the raw dataset preview shows text labels ('low', 'average');
    # the values written to parking_stream.csv must be numeric for this field to
    # parse - confirm the export encodes them, or change this type to str.
    TrafficConditionNearby: float
    IsSpecialDay: int                  # 1 if special event/holiday, else 0
    VehicleType: str                   # Type of incoming vehicle (e.g. car, bike, truck, cycle)

In [None]:
# Turn the exported CSV into a simulated live stream.
# pw.demo.replay_csv replays the file's rows at a controlled pace; with
# input_rate=1000 roughly 1000 rows per second are fed into the pipeline.
replay_options = dict(schema=ParkingSchema, input_rate=1000)

data = pw.demo.replay_csv("parking_stream.csv", **replay_options)

In [None]:
# Layout of the 'Timestamp' text column in the stream
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parsing expression once and reuse it for both derived columns.
parsed_timestamp = data.Timestamp.dt.strptime(fmt)

# Extend the stream with:
# - 't'   : the full parsed datetime
# - 'day' : a text key for the calendar day, formatted with the time fixed at
#           midnight (handy as a grouping key for day-level aggregations)
data_with_time = data.with_columns(
    t=parsed_timestamp,
    day=parsed_timestamp.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Making a simple pricing function

In [None]:
# Daily tumbling-window aggregation followed by a simple baseline price.
#
# Kept as a module import on purpose: the top-level `from datetime import
# datetime` rebinds the name `datetime` to the class, so the module has to be
# re-imported here for `datetime.timedelta` to resolve.
import datetime

one_day = datetime.timedelta(days=1)

# Stage 1 - assign every event to a fixed, non-overlapping one-day window.
#   * event time   : the parsed datetime column 't'
#   * instance key : the calendar-day string 'day'
#   * behavior     : each window emits its result a single time
# NOTE(review): windows are keyed by day only, not by SystemCodeNumber, so if
# the stream contains several parking lots each daily window aggregates over
# all of them - confirm this is intended.
daily_windows = data_with_time.windowby(
    pw.this.t,
    instance=pw.this.day,
    window=pw.temporal.tumbling(one_day),
    behavior=pw.temporal.exactly_once_behavior(),
)

# Stage 2 - summarise each window.
daily_occupancy = daily_windows.reduce(
    t=pw.this._pw_window_end,                     # window end timestamp labels the row
    occ_max=pw.reducers.max(pw.this.Occupancy),   # peak occupancy in the window
    occ_min=pw.reducers.min(pw.this.Occupancy),   # lowest occupancy in the window
    cap=pw.reducers.max(pw.this.Capacity),        # largest capacity seen in the window
)

# Stage 3 - baseline pricing rule:
#
#     price = 10 + (occ_max - occ_min) / cap
#
# The fixed base price is 10. The second term is the day's occupancy swing,
# scaled by capacity so it is expressed as a fraction of the lot size; a
# larger swing between the quietest and busiest moment is treated as a sign
# of more volatile demand and raises the price.
# Worked example: occ_max = 90, occ_min = 30, cap = 100
#                 -> price = 10 + 60 / 100 = 10.6
delta_window = daily_occupancy.with_columns(
    price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
)


In [None]:
# Sanity check: compute the pipeline and print the price for each window
daily_prices = delta_window.select(pw.this.t, pw.this.price)
pw.debug.compute_and_print(daily_prices)



            | t                   | price
^F83MSCQ... | 2016-10-05 00:00:00 | 10.821529745042493
^9WXJY3Q... | 2016-10-06 00:00:00 | 10.813546227143961
^2AJYF5K... | 2016-10-07 00:00:00 | 10.792943600309039
^PZGC6QT... | 2016-10-08 00:00:00 | 10.674220963172804
^F3RRNFJ... | 2016-10-09 00:00:00 | 10.39866082925573
^KBSN7GX... | 2016-10-10 00:00:00 | 10.258820499613702
^H66SECC... | 2016-10-11 00:00:00 | 10.76487252124646
^GY3RHFA... | 2016-10-12 00:00:00 | 10.811743497295906
^FXNC20K... | 2016-10-13 00:00:00 | 10.795776461498841
^FFGZ6RP... | 2016-10-14 00:00:00 | 10.814833891321143
^C31DQYW... | 2016-10-15 00:00:00 | 10.678856554210661
^EWZHR7P... | 2016-10-16 00:00:00 | 10.377800669585373
^W9JJXKM... | 2016-10-17 00:00:00 | 10.462013906773114
^YHATP6Y... | 2016-10-18 00:00:00 | 10.746587689930466
^HAG1X4Z... | 2016-10-19 00:00:00 | 10.791655936131857
^N1AS0Y7... | 2016-10-20 00:00:00 | 10.785217615245944
^JDABJWT... | 2016-10-23 00:00:00 | 10.342261138295132
^RMTWVEV... | 2016-10-24 

# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [None]:
# Activate the Panel extension so Panel objects can render in this notebook.
# comms='colab' selects Panel's Colab communication mode.
pn.extension(comms='colab')  # Enables Colab display mode

# Rendering callback handed to Pathway's .plot(): receives a data source and
# returns the Bokeh figure that displays it.
def price_plotter(source):
    """Build the daily-price chart for the given data source.

    Draws the 'price' column against the 't' column of `source` as a navy
    line with a red marker on every data point, on a datetime x-axis.

    Returns the configured Bokeh figure.
    """
    figure_options = dict(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # render 't' as dates on the x-axis
    )
    fig = bokeh.plotting.figure(**figure_options)

    # Price trend over time
    fig.line(x="t", y="price", source=source, line_width=2, color="navy")

    # Individual observations, so single points remain visible
    fig.scatter(x="t", y="price", source=source, size=6, color="red")

    return fig

# Bind the delta_window table to the Bokeh chart through Pathway's .plot():
# - price_plotter is the rendering function
# - sorting_col="t" keeps the points in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the visualization in a Panel column. Leaving it as the cell's last
# expression is what makes the notebook display it.
price_panel = pn.Column(viz)
price_panel

In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()



> ## **MODEL 2**

In [None]:
# Coefficients of the Model 2 demand score
ALPHA = 0.4   # weight of the occupancy term
BETA = 0.2    # weight of the queue-length term
GAMMA = 0.3   # penalty weight of nearby traffic
DELTA = 0.5   # boost applied on special days
LAMBDA = 0.5  # how strongly demand scales the price

# Additive boost by type of vehicle
VEHICLE_TYPE_WEIGHTS = dict(
    car=0.4,
    bike=0.2,
    truck=0.6,
)

In [None]:
def _vehicle_weight(vehicle_type):
    """Look up the demand boost for a vehicle type, ignoring letter case.

    Types missing from VEHICLE_TYPE_WEIGHTS get the fallback weight 0.3.
    """
    return VEHICLE_TYPE_WEIGHTS.get(vehicle_type.lower(), 0.3)


# Add a per-row 'vehicle_weight' column derived from VehicleType. The helper is
# deliberately left unannotated, so the explicit cast is what fixes the column
# type to float.
data_enriched = data_with_time.with_columns(
    vehicle_weight=pw.cast(float, pw.apply(_vehicle_weight, pw.this.VehicleType))
)


In [None]:
def _clip_to_unit_interval(score):
    """Limit a demand score to the range 0..1.

    Scores below 0 become 0, scores above 1 become 1, anything in between is
    returned unchanged.
    """
    return max(0, min(1, score))


# Individual contributions to the raw demand score
occupancy_term = ALPHA * (pw.this.Occupancy / pw.this.Capacity)   # share of the lot in use
queue_term = BETA * pw.this.QueueLength                           # vehicles waiting
traffic_term = GAMMA * pw.this.TrafficConditionNearby             # subtracted as a penalty
special_day_term = DELTA * pw.this.IsSpecialDay                   # applies when the flag is 1

# Raw demand score: sum of the terms above plus the vehicle-type boost
demand_stream = data_enriched.with_columns(
    demand_score=(
        occupancy_term
        + queue_term
        - traffic_term
        + special_day_term
        + pw.this.vehicle_weight
    )
)

# Clipped score used for pricing
demand_stream = demand_stream.with_columns(
    normalized_demand=pw.apply(_clip_to_unit_interval, pw.this.demand_score)
)

# Final price: base price 10, scaled up by at most LAMBDA (full demand).
# normalized_demand is cast to float before it enters the arithmetic.
price_stream = demand_stream.with_columns(
    price=10 * (1 + LAMBDA * pw.cast(float, pw.this.normalized_demand))
)

