In [None]:
# -----------------------------------------------------
# Install required libraries
# -----------------------------------------------------

!pip install pathway bokeh --quiet


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m9.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m46.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

UsageError: Line magic function `%%capture` not found.


In [None]:
# -----------------------------------------------------
# Import libraries
# -----------------------------------------------------

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import math

import pathway as pw
import bokeh.plotting
import panel as pn


In [None]:
# -----------------------------------------------------
# Read CSV and prepare data
# -----------------------------------------------------

# Read the raw parking records from disk
df_parking = pd.read_csv('dataset.csv')

# Glue the separate date and time text columns together, then parse them
# into a single datetime column
combined_date_time = df_parking['LastUpdatedDate'] + ' ' + df_parking['LastUpdatedTime']
df_parking['Timestamp'] = pd.to_datetime(combined_date_time, format='%d-%m-%Y %H:%M:%S')

# Optional: filter for specific lots for testing speed
# df_parking = df_parking[df_parking["SystemCodeNumber"] == "1"]

# Columns, in output order, that are written for the streaming stage
stream_columns = [
    "Timestamp",
    "SystemCodeNumber",
    "Latitude",
    "Longitude",
    "Occupancy",
    "Capacity",
    "QueueLength",
    "TrafficConditionNearby",
    "IsSpecialDay",
    "VehicleType",
]

# Write the reduced table without the pandas index column
df_parking[stream_columns].to_csv("parking_stream.csv", index=False)

In [None]:
# -----------------------------------------------------
# Define Pathway schema
# -----------------------------------------------------

class ParkingLotSchema(pw.Schema):
    """Column names and types of the rows read from ``parking_stream.csv``.

    The fields are the same columns, in the same order, that the
    data-preparation cell writes to that file.
    """
    # Read as text; converted to a datetime column (timestamp_dt) after loading.
    Timestamp: str
    # Lot identifier; later used as the grouping / join key.
    SystemCodeNumber: str
    # Lot coordinates, fed to haversine_distance.
    Latitude: float
    Longitude: float
    # Occupancy / Capacity gives the occupancy ratio in the demand model.
    Occupancy: int
    Capacity: int
    QueueLength: int
    # Text level mapped to a number by traffic_weight.
    TrafficConditionNearby: str
    # Multiplied directly by weight_special_day (presumably a 0/1 flag -- TODO confirm).
    IsSpecialDay: int
    # Text category mapped to a number by vehicle_weight.
    VehicleType: str

In [None]:
# -----------------------------------------------------
# Load streaming data
# -----------------------------------------------------

# Text layout of the Timestamp column in parking_stream.csv
datetime_format = "%Y-%m-%d %H:%M:%S"

# Replay the prepared CSV as a stream and, in the same step, attach a parsed
# datetime column next to the textual Timestamp
data_stream = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingLotSchema,
    input_rate=1000,
).with_columns(
    timestamp_dt=pw.this.Timestamp.dt.strptime(datetime_format)
)

In [None]:
# -----------------------------------------------------
# Define pricing coefficients
# -----------------------------------------------------

# Price charged when normalized demand is zero (see calculate_price)
base_price_value = 10.0

# Multipliers applied to each demand component before they are combined
weight_occupancy, weight_queue = 1.0, 0.1
weight_traffic, weight_special_day = 0.5, 0.3
weight_vehicle_type = 1.0

# How strongly normalized demand scales the base price in calculate_price
demand_lambda = 0.5

# Lower and upper clamps applied to every computed price
min_allowed_price, max_allowed_price = 5.0, 20.0

In [None]:
# -----------------------------------------------------
# Define helper UDFs
# -----------------------------------------------------

@pw.udf
def vehicle_weight(vehicle_type) -> float:
    """Return the demand weight for a vehicle type (case-insensitive lookup).

    Types other than car, bike and truck fall back to 0.2.
    """
    weights_by_type = {'car': 0.3, 'bike': 0.1, 'truck': 0.5}
    lookup_key = str(vehicle_type).lower()
    if lookup_key in weights_by_type:
        return weights_by_type[lookup_key]
    return 0.2

@pw.udf
def traffic_weight(traffic_level) -> float:
    """Return the numeric score for a traffic level (case-insensitive lookup).

    Levels other than low, average and high fall back to 0.5.
    """
    scores_by_level = {'low': 0.1, 'average': 0.5, 'high': 1.0}
    lookup_key = str(traffic_level).lower()
    if lookup_key in scores_by_level:
        return scores_by_level[lookup_key]
    return 0.5

