In [None]:
pip install pathway #Installing pathway



In [None]:
#Importing necessary libraries
import datetime
import itertools
import math

import bokeh.plotting
import numpy as np
import pandas as pd
import panel as pn

In [None]:
import pathway as pw #Import pathway

In [None]:
# Load the raw parking observations into a DataFrame
df = pd.read_csv(filepath_or_buffer="dataset.csv")

In [None]:
# Display the first five rows of the dataset
df.head(n=5)

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00


In [None]:
df.shape  # (number of rows, number of columns)

(18368, 12)

In [None]:
# Merge the day-first date string (DD-MM-YYYY) and the time string (HH:MM:SS)
# into a single 'Timestamp' datetime column, then order every observation
# chronologically with a fresh 0..n-1 index.
df = (
    df.assign(
        Timestamp=pd.to_datetime(
            df["LastUpdatedDate"] + " " + df["LastUpdatedTime"],
            format="%d-%m-%Y %H:%M:%S",
        )
    )
    .sort_values(by="Timestamp")
    .reset_index(drop=True)
)

df.head()

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
1,5248,BHMNCPHST01,1200,26.140014,91.731,237,bike,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
2,3936,BHMMBMMBX01,687,20.000035,78.000003,264,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
3,6560,BHMNCPNST01,485,26.140048,91.730972,249,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
4,17056,Shopping,1920,26.150504,91.733531,614,cycle,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00


In [None]:
# Keep only the fields the streaming pipeline consumes and write them to
# 'parking_stream.csv' (the DataFrame index is not written).
stream_columns = ["SystemCodeNumber", "Timestamp", "Occupancy", "Capacity"]
df.loc[:, stream_columns].to_csv("parking_stream.csv", index=False)

In [None]:
class ParkingSchema(pw.Schema):
    """Column names and types of the rows replayed from 'parking_stream.csv'."""
    Timestamp: str   # Observation time as text, e.g. "2016-10-04 07:59:00" (as written by pandas); parsed to a datetime downstream
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location
    SystemCodeNumber: str  # Unique identifier for the parking location


In [None]:
# Replay 'parking_stream.csv' as a simulated live source whose rows are typed by
# ParkingSchema; `input_rate` is the number of rows emitted per second.
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [None]:
# Format of the 'Timestamp' strings written by pandas, e.g. "2016-10-04 07:59:00"
fmt = "%Y-%m-%d %H:%M:%S"

# Parse the timestamp once into `t`, then derive the helper columns from it:
# - 't'   : parsed datetime of the observation
# - 'day' : the observation's date rendered as "<YYYY-MM-DD>T00:00:00" (day-level key)
# - 'lot' : alias of SystemCodeNumber
data_with_time = data.with_columns(
    t=data.Timestamp.dt.strptime(fmt),
).with_columns(
    day=pw.this.t.dt.strftime("%Y-%m-%dT00:00:00"),
    lot=pw.this.SystemCodeNumber,
)



In [None]:
import datetime
import pathway as pw

# User-defined function (UDF) mapping normalized occupancy to a price on a logistic curve
@pw.udf
def logistic_price(occupancy: float | None) -> float | None:
    """Return a price between ``min_price`` and ``max_price`` for a normalized occupancy.

    Price = min_price + (max_price - min_price) / (1 + exp(-k * (occupancy - 0.5))).
    At occupancy 0.5 the price is the midpoint of the range; it rises towards
    ``max_price`` as occupancy grows and falls towards ``min_price`` as it shrinks.

    Args:
        occupancy: Average occupancy divided by capacity (roughly 0 to 1), or None.

    Returns:
        The price, or None when ``occupancy`` is None.
    """
    min_price = 10.0  # Floor of the price curve
    max_price = 15.0  # Ceiling of the price curve
    k = 10.0  # Steepness of the curve around the 0.5 midpoint

    if occupancy is None:
        return None

    # Use math.exp (exact base e) rather than pow() with a truncated constant such as 2.71828.
    return min_price + (max_price - min_price) / (1.0 + math.exp(-k * (occupancy - 0.5)))

