Install Required Packages

In [1]:
!pip install pathway
!pip install pandas


Collecting pathway
  Downloading pathway-0.24.0-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/60.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
Collecting h3>=4 (from pathway)
  Downloading h3-4.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting python-sat>=0.1.8.dev0 (from pathway)
  Downloading python_sat-1.8.dev17-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl.metadata (1.5 kB)
Collecting beartype<0.16.0,>=0.14.0 (from pathway)
  Downloading beartype-0.15.0-py3-none-any.whl.metadata (28 kB)
Collecting diskcache>=5.2.1 (from pathway)
  Downloading diskcache-5.6.3-py3-none-any.whl.metadata (20 kB)
Collecting boto3<1.36.0,>=1.26.76 (from pathway)
  Downloading boto3-1.35.99-py3-none-any.whl.metadata (6.7



Load and Preprocess Base Parking Data

In [1]:
import pandas as pd

# Input/output locations. The input default is Colab's upload directory; the
# output filename must match the path passed to pw.io.csv.read further down.
RAW_PARKING_CSV = "/content/dataset.csv"
PARKING_WITH_TS_CSV = "dataset_with_timestamp.csv"

# Load parking lot data
parking_df = pd.read_csv(RAW_PARKING_CSV)

# Fail fast with a readable message instead of a bare KeyError mid-expression.
missing_cols = {"LastUpdatedDate", "LastUpdatedTime"} - set(parking_df.columns)
if missing_cols:
    raise ValueError(
        f"{RAW_PARKING_CSV} is missing required columns: {sorted(missing_cols)}"
    )

# Combine date and time into a single "<date> <time>" string. The string (not a
# parsed datetime) is used later as the join key against the external table.
parking_df["TimestampStr"] = parking_df["LastUpdatedDate"].str.cat(
    parking_df["LastUpdatedTime"], sep=" "
)

# Save updated dataset for the Pathway streaming reader.
parking_df.to_csv(PARKING_WITH_TS_CSV, index=False)


Create Synthetic External Events and Weather Data

In [2]:
# Create dummy external events/weather data, one row per distinct parking
# timestamp, for at most len(EVENT_TYPES) timestamps.
EVENT_TYPES = ['concert', 'sports', 'none', 'none', 'festival', 'none', 'none', 'concert', 'none', 'sports']
WEATHER_CONDITIONS = ['sunny', 'rainy', 'sunny', 'cloudy', 'sunny', 'rainy', 'cloudy', 'sunny', 'sunny', 'cloudy']

unique_timestamps = parking_df['TimestampStr'].unique()

# Slice every column to the same length. If the dataset has fewer distinct
# timestamps than EVENT_TYPES, pd.DataFrame would otherwise raise
# "All arrays must be of the same length".
n_rows = min(len(EVENT_TYPES), len(WEATHER_CONDITIONS), len(unique_timestamps))
external_data = pd.DataFrame({
    'Timestamp': unique_timestamps[:n_rows],
    'EventType': EVENT_TYPES[:n_rows],
    'Weather': WEATHER_CONDITIONS[:n_rows],
})

external_data.to_csv("external_events_weather.csv", index=False)


Define Schemas in Pathway

In [3]:
import pathway as pw

class ParkingSchema(pw.Schema):
    """Pathway schema for rows of dataset_with_timestamp.csv.

    Columns mirror the CSV written by the preprocessing cell, including the
    derived TimestampStr column. Declaration order is the column order.
    """
    ID: str  # copied through to the output unchanged
    SystemCodeNumber: str  # not used by the visible pipeline
    Capacity: int  # denominator of the occupancy ratio in compute_adjusted_demand
    Latitude: float  # not used by the visible pipeline
    Longitude: float  # not used by the visible pipeline
    Occupancy: int  # numerator of the occupancy ratio
    VehicleType: str  # not used by the visible pipeline
    TrafficConditionNearby: str  # not used by the visible pipeline
    QueueLength: int  # weighted by 0.5 in compute_adjusted_demand
    IsSpecialDay: int  # weighted by 1.2; presumably a 0/1 flag — TODO confirm
    LastUpdatedDate: str
    LastUpdatedTime: str
    TimestampStr: str  # "<LastUpdatedDate> <LastUpdatedTime>"; join key to ExternalSchema.Timestamp

class ExternalSchema(pw.Schema):
    """Pathway schema for rows of the synthetic external_events_weather.csv."""
    Timestamp: str  # values copied from parking TimestampStr; join key
    EventType: str  # generated values: 'concert', 'sports', 'festival', 'none'
    Weather: str  # generated values: 'sunny', 'rainy', 'cloudy'


Load Streaming Tables

In [4]:
def _read_csv_stream(path, schema):
    """Open `path` as a Pathway table in streaming mode (autocommit every 1000 ms)."""
    return pw.io.csv.read(
        path,
        schema=schema,
        mode="streaming",
        autocommit_duration_ms=1000,
    )


# Parking lot records and the external events/weather feed, both streamed.
parking = _read_csv_stream("dataset_with_timestamp.csv", ParkingSchema)
external = _read_csv_stream("external_events_weather.csv", ExternalSchema)


Define UDFs for Demand and Pricing

In [5]:
@pw.udf
def compute_adjusted_demand(
    occupancy: int,
    capacity: int,
    queue: int,
    special_day: int,
    event: str | None,
    weather: str | None,
) -> float:
    """Score parking demand from lot state plus external event/weather factors.

    demand = occupancy / capacity + 0.5 * queue + 1.2 * special_day
             + event_impact[event] + weather_impact[weather]
    and the result is floored at 0.

    The annotations let Pathway infer a float column type instead of ANY.

    Args:
        occupancy: Occupancy value of the record.
        capacity: Lot capacity; a non-positive value short-circuits to 0.0.
        queue: Queue length, weighted by 0.5.
        special_day: Special-day indicator, weighted by 1.2.
        event: Event type. Unknown values, or None (e.g. from an outer join with
            no matching external row), contribute 0.
        weather: Weather condition. Unknown values or None contribute 0.

    Returns:
        A non-negative float demand score.
    """
    # Non-positive capacity would divide by zero or produce a meaningless ratio.
    if capacity <= 0:
        return 0.0

    # Base demand from occupancy ratio, queue and special-day flag
    demand = occupancy / capacity + 0.5 * queue + 1.2 * special_day

    # Event impact; dict.get also maps a missing (None) event to 0.0
    event_impact = {'concert': 2.0, 'sports': 1.5, 'festival': 1.8, 'none': 0.0}
    demand += event_impact.get(event, 0.0)

    # Weather impact; dict.get also maps a missing (None) weather to 0.0
    weather_impact = {'rainy': -1.0, 'cloudy': -0.5, 'sunny': 0.5}
    demand += weather_impact.get(weather, 0.0)

    return max(demand, 0.0)

@pw.udf
def price_from_demand(demand: float) -> float:
    """Convert a demand score into a clamped price.

    price = base_price * (1 + lambda_ * demand), clamped to
    [0.5 * base_price, 2.5 * base_price] = [5.0, 25.0].

    The float annotations let Pathway type the resulting column as float
    instead of ANY.
    """
    base_price = 10
    lambda_ = 1.5
    floor_price = base_price * 0.5
    ceiling_price = base_price * 2.5

    raw_price = base_price * (1 + lambda_ * demand)
    return min(max(raw_price, floor_price), ceiling_price)


Join Parking and External Tables, Then Compute Demand and Price

In [6]:
# Left-join on timestamp so every parking record is kept. The synthetic external
# table covers only a handful of timestamps, and an inner join would silently
# drop all other parking rows. Unmatched rows get EventType/Weather = None,
# which compute_adjusted_demand maps to zero impact via dict.get(..., 0.0).
joined = parking.join_left(external, parking.TimestampStr == external.Timestamp)

# Compute demand per parking record
result = joined.select(
    ID=pw.left.ID,
    Timestamp=pw.left.TimestampStr,
    Demand=compute_adjusted_demand(
        pw.left.Occupancy,
        pw.left.Capacity,
        pw.left.QueueLength,
        pw.left.IsSpecialDay,
        pw.right.EventType,
        pw.right.Weather,
    ),
)

# Append the price derived from demand as a new column
result = result.with_columns(Price=price_from_demand(pw.this.Demand))


Export the Result as Streaming Output

In [None]:
# Register a JSON Lines sink for `result`. The sink must be declared before
# pw.run(), which is what actually executes the dataflow built above.
pw.io.jsonlines.write(result, "model3_output.jsonl")

# Execute Pathway pipeline.
# NOTE(review): both inputs are read with mode="streaming", so this call keeps
# watching the CSV files and is not expected to return on its own (the cell
# shows "In [None]"). Interrupt the kernel to stop it, or read the inputs with
# mode="static" for a one-shot batch run.
pw.run()


Output()

    https://beartype.readthedocs.io/en/latest/api_roar/#pep-585-deprecations
  warn(