@pw.udf
def multiply_values(val, multiplier) -> float:
    """Return ``val * multiplier`` as a float, or 0.0 when ``val`` is None."""
    return 0.0 if val is None else float(val * multiplier)

@pw.udf
def normalize_demand(demand_value) -> float:
    """Scale raw demand by 1/5 and clamp the result to the range [0, 1]."""
    scaled = demand_value / 5.0
    floored = max(0.0, scaled)
    return min(1.0, floored)

@pw.udf
def calculate_price(demand_norm) -> float:
    """Turn normalized demand into a price clamped to the allowed range.

    The unclamped price is ``base_price_value * (1 + demand_lambda * demand_norm)``.
    """
    demand_multiplier = 1 + demand_lambda * demand_norm
    unclamped_price = base_price_value * demand_multiplier
    at_least_minimum = max(min_allowed_price, unclamped_price)
    return min(max_allowed_price, at_least_minimum)

In [None]:
# -----------------------------------------------------
# Calculate demand components
# -----------------------------------------------------

# Each with_columns stage only reads columns produced by the stages before it,
# so the whole derivation is written as one chained expression.
data_stream = (
    data_stream
    # Stage 1: per-row ratios and categorical scores
    .with_columns(
        occupancy_ratio=pw.this.Occupancy / pw.this.Capacity,
        vehicle_score=vehicle_weight(pw.this.VehicleType),
        traffic_score=traffic_weight(pw.this.TrafficConditionNearby),
    )
    # Stage 2: scale every factor by its configured weight
    .with_columns(
        term_occupancy=multiply_values(pw.this.occupancy_ratio, weight_occupancy),
        term_queue=multiply_values(pw.this.QueueLength, weight_queue),
        term_traffic=multiply_values(pw.this.traffic_score, weight_traffic),
        term_special=multiply_values(pw.this.IsSpecialDay, weight_special_day),
        term_vehicle=multiply_values(pw.this.vehicle_score, weight_vehicle_type),
    )
    # Stage 3: raw demand -- the traffic term is subtracted, all others are added
    .with_columns(
        raw_demand=(
            pw.this.term_occupancy
            + pw.this.term_queue
            - pw.this.term_traffic
            + pw.this.term_special
            + pw.this.term_vehicle
        )
    )
    # Stage 4: squash raw demand into [0, 1]
    .with_columns(normalized_demand=normalize_demand(pw.this.raw_demand))
    # Stage 5: demand-based price, before any competitor adjustment
    .with_columns(calculated_price=calculate_price(pw.this.normalized_demand))
)


In [None]:
# -----------------------------------------------------
# Define haversine function for distance calculation
# -----------------------------------------------------

@pw.udf
def haversine_distance(lat1, lon1, lat2, lon2) -> float:
    """Compute the great-circle distance in meters between two lat/lon points.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        Distance in meters on a sphere of radius 6371 km.
    """
    R = 6371e3  # radius of Earth in meters
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)
    a = math.sin(delta_phi/2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda/2)**2
    # Floating-point rounding can push `a` marginally above 1 for (near-)antipodal
    # points, which would make sqrt(1 - a) raise ValueError. Only the upper bound
    # is clamped, and with a comparison, so NaN inputs still propagate as NaN.
    if a > 1.0:
        a = 1.0
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

In [None]:
# -----------------------------------------------------
# Create left and right tables for self-join
# -----------------------------------------------------

def _lot_view(side):
    """Select lot id, price and coordinates from data_stream, suffixing each column name with ``side``."""
    return data_stream.select(**{
        f"LotID_{side}": pw.this.SystemCodeNumber,
        f"price_{side}": pw.this.calculated_price,
        f"lat_{side}": pw.this.Latitude,
        f"lon_{side}": pw.this.Longitude,
    })

# Two separately selected views of the same stream, so they can be joined
# against each other
left_table = _lot_view("left")
right_table = _lot_view("right")


In [None]:
# -----------------------------------------------------
# Compute distance and price difference
# -----------------------------------------------------

# Join the two views with no join condition
lot_pairs = left_table.join(right_table)

# Per-pair derived values, expressed against the columns of the joined rows
pair_distance = haversine_distance(
    pw.this.lat_left,
    pw.this.lon_left,
    pw.this.lat_right,
    pw.this.lon_right,
)
pair_price_gap = pw.this.price_left - pw.this.price_right

