# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m35.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m51.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [4]:
# Read the raw parking records into a DataFrame and show it as the cell output.
DATASET_PATH = '/content/dataset.csv'
df = pd.read_csv(DATASET_PATH)
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [5]:
# Join the separate date and time strings into one datetime column,
# then put the rows in chronological order with a fresh 0..n-1 index.
date_time_text = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(date_time_text, format='%d-%m-%Y %H:%M:%S')
df = df.sort_values('Timestamp').reset_index(drop=True)

In [6]:
# Persist only the columns the baseline stream needs; the row index is not written.
# NOTE(review): the lot identifier (SystemCodeNumber) is dropped here, so rows from
# every lot present in df end up in one stream — confirm that is intended.
stream_columns = ["Timestamp", "Occupancy", "Capacity"]
df[stream_columns].to_csv("parking_stream.csv", index=False)

In [7]:
# Column layout of parking_stream.csv as read by Pathway.
class ParkingSchema(pw.Schema):
    Timestamp: str  # kept as text here; parsed into a datetime later with strptime
    Occupancy: int
    Capacity: int

In [8]:
# Replay the exported CSV as a stream typed by ParkingSchema;
# input_rate sets how fast rows are fed in.
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [9]:
# Layout of the Timestamp text in the exported CSV.
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parsing expression once and reuse it for both derived columns:
#   t   - the parsed event time
#   day - the same instant formatted with a zeroed time part, used as a per-day key
parsed_time = data.Timestamp.dt.strptime(fmt)
data_with_time = data.with_columns(
    t=parsed_time,
    day=parsed_time.dt.strftime("%Y-%m-%dT00:00:00"),
)

# Step 2: Making a simple pricing function

In [10]:
# Re-bind `datetime` to the module. The notebook's import cell also runs
# `from datetime import datetime`, which leaves the name pointing at the class,
# and `datetime.timedelta` below needs the module.
import datetime

# Aggregate the stream into one-day tumbling windows keyed by `day`.
daily_occupancy = (
    data_with_time
    .windowby(
        pw.this.t,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        instance=pw.this.day,
        behavior=pw.temporal.exactly_once_behavior(),
    )
    .reduce(
        t=pw.this._pw_window_end,
        occ_max=pw.reducers.max(pw.this.Occupancy),
        occ_min=pw.reducers.min(pw.this.Occupancy),
        cap=pw.reducers.max(pw.this.Capacity),
    )
)

