## **Summer Analytics 2025**

Import Libraries and Setup

In [None]:
!pip install pathway
import numpy as np
import pandas as pd
import pathway as pw
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource, HoverTool
from bokeh.layouts import gridplot
from bokeh.io import push_notebook, output_notebook
from bokeh.palettes import Category20
import time
import math
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Colab-only step: prompt for a file upload into the runtime's working directory.
# NOTE(review): the recorded output below shows the upload being saved as
# "dataset (5).csv", while main() reads 'dataset.csv' — confirm that the file
# being loaded is the one that was just uploaded.
from google.colab import files
uploaded = files.upload()

# Enable notebook output for Bokeh
output_notebook()



Saving dataset.csv to dataset (5).csv


Define the Main ParkingPricingSystem Class

In [None]:
class ParkingPricingSystem:
    """
    Dynamic pricing engine for urban parking lots.

    Holds the configuration and the per-lot history shared by the three
    pricing models of the design (baseline linear, demand-based, competitive)
    and provides the geometric helpers used to find competing lots.
    """

    def __init__(self, base_price=10.0):
        """Create an engine with empty per-lot state and default parameters."""
        # Reference price; the price bounds below are multiples of it.
        self.base_price = base_price

        # Per-lot state, keyed by lot id.
        self.parking_lots = {}
        self.price_history = {}
        self.demand_history = {}

        # Lots closer than this (in raw lat/lng degrees, ~1km) are competitors.
        self.competition_radius = 0.01

        # Coefficient of the baseline linear model.
        self.linear_alpha = 0.5

        # Weights of the demand function.
        self.demand_params = dict(
            occupancy_weight=2.0,
            queue_weight=1.5,
            traffic_weight=-0.3,
            special_day_weight=0.5,
            vehicle_type_weights=dict(car=1.0, bike=0.5, truck=1.5),
        )

        # Parameters of the competitive model.
        self.competitive_params = dict(
            proximity_weight=0.3,
            price_sensitivity=0.2,
        )

        # Allowed price range, expressed as multiples of base_price.
        self.min_price_multiplier = 0.5
        self.max_price_multiplier = 2.0

    def calculate_distance(self, lat1, lon1, lat2, lon2):
        """Return the straight-line (Euclidean) distance between two lat/lng points."""
        d_lat = lat1 - lat2
        d_lon = lon1 - lon2
        return math.sqrt(d_lat ** 2 + d_lon ** 2)

    def get_nearby_lots(self, current_lat, current_lon, all_lots):
        """
        List the lots that lie within the competition radius of a position.

        ``all_lots`` maps lot id -> dict with at least ``'lat'`` and ``'lon'``.
        A lot sitting exactly on the given coordinates is treated as the
        current lot and left out. Returns ``(lot_id, distance, lot_data)``
        tuples in the iteration order of ``all_lots``.
        """
        neighbours = []
        for other_id, info in all_lots.items():
            same_spot = info['lat'] == current_lat and info['lon'] == current_lon
            if same_spot:
                continue
            gap = self.calculate_distance(current_lat, current_lon,
                                          info['lat'], info['lon'])
            if gap <= self.competition_radius:
                neighbours.append((other_id, gap, info))
        return neighbours

Model 1 - Linear Pricing Implementation

