<a href="https://colab.research.google.com/github/Uddipta-sarma/cna_project_by_ubs/blob/main/cna_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
# ---------------------------
# Model 1: Baseline Linear Pricing
# ---------------------------

def model1_linear_price(prev_price, occupancy, capacity, alpha=5.0):
    """Return ``prev_price`` raised by ``alpha`` times the occupancy rate.

    Args:
        prev_price: price from the previous step.
        occupancy: number of occupied spots.
        capacity: total number of spots; a zero value raises ZeroDivisionError.
        alpha: how strongly the occupancy rate pushes the price up.

    Returns:
        ``prev_price + alpha * (occupancy / capacity)``.
    """
    occupancy_rate = occupancy / capacity
    return prev_price + alpha * occupancy_rate


In [2]:
# ---------------------------
# Model 2: Demand-Based Pricing
# ---------------------------

def model2_demand_price(base_price, occupancy, capacity, queue, traffic, special_day, vehicle_type,
                        min_price=5, max_price=20):
    """Demand-based dynamic pricing model.

    Combines occupancy rate, queue length, nearby traffic, a special-day flag and the
    vehicle type into a linear demand score, scales ``base_price`` by it, and clamps
    the result to ``[min_price, max_price]``.

    Args:
        base_price: price the demand adjustment is applied to.
        occupancy: number of occupied spots.
        capacity: total number of spots; must be positive.
        queue: number of vehicles waiting.
        traffic: nearby traffic label: 'low', 'average' (alias 'medium') or 'high'.
            Unknown labels are weighted like 'average'.
        special_day: 1/True on special days, 0/False otherwise.
        vehicle_type: 'car', 'bike' or 'truck'. Unknown types are weighted like 'car'.
        min_price: lower bound of the returned price (default 5).
        max_price: upper bound of the returned price (default 20).

    Returns:
        The demand-adjusted price, bounded to ``[min_price, max_price]``.

    Raises:
        ValueError: if ``capacity`` is not positive.
    """
    if capacity <= 0:
        raise ValueError(f"capacity must be positive, got {capacity!r}")

    vehicle_weights = {'car': 1.0, 'bike': 0.5, 'truck': 1.5}
    # The raw data labels this level 'average'; 'medium' is kept as an alias.
    traffic_weights = {'low': 0.5, 'average': 1.0, 'medium': 1.0, 'high': 1.5}

    demand = (
        2 * (occupancy / capacity) +
        1.5 * queue -
        1.2 * traffic_weights.get(traffic, 1.0) +   # heavier nearby traffic lowers demand
        2.0 * special_day +
        1.0 * vehicle_weights.get(vehicle_type, 1.0)
    )

    # Centre the raw score at 2 and shrink it before applying it to the base price.
    norm_demand = (demand - 2) / 10
    price = base_price * (1 + 0.5 * norm_demand)
    return max(min_price, min(max_price, price))


In [3]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m37.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m69.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [5]:
# Load the raw parking dataset from the Colab upload location and display it.
# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
df = pd.read_csv(filepath_or_buffer='/content/dataset.csv')
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [6]:
# Combine the 'LastUpdatedDate' and 'LastUpdatedTime' columns into a single datetime column.
# Dates are day-first in the raw data (e.g. 19-12-2016), hence '%d-%m-%Y'.
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                  format='%d-%m-%Y %H:%M:%S')

# Sort chronologically and reset the index. Different parking lots may report at the
# same timestamp, so use a stable sort: tied rows keep their original CSV order and the
# exported stream is reproducible. The default quicksort does not guarantee that.
df = df.sort_values('Timestamp', kind='stable').reset_index(drop=True)

In [7]:
# Write the columns consumed by the streaming demo to parking_stream.csv (no index column).
# Only three features are exported here for simplicity; participants are expected to
# incorporate additional relevant features in their models.
df.loc[:, ["Timestamp", "Occupancy", "Capacity"]].to_csv(
    "parking_stream.csv",
    index=False,
)
# Participants are expected to incorporate additional relevant features in their models.

In [8]:
# Define the schema for the streaming data using Pathway.
# Each field declares one column of parking_stream.csv and its type; the fields match
# the three columns exported with df[["Timestamp", "Occupancy", "Capacity"]].

class ParkingSchema(pw.Schema):
    Timestamp: str   # observation time as text, expected in "%Y-%m-%d %H:%M:%S" form (parsed later with strptime)
    Occupancy: int   # number of occupied parking spots
    Capacity: int    # total parking capacity at the location


In [9]:
# Load the data as a simulated stream using Pathway's replay_csv function.
# Rows of parking_stream.csv are replayed at a controlled rate to mimic real-time arrival,
# typed according to ParkingSchema.
# input_rate=1000 means approximately 1000 rows per second will be ingested into the stream.

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)

In [10]:
# Format of the 'Timestamp' strings in parking_stream.csv
fmt = "%Y-%m-%d %H:%M:%S"

# Derive two time columns from the raw 'Timestamp' text:
# - 't'   : the parsed, timezone-naive datetime
# - 'day' : that datetime truncated to midnight and rendered as "YYYY-MM-DDT00:00:00",
#           a string key for day-level aggregation
# 't' is parsed once and 'day' is rendered from it, rather than parsing the text twice.
data_with_time = data.with_columns(t=data.Timestamp.dt.strptime(fmt))
data_with_time = data_with_time.with_columns(
    day=pw.this.t.dt.strftime("%Y-%m-%dT00:00:00")
)


In [11]:
# Inspect the derived table's column names and Pathway types before building on it.
print(data_with_time.schema)


