# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m35.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m48.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
from bokeh.models import ColumnDataSource, Select
from bokeh.plotting import figure, curdoc
from bokeh.layouts import column
from bokeh.io import output_notebook, show
from bokeh.models.callbacks import CustomJS
import panel as pn

pn.extension()

# Step 1: Importing and Preprocessing the Data

In [3]:
# Load the raw parking records; the path is relative to the notebook's working directory.
df = pd.read_csv('dataset.csv')
# Bare last expression so the notebook renders the frame as the cell output.
df

# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [4]:

# Build a single datetime column from the separate date and time text columns,
# discard rows whose date/time could not be parsed (they become NaT under
# errors='coerce'), and order the records chronologically with a fresh index.
df = (
    df.assign(
        Timestamp=pd.to_datetime(
            df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
            format='%d-%m-%Y %H:%M:%S',
            errors='coerce',
        )
    )
    .dropna(subset=['Timestamp'])
    .sort_values('Timestamp')
    .reset_index(drop=True)
)

# Persist the cleaned, time-ordered records; this file is what the streaming cell reads.
df.to_csv("parking_stream_full.csv", index=False)

print("Preprocessing complete. `parking_stream_full.csv` is ready.")

Preprocessing complete. `parking_stream_full.csv` is ready.


In [15]:
# Columns (and their declared types) that Pathway reads from `parking_stream_full.csv`.
# The CSV contains further columns (e.g. `ID`, `LastUpdatedDate`) that are not declared here.
class ParkingSchema(pw.Schema):
    Timestamp: str  # read as text; parsed with dt.strptime right after the CSV is loaded
    SystemCodeNumber: str  # lot identifier; the per-lot plots filter on this column
    Capacity: int  # denominator of the occupancy rate in the pricing UDFs
    Occupancy: int  # numerator of the occupancy rate in the pricing UDFs
    Latitude: float  # passed to haversine() to find nearby lots
    Longitude: float  # passed to haversine() to find nearby lots
    QueueLength: int  # feeds the queue-pressure term of the demand score
    TrafficConditionNearby: str  # pricing checks for the values 'high' and 'average'
    IsSpecialDay: int  # pricing applies a surcharge when this equals 1
    VehicleType: str  # pricing distinguishes 'truck', 'bike' and 'cycle' from everything else

def haversine(lon1, lat1, lon2, lat2):
    """Return the great-circle distance between two points given in degrees.

    Arguments are taken in (longitude, latitude) order for each point. The
    haversine formula is evaluated on a sphere of radius 6371, so the result
    is expressed in the unit of that radius (presumably kilometres, 6371 being
    the usual value for the Earth's mean radius in km).
    """
    sphere_radius = 6371

    # Convert every coordinate from degrees to radians.
    lon_a, lat_a, lon_b, lat_b = (np.radians(v) for v in (lon1, lat1, lon2, lat2))

    half_dlat = (lat_b - lat_a) / 2
    half_dlon = (lon_b - lon_a) / 2

    # Haversine term: squared half-chord length between the two points.
    a = np.sin(half_dlat)**2 + np.cos(lat_a) * np.cos(lat_b) * np.sin(half_dlon)**2

    # Central angle (in radians) subtended by the two points.
    central_angle = 2 * np.arcsin(np.sqrt(a))
    return central_angle * sphere_radius