In [None]:
def model_1_linear_pricing(self, lot_id, current_data):
    """
    Model 1: Baseline Linear Model
    Price_t+1 = Price_t + α * (Occupancy/Capacity)

    ``Price_t`` is this model's own previous output for the lot (the base
    price on the first call). ``self.price_history`` is also written by the
    other pricing models, so reading the previous price from it would make
    the recurrence restart from another model's price whenever the models
    are run side by side. The last linear price is therefore tracked in a
    private per-lot mapping, while the result is still appended to
    ``self.price_history`` so existing readers of that history are unaffected.

    Args:
        lot_id: Hashable identifier of the parking lot.
        current_data: Mapping with ``'occupancy'`` and ``'capacity'``.

    Returns:
        The new price, clamped to
        ``[base_price * min_price_multiplier, base_price * max_price_multiplier]``.
    """
    if lot_id not in self.price_history:
        self.price_history[lot_id] = [self.base_price]

    # Lazily created so instances built before this attribute existed still work.
    last_linear_price = getattr(self, '_linear_last_price', None)
    if last_linear_price is None:
        last_linear_price = self._linear_last_price = {}

    current_price = last_linear_price.get(lot_id, self.base_price)

    # max(..., 1) avoids dividing by zero for a lot reported with capacity 0.
    occupancy_rate = current_data['occupancy'] / max(current_data['capacity'], 1)

    # Linear price adjustment
    new_price = current_price + self.linear_alpha * occupancy_rate

    # Apply bounds
    lowest = self.base_price * self.min_price_multiplier
    highest = self.base_price * self.max_price_multiplier
    new_price = max(lowest, min(new_price, highest))

    last_linear_price[lot_id] = new_price
    self.price_history[lot_id].append(new_price)
    return new_price

# Add this method to the ParkingPricingSystem class: the function above is
# written as a plain function taking `self`, and is attached here so that it
# can be called as pricing_system.model_1_linear_pricing(lot_id, current_data).
ParkingPricingSystem.model_1_linear_pricing = model_1_linear_pricing

Model 2 - Demand-Based Pricing Implementation

In [None]:
def model_2_demand_based_pricing(self, lot_id, current_data):
    """
    Model 2: demand-based price.

    Combines occupancy rate, queue length, traffic level, the special-day
    flag and a vehicle-type weight into one weighted demand score, squashes
    the score with ``tanh`` and prices the lot at
    ``base_price * (1 + 0.5 * tanh(score))``, clamped to the configured
    price bounds.

    Side effects: appends the squashed demand to ``self.demand_history[lot_id]``
    and the price to ``self.price_history[lot_id]`` (seeded with the base
    price when the lot is new).

    Args:
        lot_id: Hashable identifier of the parking lot.
        current_data: Mapping with ``'occupancy'``, ``'capacity'``,
            ``'queue_length'`` and ``'traffic_level'``; ``'vehicle_type'``
            (default ``'car'``) and ``'special_day'`` (default ``0``) are
            optional.

    Returns:
        The bounded demand-based price.
    """
    self.price_history.setdefault(lot_id, [self.base_price])
    weights = self.demand_params

    # Raw inputs scaled to comparable magnitudes.
    occupancy_share = current_data['occupancy'] / max(current_data['capacity'], 1)
    queue_scaled = current_data['queue_length'] / 10.0
    traffic_scaled = current_data['traffic_level'] / 10.0

    # Unknown vehicle types fall back to a neutral weight of 1.0.
    type_weight = weights['vehicle_type_weights'].get(
        current_data.get('vehicle_type', 'car'), 1.0
    )
    special_flag = current_data.get('special_day', 0)

    # Individual contributions, added in a fixed left-to-right order.
    from_occupancy = weights['occupancy_weight'] * occupancy_share
    from_queue = weights['queue_weight'] * queue_scaled
    from_traffic = weights['traffic_weight'] * traffic_scaled
    from_special_day = weights['special_day_weight'] * special_flag
    from_vehicle = 0.1 * type_weight
    raw_demand = from_occupancy + from_queue + from_traffic + from_special_day + from_vehicle

    # tanh keeps the demand signal inside (-1, 1).
    squashed = np.tanh(raw_demand)
    self.demand_history.setdefault(lot_id, []).append(squashed)

    candidate = self.base_price * (1 + 0.5 * squashed)

    # Clamp to the allowed price range.
    floor_price = self.base_price * self.min_price_multiplier
    ceiling_price = self.base_price * self.max_price_multiplier
    candidate = max(floor_price, min(candidate, ceiling_price))

    self.price_history[lot_id].append(candidate)
    return candidate

