# Inroduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m42.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m11.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m61.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [35]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [36]:
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Modified - modified.csv')
df

# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing

Unnamed: 0.1,Unnamed: 0,SystemCodeNumber,Capacity,Occupancy,LastUpdatedDate,LastUpdatedTime,IsSpecialDay,VehicleType,Latitude,Longitude,TrafficConditionNearby,QueueLength
0,0,BHMBCCMKT01,577,61,04-10-2016,07:59:42,0,car,28.5,77.15,low,2
1,1,BHMBCCMKT01,577,64,04-10-2016,08:25:42,0,car,28.5,77.15,average,2
2,2,BHMBCCMKT01,577,80,04-10-2016,08:59:42,0,car,28.5,77.15,low,2
3,3,BHMBCCMKT01,577,107,04-10-2016,09:32:46,0,car,28.5,77.15,low,3
4,4,BHMBCCMKT01,577,150,04-10-2016,09:59:48,0,car,28.5,77.15,low,3
...,...,...,...,...,...,...,...,...,...,...,...,...
1307,1307,BHMBCCMKT01,577,309,19-12-2016,14:30:33,0,bike,28.5,77.15,average,5
1308,1308,BHMBCCMKT01,577,300,19-12-2016,15:03:34,0,car,28.5,77.15,low,4
1309,1309,BHMBCCMKT01,577,274,19-12-2016,15:29:33,0,truck,28.5,77.15,low,3
1310,1310,BHMBCCMKT01,577,230,19-12-2016,16:03:35,0,cycle,28.5,77.15,low,2


In [37]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [38]:
# Combine the 'LastUpdatedDate' and 'LastUpdatedTime' columns into a single datetime column
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                  format='%d-%m-%Y %H:%M:%S')

# Sort the DataFrame by the new 'Timestamp' column and reset the index
df = df.sort_values('Timestamp').reset_index(drop=True)

In [74]:
# Cell 6: Save selected columns to CSV for streaming
# Ensure SystemCodeNumber is included for multi-lot processing
df[["SystemCodeNumber", "Timestamp", "Occupancy", "Capacity", "QueueLength", "TrafficConditionNearby", "IsSpecialDay", "VehicleType"]].to_csv("parking_stream.csv", index=False)

# Debug: Print columns and unique SystemCodeNumber values
print("Columns in CSV:", df.columns.tolist())
print("Unique SystemCodeNumber values:", df["SystemCodeNumber"].unique().tolist())

Columns in CSV: ['Unnamed: 0', 'SystemCodeNumber', 'Capacity', 'Occupancy', 'LastUpdatedDate', 'LastUpdatedTime', 'IsSpecialDay', 'VehicleType', 'Latitude', 'Longitude', 'TrafficConditionNearby', 'QueueLength', 'Timestamp']
Unique SystemCodeNumber values: ['BHMBCCMKT01']


In [75]:
# Cell 7: Define the Pathway schema
class ParkingSchema(pw.Schema):
    SystemCodeNumber: str    # Identifier for the parking lot
    Timestamp: str           # Timestamp of the observation
    Occupancy: int           # Number of occupied parking spots
    Capacity: int            # Total parking capacity at the location
    QueueLength: int         # Number of vehicles waiting
    TrafficConditionNearby: str  # Traffic condition ("low", "average", "high")
    IsSpecialDay: int        # Indicator for special days (0 or 1)
    VehicleType: str         # Type of vehicle ("car", "bike", "truck")


In [76]:
# Load the data as a simulated stream using Pathway's replay_csv function
# This replays the CSV data at a controlled input rate to mimic real-time streaming
# input_rate=1000 means approximately 1000 rows per second will be ingested into the stream.

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)
# Debug: Print schema to verify columns
print("Schema of data:", data.schema)

Schema of data: id          | SystemCodeNumber | Timestamp | Occupancy | Capacity | QueueLength | TrafficConditionNearby | IsSpecialDay | VehicleType
ANY_POINTER | STR              | STR       | INT       | INT      | INT         | STR                    | INT          | STR        


In [77]:
# Cell 9: Parse timestamps and add time-based columns
fmt = "%Y-%m-%d %H:%M:%S"

