In [None]:
#!/usr/bin/env python3
"""
dynamic_pricing.py

Dynamic Pricing for Urban Parking Lots – Summer Analytics 2025
"""

import os
import datetime
import numpy as np
import pandas as pd
import pathway as pw
from pathway import JoinMode
import bokeh.plotting
import panel as pn
import gdown

# Panel extension
pn.extension()

# Dataset parameters
FILE_ID = "1GaO4Y1bbwZX1o1rZwd9hdNvNGqbpCKIt"
URL     = f"https://drive.google.com/uc?id={FILE_ID}"
CSV     = "parking_stream.csv"

# Download if missing
if not os.path.exists(CSV):
    gdown.download(URL, CSV, quiet=False)

# Read once (for schema validation, ordering)
_df = pd.read_csv(CSV)
_df["Timestamp"] = pd.to_datetime(_df["Timestamp"])
_df = _df.sort_values("Timestamp").reset_index(drop=True)

# Define streaming schema
class ParkingSchema(pw.Schema):
    Timestamp: str
    Occupancy: int
    Capacity: int
    QueueLength: int
    TrafficConditionNearby: str
    IsSpecialDay: int
    VehicleType: str
    Latitude: float
    Longitude: float
    LotID: str

def haversine(lat1, lon1, lat2, lon2):
    """Great‐circle distance in kilometers"""
    from math import radians, cos, sin, asin, sqrt
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat/2)**2 + cos(lat1)*cos(lat2)*sin(dlon/2)**2
    return 6371 * 2 * asin(sqrt(a))

def main():
    # Replay CSV as a stream
    src = pw.demo.replay_csv(CSV, schema=ParkingSchema, input_rate=1000)
    fmt = "%Y-%m-%d %H:%M:%S"
    data = src.with_columns(
        t   = src.Timestamp.dt.strptime(fmt),
        day = src.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00"),
    )

    # ──────────────────────────────
    # Model 1: Baseline Linear
    # ──────────────────────────────
    α = 2.0
    m1 = data.with_columns(
        price = 10 + α * (pw.this.Occupancy / pw.this.Capacity)
    ).with_columns(
        price = pw.apply(lambda x: round(min(max(x,5),20),2), pw.this.price)
    )

    w1 = m1.windowby(
        pw.this.t,
        instance=pw.this.day,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior()
    ).reduce(
        t           = pw.this._pw_window_end,
        total_price = pw.reducers.sum(pw.cast(float, pw.this.price)),
        count       = pw.reducers.count()
    ).with_columns(
        avg_price = pw.this.total_price / pw.cast(float, pw.this.count)
    )

    def plot1(src):
        fig = bokeh.plotting.figure(
            title="Model 1: Baseline Linear Pricing",
            x_axis_type="datetime", height=400, width=800
        )
        fig.line("t","avg_price",source=src, line_width=2, color="blue")
        fig.scatter("t","avg_price",source=src, size=6, color="black")
        return fig

    viz1 = w1.plot(plot1, sorting_col="t")

    # ──────────────────────────────
    # Model 2: Demand-Based
    # ──────────────────────────────
    data2 = data.with_columns(
        traffic_score = pw.cast(
            float,
            pw.apply(
                lambda c: 0.0 if c=="low" else 0.5 if c=="medium" else 1.0,
                pw.this.TrafficConditionNearby
            )
        ),
        vehicle_weight = pw.cast(
            float,
            pw.apply(
                lambda v: 0.5 if v=="bike" else 1.0 if v=="car" else 1.5,
                pw.this.VehicleType
            )
        )
    )

    α, β, γ, δ, ε = 1.0, 0.3, 0.5, 0.2, 0.2
    m2 = data2.with_columns(
        demand = (
            α * (pw.this.Occupancy / pw.this.Capacity)
            + β  * pw.this.QueueLength
            - γ  * pw.this.traffic_score
            + δ  * pw.this.IsSpecialDay
            + ε  * pw.this.vehicle_weight
        )
    ).with_columns(
        norm = pw.apply(lambda x: max(0, min(np.tanh(x/5),1.0)), pw.this.demand)
    ).with_columns(
        price = pw.apply(
            lambda x: round(min(max(x,5),20),2),
            pw.cast(float,pw.this.norm)*10 + 10
        )
    )

    w2 = m2.windowby(
        pw.this.t,
        instance=pw.this.day,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior()
    ).reduce(
        t           = pw.this._pw_window_end,
        total_price = pw.reducers.sum(pw.cast(float, pw.this.price)),
        count       = pw.reducers.count()
    ).with_columns(
        avg_price = pw.this.total_price / pw.cast(float, pw.this.count)
    )

    def plot2(src):
        fig = bokeh.plotting.figure(
            title="Model 2: Demand-Based Pricing",
            x_axis_type="datetime", height=400, width=800
        )
        fig.line("t","avg_price",source=src, line_width=2, color="green")
        fig.scatter("t","avg_price",source=src, size=6, color="orange")
        return fig

    viz2 = w2.plot(plot2, sorting_col="t")

    # ──────────────────────────────
    # Model 3: Competitive
    # ──────────────────────────────
    geo = m2.with_columns(base_price=pw.this.price)

    # join on same timestamp, exclude identical LotID
    pairs = geo.join(
        geo,
        on=(pw.this.t==pw.that.t),
        how=JoinMode.INNER
    ).filter(pw.this.LotID != pw.that.LotID)

    neigh = pairs.with_columns(
        dist = pw.apply(
            haversine,
            pw.this.Latitude, pw.this.Longitude,
            pw.that.Latitude, pw.that.Longitude
        ),
        neighbor_price = pw.that.base_price
    ).filter(pw.this.dist <= 1.0)\
     .groupby(pw.this.LotID, pw.this.t).reduce(
        total_price = pw.reducers.sum(pw.cast(float,pw.this.neighbor_price)),
        count       = pw.reducers.count()
    ).with_columns(
        avg_neighbor = pw.this.total_price / pw.cast(float,pw.this.count)
    )

    m3 = geo.join(
        neigh,
        on=(pw.this.LotID==pw.that.LotID, pw.this.t==pw.that.t),
        how=JoinMode.LEFT
    ).with_columns(
        final_price = pw.this.base_price
                      + pw.this.avg_neighbor.fillna(10)
                        .apply(lambda x:(x-10)*0.3)
    ).with_columns(
        final_price = pw.apply(lambda x:round(min(max(x,5),25),2),
                                pw.this.final_price)
    )

    w3 = m3.windowby(
        pw.this.t,
        instance=pw.this.day,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior()
    ).reduce(
        t           = pw.this._pw_window_end,
        total_price = pw.reducers.sum(pw.cast(float,pw.this.final_price)),
        count       = pw.reducers.count()
    ).with_columns(
        avg_price = pw.this.total_price / pw.cast(float,pw.this.count)
    )

    def plot3(src):
        fig = bokeh.plotting.figure(
            title="Model 3: Competitive Pricing",
            x_axis_type="datetime", height=400, width=800
        )
        fig.line("t","avg_price",source=src, line_width=2, color="red")
        fig.scatter("t","avg_price",source=src, size=6, color="black")
        return fig

    viz3 = w3.plot(plot3, sorting_col="t")

    # Serve all three plots in one dashboard
    pn.Column(viz1, viz2, viz3).servable()

    # start streaming
    pw.run()

if __name__ == "__main__":
    main()
