# **Dynamic Pricing for Urban Parking Lots**

Installing the Libraries

---



In [1]:
# Installing the pathway and bokeh libraries that I need for this notebook
!pip install pathway bokeh --quiet

Importing All the Required Libraries

---



In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import pathway as pw  # for stream processing
import bokeh.plotting  # for interactive plotting
import panel as pn  # for creating dashboards

Loading the Dataset

---



In [3]:
df = pd.read_csv('/content/dataset.csv')  # reading the dataset from a CSV file

In [None]:
df.head()

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00


Creating and Sorting the Timestamp Column

---



In [4]:
# combining date and time columns and converting them into a proper datetime format
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                  format='%d-%m-%Y %H:%M:%S')

# sorting the data by timestamp and resetting the index
df = df.sort_values('Timestamp').reset_index(drop=True)

# **Model 1: Baseline Linear Model**

Saving Selected Columns to a New CSV File

---



In [None]:
# saving only the required columns to a new CSV file for streaming
df[["Timestamp", "Occupancy", "Capacity"]].to_csv("parking_stream.csv", index=False)

Defining the Data Schema for Pathway

---



In [None]:
# Creating a schema to define the structure of the streaming data
class ParkingSchema(pw.Schema):
    Timestamp: str
    Occupancy: int
    Capacity: int

Loading the Data into Pathway as a Stream

---



In [None]:
# Loading the CSV as a streaming dataset with the defined schema and setting input rate
data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)

Converting Timestamp to Datetime and Creating a Day Column

---



In [None]:
fmt = "%Y-%m-%d %H:%M:%S"  # setting the datetime format

# I'm converting the timestamp string to datetime format and also creating a day column
data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)

Creating a Windowed Table with Daily Max/Min Occupancy

---



**Formula used for this model**                                 
Price(t+1) = Price(t) + 0.1 * (Occupancy(t+1) - Occupancy(t)) / (Occupancy_max - Occupancy_min)

In [None]:
# Grouping the data into 1-day windows and calculating the max and min occupancy for each day
windowed = (
    data_with_time.windowby(
        pw.this.t,  # using timestamp to create time-based windows
        instance=pw.this.day,  # each window is grouped by 'day'
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # tumbling window of 1 day
        behavior=pw.temporal.exactly_once_behavior()  # ensures events are processed exactly once
    )
    .reduce(
        day=pw.reducers.min(pw.this.day),  # keeping one value per day
        occ_max=pw.reducers.max(pw.this.Occupancy),  # maximum occupancy in the day
        occ_min=pw.reducers.min(pw.this.Occupancy),  # minimum occupancy in the day
    )
)

# Joining the daily max/min occupancy values back to each row of original data
joined = data_with_time.join(
    windowed,
    data_with_time.day == windowed.day  # joining on 'day' column
).select(
    t = pw.left.t,
    day = pw.left.day,
    Occupancy = pw.left.Occupancy,
    occ_max = pw.right.occ_max,
    occ_min = pw.right.occ_min,
)

# Creating keys to look back for each timestamp
joined = joined.with_columns(
    lag_key = pw.make_tuple(pw.this.day, pw.this.t),  # current key
    lag_key_prev = pw.make_tuple(pw.this.day, pw.this.t - datetime.timedelta(hours=1))  # earlier key
)

left = joined
right = joined.copy()

# Joining the table with itself to get the occupancy from previous time
lagged = left.join(
    right,
    left.lag_key_prev == right.lag_key,  # joining current with previous time
    how=pw.JoinMode.LEFT  # using left join to keep all current records
).select(
    t = left.t,
    day = left.day,
    Occupancy = left.Occupancy,
    occ_max = left.occ_max,
    occ_min = left.occ_min,
    Occupancy_prev = pw.if_else(
        right.Occupancy.is_not_none(),  # if previous value exists
        right.Occupancy,
        left.Occupancy  # else use current as fallback
    )
)

# I'm calculating the price based on change in occupancy
result = lagged.with_columns(
    Price = pw.if_else(
        (lagged.occ_max - lagged.occ_min) != 0,
        lagged.Occupancy_prev + 0.1 * (lagged.Occupancy - lagged.Occupancy_prev) / (lagged.occ_max - lagged.occ_min),
        lagged.Occupancy  # fallback price if no change in occupancy
    )
)


Visualizing the Parking Price Stream

---



In [None]:
pn.extension()  # Enabling Panel's extension so that I can use its plotting and layout features

# Defining a custom plotting function for price visualization
def price_plotter(source):
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",
    )
    fig.line("t", "Price", source=source, line_width=2, color="navy")
    fig.circle("t", "Price", source=source, size=6, color="red")
    return fig

# Linking the plot to the streaming result from Pathway
viz = result.plot(price_plotter, sorting_col="t")

# I'm displaying the plot using Panel
pn.Column(viz).servable()



Running the Pathway Pipeline

---



In [None]:
%%capture --no-display
pw.run()  # this starts the execution of the entire Pathway pipeline

Output()

# **Model 2: Demand-Based Price Function**

Encoding Vehicle Types Using Ordinal Encoding

---



In [5]:
from sklearn.preprocessing import OrdinalEncoder

vehicle_order = [['cycle','bike', 'car', 'truck']]