# Add this method to the ParkingPricingSystem class: the function above is
# written as a plain function taking `self`, and is attached here so that it
# can be called as pricing_system.model_2_demand_based_pricing(lot_id, current_data).
ParkingPricingSystem.model_2_demand_based_pricing = model_2_demand_based_pricing

Model 3 - Competitive Pricing Implementation

In [None]:
def model_3_competitive_pricing(self, lot_id, current_data, all_lots_data):
    """
    Model 3: Competitive Pricing Model
    Considers geographic proximity and competitor prices

    Starts from the demand-based price of model 2 and nudges it towards or
    away from the average latest price of nearby lots:

    * lot nearly full (> 90% occupied): move 10% of the way down to cheaper
      competitors, otherwise add 5% of the demand price;
    * otherwise: move 10% of the way up to dearer competitors, or 5% of the
      way down to cheaper ones.

    Only competitors that already have a recorded price are considered. The
    entry that model 2 appended to ``self.price_history[lot_id]`` is replaced
    by the competitive price. When there are no nearby lots, the demand
    price is returned unchanged.

    Args:
        lot_id: Hashable identifier of the parking lot.
        current_data: Mapping with ``'latitude'``, ``'longitude'``,
            ``'occupancy'`` and ``'capacity'``, plus whatever
            ``model_2_demand_based_pricing`` reads.
        all_lots_data: Mapping lot id -> lot data, passed to
            ``get_nearby_lots``.

    Returns:
        The bounded competitive price.
    """
    if lot_id not in self.price_history:
        self.price_history[lot_id] = [self.base_price]

    # Start with demand-based price (this call also appends it to the history)
    demand_price = self.model_2_demand_based_pricing(lot_id, current_data)

    # Get nearby competitors
    nearby_lots = self.get_nearby_lots(current_data['latitude'],
                                       current_data['longitude'],
                                       all_lots_data)
    if not nearby_lots:
        return demand_price

    # Latest known price of each competitor; lots without history are skipped.
    competitor_prices = [
        self.price_history[nearby_id][-1]
        for nearby_id, _distance, _nearby_data in nearby_lots
        if self.price_history.get(nearby_id)
    ]

    new_price = demand_price
    if competitor_prices:
        avg_competitor_price = np.mean(competitor_prices)
        # Positive when competitors are, on average, more expensive than us.
        price_gap = avg_competitor_price - demand_price
        current_occupancy_rate = current_data['occupancy'] / max(current_data['capacity'], 1)

        if current_occupancy_rate > 0.9:  # Nearly full
            if avg_competitor_price < demand_price:
                # Reduce price or suggest rerouting
                competitive_adjustment = 0.1 * price_gap
            else:
                # Can maintain higher price
                competitive_adjustment = 0.05 * demand_price
        elif avg_competitor_price > demand_price:
            # Can increase price while staying competitive
            competitive_adjustment = 0.1 * price_gap
        else:
            # Stay competitive
            competitive_adjustment = 0.05 * price_gap

        new_price = demand_price + competitive_adjustment

    # Apply bounds
    new_price = max(self.base_price * self.min_price_multiplier,
                    min(new_price, self.base_price * self.max_price_multiplier))

    self.price_history[lot_id][-1] = new_price  # Update the last price
    return new_price

# Add this method to the ParkingPricingSystem class: the function above is
# written as a plain function taking `self`, and is attached here so that it
# can be called as
# pricing_system.model_3_competitive_pricing(lot_id, current_data, all_lots_data).
ParkingPricingSystem.model_3_competitive_pricing = model_3_competitive_pricing

Rerouting Logic Implementation