data_with_time = data.with_columns(
    t=data.Timestamp.dt.strptime(fmt),
    day=data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00"),
    traffic_code=pw.apply(
        lambda x: 0 if x == "low" else 1 if x == "average" else 2,
        data.TrafficConditionNearby
    ),
    vehicle_weight=pw.apply(
        lambda x: 1.0 if x == "car" else 0.5 if x == "bike" else 1.5,
        data.VehicleType
    ),
    occupancy_rate=data.Occupancy / data.Capacity
)

# Debug: Print schema
print("Schema of data_with_time:", data_with_time.schema)

Schema of data_with_time: id          | SystemCodeNumber | Timestamp | Occupancy | Capacity | QueueLength | TrafficConditionNearby | IsSpecialDay | VehicleType | t               | day | traffic_code | vehicle_weight | occupancy_rate
ANY_POINTER | STR              | STR       | INT       | INT      | INT         | STR                    | INT          | STR         | DATE_TIME_NAIVE | STR | ANY          | ANY            | FLOAT         


# Step 2: Making a simple pricing function

In [78]:
# Cell 10: Sample notebook's simple pricing function
# Define a daily tumbling window over the data stream using Pathway
# This block performs temporal aggregation and computes a dynamic price for each day
import datetime

delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.day,  # Logical partitioning key: one instance per calendar day
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # Fixed-size daily window
        behavior=pw.temporal.exactly_once_behavior()  # Guarantees exactly-once processing semantics
    )
    .reduce(
        t=pw.this._pw_window_end,                        # Assign the end timestamp of each window
        occ_max=pw.reducers.max(pw.this.Occupancy),      # Highest occupancy observed in the window
        occ_min=pw.reducers.min(pw.this.Occupancy),      # Lowest occupancy observed in the window
        cap=pw.reducers.max(pw.this.Capacity),           # Maximum capacity observed (typically constant per spot)
    )
    .with_columns(
        # Compute the price using a simple dynamic pricing formula:
        # price = base_price + demand_fluctuation
        # where: base_price = 10, demand_fluctuation = (occ_max - occ_min) / cap
        price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
    )
)

# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [79]:
# Cell 11: Visualization function from sample notebook
# Activate the Panel extension to enable interactive visualizations
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    # Create a Bokeh figure with datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    # Plot a line graph showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Overlay red circles at each data point for better visibility
    fig.circle("t", "price", source=source, size=6, color="red")

    return fig

# Use Pathway's built-in .plot() method to bind the data stream (delta_window) to the Bokeh plot
# - 'price_plotter' is the rendering function
# - 'sorting_col="t"' ensures the data is plotted in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Create a Panel layout and make it servable as a web app
pn.Column(viz).servable()



In [80]:
# Cell 12: Run the Pathway pipeline
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()



In [81]:
# Cell 12.1: Debug - Inspect sample data
sample_data = data_with_time.select(
    data_with_time.SystemCodeNumber,
    data_with_time.t,
    data_with_time.Occupancy,
    data_with_time.Capacity,
    data_with_time.occupancy_rate
).to_pandas()
print("Sample data from data_with_time:\n", sample_data.head())

AttributeError: Table has no column with name to_pandas.

In [82]:
# Cell 13: Model 1 - Baseline Linear Model (Multi-Lot)
# Uses a tumbling window to compute prices, avoiding prev()
# Pricing Formula: price_t = 0.9 * last_price + 0.1 * (10 + occupancy_rate_t)
# Computes last_price as the average price in the previous 30-minute window per lot
try:
    # Define a 30-minute tumbling window
    windowed_data = (
        data_with_time.windowby(
            data_with_time.t,
            instance=data_with_time.SystemCodeNumber,
            window=pw.temporal.tumbling(datetime.timedelta(minutes=30)),
            behavior=pw.temporal.exactly_once_behavior()
        )
        .reduce(
            t=pw.this._pw_window_end,
            system_code=pw.this.SystemCodeNumber,
            avg_occupancy_rate=pw.reducers.avg(pw.this.occupancy_rate),
            # Initialize price as 10 for the first window
            price=pw.reducers.avg(pw.this.price).fillna(10)
        )
    )

    # Compute new price using the previous window's price
    model1_price = windowed_data.with_columns(
        price=0.9 * windowed_data.price.prev().fillna(10) + 0.1 * (10 + windowed_data.avg_occupancy_rate)
    ).select(
        t=windowed_data.t,
        system_code=windowed_data.system_code,
        price=pw.this.price
    )

    # Visualization
    def model1_plotter(source):
        fig = bokeh.plotting.figure(
            height=400,
            width=800,
            title="Model 1: Baseline Linear Pricing (Multi-Lot)",
            x_axis_type="datetime",
            y_axis_label="Price ($)",
        )
        # Plot lines for each lot
        unique_lots = source.data["system_code"].unique()
        colors = ["green", "blue", "red", "purple", "orange"]  # Cycle through colors
        for i, lot in enumerate(unique_lots[:5]):  # Limit to 5 lots for clarity
            subset = source[source.data["system_code"] == lot]
            fig.line("t", "price", source=subset, line_width=2, color=colors[i % len(colors)], legend_label=lot)
        fig.legend.click_policy = "hide"
        return fig

    viz_model1 = model1_price.plot(model1_plotter, sorting_col="t")
    pn.Column(viz_model1).servable()
