In [1]:
  !pip install pathway bokeh --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m49.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m13.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m84.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

## Importing the libraries

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import pathway as pw
from datetime import datetime
import bokeh.plotting
import panel as pn
import time
from pathway.internals.dtype import FLOAT, INT, STR

## Data loading and Feature selection

In [3]:
# Load the raw parking data and merge the separate date/time strings into one
# datetime column so the rows can be ordered chronologically.
raw_df = pd.read_csv("dataset.csv")
timestamps = pd.to_datetime(
    raw_df["LastUpdatedDate"] + " " + raw_df["LastUpdatedTime"],
    format="%d-%m-%Y %H:%M:%S",
)

# Columns consumed by the streaming pipeline (must match ParkingSchema).
feature_cols = [
    "Capacity",
    "Occupancy",
    "VehicleType",
    "TrafficConditionNearby",
    "QueueLength",
    "IsSpecialDay",
    "Timestamp",
]

df = (
    raw_df.assign(Timestamp=timestamps)
    .sort_values("Timestamp")
    .reset_index(drop=True)[feature_cols]
)

# Persist the trimmed, time-ordered frame; it is replayed as a stream below.
df.to_csv("model2.csv", index=False)

## Defining the Pipeline Schema and Helper Functions

In [4]:
class ParkingSchema(pw.Schema):
    """Column types for the rows replayed from ``model2.csv``.

    The column names match the column list written to ``model2.csv`` in the
    data-loading cell. ``Timestamp`` is kept as a string here and parsed into
    a datetime during feature engineering.
    """

    Capacity: int
    Occupancy: int
    # Raw vehicle label; converted to a numeric weight via map_vehicle().
    VehicleType: str
    # Raw traffic label; converted to a numeric score via map_traffic().
    TrafficConditionNearby: str
    QueueLength: int
    # Presumably a 0/1 flag — TODO confirm against dataset.csv.
    IsSpecialDay: int
    # Expected to be "%Y-%m-%d %H:%M:%S" as written by pandas; parsed later with strptime.
    Timestamp: str

# Replay model2.csv as a simulated live stream typed by ParkingSchema;
# input_rate controls how many rows are emitted per second.
data = pw.demo.replay_csv("model2.csv", schema=ParkingSchema, input_rate=100)

# Lookup tables turning categorical labels into numeric features.
# Keys are lower-case; unknown labels fall back to the defaults used below.
vehicle_weight_map = {"car": 1.0, "truck": 2.0, "bike": 0.8, "cycle": 0.5}
traffic_score_map = {"low": 0.2, "average": 0.5, "high": 0.8}


def safe_float(x):
    """Return ``x`` converted to float, or 0.0 when ``x`` is None."""
    if x is None:
        return 0.0
    return float(x)


def safe_int(x):
    """Return ``x`` converted to int, or 0 when ``x`` is None."""
    if x is None:
        return 0
    return int(x)


def map_vehicle(vt):
    """Weight for vehicle label ``vt`` (case-insensitive); 1.0 if empty/None or unknown."""
    if not vt:
        return 1.0
    return vehicle_weight_map.get(vt.lower(), 1.0)


def map_traffic(tc):
    """Score for traffic label ``tc`` (case-insensitive); 0.5 if empty/None or unknown."""
    if not tc:
        return 0.5
    return traffic_score_map.get(tc.lower(), 0.5)

## Feature Engineering in the Pipeline

In [5]:
import datetime

# pandas wrote the combined Timestamp column in this layout when saving
# model2.csv, so it is parsed back with the matching format.
fmt = "%Y-%m-%d %H:%M:%S"

# Parse the string timestamp once; the same expression supplies both the
# event time and its calendar-day key.
parsed_time = data.Timestamp.dt.strptime(fmt)