In [None]:
def should_reroute(self, lot_id, current_data, all_lots_data):
    """
    Determine if vehicles should be rerouted to nearby lots

    Rerouting is only considered when the current lot is at least 80%
    occupied. Among nearby lots that are less than 70% occupied, the one
    with the lowest ``latest price + 100 * distance`` score is suggested
    (lots without price history are scored at the base price).

    Args:
        lot_id: Identifier of the current lot (not used in the decision).
        current_data: Mapping with ``'latitude'``, ``'longitude'``,
            ``'occupancy'`` and ``'capacity'``.
        all_lots_data: Mapping lot id -> lot data with ``'occupancy'`` and
            ``'capacity'``, passed to ``get_nearby_lots``.

    Returns:
        ``(True, alternative_lot_id)`` when a suitable lot exists, otherwise
        ``(False, None)``.
    """
    current_occupancy_rate = current_data['occupancy'] / max(current_data['capacity'], 1)

    if current_occupancy_rate < 0.8:  # Not overburdened
        return False, None

    # Check nearby lots
    nearby_lots = self.get_nearby_lots(current_data['latitude'],
                                       current_data['longitude'],
                                       all_lots_data)

    best_alternative = None
    best_score = float('inf')

    for nearby_id, distance, nearby_data in nearby_lots:
        nearby_occupancy_rate = nearby_data['occupancy'] / max(nearby_data['capacity'], 1)
        if nearby_occupancy_rate >= 0.7:  # No meaningful availability
            continue

        nearby_price = self.price_history.get(nearby_id, [self.base_price])[-1]
        # Score based on price and distance
        score = nearby_price + distance * 100  # Weight distance heavily
        if score < best_score:
            best_score = score
            best_alternative = nearby_id

    # Compare against None explicitly: a lot id of 0 is a valid alternative
    # but is falsy, so a plain truthiness test would discard it.
    if best_alternative is None:
        return False, None
    return True, best_alternative

# Add this method to the ParkingPricingSystem class: the function above is
# written as a plain function taking `self`, and is attached here so that it
# can be called as pricing_system.should_reroute(lot_id, current_data, all_lots_data).
ParkingPricingSystem.should_reroute = should_reroute

Data Loading and Preprocessing Functions

In [None]:
def load_and_preprocess_data(file_path):
    """
    Load and preprocess the parking data.

    Reads the CSV at ``file_path``, renames the raw columns to the
    snake_case names used by the pricing code, merges ``LastUpdatedDate``
    and ``LastUpdatedTime`` into a single ``timestamp`` column, fills
    missing values (forward, then backward) and synthesises any required
    column that is absent.

    Args:
        file_path: Path (or anything ``pd.read_csv`` accepts) of the dataset.

    Returns:
        The preprocessed DataFrame. If anything goes wrong, the error is
        printed and the result of ``create_sample_data()`` is returned
        instead, so a printed "Error loading data" line means the rest of
        the run is NOT using the file's data.
    """
    try:
        df = pd.read_csv(file_path)

        # Rename columns to match expected names
        # NOTE(review): assumes 'ID' identifies the parking lot — confirm
        # against the dataset schema.
        df = df.rename(columns={
            'ID': 'lot_id',
            'Capacity': 'capacity',
            'Latitude': 'latitude',
            'Longitude': 'longitude',
            'Occupancy': 'occupancy',
            'QueueLength': 'queue_length',
            'TrafficConditionNearby': 'traffic_level',
            'IsSpecialDay': 'special_day',
            'VehicleType': 'vehicle_type'
        })

        # Combine date and time into a single timestamp. The dates are
        # day-first (e.g. "13-10-2016"); without dayfirst=True pandas infers
        # month-first from early rows such as "04-10-2016" and then fails on
        # the first day number above 12.
        df['timestamp'] = pd.to_datetime(
            df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
            dayfirst=True,
        )

        # Drop original date/time columns
        df = df.drop(columns=['LastUpdatedDate', 'LastUpdatedTime'])

        # Handle missing values (fillna(method=...) is deprecated in pandas)
        df = df.ffill().bfill()

        # Ensure required columns exist
        required_cols = ['latitude', 'longitude', 'capacity', 'occupancy', 'queue_length', 'traffic_level']
        for col in required_cols:
            if col in df.columns:
                continue
            print(f"Warning: Column '{col}' not found. Creating with default values.")
            if col in ('latitude', 'longitude'):
                df[col] = np.random.uniform(-0.1, 0.1, len(df))
            else:
                df[col] = np.random.randint(0, 10, len(df))

        # Add vehicle_type if missing
        if 'vehicle_type' not in df.columns:
            df['vehicle_type'] = np.random.choice(['car', 'bike', 'truck'], len(df))

        # Add special_day if missing
        if 'special_day' not in df.columns:
            df['special_day'] = np.random.choice([0, 1], len(df), p=[0.9, 0.1])

        return df

    except Exception as e:
        # Deliberate best-effort: keep the notebook runnable on synthetic data.
        print(f"Error loading data: {e}")
        return create_sample_data()