# Aggregate the stream into one row per parking lot per day, then price each row.
# Windows are partitioned by lot (`instance=`), so every aggregate below comes from
# a single lot's readings. Partitioning by `day` alone would pool all lots into one
# daily window and then tag the result with an arbitrary lot via `reducers.any`,
# giving mixed averages/capacities and missing points in the per-lot plots.
# NOTE(review): relies on 1-day tumbling windows being aligned to midnight so each
# window is one calendar day — confirm against Pathway's `tumbling` origin default.
delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event-time column used to assign rows to windows
        instance=pw.this.SystemCodeNumber,  # Independent windows for every parking lot
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # Non-overlapping 1-day windows
        behavior=pw.temporal.exactly_once_behavior(),  # Emit a single result per window
    )
    # Aggregate values over each (lot, day) window
    .reduce(
        t=pw.this._pw_window_end,  # Timestamp at window end (x-axis of the plots)
        occ_max=pw.reducers.max(pw.this.Occupancy),   # Max occupancy in window
        occ_min=pw.reducers.min(pw.this.Occupancy),   # Min occupancy in window
        occ_sum=pw.reducers.sum(pw.this.Occupancy),   # Total occupancy
        occ_count=pw.reducers.count(pw.this.Occupancy),  # Count of occupancy readings
        cap=pw.reducers.max(pw.this.Capacity),  # Capacity (assumed constant per lot, so take max)
        SystemCodeNumber=pw.reducers.any(pw.this.SystemCodeNumber),  # Every row in the window has this lot
    )
    # Average occupancy over the window
    .with_columns(occ_avg=pw.this.occ_sum / pw.this.occ_count)
    # Fraction of the lot's capacity in use (roughly 0 to 1)
    .with_columns(normalized_occ=pw.this.occ_avg / pw.this.cap)
    # Add pricing and debug columns
    .with_columns(
        price=logistic_price(pw.this.normalized_occ),  # Logistic price from normalized occupancy
        debug_occ=pw.this.normalized_occ,  # Kept for debugging/visualization
    )
)



In [None]:
import panel as pn
import bokeh.plotting

# Activate Panel's extension so the interactive Panel/Bokeh output renders in the notebook
pn.extension()

# Define a function to create a Bokeh plot for pricing data
def price_plotter(source, title="Pathway: Daily Parking Price"):
    """Build a Bokeh figure of price over time.

    Args:
        source: Data source passed in by ``Table.plot``; must provide the
            ``t`` (window end timestamp) and ``price`` columns.
        title: Plot title; defaults to the original generic title so existing
            callers are unaffected.

    Returns:
        A ``bokeh.plotting.figure`` with a price line and one marker per point.
    """
    fig = bokeh.plotting.figure(
        height=300,
        width=700,
        title=title,
        x_axis_type="datetime",  # `t` holds timestamps
        x_axis_label="Window end",
        y_axis_label="Price",
    )
    fig.line("t", "price", source=source, line_width=2, color="navy")  # Price trend over time
    fig.scatter("t", "price", source=source, size=5, color="red")      # Red dot per price point

    return fig

# Derive the parking-lot IDs from the data rather than a hand-maintained list, so every
# lot present in the dataset gets a tab (order of first appearance in the time-sorted frame).
lot_ids = df["SystemCodeNumber"].unique().tolist()

# One tab per lot: filter the priced stream to that lot and plot it
tabs = []
for lot in lot_ids:
    lot_data = delta_window.filter(pw.this.SystemCodeNumber == lot)  # Rows for the current lot only
    viz = lot_data.plot(price_plotter, sorting_col="t")              # Points ordered by window end
    tabs.append((f"Lot {lot}", pn.Column(viz)))                      # Tab labelled with the lot ID

# Display all tabs in a single Panel layout
pn.Tabs(*tabs).servable()


In [None]:
%%capture --no-display  # Jupyter magic to suppress output and logs while running the cell
pw.run()  # Start and run the Pathway data pipeline

Output()