encoder = OrdinalEncoder(categories=vehicle_order)

# Applying the encoder to the 'VehicleType' column
df['VehicleType_ordinal'] = encoder.fit_transform(df[['VehicleType']])

# I'm converting the encoded values to integers
df['VehicleType_ordinal'] = df['VehicleType_ordinal'].astype(int)

Encoding Traffic Conditions Using Ordinal Encoding

---



In [6]:
traffic_order = [['low','average', 'high']]

encoder = OrdinalEncoder(categories=traffic_order)

# Applying the encoder to the 'TrafficConditionNearby' column
df['TrafficConditionNearby_ordinal'] = encoder.fit_transform(df[['TrafficConditionNearby']])

# I'm converting the result to integers
df['TrafficConditionNearby_ordinal'] = df['TrafficConditionNearby_ordinal'].astype(int)

Saving the Final Set of Columns to a New CSV File

---



In [7]:
# Saving the selected features (including encoded ones) to a new CSV file
df[["Timestamp", "Occupancy", "Capacity", "QueueLength", "TrafficConditionNearby_ordinal", "IsSpecialDay", "VehicleType_ordinal"]].to_csv("parking_stream2.csv", index=False)

Defining the Extended Schema for the New CSV File

---



In [8]:
# Creating a new schema that includes all the additional features from the updated CSV
class ParkingSchema2(pw.Schema):
    Timestamp: str
    Occupancy: int
    Capacity: int
    QueueLength: int
    TrafficConditionNearby_ordinal: int
    IsSpecialDay: int
    VehicleType_ordinal: int

Loading the New CSV as a Streaming Dataset

---



In [9]:
# Loading the updated CSV file into Pathway using the extended schema
data = pw.demo.replay_csv("parking_stream2.csv", schema=ParkingSchema2, input_rate=1000)

Converting Timestamp and Creating a Day Column

---



In [10]:
fmt = "%Y-%m-%d %H:%M:%S"

# Converting the 'Timestamp' to datetime and extracting just the day part
data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)

Calculating Dynamic Price Based on Demand Factors

---



**Formula for Demand:**                                   
Demand = 1.5 * ((Occupancy_max - Occupancy_min) / Capacity)^2
         + 1 * QueueLength
         - 0.5 * sqrt(TrafficLevel)
         + 0.4 * IsSpecialDay
         + 0.6 * VehicleType

**Normalize the demand:**        
Normalized Demand = (Demand - 0) / (10 - 0)

**Calculate raw price:**      
RawPrice = 10 * (1 + 0.5 * NormalizedDemand)

**Apply price bounds:**   
Final Price = min(20, max(5, RawPrice))

In [11]:
# I'm defining a function to keep the price within the range of 5 to 20
def bound_price(price):
    if price < 5:
        return 5
    elif price > 20:
        return 20
    else:
        return price

# Creating a daily window to calculate features and estimate dynamic demand
delta_window = (
    data_with_time.windowby(
        pw.this.t,
        instance=pw.this.day,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # 1-day tumbling window
        behavior=pw.temporal.exactly_once_behavior()
    )
    .reduce(
        t=pw.this._pw_window_end,  # end time of window
        occ_max=pw.reducers.max(pw.this.Occupancy),
        occ_min=pw.reducers.min(pw.this.Occupancy),
        cap=pw.reducers.max(pw.this.Capacity),
        queue_length=pw.reducers.max(pw.this.QueueLength),
        traffic_level=pw.reducers.max(pw.this.TrafficConditionNearby_ordinal),
        special_day=pw.reducers.max(pw.this.IsSpecialDay),
        vehicle_type_weight=pw.reducers.max(pw.this.VehicleType_ordinal),
    )
    .with_columns(
        # Calculating a custom demand score using weighted factors
        demand = 1.5 * ((pw.this.occ_max - pw.this.occ_min) / pw.this.cap) ** 2 +
                 1 * pw.this.queue_length -
                 0.5 * (pw.this.traffic_level) ** 0.5 +
                 0.4 * pw.this.special_day +
                 0.6 * pw.this.vehicle_type_weight,
    )
    .with_columns(
        normalized_demand = (pw.this.demand - 0) / (10 - 0),
    )
    .with_columns(
        price_raw = 10 * (1 + 0.5 * pw.this.normalized_demand),  # raw price formula
    )
    .with_columns(
        price = pw.apply(bound_price, pw.this.price_raw)  # applying the bound to keep price between 0.5x and 2x
    )
)

Visualizing the Final Dynamic Price Output

---



In [12]:
pn.extension()  # I'm enabling Panel's features for interactive visualization

# Defining the function that will create the price chart using Bokeh
def price_plotter(source):
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Parking Price",
        x_axis_type="datetime",
    )
    fig.line("t", "price", source=source, line_width=2, color="navy")
    fig.circle("t", "price", source=source, size=6, color="red")
    return fig

# Linking the plot to the streaming results and sorting by time
viz = delta_window.plot(price_plotter, sorting_col="t")

# Displaying the plot in a vertical layout using Panel
pn.Column(viz).servable()



Running the Pathway Pipeline

---



In [13]:
%%capture --no-display
pw.run()  # this starts the execution of the entire Pathway pipeline

Output()

