In [24]:
!pip install pathway bokeh panel --quiet

In [25]:
import pathway as pw
import pandas as pd
import panel as pn
import bokeh.plotting
from datetime import timedelta
import numpy as np
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import panel as pn
pn.extension()

In [26]:
df = pd.read_csv('dataset.csv') #loading the dataset and visualizing it
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [27]:
# Define schema for Pathway streaming
class ParkingSchema(pw.Schema):
    Timestamp: str
    SystemCodeNumber: str
    Occupancy: int
    Capacity: int
    QueueLength: int
    TrafficLevel: float
    IsSpecialDay: int
    VehicleTypeWeight: float

In [28]:
import os

# Load simulated stream from preprocessed CSV
# Construct the absolute path to the file
file_path = "/content/parking_stream_full.csv"

stream = pw.demo.replay_csv(
    file_path,
    schema=ParkingSchema,
    input_rate=1000  # rows per second
)

In [29]:
# Convert timestamp string to datetime
fmt = "%Y-%m-%d %H:%M:%S"
stream = stream.with_columns(
    t = stream.Timestamp.dt.strptime(fmt),
    lot = stream.SystemCodeNumber
)

KeyError: 'Table has no column with name _pw_instance.'

In [None]:
# Define expressions to compute mean using sum / count for each metric
# (will be used after the reduce step)
# Note: Just symbolic – actual computation done inside `with_columns` later.
sum = pw.reducers.sum(pw.this.column)
count = pw.reducers.count()
mean = sum / count  # Example definition for concept clarity

# Aggregate metrics using 15-minute tumbling window


aggregated = (
    stream.windowby(
        pw.this.t,                      # Timestamp column to drive windowing
        instance=pw.this.lot,           # Group by parking lot for each window
        window=pw.temporal.tumbling(timedelta(minutes=15)),  # 15-min fixed-size windows
        behavior=pw.temporal.exactly_once_behavior()         # Ensures no duplication or skipping
    )
    .reduce(
        t = pw.this._pw_window_end,               # Use window end as the time reference
        _pw_instance = pw.this._pw_instance,      # Explicitly retain instance (lot ID)
        
        # Aggregate sums for computing means later
        occ_sum = pw.reducers.sum(pw.this.Occupancy),
        cap_sum = pw.reducers.sum(pw.this.Capacity),
        queue_sum = pw.reducers.sum(pw.this.QueueLength),
        traffic_sum = pw.reducers.sum(pw.this.TrafficLevel),
        veh_sum = pw.reducers.sum(pw.this.VehicleTypeWeight),
        
        # Count number of records per window to compute average
        count = pw.reducers.count(),
        
        # Check if any special day flag exists in this window (max == 1 if any special day)
        IsSpecialDay = pw.reducers.max(pw.this.IsSpecialDay),
    )
)


# Compute means and reformat the table with final columns


windowed = aggregated.with_columns(
    t = pw.this.t,                                # Window end timestamp
    Lot = pw.this._pw_instance,                   # Parking lot ID (grouping key)
    
    # Compute mean values for all features
    Occupancy = pw.this.occ_sum / pw.this.count,
    Capacity = pw.this.cap_sum / pw.this.count,
    QueueLength = pw.this.queue_sum / pw.this.count,
    TrafficLevel = pw.this.traffic_sum / pw.this.count,
    VehicleTypeWeight = pw.this.veh_sum / pw.this.count,
    
    # Directly carry forward special day flag
    IsSpecialDay = pw.this.IsSpecialDay
)

In [39]:
# Define UDF to compute demand-based dynamic pricing


@pw.udf
def demand_price(Occupancy, Capacity, QueueLength, TrafficLevel, IsSpecialDay, VehicleTypeWeight,
                 alpha=1.5, beta=1.2, gamma=1.0, delta=2.0, epsilon=1.0, lam=0.5):
    """
    Calculates the price based on multiple demand indicators:
    - Occupancy rate: how full the parking lot is.
    - Queue length: number of waiting vehicles.
    - Traffic level: traffic congestion near the parking.
    - IsSpecialDay: whether today is a holiday or event.
    - VehicleTypeWeight: car > bike > truck sensitivity.
    
    The formula builds a weighted demand score and normalizes it to [0,1],
    which is then scaled and added to the base price (10).
    Final price is clipped between 5 and 20.
    """
    
    # Weighted demand signal from all factors
    demand = (alpha * (Occupancy / Capacity) +
              beta * QueueLength -
              gamma * TrafficLevel +
              delta * IsSpecialDay +
              epsilon * VehicleTypeWeight)
    
    # Normalize demand to a ~[0,1] scale
    norm_demand = (demand + 5) / 10

    # Final dynamic price based on normalized demand
    price = 10 * (1 + lam * norm_demand)

    # Clip price to stay within $5 to $20
    return round(max(5, min(price, 20)), 2)



#Apply pricing model to aggregated windowed data


pricing_table = windowed.select(
    Lot = pw.this.Lot,    # Parking lot ID
    t = pw.this.t,        # Timestamp (end of 15-minute window)
    
    # Apply the demand_price function to compute final price
    DemandPrice = demand_price(
        pw.this.Occupancy,
        pw.this.Capacity,
        pw.this.QueueLength,
        pw.this.TrafficLevel,
        pw.this.IsSpecialDay,
        pw.this.VehicleTypeWeight
    )
)

KeyError: 'Table has no column with name Lot.'

In [21]:
from bokeh.layouts import gridplot
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
import panel as pn

# Activate Panel for Bokeh integration in interactive dashboards
pn.extension()


# Function: lotwise_plotter(source)
# Description:
#   Creates individual time series plots (Bokeh figures) 
#   for each parking lot using demand-based pricing data.
#   Returns a grid of plots organized by lot.


def lotwise_plotter(source):
    plots = []               # List to store individual Bokeh plots
    grouped_data = {}        # Dictionary to hold ColumnDataSource per lot

 
    #Group data by 'Lot'
  
    if source.data:
        # Loop over all unique Lot IDs in the dataset
        for lot_id in set(source.data['Lot']):
            # Find the row indices in source.data belonging to this lot
            indices = [i for i, lot in enumerate(source.data['Lot']) if lot == lot_id]

            # Create a new ColumnDataSource with filtered data for this lot
            grouped_data[lot_id] = ColumnDataSource({
                col: [source.data[col][i] for i in indices]  # Select only rows for this lot
                for col in source.data.keys()
            })

   
    #Create a Bokeh line plot for each parking lot

    for lot_id, lot_source in grouped_data.items():
        # Initialize a Bokeh figure with time on x-axis
        p = figure(
            title=f"Lot {lot_id} - Demand-Based Pricing",
            x_axis_type="datetime",  # Time-based x-axis
            width=400,
            height=300
        )

        # Plot the demand-based price over time
        p.line("t", "DemandPrice", source=lot_source, line_width=2, color="navy")

        # Add circular markers at each point for visibility
        p.circle("t", "DemandPrice", source=lot_source, size=5, color="red")

        # Add axis labels
        p.xaxis.axis_label = "Time"
        p.yaxis.axis_label = "Price ($)"

        # Store the plot
        plots.append(p)

    
    #Organize all plots in a grid layout (3 per row)
    
    grid = gridplot([plots[i:i + 3] for i in range(0, len(plots), 3)])
    return grid

In [22]:
# From your Pathway output table 'windowed'
multi_viz = windowed.plot(
    lotwise_plotter,
    sorting_col="t",
)



In [23]:
pn.Column(multi_viz).servable()

Output()