In [12]:
# Cell 1: Mount Google Drive and Install Dependencies
from google.colab import drive
drive.mount('/content/drive')

# Install required packages
!pip install numpy pandas pathway bokeh --quiet

print("Drive mounted and packages installed successfully!")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Drive mounted and packages installed successfully!


In [13]:
# Cell 2: Import Libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import pathway as pw
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource
from bokeh.layouts import gridplot, column
from bokeh.io import push_notebook
import time
import warnings
warnings.filterwarnings('ignore')

# Enable Bokeh in Colab
output_notebook()

print("All libraries imported successfully!")

All libraries imported successfully!


In [14]:
# Cell 3: Load and Explore Data
# Update this path to your actual CSV file location in Google Drive
DATA_PATH = '/content/drive/MyDrive/dynamic_pricing_project/capstone_dataset.csv'

# Load the data. When the CSV is missing, fall back to a small hand-written
# sample with the same 12 columns so the remaining cells can still be run.
try:
    df = pd.read_csv(DATA_PATH)
    print(f"Data loaded successfully! Shape: {df.shape}")
    print("\nFirst few rows:")
    print(df.head())
    print("\nColumn info:")
    # DataFrame.info() writes its report to stdout itself and returns None;
    # wrapping it in print() would append a stray "None" line to the output.
    df.info()
    print("\nUnique parking lots:")
    print(df['SystemCodeNumber'].unique())
