In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [None]:
# Read the raw parking observations from the working directory and preview them
RAW_DATA_PATH = "dataset.csv"
df = pd.read_csv(RAW_DATA_PATH)
df.head()

In [None]:
# Show the (rows, columns) tuple of the freshly loaded frame as a quick sanity check
df.shape

In [None]:
# Join the separate date and time text columns with a space, parse the result
# as day-month-year plus 24h time, then order the rows chronologically and
# renumber the index from zero.
timestamp_text = df["LastUpdatedDate"] + " " + df["LastUpdatedTime"]
df = (
    df.assign(Timestamp=pd.to_datetime(timestamp_text, format="%d-%m-%Y %H:%M:%S"))
    .sort_values("Timestamp")
    .reset_index(drop=True)
)

In [None]:
# Preview the frame again now that it carries the Timestamp column and is sorted by it
df.head()

Baseline Linear Model

In [None]:
# Write only the columns the baseline model reads to a CSV file (without the
# index) so it can be replayed as a stream further down.
baseline_columns = ["Timestamp", "Occupancy", "Capacity"]
df[baseline_columns].to_csv("parking_stream.csv", index=False)

In [None]:
# Schema of the baseline stream replayed from parking_stream.csv.
# Each annotated attribute declares one expected column and the type of its values.

class ParkingSchema(pw.Schema):
    Timestamp: str   # Observation time, read as text and parsed later with "%Y-%m-%d %H:%M:%S"
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location

In [None]:
# Replay the exported CSV as a simulated stream whose rows follow ParkingSchema
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [None]:
# Layout of the text stored in the 'Timestamp' column of the replayed CSV
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parsing expression once and derive both new columns from it:
# - 't'   is the parsed datetime
# - 'day' is the calendar day rendered as text with the time part fixed to 00:00:00
#         (used below as the per-day grouping key)
parsed_time = data.Timestamp.dt.strptime(fmt)
data_with_time = data.with_columns(
    t=parsed_time,
    day=parsed_time.dt.strftime("%Y-%m-%dT00:00:00"),
)


In [None]:
# Aggregate the stream into one-day tumbling windows and derive a price per window.
#
# The module is imported here on purpose: the notebook's import cell also runs
# `from datetime import datetime`, which rebinds the name `datetime` to the class,
# while this cell needs the module for `datetime.timedelta`.
import datetime

ONE_DAY = datetime.timedelta(days=1)

# Stage 1: assign every row to a fixed one-day window based on its event time 't',
# keeping one window instance per 'day' key and using Pathway's exactly-once behavior.
daily_windows = data_with_time.windowby(
    pw.this.t,
    instance=pw.this.day,
    window=pw.temporal.tumbling(ONE_DAY),
    behavior=pw.temporal.exactly_once_behavior(),
)

# Stage 2: summarise each window.
# - t       : end timestamp of the window
# - occ_max : highest occupancy seen in the window
# - occ_min : lowest occupancy seen in the window
# - cap     : largest capacity value seen in the window
daily_stats = daily_windows.reduce(
    t=pw.this._pw_window_end,
    occ_max=pw.reducers.max(pw.this.Occupancy),
    occ_min=pw.reducers.min(pw.this.Occupancy),
    cap=pw.reducers.max(pw.this.Capacity),
)

# Stage 3: price each window.
#
#     price = base_price + demand_fluctuation
#         base_price         = 10
#         demand_fluctuation = (occ_max - occ_min) / cap
#
# A wide gap between the day's peak and trough occupancy is read as volatile
# demand; dividing by capacity scales that gap (it lies in [0, 1] as long as
# occupancy never exceeds capacity).
# Worked example: occ_max = 90, occ_min = 30, cap = 100
#                 => price = 10 + (90 - 30) / 100 = 10.6
delta_window = daily_stats.with_columns(
    price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
)


In [None]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()