# Add the parsed time columns, cast numeric columns to explicit types and
# turn categorical labels into numeric scores.
data_with_time = data.with_columns(
    t=parsed_time,
    day=parsed_time.dt.strftime("%Y-%m-%dT00:00:00"),
    Capacity=pw.declare_type(FLOAT, pw.apply(safe_float, data.Capacity)),
    Occupancy=pw.declare_type(FLOAT, pw.apply(safe_float, data.Occupancy)),
    QueueLength=pw.declare_type(FLOAT, pw.apply(safe_float, data.QueueLength)),
    IsSpecialDay=pw.declare_type(INT, pw.apply(safe_int, data.IsSpecialDay)),
    VehicleTypeWeight=pw.declare_type(FLOAT, pw.apply(map_vehicle, data.VehicleType)),
    TrafficConditionScore=pw.declare_type(
        FLOAT, pw.apply(map_traffic, data.TrafficConditionNearby)
    ),
)


## Creating a Class for Online Multiple Linear Regression Inside the Pipeline

In [11]:
class RealTimeDemandPricing:
    """Online demand-based pricing with a periodically refit linear demand model.

    Each call to :meth:`update` builds a feature vector from one observation,
    scores it with the current linear ``weights`` to obtain a raw demand value,
    min-max normalises that value against the running range seen so far,
    adapts the sensitivity ``lam`` and returns::

        price = base_price * (1 + lam * norm_demand)

    Every ``batch_size`` observations the weights are refit by least squares
    so that ``weights @ x`` approximates the relative price change
    ``(price - prev_price) / base_price`` of the buffered observations.

    Parameters
    ----------
    base_price : float, default 10
        Reference price; also the initial value of ``prev_price``.
    lr_lambda : float, default 0.1
        Step size of the ``lam`` update.
    target_d : float, default 0.5
        Normalised demand level treated as neutral. Demand above it raises
        ``lam`` and demand below it lowers it.
    batch_size : int, default 10
        Number of buffered observations that triggers a weight refit.
    """

    # With norm_demand in [0, 1], clamping lam to [-0.5, 1.0] keeps the price
    # factor (1 + lam * norm_demand) within [0.5, 2.0] of base_price.
    LAMBDA_MIN = -0.5
    LAMBDA_MAX = 1.0

    def __init__(self, base_price=10, lr_lambda=0.1, target_d=0.5, batch_size=10):
        self.base_price = base_price
        # Initial guess for [occupancy ratio, queue, traffic, special day, vehicle weight].
        self.weights = np.array([1.0, 0.05, -0.4, 0.3, 0.2])
        self.X_list = []  # buffered feature vectors awaiting a refit
        self.y_list = []  # matching relative price changes
        self.prev_price = base_price
        # Running raw-demand range. It starts at [0, 1] and only widens, so the
        # denominator in normalize() is always >= 1.
        self.min_d = 0
        self.max_d = 1
        self.lam = 0  # lam == 0 means price == base_price
        self.lr_lambda = lr_lambda
        self.target_d = target_d
        self.batch_size = batch_size

    def normalize(self, d):
        """Widen the running range to include ``d``, then min-max scale ``d`` into [0, 1]."""
        self.min_d = min(self.min_d, d)
        self.max_d = max(self.max_d, d)
        return (d - self.min_d) / (self.max_d - self.min_d)

    def update_lambda(self, norm_d):
        """Adapt ``lam`` from normalised demand ``norm_d`` and return the clamped value."""
        # Compare against a level on the same [0, 1] scale as norm_d. Using the
        # raw-scale self.min_d (always <= 0) made the error non-negative, so lam
        # could only grow until it saturated at LAMBDA_MAX.
        error = norm_d - self.target_d
        self.lam += self.lr_lambda * error
        self.lam = max(self.LAMBDA_MIN, min(self.LAMBDA_MAX, self.lam))
        return self.lam

    def update(self, occ, cap, qn, traf, special, veh_weight):
        """Price one observation and periodically refit the demand weights.

        Parameters
        ----------
        occ, cap : float
            Occupancy and capacity. The model uses ``occ / cap``, or 0.0 when
            ``cap`` is not positive, to avoid a division by zero.
        qn : float
            Queue-length feature.
        traf : float
            Traffic condition score.
        special : int
            Special-day flag.
        veh_weight : float
            Vehicle type weight.

        Returns
        -------
        float
            Price for this observation.
        """
        occ_ratio = occ / cap if cap > 0 else 0.0
        x = np.array([occ_ratio, qn, traf, special, veh_weight], dtype=float)
        demand = float(np.dot(self.weights, x))
        norm_d = self.normalize(demand)

        # Adapt lambda before pricing so this observation already uses it.
        self.update_lambda(norm_d)

        price = float(self.base_price * (1 + self.lam * norm_d))
        delta = (price - self.prev_price) / self.base_price

        self.X_list.append(x)
        self.y_list.append(delta)

        if len(self.X_list) >= self.batch_size:
            self._refit_weights()

        self.prev_price = price
        return price

    def _refit_weights(self):
        """Refit ``weights`` by least squares on the buffered batch, then clear the buffer.

        On a numerical failure the buffer is kept, so the next call retries
        with more data.
        """
        X_np = np.asarray(self.X_list)
        y_np = np.asarray(self.y_list)
        try:
            # lstsq returns the same minimum-norm solution as
            # pinv(X.T @ X) @ X.T @ y without squaring the condition number.
            theta, *_ = np.linalg.lstsq(X_np, y_np, rcond=None)
        except np.linalg.LinAlgError as exc:
            print(f"Weights update error: {exc}")
            return
        self.weights = theta.flatten()
        self.X_list.clear()
        self.y_list.clear()