# Baseline price: 10 plus the window's occupancy swing as a fraction of capacity.
occupancy_swing = pw.this.occ_max - pw.this.occ_min
delta_window = daily_occupancy.with_columns(
    price=10 + occupancy_swing / pw.this.cap
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [11]:
# Load Panel's notebook extension before any Panel objects are created in this cell.
pn.extension()

def price_plotter(source):
    """Build the Bokeh figure that shows price against window-end time.

    Args:
        source: Data source handed in by the table's ``plot`` call; it must
            expose ``t`` and ``price`` columns.

    Returns:
        A ``bokeh.plotting.figure`` with a datetime x-axis, a navy line and
        red point markers.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",
    )
    fig.line("t", "price", source=source, line_width=2, color="navy")
    # scatter() is the supported way to draw screen-sized markers; calling
    # circle() with `size` is deprecated in recent Bokeh releases, and this
    # notebook installs Bokeh unpinned.
    fig.scatter("t", "price", source=source, marker="circle", size=6, color="red")

    return fig

# Bind the plotting function to the daily price table, using "t" as the sorting column.
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column and mark it servable; as the last expression it is also displayed.
pn.Column(viz).servable()



In [12]:
%%capture --no-display
# Start the Pathway computation; the plot defined earlier only receives data once this runs.
pw.run()

Output()



In [15]:
# Column layout of parking_stream1.csv: the baseline columns plus the extra demand features.
class ParkingSchema1(pw.Schema):
    Timestamp: str  # kept as text here; parsed into a datetime later with strptime
    Occupancy: int
    Capacity: int
    QueueLength: int
    TrafficConditionNearby: str  # categorical text, converted by map_traffic_level
    IsSpecialDay: bool  # NOTE(review): the dataset preview shows 0/1 here — confirm Pathway parses that as bool
    VehicleType: str  # categorical text, converted by map_vehicle_weight

# Export the richer feature set and replay it as a second stream.
# Note that `data`, `fmt` and `data_with_time` are re-bound here.
feature_columns = [
    "Timestamp",
    "Occupancy",
    "Capacity",
    "QueueLength",
    "TrafficConditionNearby",
    "IsSpecialDay",
    "VehicleType",
]
df[feature_columns].to_csv("parking_stream1.csv", index=False)
data = pw.demo.replay_csv(
    "parking_stream1.csv",
    schema=ParkingSchema1,
    input_rate=1000,
)

# Parse the event time once and derive the per-day key from it.
fmt = "%Y-%m-%d %H:%M:%S"
event_time = data.Timestamp.dt.strptime(fmt)
data_with_time = data.with_columns(
    t=event_time,
    day=event_time.dt.strftime("%Y-%m-%dT00:00:00"),
)

def map_traffic_level(val: str) -> float:
    """Translate a traffic-condition label into a numeric level.

    Matching ignores case and surrounding whitespace.

    Args:
        val: Label such as "low", "average" or "high".

    Returns:
        1.0, 2.0 or 3.0 for the known labels. 2.0 for any unrecognised label
        and for non-string input (for example ``None`` or NaN from a missing
        cell).
    """
    default_level = 2.0
    # A missing value has no .lower(); treat it like any other unknown label
    # instead of raising AttributeError.
    if not isinstance(val, str):
        return default_level
    mapping = {"low": 1.0, "average": 2.0, "high": 3.0}
    return mapping.get(val.lower().strip(), default_level)

def map_vehicle_weight(val: str) -> float:
    """Translate a vehicle-type label into its demand weight.

    Matching ignores case and surrounding whitespace.

    Args:
        val: Label such as "bike", "car" or "truck".

    Returns:
        0.5, 1.0 or 1.5 for the known labels. 1.0 for any unrecognised label
        and for non-string input (for example ``None`` or NaN from a missing
        cell).
    """
    default_weight = 1.0
    # A missing value has no .lower(); treat it like any other unknown label
    # instead of raising AttributeError.
    if not isinstance(val, str):
        return default_weight
    # NOTE(review): the dataset preview also contains "cycle", which is not
    # listed here and therefore gets the default weight — confirm that is intended.
    mapping = {"bike": 0.5, "car": 1.0, "truck": 1.5}
    return mapping.get(val.lower().strip(), default_weight)

# Add numeric encodings of the two categorical columns, computed row by row.
traffic_level_expr = pw.apply(map_traffic_level, pw.this.TrafficConditionNearby)
vehicle_weight_expr = pw.apply(map_vehicle_weight, pw.this.VehicleType)
data_mapped = data_with_time.with_columns(
    traffic_level_numeric=traffic_level_expr,
    vehicle_type_weight=vehicle_weight_expr,
)

# Tunable parameters of the demand-based pricing model.
BASE_PRICE = 10  # price at zero normalized demand; compute_price clamps to 0.5x..2x of this
LAMBDA = 0.3  # how strongly normalized demand moves the price
ALPHA = 1.0  # weight of the occupancy / capacity ratio
BETA = 0.5  # weight of the queue length
GAMMA = 0.4  # weight of the traffic level (this term is subtracted from demand)
DELTA = 0.2  # weight of the special-day flag
EPSILON = 0.3  # weight of the vehicle-type weight

def compute_raw_demand(occupancy, capacity, queue, traffic, is_special, vehicle_weight):
    """Combine the input features into a single unnormalized demand score.

    The score is a weighted sum using the module-level coefficients:
    ALPHA * (occupancy / capacity) + BETA * queue - GAMMA * traffic
    + DELTA * is_special + EPSILON * vehicle_weight.

    Returns:
        The weighted sum, or 0.0 when ``capacity`` is zero (the utilisation
        ratio would be undefined).
    """
    if capacity == 0:
        return 0.0

    utilisation = occupancy / capacity
    # Terms are accumulated in the same left-to-right order as the formula above.
    demand = ALPHA * utilisation
    demand = demand + BETA * queue
    demand = demand - GAMMA * traffic
    demand = demand + DELTA * is_special
    demand = demand + EPSILON * vehicle_weight
    return demand

def normalize_demand(raw_demand, min_demand=0.0, max_demand=10.0):
    """Rescale a raw demand score linearly relative to [min_demand, max_demand].

    The result is not clipped: inputs outside the range map outside 0..1.

    Returns:
        ``(raw_demand - min_demand) / (max_demand - min_demand)``, or 0.0 when
        the range has zero width.
    """
    span = max_demand - min_demand
    if span == 0:
        return 0.0
    offset = raw_demand - min_demand
    return offset / span

def compute_price(normalized_demand):
    """Turn a normalized demand value into a price bounded around BASE_PRICE.

    The unclamped price is ``BASE_PRICE * (1 + LAMBDA * normalized_demand)``;
    the result is limited to the range 0.5 * BASE_PRICE .. 2.0 * BASE_PRICE.
    """
    lower_bound = 0.5 * BASE_PRICE
    upper_bound = 2.0 * BASE_PRICE
    unclamped = BASE_PRICE * (1 + LAMBDA * normalized_demand)
    floored = max(lower_bound, unclamped)
    return min(floored, upper_bound)

# Stage 1: collapse each one-day window (keyed by `day`) into sums, maxima and a row count.
daily_totals = (
    data_mapped
    .windowby(
        pw.this.t,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        instance=pw.this.day,
        behavior=pw.temporal.exactly_once_behavior(),
    )
    .reduce(
        t=pw.this._pw_window_end,
        occupancy=pw.reducers.max(pw.this.Occupancy),
        capacity=pw.reducers.max(pw.this.Capacity),
        queue_sum=pw.reducers.sum(pw.this.QueueLength),
        queue_count=pw.reducers.count(),
        traffic_sum=pw.reducers.sum(pw.this.traffic_level_numeric),
        vehicle_sum=pw.reducers.sum(pw.this.vehicle_type_weight),
        is_special=pw.reducers.max(pw.this.IsSpecialDay),
    )
)

# Stage 2: turn the sums into per-window means. `queue_count` is the number of
# rows in the window, so it serves as the denominator for all three averages.
rows_in_window = pw.this.queue_count
daily_means = daily_totals.with_columns(
    queue=pw.this.queue_sum / rows_in_window,
    traffic=pw.this.traffic_sum / rows_in_window,
    vehicle_weight=pw.this.vehicle_sum / rows_in_window,
)

# Stage 3: raw demand -> normalized demand -> price. Each step gets its own
# with_columns call because it reads the column added by the previous one.
daily_demand = daily_means.with_columns(
    raw_demand=pw.apply(
        compute_raw_demand,
        pw.this.occupancy,
        pw.this.capacity,
        pw.this.queue,
        pw.this.traffic,
        pw.this.is_special,
        pw.this.vehicle_weight,
    )
)
daily_normalized = daily_demand.with_columns(
    normalized_demand=pw.apply(normalize_demand, pw.this.raw_demand)
)
daily_window = daily_normalized.with_columns(
    price=pw.apply(compute_price, pw.this.normalized_demand)
)
# Load Panel's notebook extension, then bind price_plotter to the demand-based
# price table, using "t" as the sorting column.
pn.extension()
viz1 = daily_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column and mark it servable; as the last expression it is also displayed.
pn.Column(viz1).servable()



In [17]:
%%capture --no-display
# Start the Pathway computation so the demand-based price plot receives data.
pw.run()

Output()

