In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m39.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m69.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn
import math

In [None]:
# Load the raw parking records.
df = pd.read_csv('dataset.csv')

# Combine the separate date and time columns into one datetime column.
df['Timestamp'] = pd.to_datetime(
    df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
    format='%d-%m-%Y %H:%M:%S'
)

# Optional: filter for fewer lots for speed during testing
# df = df[df["SystemCodeNumber"] == "1"]

# Columns exported for the Pathway replay, in the order ParkingSchema declares them.
stream_columns = [
    "Timestamp",
    "SystemCodeNumber",
    "Latitude",
    "Longitude",
    "Occupancy",
    "Capacity",
    "QueueLength",
    "TrafficConditionNearby",
    "IsSpecialDay",
    "VehicleType",
]

# Sort chronologically before saving so the replayed stream arrives in time
# order. A stable sort keeps the original relative order of rows that share a
# timestamp. `df` itself is left unsorted; only the exported copy is ordered.
df.sort_values("Timestamp", kind="stable")[stream_columns].to_csv(
    "parking_stream.csv", index=False
)


In [None]:
class ParkingSchema(pw.Schema):
    """Column layout of ``parking_stream.csv`` as read by the Pathway CSV replay."""

    # Read as text; parsed into the datetime column `t` in a later cell.
    Timestamp: str
    # Parking lot identifier; used as the group-by and join key further down.
    SystemCodeNumber: str
    # Lot coordinates, fed to the haversine UDF (which converts them with math.radians).
    Latitude: float
    Longitude: float
    # Occupancy / Capacity forms the occupancy ratio of the demand score.
    Occupancy: int
    Capacity: int
    # Weighted by `beta` in the demand score.
    QueueLength: int
    # Free-text level mapped to a number by traffic_level_score.
    TrafficConditionNearby: str
    # Weighted by `delta` in the demand score.
    IsSpecialDay: int
    # Free-text type mapped to a number by vehicle_type_weight.
    VehicleType: str


In [None]:
# Replay the prepared CSV as a simulated stream whose rows are typed by ParkingSchema.
# NOTE(review): input_rate is presumably the number of rows emitted per second —
# confirm against the Pathway documentation.
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000
)


In [None]:
# Layout of the text stored in the `Timestamp` column.
fmt = "%Y-%m-%d %H:%M:%S"

# Add `t`: the Timestamp text parsed according to `fmt`.
data_with_time = data.with_columns(t=pw.this.Timestamp.dt.strptime(fmt))


In [None]:
# Pricing model parameters: module-level values read by the UDFs and demand cells below.
base_price = 10.0   # price before any demand adjustment (used in compute_price)
alpha = 1.0         # weight applied to the occupancy ratio
beta = 0.1          # weight applied to QueueLength
gamma = 0.5         # weight applied to the traffic score (that term is subtracted in demand_raw)
delta = 0.3         # weight applied to IsSpecialDay
epsilon = 1.0       # weight applied to the vehicle-type score
lambda_coeff = 0.5  # how strongly the normalized demand scales base_price
min_price = 5.0     # lower clamp on computed prices
max_price = 20.0    # upper clamp on computed prices


In [None]:
@pw.udf
def vehicle_type_weight(vtype) -> float:
    """Return the demand weight for a vehicle type label.

    The label is stringified and lower-cased before matching, so the lookup is
    case-insensitive. Labels other than car/bike/truck get the fallback 0.2.
    """
    label = str(vtype).lower()
    if label == 'car':
        return 0.3
    if label == 'bike':
        return 0.1
    if label == 'truck':
        return 0.5
    return 0.2

@pw.udf
def traffic_level_score(tlevel) -> float:
    """Return the numeric score for a traffic level label.

    Matching is case-insensitive on the stringified input; any label other
    than low/average/high falls back to 0.5.
    """
    scores = {'low': 0.1, 'average': 0.5, 'high': 1.0}
    label = str(tlevel).lower()
    return scores[label] if label in scores else 0.5

@pw.udf
def multiply(x, weight) -> float:
    """Return ``x * weight`` as a float, treating a missing ``x`` (None) as 0.0."""
    return 0.0 if x is None else float(x * weight)

@pw.udf
def normalize_demand(demand) -> float:
    """Scale the raw demand by 1/5 and clamp the result into [0.0, 1.0]."""
    scaled = demand / 5.0
    # Same comparisons max(0.0, ...) and min(1.0, ...) perform, spelled out.
    floored = scaled if scaled > 0.0 else 0.0
    return floored if floored < 1.0 else 1.0

@pw.udf
def compute_price(demand_norm) -> float:
    """Turn a normalized demand into a price.

    The price is ``base_price`` scaled by ``1 + lambda_coeff * demand_norm``,
    then clamped to the ``[min_price, max_price]`` range. All four parameters
    are read from module-level variables.
    """
    multiplier = 1 + lambda_coeff * demand_norm
    unclamped = base_price * multiplier
    floored = max(min_price, unclamped)
    return min(max_price, floored)