## Defining a Daily Tumbling Window for Pricing

In [12]:
from datetime import timedelta
model = RealTimeDemandPricing()

# Bucket the enriched stream into one-day tumbling windows (one instance per
# calendar day); exactly_once_behavior emits each window's result a single time.
daily_windows = data_with_time.windowby(
    pw.this.t,
    instance=pw.this.day,
    window=pw.temporal.tumbling(timedelta(days=1)),
    behavior=pw.temporal.exactly_once_behavior(),
)

# Collapse each day into one feature row stamped with the window end time.
daily_features = daily_windows.reduce(
    t=pw.this._pw_window_end,
    day=pw.reducers.any(pw.this.day),
    Capacity=pw.reducers.max(pw.this.Capacity),
    Occupancy=pw.reducers.avg(pw.this.Occupancy),
    QueueLength=pw.reducers.avg(pw.this.QueueLength),
    TrafficConditionScore=pw.reducers.avg(pw.this.TrafficConditionScore),
    IsSpecialDay=pw.reducers.max(pw.this.IsSpecialDay),
    VehicleTypeWeight=pw.reducers.avg(pw.this.VehicleTypeWeight),
)

# Run each daily row through the stateful online pricing model.
price_udf = pw.apply(
    model.update,
    pw.this.Occupancy,
    pw.this.Capacity,
    pw.this.QueueLength,
    pw.this.TrafficConditionScore,
    pw.this.IsSpecialDay,
    pw.this.VehicleTypeWeight,
)
Price_stream = daily_features.with_columns(Price=pw.declare_type(FLOAT, price_udf))


## Visualisation using bokeh

In [13]:

def plot_price_over_time(source):
    """Build a Bokeh line-and-marker chart of ``Price`` against time ``t``.

    Parameters
    ----------
    source
        Data source passed in by ``Table.plot`` (presumably a Bokeh
        ``ColumnDataSource``). It must expose ``t`` and ``Price`` columns.

    Returns
    -------
    bokeh.plotting.figure
        The configured figure.
    """
    fig = bokeh.plotting.figure(
        height=600,
        width=1200,
        title="Demand Based Price vs Time",
        x_axis_type="datetime",
        x_axis_label="Time (window end)",
        y_axis_label="Price",
    )
    fig.line("t", "Price", source=source, line_width=1, color="blue")
    # scatter() is Bokeh's marker API; circle(size=...) is deprecated since Bokeh 3.4.
    fig.scatter("t", "Price", source=source, size=6, color="black")
    return fig

# Live Bokeh view of the price stream, re-sorted by time on every update.
viz = Price_stream.plot(plot_price_over_time, sorting_col="t")

# Load Panel's notebook extension, wrap the view in a layout and mark it
# servable; as the cell's last expression it is also displayed inline.
pn.extension()
dashboard = pn.Column(viz)
dashboard.servable()




In [14]:
# Start the Pathway engine: this executes the pipeline defined above
# (CSV replay -> feature engineering -> daily windows -> pricing) and feeds
# the plot built in the previous cell.
pw.run()

Output()