id          | Timestamp | Occupancy | Capacity | t               | day
ANY_POINTER | STR       | INT       | INT      | DATE_TIME_NAIVE | STR


# Step 2: Adding Features and Applying the Pricing Models per Day

In [12]:
import pathway as pw
import datetime

# Step 1: Create extended data with dummy feature columns.
# NOTE(review): vehicle_type, queue and traffic are synthetic placeholders derived from
# Capacity, not real observations. The raw CSV has VehicleType, QueueLength,
# TrafficConditionNearby and IsSpecialDay columns, but they are not exported to
# parking_stream.csv nor declared in ParkingSchema. TODO: stream the real columns.
extended_data = data_with_time.with_columns(
    # Dummy vehicle_type based on capacity % 3
    vehicle_type = pw.if_else(
        pw.this.Capacity % 3 == 0, "car",
        pw.if_else(pw.this.Capacity % 3 == 1, "bike", "truck")
    ),

    queue = pw.this.Capacity % 3,
    traffic = pw.this.Capacity % 5,

    # Special day = first day of the month, read from the parsed datetime.
    # The `day` string is "YYYY-MM-DDT00:00:00", so the former check
    # `day.str.endswith("01")` always saw "...00" and was never true.
    is_special_day = pw.this.t.dt.day() == 1,

    # Per-row weight matching vehicle_type above: car 1.0, bike 0.5, truck 1.5
    vehicle_type_weight = pw.if_else(
        pw.this.Capacity % 3 == 0, 1.0,
        pw.if_else(pw.this.Capacity % 3 == 1, 0.5, 1.5)
    )
)

# Step 2: Aggregate each calendar day with a one-day tumbling window, then price it.
# NOTE(review): rows from every parking lot in the stream fall into the same daily
# window, so occupancy is averaged across lots and `cap` is the largest lot's capacity.
delta_window = (
    extended_data.windowby(
        pw.this.t,
        instance=pw.this.day,
        # `datetime` is the module here (re-imported at the top of this cell)
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior()
    )
    .reduce(
        t=pw.this._pw_window_end,
        occ_max=pw.reducers.max(pw.this.Occupancy),
        occ_min=pw.reducers.min(pw.this.Occupancy),
        occ_sum=pw.reducers.sum(pw.this.Occupancy),
        count=pw.reducers.count(),
        cap=pw.reducers.max(pw.this.Capacity),
        queue_sum=pw.reducers.sum(pw.this.queue),
        traffic_sum=pw.reducers.sum(pw.this.traffic),
        special_day=pw.reducers.max(pw.this.is_special_day),
        vehicle_type_weight_sum=pw.reducers.sum(pw.this.vehicle_type_weight),
    )
    .with_columns(
        # Model 1: base price 10 plus 5.0 x the day's average occupancy rate
        # (same formula as model1_linear_price(10, avg_occupancy, cap, alpha=5.0)).
        model1_price=10 + 5.0 * ((pw.this.occ_sum / pw.this.count) / pw.this.cap),

        # Model 2: base price 10 scaled by a weighted demand score of daily averages.
        # NOTE(review): coefficients differ from model2_demand_price() and no [5, 20]
        # clamp is applied here.
        model2_price=10 * (
            1 + 0.5 * (
                0.4 * ((pw.this.occ_sum / pw.this.count) / pw.this.cap)
                + 0.3 * (pw.this.queue_sum / pw.this.count)
                - 0.2 * (pw.this.traffic_sum / pw.this.count)
                + 0.2 * pw.cast(float, pw.this.special_day)
                + 0.1 * (pw.this.vehicle_type_weight_sum / pw.this.count)
            )
        ),
    )
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [18]:
import panel as pn
import bokeh.plotting

# Initialise Panel's notebook support so the Panel/Bokeh dashboard below can render.
pn.extension()

# Define the plotter to visualize model1 and model2 prices
def price_plotter(source):
    """Build the Bokeh figure showing the daily Model 1 and Model 2 prices.

    Pathway's ``Table.plot`` calls this with ``source`` (presumably a Bokeh
    ColumnDataSource built from the table; confirm). The ``t``, ``model1_price`` and
    ``model2_price`` columns are read from it.

    Returns:
        The configured ``bokeh.plotting.figure``, with these elements:
        - a line plus circle markers for each model;
        - a datetime x-axis;
        - labelled axes;
        - a legend in the top-left corner.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price (Model 1 & 2)",
        x_axis_type="datetime",
    )
    # Label both axes so the figure can be read on its own.
    fig.xaxis.axis_label = "Window end (day)"
    fig.yaxis.axis_label = "Price"

    # Plot both models as lines
    fig.line("t", "model1_price", source=source, line_width=2, color="green", legend_label="Model 1")
    fig.line("t", "model2_price", source=source, line_width=2, color="navy", legend_label="Model 2")

    # Mark each daily point (scatter with marker="circle" is used instead of fig.circle)
    fig.scatter(x="t", y="model1_price", source=source, size=6, color="yellow", marker="circle")
    fig.scatter(x="t", y="model2_price", source=source, size=6, color="red", marker="circle")

    fig.legend.location = "top_left"

    return fig

# Connect the Pathway table to the plotting function. The figure is built by
# price_plotter and filled with data only once pw.run() executes.
# NOTE(review): sorting_col="t" presumably orders points by window end; confirm in Pathway docs.
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel layout and mark it servable. As the cell's last expression
# it is displayed in the notebook; it can also be served as an app.
pn.Column(viz).servable()


In [19]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()


Output()