Visualization

In [None]:
def create_real_time_visualization(pricing_system, df):
    """
    Build the stacked price / occupancy / demand figures for every lot.

    One line per lot is added to each of the three figures, all three lines
    of a lot sharing a single (initially empty) ``ColumnDataSource``.

    Args:
        pricing_system: Accepted for interface compatibility; not used here.
        df: DataFrame with a ``lot_id`` column.

    Returns:
        ``(layout, sources)`` where ``layout`` is the grid of the three
        figures and ``sources`` maps lot id -> its ``ColumnDataSource``
        (with columns ``x``, ``y_price``, ``y_occupancy``, ``y_demand``,
        ``lot_id`` and ``timestamp``).
    """
    lot_ids = df['lot_id'].unique()

    # Category20 only offers palettes of 3 to 20 colours: clamp the requested
    # size on both ends (more than 20 lots used to raise KeyError); colours
    # are reused cyclically below when there are more lots than colours.
    palette_size = min(max(3, len(lot_ids)), 20)
    colors = Category20[palette_size]

    price_fig = figure(title="Real-time Parking Prices", x_axis_label="Time Step", y_axis_label="Price ($)", width=800, height=400)
    occupancy_fig = figure(title="Occupancy Rates", x_axis_label="Time Step", y_axis_label="Occupancy Rate", width=800, height=400)
    demand_fig = figure(title="Demand Levels", x_axis_label="Time Step", y_axis_label="Normalized Demand", width=800, height=400)

    # (figure, data-source column) pairs drawn for every lot.
    panels = (
        (price_fig, 'y_price'),
        (occupancy_fig, 'y_occupancy'),
        (demand_fig, 'y_demand'),
    )

    sources = {}

    for i, lot_id in enumerate(lot_ids):
        color = colors[i % len(colors)]
        sources[lot_id] = ColumnDataSource(data=dict(x=[], y_price=[], y_occupancy=[], y_demand=[], lot_id=[], timestamp=[]))

        for fig, column in panels:
            fig.line('x', column, source=sources[lot_id], legend_label=f'Lot {lot_id}', color=color, line_width=2)

    price_fig.add_tools(HoverTool(tooltips=[("Lot ID", "@lot_id"), ("Price", "@y_price{$0.00}"), ("Time Step", "@x")]))

    layout = gridplot([[price_fig], [occupancy_fig], [demand_fig]])
    return layout, sources

Real-Time Simulation Function

