<a href="https://colab.research.google.com/github/Epic-SK/dpuplusippy/blob/main/Model%202%20Final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [None]:
import math
from tabulate import tabulate
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
from bokeh.layouts import gridplot
from bokeh.plotting import figure, output_file, show
from bokeh.io import curdoc
import panel as pn

In [None]:
def haversine(lat1, lon1, lat2, lon2, radius=6371.0):
    """Great-circle distance between two points given in decimal degrees.

    Uses the haversine formula on a sphere of the given radius.

    Args:
        lat1, lon1: Latitude / longitude of the first point, in degrees.
        lat2, lon2: Latitude / longitude of the second point, in degrees.
        radius: Sphere radius. The default 6371.0 is Earth's mean radius in km,
            so the result is in kilometres unless another radius is passed.

    Returns:
        Distance along the sphere's surface, in the same unit as ``radius``.
    """
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)

    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    # Floating-point rounding can push `a` marginally above 1 for (near-)antipodal
    # points, which would make math.asin raise ValueError; clamp it into range.
    c = 2 * math.asin(math.sqrt(min(1.0, a)))
    return radius * c

# Step 1: Importing and Preprocessing the Data

In [None]:
# Opt in to pandas' future behaviour: `replace` no longer silently downcasts
# object columns (relevant to the category encodings in the preprocessing cell).
pd.set_option('future.no_silent_downcasting', True)

# Two tables are loaded from the project's GitHub repository:
#   - `df`:     the modified file that is preprocessed and streamed through Pathway
#   - `maindf`: the full dataset, used below for the lot coordinates
df = pd.read_csv(
    'https://raw.githubusercontent.com/Epic-SK/dpuplusippy/refs/heads/main/Modified%20-%20modified.csv'
)
maindf = pd.read_csv(
    'https://raw.githubusercontent.com/Epic-SK/dpuplusippy/refs/heads/main/dataset.csv'
)

# Original sample dataset (Google Drive):
# https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
df.head()

In [None]:
# Preview the full dataset. The pipeline below streams `df`; to run it on the
# full dataset instead, assign `df = maindf` before the preprocessing cell.
maindf.head()

In [None]:
# Preprocess the streaming source `df`:
#   1. merge 'LastUpdatedDate' + 'LastUpdatedTime' into one 'Timestamp' column,
#      snapped to the nearest 30 minutes;
#   2. sort chronologically so the CSV written later is in time order;
#   3. encode the categorical columns as integers for the pricing formula.
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                 format='%d-%m-%Y %H:%M:%S')
df['Timestamp'] = df['Timestamp'].dt.round(freq='30min')
df = df.sort_values('Timestamp').reset_index(drop=True)
df['VehicleType'] = df['VehicleType'].replace({'cycle': 0, 'bike': 1, 'car': 2, 'truck': 3}, regex=True)
df['TrafficConditionNearby'] = df['TrafficConditionNearby'].replace({'low': 0, 'average': 1, 'high': 2}, regex=True)

# Deduplicate coordinate *pairs*, not each column on its own: independent
# `.unique()` calls can misalign latitudes with longitudes (or return arrays of
# different lengths) whenever two lots share a single coordinate value.
lot_coords = maindf[['Latitude', 'Longitude']].drop_duplicates().reset_index(drop=True)
Latitudes = lot_coords['Latitude'].to_numpy()
Longitudes = lot_coords['Longitude'].to_numpy()
n_lots = len(lot_coords)
print(Latitudes, Longitudes)

# Map of lot locations: longitude on x, latitude on y (the usual map orientation).
p = figure(width=400, height=400, title="Parking lot locations",
           x_axis_label="Longitude", y_axis_label="Latitude")
p.scatter(Longitudes, Latitudes, size=20, color="navy", alpha=0.5)

output_file("dark_minimal.html")
curdoc().theme = 'dark_minimal'
show(p)

# Pairwise great-circle distances (km) between all lots, rounded to 2 decimals.
# Sized from the data rather than a hard-coded lot count.
distances = np.zeros((n_lots, n_lots), dtype=float)
for i in range(n_lots):
    for j in range(n_lots):
        distances[i, j] = np.round(
            haversine(Latitudes[i], Longitudes[i], Latitudes[j], Longitudes[j]), 2
        )
table = tabulate(distances, tablefmt="fancy_grid")
print(table)



In [None]:
# Persist the features the pricing model consumes; Pathway replays this CSV as a
# simulated live stream in the next steps. Keep these names in sync with the
# fields of `ParkingSchema`.
STREAM_COLUMNS = [
    "SystemCodeNumber", "Timestamp", "Occupancy", "Capacity",
    "QueueLength", "TrafficConditionNearby", "IsSpecialDay", "VehicleType",
]
df[STREAM_COLUMNS].to_csv("parking_stream.csv", index=False)

In [None]:
# Pathway schema for one row of `parking_stream.csv`: field names mirror the CSV
# header written in the previous cell, annotations give each column's type.
class ParkingSchema(pw.Schema):
    SystemCodeNumber: str        # parking lot identifier
    Timestamp: str               # observation time as text; parsed to a datetime downstream
    Occupancy: int               # number of occupied spaces
    Capacity: int                # total spaces at the lot
    QueueLength: int             # queue length at the lot (presumably vehicles waiting)
    TrafficConditionNearby: int  # encoded traffic level: 0 = low, 1 = average, 2 = high
    IsSpecialDay: int            # special-day indicator (presumably 0/1)
    VehicleType: int             # encoded vehicle: 0 = cycle, 1 = bike, 2 = car, 3 = truck


