# Inroduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [9]:
!pip install pathway bokeh pandas


Collecting pathway
  Downloading pathway-0.24.1-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/60.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
Collecting h3>=4 (from pathway)
  Downloading h3-4.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting python-sat>=0.1.8.dev0 (from pathway)
  Downloading python_sat-1.8.dev17-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl.metadata (1.5 kB)
Collecting beartype<0.16.0,>=0.14.0 (from pathway)
  Downloading beartype-0.15.0-py3-none-any.whl.metadata (28 kB)
Collecting diskcache>=5.2.1 (from pathway)
  Downloading diskcache-5.6.3-py3-none-any.whl.metadata (20 kB)
Collecting boto3<1.36.0,>=1.26.76 (from pathway)
  Downloading boto3-1.35.99-py3-none-any.whl.metadata (6.7

In [10]:
import pandas as pd

# Load your CSV
df = pd.read_csv("Parking_stream(1).csv", parse_dates=["Timestamp"])

# Preview
df.head()


Unnamed: 0,SystemCodeNumber_Encoded,QueueLength_Normalized,VehicleTypeWeight,Traffic,IsSpecialDay,Occ_Cap_Norm,Timestamp
0,0,0.066667,0.66,0.0,0,0.098521,2016-10-04 07:59:00
1,4,0.133333,0.33,0.0,0,0.186953,2016-10-04 07:59:00
2,3,0.133333,0.66,0.0,0,0.366915,2016-10-04 07:59:00
3,5,0.133333,0.66,0.0,0,0.491326,2016-10-04 07:59:00
4,13,0.133333,0.0,0.0,0,0.304781,2016-10-04 07:59:00


In [11]:
import pathway as pw
import datetime

base_price = 10.0
alpha = 2.0

# Assuming df contains your data loaded already
data = pw.debug.table_from_pandas(df)

tumbling_window = (
    data.windowby(
        pw.this.Timestamp,
        instance=pw.this.SystemCodeNumber_Encoded,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior()
    )
    .reduce(
        t=pw.this._pw_window_end,
        LotID=pw.reducers.max(pw.this.SystemCodeNumber_Encoded),
        occ_avg=pw.reducers.max(pw.this.Occ_Cap_Norm),
    )
    .with_columns(
        Price=base_price + alpha * pw.this.occ_avg
    )
)

pw.debug.compute_and_print(tumbling_window, include_id=False)


t                   | LotID | occ_avg            | Price
2016-10-05 00:00:00 | 0     | 0.4458506107690383 | 10.891701221538076
2016-10-05 00:00:00 | 1     | 0.8282123085818337 | 11.656424617163667
2016-10-05 00:00:00 | 2     | 0.9478651107327292 | 11.895730221465458
2016-10-05 00:00:00 | 3     | 0.9096758344217376 | 11.819351668843476
2016-10-05 00:00:00 | 4     | 0.6221355449889324 | 11.244271089977865
2016-10-05 00:00:00 | 5     | 0.7535579414431994 | 11.5071158828864
2016-10-05 00:00:00 | 6     | 0.9503904858368332 | 11.900780971673667
2016-10-05 00:00:00 | 7     | 0.7083781839920972 | 11.416756367984194
2016-10-05 00:00:00 | 8     | 0.2046233529035083 | 10.409246705807016
2016-10-05 00:00:00 | 9     | 0.8033443945169817 | 11.606688789033964
2016-10-05 00:00:00 | 10    | 0.4549566227663972 | 10.909913245532794
2016-10-05 00:00:00 | 11    | 0.6059568744525353 | 11.21191374890507
2016-10-05 00:00:00 | 12    | 0.3993885666283828 | 10.798777133256765
2016-10-05 00:00:00 | 13    | 0.7293

In [12]:
price_df = pw.debug.compute_and_print(tumbling_window, include_id=False)


t                   | LotID | occ_avg            | Price
2016-10-05 00:00:00 | 0     | 0.4458506107690383 | 10.891701221538076
2016-10-05 00:00:00 | 1     | 0.8282123085818337 | 11.656424617163667
2016-10-05 00:00:00 | 2     | 0.9478651107327292 | 11.895730221465458
2016-10-05 00:00:00 | 3     | 0.9096758344217376 | 11.819351668843476
2016-10-05 00:00:00 | 4     | 0.6221355449889324 | 11.244271089977865
2016-10-05 00:00:00 | 5     | 0.7535579414431994 | 11.5071158828864
2016-10-05 00:00:00 | 6     | 0.9503904858368332 | 11.900780971673667
2016-10-05 00:00:00 | 7     | 0.7083781839920972 | 11.416756367984194
2016-10-05 00:00:00 | 8     | 0.2046233529035083 | 10.409246705807016
2016-10-05 00:00:00 | 9     | 0.8033443945169817 | 11.606688789033964
2016-10-05 00:00:00 | 10    | 0.4549566227663972 | 10.909913245532794
2016-10-05 00:00:00 | 11    | 0.6059568744525353 | 11.21191374890507
2016-10-05 00:00:00 | 12    | 0.3993885666283828 | 10.798777133256765
2016-10-05 00:00:00 | 13    | 0.7293

In [13]:

# Sink the results to CSV
pw.io.csv.write(tumbling_window, "price_output.csv")

# Execute the pipeline to flush data
pw.run()



Output()

    https://beartype.readthedocs.io/en/latest/api_roar/#pep-585-deprecations
  warn(


In [14]:
import pandas as pd

price_df = pd.read_csv("price_output.csv")
price_df.head()


Unnamed: 0,t,LotID,occ_avg,Price,time,diff
0,2016-10-24T00:00:00.000000000,1,0.855599,11.711198,0,1
1,2016-10-18T00:00:00.000000000,6,0.918274,11.836547,0,1
2,2016-10-17T00:00:00.000000000,11,0.586279,11.172557,0,1
3,2016-12-02T00:00:00.000000000,9,0.847016,11.694032,0,1
4,2016-11-04T00:00:00.000000000,5,0.68204,11.36408,0,1


In [30]:
pw.io.csv.write(tumbling_window, "price_output.csv")
pw.run()

import pandas as pd
price_df = pd.read_csv("price_output.csv")
print(price_df.head())


Output()

                               t  LotID   occ_avg      Price  time  diff
0  2016-10-24T00:00:00.000000000      1  0.855599  11.711198     0     1
1  2016-10-18T00:00:00.000000000      6  0.918274  11.836547     0     1
2  2016-10-17T00:00:00.000000000     11  0.586279  11.172557     0     1
3  2016-12-02T00:00:00.000000000      9  0.847016  11.694032     0     1
4  2016-11-04T00:00:00.000000000      5  0.682040  11.364080     0     1


In [17]:
print(price_df["LotID"].unique())


[ 1  6 11  9  5  4  0 13  2 10  7  8  3 12]


In [18]:
lot_id_demo = price_df["LotID"].unique()[0]


In [19]:
filtered_df = price_df[price_df["LotID"] == lot_id_demo].sort_values(by="t")
print(filtered_df.head())


                                 t  LotID   occ_avg      Price  time  diff
952  2016-10-05T00:00:00.000000000      1  0.828212  11.656425     0     1
508  2016-10-06T00:00:00.000000000      1  0.855599  11.711198     0     1
6    2016-10-07T00:00:00.000000000      1  0.818254  11.636507     0     1
798  2016-10-08T00:00:00.000000000      1  0.708708  11.417416     0     1
954  2016-10-09T00:00:00.000000000      1  0.855599  11.711198     0     1


In [20]:
import pandas as pd
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource

output_notebook()

# Step 1: Convert 't' to datetime
price_df["t"] = pd.to_datetime(price_df["t"])

# Step 2: Loop over unique LotIDs
lot_ids = price_df["LotID"].unique()

for lot_id in lot_ids:
    filtered_df = price_df[price_df["LotID"] == lot_id].sort_values(by="t")

    source = ColumnDataSource(data=dict(t=filtered_df["t"], price=filtered_df["Price"]))

    p = figure(x_axis_type="datetime", title=f"Price Evolution for Lot {lot_id}",
               width=700, height=400)
    p.line(x='t', y='price', source=source, line_width=2, color='blue')
    p.circle(x='t', y='price', source=source, size=5, color='red')

    show(p)




























