In [1]:
from google.colab import drive
drive.mount('/content/drive')
!pip install pathway

import pandas as pd #imports
import pathway as pw
import os

os.makedirs("/content/drive/MyDrive/pricing_outputs", exist_ok=True) #output folder


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
df = pd.read_csv("/content/dataset.csv") #loadind the data
df["timestamp"] = pd.to_datetime(df["LastUpdatedDate"] + " " + df["LastUpdatedTime"], dayfirst=True)
df["TrafficConditionNearby"] = pd.to_numeric(df["TrafficConditionNearby"], errors="coerce").fillna(0).astype(int)
df["QueueLength"] = pd.to_numeric(df["QueueLength"], errors="coerce").fillna(0).astype(int)
df["IsSpecialDay"] = df["IsSpecialDay"].astype(bool)
df["VehicleType"] = df["VehicleType"].fillna("car")

In [3]:
cleaned_df = df[["timestamp", "Occupancy", "Capacity", "QueueLength", "TrafficConditionNearby", "IsSpecialDay", "VehicleType"]]
cleaned_df.columns = ["timestamp", "occupancy", "capacity", "queue", "traffic", "special_day", "vehicle_type"]

cleaned_df.to_csv("parking_stream.csv", index=False) #saved here

In [4]:
class ParkingSchema(pw.Schema): #common schema
    timestamp: str
    occupancy: int
    capacity: int
    queue: int
    traffic: int
    special_day: bool
    vehicle_type: str

In [5]:
# model 1 linear
@pw.udf
def linear_price(occupancy, capacity):
    base = 10
    if capacity == 0: return base
    return round(min(20, base + 10 * (occupancy / capacity)), 2)


In [6]:
# model 2 demand based
@pw.udf
def demand_score(occupancy, capacity, queue, traffic, special_day, vehicle_type):
    occ_ratio = occupancy / capacity if capacity else 0
    queue_score = queue / 10.0
    traffic_score = traffic / 10.0
    special_score = 1.0 if special_day else 0.0
    weight = {"car": 1.0, "bike": 0.5, "truck": 1.5}.get(vehicle_type.lower(), 1.0)
    score = 0.4 * occ_ratio + 0.2 * queue_score + 0.2 * traffic_score + 0.1 * special_score + 0.1 * weight
    return round(score, 3)

@pw.udf
def demand_price(score):
    return round(min(max(10 * (1 + 0.75 * score), 5), 20), 2)

In [7]:
# model 3: Competitive Pricing
@pw.udf
def competitor_price_adjusted(own_price):
    competitor_price = 12.0  # placeholder competitor price
    if own_price > competitor_price:
        return round(own_price - 2, 2)
    elif own_price < competitor_price:
        return round(own_price + 1, 2)
    return own_price

In [9]:
# streaming the input data
data = pw.io.csv.read("parking_stream.csv", schema=ParkingSchema, mode="static")

model1 = data.select(
    timestamp=data.timestamp,
    price=linear_price(data.occupancy, data.capacity)
)
model2 = data.select(
    timestamp=data.timestamp,
    demand=demand_score(data.occupancy, data.capacity, data.queue, data.traffic, data.special_day, data.vehicle_type),
).with_columns(
    price=demand_price(pw.this.demand)
)
model3 = model2.with_columns(
    competitive_price=competitor_price_adjusted(pw.this.price)
)

In [10]:
pw.io.jsonlines.write(model1, filename="/content/drive/MyDrive/pricing_outputs/model1_prices.jsonl")
pw.io.jsonlines.write(model2, filename="/content/drive/MyDrive/pricing_outputs/model2_prices.jsonl")
pw.io.jsonlines.write(model3, filename="/content/drive/MyDrive/pricing_outputs/model3_prices.jsonl")

    https://beartype.readthedocs.io/en/latest/api_roar/#pep-585-deprecations
  warn(


In [11]:
pw.run()

Output()