@pw.udf(max_batch_size=20)
def compute_avg_competitor_prices_batch(
    ids: list[str],
    lats: list[float],
    longs: list[float],
    occupancies: list[int],
    capacities: list[int],
    queues: list[int]
) -> list[float]:
    """Average the occupancy-based price of nearby rows for every row of a batch.

    Each argument is a list holding one value per row of the batch. For row
    ``i`` the result is the mean price of every *other* row in the same batch
    whose haversine distance to row ``i`` is at most 2.0. A row's price is
    ``10.0 * (1 + occupancy / capacity)``, with the ratio taken as 0 when the
    capacity is not positive.

    Rows with no neighbour in range — and every row of a batch with at most
    one element — get the sentinel ``-1.0``.

    ``ids`` and ``queues`` are accepted but not used by the computation.
    """
    n_rows = len(ids)
    averages = [-1.0] * n_rows

    # A lone row (or an empty batch) has nothing to be compared against.
    if n_rows <= 1:
        return averages

    # Occupancy-driven reference price of every row in the batch.
    reference_prices = []
    for occ, cap in zip(occupancies, capacities):
        reference_prices.append(10.0 * (1 + (occ / cap if cap > 0 else 0)))

    for i in range(n_rows):
        # Prices of all other rows lying within the 2.0 distance threshold.
        nearby_prices = [
            reference_prices[j]
            for j in range(n_rows)
            if j != i and haversine(longs[i], lats[i], longs[j], lats[j]) <= 2.0
        ]
        if nearby_prices:
            averages[i] = np.mean(nearby_prices)

    return averages


@pw.udf
def calculate_final_price(
    occupancy: int,
    capacity: int,
    queue: int,
    avg_competitor_price: float,
    traffic_condition: str,
    is_special_day: int,
    vehicle_type: str,
) -> float:
    """Compute the price of one parking record, clipped to the range [5.0, 25.0].

    The price is built in three steps:

    1. Demand: ``10.0 * (1 + 0.7 * occupancy_rate + 0.3 * queue_pressure)``,
       where ``queue_pressure`` is ``queue / 10`` capped at 1.0.
    2. Context: multiplied by a traffic factor ('high' 1.15, 'average' 1.05,
       otherwise 1.0), a vehicle factor ('truck' 1.25, 'bike'/'cycle' 0.9,
       otherwise 1.0) and 1.20 when ``is_special_day == 1``.
    3. Competition (only when ``avg_competitor_price > 0``): if the lot is more
       than 80% full and pricier than its competitors, the price is averaged
       with the competitor price; otherwise, if it is cheaper than its
       competitors, it is raised by 5%.

    When ``capacity`` is 0 the base price 10.0 is returned unchanged (and
    unclipped).
    """
    base_price = 10.0
    if capacity == 0:
        # No occupancy rate can be formed; fall back to the base price.
        return base_price

    # Step 1: internal demand.
    occupancy_rate = occupancy / capacity
    queue_pressure = min(queue / 10, 1.0)
    demand_score = 0.7 * occupancy_rate + 0.3 * queue_pressure
    price = base_price * (1 + demand_score)

    # Step 2: contextual multipliers; unknown categories fall back to 1.0.
    traffic_multiplier = {'high': 1.15, 'average': 1.05}.get(traffic_condition, 1.0)
    vehicle_multiplier = {'truck': 1.25, 'bike': 0.9, 'cycle': 0.9}.get(vehicle_type, 1.0)
    if is_special_day == 1:
        special_day_multiplier = 1.20
    else:
        special_day_multiplier = 1.0

    context_adjusted_price = price * traffic_multiplier * vehicle_multiplier * special_day_multiplier

    # Step 3: competitor adjustment; non-positive averages mean "no competitor data".
    has_competitor_data = avg_competitor_price > 0
    if has_competitor_data and occupancy_rate > 0.8 and context_adjusted_price > avg_competitor_price:
        final_price = (context_adjusted_price + avg_competitor_price) / 2
    elif has_competitor_data and context_adjusted_price < avg_competitor_price:
        final_price = context_adjusted_price * 1.05
    else:
        final_price = context_adjusted_price

    return np.clip(final_price, 5.0, 25.0)

In [16]:
# Read the preprocessed CSV as a Pathway table, using the columns declared in ParkingSchema.
data = pw.io.csv.read(
    "parking_stream_full.csv",
    schema=ParkingSchema,
    mode="streaming",
    autocommit_duration_ms=1000
)

# Timestamps arrive as text; parse them so they can serve as a datetime axis/sort key.
data = data.with_columns(
    Timestamp=pw.this.Timestamp.dt.strptime("%Y-%m-%d %H:%M:%S")
)

