# Real-Time Dynamic Pricing System for Urban Parking Spaces

This notebook implements a real-time dynamic pricing system for urban parking spaces using Pathway for streaming data processing and Bokeh for interactive visualizations. The system adjusts parking rates based on a simple demand-based logic, aiming to optimize utilization and revenue.

## Getting Started

### Prerequisites
- Google Colab environment.
- Ensure you have the `dataset.csv` file uploaded to your Colab environment or accessible in the same directory as this notebook.

### Installation
The following libraries are required. Run the cell below to install them if you haven't already.


In [6]:
!pip install numpy pandas pathway bokeh panel



## Data Loading and Preprocessing

First, we load the `dataset.csv` file and preprocess the timestamp information. We combine the `LastUpdatedDate` and `LastUpdatedTime` columns into a single datetime column and sort the data chronologically. Finally, we save the relevant columns to `parking_stream.csv` for Pathway to simulate a data stream.


In [7]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import pathway as pw
import bokeh.plotting
import panel as pn

# Load the dataset
df = pd.read_csv("dataset.csv")

# Combine the 'LastUpdatedDate' and 'LastUpdatedTime' columns into a single datetime column
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                  format='%d-%m-%Y %H:%M:%S')

# Sort the DataFrame by the new 'Timestamp' column and reset the index
df = df.sort_values('Timestamp').reset_index(drop=True)

# Save the selected columns to a CSV file for streaming or downstream processing
df[['Timestamp', 'Occupancy', 'Capacity']].to_csv('parking_stream.csv', index=False)

## Pathway Data Stream Setup

We define a Pathway schema for our streaming data and then use `pw.demo.replay_csv` to simulate a real-time data stream from the `parking_stream.csv` file. This allows us to process data as if it were arriving in real-time.


In [8]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream
class ParkingSchema(pw.Schema):
    Timestamp: str   # Timestamp of the observation (should ideally be in ISO format)
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location

# Load the data as a simulated stream using Pathway's replay_csv function
# This replays the CSV data at a controlled input rate to mimic real-time streaming
# input_rate=1000 means approximately 1000 rows per second will be ingested into the stream.
data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)

## Feature Engineering and Dynamic Pricing Logic

We add new columns to the data stream for parsed datetime (`t`) and the day (`day`). Then, we define a daily tumbling window to aggregate data and compute a dynamic price. The pricing formula is based on the difference between maximum and minimum occupancy within the window, normalized by capacity.

### Demand Function and Assumptions

In this simplified model, our 'demand' is implicitly represented by the fluctuation in occupancy within a day. The greater the difference between the peak and low occupancy, the higher the implied demand volatility, suggesting potential scarcity. This fluctuation is normalized by the parking lot's capacity.

**Pricing Formula:**
`price = base_price + demand_fluctuation`
where:
`base_price = 10` (fixed minimum price)
`demand_fluctuation = (occ_max - occ_min) / cap`

**Assumptions:**
*   **Occupancy Fluctuation as Demand Proxy:** We assume that a larger swing between daily maximum and minimum occupancy indicates higher demand or more dynamic usage patterns.
*   **Fixed Base Price:** A base price of $10 is set as a minimum. This could be adjusted based on operational costs or desired profit margins.
*   **Linear Relationship:** The demand fluctuation is added linearly to the base price. More complex, non-linear relationships could be explored for refined pricing.
*   **No External Factors (in this model):** This model does not explicitly account for external factors like special events, traffic, or competitor pricing. These would be incorporated in more advanced models (like Model 2 and Model 3 discussed in the report).

### How Price Changes with Demand and Competition

In this specific notebook's implementation (Model 1), the price primarily changes with the *fluctuation in demand* as measured by the daily occupancy range. Higher fluctuations lead to higher prices. Competition is not directly modeled here, but in a full system (as described in the accompanying report), competitive pricing would involve:

*   **Competitive Pricing Model (Model 3):** This model would analyze prices and occupancy of nearby parking lots. If a competitor is significantly cheaper and has availability, the system might suggest lowering the price or rerouting vehicles. Conversely, if competitors are more expensive, the system could increase prices to maximize revenue while remaining competitive.
*   **Dynamic Adjustment:** The system continuously adjusts prices based on real-time data, allowing it to respond quickly to changes in demand and competitive landscape.


In [9]:
# Define the datetime format to parse the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Add new columns to the data stream:
# - 't' contains the parsed full datetime
# - 'day' extracts the date part and resets the time to midnight (useful for day-level aggregations)
data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)

# Define a daily tumbling window over the data stream using Pathway
# This block performs temporal aggregation and computes a dynamic price for each day
delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.day,  # Logical partitioning key: one instance per calendar day
        window=pw.temporal.tumbling(timedelta(days=1)),  # Fixed-size daily window
        behavior=pw.temporal.exactly_once_behavior()  # Guarantees exactly-once processing semantics
    )
    .reduce(
        t=pw.this._pw_window_end,                        # Assign the end timestamp of each window
        occ_max=pw.reducers.max(pw.this.Occupancy),      # Highest occupancy observed in the window
        occ_min=pw.reducers.min(pw.this.Occupancy),      # Lowest occupancy observed in the window
        cap=pw.reducers.max(pw.this.Capacity),           # Maximum capacity observed (typically constant per spot)
    )
    .with_columns(
        # Compute the price using a simple dynamic pricing formula:
        #
        # Pricing Formula:
        #     price = base_price + demand_fluctuation
        #     where:
        #         base_price = 10 (fixed minimum price)
        #         demand_fluctuation = (occ_max - occ_min) / cap
        #
        # Intuition:
        # - The greater the difference between peak and low occupancy in a day,
        # - the more volatile the demand is, indicating potential scarcity.
        # - Dividing by capacity normalizes the fluctuation (to stay in [0,1] range).
        # - This fluctuation is added to the base price of 10 to set the final price.
        # - Example: If occ_max = 90, occ_min = 30, cap = 100
        #            => price = 10 + (90 - 30)/100 = 10 + 0.6 = 10.6

        price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
    )
)

## Bokeh Visualization

We use Bokeh to create an interactive, real-time visualization of the daily parking price. The plot will update as new data streams in.

**Note:** To view the interactive plot, you will need to run this notebook in a Google Colab environment and ensure that Panel is correctly configured to serve the Bokeh application. After running the cell below, a link will typically appear that you can click to open the visualization in a new tab.


In [12]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    # Create a Bokeh figure with datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    # Plot a line graph showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Overlay red circles at each data point for better visibility
    fig.scatter(marker="circle", "t", "price", source=source, size=6, color="red")

    return fig

# Use Pathway's built-in .plot() method to bind the data stream (delta_window) to the Bokeh plot
# - 'price_plotter' is the rendering function
# - 'sorting_col="t"' ensures the data is plotted in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Create a Panel layout and make it servable as a web app
# This line enables the interactive plot to be displayed when the app is served
pn.Column(viz).servable()

# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface
# The %%capture magic command needs to be in its own cell.
pw.run()

Output()



In [None]:

# Automatically close any open PythonReader data sources to avoid warnings
import warnings
from pathway_engine.connectors.monitoring import PythonReader

for obj in globals().values():
    if isinstance(obj, PythonReader):
        try:
            obj.close()
        except Exception:
            warnings.warn(f"Failed to close reader: {obj}")