In [None]:
def simulate_real_time_processing(df, pricing_system, max_steps=100, step_delay=0.1):
    """
    Simulate real-time data processing and pricing.

    The rows of ``df`` are replayed in ascending ``timestamp`` order, one
    timestamp per step. For every lot present at a step, the three pricing
    models and the rerouting check of ``pricing_system`` are evaluated (in
    that order) and one result row is recorded.

    Args:
        df: DataFrame with ``timestamp``, ``lot_id``, ``latitude``,
            ``longitude``, ``capacity``, ``occupancy``, ``queue_length``,
            ``traffic_level``, ``vehicle_type`` and ``special_day`` columns.
        pricing_system: Object exposing ``model_1_linear_pricing``,
            ``model_2_demand_based_pricing``, ``model_3_competitive_pricing``,
            ``should_reroute`` and a ``demand_history`` mapping.
        max_steps: Maximum number of distinct timestamps to replay
            (``None`` for all). Defaults to the demo limit of 100.
        step_delay: Seconds to sleep after each step to mimic a live feed;
            ``0`` disables the pause.

    Returns:
        DataFrame with one row per (step, lot) holding the prices of the
        three models, the occupancy rate, queue length, rerouting decision
        and demand level.
    """

    print("Starting Real-time Parking Pricing Simulation...")
    print("=" * 50)

    snapshot_fields = ('capacity', 'occupancy', 'queue_length', 'traffic_level')
    results = []

    # groupby yields the batches in ascending timestamp order in a single
    # pass, instead of re-scanning the whole frame for every timestamp.
    for step, (timestamp, current_batch) in enumerate(df.groupby('timestamp')):
        if max_steps is not None and step >= max_steps:  # Limit for demo
            break

        rows = current_batch.to_dict('records')

        # Prepare all lots data for competitive model
        all_lots_data = {}
        for row in rows:
            lot_snapshot = {'lat': row['latitude'], 'lon': row['longitude']}
            lot_snapshot.update((field, row[field]) for field in snapshot_fields)
            all_lots_data[row['lot_id']] = lot_snapshot

        print(f"\nStep {step + 1}: Processing {len(current_batch)} lots at {timestamp}")

        for row in rows:
            lot_id = row['lot_id']

            # Prepare current data
            current_data = {
                'latitude': row['latitude'],
                'longitude': row['longitude'],
                'capacity': row['capacity'],
                'occupancy': row['occupancy'],
                'queue_length': row['queue_length'],
                'traffic_level': row['traffic_level'],
                'vehicle_type': row['vehicle_type'],
                'special_day': row['special_day']
            }

            # Calculate prices using all three models
            linear_price = pricing_system.model_1_linear_pricing(lot_id, current_data)
            demand_price = pricing_system.model_2_demand_based_pricing(lot_id, current_data)
            competitive_price = pricing_system.model_3_competitive_pricing(lot_id, current_data, all_lots_data)

            # Check for rerouting suggestion
            should_reroute, alternative_lot = pricing_system.should_reroute(lot_id, current_data, all_lots_data)

            results.append({
                'step': step,
                'timestamp': timestamp,
                'lot_id': lot_id,
                'occupancy_rate': current_data['occupancy'] / max(current_data['capacity'], 1),
                'queue_length': current_data['queue_length'],
                'linear_price': linear_price,
                'demand_price': demand_price,
                'competitive_price': competitive_price,
                'should_reroute': should_reroute,
                'alternative_lot': alternative_lot,
                'demand_level': pricing_system.demand_history[lot_id][-1] if lot_id in pricing_system.demand_history else 0
            })

            if step % 10 == 0:  # Print every 10 steps
                print(f"  Lot {lot_id}: Linear=${linear_price:.2f}, Demand=${demand_price:.2f}, Competitive=${competitive_price:.2f}")

        # Simulate real-time delay
        if step_delay:
            time.sleep(step_delay)

    return pd.DataFrame(results)

Analysis Function

