<a href="https://colab.research.google.com/github/Akankshya001/Pathway_Project/blob/main/Pathway_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [None]:
# Read the raw records from dataset.csv; the bare `df` on the last line
# renders the frame as this cell's output.
df = pd.read_csv('dataset.csv')
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [None]:
# One row per distinct combination of lot code, capacity and coordinates.
df[['SystemCodeNumber', 'Capacity', 'Latitude', 'Longitude']].drop_duplicates()

Unnamed: 0,SystemCodeNumber,Capacity,Latitude,Longitude
0,BHMBCCMKT01,577,26.144536,91.736172
1312,BHMBCCTHL01,387,26.144495,91.736205
2624,BHMEURBRD01,470,26.14902,91.739503
3936,BHMMBMMBX01,687,20.000035,78.000003
5248,BHMNCPHST01,1200,26.140014,91.731
6560,BHMNCPNST01,485,26.140048,91.730972
7872,Broad Street,690,26.137958,91.740994
9184,Others-CCCPS105a,2009,26.147473,91.728049
10496,Others-CCCPS119a,2803,26.147541,91.72797
11808,Others-CCCPS135a,3883,26.147499,91.728005


In [None]:
# Count of distinct SystemCodeNumber values in the dataset.
df['SystemCodeNumber'].nunique()

14

In [None]:
# Min / max / mean of capacity, occupancy and queue length for each SystemCodeNumber.
df.groupby('SystemCodeNumber')[['Capacity','Occupancy','QueueLength']].agg(['min','max','mean'])

Unnamed: 0_level_0,Capacity,Capacity,Capacity,Occupancy,Occupancy,Occupancy,QueueLength,QueueLength,QueueLength
Unnamed: 0_level_1,min,max,mean,min,max,mean,min,max,mean
SystemCodeNumber,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2
BHMBCCMKT01,577,577,577.0,2,573,162.029726,0,11,3.643293
BHMBCCTHL01,387,387,387.0,39,403,288.35747,1,12,4.128049
BHMEURBRD01,470,470,470.0,28,470,302.49314,0,11,4.00686
BHMMBMMBX01,687,687,687.0,170,688,477.301829,1,13,4.463415
BHMNCPHST01,1200,1200,1200.0,55,954,557.686738,1,12,4.509909
BHMNCPNST01,485,485,485.0,136,467,285.938262,1,11,4.102134
Broad Street,690,690,690.0,48,690,436.159299,1,12,4.269817
Others-CCCPS105a,2009,2009,2009.0,452,1914,1138.500762,1,14,5.140244
Others-CCCPS119a,2803,2803,2803.0,51,1534,540.091463,1,13,4.539634
Others-CCCPS135a,3883,3883,3883.0,472,3499,2292.382622,1,15,5.581555


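For some lots the maximum occupancy exceeds the capacity (e.g. BHMBCCTHL01: 403 vs. 387; BHMMBMMBX01: 688 vs. 687), so the occupancy rate can go above 1, which may cause a problem in the pricing formulas.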

In [None]:
# Per-lot demand swing: (max occupancy - min occupancy) / max capacity.
# The groupby is built once and reused for all three aggregations.
per_lot = df.groupby('SystemCodeNumber')
occupancy_range = per_lot['Occupancy'].max() - per_lot['Occupancy'].min()
demand_fluctuation = occupancy_range / per_lot['Capacity'].max()
demand_fluctuation

Unnamed: 0_level_0,0
SystemCodeNumber,Unnamed: 1_level_1
BHMBCCMKT01,0.989601
BHMBCCTHL01,0.940568
BHMEURBRD01,0.940426
BHMMBMMBX01,0.754003
BHMNCPHST01,0.749167
BHMNCPNST01,0.682474
Broad Street,0.930435
Others-CCCPS105a,0.727725
Others-CCCPS119a,0.529076
Others-CCCPS135a,0.779552


Since the demand fluctuation, i.e. (max occupancy − min occupancy) / capacity, is less than 1 for every parking lot, the occupancy weight (alpha) can be set to 1.

In [None]:
# Numeric code for each traffic label. A label that is not a key of this
# table is looked up with dict.get and therefore becomes None.
# NOTE: the column is overwritten in place, so running this cell a second
# time looks up the already-numeric values and yields None for every row.
TRAFFIC_ENCODING = {'low': 0, 'average': 0.5, 'high': 1}
df['TrafficConditionNearby'] = df['TrafficConditionNearby'].apply(TRAFFIC_ENCODING.get)

In [None]:
# Weight applied per vehicle type; any value that is not a key falls back to 1.
# NOTE: the column is overwritten in place, so running this cell a second
# time maps the already-numeric weights to the fallback value 1.
VEHICLE_TYPE_WEIGHT = {'cycle': 0.4, 'bike': 0.7, 'car': 1, 'truck': 1.5}
df['VehicleType'] = df['VehicleType'].map(
    lambda vehicle: VEHICLE_TYPE_WEIGHT.get(vehicle, 1)
)

In [None]:
# Join the date ("dd-mm-YYYY") and time ("HH:MM:SS") strings with a space,
# then parse the result into a single datetime column.
date_and_time = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(date_and_time, format='%d-%m-%Y %H:%M:%S')

# Put the rows in chronological order and renumber the index from 0.
df = df.sort_values(by='Timestamp').reset_index(drop=True)

In [None]:
# Write only the columns the streaming schema reads, in schema order,
# without the pandas index.
df.to_csv(
    "parking_stream.csv",
    columns=[
        'Timestamp',
        'SystemCodeNumber',
        'Capacity',
        'Occupancy',
        'VehicleType',
        'TrafficConditionNearby',
        'QueueLength',
        'IsSpecialDay',
    ],
    index=False,
)

In [None]:
# Column types of parking_stream.csv as read by Pathway.
# Field names match the columns written to that file.
class ParkingSchema(pw.Schema):
    Timestamp: str  # read as text; parsed into a datetime in a later step
    SystemCodeNumber: str
    Capacity: int
    Occupancy: int
    VehicleType: float  # numeric weight produced by the VEHICLE_TYPE_WEIGHT mapping
    TrafficConditionNearby: float  # numeric code produced by the TRAFFIC_ENCODING mapping
    QueueLength: int
    IsSpecialDay: int

In [None]:
# Replay parking_stream.csv as a simulated stream with Pathway's demo connector.
# Rows are typed according to ParkingSchema.
# input_rate=1000 means approximately 1000 rows per second are ingested into the stream.

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)

In [None]:
# Datetime format used to parse the 'Timestamp' text column
fmt = "%Y-%m-%d %H:%M:%S"

# Add new columns to the data stream:
# - 't' is the Timestamp string parsed with `fmt`
# - 'day' is the same parsed value formatted back to text with the time part
#   fixed to 00:00:00 (a string, not a datetime)
data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)


# Model 1: Baseline Model

In [None]:
# Re-import the module: the notebook's first import cell ends with
# `from datetime import datetime`, which binds the name to the class, while
# the window definition below needs `datetime.timedelta` from the module.
import datetime

BASE_PRICE = 10  # price returned when the occupancy rate is 0
alpha = 1.0      # weight of the occupancy rate in the linear price


@pw.udf
def compute_price_model1(occupancy, capacity):
    """Baseline price that grows linearly with the occupancy rate.

    price = BASE_PRICE + alpha * (occupancy / capacity) * BASE_PRICE,
    limited to the range [5, 20]. A non-positive capacity is treated as an
    occupancy rate of 0.
    """
    occ_rate = occupancy / capacity if capacity > 0 else 0
    raw_price = BASE_PRICE + alpha * occ_rate * BASE_PRICE
    upper_bounded = min(raw_price, 20)
    return max(5, upper_bounded)

# Model 1 pipeline: one price per lot per one-day tumbling window.
delta_window_model1 = (
    data_with_time.windowby(
        pw.this.t,  # event time used to assign rows to windows
        instance=pw.this.SystemCodeNumber,  # windows are kept separately for each lot
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        # NOTE(review): presumably emits each window's result a single time
        # once the window is complete - confirm against the Pathway docs.
        behavior=pw.temporal.exactly_once_behavior(),
    )
    .reduce(
        t=pw.this._pw_window_end,  # the window's end is used as the output time column
        lot_id=pw.reducers.any(pw.this.SystemCodeNumber),
        occ_max=pw.reducers.max(pw.this.Occupancy),  # highest occupancy seen in the window
        cap=pw.reducers.max(pw.this.Capacity),
    )
    .with_columns(
        # Price is computed from the window's maximum occupancy and capacity.
        price=compute_price_model1(
            occupancy=pw.this.occ_max,
            capacity=pw.this.cap
            )
    ))


# Model 2: Demand-Based Price Function


In [None]:
import pathway as pw
import datetime

BASE_PRICE = 10

# Weights of the individual terms in the demand score.
weight_occupancy = 1.0  # occupancy rate (occupancy / capacity)
weight_queue = 0.5      # queue length
weight_traffic = 0.7    # nearby traffic level (subtracted)
weight_special = 0.5    # special-day flag
weight_vehicle = 0.8    # vehicle-type weight


def compute_demand(occupancy, capacity, queue, traffic, special, vtype):
    """Return the raw (un-normalized) demand score.

    The score is the weighted occupancy rate plus the weighted queue length,
    special-day flag and vehicle-type weight, minus the weighted traffic
    level. A non-positive capacity is treated as an occupancy rate of 0.
    """
    occ_rate = occupancy / capacity if capacity > 0 else 0

    # Accumulate the terms in a fixed order: occupancy, queue, traffic,
    # special day, vehicle type.
    demand = weight_occupancy * occ_rate
    demand = demand + weight_queue * queue
    demand = demand - weight_traffic * traffic
    demand = demand + weight_special * special
    demand = demand + weight_vehicle * vtype
    return demand

def normalize_demand(demand, min_d=0, max_d=5):
    """Clip `demand` to [min_d, max_d] and rescale it to the range [0, 1]."""
    clipped = min(demand, max_d)
    if clipped < min_d:
        clipped = min_d
    span = max_d - min_d
    return (clipped - min_d) / span

@pw.udf
def compute_price_model2(occupancy, capacity, queue, traffic, special, vtype):
    """Demand-based price: BASE_PRICE * (1 + normalized demand), limited to [5, 20]."""
    raw_demand = compute_demand(occupancy, capacity, queue, traffic, special, vtype)
    demand_share = normalize_demand(raw_demand)  # value in [0, 1]
    price = BASE_PRICE * (1 + demand_share)
    upper_bounded = min(price, 20)
    return max(5, upper_bounded)

# Model 2 pipeline: one demand-based price per lot per one-day tumbling window.
delta_window_model2 = (
    data_with_time.windowby(
        pw.this.t,  # event time used to assign rows to windows
        instance=pw.this.SystemCodeNumber,  # windows are kept separately for each lot
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        # NOTE(review): presumably emits each window's result a single time
        # once the window is complete - confirm against the Pathway docs.
        behavior=pw.temporal.exactly_once_behavior(),
    )
    .reduce(
        t=pw.this._pw_window_end,  # the window's end is used as the output time column
        lot_id=pw.reducers.any(pw.this.SystemCodeNumber),
        # Every demand input is summarized by its maximum within the window.
        occ=pw.reducers.max(pw.this.Occupancy),
        cap=pw.reducers.max(pw.this.Capacity),
        queue=pw.reducers.max(pw.this.QueueLength),
        traffic=pw.reducers.max(pw.this.TrafficConditionNearby),
        special=pw.reducers.max(pw.this.IsSpecialDay),
        vtype=pw.reducers.max(pw.this.VehicleType),
    )
    .with_columns(
        # Arguments are passed positionally, in the order of
        # compute_price_model2's parameters.
        price=compute_price_model2(
            pw.this.occ,
            pw.this.cap,
            pw.this.queue,
            pw.this.traffic,
            pw.this.special,
            pw.this.vtype,
        )
    )
)

# Visualising Using Bokeh

In [190]:
from bokeh.palettes import Category20

def price_plotter(source):
    """Build a Bokeh figure showing price over time.

    Args:
        source: Bokeh data source with "t" (datetime) and "price" columns;
            it is passed in by the table's ``.plot(...)`` call.

    Returns:
        The configured ``bokeh.plotting.figure``.

    NOTE(review): the windowed tables plotted with this function carry one
    row per lot per day (``lot_id`` column), yet a single line is drawn
    through all rows. Consider filtering or colouring by lot.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",
        x_axis_label="Time",
        y_axis_label="Price ($)"
    )
    # Line showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Red markers on each data point. `scatter` (default marker: circle) is
    # used because recent Bokeh releases deprecate `circle(..., size=...)`.
    fig.scatter("t", "price", source=source, size=6, color="red")

    return fig


In [191]:
# Build one live plot per model table with Pathway's plotting helper;
# rows are ordered by the "t" column before being handed to price_plotter.
viz = delta_window_model1.plot(price_plotter, sorting_col="t")
viz2= delta_window_model2.plot(price_plotter, sorting_col="t")
# Stack both plots (model 1 on top, model 2 below) in a Panel column and mark it servable
pn.Column(viz,viz2).servable()

In [192]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()

