<a href="https://colab.research.google.com/github/bhavyaJ-05/summer-analytics-assignments-quiz/blob/main/CapstoneProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Installing Pathway and Bokeh


In [None]:
# Install Pathway (streaming engine) and Bokeh (plotting); --quiet suppresses pip's progress output.
# NOTE(review): versions are unpinned, so a fresh run may pick up newer releases with different
# behaviour; consider pinning, e.g. `pathway==<ver> bokeh==<ver>`.
!pip install pathway bokeh --quiet

## Importing all the relevant libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

## Step 1: Importing and Preprocessing the Data

In [None]:
# Colab-only: open a browser file picker so dataset.csv can be uploaded into the runtime
# (presumably saved to the current working directory — the next cell reads it by name).
# The returned `uploaded` object is not used anywhere below.
from google.colab import files
uploaded = files.upload()

In [None]:
# Load the raw parking records.
df = pd.read_csv('dataset.csv')

# Fail fast if the file lacks a column that later cells rely on
# (timestamp construction, the column drop, feature encoding and the Pathway schema).
_required_cols = {
    'ID', 'SystemCodeNumber', 'Capacity', 'Latitude', 'Longitude', 'Occupancy',
    'VehicleType', 'TrafficConditionNearby', 'QueueLength', 'IsSpecialDay',
    'LastUpdatedDate', 'LastUpdatedTime',
}
_missing_cols = _required_cols - set(df.columns)
assert not _missing_cols, f"dataset.csv is missing columns: {sorted(_missing_cols)}"

# Preview a few rows instead of dumping the whole frame.
df.head()

In [None]:
# Combine 'LastUpdatedDate' (dd-mm-YYYY) and 'LastUpdatedTime' (HH:MM:SS) into a single datetime column.
df['Timestamp'] = pd.to_datetime(
    df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
    format='%d-%m-%Y %H:%M:%S',
)

# Sort chronologically and reset the index. A stable sort keeps rows that share a
# timestamp (e.g. several lots updated at the same moment) in their original file
# order, so the order of the replayed stream is reproducible across runs.
df = df.sort_values('Timestamp', kind='mergesort').reset_index(drop=True)

In [None]:
# Drop identifiers, coordinates and the raw date/time strings (already folded into 'Timestamp').
_unused_columns = ['LastUpdatedDate', 'LastUpdatedTime', 'ID', 'Latitude', 'Longitude']
df_modified = df.drop(columns=_unused_columns).copy()

In [None]:
# Preview the first rows after dropping the unused columns (before feature encoding).
df_modified.head(n=5)

## Modelling assumptions (chosen by trial and error)
- Vehicle type is mapped to a weight: cycle → 0.1, bike → 0.3, car → 0.7, truck → 1.0.
- Nearby traffic is mapped to a weight: low → 0.2, average → 0.7, high → 1.0.
- A new feature named `occRate` holds the occupancy rate, `Occupancy / Capacity`.
- `QueueLength` is divided by 10 as a basic scaling step.

In [None]:
# Encode the categorical columns as numeric weights (values chosen by trial and error)
# and derive the occupancy-rate feature.
mapping = {'car': 0.7, 'bike': 0.3, 'truck': 1.0, 'cycle': 0.1}  # VehicleType -> weight
mapp = {'low': 0.2, 'average': 0.7, 'high': 1.0}  # TrafficConditionNearby -> weight

# This cell overwrites columns of df_modified in place. Running it twice would call .str
# on the already-numeric columns (AttributeError) and divide QueueLength by 10 again, so
# skip when the features are already present. Re-run the `df_modified = df.drop(...)`
# cell to start again from the raw columns.
if 'occRate' in df_modified.columns:
    print("df_modified is already encoded; re-run the df.drop(...) cell to rebuild it.")
else:
    vehicle_str = df_modified['VehicleType'].str.strip().str.lower()
    traffic_str = df_modified['TrafficConditionNearby'].str.strip().str.lower()

    # .map() silently turns categories missing from the dict into NaN, which would end up
    # as empty fields in parking_stream.csv; fail loudly instead.
    unknown_vehicle = sorted(map(str, vehicle_str[~vehicle_str.isin(list(mapping))].unique()))
    unknown_traffic = sorted(map(str, traffic_str[~traffic_str.isin(list(mapp))].unique()))
    assert not unknown_vehicle, f"Unmapped VehicleType values: {unknown_vehicle}"
    assert not unknown_traffic, f"Unmapped TrafficConditionNearby values: {unknown_traffic}"

    # Overwrite the original columns with the mapped float weights.
    df_modified['VehicleType'] = vehicle_str.map(mapping)
    df_modified['TrafficConditionNearby'] = traffic_str.map(mapp)
    df_modified['occRate'] = df_modified['Occupancy'] / df_modified['Capacity']
    df_modified['QueueLength'] = df_modified['QueueLength'] / 10