def price_plotter(source):
    """Build the Bokeh figure for the baseline daily price stream.

    Args:
        source: Data source handed over by Pathway's ``.plot()``; it must expose
            the columns ``t`` (x values) and ``price`` (y values).

    Returns:
        A Bokeh figure with a navy line through the prices and a red circular
        marker on every data point.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # render the x values as dates/times
    )
    # Line showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Markers on each data point. scatter(marker="circle") is used because
    # circle(..., size=...) is deprecated in current Bokeh releases.
    fig.scatter("t", "price", source=source, size=6, color="red", marker="circle")

    return fig


# Bind the windowed price stream to the plot:
# - price_plotter renders the figure
# - sorting_col="t" keeps the points in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel layout and mark it servable so it is shown here
# and can also be served as a web app.
pn.Column(viz).servable()

In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Demand-Based Price Function


In [None]:
# Export the wider set of features used by the demand-based model to its own
# replay file (written without the index).
demand_columns = [
    "Timestamp",
    "Occupancy",
    "Capacity",
    "QueueLength",
    "TrafficConditionNearby",
    "IsSpecialDay",
    "VehicleType",
]
df[demand_columns].to_csv("parking_stream2.csv", index=False)

In [None]:
# Schema of the demand-model stream replayed from parking_stream2.csv.
# NOTE(review): this cell rebinds the names ParkingSchema and data that the
# baseline section of the notebook also defines; re-running the baseline cells
# afterwards would pick up these versions.
class ParkingSchema(pw.Schema):
    Timestamp: str               # observation time as text
    Occupancy: int               # occupied spots
    Capacity: int                # total spots
    QueueLength: int             # queue length value from the CSV
    TrafficConditionNearby: str  # traffic level label
    IsSpecialDay: int            # special-day indicator
    VehicleType: str             # vehicle category label


# Replay the exported file as a simulated stream
data = pw.demo.replay_csv(
    "parking_stream2.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

# Project the stream onto the columns consumed by the demand function
data_filtered = data.select(
    pw.this.Timestamp,
    pw.this.Occupancy,
    pw.this.Capacity,
    pw.this.QueueLength,
    pw.this.TrafficConditionNearby,
    pw.this.IsSpecialDay,
    pw.this.VehicleType,
)

In [None]:
# --- Pricing parameters -------------------------------------------------------
BASE_PRICE = 10    # starting price before any demand adjustment
lambda_val = 1.0   # multiplier applied to demand when it is added to the base price

# --- Demand function weights (one per input feature) --------------------------
alpha = 0.6    # occupancy rate
beta = 0.2     # queue length
gamma = 0.1    # traffic condition
delta = 0.5    # special day
epsilon = 0.3  # vehicle type

In [None]:
@pw.udf
def calculate_demand(
    occupancy: int,
    capacity: int,
    queue_length: int,
    traffic_condition: str,
    is_special_day: int,
    vehicle_type: str,
    alpha: float,
    beta: float,
    gamma: float,
    delta: float,
    epsilon: float,
) -> float:
    """Combine the row's features into a single weighted demand score.

    Args:
        occupancy: Occupied spots.
        capacity: Total spots; a value of 0 or less yields an occupancy rate of 0.
        queue_length: Queue length, used as-is.
        traffic_condition: Traffic label, matched case-insensitively against
            "low", "moderate" and "high"; any other label scores 0.5.
        is_special_day: Special-day indicator, used as-is.
        vehicle_type: Vehicle label, matched case-insensitively against
            "car", "bike", "cycle" and "others"; any other label scores 0.5.
        alpha: Weight of the occupancy rate.
        beta: Weight of the queue length.
        gamma: Weight of the traffic score.
        delta: Weight of the special-day indicator.
        epsilon: Weight of the vehicle score.

    Returns:
        The sum of the five weighted terms.
    """
    # Occupancy as a share of capacity, guarding against division by zero
    if capacity > 0:
        fill_ratio = occupancy / capacity
    else:
        fill_ratio = 0

    # Translate the categorical labels into numeric scores
    traffic_scores = {"low": 0.2, "moderate": 0.6, "high": 1.0}
    traffic_score = traffic_scores.get(traffic_condition.lower(), 0.5)

    vehicle_scores = {"car": 0.5, "bike": 0.3, "cycle": 0.1, "others": 0.7}
    vehicle_score = vehicle_scores.get(vehicle_type.lower(), 0.5)

    # Weighted terms, added in the same left-to-right order as the formula
    occupancy_term = alpha * fill_ratio
    queue_term = beta * queue_length
    traffic_term = gamma * traffic_score
    special_day_term = delta * is_special_day
    vehicle_term = epsilon * vehicle_score

    return occupancy_term + queue_term + traffic_term + special_day_term + vehicle_term

In [None]:
@pw.udf
def calculate_price(demand: float, base_price: float, lambda_val: float) -> float:
    """Turn a demand score into a price clamped to the range [5.0, 25.0].

    Args:
        demand: Demand score for the row.
        base_price: Price before any demand adjustment.
        lambda_val: Multiplier applied to the demand score.

    Returns:
        ``base_price + lambda_val * demand``, limited to at least 5.0 and at
        most 25.0.
    """
    price_floor = 5.0
    price_ceiling = 25.0

    unclamped = base_price + lambda_val * demand

    # Cap at the ceiling first, then lift to the floor
    capped = unclamped if unclamped < price_ceiling else price_ceiling
    return capped if capped > price_floor else price_floor

In [None]:
# Layout of the text in the 'Timestamp' column of the replayed CSV
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

# Score every streamed row with the demand function, passing the configured
# weights as constant arguments.
data_with_demand = data_filtered.with_columns(
    Demand=calculate_demand(
        data_filtered.Occupancy,
        data_filtered.Capacity,
        data_filtered.QueueLength,
        data_filtered.TrafficConditionNearby,
        data_filtered.IsSpecialDay,
        data_filtered.VehicleType,
        alpha,
        beta,
        gamma,
        delta,
        epsilon,
    )
)

# Convert demand into a bounded price and keep only what the plot needs.
# 'Timestamp' arrives as text (see the schema), so it is parsed into a datetime
# here; the plot below draws it on a datetime x-axis, which needs real datetime
# values rather than strings.
price_data = data_with_demand.select(
    Timestamp=data_with_demand.Timestamp.dt.strptime(TIMESTAMP_FORMAT),
    PredictedPrice=calculate_price(
        data_with_demand.Demand,
        BASE_PRICE,
        lambda_val,
    ),
)

In [None]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()


def price_plotter(source):
    """Build the Bokeh figure for the demand-based price stream.

    Args:
        source: Data source handed over by Pathway's ``.plot()``; it must expose
            the columns ``Timestamp`` (x values) and ``PredictedPrice`` (y values).

    Returns:
        A Bokeh figure with a navy line through the predicted prices and a red
        circular marker on every data point.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price (Demand-Based Model)",
        x_axis_type="datetime",  # render the x values as dates/times
    )
    # Line showing how the predicted price evolves over time
    fig.line("Timestamp", "PredictedPrice", source=source, line_width=2, color="navy")

    # Markers on each data point. scatter(marker="circle") is used because
    # circle(..., size=...) is deprecated in current Bokeh releases.
    fig.scatter(
        "Timestamp",
        "PredictedPrice",
        source=source,
        size=6,
        color="red",
        marker="circle",
    )

    return fig


# Bind the price stream to the plot:
# - price_plotter renders the figure
# - sorting_col="Timestamp" keeps the points in time order
viz = price_data.plot(price_plotter, sorting_col="Timestamp")

# Wrap the plot in a Panel layout and mark it servable so it is shown here
# and can also be served as a web app.
pn.Column(viz).servable()

In [None]:
# Execute the Pathway pipeline so the demand-based price stream defined above
# is processed and the bound plot receives data.
pw.run()