# Introduction


This is the **Competitive Pricing Model** which adds location intelligence and simulates real-world competition:

• Calculate geographic proximity of nearby parking spaces using lat-long.

• Determine competitor prices and factor them into our own pricing.

In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [None]:
# Importing Required Libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import datetime
from datetime import datetime
from math import radians, sin, cos, sqrt, atan2
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [None]:
df = pd.read_csv('/content/dataset.csv')
df.head()

In [None]:
# Combine the 'LastUpdatedDate' and 'LastUpdatedTime' columns into a single datetime column
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                 format='%d-%m-%Y %H:%M:%S')

# Sort the DataFrame by 'SystemCodeNumber' and the new 'Timestamp' column, and reset the index
df = df.sort_values(by=['SystemCodeNumber', 'Timestamp']).reset_index(drop=True)

In [None]:
# Save the selected columns to a CSV file for streaming or downstream processing
df_selected = df[["SystemCodeNumber", "Timestamp", "Occupancy", "Capacity", "QueueLength", "TrafficConditionNearby", "IsSpecialDay", "VehicleType"]]
df_selected.to_csv("demand_parking_stream.csv", index=False)

In [None]:
# Precompute nearby competitors (offline in pandas)
lots = df_selected.groupby('SystemCodeNumber')[['Latitude','Longitude']].first().reset_index()

def haversine(lat1, lon1, lat2, lon2):
    R = 6371000
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2-lat1)
    dlambda = radians(lon2-lon1)
    a = sin(dphi/2)**2 + cos(phi1)*cos(phi2)*sin(dlambda/2)**2
    return R*2*atan2(sqrt(a), sqrt(1 - a))

nearby = {}
for i, lot in lots.iterrows():
    competitors = []
    for j, other in lots.iterrows():
        if lot['SystemCodeNumber'] != other['SystemCodeNumber']:
            dist = haversine(lot['Latitude'], lot['Longitude'], other['Latitude'], other['Longitude'])
            if dist < 500:  # within 500 meters
                competitors.append(other['SystemCodeNumber'])
    nearby[lot['SystemCodeNumber']] = competitors

with open('nearby_competitors.json','w') as f:
    json.dump(nearby, f)

In [None]:
# Define the schema for the streaming data using Pathway
class ParkingSchema(pw.Schema):
    SystemCodeNumber: str
    Capacity: int
    Occupancy: int
    QueueLength: int
    TrafficConditionNearby: str
    IsSpecialDay: int
    VehicleType: str
    Timestamp: str
    Latitude: float
    Longitude: float

In [None]:
# Load the data as a simulated stream using Pathway's replay_csv function
data_3 = pw.demo.replay_csv("demand_parking_stream.csv", schema=ParkingSchema, input_rate=500)

In [None]:
# Define the datetime format to parse the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Add new columns to the data stream:
data_with_time = data_3.with_columns(
    t = data_3.Timestamp.dt.strptime(fmt),
    day = data_3.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)

# Step 2: Making a competitive pricing function

In [None]:
# Map categorical features & compute occupancy ratio
@pw.udf(return_type=int)
def map_traffic(level):
    return {'low':1, 'medium':2, 'high':3}.get(level,1)

@pw.udf(return_type=float)
def vehicle_weight(vtype):
    return {'car':1, 'bike':0.5, 'truck':1.5}.get(vtype,1)

data_mapped = data_with_time.with_columns(
    traffic_num = map_traffic(data_with_time.TrafficConditionNearby),
    vehicle_weight = vehicle_weight(data_with_time.VehicleType),
    occupancy_ratio = data_with_time.Occupancy / data_with_time.Capacity
)

# Define some required UDFs
@pw.udf(return_type=float)
def normalize_demand(d):
    return max(0, min(1, (d + 3) / 6))

@pw.udf(return_type=float)
def bound_price(p, base_price):
    return max(0.5 * base_price, min(2 * base_price, p))

# Load competitor groups
with open('nearby_competitors.json') as f:
    nearby_dict = json.load(f)

@pw.udf
def get_competitors(lot_id):
    return nearby_dict.get(lot_id, [])

data_with_group = data_mapped.with_columns(
    competitor_group = get_competitors(data_mapped.SystemCodeNumber)
)

# Demand parameters
alpha, beta, gamma, delta, epsilon, lambd = 0.6, 0.3, 0.2, 0.5, 0.4, 1
base_price = 10

# Daily tumbling window WITH competitor price
daily_window = (
    data_with_group.windowby(
        pw.this.t,
        instance=pw.this.day + "_" + pw.this.SystemCodeNumber,  # separate per lot
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior()
    )
    .reduce(
        t = pw.this._pw_window_end,
        lot_id = pw.reducers.first(pw.this.SystemCodeNumber),
        avg_occ_ratio = pw.reducers.avg(pw.this.occupancy_ratio),
        avg_queue = pw.reducers.avg(pw.this.QueueLength),
        avg_traffic = pw.reducers.avg(pw.this.traffic_num),
        avg_vehicle_weight = pw.reducers.avg(pw.this.vehicle_weight),
        special_day = pw.reducers.max(pw.this.IsSpecialDay),
    )
    .with_columns(
        demand = (
            alpha * pw.this.avg_occ_ratio
          + beta * pw.this.avg_queue
          - gamma * pw.this.avg_traffic
          + delta * pw.this.special_day
          + epsilon * pw.this.avg_vehicle_weight
        )
    )
    .with_columns(
        norm_demand = normalize_demand(pw.this.demand)
    )
    .with_columns(
        raw_price = base_price * (1 + lambd * pw.this.norm_demand),
    )
)

# Compute avg competitor price via self-join
competitor_prices = daily_window.join(
    daily_window, pw.left.lot_id != pw.right.lot_id
).filter(
    pw.right.lot_id.isin(get_competitors(pw.left.lot_id))
).reduce(
    left_lot = pw.reducers.first(pw.left.lot_id),
    t = pw.reducers.first(pw.left.t),
    own_price = pw.reducers.first(pw.left.raw_price),
    avg_competitor_price = pw.reducers.avg(pw.right.raw_price)
).with_columns(
    competitive_price = pw.this.own_price - (pw.this.avg_competitor_price - base_price)*0.2
)

# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

In [None]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()

# Define Bokeh plotting function
def price_plotter(source):
    fig = bokeh.plotting.figure(
        title="Competitive Daily Parking Price",
        x_axis_type="datetime", height=400, width=800
    )
    fig.line("t", "competitive_price", source=source, color="navy", line_width=2)
    fig.circle("t", "competitive_price", source=source, size=6, color="red")
    return fig

viz = competitor_prices.plot(price_plotter, sorting_col="t")
pn.Column(viz).servable()

In [None]:
%%capture --no-display
pw.run()

# *The End*