# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m9.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m37.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m53.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

In [None]:
# Confirm that the import cell above ran without raising.
status_message = "Libraries installed and imported successfully."
print(status_message)


# Step 1: Importing and Preprocessing the Data

In [None]:
# Path to the raw parking dataset CSV (one row per lot reading).
DATASET_PATH = '/content/dataset.csv'
df = pd.read_csv(DATASET_PATH)
df

# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [None]:
# Shorten the raw timestamp column names; the Timestamp cell below reads Date/Time.
# The raw data has no 'LotId' column (lots are identified by SystemCodeNumber),
# so the former 'LotId' -> 'LotID' mapping was dead and has been dropped.
df = df.rename(columns={
    'LastUpdatedDate': 'Date',
    'LastUpdatedTime': 'Time',
})

In [None]:
# Build a single datetime Timestamp from the "dd-mm-YYYY" Date and "HH:MM:SS" Time strings.
date_time_text = df['Date'].str.cat(df['Time'], sep=' ')
df['Timestamp'] = pd.to_datetime(date_time_text, format='%d-%m-%Y %H:%M:%S')

# The stream is replayed in row order, so put the rows in chronological order.
df = df.sort_values(by='Timestamp').reset_index(drop=True)

In [None]:
# --- Feature Engineering (Creating realistic dummy features) ---
# Seed NumPy's global RNG so every synthetic feature below is reproducible.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

In [None]:
# 1. Queue Length
# The raw dataset already records an observed QueueLength for every reading, so keep
# it instead of overwriting real measurements with noise. Only if the column is
# missing, synthesize one that grows with the occupancy rate. Round rather than
# truncate, because truncating (rate < 1) * k collapses most readings to 0.
if 'QueueLength' not in df.columns:
    occupancy_rate = df['Occupancy'] / df['Capacity']
    df['QueueLength'] = (occupancy_rate * np.random.randint(1, 10, size=len(df))).round()
df['QueueLength'] = df['QueueLength'].astype(int)

In [None]:
# 2. Traffic Congestion: Higher during rush hours (8-10 AM, 3-5 PM)
df['TrafficCongestion'] = df['Timestamp'].apply(
    lambda ts: np.random.uniform(0.6, 1.0) if (8 <= ts.hour < 10) or (15 <= ts.hour < 17) else np.random.uniform(0.1, 0.5)
).round(2)

In [None]:
# 3. Special Day: 1 for weekends (Saturday=5, Sunday=6), 0 for weekdays.
# NOTE(review): this replaces the raw dataset's own IsSpecialDay column.
is_weekend = df['Timestamp'].dt.dayofweek >= 5
df['IsSpecialDay'] = is_weekend.astype(int)

# Synthetic vehicle mix (70% car, 20% truck, 10% bike) and a numeric weight per type.
# NOTE(review): this also replaces the raw dataset's VehicleType column.
vehicle_types = ['car', 'truck', 'bike']
vehicle_type_weights = dict(zip(vehicle_types, [1.0, 1.5, 0.7]))
df['VehicleType'] = np.random.choice(vehicle_types, size=len(df), p=[0.7, 0.2, 0.1])
df['VehicleTypeWeight'] = df['VehicleType'].map(vehicle_type_weights)

In [None]:
# Re-sort chronologically after feature engineering. A stable sort keeps rows that
# share a Timestamp (readings from different lots) in their current relative order,
# so re-running this on an already-sorted frame is a true no-op.
df = df.sort_values('Timestamp', kind='stable').reset_index(drop=True)

In [None]:
# Keep only the columns the Pathway stream reads and write them to the CSV that
# the replay cell consumes.
stream_columns = [
    'Timestamp',
    'Capacity',
    'Occupancy',
    'QueueLength',
    'TrafficCongestion',
    'IsSpecialDay',
    'VehicleTypeWeight',
]
final_df = df.loc[:, stream_columns]
final_df.to_csv("parking_stream_enriched.csv", index=False)

print("Enriched dataset created and saved to 'parking_stream_enriched.csv'")
print("Dataset preview:")
print(final_df.head())

Enriched dataset created and saved to 'parking_stream_enriched.csv'
Dataset preview:
            Timestamp  Capacity  Occupancy  QueueLength  TrafficCongestion  \
0 2016-10-04 07:59:00       577         61            0               0.30   
1 2016-10-04 07:59:00      3103        588            1               0.22   
2 2016-10-04 07:59:00      2937        547            0               0.11   
3 2016-10-04 07:59:00      3883       1081            2               0.38   
4 2016-10-04 07:59:00      2803        195            0               0.28   

   IsSpecialDay  VehicleTypeWeight  