In [None]:
def analyze_results(results_df, pricing_system):
    """
    Print a five-part text report on the simulation results.

    The report covers average prices per model, rerouting rate, demand
    level, the occupancy/price correlation and the correlations between the
    models' prices. ``pricing_system`` is accepted but not used.

    Returns:
        ``results_df`` unchanged.
    """
    rule = "=" * 50

    print("\n" + rule)
    print("ANALYSIS RESULTS")
    print(rule)

    # Overall statistics
    print("\n1. PRICING STATISTICS:")
    for model_name in ("Linear", "Demand", "Competitive"):
        model_prices = results_df[f"{model_name.lower()}_price"]
        print(f"   Average {model_name} Price: ${model_prices.mean():.2f}")
    lowest_price = results_df['competitive_price'].min()
    highest_price = results_df['competitive_price'].max()
    print(f"   Price Range: ${lowest_price:.2f} - ${highest_price:.2f}")

    # Rerouting statistics
    rerouting_rate = results_df['should_reroute'].sum() / len(results_df) * 100
    print("\n2. REROUTING RECOMMENDATIONS:")
    print(f"   Rerouting Rate: {rerouting_rate:.1f}%")
    reroute_total = results_df['should_reroute'].sum()
    print(f"   Total Rerouting Suggestions: {reroute_total}")

    # Demand analysis
    print("\n3. DEMAND ANALYSIS:")
    mean_demand = results_df['demand_level'].mean()
    print(f"   Average Demand Level: {mean_demand:.3f}")
    demand_low = results_df['demand_level'].min()
    demand_high = results_df['demand_level'].max()
    print(f"   Demand Range: {demand_low:.3f} - {demand_high:.3f}")

    # Occupancy correlation
    occupancy_vs_price = results_df['occupancy_rate'].corr(results_df['competitive_price'])
    print("\n4. OCCUPANCY-PRICE CORRELATION:")
    print(f"   Correlation: {occupancy_vs_price:.3f}")

    # Model comparison
    print("\n5. MODEL COMPARISON:")
    for first, second in (("Linear", "Demand"), ("Demand", "Competitive")):
        pair_corr = results_df[f"{first.lower()}_price"].corr(results_df[f"{second.lower()}_price"])
        print(f"   {first} vs {second} Price Correlation: {pair_corr:.3f}")

    return results_df

Main Execution Function

In [None]:
def main():
    """
    Run the whole pipeline end to end.

    Loads ``dataset.csv``, creates a pricing system with a base price of
    10.0, builds the Bokeh layout, replays the data through the simulation
    and prints the analysis report.

    Returns:
        ``(results, pricing_system)``: the DataFrame returned by
        ``analyze_results`` and the pricing system used for the run.
    """
    separator = "=" * 50

    print("Dynamic Parking Pricing System")
    print("Capstone Project - Summer Analytics 2025")
    print(separator)

    # Step 1: data
    print("\n1. Loading and preprocessing data...")
    parking_df = load_and_preprocess_data('dataset.csv')
    record_count = len(parking_df)
    lot_count = parking_df['lot_id'].nunique()
    print(f"   Loaded {record_count} records for {lot_count} parking lots")

    # Step 2: pricing engine
    print("\n2. Initializing pricing system...")
    engine = ParkingPricingSystem(base_price=10.0)

    # Step 3: figures (built here; this function does not display them)
    print("\n3. Setting up real-time visualizations...")
    _layout, _sources = create_real_time_visualization(engine, parking_df)

    # Step 4: simulation
    print("\n4. Running real-time simulation...")
    simulation_df = simulate_real_time_processing(parking_df, engine)

    # Step 5: report
    print("\n5. Analyzing results...")
    report_df = analyze_results(simulation_df, engine)

    return report_df, engine

Execute the System

In [None]:
# Run the complete system.
# `results` (the simulation DataFrame returned by main) is kept at notebook
# level because the plotting and summary cells below read it; `system` is the
# pricing system that produced it.
results, system = main()

Dynamic Parking Pricing System
Capstone Project - Summer Analytics 2025

1. Loading and preprocessing data...
Error loading data: time data "13-10-2016 07:57:00" doesn't match format "%m-%d-%Y %H:%M:%S", at position 162. You might want to try:
    - passing `format` if your strings have a consistent format;
    - passing `format='ISO8601'` if your strings are all ISO8601 but not necessarily in exactly the same format;
    - passing `format='mixed'`, and the format will be inferred for each element individually. You might want to use `dayfirst` alongside this.
   Loaded 18396 records for 14 parking lots