except Exception as e:
    print("Error in Model 1:", str(e))

Error in Model 1: 'ColumnBinaryOpExpression' object has no attribute 'fillna'


In [83]:
# Cell 14: Model 2 - Intermediate Model (Multi-Lot)
# Includes additional features
# Pricing Formula: price_t = 0.9 * last_price + 0.1 * (10 + occupancy_rate_t + 0.1 * queue_length_t + traffic_code_t + is_special_day_t)
try:
    windowed_data = (
        data_with_time.windowby(
            data_with_time.t,
            instance=data_with_time.SystemCodeNumber,
            window=pw.temporal.tumbling(datetime.timedelta(minutes=30)),
            behavior=pw.temporal.exactly_once_behavior()
        )
        .reduce(
            t=pw.this._pw_window_end,
            system_code=pw.this.SystemCodeNumber,
            avg_occupancy_rate=pw.reducers.avg(pw.this.occupancy_rate),
            avg_queue_length=pw.reducers.avg(pw.this.QueueLength),
            avg_traffic_code=pw.reducers.avg(pw.this.traffic_code),
            avg_is_special_day=pw.reducers.avg(pw.this.IsSpecialDay),
            price=pw.reducers.avg(pw.this.price).fillna(10)
        )
    )

    model2_price = windowed_data.with_columns(
        price=0.9 * windowed_data.price.prev().fillna(10) + 0.1 * (
            10 + pw.this.avg_occupancy_rate + 0.1 * pw.this.avg_queue_length +
            pw.this.avg_traffic_code + pw.this.avg_is_special_day
        )
    ).select(
        t=windowed_data.t,
        system_code=windowed_data.system_code,
        price=pw.this.price
    )

    # Visualization
    def model2_plotter(source):
        fig = bokeh.plotting.figure(
            height=400,
            width=800,
            title="Model 2: Intermediate Pricing (Multi-Lot)",
            x_axis_type="datetime",
            y_axis_label="Price ($)",
        )
        unique_lots = source.data["system_code"].unique()
        colors = ["green", "blue", "red", "purple", "orange"]
        for i, lot in enumerate(unique_lots[:5]):
            subset = source[source.data["system_code"] == lot]
            fig.line("t", "price", source=subset, line_width=2, color=colors[i % len(colors)], legend_label=lot)
        fig.legend.click_policy = "hide"
        return fig

    viz_model2 = model2_price.plot(model2_plotter, sorting_col="t")
    pn.Column(viz_model2).servable()
except Exception as e:
    print("Error in Model 2:", str(e))

Error in Model 2: 'ColumnBinaryOpExpression' object has no attribute 'fillna'