0             0                1.0  
1             0                1.5  
2             0                1.0  
3             0                1.0  
4             0                1.5  


In [None]:
# @title 2. Pathway Schema and Stream Definition
# Define the schema for our enriched data stream.
# This tells Pathway the data types to expect for each column of
# parking_stream_enriched.csv.
class ParkingSchema(pw.Schema):
    # Datetime kept as text, e.g. "2016-10-04 07:59:00" as written by pandas;
    # it is parsed into a datetime later with `dt.strptime`.
    Timestamp: str
    Occupancy: int
    Capacity: int
    QueueLength: int
    # Congestion score produced in feature engineering, rounded to 2 decimals.
    TrafficCongestion: float
    # 0/1 flag (1 on weekends in the feature-engineering cell).
    IsSpecialDay: int
    # Numeric weight mapped from the row's vehicle type.
    VehicleTypeWeight: float

In [None]:
# Simulate a live feed: `pw.demo.replay_csv` reads the enriched CSV and pushes its
# rows into the pipeline at the given `input_rate`. The rate is set high so the
# ~73 days of readings replay quickly.
STREAM_SOURCE_CSV = "parking_stream_enriched.csv"
data = pw.demo.replay_csv(
    STREAM_SOURCE_CSV,
    schema=ParkingSchema,
    input_rate=10000,
)

In [None]:
# Add parsed-time columns to the stream:
#   t   - the Timestamp string parsed into a datetime (used as the window time)
#   day - that datetime truncated to midnight, as text (used as the window instance)
# `fmt` must match how pandas `to_csv` wrote Timestamp (e.g. "2016-10-04 07:59:00").
# Previously `fmt` was never defined, so this cell raised NameError on a fresh kernel.
fmt = "%Y-%m-%d %H:%M:%S"
data_with_time = data.with_columns(
    t=data.Timestamp.dt.strptime(fmt),
    day=data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00"),
)

In [None]:
# @title 3. Model Implementation

# --- Model 1: Baseline Linear Model ---
# For each daily tumbling window (all lots together):
#   price = BASE_PRICE + ALPHA * (mean occupancy / mean capacity)
# then clamped to [0.5 * BASE_PRICE, 2 * BASE_PRICE].
# Re-import: the setup cell's `from datetime import datetime` shadows the module name.
import datetime
BASE_PRICE = 10.0
ALPHA = 5.0
model_1_results = (
    data_with_time.windowby(
        pw.this.t,
        instance=pw.this.day,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior(),
    )
    .reduce(
        t=pw.this._pw_window_end,
        sum_occ=pw.reducers.sum(pw.this.Occupancy),
        sum_cap=pw.reducers.sum(pw.this.Capacity),
        count=pw.reducers.count(),
    )
    .with_columns(avg_occ=pw.this.sum_occ / pw.this.count, avg_cap=pw.this.sum_cap / pw.this.count)
    .with_columns(price=BASE_PRICE + ALPHA * (pw.this.avg_occ / pw.this.avg_cap))
    .with_columns(
        # `apply_with_type` declares the UDF result as float. Plain `pw.apply` yields
        # an untyped (Any) column, which Pathway rejects in later arithmetic.
        price_m1=pw.apply_with_type(
            lambda p: max(BASE_PRICE * 0.5, min(BASE_PRICE * 2.0, p)), float, pw.this.price
        )
    )
)
print("Model 1 (Baseline Linear) is correctly defined.")

Model 1 (Baseline Linear) is correctly defined.


In [None]:
# --- Model 2: Demand-Based Price Function ---
# For each daily tumbling window (all lots together):
#   demand = (2.0*occ_rate + 1.5*queue/10 + 1.0*traffic + 2.5*is_special) * vehicle_weight
#   price  = BASE_PRICE * (1 + LAMBDA * sigmoid(0.5 * (demand - 2.5)))
# then clamped to [0.5 * BASE_PRICE, 2 * BASE_PRICE]. BASE_PRICE comes from the Model 1 cell.
# Every step stays explicitly typed: `pw.cast` turns the int flag into a float, and
# `pw.apply_with_type` declares float UDF results. The previous `pw.apply(np.exp, ...)`
# produced an Any-typed column, and `1 + <Any>` raised a TypeError.
# Re-import: the setup cell's `from datetime import datetime` shadows the module name.
import datetime
LAMBDA = 1.0


def _demand_multiplier(demand_score):
    """Logistic squashing of a demand score into (0, 1), centred at 2.5 with slope 0.5."""
    return float(1.0 / (1.0 + np.exp(-0.5 * (demand_score - 2.5))))


