Installing dependencies (Pathway and plotting libraries)

In [None]:
!pip install pathway bokeh --quiet

Importing libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

In [None]:
# Read the raw parking dataset into a DataFrame and show it as the cell output.
# NOTE(review): the absolute '/content/...' path looks like a Colab upload
# location - adjust it when running elsewhere.
df = pd.read_csv(
    '/content/Modified - modified.csv',
)
df

In [None]:
# Build one datetime column ('Timestamp') out of the separate date and time
# strings, then put the rows in chronological order with a fresh 0..n-1 index.
df = (
    df.assign(
        Timestamp=lambda frame: pd.to_datetime(
            frame['LastUpdatedDate'] + ' ' + frame['LastUpdatedTime'],
            format='%d-%m-%Y %H:%M:%S',
        )
    )
    .sort_values('Timestamp')
    .reset_index(drop=True)
)

In [None]:
# Write only the columns Model 1 consumes to the CSV that is later replayed
# as a simulated stream.
df.to_csv(
    "parking_stream.csv",
    columns=["Timestamp", "Occupancy", "Capacity"],
    index=False,
)

# Note: Only three features are used here for simplicity.
# Participants are expected to incorporate additional relevant features in their models.

In [None]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

# Row layout of "parking_stream.csv" as consumed by the Model 1 pipeline.
# The three fields match the three columns written to that file.
class ParkingSchema(pw.Schema):
    # Kept as text here; it is parsed into a datetime afterwards with
    # the "%Y-%m-%d %H:%M:%S" format.
    Timestamp: str   # Timestamp of the observation (should ideally be in ISO format)
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location


In [None]:
# Replay the exported CSV as a simulated real-time stream.
# `input_rate` is the replay speed in rows per second (1000 here), and the
# schema tells Pathway how to type each column.
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [None]:
# Layout of the textual 'Timestamp' column in the replayed CSV
fmt = "%Y-%m-%d %H:%M:%S"

# Parse expression, built once and reused for both derived columns
parsed_timestamp = data.Timestamp.dt.strptime(fmt)

# Derived columns:
# - 't'   : the full parsed datetime (event time used for windowing)
# - 'day' : the same instant rendered as text with the time part fixed to
#           00:00:00, i.e. one distinct value per calendar day
data_with_time = data.with_columns(
    t=parsed_timestamp,
    day=parsed_timestamp.dt.strftime("%Y-%m-%dT00:00:00"),
)

MODEL 1: Linear price from daily average occupancy

In [None]:
import datetime

# Model 1: price grows linearly with the day's average occupancy rate.
# The stream is cut into one-day tumbling windows, occupancy and capacity are
# totalled per window, and the price is derived from their ratio.

BASE_PRICE = 10.0  # Price when the occupancy rate is zero
ALPHA = 2.0        # Price increase per unit of occupancy rate

# Stage 1: assign each row to its daily window.
daily_windows_model1 = data_with_time.windowby(
    pw.this.t,                                                # event time (parsed datetime)
    instance=pw.this.day,                                     # one partition per calendar day
    window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # fixed, non-overlapping 1-day windows
    # Each window emits its result a single time instead of incremental
    # updates (this concerns window output, not message delivery).
    behavior=pw.temporal.exactly_once_behavior(),
)

# Stage 2: per-window totals.
daily_totals_model1 = daily_windows_model1.reduce(
    t=pw.this._pw_window_end,                    # window end is used as the plot timestamp
    occ_sum=pw.reducers.sum(pw.this.Occupancy),  # total occupancy in the window
    cap_sum=pw.reducers.sum(pw.this.Capacity),   # total capacity in the window
    count=pw.reducers.count(),                   # number of rows in the window
)

# Stage 3: occupancy rate for the day, then the price:
#   price = BASE_PRICE + ALPHA * avg_occupancy_rate
delta_window_model1 = daily_totals_model1.with_columns(
    avg_occupancy_rate=(pw.this.occ_sum / pw.this.cap_sum),
).with_columns(
    price_model1=BASE_PRICE + ALPHA * pw.this.avg_occupancy_rate,
)