2. Initializing pricing system...

3. Setting up real-time visualizations...

4. Running real-time simulation...
Starting Real-time Parking Pricing Simulation...

Step 1: Processing 14 lots at 2024-01-01 08:00:00
  Lot 0: Linear=$10.33, Demand=$14.67, Competitive=$14.67
  Lot 1: Linear=$10.14, Demand=$13.79, Competitive=$13.79
  Lot 2: Linear=$10.31, Demand=$14.65, Competitive=$14.61
  

Create Final Visualizations

In [None]:
# Create summary visualization
print("\nCreating summary visualizations...")

# Price evolution plot
price_evolution = figure(title="Price Evolution Comparison",
                       x_axis_label="Time Steps",
                       y_axis_label="Price ($)",
                       width=900, height=500)

# Without explicit styling every series is drawn in Bokeh's default colour and
# the 15 lines cannot be told apart. Use one colour per lot and one dash
# pattern per model. Category20 stores each hue as a dark/light pair, so every
# second entry gives clearly distinct hues.
lot_palette = Category20[20][::2]
model_series = (
    ('linear_price', 'Linear', {'alpha': 0.7, 'line_dash': 'dotted'}),
    ('demand_price', 'Demand', {'alpha': 0.7, 'line_dash': 'dashed'}),
    ('competitive_price', 'Competitive', {'line_width': 2}),
)

for lot_index, lot_id in enumerate(results['lot_id'].unique()[:5]):  # Show first 5 lots
    lot_data = results[results['lot_id'] == lot_id]
    lot_color = lot_palette[lot_index % len(lot_palette)]
    for price_column, model_label, line_style in model_series:
        price_evolution.line(lot_data['step'], lot_data[price_column],
                           legend_label=f'Lot {lot_id} {model_label}',
                           color=lot_color, **line_style)

price_evolution.legend.location = "top_left"
price_evolution.legend.click_policy = "hide"

show(price_evolution)

print("\nSIMULATION COMPLETE!")


Creating summary visualizations...



SIMULATION COMPLETE!


Display Final Results Summary

In [None]:
# Closing banner followed by the qualitative takeaways of the project.
_summary_rule = "=" * 50
print("\n" + _summary_rule)
print("              FINAL RESULTS SUMMARY")
print(_summary_rule)

print("\nKey Insights:")
_key_insights = (
    "1. The linear model provides a baseline pricing mechanism",
    "2. The demand-based model incorporates multiple factors for better pricing",
    "3. The competitive model adds location intelligence and market dynamics",
    "4. Real-time processing enables dynamic price adjustments",
    "5. Rerouting suggestions help optimize overall utilization",
)
for _insight in _key_insights:
    print(_insight)

# Show the price columns and rerouting flag of the first ten result rows.
_summary_columns = ['lot_id', 'linear_price', 'demand_price', 'competitive_price', 'should_reroute']
print("\nSample Results (First 10 rows):")
print(results.head(10)[_summary_columns].to_string())


              FINAL RESULTS SUMMARY

Key Insights:
1. The linear model provides a baseline pricing mechanism
2. The demand-based model incorporates multiple factors for better pricing
3. The competitive model adds location intelligence and market dynamics
4. Real-time processing enables dynamic price adjustments
5. Rerouting suggestions help optimize overall utilization

Sample Results (First 10 rows):
   lot_id  linear_price  demand_price  competitive_price  should_reroute
0       0     10.330986     14.666857          14.666857           False
1       1     10.137500     13.786812          13.786812           False
2       2     10.313830     14.654651          14.611259           False
3       3     10.109756     13.345259          13.345259           False
4       4     10.343373     14.730640          14.727451           False
5       5     10.227941     14.499568          14.463930           False
6       6     10.210843     14.430393          14.441113           False
7       7