In [1]:
# Install required packages quietly
!pip install pathway bokeh --quiet

import pathway as pw
import pandas as pd
from math import radians, sin, cos, sqrt, atan2

In [2]:

# Load raw dataset from Google Drive into a DataFrame
df = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/dataset.csv")
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [3]:

# Combine date and time columns into a single timestamp string

df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'], format="%d-%m-%Y %H:%M:%S")
df['Timestamp'] = df['Timestamp'].dt.strftime("%d-%m-%Y %H:%M:%S")
# Select and reorder columns for cleaning
df_clean = df[[
    'ID','SystemCodeNumber','Capacity','Latitude','Longitude',
    'Occupancy','QueueLength','TrafficConditionNearby',
    'IsSpecialDay','VehicleType','LastUpdatedDate','LastUpdatedTime','Timestamp'
]]
# Save for Pathway ingestion
df_clean.to_csv("parking_stream_clean.csv", index=False)
print("✅ Clean CSV columns:", df_clean.columns.tolist())
# Display original and cleaned DataFrames
df
df_clean

✅ Clean CSV columns: ['ID', 'SystemCodeNumber', 'Capacity', 'Latitude', 'Longitude', 'Occupancy', 'QueueLength', 'TrafficConditionNearby', 'IsSpecialDay', 'VehicleType', 'LastUpdatedDate', 'LastUpdatedTime', 'Timestamp']


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,QueueLength,TrafficConditionNearby,IsSpecialDay,VehicleType,LastUpdatedDate,LastUpdatedTime,Timestamp
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,1,low,0,car,04-10-2016,07:59:00,04-10-2016 07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,1,low,0,car,04-10-2016,08:25:00,04-10-2016 08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,2,low,0,car,04-10-2016,08:59:00,04-10-2016 08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,2,low,0,car,04-10-2016,09:32:00,04-10-2016 09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,2,low,0,bike,04-10-2016,09:59:00,04-10-2016 09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,6,average,0,truck,19-12-2016,14:30:00,19-12-2016 14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,3,low,0,car,19-12-2016,15:03:00,19-12-2016 15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,3,low,0,cycle,19-12-2016,15:29:00,19-12-2016 15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,2,low,0,car,19-12-2016,16:03:00,19-12-2016 16:03:00


In [4]:
# Define pricing model parameters
BASE_PRICE = 10.0              # Base price in dollars
LAMBDA = 0.5                   # Demand sensitivity coefficient
MAX_PRICE = 2 * BASE_PRICE     # Maximum allowed price
MIN_PRICE = 0.5 * BASE_PRICE   # Minimum allowed price

# Traffic condition to numeric
@pw.udf
def traffic_level_mapper(traffic_str: str) -> int:
    return {"low": 0, "medium": 1, "high": 2}.get(traffic_str.lower(), 1)

# Vehicle type to weight
@pw.udf
def vehicle_weight(vehicle_type: str) -> float:
    return {"car": 1.0, "bike": 0.5, "truck": 2.0}.get(vehicle_type.lower(), 1.0)

# Demand function
@pw.udf
def compute_demand(occupancy: int, capacity: int, queue: int, traffic: int, is_special: int, vehicle_weight: float) -> float:
    if capacity == 0:
        return 0.0
    return (
        2.0 * (occupancy / capacity)
        + 1.5 * queue
        - 1.0 * traffic
        + 2.0 * is_special
        + 1.0 * vehicle_weight
    )

# Normalize demand and compute price
@pw.udf
def demand_to_price(demand: float) -> float:
    norm_demand = demand / 10  # Assuming demand is in a 0–10 range
    price = BASE_PRICE * (1 + LAMBDA * norm_demand)
    return max(min(price, MAX_PRICE), MIN_PRICE)


In [5]:

