In [None]:
from google.colab import files

# Prompt for the dataset; Colab returns a dict mapping saved file name -> bytes.
upload = files.upload()

# Colab renames repeated uploads (e.g. "dataset (2).csv"), while the pipeline
# always reads "dataset.csv". Persist the fresh bytes under the canonical name
# so a re-upload is never silently ignored in favour of a stale earlier copy.
if len(upload) == 1:
    (_uploaded_bytes,) = upload.values()
    with open("dataset.csv", "wb") as _dataset_file:
        _dataset_file.write(_uploaded_bytes)
elif upload:
    # Ambiguous selection: do not guess which file is the dataset.
    print("Multiple files uploaded; dataset.csv was left unchanged.")

Saving dataset.csv to dataset (2).csv


In [None]:
# Install the notebook's third-party dependencies (versions are not pinned).
!pip install pathway bokeh pandas numpy --quiet


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m37.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m71.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import pathway as pw
import pandas as pd
import numpy as np
from bokeh.plotting import figure, show, output_notebook
from bokeh.layouts import gridplot
# Configure Bokeh to render its output inline in the notebook.
output_notebook()

In [None]:
# Column schema used when reading the parking CSV with pw.io.csv.read.
class ParkingEvent(pw.Schema):
    SystemCodeNumber: str  # exposed as `lot_id` in the final `filtered` table
    Capacity: int  # denominator of the occupancy rate in the pricing UDFs
    Occupancy: int
    LastUpdatedDate: str  # combined with LastUpdatedTime and parsed day-first by make_timestamp
    LastUpdatedTime: str
    IsSpecialDay: int  # enters the demand formula as a number — presumably a 0/1 flag; TODO confirm
    VehicleType: str  # vehicle_weight recognises 'car', 'bike', 'truck'
    Latitude: float
    Longitude: float
    TrafficConditionNearby: str  # traffic_to_num recognises 'low', 'average', 'high'
    QueueLength: int

# Read "dataset.csv" as a Pathway table typed by ParkingEvent, in streaming mode.
# NOTE(review): Colab renames repeated uploads (e.g. "dataset (2).csv"); confirm
# this path actually holds the file that was just uploaded.
table = pw.io.csv.read(
    "dataset.csv",
    schema=ParkingEvent,
    mode="streaming"
)


In [None]:
@pw.udf
def make_timestamp(date: str, time: str):
    """Combine a date string and a time string into one parsed timestamp.

    The two parts are joined with a single space and handed to
    ``pd.to_datetime`` with ``dayfirst=True``, so ambiguous dates are
    interpreted day-first.
    """
    stamp_text = " ".join((date, time))
    return pd.to_datetime(stamp_text, dayfirst=True)

@pw.udf
def traffic_to_num(tc: str) -> int:
    """Map a traffic-condition label to an ordinal level.

    'low' -> 1, 'average' -> 2, 'high' -> 3. Matching ignores letter case and
    surrounding whitespace so that variants such as "High" or "low " are not
    silently treated as unknown. Unknown or non-string values fall back to 1.
    """
    mapping = {'low': 1, 'average': 2, 'high': 3}
    if not isinstance(tc, str):
        # Missing/unexpected value: keep the original fallback level.
        return 1
    return mapping.get(tc.strip().lower(), 1)

@pw.udf
def vehicle_weight(vtype: str) -> float:
    """Map a vehicle-type label to its demand weight.

    'car' -> 1.0, 'bike' -> 0.7, 'truck' -> 1.5. Matching ignores letter case
    and surrounding whitespace so that variants such as "Truck" are not
    silently treated as unknown. Unknown or non-string values fall back to 1.0.
    """
    weights = {'car': 1.0, 'bike': 0.7, 'truck': 1.5}
    if not isinstance(vtype, str):
        # Missing/unexpected value: keep the original fallback weight.
        return 1.0
    return weights.get(vtype.strip().lower(), 1.0)

# Add the derived model inputs while keeping every existing column.
# with_columns avoids re-listing the whole schema, so adding a field to
# ParkingEvent no longer requires touching this cell.
table = table.with_columns(
    timestamp=make_timestamp(table.LastUpdatedDate, table.LastUpdatedTime),
    traffic_num=traffic_to_num(table.TrafficConditionNearby),
    vtype_weight=vehicle_weight(table.VehicleType),
)


In [None]:
# Reference price; every pricing model below clips its result to
# [0.5 * BASE_PRICE, 2.0 * BASE_PRICE].
BASE_PRICE = 10.0