except FileNotFoundError:
    # Report the path that was actually tried, so the message always matches
    # the current DATA_PATH setting.
    print(f"❌ CSV file not found at {DATA_PATH}. Please upload your dataset to Google Drive and update DATA_PATH.")
    # Create sample data for testing: two lots, five observations each
    sample_data = {
        'ID': range(10),
        'SystemCodeNumber': ['BHMBCCMKT01'] * 5 + ['BHMBCCMKT02'] * 5,
        'Capacity': [577] * 5 + [400] * 5,
        'Latitude': [26.14453614] * 5 + [26.15453614] * 5,
        'Longitude': [91.73617216] * 5 + [91.74617216] * 5,
        'Occupancy': [61, 64, 80, 107, 150, 45, 60, 75, 90, 120],
        'VehicleType': ['car', 'car', 'car', 'car', 'bike', 'car', 'truck', 'car', 'bike', 'car'],
        'TrafficConditionNearby': ['low', 'low', 'low', 'low', 'low', 'average', 'high', 'average', 'low', 'high'],
        'QueueLength': [1, 1, 2, 2, 2, 1, 6, 5, 3, 4],
        'IsSpecialDay': [0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
        'LastUpdatedDate': ['04-10-2016'] * 10,
        'LastUpdatedTime': ['07:59:00', '08:25:00', '08:59:00', '09:32:00', '09:59:00',
                           '10:26:00', '10:59:00', '11:25:00', '11:59:00', '12:30:00']
    }
    df = pd.DataFrame(sample_data)
    print("Using sample data for testing")
    print(df.head())

Data loaded successfully! Shape: (18368, 12)

First few rows:
   ID SystemCodeNumber  Capacity   Latitude  Longitude  Occupancy VehicleType  \
0   0      BHMBCCMKT01       577  26.144536  91.736172         61         car   
1   1      BHMBCCMKT01       577  26.144536  91.736172         64         car   
2   2      BHMBCCMKT01       577  26.144536  91.736172         80         car   
3   3      BHMBCCMKT01       577  26.144536  91.736172        107         car   
4   4      BHMBCCMKT01       577  26.144536  91.736172        150        bike   

  TrafficConditionNearby  QueueLength  IsSpecialDay LastUpdatedDate  \
0                    low            1             0      04-10-2016   
1                    low            1             0      04-10-2016   
2                    low            2             0      04-10-2016   
3                    low            2             0      04-10-2016   
4                    low            2             0      04-10-2016   

  LastUpdatedTime  
0   

In [15]:
# Cell 4: Define the Pricing Engine Class
class ParkingPricingEngine:
    """Stateful pricing engine keyed by parking-lot code (SystemCodeNumber).

    This first definition holds the shared state, the feature extraction and
    the distance helper; later cells subclass it under the same name to add
    the pricing models and the update logic.
    """

    def __init__(self, base_price=10.0):
        """Set up empty price state and the categorical encodings.

        Args:
            base_price: Reference price used when a lot has no price yet.
        """
        self.base_price = base_price
        self.current_prices = {}  # SystemCodeNumber -> current price
        self.price_history = {}   # SystemCodeNumber -> price history
        self.competitor_cache = {}  # Nearby lots cache

        # Vehicle type weights
        self.vehicle_weights = {
            'car': 1.0,
            'bike': 0.5,
            'truck': 1.5,
            'cycle': 0.3
        }

        # Traffic condition encoding
        self.traffic_encoding = {
            'low': 0,
            'average': 1,
            'high': 2
        }

        print("Pricing Engine initialized with base price: $", base_price)

    def extract_features(self, data_row):
        """Extract and normalize features from raw data.

        Args:
            data_row: Mapping-like record (e.g. a DataFrame row) providing the
                keys Occupancy, Capacity, QueueLength, TrafficConditionNearby,
                IsSpecialDay, VehicleType, SystemCodeNumber, Latitude,
                Longitude, LastUpdatedDate and LastUpdatedTime.

        Returns:
            dict with occupancy_rate, queue_length, traffic_level,
            is_special_day, vehicle_weight, system_code, lat, lng, timestamp.

        Raises:
            KeyError: If the traffic condition or vehicle type is not one of
                the known categories (after trimming and lower-casing).
        """
        # A lot reporting zero capacity would otherwise divide by zero; treat
        # it as empty so downstream models receive a finite occupancy rate.
        capacity = data_row['Capacity']
        occupancy_rate = data_row['Occupancy'] / capacity if capacity else 0.0

        # Category labels are matched case-insensitively and ignoring
        # surrounding whitespace, so 'Car' or ' high ' do not raise KeyError.
        traffic_key = str(data_row['TrafficConditionNearby']).strip().lower()
        vehicle_key = str(data_row['VehicleType']).strip().lower()

        features = {
            'occupancy_rate': occupancy_rate,
            'queue_length': data_row['QueueLength'],
            'traffic_level': self.traffic_encoding[traffic_key],
            'is_special_day': data_row['IsSpecialDay'],
            'vehicle_weight': self.vehicle_weights[vehicle_key],
            'system_code': data_row['SystemCodeNumber'],
            'lat': data_row['Latitude'],
            'lng': data_row['Longitude'],
            'timestamp': f"{data_row['LastUpdatedDate']} {data_row['LastUpdatedTime']}"
        }
        return features

    def calculate_distance(self, lat1, lon1, lat2, lon2):
        """Calculate Haversine distance between two points.

        Args:
            lat1, lon1: First point, in decimal degrees.
            lat2, lon2: Second point, in decimal degrees.

        Returns:
            Great-circle distance in kilometres (Earth radius 6371 km).
        """
        R = 6371  # Earth's radius in km

        lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
        dlat = lat2 - lat1
        dlon = lon2 - lon1

        a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
        # Floating-point rounding can push `a` marginally outside [0, 1] for
        # (near-)antipodal points, which would make arcsin return NaN.
        a = np.clip(a, 0.0, 1.0)
        c = 2 * np.arcsin(np.sqrt(a))

        return R * c

print("Pricing Engine class defined!")

Pricing Engine class defined!


In [16]:
# Cell 5: Define Pricing Models
class ParkingPricingEngine(ParkingPricingEngine):  # Extend the class
    """Adds the three pricing models and the competitor lookup to the engine."""

    def model_1_baseline(self, features, alpha=0.2):
        """Model 1: Simple linear pricing based on occupancy.

        Args:
            features: Feature dict; reads 'system_code' and 'occupancy_rate'.
            alpha: Price increase per unit of occupancy rate.

        Returns:
            The lot's stored price (or base price if none) plus
            alpha * occupancy_rate, clamped to [0.5, 2.0] x base price.
        """
        system_code = features['system_code']
        current_price = self.current_prices.get(system_code, self.base_price)

        # Linear adjustment based on occupancy rate.
        # NOTE(review): the adjustment is added to the stored price and is
        # never negative for a non-negative occupancy rate, so repeated
        # updates can only push the price up towards the upper bound.
        price_adjustment = alpha * features['occupancy_rate']
        new_price = current_price + price_adjustment

        # Ensure price doesn't go below base/2 or above base*2
        new_price = max(self.base_price * 0.5, min(new_price, self.base_price * 2.0))

        return new_price

    def model_2_demand_based(self, features, alpha=0.3, beta=0.1, gamma=0.05, delta=0.2, epsilon=0.1, lambda_=0.5):
        """Model 2: Demand-based pricing with multiple factors.

        Args:
            features: Feature dict; reads 'occupancy_rate', 'queue_length',
                'traffic_level', 'is_special_day' and 'vehicle_weight'.
            alpha, beta, gamma, delta, epsilon: Weights of the respective
                demand terms (traffic level is subtracted, the rest added).
            lambda_: Scales how strongly normalized demand moves the price.

        Returns:
            base_price * (1 + lambda_ * tanh(demand)), clamped to
            [0.5, 2.0] x base price.
        """
        # Calculate demand score
        demand = (alpha * features['occupancy_rate'] +
                 beta * features['queue_length'] -
                 gamma * features['traffic_level'] +
                 delta * features['is_special_day'] +
                 epsilon * features['vehicle_weight'])

        # Normalize demand to reasonable range
        normalized_demand = np.tanh(demand)  # Keeps between -1 and 1

        # Calculate price
        new_price = self.base_price * (1 + lambda_ * normalized_demand)

        # Bound the price
        new_price = max(self.base_price * 0.5, min(new_price, self.base_price * 2.0))

        return new_price

    def find_competitors(self, features, all_lots_data, radius_km=2.0):
        """Find nearby parking lots within radius.

        Args:
            features: Feature dict of the lot being priced; reads 'lat',
                'lng' and 'system_code'.
            all_lots_data: DataFrame with SystemCodeNumber, Latitude,
                Longitude, Occupancy and Capacity columns. It may hold many
                records per lot; only the last record of each lot is used.
            radius_km: Maximum Haversine distance for a lot to count.

        Returns:
            List with one dict per competing lot ('system_code', 'distance',
            'occupancy_rate', 'current_price'); empty when there is no data.
        """
        if all_lots_data is None or len(all_lots_data) == 0:
            return []

        current_lat, current_lng = features['lat'], features['lng']
        current_system = features['system_code']

        # The frame is a log with many rows per lot. Scanning all of them
        # would emit the same competitor once per record and repeat the
        # distance calculation for every row, so collapse to one row per lot.
        # NOTE(review): assumes the last row of a lot is its most recent
        # observation - confirm the frame is ordered by time within each lot.
        latest_per_lot = all_lots_data.drop_duplicates(subset='SystemCodeNumber', keep='last')

        competitors = []
        for _, lot_data in latest_per_lot.iterrows():
            lot_code = lot_data['SystemCodeNumber']
            if lot_code != current_system:
                distance = self.calculate_distance(
                    current_lat, current_lng,
                    lot_data['Latitude'], lot_data['Longitude']
                )
                if distance <= radius_km:
                    competitors.append({
                        'system_code': lot_code,
                        'distance': distance,
                        'occupancy_rate': lot_data['Occupancy'] / lot_data['Capacity'],
                        'current_price': self.current_prices.get(lot_code, self.base_price)
                    })

        return competitors

    def model_3_competitive(self, features, competitors, base_demand_price):
        """Model 3: Competitive pricing with location intelligence.

        Args:
            features: Feature dict; reads 'occupancy_rate'.
            competitors: List of competitor dicts carrying 'current_price'.
            base_demand_price: Price already computed by the demand model.

        Returns:
            base_demand_price when there are no competitors or no rule
            applies; otherwise an undercut or a raised price, bounded by
            0.5 x and 2.0 x base price respectively.
        """
        if not competitors:
            return base_demand_price

        # Calculate average competitor price
        avg_competitor_price = np.mean([comp['current_price'] for comp in competitors])

        # Find cheapest competitor
        cheapest_competitor = min(competitors, key=lambda x: x['current_price'])

        # Competitive logic
        if features['occupancy_rate'] > 0.9:  # Lot is nearly full
            if cheapest_competitor['current_price'] < base_demand_price:
                # Reduce price to compete or suggest rerouting
                competitive_price = cheapest_competitor['current_price'] * 0.95
                return max(competitive_price, self.base_price * 0.5)

        # If competitors are expensive, we can increase price
        if avg_competitor_price > base_demand_price * 1.2:
            return min(avg_competitor_price * 0.9, self.base_price * 2.0)

        return base_demand_price

print("All pricing models defined!")

All pricing models defined!


In [17]:
# Cell 6: Define Price Update and Smoothing Functions
class ParkingPricingEngine(ParkingPricingEngine):  # Extend the class again
    """Adds smoothing, the per-record update entry point and a summary view."""

    def smooth_price_transition(self, system_code, new_price, smoothing_factor=0.7):
        """Blend a freshly computed price with the lot's stored price.

        A lot seen for the first time is registered at the base price before
        blending. The result is
        smoothing_factor * stored + (1 - smoothing_factor) * new_price.
        """
        previous_price = self.current_prices.setdefault(system_code, self.base_price)
        return smoothing_factor * previous_price + (1 - smoothing_factor) * new_price

    def update_price(self, data_row, all_lots_data=None, model='demand_based'):
        """Price one record, store the smoothed result and log it.

        `model` selects 'baseline', 'demand_based' or 'competitive'. The
        competitive model needs `all_lots_data`; without it, and for any
        other model name, the demand-based model is used.
        Returns the smoothed price that was stored for the lot.
        """
        features = self.extract_features(data_row)
        lot_id = features['system_code']

        # Pick the raw (unsmoothed) price from the requested model.
        if model == 'baseline':
            raw_price = self.model_1_baseline(features)
        elif model == 'competitive' and all_lots_data is not None:
            # The demand price is the starting point the competitive rules adjust.
            demand_price = self.model_2_demand_based(features)
            nearby_lots = self.find_competitors(features, all_lots_data)
            raw_price = self.model_3_competitive(features, nearby_lots, demand_price)
        else:
            # Covers 'demand_based' and every fallback case.
            raw_price = self.model_2_demand_based(features)

        final_price = self.smooth_price_transition(lot_id, raw_price)

        # Persist the new price and append a history entry for the lot.
        self.current_prices[lot_id] = final_price
        self.price_history.setdefault(lot_id, []).append({
            'timestamp': features['timestamp'],
            'price': final_price,
            'occupancy_rate': features['occupancy_rate'],
            'queue_length': features['queue_length']
        })

        return final_price

    def get_pricing_summary(self):
        """Return a DataFrame with one row per lot: code, formatted price, history size."""
        rows = [
            {
                'SystemCode': lot_id,
                'CurrentPrice': f"${lot_price:.2f}",
                'HistoryLength': len(self.price_history.get(lot_id, []))
            }
            for lot_id, lot_price in self.current_prices.items()
        ]
        return pd.DataFrame(rows)

print("Price update and smoothing functions defined!")

Price update and smoothing functions defined!


In [19]:
# Cell 7: Initialize and Test the Engine
# Smoke test: price the first few records with each model on a fresh engine.
engine = ParkingPricingEngine(base_price=10.0)

print("Testing pricing engine with sample data...")

# At most three records, fewer if the frame is shorter.
preview_count = min(3, len(df))

# Model 1 - baseline
print("\n--- MODEL 1: BASELINE ---")
for i in range(preview_count):
    row = df.iloc[i]
    occupancy_rate = row['Occupancy'] / row['Capacity']
    price = engine.update_price(row, model='baseline')
    print(f"Lot: {row['SystemCodeNumber']}, Occupancy: {occupancy_rate:.2%}, Price: ${price:.2f}")

# A new engine so the second model does not start from Model 1's prices.
engine = ParkingPricingEngine(base_price=10.0)

# Model 2 - demand-based
print("\n--- MODEL 2: DEMAND-BASED ---")
for i in range(preview_count):
    row = df.iloc[i]
    occupancy_rate = row['Occupancy'] / row['Capacity']
    price = engine.update_price(row, model='demand_based')
    print(f"Lot: {row['SystemCodeNumber']}, Occupancy: {occupancy_rate:.2%}, Queue: {row['QueueLength']}, Price: ${price:.2f}")

print("\nEngine testing complete!")

Pricing Engine initialized with base price: $ 10.0
Testing pricing engine with sample data...

--- MODEL 1: BASELINE ---
Lot: BHMBCCMKT01, Occupancy: 10.57%, Price: $10.01
Lot: BHMBCCMKT01, Occupancy: 11.09%, Price: $10.01
Lot: BHMBCCMKT01, Occupancy: 13.86%, Price: $10.02
Pricing Engine initialized with base price: $ 10.0

--- MODEL 2: DEMAND-BASED ---
Lot: BHMBCCMKT01, Occupancy: 10.57%, Queue: 1, Price: $10.34
Lot: BHMBCCMKT01, Occupancy: 11.09%, Queue: 1, Price: $10.58
Lot: BHMBCCMKT01, Occupancy: 13.86%, Queue: 2, Price: $10.90

Engine testing complete!


In [20]:
# Cell 8: Process Full Dataset
print("Processing full dataset...")

# Start from a clean engine so earlier test runs do not influence prices.
engine = ParkingPricingEngine(base_price=10.0)


def _result_record(row, price):
    """Build the output record for one input row and its computed price."""
    return {
        'ID': row['ID'],
        'SystemCodeNumber': row['SystemCodeNumber'],
        'Timestamp': f"{row['LastUpdatedDate']} {row['LastUpdatedTime']}",
        'Occupancy': row['Occupancy'],
        'Capacity': row['Capacity'],
        'OccupancyRate': row['Occupancy'] / row['Capacity'],
        'QueueLength': row['QueueLength'],
        'VehicleType': row['VehicleType'],
        'TrafficCondition': row['TrafficConditionNearby'],
        'IsSpecialDay': row['IsSpecialDay'],
        'CalculatedPrice': price
    }


# Rows are priced strictly in frame order: each update depends on the price
# the engine stored for the same lot on the previous update.
results = [
    _result_record(row, engine.update_price(row, all_lots_data=df, model='demand_based'))
    for _, row in df.iterrows()
]

# Convert to DataFrame
results_df = pd.DataFrame(results)
print(f"Processed {len(results_df)} records")
print("\nSample results:")
preview_columns = ['SystemCodeNumber', 'OccupancyRate', 'QueueLength', 'CalculatedPrice']
print(results_df[preview_columns].head(10))

# Latest price and history size per lot
print("\nCurrent Pricing Summary:")
print(engine.get_pricing_summary())

Processing full dataset...
Pricing Engine initialized with base price: $ 10.0
Processed 18368 records

Sample results:
  SystemCodeNumber  OccupancyRate  QueueLength  CalculatedPrice
0      BHMBCCMKT01       0.105719            1        10.341484
1      BHMBCCMKT01       0.110919            1        10.582740
2      BHMBCCMKT01       0.138648            2        10.901268
3      BHMBCCMKT01       0.185442            2        11.142930
4      BHMBCCMKT01       0.259965            2        11.275121
5      BHMBCCMKT01       0.306759            3        11.576321
6      BHMBCCMKT01       0.379549            6        12.068446
7      BHMBCCMKT01       0.428076            5        12.333652
8      BHMBCCMKT01       0.448873            5        12.454657
9      BHMBCCMKT01       0.461005            8        12.784091

Current Pricing Summary:
          SystemCode CurrentPrice  HistoryLength
0        BHMBCCMKT01       $12.03           1312
1        BHMBCCTHL01       $12.94           1312
2   

In [21]:
# Cell 9: Create Visualizations
print("Creating visualizations...")

# Flatten the engine's per-lot price history into one long table; time_index
# is the position of the entry within its own lot's history.
viz_data = []
for system_code, history in engine.price_history.items():
    for i, entry in enumerate(history):
        viz_data.append({
            'system_code': system_code,
            'timestamp': entry['timestamp'],
            'price': entry['price'],
            'occupancy_rate': entry['occupancy_rate'],
            'queue_length': entry['queue_length'],
            'time_index': i  # Add proper time index
        })

viz_df = pd.DataFrame(viz_data)

if len(viz_df) == 0:
    print("No visualization data available. Make sure to run Cell 8 first.")
else:
    # Create Bokeh plots
    from bokeh.palettes import Category10, Set3
    from bokeh.transform import factor_cmap

    # Get unique systems and create color palette
    unique_systems = viz_df['system_code'].unique()
    n_systems = len(unique_systems)

    # Choose appropriate palette based on number of systems
    # (Category10 is only defined for sizes 3..10, hence the max()).
    if n_systems <= 10:
        colors = Category10[max(3, n_systems)]
    else:
        colors = Set3[12] + Category10[10]  # Extend palette for more systems

    print(f"Creating plots for {n_systems} parking systems...")

    # Plot 1: Price trends over time
    p1 = figure(title="Real-time Parking Prices Over Time",
               x_axis_label="Time Index",
               y_axis_label="Price ($)",
               width=800, height=400)

    # Add lines for each parking lot
    for i, system_code in enumerate(unique_systems):
        system_data = viz_df[viz_df['system_code'] == system_code]
        if len(system_data) > 0:
            # Plot against the per-lot time_index built above rather than
            # rebuilding an equivalent range from the row count.
            x_values = system_data['time_index'].tolist()
            y_values = system_data['price'].tolist()
            p1.line(x_values, y_values,
                   legend_label=system_code, line_width=2,
                   color=colors[i % len(colors)])

    p1.legend.location = "top_left"
    p1.legend.click_policy = "hide"

    # Plot 2: Occupancy vs Price scatter
    p2 = figure(title="Occupancy Rate vs Price",
               x_axis_label="Occupancy Rate",
               y_axis_label="Price ($)",
               width=600, height=400)

    # Create color mapping with proper factors
    color_factors = unique_systems.tolist()
    mapper = factor_cmap('system_code', palette=colors[:len(color_factors)], factors=color_factors)

    # Create data source
    source = ColumnDataSource(viz_df)
    # scatter() draws circle markers by default; circle(size=...) is
    # deprecated in recent Bokeh releases.
    p2.scatter('occupancy_rate', 'price', source=source,
              size=8, color=mapper, alpha=0.7, legend_field='system_code')

    # Plot 3: Queue Length vs Price
    p3 = figure(title="Queue Length vs Price",
               x_axis_label="Queue Length",
               y_axis_label="Price ($)",
               width=600, height=400)

    p3.scatter('queue_length', 'price', source=source,
              size=8, color=mapper, alpha=0.7, legend_field='system_code')

    # Plot 4: Price Distribution
    p4 = figure(title="Price Distribution by Parking System",
               x_axis_label="Price ($)",
               y_axis_label="Frequency",
               width=600, height=400)

    # One 10-bin histogram per lot, overlaid semi-transparently
    for i, system_code in enumerate(unique_systems):
        system_prices = viz_df[viz_df['system_code'] == system_code]['price']
        if len(system_prices) > 0:
            hist, edges = np.histogram(system_prices, bins=10)
            p4.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
                   alpha=0.7, color=colors[i % len(colors)],
                   legend_label=system_code)

    p4.legend.location = "top_right"

    # Display plots
    show(column(p1, p2, p3, p4))

    print("Visualizations created!")
    print(f"Visualization summary:")
    print(f"   - Systems plotted: {n_systems}")
    print(f"   - Total data points: {len(viz_df)}")
    print(f"   - Price range: ${viz_df['price'].min():.2f} - ${viz_df['price'].max():.2f}")
    print(f"   - Average occupancy: {viz_df['occupancy_rate'].mean():.1%}")