In [None]:
# Replay `parking_stream.csv` as a simulated real-time stream with Pathway's
# `replay_csv`. `input_rate` is the approximate number of rows ingested per
# second; 100 here, so larger files take proportionally longer to replay.
STREAM_INPUT_RATE = 100

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=STREAM_INPUT_RATE)

In [None]:
# Parse the text timestamps once and derive two columns from the result:
#   t   - the full observation datetime (used as event time for windowing)
#   day - the same datetime truncated to midnight, as an ISO-style string
fmt = "%Y-%m-%d %H:%M:%S"
parsed_ts = data.Timestamp.dt.strptime(fmt)

data_with_time = data.with_columns(
    t=parsed_ts,
    day=parsed_ts.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Making a simple pricing function

In [None]:
# Aggregate the stream into one row per (parking lot, day) window and price it.
# `import datetime` re-binds the name to the *module*: the imports cell ran
# `from datetime import datetime`, which bound `datetime` to the class, and the
# class has no `timedelta` attribute.
import datetime

delta_window = (
    data_with_time.windowby(
        pw.this.t,  # event-time column (parsed datetime)
        # Partition by lot so each lot gets its own daily windows. Partitioning by
        # `day` alone would pool every lot into one window per day, and
        # `any(SystemCodeNumber)` below would then label that mix with an arbitrary lot.
        instance=pw.this.SystemCodeNumber,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # fixed, non-overlapping 1-day windows
        behavior=pw.temporal.exactly_once_behavior(),  # one output row per non-empty window
    )
    .reduce(
        t=pw.this._pw_window_end,                               # window end timestamp (plot x-axis)
        occ_max=pw.reducers.max(pw.this.Occupancy),             # peak occupancy in the window
        occ_min=pw.reducers.min(pw.this.Occupancy),             # lowest occupancy in the window
        cap=pw.reducers.max(pw.this.Capacity),                  # lot capacity (max over the window)
        que=pw.reducers.avg(pw.this.QueueLength),               # mean queue length
        trf=pw.reducers.avg(pw.this.TrafficConditionNearby),    # mean encoded traffic level (0-2)
        spl=pw.reducers.max(pw.this.IsSpecialDay),              # max special-day flag in the window
        veh=pw.reducers.avg(pw.this.VehicleType),               # mean encoded vehicle type (0-3)
        scn=pw.reducers.any(pw.this.SystemCodeNumber),          # lot id; constant within one instance
    )
    .with_columns(
        # Pricing formula (the final price is rounded to 1 decimal place):
        #
        #     price = 10 + 1.5 * ( 3 * (occ_max - occ_min) / cap
        #                          - 0.5 * que
        #                          + 2 * trf
        #                          + spl
        #                          + (veh - 1.5) )
        #
        # - 10 is the base price.
        # - (occ_max - occ_min) / cap is the window's occupancy swing normalized by capacity.
        # - veh - 1.5 centres the mean vehicle code (0-3): heavier vehicles add, lighter subtract.
        # - Example: occ_max=90, occ_min=30, cap=100, que=1, trf=1, spl=0, veh=2
        #     => price = 10 + 1.5 * (1.8 - 0.5 + 2 + 0 + 0.5) = 10 + 1.5 * 3.8 = 15.7
        # NOTE(review): a longer queue lowers the price while heavier traffic raises it;
        # confirm these signs are intended.
        price=(
            10 + 1.5 * (
                3 * (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
                - 0.5 * pw.this.que
                + 2 * pw.this.trf
                + pw.this.spl
                + (pw.this.veh - 1.5)
            )
        ).num.round(1)
    )
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [None]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()


def price_plotter(source, lot):
    """Build a Bokeh figure of the daily price series for one parking lot.

    Args:
        source: Bokeh data source supplied by Pathway's ``Table.plot``; expected to
            expose the ``t`` and ``price`` columns of ``delta_window``.
        lot: Parking-lot identifier, shown in the figure title.

    Returns:
        The configured ``bokeh.plotting.figure``.
    """
    curdoc().theme = 'dark_minimal'

    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title=f"Pathway: Daily Parking Price - {lot}",
        x_axis_type="datetime",  # time-based x-axis formatting
        x_axis_label="Window end",
        y_axis_label="Price",
    )
    # Line showing how the price evolves over time ...
    fig.line("t", "price", source=source, line_width=2, color="navy")
    # ... with red markers at each data point for visibility.
    fig.scatter("t", "price", source=source, size=6, color="red")

    return fig


# Bind the price stream of every lot to its own live Bokeh plot.
# `sorting_col="t"` makes Pathway plot each series in time order.
unique_lots = df["SystemCodeNumber"].dropna().unique()
panels = []
for lot in unique_lots:
    lot_prices = delta_window.filter(pw.this.scn == lot)
    # `lot=lot` binds the current value now; a plain closure would late-bind and
    # every plot would be titled with the last lot of the loop.
    panels.append(
        lot_prices.plot(lambda src, lot=lot: price_plotter(src, lot), sorting_col="t")
    )

# Stack the plots in a Panel layout and mark it servable as a web app.
pn.Column(*panels).servable()


In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()
