# Introduction


Starting the capstone project here.


In [1]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [8]:
# Load the raw parking records from the CSV file into a pandas DataFrame
df = pd.read_csv("dataset.csv")

In [9]:
# Join the separate date and time strings into one text value per row, then parse it
# (day-month-year date, 24-hour time) into a single datetime column
timestamp_text = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(timestamp_text, format='%d-%m-%Y %H:%M:%S')

# Put the rows in chronological order and rebuild a fresh 0..n-1 index
df = df.sort_values(by='Timestamp').reset_index(drop=True)

# Show the distinct category labels present in the two text columns
print(df['VehicleType'].unique())
print(df['TrafficConditionNearby'].unique())

df.head()

['car' 'bike' 'cycle' 'truck']
['low' 'average' 'high']


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
1,5248,BHMNCPHST01,1200,26.140014,91.731,237,bike,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
2,3936,BHMMBMMBX01,687,20.000035,78.000003,264,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
3,6560,BHMNCPNST01,485,26.140048,91.730972,249,car,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
4,17056,Shopping,1920,26.150504,91.733531,614,cycle,low,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00


In [10]:
from sklearn.preprocessing import OrdinalEncoder

# Ordinal encoders with explicit category orderings, so the integer codes are fixed:
#   VehicleType:            car=0, bike=1, cycle=2, truck=3
#   TrafficConditionNearby: low=0, average=1, high=2
encoder1 = OrdinalEncoder(categories=[['car', 'bike', 'cycle', 'truck']])
encoder2 = OrdinalEncoder(categories=[['low', 'average', 'high']])

# Replace the text labels with their codes, cast to integers while assigning.
# NOTE: this overwrites the label columns in place, so reload `df` before re-running this cell.
df['VehicleType'] = encoder1.fit_transform(df[['VehicleType']]).astype(int)
df['TrafficConditionNearby'] = encoder2.fit_transform(df[['TrafficConditionNearby']]).astype(int)

df.head()


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,0,0,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
1,5248,BHMNCPHST01,1200,26.140014,91.731,237,1,0,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
2,3936,BHMMBMMBX01,687,20.000035,78.000003,264,0,0,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
3,6560,BHMNCPNST01,485,26.140048,91.730972,249,0,0,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
4,17056,Shopping,1920,26.150504,91.733531,614,2,0,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00


In [11]:
# Display the names of all columns currently in the DataFrame
df.columns

Index(['ID', 'SystemCodeNumber', 'Capacity', 'Latitude', 'Longitude',
       'Occupancy', 'VehicleType', 'TrafficConditionNearby', 'QueueLength',
       'IsSpecialDay', 'LastUpdatedDate', 'LastUpdatedTime', 'Timestamp'],
      dtype='object')

In [12]:
# Keep only the columns the streaming pipeline consumes and write them,
# without the DataFrame index, to the CSV file used for streaming
stream_columns = [
    "Timestamp",
    "Occupancy",
    "Capacity",
    "Latitude",
    "Longitude",
    "VehicleType",
    "TrafficConditionNearby",
    "QueueLength",
    "IsSpecialDay",
]
df[stream_columns].to_csv("parking_stream.csv", index=False)


In [13]:
# Schema of the streaming input for Pathway:
# one typed field per CSV column, describing the structure expected of every row

class ParkingSchema(pw.Schema):
    Timestamp: str               # observation time as text (parsed into a datetime further down)
    Occupancy: int               # number of occupied parking spots
    Capacity: int                # total parking capacity at the location
    Latitude: float              # latitude of the parking space
    Longitude: float             # longitude of the parking space
    VehicleType: int             # integer code of the vehicle type
    TrafficConditionNearby: int  # integer code of the nearby traffic condition
    QueueLength: int             # number of vehicles in the waiting area
    IsSpecialDay: int            # whether the day is a special day or not


In [14]:
# Replay the exported CSV as a simulated real-time stream with Pathway's replay_csv.
# input_rate sets the replay speed: approximately 1000 rows per second are ingested.