Creating visualizations...
Creating plots for 14 parking systems...


Visualizations created!
Visualization summary:
   - Systems plotted: 14
   - Total data points: 18368
   - Price range: $10.33 - $14.53
   - Average occupancy: 50.9%


In [22]:
# Cell 10: Export Results
print("Saving results...")

# Save results to Google Drive
output_path = '/content/drive/MyDrive/parking_pricing_results.csv'
results_df.to_csv(output_path, index=False)

# Save pricing history: one row per (lot, update) taken from the engine's log
history_data = []
for system_code, history in engine.price_history.items():
    for entry in history:
        history_data.append({
            'SystemCode': system_code,
            'Timestamp': entry['timestamp'],
            'Price': entry['price'],
            'OccupancyRate': entry['occupancy_rate'],
            'QueueLength': entry['queue_length']
        })

history_df = pd.DataFrame(history_data)
history_path = '/content/drive/MyDrive/parking_pricing_history.csv'
history_df.to_csv(history_path, index=False)

print(f"Results saved to: {output_path}")
print(f"History saved to: {history_path}")
print(f"Final Statistics:")
print(f"   - Total records processed: {len(results_df)}")
# Count lots from the exported frame itself instead of a variable that only
# exists when the visualization cell ran and took its plotting branch.
print(f"   - Unique parking lots: {results_df['SystemCodeNumber'].nunique()}")
print(f"   - Price range: ${results_df['CalculatedPrice'].min():.2f} - ${results_df['CalculatedPrice'].max():.2f}")
print(f"   - Average price: ${results_df['CalculatedPrice'].mean():.2f}")

