# 1. Importing necessary libraries and packages

In [None]:
!pip install pathway bokeh --quiet

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime,timedelta
import pathway as pw
import bokeh.plotting
import panel as pn



# 2. Loading data

In [None]:
# Read the raw records from dataset.csv and preview the first five rows.
df = pd.read_csv("dataset.csv")
print(df.head(n=5))

# 3. Preprocessing

In [None]:
# Report the number of missing values in each column
print(df.isna().sum())

# Merge the separate date and time strings into one datetime column
print(df.columns)
date_and_time = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(date_and_time, format='%d-%m-%Y %H:%M:%S')

# Order the rows chronologically; ignore_index renumbers them from 0
df = df.sort_values('Timestamp', ignore_index=True)

# Encode the categorical columns as numeric features
print(df["VehicleType"].unique())
print(df["TrafficConditionNearby"].unique())
vehicle_weights = {"car": 3.0, "bike": 2.0, "truck": 4.5, "cycle": 0.5}
traffic_levels = {'low': 0.0, 'average': 1.0, 'high': 2.0}
# Series.map leaves NaN for any category that is not a key of the mapping
df['Veh_Weight'] = df['VehicleType'].map(vehicle_weights)
df['Traffic_Level'] = df['TrafficConditionNearby'].map(traffic_levels)

# Write the columns needed for streaming to a CSV file, leaving out the
# identifier and the raw columns that were encoded/combined above
exclude = ['ID', 'VehicleType', 'TrafficConditionNearby', 'LastUpdatedDate', 'LastUpdatedTime']
desired_cols = [col for col in df.columns if col not in exclude]
df.loc[:, desired_cols].to_csv("preprocessed_dataset.csv", index=False)

# 4. Streaming data from preprocessed_dataset.csv

In [None]:
# Column layout of the rows streamed from preprocessed_dataset.csv
class parkingSchema(pw.Schema):
    # Value used to group rows per parking lot in the pricing pipeline
    SystemCodeNumber: str
    Capacity: int
    Latitude: float
    Longitude: float
    Occupancy: int
    QueueLength: int
    IsSpecialDay: int
    # Kept as text here; parsed into a datetime after the stream is created
    Timestamp: str
    # Numeric encodings produced during preprocessing
    Veh_Weight: float
    Traffic_Level: float

# Replay the preprocessed CSV as a simulated live stream
# (input_rate is the replay speed in rows per second, per the Pathway docs)
data_live = pw.demo.replay_csv(
    "preprocessed_dataset.csv",
    schema=parkingSchema,
    input_rate=1000,
)

In [None]:
# Derive time columns on the data stream.
# Format of the 'Timestamp' text in the replayed CSV
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parsing expression once and reuse it for both derived columns
parsed_timestamp = data_live.Timestamp.dt.strptime(fmt)

data_live_with_time = data_live.with_columns(
    # 't' holds the parsed full datetime
    t=parsed_timestamp,
    # 'day' is the date formatted as text with the time fixed to midnight
    # (useful for day-level aggregations)
    day=parsed_timestamp.dt.strftime("%Y-%m-%dT00:00:00"),
)