In [None]:
# Preview the encoded features: numeric VehicleType/Traffic weights, scaled QueueLength, new occRate.
df_modified.head(n=5)

In [None]:
# Persist every remaining column of the engineered frame (without the index) as the CSV
# that Pathway replays as a stream further down.
df_modified.to_csv(path_or_buf="parking_stream.csv", index=False)

In [None]:
# Inspect the dtypes pandas inferred; they should line up with the types declared in ParkingSchema.
column_dtypes = df_modified.dtypes
column_dtypes

In [None]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    """Row schema for parking_stream.csv as replayed by Pathway.

    Column names follow the CSV header written from ``df_modified``:
    VehicleType and TrafficConditionNearby hold the numeric weights assigned
    during preprocessing, QueueLength has already been divided by 10, and
    occRate is Occupancy / Capacity. Timestamp stays a string here and is
    parsed into a datetime in a later step.
    """
    SystemCodeNumber: str
    VehicleType: float
    TrafficConditionNearby: float
    QueueLength: 	float
    IsSpecialDay: int
    Timestamp: str
    Occupancy: int
    Capacity: int
    occRate: float

In [None]:
# Load the data as a simulated stream using Pathway's replay_csv function.
# replay_csv re-emits the rows of parking_stream.csv at a controlled rate to mimic
# real-time arrival; input_rate is the approximate number of rows ingested per second.
REPLAY_INPUT_RATE = 100  # ~100 rows per second

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=REPLAY_INPUT_RATE)

In [None]:
# Datetime format of the 'Timestamp' strings; should match how pandas wrote the
# datetime column into parking_stream.csv.
fmt = "%Y-%m-%d %H:%M:%S"

# Parse the timestamp string into a datetime column `t`.
_with_t = data.with_columns(t=pw.this.Timestamp.dt.strptime(fmt))

# Derive from it:
# - day: the calendar date of `t` rendered at midnight ("YYYY-MM-DDT00:00:00"), used as a grouping key;
# - lot_id: a copy of SystemCodeNumber identifying the parking lot (14 lots here).
data_with_time = _with_t.with_columns(
    day=pw.this.t.dt.strftime("%Y-%m-%dT00:00:00"),
    lot_id=pw.this.SystemCodeNumber,
)

## Step 2: A simple linear pricing function and a more complex demand-based one

In [None]:
# Aggregate the stream into one-day tumbling windows per parking lot and compute, for
# each (lot, day) window, two candidate prices: a linear price driven by the day's
# occupancy swing and a demand-based price driven by the averaged features.
# Re-import the module: the import cell's `from datetime import datetime` rebound the
# name `datetime` to the class, and `datetime.timedelta` below needs the module.
import datetime

delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=(pw.this.lot_id, pw.this.day),  # Separate window instance per (lot, calendar day) pair
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # Fixed-size, non-overlapping one-day windows
        behavior=pw.temporal.exactly_once_behavior()  # Emit each window's result once, when the window closes, rather than updating it per row
    )
    .reduce(

        # Window end timestamp; used as the x-coordinate of each daily point in the plots.
        t  = pw.this._pw_window_end,
        occ_max=pw.reducers.max(pw.this.Occupancy),
        occ_min=pw.reducers.min(pw.this.Occupancy),
        cap=pw.reducers.max(pw.this.Capacity),
        # All rows of an instance share one lot_id, so max() simply recovers it.
        lot_id          = pw.reducers.max(pw.this.lot_id),
        # Feature sums and the row count, turned into per-window means in the next step.
        occ_rate_sum    = pw.reducers.sum(pw.this.occRate),
        queue_sum       = pw.reducers.sum(pw.this.QueueLength),
        traffic_sum     = pw.reducers.sum(pw.this.TrafficConditionNearby),
        special_sum     = pw.reducers.sum(pw.this.IsSpecialDay),
        veh_type_sum    = pw.reducers.sum(pw.this.VehicleType),
        count           = pw.reducers.count()
    )
    .with_columns(

        # Per-window mean of each demand feature (IsSpecialDay's mean is the share of
        # special-day rows in the window).
        occ_rate_avg = pw.this.occ_rate_sum / pw.this.count,
        queue_avg    = pw.this.queue_sum / pw.this.count,
        traffic_avg  = pw.this.traffic_sum / pw.this.count,
        special_avg  = pw.this.special_sum / pw.this.count,
        veh_type_avg = pw.this.veh_type_sum / pw.this.count,

    )

    .with_columns(
        # Linear price. Intuition:
        # - The greater the difference between peak and low occupancy in a day,
        #   the more volatile the demand is, indicating potential scarcity.
        # - Dividing by capacity normalizes the fluctuation to [0, 1], provided
        #   Occupancy never exceeds Capacity.
        # - This fluctuation is added to the base price of 10 to set the final price.
        price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap,

        # Unweighted demand score: occupancy rate, queue length, special-day share and
        # vehicle weight raise demand; heavier nearby traffic lowers it.
        demand_raw = (
            pw.this.occ_rate_avg
            + pw.this.queue_avg
            - pw.this.traffic_avg
            + pw.this.special_avg
            + pw.this.veh_type_avg
        )
    )

    .with_columns(
        # Demand-based price: base price 10 scaled by (1 + 0.2 * demand_raw); no clamping is applied.
        price_demand = 10*(1 + 0.2 * pw.this.demand_raw)
    )
)

In [None]:
# List the output columns produced by the windowed pricing pipeline.
window_columns = delta_window.keys()
print(window_columns)

## Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

Note: The Bokeh plots created below will only be populated after you run the `pw.run()` cell (i.e., the final cell).

In [None]:
# Load Panel's notebook extension so the Panel/Bokeh objects created below can render
# interactively in the notebook output.
pn.extension()

# Bokeh plotting callback for Pathway's Table.plot: takes a data source fed by Pathway and returns a figure.
def price_plotter(source, lot_name):
    """Build a Bokeh figure comparing the two daily prices of one parking lot.

    Args:
        source: Data source with columns ``t`` (datetime), ``price`` and
            ``price_demand``; presumably the ColumnDataSource that Pathway's
            ``Table.plot`` passes to this callback — TODO confirm.
        lot_name: Lot identifier shown in the figure title.

    Returns:
        A ``bokeh.plotting.figure`` with a line-plus-marker series for each pricing model.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title=f"Lot {lot_name} – Daily price",
        x_axis_type="datetime",  # `t` holds datetimes, so use a date-aware x-axis
    )
    fig.xaxis.axis_label = "Time"
    fig.yaxis.axis_label = "Price"

    # Linear (occupancy-swing) price: navy line with red markers.
    fig.line("t", "price", source=source, line_width=2, color="navy", legend_label="Linear-Based Price")
    # scatter() replaces circle(size=...), which Bokeh 3.4 deprecated for fixed-size markers.
    fig.scatter("t", "price", source=source, marker="circle", size=6, color="red")

    # Demand-based price: green line with green markers.
    fig.line("t", "price_demand", source=source, line_width=2, color="green", legend_label="Demand-Based Price")
    fig.scatter("t", "price_demand", source=source, marker="circle", size=5, color="green")

    return fig

In [None]:
# Distinct parking-lot codes, in order of first appearance in the time-sorted data.
pd.unique(df['SystemCodeNumber'])

In [None]:
# Build one live price chart per parking lot and show them as tabs.
# NOTE(review): Panel is already imported and its extension loaded in earlier cells; the
# call is repeated here so this output cell can render on its own (kept in case the front
# end, e.g. Colab, isolates cell outputs — confirm whether it can be dropped).
pn.extension()

# Derive the lot list from the data instead of hard-coding it, so every lot present in
# dataset.csv gets a tab and no tab is created for a lot that is absent.
lot_ids = df['SystemCodeNumber'].drop_duplicates().tolist()

tabs = []

for lid in lot_ids:
    lot_stream = delta_window.filter(pw.this.lot_id == lid)
    viz = lot_stream.plot(
        # Bind `lid` as a default argument so each callback keeps its own lot id even if
        # it is invoked after the loop has finished.
        lambda src, lid=lid: price_plotter(src, lid),
        sorting_col="t",  # draw points in time order
    )
    # Each tab is a tuple of (label, panel object)
    tabs.append((lid, viz))

# Display as a tabbed layout
pn.Tabs(*tabs).servable(title="Daily price per parking space")

In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()