Saving results...
Results saved to: /content/drive/MyDrive/parking_pricing_results.csv
History saved to: /content/drive/MyDrive/parking_pricing_history.csv
Final Statistics:
   - Total records processed: 18368
   - Unique parking lots: 14
   - Price range: $10.33 - $14.53
   - Average price: $12.85


In [24]:
# Cell 11: Model Comparison
# Models replayed over the whole dataset. The 'competitive' model is not in
# this list, so the banner reports the number of models actually compared.
models = ['baseline', 'demand_based']
print(f"🔍 Comparing {len(models)} pricing models...")

comparison_results = []

for model in models:
    # A separate engine per model keeps the runs independent and leaves the
    # notebook-level `engine` (and its price history) untouched.
    model_engine = ParkingPricingEngine(base_price=10.0)
    model_results = []

    for _, row in df.iterrows():
        price = model_engine.update_price(row, all_lots_data=df, model=model)
        model_results.append(price)

    comparison_results.append({
        'model': model,
        'avg_price': np.mean(model_results),
        'min_price': np.min(model_results),
        'max_price': np.max(model_results),
        'std_price': np.std(model_results)
    })

comparison_df = pd.DataFrame(comparison_results)
print("\nModel Comparison:")
print(comparison_df)

# Create comparison plot
model_names = comparison_df['model'].tolist()