@pw.udf
def baseline_linear(occupancy: int, capacity: int, alpha: float = 2.0) -> float:
    """Baseline price that grows linearly with the occupancy rate.

    price = BASE_PRICE + alpha * (occupancy / capacity), clipped to
    [0.5 * BASE_PRICE, 2.0 * BASE_PRICE].

    A non-positive capacity would otherwise raise ZeroDivisionError (or yield
    a meaningless negative rate), so it is treated as an occupancy rate of 0.
    """
    occ_rate = occupancy / capacity if capacity > 0 else 0.0
    price = BASE_PRICE + alpha * occ_rate
    return float(np.clip(price, BASE_PRICE * 0.5, BASE_PRICE * 2.0))

@pw.udf
def demand_based(occupancy: int, capacity: int, queue: int, traffic: int, special: int, vweight: float, lambda_: float = 0.5) -> float:
    """Price derived from a weighted demand score.

    The score combines occupancy rate, queue length (relative to half the
    capacity), traffic level (divided by 3), the special-day value and the
    vehicle weight. It is divided by 3 and clipped to [0, 1]; the price is
    BASE_PRICE * (1 + lambda_ * demand), clipped to
    [0.5 * BASE_PRICE, 2.0 * BASE_PRICE].

    A non-positive capacity would otherwise raise ZeroDivisionError in the
    occupancy rate, so that term is treated as 0 in that case.
    """
    occ_rate = occupancy / capacity if capacity > 0 else 0.0
    # The epsilon keeps the queue term finite when capacity is 0.
    queue_norm = queue / (capacity * 0.5 + 1e-6)
    traffic_norm = traffic / 3.0
    raw_demand = (1.0 * occ_rate +
                  0.5 * queue_norm +
                  0.3 * traffic_norm +
                  0.4 * special +
                  0.2 * vweight)
    demand = np.clip(raw_demand / 3.0, 0, 1)
    price = BASE_PRICE * (1 + lambda_ * demand)
    return float(np.clip(price, BASE_PRICE * 0.5, BASE_PRICE * 2.0))

# Append both model prices while keeping every existing column.
# with_columns replaces the hand-maintained list of pass-through columns.
table = table.with_columns(
    price_linear=baseline_linear(table.Occupancy, table.Capacity),
    price_demand=demand_based(
        table.Occupancy, table.Capacity, table.QueueLength,
        table.traffic_num, table.IsSpecialDay, table.vtype_weight
    ),
)


In [None]:
# One row per timestamp, carrying arrays of every lot's coordinates,
# demand price, occupancy and capacity observed at that timestamp.
grouped = table.groupby(pw.this.timestamp).reduce(
    timestamp=pw.this.timestamp,
    latitudes=pw.reducers.ndarray(pw.this.Latitude),
    longitudes=pw.reducers.ndarray(pw.this.Longitude),
    price_demands=pw.reducers.ndarray(pw.this.price_demand),
    occupancies=pw.reducers.ndarray(pw.this.Occupancy),
    capacities=pw.reducers.ndarray(pw.this.Capacity),
)


In [None]:
# Attach to each row the per-timestamp arrays from `grouped` (same timestamp).
joined = table.join(grouped, table.timestamp == grouped.timestamp)

@pw.udf
def competitive_pricing(
    my_lat: float, my_lon: float, my_price_demand: float, my_occupancy: int, my_capacity: int,
    latitudes: list, longitudes: list, price_demands: list, occupancies: list, capacities: list,
    radius: float = 0.005
) -> tuple[float, bool]:
    """Adjust a lot's demand price using nearby lots and flag rerouting.

    A competitor is any entry whose latitude and longitude are both within
    ``radius`` of this lot and whose coordinates differ from this lot's own.

    Args:
        my_lat, my_lon: coordinates of this lot.
        my_price_demand: this lot's demand-based price.
        my_occupancy, my_capacity: the lot is considered full when
            occupancy >= capacity.
        latitudes, longitudes, price_demands: parallel sequences describing
            the lots observed at the same timestamp.
        occupancies, capacities: accepted for interface compatibility;
            not used by the current logic.
        radius: half-width of the coordinate box defining "nearby".

    Returns:
        ``(price, reroute)``. If the lot is full, ``reroute`` is True and the
        price is the cheapest competitor price minus 0.5. Otherwise
        ``reroute`` is False and the price is the average competitor price
        minus 0.2 when that average exceeds 1.2 * BASE_PRICE, else the lot's
        own demand price. With no competitors, BASE_PRICE stands in for both
        the minimum and the average. The price is clipped to
        [0.5 * BASE_PRICE, 2.0 * BASE_PRICE].
    """
    competitors = [
        price
        for lat, lon, price in zip(latitudes, longitudes, price_demands)
        if abs(lat - my_lat) < radius
        and abs(lon - my_lon) < radius
        # Exclude this lot itself (identical coordinates).
        and (lat != my_lat or lon != my_lon)
    ]
    if competitors:
        min_competitor_price = min(competitors)
        avg_competitor_price = float(np.mean(competitors))
    else:
        min_competitor_price = BASE_PRICE
        avg_competitor_price = BASE_PRICE

    reroute = my_occupancy >= my_capacity
    if reroute:
        price = min_competitor_price - 0.5
    elif avg_competitor_price > BASE_PRICE * 1.2:
        price = avg_competitor_price - 0.2
    else:
        price = my_price_demand

    price = float(np.clip(price, BASE_PRICE * 0.5, BASE_PRICE * 2.0))
    return price, bool(reroute)