In [None]:
# Activate the Panel extension to enable interactive visualizations.
# This runs before any Panel object is created or displayed in this cell.
pn.extension()

# Define a custom Bokeh plotting function for Model 1 (tumbling window)
def price_plotter_model1(source):
    """Build the Bokeh figure that shows the Model 1 daily price over time.

    Args:
        source: Data source forwarded unchanged to the Bokeh glyph calls; it
            must expose the columns ``t`` and ``price_model1``. (Presumably the
            ColumnDataSource supplied by Pathway's ``.plot()`` - confirm.)

    Returns:
        The configured Bokeh figure (line plus point markers).
    """
    # Figure with a datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Model 1 Daily Parking Price (Tumbling Window)",
        x_axis_type="datetime",
    )
    # Line showing how the Model 1 price evolves over time
    fig.line("t", "price_model1", source=source, line_width=2, color="navy", legend_label="Model 1 Price")

    # Red markers at each data point. `scatter` is used instead of `circle`
    # because `circle(..., size=...)` is deprecated in recent Bokeh releases;
    # `scatter` with the circle marker draws the same glyphs.
    fig.scatter("t", "price_model1", source=source, size=6, color="red", marker="circle")

    fig.legend.location = "top_left"
    fig.xaxis.axis_label = "Date"
    fig.yaxis.axis_label = "Price ($)"
    return fig

# Bind the Model 1 result table to the plotting function:
# - the first argument is the function that renders the figure
# - sorting_col="t" keeps the plotted rows in time order
viz = delta_window_model1.plot(
    price_plotter_model1,
    sorting_col="t",
)

# Wrap the plot in a Panel column, mark it servable as a web app, and leave it
# as the cell's last expression so it is displayed.
model1_layout = pn.Column(viz)
model1_layout.servable()


In [None]:

# Start the Pathway computation; the pipelines defined in the cells above
# only produce results once this is called.
pw.run()

MODEL 2: Demand-based price function

In [None]:
# Re-read the raw parking dataset for Model 2 (same file as loaded for
# Model 1) and show it as the cell output.
df = pd.read_csv(
    '/content/Modified - modified.csv',
)
df

In [None]:
# Build the 'Timestamp' datetime column from the date and time strings, then
# sort chronologically and renumber the index from zero.
df = (
    df.assign(
        Timestamp=lambda frame: pd.to_datetime(
            frame['LastUpdatedDate'] + ' ' + frame['LastUpdatedTime'],
            format='%d-%m-%Y %H:%M:%S',
        )
    )
    .sort_values('Timestamp')
    .reset_index(drop=True)
)

In [None]:
# Export the columns Model 2 consumes to the CSV replayed as its input stream.
df.to_csv(
    "parking_stream_model2.csv",
    columns=[
        "Timestamp",
        "Occupancy",
        "Capacity",
        "QueueLength",
        "TrafficConditionNearby",
        "IsSpecialDay",
        "VehicleType",
    ],
    index=False,
)

In [None]:
# Row layout of "parking_stream_model2.csv" as consumed by the Model 2
# pipeline. The fields match the columns written to that file.
class ParkingSchemaModel2(pw.Schema):
    Timestamp: str  # Observation time as text; parsed later with "%Y-%m-%d %H:%M:%S"
    Occupancy: int  # Summed per day and divided by summed Capacity to get the occupancy rate
    Capacity: int  # Denominator of the daily occupancy rate
    QueueLength: int  # Averaged per day as one demand term
    TrafficConditionNearby: str  # Label mapped to a number via traffic_map ('low', 'average', 'high')
    IsSpecialDay: int  # Averaged per day; presumably a 0/1 flag - TODO confirm
    VehicleType: str  # Label mapped to a weight via vehicle_type_weight ('car', 'bike', 'truck', 'cycle')

In [None]:
# Replay the Model 2 CSV as a simulated stream at 1000 rows per second.
# Note that this rebinds `data`, which previously held the Model 1 stream.
data = pw.demo.replay_csv("parking_stream_model2.csv", schema=ParkingSchemaModel2, input_rate=1000)

