# Introduction


[![Run in Colab](https://pathway.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1psChcNIg-CP9GkrCclcUksR6MpABPw0E?usp=sharing)

In [None]:
!pip install pathway bokeh --quiet

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [None]:
# Load the raw parking dataset; the first CSV column is used as the DataFrame index.
df = pd.read_csv('/content/dataset.csv', index_col= 0)
# Preview the first rows to sanity-check the load.
df.head()

# The dataset (provided with the problem statement) is available here: https://drive.google.com/file/d/1RqHF3zphAFOtYZgReDJUxEFweOiVAxqP/view?usp=drive_link or in the GitHub repo

In [None]:
# Display the DataFrame dimensions as (rows, columns).
df.shape

## Even though we have 11 columns, the base model uses only Timestamp (date and time combined), LocationID (latitude and longitude combined), Occupancy and Capacity

In [None]:
# Numeric score for each categorical traffic level.
mapping = {
    'low': 0,
    'average': 0.5,
    'high': 1,
}

# Swap every traffic label for its score; labels absent from the mapping become NaN.
df['TrafficConditionNearby'] = df['TrafficConditionNearby'].map(mapping)

In [None]:
# Relative weight assigned to each vehicle type.
maps = {
    'car': 0.75,
    'bike': 0.5,
    'truck': 1,
    'cycle': 0.25,
}
# Swap every vehicle-type label for its weight; labels absent from the mapping become NaN.
df['VehicleType'] = df['VehicleType'].map(maps)

## We map vehicle types to weights based on the area they occupy, and traffic levels to scores based on how heavy the traffic is. Both are normalised beforehand so that less normalisation is needed in the later streaming phase

In [None]:
# Parse "<LastUpdatedDate> <LastUpdatedTime>" (day-month-year, 24h clock) into one datetime column.
df['Timestamp'] = pd.to_datetime(
    df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
    format='%d-%m-%Y %H:%M:%S',
)

# Identify each parking location by a single "latitude,longitude" string.
df['LocationID'] = (
    df['Latitude'].astype(str)
    + ','
    + df['Longitude'].astype(str)
)

# Put the rows in chronological order and renumber the index from 0.
df = df.sort_values(by='Timestamp').reset_index(drop=True)

In [None]:

# Distinct location identifiers; used later as the colour factors when plotting prices per location in Bokeh.
LOCATIONS = df['LocationID'].unique()

In [None]:
# Write only the columns needed by the streaming pipeline to the CSV file that is replayed later.
df[
    [
        "Timestamp",
        "Occupancy",
        "Capacity",
        "LocationID",
        "TrafficConditionNearby",
        "QueueLength",
        "IsSpecialDay",
        "VehicleType",
    ]
].to_csv("parking_stream.csv", index=False)


In [None]:
# The simulated stream is created further down, right after ParkingSchema is
# defined, using pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema,
# input_rate=1000).
# The same call used to live in this cell, but ParkingSchema does not exist yet
# at this point, so running the notebook top to bottom raised NameError here.
# replay_csv replays the CSV as a simulated stream; input_rate=1000 means
# approximately 1000 rows per second are ingested.

## STEP 1 : DEFINING THE SCHEMA

In [None]:
import pathway as pw
import datetime

# Schema of the incoming parking data stream: one field per column written to parking_stream.csv.
# Note: numeric columns are declared as float rather than int to avoid confusion
# when arithmetic operations on them produce decimals.
class ParkingSchema(pw.Schema):
    Timestamp: str  # kept as text here; parsed into a datetime with strptime downstream
    Occupancy: float
    Capacity: float
    LocationID: str  # "latitude,longitude" string built during preprocessing
    TrafficConditionNearby: float  # 0 / 0.5 / 1 score mapped from low / average / high
    QueueLength: float
    IsSpecialDay: float
    VehicleType: float  # weight mapped from car / bike / truck / cycle

# Layout of the Timestamp strings stored in parking_stream.csv.
fmt = "%Y-%m-%d %H:%M:%S"

# Replay the exported CSV as a simulated stream at an input rate of 1000.
data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)

# Add the derived columns used downstream:
#   t              - Timestamp parsed into a datetime
#   occ_ratio      - occupied share of the capacity
#   traffic        - nearby traffic score
#   vehicle_weight - vehicle-type weight
data_with_time = data.with_columns(
    t=pw.this.Timestamp.dt.strptime(fmt),
    occ_ratio=pw.this.Occupancy / pw.this.Capacity,
    traffic=pw.this.TrafficConditionNearby,
    vehicle_weight=pw.this.VehicleType,
)

# We have transformed the incoming data by adding a parsed time column `t` that acts as the timestamp; the other three derived columns are used to build the pricing function for model 2

Note: some code cells occasionally raised errors unless the Pathway import was repeated in the cell, which is why `import pathway as pw` appears in several cells

## STEP 2 : PRICING FUNCTION

In [None]:
import pathway as pw
# This is a user-defined function (UDF) in Pathway.
# It computes a dynamic parking price based on multiple real-time factors:
# - avg_occ_ratio: average occupancy ratio of parking slots
# - avg_queue_length: average queue length at entry points
# - avg_traffic: average nearby traffic congestion (reduces price)
# - is_special_day: binary flag for holidays or special events
# - avg_vehicle_weight: average vehicle weight (proxy for vehicle type)
#
# The function applies a weighted linear combination of these factors,
# scaled by constants (ALPHA, BETA, GAMMA, DELTA, EPSILON, LAMBDA).
# The computed raw price is then clamped between 5 and 16 to ensure
# it stays within reasonable operational limits.
#
# Using @pw.udf allows this function to run on each row of the Pathway table.
# It computes a dynamic parking price based on a demand model that adjusts the base price.
#
# Demand is calculated as:
#   Demand = α · (Occupancy / Capacity)
#            + β · QueueLength
#            − γ · Traffic
#            + δ · IsSpecialDay
#            + ε · VehicleTypeWeight
#
# The price at time t is then computed as:
#   Price_t = BasePrice · (1 + λ · NormalizedDemand)
#
# The demand value is inherently normalized by design factors (ratios and scaling)
# to ensure price variations remain smooth and within operational bounds.
# Finally, the price is clamped to stay within a minimum (5) and maximum (16).
#


@pw.udf
def compute_clamped_price(
    avg_occ_ratio: float,
    avg_queue_length: float,
    avg_traffic: float,
    is_special_day: float,
    avg_vehicle_weight: float
) -> float:
    """Return the demand-adjusted parking price, limited to the range 5..16.

    Demand is a weighted sum of the inputs (traffic enters with a negative
    sign), and the price is ``base * (1 + sensitivity * demand)``.

    Args:
        avg_occ_ratio: Occupancy ratio term of the demand.
        avg_queue_length: Queue length term of the demand.
        avg_traffic: Traffic term; subtracted from the demand.
        is_special_day: Special-day term of the demand.
        avg_vehicle_weight: Vehicle weight term of the demand.

    Returns:
        The computed price, or the lower/upper bound when the computed
        price falls outside it.
    """
    # Price bounds and base price.
    min_price = 5
    max_price = 16
    base_price = 10

    # Weights of the individual demand terms.
    occupancy_weight = 0.2
    queue_weight = 0.3
    traffic_weight = 2
    special_day_weight = 0.8
    vehicle_type_weight = 1
    # How strongly demand moves the price away from the base price.
    demand_sensitivity = 0.9

    demand = (
        occupancy_weight * avg_occ_ratio
        + queue_weight * avg_queue_length
        - traffic_weight * avg_traffic
        + special_day_weight * is_special_day
        + vehicle_type_weight * avg_vehicle_weight
    )
    price = base_price * (1 + demand_sensitivity * demand)

    # Clamp to the allowed range.
    if price < min_price:
        return min_price
    if price > max_price:
        return max_price
    return price


## STEP 3 : WINDOWING AND REDUCING OPERATIONS

In [None]:
# Build one price per LocationID per day:
#   1. group rows into one-day tumbling windows on the parsed timestamp `t`,
#      separately for each LocationID;
#   2. reduce every window to aggregated features (occupancy ratio, queue
#      length, traffic, vehicle weight, special-day flag);
#   3. pass those aggregates to the pricing UDF, which returns the clamped price.
# The resulting table has one row per (LocationID, day) window.

daily_price = (
    data_with_time.windowby(
        pw.this.t,
        instance = pw.this.LocationID,  # windows are kept separately per location
        window = pw.temporal.tumbling(datetime.timedelta(days=1)),
        # NOTE(review): presumably makes each window emit its result a single time
        # once it is complete -- confirm against the Pathway temporal behaviors docs.
        behavior = pw.temporal.exactly_once_behavior()
    )
    .reduce(
        t = pw.this._pw_window_end,  # the row is stamped with the end of its window
        LocationID = pw.this._pw_instance,
        # Earliest event timestamp seen in the window (not the nominal window boundary).
        window_start = pw.reducers.min(pw.this.t),
        avg_occ_ratio = pw.reducers.avg(pw.this.occ_ratio),
        avg_queue_length = pw.reducers.avg(pw.this.QueueLength),
        # Despite the "avg_" name, this is the MAXIMUM traffic score in the window.
        avg_traffic = pw.reducers.max(pw.this.traffic),
        avg_vehicle_weight = pw.reducers.avg(pw.this.vehicle_weight),
        # Largest IsSpecialDay value in the window.
        is_special_day = pw.reducers.max(pw.this.IsSpecialDay)
    )
    .with_columns(
        # Argument order must match the parameter order of compute_clamped_price.
        price = compute_clamped_price(
            pw.this.avg_occ_ratio,
            pw.this.avg_queue_length,
            pw.this.avg_traffic,
            pw.this.is_special_day,
            pw.this.avg_vehicle_weight
        )
    )
)

## STEP 4 : LIVE DATA VISUALIZATION IN BOKEH

In [None]:
import panel as pn
import bokeh.plotting
from bokeh.palettes import Category20
from bokeh.transform import factor_cmap

# Initialize the Panel extension for this notebook session.
pn.extension()


# Initialize Panel for building interactive visualizations in notebooks or apps.
# This block defines a custom Bokeh plot function `price_plotter`:
# - It creates a scatter plot of daily parking prices over time.
# - Each point represents the price for a specific LocationID on a specific day.
# - Different LocationIDs are color-coded using a factor color map for easy comparison.
# - The plot uses Bokeh tools like pan, zoom, box select, reset, and hover for interactivity.
# - A legend shows which color corresponds to which LocationID.
# - The visualization helps track how prices vary across different locations and days,
#   making it easy to see trends, outliers, or effects of special days.
#
# Finally, `pn.Column(viz).servable()` makes the plot interactive and ready to be served
# inside a notebook, Panel app, or deployed as a web dashboard.


def price_plotter(source):
    """Build a scatter plot of price against time, coloured by LocationID.

    Args:
        source: Data source handed to ``scatter``; it must provide the
            "t", "price" and "LocationID" columns referenced below.

    Returns:
        The configured Bokeh figure.
    """
    # One colour per known location, taken from the 20-colour Category20 palette.
    location_colors = factor_cmap(
        "LocationID",
        palette=Category20[20],
        factors=LOCATIONS,
    )

    plot = bokeh.plotting.figure(
        title="Pathway: Daily Parking Price by Location",
        width=800,
        height=500,
        x_axis_type="datetime",
        tools="pan,wheel_zoom,box_zoom,reset,hover",
    )

    # Time on the x axis, price on the y axis, one legend entry per location.
    plot.scatter(
        "t",
        "price",
        source=source,
        size=6,
        color=location_colors,
        legend_field="LocationID",
    )

    plot.xaxis.axis_label = "Timestamp"
    plot.yaxis.axis_label = "Price"

    # The legend is configured only after the glyph that populates it was added.
    plot.legend.title = "LocationID"
    plot.legend.location = "top_left"

    return plot

#viz = daily_price.plot(price_plotter, sorting_col="t")  #UNCOMMENT THIS THING BEFORE YOU RUN THIS CELL

#pn.Column(viz).servable()  #UNCOMMENT THIS THING BEFORE YOU RUN THIS CELL


In [None]:
# Execute the Pathway dataflow defined in the cells above.
pw.run()