# Keep every per-lot column and add the (price, reroute) tuple returned by
# competitive_pricing, fed with the lot's own values and the per-timestamp arrays.
joined = joined.select(
    SystemCodeNumber=joined.SystemCodeNumber,
    Capacity=joined.Capacity,
    Occupancy=joined.Occupancy,
    LastUpdatedDate=joined.LastUpdatedDate,
    LastUpdatedTime=joined.LastUpdatedTime,
    IsSpecialDay=joined.IsSpecialDay,
    VehicleType=joined.VehicleType,
    Latitude=joined.Latitude,
    Longitude=joined.Longitude,
    TrafficConditionNearby=joined.TrafficConditionNearby,
    QueueLength=joined.QueueLength,
    timestamp=joined.timestamp,
    traffic_num=joined.traffic_num,
    vtype_weight=joined.vtype_weight,
    price_linear=joined.price_linear,
    price_demand=joined.price_demand,
    price_competitive_and_reroute=competitive_pricing(
        joined.Latitude, joined.Longitude, joined.price_demand,
        joined.Occupancy, joined.Capacity,
        joined.latitudes, joined.longitudes, joined.price_demands,
        joined.occupancies, joined.capacities
    )
)
# Split the (price, reroute) tuple into two separate columns by index and
# drop the combined tuple column.
joined = joined.select(
    SystemCodeNumber=joined.SystemCodeNumber,
    Capacity=joined.Capacity,
    Occupancy=joined.Occupancy,
    LastUpdatedDate=joined.LastUpdatedDate,
    LastUpdatedTime=joined.LastUpdatedTime,
    IsSpecialDay=joined.IsSpecialDay,
    VehicleType=joined.VehicleType,
    Latitude=joined.Latitude,
    Longitude=joined.Longitude,
    TrafficConditionNearby=joined.TrafficConditionNearby,
    QueueLength=joined.QueueLength,
    timestamp=joined.timestamp,
    traffic_num=joined.traffic_num,
    vtype_weight=joined.vtype_weight,
    price_linear=joined.price_linear,
    price_demand=joined.price_demand,
    price_competitive=joined.price_competitive_and_reroute[0],  # element 0: price
    reroute=joined.price_competitive_and_reroute[1]  # element 1: reroute flag
)


In [None]:
# Sanity check: show the column names and dtypes of the final joined table.
print(joined.schema)


id          | SystemCodeNumber | Capacity | Occupancy | LastUpdatedDate | LastUpdatedTime | IsSpecialDay | VehicleType | Latitude | Longitude | TrafficConditionNearby | QueueLength | timestamp | traffic_num | vtype_weight | price_linear | price_demand | price_competitive | reroute
ANY_POINTER | STR              | INT      | INT       | STR             | STR             | INT          | STR         | FLOAT    | FLOAT     | STR                    | INT         | ANY       | INT         | FLOAT        | FLOAT        | FLOAT        | FLOAT             | BOOL   


In [None]:
# Output view: timestamp, lot identifier (renamed from SystemCodeNumber),
# the three model prices and the reroute flag.
filtered = joined.select(
    timestamp=pw.this.timestamp,
    lot_id=pw.this.SystemCodeNumber,
    price_linear=pw.this.price_linear,
    price_demand=pw.this.price_demand,
    price_competitive=pw.this.price_competitive,
    reroute=pw.this.reroute,
)


In [None]:
# Run the pipeline and print the update stream of `filtered` without row ids.
# NOTE(review): the source is read with mode="streaming" — confirm this debug
# call terminates as intended on that input.
pw.debug.compute_and_print_update_stream(filtered,include_id=False)