# String x values need a categorical axis: the factors must be given as
# x_range, otherwise the bars cannot be placed on the default numeric axis.
p_comp = figure(x_range=model_names,
               title="Model Comparison - Average Prices",
               x_axis_label="Model",
               y_axis_label="Average Price ($)",
               width=600, height=400)

# Exactly one color per bar (cycling if there are more models than colors),
# so every column handed to the glyph has the same length.
bar_palette = ['red', 'green', 'blue']
bar_colors = [bar_palette[k % len(bar_palette)] for k in range(len(model_names))]

p_comp.vbar(x=model_names, top=comparison_df['avg_price'].tolist(),
           width=0.5, color=bar_colors, alpha=0.7)

show(p_comp)

print("\nAll analysis complete!")
print("Summary:")
print("   - Drive mounted and data loaded")
print("   - Pricing engine implemented with 3 models")
print("   - Full dataset processed")
print("   - Visualizations created")
print("   - Results exported to Google Drive")
print("   - Model comparison completed")

🔍 Comparing all three models...
Pricing Engine initialized with base price: $ 10.0
Pricing Engine initialized with base price: $ 10.0

Model Comparison:
          model  avg_price  min_price  max_price  std_price
0      baseline  18.529613  10.004174  20.000000    2.75355
1  demand_based  12.850823  10.326021  14.533713    0.61550



All analysis complete!
Summary:
   - Drive mounted and data loaded
   - Pricing engine implemented with 3 models
   - Full dataset processed
   - Visualizations created
   - Results exported to Google Drive
   - Model comparison completed