# The batched UDF is applied to the table itself. The previous version called
# `with_columns` on the result of `data.groupby(...)`; a grouped table is meant
# to be aggregated with `reduce(...)` and does not offer `with_columns`.
# NOTE(review): batches (at most 20 rows) are formed by the engine, so a batch
# is not guaranteed to contain exactly the rows of one timestamp. If strict
# per-timestamp comparison is required, aggregate with
# `groupby(pw.this.Timestamp).reduce(...)` and join the result back — TODO confirm.
data_with_competitors = data.with_columns(
    avg_competitor_price=compute_avg_competitor_prices_batch(
        pw.this.SystemCodeNumber,
        pw.this.Latitude,
        pw.this.Longitude,
        pw.this.Occupancy,
        pw.this.Capacity,
        pw.this.QueueLength
    )
)

# Combine demand, context and the competitor average into the final price per row.
final_prices_table = data_with_competitors.with_columns(
    final_price=calculate_final_price(
        pw.this.Occupancy,
        pw.this.Capacity,
        pw.this.QueueLength,
        pw.this.avg_competitor_price,
        pw.this.TrafficConditionNearby,
        pw.this.IsSpecialDay,
        pw.this.VehicleType,
    )
)

In [17]:
# NOTE(review): `panel` and `figure` are already imported, and `pn.extension()` is
# already called, in the imports cell near the top of the notebook; they are repeated here.
import panel as pn
from bokeh.plotting import figure

pn.extension()

# Sorted list of the distinct lot identifiers found in the pandas frame;
# one plot is created for each entry.
unique_lots = sorted(df['SystemCodeNumber'].unique().tolist())

def create_plot_for_lot(lot_id, prices_stream):
    """Build the live price plot for a single parking lot.

    ``prices_stream`` is filtered down to the rows whose ``SystemCodeNumber``
    equals ``lot_id``; the result is plotted against ``Timestamp`` with one
    solid line for ``final_price`` and one dashed line for
    ``avg_competitor_price``. Returns whatever the table's ``plot`` method
    returns.
    """
    # (column to draw, styling passed to fig.line) for each series of the figure.
    line_specs = (
        ("final_price", dict(legend_label="My Final Price", color="blue", line_width=2)),
        (
            "avg_competitor_price",
            dict(legend_label="Avg. Competitor Price", color="red", line_width=2, line_dash="dashed"),
        ),
    )

    lot_rows = prices_stream.filter(pw.this.SystemCodeNumber == lot_id)

    def bokeh_plotter(source, **kwargs):
        # Called with the data source backing the filtered table; extra kwargs are ignored.
        fig = figure(
            height=300,
            width=800,
            title=f"Dynamic Price for Lot: {lot_id}",
            x_axis_type="datetime",
            y_axis_label="Price ($)",
        )

        for column_name, line_style in line_specs:
            fig.line("Timestamp", column_name, source=source, **line_style)

        # Legend styling is applied after the lines exist, since they create the legend entries.
        fig.legend.location = "top_left"
        fig.legend.label_text_font_size = '8pt'
        fig.legend.background_fill_alpha = 0.7

        return fig

    return lot_rows.plot(bokeh_plotter, sorting_col="Timestamp")


# One live plot per lot, in the sorted order of `unique_lots`.
all_plots = list(
    map(lambda lot: create_plot_for_lot(lot, final_prices_table), unique_lots)
)

# Stack a heading followed by every per-lot plot in a single vertical layout.
dashboard = pn.Column(
    pn.pane.Markdown("# Real-Time Parking Lot Dynamic Pricing\n### Live View of All Lots"),
    *all_plots,
)

# Bare last expression so the notebook renders the dashboard as the cell output.
dashboard

In [None]:

# Start the Pathway computation defined in the cells above; the plots update as rows are processed.
# NOTE(review): with a streaming input this call presumably keeps running until interrupted — confirm.
pw.run()

Output()