In [None]:
# Layout of the textual 'Timestamp' column in the replayed CSV
fmt = "%Y-%m-%d %H:%M:%S"

# Parse expression, built once and reused for both derived columns
parsed_timestamp = data.Timestamp.dt.strptime(fmt)

# 't' is the parsed event time; 'day' is the same instant rendered as text with
# the time part fixed to 00:00:00 (one distinct value per calendar day).
data_with_time = data.with_columns(
    t=parsed_timestamp,
    day=parsed_timestamp.dt.strftime("%Y-%m-%dT00:00:00"),
)

In [None]:
import datetime
import pathway as pw

# --- Feature Mapping Dictionaries and Functions ---
# Numeric level for each traffic label and demand weight for each vehicle type.
traffic_map = {'low': 0, 'average': 1, 'high': 2}
vehicle_type_weight = {'car': 1.0, 'bike': 0.7, 'truck': 1.3, 'cycle': 0.5}


def _normalize_label(val):
    """Return a string label stripped and lower-cased; pass other values through."""
    return val.strip().lower() if isinstance(val, str) else val


def map_traffic(val):
    """Convert a traffic-condition label to its numeric level as a float.

    Matching ignores letter case and surrounding whitespace, so "High" and
    " high " are treated like "high". Labels not present in ``traffic_map``
    (including non-string values such as None) map to 1.0, the 'average' level.
    """
    return float(traffic_map.get(_normalize_label(val), 1))


def map_vehicle(val):
    """Convert a vehicle-type label to its demand weight as a float.

    Matching ignores letter case and surrounding whitespace. Labels not present
    in ``vehicle_type_weight`` (including non-string values such as None) map
    to 1.0, the weight of 'car'.
    """
    return float(vehicle_type_weight.get(_normalize_label(val), 1.0))

def clip_price(val):
    """Clamp a price to the range [0.5 * BASE_PRICE, 2.0 * BASE_PRICE].

    BASE_PRICE is read from module scope at call time.
    """
    floor, ceiling = BASE_PRICE * 0.5, BASE_PRICE * 2.0
    # Cap at the ceiling first, then lift to the floor.
    capped = ceiling if ceiling < val else val
    return capped if capped > floor else floor


# --- Encode the categorical columns as numbers ---
data_with_features = data_with_time.with_columns(
    TrafficLevelNum=pw.apply_with_type(map_traffic, float, pw.this.TrafficConditionNearby),
    VehicleTypeWeight=pw.apply_with_type(map_vehicle, float, pw.this.VehicleType),
)

# --- Pricing constants ---
BASE_PRICE = 10.0  # Reference price; also sets the clipping bounds in clip_price
LAMBDA = 0.5       # Scales how strongly normalized demand moves the price
# Weights of the individual terms in the raw demand expression
ALPHA = 1.0        # occupancy rate
BETA = 0.2         # average queue length
GAMMA = 0.3        # average traffic level (subtracted)
DELTA = 0.5        # share of special-day rows
EPSILON = 0.5      # average vehicle-type weight

# --- Stage 1: per-day totals over one-day tumbling windows ---
daily_sums_model2 = data_with_features.windowby(
    pw.this.t,
    instance=pw.this.day,
    window=pw.temporal.tumbling(datetime.timedelta(days=1)),
    behavior=pw.temporal.exactly_once_behavior(),
).reduce(
    t=pw.this._pw_window_end,
    occ_sum=pw.reducers.sum(pw.this.Occupancy),
    cap_sum=pw.reducers.sum(pw.this.Capacity),
    queue_sum=pw.reducers.sum(pw.this.QueueLength),
    traffic_sum=pw.reducers.sum(pw.this.TrafficLevelNum),
    special_sum=pw.reducers.sum(pw.this.IsSpecialDay),
    vehicle_sum=pw.reducers.sum(pw.this.VehicleTypeWeight),
    count=pw.reducers.count(),
)