# Keep every column from both sides and add the two derived columns
pairs_table = lot_pairs.select(
    **left_table,
    **right_table,
    distance_meters=pair_distance,
    price_difference=pair_price_gap,
)

In [None]:
# -----------------------------------------------------
# Find cheaper competitors
# -----------------------------------------------------

# Keep only pairs where the right-hand lot is strictly cheaper AND is a different
# lot. pairs_table is built by joining two views of the same stream without a
# join condition, so it also contains pairs of rows from the same lot; without the
# ID check a lot would be counted as its own cheaper "competitor".
# NOTE(review): distance_meters is available on pairs_table, but no proximity
# threshold is applied here -- confirm whether competitors should be limited by distance.
cheaper_lots = pairs_table.filter(
    (pw.this.price_right < pw.this.price_left)
    & (pw.this.LotID_left != pw.this.LotID_right)
)


In [None]:
# -----------------------------------------------------
# Compute average price difference per lot
# -----------------------------------------------------

# One row per left-hand lot: the summed price gap to its cheaper pairs and
# the number of such pairs
competitor_totals = cheaper_lots.groupby(pw.this.LotID_left).reduce(
    LotID_left=pw.this.LotID_left,
    sum_price_diff=pw.reducers.sum(pw.this.price_difference),
    count_competitors=pw.reducers.count(),
)

# Mean gap = summed gap / pair count, with the count cast to float first
competitor_count_as_float = pw.cast(float, pw.this.count_competitors)
avg_price_diff_table = competitor_totals.select(
    **competitor_totals,
    avg_price_diff=pw.this.sum_price_diff / competitor_count_as_float,
)

In [None]:
# -----------------------------------------------------
# Adjust prices based on competitors
# -----------------------------------------------------

# Fraction of the average gap to cheaper competitors that is taken off the price
price_adjustment_factor = 0.5

@pw.udf
def adjust_final_price(orig_price, avg_diff) -> float:
    """Reduce a price when competitors are cheaper.

    Args:
        orig_price: Demand-based price of the lot.
        avg_diff: Average price gap to cheaper competitors, or None when the
            lot has no row in the competitor table (left join without a match).

    Returns:
        ``orig_price`` unchanged when ``avg_diff`` is None; otherwise
        ``orig_price - price_adjustment_factor * avg_diff`` clamped to
        [min_allowed_price, max_allowed_price].
    """
    if avg_diff is None:
        return orig_price
    new_price = orig_price - price_adjustment_factor * avg_diff
    return max(min_allowed_price, min(max_allowed_price, new_price))

# Left join so that lots without any cheaper competitor are kept (their
# avg_price_diff is None). The condition is written with pw.left / pw.right,
# the placeholders Pathway documents for the two sides of a join.
final_table = data_stream.join_left(
    avg_price_diff_table,
    pw.left.SystemCodeNumber == pw.right.LotID_left
)

# Carry every joined column forward and add the competitor-adjusted price
final_table = final_table.select(
    **final_table,
    final_adjusted_price = adjust_final_price(
        pw.this.calculated_price,
        pw.this.avg_price_diff
    )
)

In [None]:
# -----------------------------------------------------
# Plot using Bokeh
# -----------------------------------------------------

pn.extension()

def plot_final_prices(data_source):
    """Build the Bokeh figure of adjusted price over time.

    Args:
        data_source: Bokeh data source passed in by ``Table.plot``; it must
            provide the ``timestamp_dt`` and ``final_adjusted_price`` columns.

    Returns:
        A Bokeh figure with a line and point markers for the adjusted price.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Dynamic Pricing Model 3 - Adjusted Prices",
        x_axis_type="datetime"
    )
    fig.line("timestamp_dt", "final_adjusted_price", source=data_source, line_width=2, color="navy")
    # scatter() draws circle markers by default; figure.circle() with a `size`
    # argument is deprecated since Bokeh 3.4, and bokeh is installed unpinned.
    fig.scatter("timestamp_dt", "final_adjusted_price", source=data_source, size=5, color="red")
    return fig

# Live plot that follows final_table, with rows ordered by timestamp
viz_panel = final_table.plot(plot_final_prices, sorting_col="timestamp_dt")
pn.Column(viz_panel).servable()



In [None]:
# -----------------------------------------------------
# Run Pathway pipeline
# -----------------------------------------------------

%%capture --no-display
pw.run()


Output()



KeyboardInterrupt: 