# Inroduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m32.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m43.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [4]:
df = pd.read_csv('/content/dataset.csv')
df


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [5]:
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                  format='%d-%m-%Y %H:%M:%S')

df = df.sort_values('Timestamp').reset_index(drop=True)

In [6]:
df[["Timestamp", "Occupancy", "Capacity"]].to_csv("parking_stream.csv", index=False)


In [7]:

class ParkingSchema(pw.Schema):
    Timestamp: str   # Timestamp of the observation (should ideally be in ISO format)
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location

In [8]:

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)

In [9]:
fmt = "%Y-%m-%d %H:%M:%S"

data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)

# Step 2: Making a simple pricing function

In [10]:
import datetime

delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.day,  # Logical partitioning key: one instance per calendar day
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # Fixed-size daily window
        behavior=pw.temporal.exactly_once_behavior()  # Guarantees exactly-once processing semantics
    )
    .reduce(
        t=pw.this._pw_window_end,                        # Assign the end timestamp of each window
        occ_max=pw.reducers.max(pw.this.Occupancy),      # Highest occupancy observed in the window
        occ_min=pw.reducers.min(pw.this.Occupancy),      # Lowest occupancy observed in the window
        cap=pw.reducers.max(pw.this.Capacity),           # Maximum capacity observed (typically constant per spot)
    )
    .with_columns(

        price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
    )
)

# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [11]:
pn.extension()

def price_plotter(source):
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    fig.line("t", "price", source=source, line_width=2, color="navy")

    fig.circle("t", "price", source=source, size=6, color="red")

    return fig

viz = delta_window.plot(price_plotter, sorting_col="t")

pn.Column(viz).servable()



In [12]:

%%capture --no-display
pw.run()

Output()



In [14]:
import pandas as pd
import numpy as np
import pathway as pw
from bokeh.plotting import figure, show, output_notebook
output_notebook()


In [20]:

df = pd.read_csv("/content/dataset.csv")

# Combine datetime
df["Timestamp"] = pd.to_datetime(
    df["LastUpdatedDate"] + ' ' + df["LastUpdatedTime"],
    dayfirst=True
)

# Now sort using correct column names
df.sort_values(["ID", "Timestamp"], inplace=True)

df.head()


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00,2016-10-04 08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00,2016-10-04 08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00,2016-10-04 09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00,2016-10-04 09:59:00


In [22]:
class ParkingDataSchema(pw.Schema):
    ParkingLotID: str
    Timestamp: str
    Latitude: float
    Longitude: float
    Capacity: int
    Occupancy: int
    QueueLength: int
    VehicleType: str
    Traffic: float
    IsSpecialDay: bool

In [24]:
parking_table = pw.io.csv.read(
    "/content/dataset.csv",
    schema=ParkingDataSchema,
    mode="streaming",
    autocommit_duration_ms=500,
)

In [25]:
vehicle_weights = {
    "car": 1.0,
    "bike": 0.7,
    "truck": 1.5
}

In [26]:
@pw.udf
def model1_pricing(prev_price: float, occupancy: int, capacity: int, alpha=0.5):
    return round(prev_price + alpha * (occupancy / capacity), 2)

pricing_model1 = parking_table.select(
    ParkingLotID=pw.this.ParkingLotID,
    Timestamp=pw.this.Timestamp,
    Price=model1_pricing(10.0, pw.this.Occupancy, pw.this.Capacity),
)

In [27]:
@pw.udf
def compute_demand(occ, cap, queue, traffic, is_special, vehicle_type):
    occupancy_ratio = occ / cap
    vt_weight = vehicle_weights.get(vehicle_type, 1.0)
    demand = 0.4*occupancy_ratio + 0.3*queue - 0.2*traffic + 0.2*int(is_special) + 0.5*vt_weight
    return demand

@pw.udf
def model2_pricing(demand, base=10, lam=0.3):
    normalized = (demand - 0.5) / 3.0
    price = base * (1 + lam * normalized)
    return round(min(max(price, base * 0.5), base * 2), 2)

demand_table = parking_table.select(
    ParkingLotID=pw.this.ParkingLotID,
    Timestamp=pw.this.Timestamp,
    Demand=compute_demand(pw.this.Occupancy, pw.this.Capacity, pw.this.QueueLength,
                           pw.this.Traffic, pw.this.IsSpecialDay, pw.this.VehicleType)
)

pricing_model2 = demand_table.select(
    ParkingLotID=pw.this.ParkingLotID,
    Timestamp=pw.this.Timestamp,
    Price=model2_pricing(pw.this.Demand)
)

In [28]:
@pw.udf
def haversine(lat1, lon1, lat2, lon2):
    R = 6371
    phi1, phi2 = np.radians(lat1), np.radians(lat2)
    d_phi = np.radians(lat2 - lat1)
    d_lambda = np.radians(lon2 - lon1)
    a = np.sin(d_phi/2)**2 + np.cos(phi1)*np.cos(phi2)*np.sin(d_lambda/2)**2
    return R * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

In [29]:
from bokeh.models import ColumnDataSource
from bokeh.layouts import layout

pricing_model2_df = df.copy()
pricing_model2_df["Price"] = 10 + 0.5 * (df["Occupancy"] / df["Capacity"])

source = ColumnDataSource(pricing_model2_df)

p = figure(x_axis_type="datetime", title="Dynamic Pricing")
p.line(x='Timestamp', y='Price', source=source, legend_label="Price", line_width=2)

show(p)

In [32]:
pw.io.jsonlines.write(pricing_model2, filename="pricing_output.jsonl")