data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [15]:
# strptime pattern used to parse the text in the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parse expression once and derive both new columns from it:
# - 't'   : the full parsed datetime
# - 'day' : that datetime rendered as text with the time reset to midnight
#           (one value per calendar day, useful for day-level aggregations)
parsed_time = data.Timestamp.dt.strptime(fmt)
data_with_time = data.with_columns(
    t=parsed_time,
    day=parsed_time.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Making a simple pricing function

In [16]:
# Model 1: aggregate the stream into daily tumbling windows and derive one price per day.

# Re-import the module here: the imports cell's `from datetime import datetime` rebinds the
# name `datetime` to the class, while `datetime.timedelta` below needs the module.
import datetime

# Step 1 - windowing: fixed one-day windows over the parsed event time 't',
# partitioned by the 'day' column (one instance per calendar day).
# exactly_once_behavior() asks Pathway to emit each window's result exactly once.
_daily_groups = data_with_time.windowby(
    pw.this.t,
    instance=pw.this.day,
    window=pw.temporal.tumbling(datetime.timedelta(days=1)),
    behavior=pw.temporal.exactly_once_behavior(),
)

# Step 2 - aggregation: one summary row per window.
# NOTE(review): the stream carries no parking-lot identifier, so these statistics
# pool every row that falls into the day - confirm that this is intended.
_daily_stats = _daily_groups.reduce(
    t=pw.this._pw_window_end,                     # end timestamp of the window
    occ_max=pw.reducers.max(pw.this.Occupancy),   # highest occupancy seen in the window
    occ_min=pw.reducers.min(pw.this.Occupancy),   # lowest occupancy seen in the window
    cap=pw.reducers.max(pw.this.Capacity),        # largest capacity seen in the window
)

# Step 3 - pricing:
#     price = base_price + demand_fluctuation
#         base_price         = 10 (fixed minimum price)
#         demand_fluctuation = (occ_max - occ_min) / cap
#
# Intuition:
# - A large gap between the day's peak and lowest occupancy signals volatile demand,
#   which may indicate scarcity.
# - Dividing by capacity expresses that gap relative to the capacity.
# - The fluctuation is added on top of the base price of 10.
# - Example: occ_max = 90, occ_min = 30, cap = 100
#            => price = 10 + (90 - 30)/100 = 10 + 0.6 = 10.6
delta_window = _daily_stats.with_columns(
    price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
)


# Model 2


In [25]:
# Model 2: demand-based daily price, kept inside the 10-20 rupee range.

# Re-import the module here: the imports cell's `from datetime import datetime` rebinds the
# name `datetime` to the class, while `datetime.timedelta` below needs the module.
import datetime

# Intuition:
# The dynamic parking price depends on several key factors:
#
# - Availability of spaces relative to total capacity (occupancy rate)
# - Average queue length over the day
# - Traffic condition nearby (encoded as low=0, average=1, high=2)
# - Type of vehicle (car, bike, cycle, truck encoded as 0-3)
# - Whether the day is a special day or a regular working day (special flag)
#
# The occupancy rate is the strongest driver of price, followed by the average queue length.
# The traffic value has a negative impact on demand: worse traffic (higher number)
# slightly reduces the demand, so it is subtracted.
# The vehicle type and special day contribute positively,
# slightly raising the demand and thus the price.
#
# Demand score exactly as computed below:
#   raw_demand = (
#       0.6  * (occ_max / cap)
#     + 0.3  * (queue_avg / 10)
#     - 0.1  * traffic_value
#     + 0.1  * vehicle_value
#     + 0.05 * special_flag
#     - 0.1
#   ) / 1.1
#
# NOTE(review): the trailing "- 0.1" offset and "/ 1.1" scaling look like an empirical
# rescaling towards the 0..1 range - confirm the intended derivation.
_raw_demand = (
    0.6 * (pw.this.occ_max / pw.this.cap)
    + 0.3 * (pw.this.queue_avg / 10)
    - 0.1 * pw.this.traffic_value
    + 0.1 * pw.this.vehicle_value
    + 0.05 * pw.this.special_flag
    - 0.1
) / 1.1

# The weighted sum is not bounded by itself (e.g. low occupancy with heavy traffic gives
# a negative score, and a long queue can push it above 1), so clamp it to [0, 1].
# This is what keeps the final price within 10..20.
_demand = pw.if_else(
    _raw_demand < 0.0,
    0.0,
    pw.if_else(_raw_demand > 1.0, 1.0, _raw_demand),
)

delta_window_model2 = (
    # Group the stream into fixed one-day windows on the event time 't',
    # partitioned by the 'day' column, so the data is processed day by day
    data_with_time.windowby(
        pw.this.t,              # event time
        instance=pw.this.day,   # partition by day
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior(),  # emit each window's result once
    )
    # Summarise each day's data into one row
    .reduce(
        t=pw.this._pw_window_end,                                       # end timestamp of the window
        occ_max=pw.reducers.max(pw.this.Occupancy),                     # peak occupancy of the day
        cap=pw.reducers.max(pw.this.Capacity),                          # largest capacity seen
        queue_avg=pw.reducers.avg(pw.this.QueueLength),                 # mean queue length
        traffic_value=pw.reducers.max(pw.this.TrafficConditionNearby),  # worst traffic code seen
        vehicle_value=pw.reducers.max(pw.this.VehicleType),             # highest vehicle-type code seen
        special_flag=pw.reducers.max(pw.this.IsSpecialDay),             # 1 if any row is flagged special
    )
    # price = 10 + 10 * demand, with demand clamped to [0, 1]  =>  10 <= price <= 20
    .with_columns(
        price=10 + 10 * _demand
    )
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [26]:
# Activate the Panel extension to enable the interactive visualizations shown below
pn.extension()

# Custom Bokeh plotting function: takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    """Build the daily parking price figure for a given data source.

    Args:
        source: Data source handed in by Pathway's ``.plot()``; it must expose
            the columns ``"t"`` (x-axis, time) and ``"price"`` (y-axis).

    Returns:
        A Bokeh figure with the price drawn as a navy line and a red marker
        on every data point.
    """
    # Figure with a datetime x-axis so time-based data is formatted properly
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",
    )
    # Line showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Red markers on each data point for better visibility.
    # scatter() (default marker: circle) replaces circle(size=...), which is
    # deprecated in recent Bokeh releases.
    fig.scatter("t", "price", source=source, size=6, color="red")

    return fig

# Bind each model's output table to the Bokeh plot through Pathway's built-in .plot():
# - price_plotter is the rendering function
# - sorting_col="t" ensures the data is plotted in time order
viz1 = delta_window.plot(  # Model 1
    price_plotter,
    sorting_col="t",
)
viz2 = delta_window_model2.plot(  # Model 2
    price_plotter,
    sorting_col="t",
)

# Place both plots side by side in a Panel layout and make it servable as a web app
pn.Row(viz1, viz2).servable()




In [27]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()