In [None]:
#Data Streaming pipeline from a static csv file.
def pricing_pipeline(
    base=10.0,
    alpha=0.3,
    beta=0.2,
    gamma=0.1,
    delta=0.05,
    epsilon=1.0,
    lam=0.5,
):
    """Build the table of daily prices per parking lot.

    Rows of ``data_live_with_time`` are grouped per ``SystemCodeNumber`` into
    one-day tumbling windows. For every window the following is computed::

        Demand = alpha * (Occ_max - Occ_min) / Capacity
                 + beta * QueueLength
                 - gamma * TrafficLevel
                 + delta * IsSpecial
                 + epsilon * VehicleWeight
        Dn     = Demand / Capacity
        price  = base * (1 + lam * Dn)

    where QueueLength, TrafficLevel, IsSpecial, VehicleWeight and Capacity are
    the per-window maxima of the corresponding input columns.

    Args:
        base: Price obtained when the normalized demand is zero.
        alpha: Weight of the occupancy range (max - min) relative to capacity.
        beta: Weight of the queue length.
        gamma: Weight of the traffic level (subtracted from demand).
        delta: Weight of the special-day flag.
        epsilon: Weight of the vehicle weight.
        lam: Factor applied to the normalized demand in the price formula.

    Returns:
        A Pathway table with columns ``t`` (window end), ``SystemCodeNumber``
        and ``price``. Calling the function with no arguments reproduces the
        previously hard-coded coefficients.
    """
    return (
        data_live_with_time
        .windowby(
            pw.this.t,
            instance=pw.this.SystemCodeNumber,
            window=pw.temporal.tumbling(timedelta(days=1)),
            behavior=pw.temporal.exactly_once_behavior()
        )
        .reduce(
            _pw_window_end=pw.this._pw_window_end,
            # Every row of a window shares one SystemCodeNumber (it is the
            # window instance), so max() simply carries that value through.
            SystemCodeNumber=pw.reducers.max(pw.this.SystemCodeNumber),
            Occ_max=pw.reducers.max(pw.this.Occupancy),
            Occ_min=pw.reducers.min(pw.this.Occupancy),
            Capacity=pw.reducers.max(pw.this.Capacity),
            QueueLength=pw.reducers.max(pw.this.QueueLength),
            TrafficLevel=pw.reducers.max(pw.this.Traffic_Level),
            IsSpecial=pw.reducers.max(pw.this.IsSpecialDay),
            VehicleWeight=pw.reducers.max(pw.this.Veh_Weight),
        )
        #Compute Demand
        .with_columns(
            Demand=(
                alpha * (pw.this.Occ_max - pw.this.Occ_min) / pw.this.Capacity
                + beta * pw.this.QueueLength
                - gamma * pw.this.TrafficLevel
                + delta * pw.this.IsSpecial
                + epsilon * pw.this.VehicleWeight
            ),
            t=pw.this._pw_window_end #Rename window end column to t
        )
        #Compute normalized demand
        .with_columns(
            Dn=(pw.this.Demand / pw.this.Capacity)
        )
        #Compute normalized price
        .with_columns(
            price=base * (1 + lam * pw.this.Dn)
        )
        #Final selection of output columns
        .select(pw.this.t, pw.this.SystemCodeNumber, pw.this.price)
    )

# Stream the pipeline result (columns t, SystemCodeNumber, price) into out.csv
price_table = pricing_pipeline()
pw.io.csv.write(price_table, "out.csv")


# 5. Visualization of dynamic price of each parking lot for one day

In [None]:
from bokeh.models import ColumnDataSource, CategoricalColorMapper
from bokeh.palettes import Category20_14

pn.extension()

# Load the preprocessed CSV once to collect every distinct SystemCodeNumber
# (the 14-colour palette below assumes 14 of them)
df = pd.read_csv('preprocessed_dataset.csv')
color_codes = sorted(df['SystemCodeNumber'].drop_duplicates())

# Assign each SystemCodeNumber its own palette colour
color_mapper = CategoricalColorMapper(factors=color_codes, palette=Category20_14)

def price_plotter(source: ColumnDataSource):
    """Build the Bokeh figure showing price over time for all parking lots.

    Args:
        source: Data source providing the columns ``t``, ``price`` and
            ``SystemCodeNumber`` referenced by the glyphs below.

    Returns:
        The configured Bokeh figure: one line through all points plus one
        marker per point, coloured by ``SystemCodeNumber``.
    """
    #Create the figure
    fig = bokeh.plotting.figure(
        height=700, width=1000,
        title="Daily Parking Price by SystemCodeNumber",
        x_axis_type="datetime",
        toolbar_location="above",
    )

    #Overall line
    fig.line(
        x="t", y="price", source=source,
        line_width=2, color="navy"
    )

    #Markers with categorical color mapper and legend_field.
    #scatter() is used because circle() with a `size` argument is deprecated
    #since Bokeh 3.4; the rendered glyph is the same circle marker.
    fig.scatter(
        x='t', y='price', source=source,
        marker='circle',
        size=8,
        fill_color={'field': 'SystemCodeNumber', 'transform': color_mapper},
        line_color='black',
        legend_field='SystemCodeNumber'
    )

    #Legend & axes styling
    fig.legend.title        = "SystemCodeNumber"
    fig.legend.location     = "top_left"
    fig.legend.click_policy = "hide"   # clicking a legend entry hides its glyph
    fig.xaxis.axis_label    = "Date (t)"
    fig.yaxis.axis_label    = "Price"

    return fig


# Build the price table and attach the plotting callback to it;
# rows are ordered by the window-end column 't'.
delta_window = pricing_pipeline()
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column and mark it servable
# (as the last expression of the cell, it is also displayed in the notebook).
dashboard = pn.Column(viz)
dashboard.servable()

In [None]:
# Start the Pathway pipeline execution in the background
# This triggers the real-time data stream processing defined above
# %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()