In [None]:
# Build the demand score in four chained steps; each step only reads columns
# produced by the step before it.
data_with_demand = (
    data_with_time
    # 1. Raw signals: occupancy ratio plus numeric scores for the text columns.
    .with_columns(
        occ_ratio=pw.this.Occupancy / pw.this.Capacity,
        vehicle_score=vehicle_type_weight(pw.this.VehicleType),
        traffic_score=traffic_level_score(pw.this.TrafficConditionNearby),
    )
    # 2. Weight every signal by its model coefficient.
    .with_columns(
        occ_term=multiply(pw.this.occ_ratio, alpha),
        queue_term=multiply(pw.this.QueueLength, beta),
        traffic_term=multiply(pw.this.traffic_score, gamma),
        special_term=multiply(pw.this.IsSpecialDay, delta),
        vehicle_term=multiply(pw.this.vehicle_score, epsilon),
    )
    # 3. Combine the weighted terms; the traffic term is the only one subtracted.
    .with_columns(
        demand_raw=pw.this.occ_term
        + pw.this.queue_term
        - pw.this.traffic_term
        + pw.this.special_term
        + pw.this.vehicle_term
    )
    # 4. Normalize the raw demand.
    .with_columns(demand_norm=normalize_demand(pw.this.demand_raw))
)

# Price derived from the normalized demand.
data_with_price = data_with_demand.with_columns(
    price=compute_price(pw.this.demand_norm)
)


In [None]:
@pw.udf
def haversine(lat1, lon1, lat2, lon2) -> float:
    """Great-circle distance in meters between two points given in degrees.

    Args:
        lat1, lon1: latitude and longitude of the first point, in degrees.
        lat2, lon2: latitude and longitude of the second point, in degrees.

    Returns:
        The haversine distance on a sphere of radius 6371 km, in meters.
    """
    R = 6371e3  # meters
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)

    a = (
        math.sin(delta_phi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2
    )
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return R * c


In [None]:
# Two renamed views of the priced table so it can be joined with itself.
left = data_with_price.select(
    SystemCodeNumber_left = pw.this.SystemCodeNumber,
    price_left = pw.this.price,
    Latitude_left = pw.this.Latitude,
    Longitude_left = pw.this.Longitude
)

right = data_with_price.select(
    SystemCodeNumber_right = pw.this.SystemCodeNumber,
    price_right = pw.this.price,
    Latitude_right = pw.this.Latitude,
    Longitude_right = pw.this.Longitude
)

# Unconditioned join: every left row is paired with every right row.
pairs = left.join(right).select(
    **left, **right,
    distance = haversine(
        pw.this.Latitude_left,
        pw.this.Longitude_left,
        pw.this.Latitude_right,
        pw.this.Longitude_right
    ),
    price_diff = pw.this.price_left - pw.this.price_right
)

# Keep only pairs where a *different* lot is cheaper. Without the lot-id check
# the cross join also pairs a lot with its own records, so a lot would be
# counted as its own competitor.
# NOTE(review): `distance` is computed above but not used to restrict
# competitors to nearby lots — confirm whether a proximity cut-off is intended.
cheaper_competitors = pairs.filter(
    (pw.this.SystemCodeNumber_left != pw.this.SystemCodeNumber_right)
    & (pw.this.price_right < pw.this.price_left)
)

# Per lot: total and count of the price gaps to cheaper competitors.
avg_diff_per_lot = cheaper_competitors.groupby(pw.this.SystemCodeNumber_left).reduce(
    SystemCodeNumber_left = pw.this.SystemCodeNumber_left,  # keep the grouping key as a column
    sum_diff = pw.reducers.sum(pw.this.price_diff),
    count_diff = pw.reducers.count()
)

# Average gap; the count is cast to float before dividing.
avg_diff_per_lot = avg_diff_per_lot.select(
    **avg_diff_per_lot,
    avg_price_diff = pw.this.sum_diff / pw.cast(float, pw.this.count_diff)
)


In [None]:
# Fraction of the average price gap to cheaper competitors that adjust_price
# subtracts from a lot's price.
phi = 0.5

@pw.udf
def adjust_price(base_price, avg_diff) -> float:
    """Lower a price by ``phi`` times the average gap to cheaper competitors.

    When ``avg_diff`` is None the input price is returned unchanged (and
    unclamped). Otherwise the reduced price is clamped to the module-level
    ``[min_price, max_price]`` range.
    """
    if avg_diff is not None:
        discounted = base_price - phi * avg_diff
        capped = min(max_price, discounted)
        return max(min_price, capped)
    return base_price

# Left join: every priced row is kept, and rows whose lot has no entry in
# avg_diff_per_lot get a missing avg_price_diff (adjust_price handles None).
# The condition names the two sides explicitly with pw.left / pw.right.
priced_with_competition = data_with_price.join_left(
    avg_diff_per_lot,
    pw.left.SystemCodeNumber == pw.right.SystemCodeNumber_left
)

# Materialize the join result and add the competition-adjusted price.
final_table = priced_with_competition.select(
    **priced_with_competition,
    final_price = adjust_price(pw.this.price, pw.this.avg_price_diff)
)


In [None]:
# Initialize Panel's notebook integration before any Panel object is built.
pn.extension()

def price_plotter(source):
    """Build the Bokeh figure of `final_price` over time.

    Args:
        source: data source handed in by the table's ``plot`` call; it must
            expose the columns ``t`` and ``final_price``.

    Returns:
        A Bokeh figure with a navy line and red point markers.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Model 3: Dynamic Competitive Pricing",
        x_axis_type="datetime"
    )
    fig.line("t", "final_price", source=source, line_width=2, color="navy")
    # scatter() replaces circle(size=...), which Bokeh deprecated in 3.4.
    fig.scatter("t", "final_price", source=source, size=5, color="red", marker="circle")
    return fig

# Bind the plotting function to the final table, using column `t` as the sort key.
viz = final_table.plot(price_plotter, sorting_col="t")
# Wrap the plot in a Panel column and mark it servable.
pn.Column(viz).servable()




In [None]:
%%capture --no-display
# Run the Pathway computation defined by the cells above. The cell magic
# captures the cell's text output while still letting rich display through.
pw.run()


Output()

