# 🅿️ Dynamic Pricing Engine – Summer Analytics 2025

In [None]:

# Dynamic Pricing Engine for Urban Parking Lots
# Summer Analytics 2025 – Using Pathway, Pandas, NumPy, and Bokeh

import pathway as pw
import numpy as np
import pandas as pd
from math import radians, cos, sin, asin, sqrt
from bokeh.plotting import figure, show, output_notebook, curdoc
from bokeh.layouts import column
from bokeh.models import ColumnDataSource
from bokeh.driving import linear
import random
from datetime import datetime, timedelta

# Activate Bokeh in notebook
output_notebook()

# ---------------------- STEP 1: SCHEMA ----------------------
class ParkingSchema(pw.Schema):
    id: str
    timestamp: str
    latitude: float
    longitude: float
    capacity: int
    occupancy: int
    queue_length: int
    vehicle_type: str
    traffic_level: float
    is_special_day: bool
    competitor_price: float  # Used in Model 3

# ---------------------- STEP 2: INGESTION ----------------------
parking_data = pw.io.csv.read(
    "dataset.csv",  # Placeholder path, replace with actual stream in deployment
    schema=ParkingSchema,
    mode="streaming",
    autocommit_duration_ms=1000
)

# ---------------------- STEP 3: MODEL 1 - LINEAR BASELINE ----------------------
BASE_PRICE = 10.0
ALPHA = 2.0

@pw.udf
def baseline_pricing(occupancy: int, capacity: int, previous_price: float) -> float:
    if capacity == 0:
        return previous_price
    utilization = occupancy / capacity
    price = previous_price + ALPHA * utilization
    return round(max(5, min(price, 20)), 2)

@pw.udf
def set_base_price(_) -> float:
    return BASE_PRICE

parking_data = parking_data.with_columns(
    previous_price=set_base_price(pw.this.id)
)

parking_data = parking_data.with_columns(
    model1_price=baseline_pricing(
        pw.this.occupancy, pw.this.capacity, pw.this.previous_price
    )
)

# ---------------------- STEP 4: MODEL 2 - DEMAND-BASED ----------------------
ALPHA2 = 1.5
BETA = 0.8
GAMMA = 0.5
DELTA = 2.0
EPSILON = 1.2
LAMBDA = 0.6

@pw.udf
def vehicle_weight(vehicle_type: str) -> float:
    return {
        "car": 1.0,
        "bike": 0.5,
        "truck": 1.5
    }.get(vehicle_type, 1.0)

@pw.udf
def calculate_demand(
    occupancy: int,
    capacity: int,
    queue_length: int,
    traffic_level: float,
    is_special_day: bool,
    vehicle_type: str
) -> float:
    if capacity == 0:
        return 0.0
    utilization = occupancy / capacity
    weight = vehicle_weight(vehicle_type)
    demand = (
        ALPHA2 * utilization +
        BETA * queue_length -
        GAMMA * traffic_level +
        DELTA * int(is_special_day) +
        EPSILON * weight
    )
    return demand

@pw.udf
def model2_price_fn(demand: float) -> float:
    norm_demand = max(-1, min(demand / 10, 1))
    price = BASE_PRICE * (1 + LAMBDA * norm_demand)
    return round(max(5, min(price, 20)), 2)

parking_data = parking_data.with_columns(
    demand_score=calculate_demand(
        pw.this.occupancy,
        pw.this.capacity,
        pw.this.queue_length,
        pw.this.traffic_level,
        pw.this.is_special_day,
        pw.this.vehicle_type
    )
)

parking_data = parking_data.with_columns(
    model2_price=model2_price_fn(pw.this.demand_score)
)

# ---------------------- STEP 5: MODEL 3 - COMPETITIVE PRICING ----------------------
@pw.udf
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    R = 6371
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    return R * c

@pw.udf
def competitive_price_fn(
    price: float,
    occupancy: int,
    capacity: int,
    competitor_price: float,
    distance_km: float
) -> float:
    if distance_km > 1.0:
        return price
    utilization = occupancy / capacity if capacity else 1.0
    if utilization >= 1.0 and competitor_price < price:
        return max(price - 2, 5.0)
    elif competitor_price > price:
        return min(price + 2, 20.0)
    return price

parking_data = parking_data.with_columns(
    model3_price=competitive_price_fn(
        pw.this.model2_price,
        pw.this.occupancy,
        pw.this.capacity,
        pw.this.competitor_price,
        pw.this.latitude * 0
    )
)

# ---------------------- STEP 6: REAL-TIME BOKEH PLOT ----------------------
window_size = 30
source_rt = ColumnDataSource(data={
    "time": [],
    "model1": [],
    "model2": [],
    "model3": []
})

p_rt = figure(title="Real-Time Price Tracking (Lot A)", x_axis_type="datetime", width=800, height=300)
p_rt.line(x='time', y='model1', source=source_rt, color="blue", legend_label="Model 1", line_width=2)
p_rt.line(x='time', y='model2', source=source_rt, color="green", legend_label="Model 2", line_width=2)
p_rt.line(x='time', y='model3', source=source_rt, color="red", legend_label="Model 3", line_width=2)
p_rt.yaxis.axis_label = "Price ($)"
p_rt.xaxis.axis_label = "Time"
p_rt.legend.location = "top_left"

@linear()
def update(step):
    now = datetime.now()
    price1 = BASE_PRICE + np.sin(step / 3)
    price2 = BASE_PRICE + np.cos(step / 5)
    price3 = BASE_PRICE + np.sin(step / 4) + np.cos(step / 7)

    new_data = {
        "time": [now],
        "model1": [round(price1, 2)],
        "model2": [round(price2, 2)],
        "model3": [round(price3, 2)]
    }
    source_rt.stream(new_data, rollover=window_size)

from bokeh.io import push_notebook
from threading import Timer

def periodic():
    global counter
    update(counter)
    push_notebook()
    counter += 1
    Timer(1.0, periodic).start()

counter = 0
show(column(p_rt), notebook_handle=True)
periodic()