# Define Haversine UDF for distance calculation
@pw.udf
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    R = 6371000.0  # Earth radius in meters
    φ1, φ2 = radians(lat1), radians(lat2)
    Δφ = radians(lat2 - lat1)
    Δλ = radians(lon2 - lon1)
    a = sin(Δφ / 2)**2 + cos(φ1) * cos(φ2) * sin(Δλ / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c


In [6]:
# Define schema for streaming data
class LotSchema(pw.Schema):
    ID: int
    SystemCodeNumber: str
    Capacity: int
    Latitude: float
    Longitude: float
    Occupancy: int
    QueueLength: int
    TrafficConditionNearby: str
    IsSpecialDay: int
    VehicleType: str
    LastUpdatedDate: str
    LastUpdatedTime: str
    Timestamp: str

In [7]:
# Replay cleaned CSV as a streaming source at 1000 rows/sec
stream = pw.demo.replay_csv("parking_stream_clean.csv", schema=LotSchema, input_rate=1000)
print(stream)

<pathway.Table schema={'ID': <class 'int'>, 'SystemCodeNumber': <class 'str'>, 'Capacity': <class 'int'>, 'Latitude': <class 'float'>, 'Longitude': <class 'float'>, 'Occupancy': <class 'int'>, 'QueueLength': <class 'int'>, 'TrafficConditionNearby': <class 'str'>, 'IsSpecialDay': <class 'int'>, 'VehicleType': <class 'str'>, 'LastUpdatedDate': <class 'str'>, 'LastUpdatedTime': <class 'str'>, 'Timestamp': <class 'str'>}>


In [8]:
# Parse timestamp strings into datetime objects and extract day buckets
fmt = "%d-%m-%Y %H:%M:%S"
stream = stream.with_columns(
    t = stream.Timestamp.dt.strptime(fmt),
    day = stream.Timestamp.dt.strptime(fmt).dt.strftime("%d-%m-%YT00:00:00")
)
print(stream)

<pathway.Table schema={'ID': <class 'int'>, 'SystemCodeNumber': <class 'str'>, 'Capacity': <class 'int'>, 'Latitude': <class 'float'>, 'Longitude': <class 'float'>, 'Occupancy': <class 'int'>, 'QueueLength': <class 'int'>, 'TrafficConditionNearby': <class 'str'>, 'IsSpecialDay': <class 'int'>, 'VehicleType': <class 'str'>, 'LastUpdatedDate': <class 'str'>, 'LastUpdatedTime': <class 'str'>, 'Timestamp': <class 'str'>, 't': <class 'pathway.internals.datetime_types.DateTimeNaive'>, 'day': <class 'str'>}>


In [9]:
# Stage 1 of Model 2 – add numeric traffic level and vehicle weight
stage1 = stream.with_columns(
    TrafficLevel=traffic_level_mapper(stream.TrafficConditionNearby),
    VehicleWeight=vehicle_weight(stream.VehicleType)
)

In [10]:
# Apply Model 2 Logic - Stage 2: Compute Demand
stage2 = stage1.with_columns(
    Demand=compute_demand(
        stage1.Occupancy,
        stage1.Capacity,
        stage1.QueueLength,
        stage1.TrafficLevel,
        stage1.IsSpecialDay,
        stage1.VehicleWeight
    )
)


In [11]:
# Stage 3: Compute Model2 price and preserve ID
stage3 = stage2.with_columns(
    Model2Price = demand_to_price(stage2.Demand),
    ID=stage2.ID

)
print(stage3)

<pathway.Table schema={'ID': <class 'int'>, 'SystemCodeNumber': <class 'str'>, 'Capacity': <class 'int'>, 'Latitude': <class 'float'>, 'Longitude': <class 'float'>, 'Occupancy': <class 'int'>, 'QueueLength': <class 'int'>, 'TrafficConditionNearby': <class 'str'>, 'IsSpecialDay': <class 'int'>, 'VehicleType': <class 'str'>, 'LastUpdatedDate': <class 'str'>, 'LastUpdatedTime': <class 'str'>, 'Timestamp': <class 'str'>, 't': <class 'pathway.internals.datetime_types.DateTimeNaive'>, 'day': <class 'str'>, 'TrafficLevel': <class 'int'>, 'VehicleWeight': <class 'float'>, 'Demand': <class 'float'>, 'Model2Price': <class 'float'>}>


In [12]:
import pathway as pw

# 1. Alias the original stream
left = stage3
right = stage3.copy()

# 2. Add the dummy key to both
left  = left.with_columns(_join_key=1)
right = right.with_columns(_join_key=1)

# 3. Perform the Cartesian (inner) join
joined = left.join(
    right,
    left._join_key == right._join_key,
    how=pw.JoinMode.INNER,
)
list(left.column_names())

['ID',
 'SystemCodeNumber',
 'Capacity',
 'Latitude',
 'Longitude',
 'Occupancy',
 'QueueLength',
 'TrafficConditionNearby',
 'IsSpecialDay',
 'VehicleType',
 'LastUpdatedDate',
 'LastUpdatedTime',
 'Timestamp',
 't',
 'day',
 'TrafficLevel',
 'VehicleWeight',
 'Demand',
 'Model2Price',
 '_join_key']

In [13]:
joined_all = joined.select(
    # Columns from left table (keep original names)
    ID=left.ID,
    SystemCodeNumber=left.SystemCodeNumber,
    Capacity=left.Capacity,
    Latitude=left.Latitude,
    Longitude=left.Longitude,
    Occupancy=left.Occupancy,
    QueueLength=left.QueueLength,
    TrafficConditionNearby=left.TrafficConditionNearby,
    IsSpecialDay=left.IsSpecialDay,
    VehicleType=left.VehicleType,
    LastUpdatedDate=left.LastUpdatedDate,
    LastUpdatedTime=left.LastUpdatedTime,
    Timestamp=left.Timestamp,
    t=left.t,
    day=left.day,
    TrafficLevel=left.TrafficLevel,
    VehicleWeight=left.VehicleWeight,
    Demand=left.Demand,
    Model2Price=left.Model2Price,

    # Columns from right table (add suffix to avoid conflicts)
    ID_other=right.ID,
    SystemCodeNumber_other=right.SystemCodeNumber,
    Capacity_other=right.Capacity,
    Latitude_other=right.Latitude,
    Longitude_other=right.Longitude,
    Occupancy_other=right.Occupancy,
    QueueLength_other=right.QueueLength,
    TrafficConditionNearby_other=right.TrafficConditionNearby,
    IsSpecialDay_other=right.IsSpecialDay,
    VehicleType_other=right.VehicleType,
    LastUpdatedDate_other=right.LastUpdatedDate,
    LastUpdatedTime_other=right.LastUpdatedTime,
    Timestamp_other=right.Timestamp,
    t_other=right.t,
    day_other=right.day,
    TrafficLevel_other=right.TrafficLevel,
    VehicleWeight_other=right.VehicleWeight,
    Demand_other=right.Demand,
    Model2Price_other=right.Model2Price,
)
joined_all.schema

In [14]:
# Add the 'dist' column using haversine formula
joined_all_with_dist = joined_all.with_columns(
    dist = haversine(
        joined_all.Latitude, joined_all.Longitude,
        joined_all.Latitude_other, joined_all.Longitude_other
    )
)

# Filter to find only nearby competitors (within 500m but not self)
nearby = joined_all_with_dist.filter(
    (pw.this.dist > 0) & (pw.this.dist <= 500)
)
print(nearby)

<pathway.Table schema={'ID': <class 'int'>, 'SystemCodeNumber': <class 'str'>, 'Capacity': <class 'int'>, 'Latitude': <class 'float'>, 'Longitude': <class 'float'>, 'Occupancy': <class 'int'>, 'QueueLength': <class 'int'>, 'TrafficConditionNearby': <class 'str'>, 'IsSpecialDay': <class 'int'>, 'VehicleType': <class 'str'>, 'LastUpdatedDate': <class 'str'>, 'LastUpdatedTime': <class 'str'>, 'Timestamp': <class 'str'>, 't': <class 'pathway.internals.datetime_types.DateTimeNaive'>, 'day': <class 'str'>, 'TrafficLevel': <class 'int'>, 'VehicleWeight': <class 'float'>, 'Demand': <class 'float'>, 'Model2Price': <class 'float'>, 'ID_other': <class 'int'>, 'SystemCodeNumber_other': <class 'str'>, 'Capacity_other': <class 'int'>, 'Latitude_other': <class 'float'>, 'Longitude_other': <class 'float'>, 'Occupancy_other': <class 'int'>, 'QueueLength_other': <class 'int'>, 'TrafficConditionNearby_other': <class 'str'>, 'IsSpecialDay_other': <class 'int'>, 'VehicleType_other': <class 'str'>, 'LastUpd

In [15]:
# Aggregate competitor stats: min/max price and average occupancy ratio
competitors = nearby.groupby(nearby.ID).reduce(
    pw.this.ID,
    min_price_other = pw.reducers.min(pw.this.Model2Price_other),
    max_price_other = pw.reducers.max(pw.this.Model2Price_other),
    avg_occ_ratio_other=pw.reducers.avg(pw.this.Occupancy_other / pw.this.Capacity_other)
)


In [17]:
# Join aggregated competitor stats back to original stage3 table
merged = stage3.join(competitors, pw.this.ID == competitors.ID, how=pw.JoinMode.LEFT)


In [18]:
# Define Model 3 logic to set final price and reroute suggestions
def model3(table):
    return table.select(
        *pw.this,
        occ_ratio=pw.this.Occupancy / pw.this.Capacity,
        final_price=pw.apply(
            lambda own_price, occ, cap, min_other, max_other:
                (max(min_other if min_other is not None else MIN_PRICE, MIN_PRICE)
                 if (occ/cap > 0.9 and min_other is not None and min_other < own_price)
                 else min(max_other if max_other is not None else MAX_PRICE, MAX_PRICE)
                 if (max_other is not None and max_other > own_price)
                 else own_price),
            pw.this.Model2Price, pw.this.Occupancy, pw.this.Capacity,
            pw.this.min_price_other, pw.this.max_price_other
        ),
        suggest_reroute=pw.apply(
            lambda occ, cap, own_price, min_other:
                bool(occ/cap > 0.9 and min_other is not None and min_other < own_price),
            pw.this.Occupancy, pw.this.Capacity,
            pw.this.Model2Price, pw.this.min_price_other
        )
    )

In [19]:
# Execute Model 3 and write results to JSONL
model3_results = model3(merged)

In [20]:
pw.io.jsonlines.write(model3_results, "model3_output.jsonl")

    https://beartype.readthedocs.io/en/latest/api_roar/#pep-585-deprecations
  warn(


In [None]:
pw.run()

Output()



In [2]:
# Visualize results over time with Bokeh

import pandas as pd
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource
# Enable inline notebook display for Bokeh
output_notebook()
# Load Model 3 output into a DataFrame, parse time, and sort
df_out = pd.read_json("model3_output.jsonl", lines=True)
df_out['t'] = pd.to_datetime(df_out['t'])
df_out = df_out.sort_values('t')
# Create a Bokeh data source from the DataFrame
source = ColumnDataSource(df_out)
# Initialize a datetime line plot for final price over time
fig = figure(
    title="Model 3 Competitive Pricing Over Time",
    x_axis_type="datetime", width=800, height=400
)
fig.line(x='t', y='final_price', source=source, line_width=2, color='orange', legend_label="Final Price")
fig.circle(x='t', y='final_price', source=source, size=5, color='red')

# Label axes and position legend
fig.xaxis.axis_label = "Time"
fig.yaxis.axis_label = "Price ($)"
fig.legend.location = "top_left"
# Display the interactive plot
show(fig)