In [84]:
# Cell 15: Model 3 - Advanced Model (Multi-Lot)
# Includes vehicle type and interaction term
# Pricing Formula: price_t = 0.9 * last_price + 0.1 * (10 + occupancy_rate_t + 0.1 * queue_length_t + traffic_code_t + is_special_day_t + vehicle_weight_t + 0.05 * occupancy_rate_t * queue_length_t)
try:
    windowed_data = (
        data_with_time.windowby(
            data_with_time.t,
            instance=data_with_time.SystemCodeNumber,
            window=pw.temporal.tumbling(datetime.timedelta(minutes=30)),
            behavior=pw.temporal.exactly_once_behavior()
        )
        .reduce(
            t=pw.this._pw_window_end,
            system_code=pw.this.SystemCodeNumber,
            avg_occupancy_rate=pw.reducers.avg(pw.this.occupancy_rate),
            avg_queue_length=pw.reducers.avg(pw.this.QueueLength),
            avg_traffic_code=pw.reducers.avg(pw.this.traffic_code),
            avg_is_special_day=pw.reducers.avg(pw.this.IsSpecialDay),
            avg_vehicle_weight=pw.reducers.avg(pw.this.vehicle_weight),
            price=pw.reducers.avg(pw.this.price).fillna(10)
        )
    )

    model3_price = windowed_data.with_columns(
        price=0.9 * windowed_data.price.prev().fillna(10) + 0.1 * (
            10 + pw.this.avg_occupancy_rate + 0.1 * pw.this.avg_queue_length +
            pw.this.avg_traffic_code + pw.this.avg_is_special_day +
            pw.this.avg_vehicle_weight +
            0.05 * (pw.this.avg_occupancy_rate * pw.this.avg_queue_length)
        )
    ).select(
        t=windowed_data.t,
        system_code=windowed_data.system_code,
        price=pw.this.price
    )

    # Visualization
    def model3_plotter(source):
        fig = bokeh.plotting.figure(
            height=400,
            width=800,
            title="Model 3: Advanced Pricing (Multi-Lot)",
            x_axis_type="datetime",
            y_axis_label="Price ($)",
        )
        unique_lots = source.data["system_code"].unique()
        colors = ["green", "blue", "red", "purple", "orange"]
        for i, lot in enumerate(unique_lots[:5]):
            subset = source[source.data["system_code"] == lot]
            fig.line("t", "price", source=subset, line_width=2, color=colors[i % len(colors)], legend_label=lot)
        fig.legend.click_policy = "hide"
        return fig

    viz_model3 = model3_price.plot(model3_plotter, sorting_col="t")
    pn.Column(viz_model3).servable()
except Exception as e:
    print("Error in Model 3:", str(e))

Error in Model 3: 'ColumnBinaryOpExpression' object has no attribute 'fillna'


In [85]:
# Cell 16: Run the Pathway pipeline
%%capture
try:
    pw.run()
except Exception as e:
    print("Error in pipeline execution:", str(e))



In [86]:
# Cell 17: Save model outputs to CSV
try:
    model1_price.to_csv("model1_prices.csv", columns=["t", "system_code", "price"])
    model2_price.to_csv("model2_prices.csv", columns=["t", "system_code", "price"])
    model3_price.to_csv("model3_prices.csv", columns=["t", "system_code", "price"])
except Exception as e:
    print("Error saving CSVs:", str(e))

Error saving CSVs: Table has no column with name to_csv.


# Model Summary for Report
## Model Descriptions and Assumptions

### Model 1: Baseline Linear Model
- **Formula**: \( price_t = 0.9 \cdot last_price + 0.1 \cdot (10 + occupancy_rate_t) \)
- **Features**: Occupancy rate (averaged over 30-minute window)
- **Assumptions**: Smoothed pricing using previous window’s price; base price of $10 per lot.
- **Justification**: Simple model for demand-based pricing across multiple lots.

### Model 2: Intermediate Model
- **Formula**: \( price_t = 0.9 \cdot last_price + 0.1 \cdot (10 + occupancy_rate_t + 0.1 \cdot queue_length_t + traffic_code_t + is_special_day_t) \)
- **Features**: Adds queue length, traffic condition, special day (averaged over 30-minute window)
- **Assumptions**: Linear combination; queue length scaled to balance impact.
- **Justification**: Captures external demand drivers while remaining interpretable.

### Model 3: Advanced Model
- **Formula**: \( price_t = 0.9 \cdot last_price + 0.1 \cdot (10 + occupancy_rate_t + 0.1 \cdot queue_length_t + traffic_code_t + is_special_day_t + vehicle_weight_t + 0.05 \cdot occupancy_rate_t \cdot queue_length_t) \)
- **Features**: Adds vehicle type and interaction term (averaged over 30-minute window)
- **Assumptions**: Vehicle type reflects pricing impact; interaction term captures combined pressure.
- **Justification**: Comprehensive model balancing complexity and explainability.

### Notes
- Models use 30-minute tumbling windows to aggregate features per lot.
- Prices are smoothed using the previous window’s price, starting at $10.
- Visualizations show price trends for up to 5 lots (clickable legend).
- Debug outputs in Cells 6, 8, 9, and 12.1 help diagnose issues.