# --- Stage 2: turn the totals into per-day averages ---
daily_means_model2 = daily_sums_model2.with_columns(
    avg_occupancy_rate=pw.this.occ_sum / pw.this.cap_sum,
    avg_queue=pw.this.queue_sum / pw.this.count,
    avg_traffic=pw.this.traffic_sum / pw.this.count,
    avg_special=pw.this.special_sum / pw.this.count,
    avg_vehicle=pw.this.vehicle_sum / pw.this.count,
)

# --- Stage 3: weighted raw demand for each day ---
delta_window_model2 = daily_means_model2.with_columns(
    raw_demand=(
        ALPHA * pw.this.avg_occupancy_rate +
        BETA * pw.this.avg_queue -
        GAMMA * pw.this.avg_traffic +
        DELTA * pw.this.avg_special +
        EPSILON * pw.this.avg_vehicle
    ),
)

# --- Smallest and largest raw demand over all days (single-row table) ---
demand_stats = delta_window_model2.reduce(
    min_demand=pw.reducers.min(pw.this.raw_demand),
    max_demand=pw.reducers.max(pw.this.raw_demand),
)

# --- Stage 4: broadcast the bounds, normalize demand, compute clipped price ---
delta_window_model2 = (
    delta_window_model2
    # ix_ref() with no key reads the single row of demand_stats for every row
    .with_columns(
        min_demand=demand_stats.ix_ref().min_demand,
        max_demand=demand_stats.ix_ref().max_demand,
    )
    # Min-max scaling; the 1e-6 keeps the denominator non-zero when min == max
    .with_columns(
        normalized_demand=(
            (pw.this.raw_demand - pw.this.min_demand)
            / (pw.this.max_demand - pw.this.min_demand + 1e-6)
        ),
    )
    # price = BASE_PRICE * (1 + LAMBDA * normalized_demand), clamped by clip_price
    .with_columns(
        price_model2=pw.apply_with_type(
            clip_price,
            float,
            BASE_PRICE * (1 + LAMBDA * pw.this.normalized_demand),
        ),
    )
)

In [None]:
# Activate the Panel extension to enable interactive visualizations.
# This runs before any Panel object is created or displayed in this cell.
pn.extension()

# Define a custom Bokeh plotting function for Model 2 (Demand-Based Price Function)
def price_plotter_model2(source):
    """Build the Bokeh figure that shows the Model 2 daily price over time.

    Args:
        source: Data source forwarded unchanged to the Bokeh glyph calls; it
            must expose the columns ``t`` and ``price_model2``. (Presumably the
            ColumnDataSource supplied by Pathway's ``.plot()`` - confirm.)

    Returns:
        The configured Bokeh figure (line plus point markers).
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Model 2 Daily Parking Price",
        x_axis_type="datetime",
    )
    # Line showing how the Model 2 price evolves over time
    fig.line("t", "price_model2", source=source, line_width=2, color="navy", legend_label="Model 2 Price")
    # Red markers at each data point. `scatter` is used instead of `circle`
    # because `circle(..., size=...)` is deprecated in recent Bokeh releases;
    # `scatter` with the circle marker draws the same glyphs.
    fig.scatter("t", "price_model2", source=source, size=6, color="red", marker="circle")
    fig.legend.location = "top_left"
    fig.xaxis.axis_label = "Date"
    fig.yaxis.axis_label = "Price ($)"
    return fig

# Bind the Model 2 result table to the plotting function:
# - the first argument is the function that renders the figure
# - sorting_col="t" keeps the plotted rows in time order
viz = delta_window_model2.plot(
    price_plotter_model2,
    sorting_col="t",
)

# Wrap the plot in a Panel column, mark it servable as a web app, and leave it
# as the cell's last expression so it is displayed.
model2_layout = pn.Column(viz)
model2_layout.servable()


In [None]:
# Start the Pathway computation for the pipelines defined above.
# NOTE(review): this is the second pw.run() in the notebook - confirm whether
# the Model 1 pipeline built earlier is executed again by this call.
pw.run()