model_2_results = (
    data_with_time.windowby(
        pw.this.t,
        instance=pw.this.day,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior(),
    )
    .reduce(
        t=pw.this._pw_window_end,
        sum_occ_rate=pw.reducers.sum(pw.this.Occupancy / pw.this.Capacity),
        sum_queue=pw.reducers.sum(pw.this.QueueLength),
        sum_traffic=pw.reducers.sum(pw.this.TrafficCongestion),
        sum_vehicle_weight=pw.reducers.sum(pw.this.VehicleTypeWeight),
        count=pw.reducers.count(),
        # 1 if any reading in the window is flagged special.
        is_special=pw.reducers.max(pw.this.IsSpecialDay),
    )
    .with_columns(
        avg_occ_rate=pw.this.sum_occ_rate / pw.this.count,
        avg_queue=pw.this.sum_queue / pw.this.count,
        avg_traffic=pw.this.sum_traffic / pw.this.count,
        avg_vehicle_weight=pw.this.sum_vehicle_weight / pw.this.count,
    )
    .with_columns(
        demand_score=(
            2.0 * pw.this.avg_occ_rate
            + 1.5 * (pw.this.avg_queue / 10.0)
            + 1.0 * pw.this.avg_traffic
            + 2.5 * pw.cast(float, pw.this.is_special)
        ) * pw.this.avg_vehicle_weight
    )
    .with_columns(
        price=BASE_PRICE * (1.0 + LAMBDA * pw.apply_with_type(_demand_multiplier, float, pw.this.demand_score))
    )
    .with_columns(
        price_m2=pw.apply_with_type(
            lambda p: max(BASE_PRICE * 0.5, min(BASE_PRICE * 2.0, p)), float, pw.this.price
        )
    )
)
print("Model 2 (Demand-Based) is now using the correct type casting and should run successfully.")

TypeError: Pathway does not support using binary operator add on columns of types <class 'int'>, typing.Any.
It refers to the following expression:
	(1 + pathway.apply(exp, (-0.5 * (<table1>.demand_score - 2.5))))
called in /tmp/ipython-input-44-1052789069.py:37
with tables:
	<table1> created in /tmp/ipython-input-44-1052789069.py:28


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [None]:
# @title 4. Visualization
# Pair each day's Model 1 and Model 2 prices on their shared window-end timestamp `t`.
# Both inputs have a `t` column, so the join must name sides explicitly via
# pw.left / pw.right. `pw.this.t` is ambiguous here, and `pw.this.model_2_results...`
# is not a valid column reference.
final_results = model_1_results.join(
    model_2_results,
    pw.left.t == pw.right.t,
).select(
    t=pw.left.t,
    price_m1=pw.left.price_m1,
    price_m2=pw.right.price_m2,
)

pn.extension()

def price_plotter_combined(source):
    """Build a Bokeh figure overlaying the daily Model 1 and Model 2 prices.

    Args:
        source: data source handed in by `Table.plot`; it must expose the columns
            "t", "price_m1" and "price_m2" (presumably a Bokeh ColumnDataSource).

    Returns:
        A `bokeh.plotting.figure` with one line-plus-markers series per model;
        clicking a legend entry hides that model's line.
    """
    fig = bokeh.plotting.figure(
        height=450,
        width=800,
        title="Pathway: Daily Parking Price Model Comparison",
        x_axis_type="datetime",
        y_axis_label="Price ($)",
    )
    series = (
        ("price_m1", "blue", "Model 1 (Linear)"),
        ("price_m2", "green", "Model 2 (Demand-Based)"),
    )
    for column, color, label in series:
        fig.line("t", column, source=source, line_width=2, color=color, legend_label=label)
        # `scatter` replaces `circle(size=...)`, which is deprecated since Bokeh 3.4.
        fig.scatter("t", column, source=source, size=6, color=color)
    fig.legend.location = "top_left"
    fig.legend.click_policy = "hide"
    return fig

from IPython.display import display

viz = final_results.plot(price_plotter_combined, sorting_col="t")

dashboard = pn.Column(
    "## Dynamic Pricing Models (Aggregated Daily Price)",
    "This chart shows the daily price calculated by two different models, aggregating data from all lots.",
    viz,
).servable()
# Jupyter auto-renders only a cell's final expression, and that is `pw.run()` here,
# so render the dashboard explicitly before starting the pipeline.
display(dashboard)

# @title 5. Run the Pipeline
pw.run()

NameError: name 'model_2_results' is not defined