# Enterprise Dynamic Parking Pricing Engine
Advanced location intelligence with competitive analysis for urban parking optimization

Dependacies 

In [1]:


import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any
import warnings
from dataclasses import dataclass
from enum import Enum
import logging

warnings.filterwarnings('ignore')


Load parking data CSV

In [31]:
df = pd.read_csv("data\dataset.csv")

FileNotFoundError: [Errno 2] No such file or directory: 'data\\dataset.csv'

In [5]:
class CompetitivePosition(Enum):
    PRICE_LEADER = "price_leader"
    COMPETITIVE = "competitive" 
    PREMIUM = "premium"
    EXPENSIVE = "expensive"
    ISOLATED = "isolated"

In [6]:
class TrafficLevel(Enum):
    LOW = "Low"
    MEDIUM = "Medium"
    HIGH = "High"

In [7]:
@dataclass
class PricingFactors:
    occupancy_weight: float = 1.5
    queue_weight: float = 0.8
    traffic_weight: float = 0.4
    special_day_weight: float = 0.3
    vehicle_type_weight: float = 0.2
    competitor_weight: float = 0.6
    time_of_day_weight: float = 0.5

In [8]:
@dataclass
class CompetitorAnalysis:
    competitors: Dict[str, Dict[str, float]]
    avg_competitor_price: Optional[float]
    min_competitor_price: Optional[float]
    max_competitor_price: Optional[float]
    price_advantage: float
    competitive_position: CompetitivePosition
    recommendations: List[str]
    competitor_count: int

In [9]:
@dataclass
class ReroutingSuggestion:
    space_id: str
    price: float
    distance_km: float

 pricing engine

In [10]:
class DynamicParkingPricingEngine:
    
    def __init__(self, base_price: float = 10.0, logger: Optional[logging.Logger] = None):
        self.base_price = base_price
        self.logger = logger or self._setup_logger()
        
        # Core data structures
        self.historical_patterns: Dict[str, Dict] = {}
        self.pricing_history: Dict[str, List[Dict]] = {}
        self.distance_matrix: Optional[np.ndarray] = None
        self.space_mapping: Dict[str, Dict] = {}
        
        # Pricing configuration
        self.factors = PricingFactors()
        self.vehicle_premiums = {'car': 1.0, 'bike': 0.6, 'truck': 1.4}
        self.traffic_multipliers = {
            TrafficLevel.LOW.value: 0.95,
            TrafficLevel.MEDIUM.value: 1.0,
            TrafficLevel.HIGH.value: 1.15
        }
        
        # Business constraints
        self.MIN_PRICE = 5.0
        self.MAX_PRICE = 50.0
        self.MAX_PRICE_CHANGE_PCT = 0.15
        self.COMPETITOR_RADIUS_KM = 2.0
        
        self.logger.info(f"Pricing engine initialized with base price: ${base_price}")

In [11]:
def _setup_logger(self) -> logging.Logger:
        logger = logging.getLogger('ParkingPricingEngine')
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

        Calculate great circle distance between two points

In [12]:
def _haversine_distance(self, lat1: float, lon1: float, lat2: float, lon2: float) -> float:
       
        R = 6371  # Earth radius in km
        
        lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
        dlat, dlon = lat2 - lat1, lon2 - lon1
        
        a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
        c = 2 * np.arcsin(np.sqrt(a))
        
        return R * c

Build spatial index for competitor analysis

In [13]:
def build_spatial_index(self, df: pd.DataFrame) -> Tuple[np.ndarray, Dict[str, Dict]]:
        
        required_cols = ['SystemCodeNumber', 'Latitude', 'Longitude']
        if not all(col in df.columns for col in required_cols):
            raise ValueError(f"DataFrame missing required columns: {required_cols}")
        
        unique_spaces = df[required_cols].drop_duplicates()
        n_spaces = len(unique_spaces)
        
        if n_spaces == 0:
            raise ValueError("No unique parking spaces found in dataset")
        
        distance_matrix = np.zeros((n_spaces, n_spaces))
        space_mapping = {}
        
        for idx, (_, row) in enumerate(unique_spaces.iterrows()):
            space_id = str(row['SystemCodeNumber'])
            space_mapping[space_id] = {
                'index': idx,
                'lat': row['Latitude'],
                'lon': row['Longitude']
            }
        
                                                                # Vectorized distance calculation
        for i, (_, row1) in enumerate(unique_spaces.iterrows()):
            for j, (_, row2) in enumerate(unique_spaces.iterrows()):
                if i != j:
                    distance_matrix[i, j] = self._haversine_distance(
                        row1['Latitude'], row1['Longitude'],
                        row2['Latitude'], row2['Longitude']
                    )
        
        self.distance_matrix = distance_matrix
        self.space_mapping = space_mapping
        
        self.logger.info(f"Spatial index built for {n_spaces} parking spaces")
        return distance_matrix, space_mapping
    

Competitive intelligence analysis

In [14]:
    def analyze_competitor_landscape(self, space_id: str, current_prices: Dict[str, float]) -> CompetitorAnalysis:
        
        if not self.space_mapping or space_id not in self.space_mapping:
            return CompetitorAnalysis(
                competitors={}, avg_competitor_price=None, min_competitor_price=None,
                max_competitor_price=None, price_advantage=0,
                competitive_position=CompetitivePosition.ISOLATED,
                recommendations=["No spatial data - premium pricing opportunity"],
                competitor_count=0
            )
        
        space_idx = self.space_mapping[space_id]['index']
        competitors = {}
        
                                                         # Find competitors within radius
        for other_space_id, mapping in self.space_mapping.items():
            if other_space_id != space_id and other_space_id in current_prices:
                other_idx = mapping['index']
                distance = self.distance_matrix[space_idx, other_idx]
                
                if distance <= self.COMPETITOR_RADIUS_KM:
                    competitors[other_space_id] = {
                        'distance_km': distance,
                        'price': current_prices[other_space_id],
                        'lat': mapping['lat'],
                        'lon': mapping['lon']
                    }
        
        if not competitors:
            return CompetitorAnalysis(
                competitors={}, avg_competitor_price=None, min_competitor_price=None,
                max_competitor_price=None, price_advantage=0,
                competitive_position=CompetitivePosition.ISOLATED,
                recommendations=["No nearby competitors - premium pricing possible"],
                competitor_count=0
            )
        
                                                         # Calculate competitive metrics
        competitor_prices = [comp['price'] for comp in competitors.values()]
        avg_price = np.mean(competitor_prices)
        min_price = min(competitor_prices)
        max_price = max(competitor_prices)
        
        current_price = current_prices.get(space_id, self.base_price)
        price_advantage = current_price - avg_price
        
                                                        # Determine competitive position
        if current_price <= min_price:
            position = CompetitivePosition.PRICE_LEADER
        elif current_price <= avg_price:
            position = CompetitivePosition.COMPETITIVE
        elif current_price <= max_price:
            position = CompetitivePosition.PREMIUM
        else:
            position = CompetitivePosition.EXPENSIVE
        
                                                        # Generate strategic recommendations
        recommendations = self._generate_pricing_recommendations(position, price_advantage, len(competitors))
        
        return CompetitorAnalysis(
            competitors=competitors,
            avg_competitor_price=avg_price,
            min_competitor_price=min_price,
            max_competitor_price=max_price,
            price_advantage=price_advantage,
            competitive_position=position,
            recommendations=recommendations,
            competitor_count=len(competitors)
        )

strategic pricing recommendations

In [15]:
def _generate_pricing_recommendations(self, position: CompetitivePosition, price_advantage: float, competitor_count: int) -> List[str]:
        recommendations = []
        if position == CompetitivePosition.EXPENSIVE and price_advantage > 5:
            recommendations.append("Consider price reduction - significantly above market")
        elif position == CompetitivePosition.PRICE_LEADER:
            recommendations.append("Opportunity for strategic price increase")
        elif competitor_count >= 3 and price_advantage > 2:
            recommendations.append("Monitor competitor responses to pricing")
        else:
            recommendations.append("Maintain current competitive positioning")
        return recommendations

Extract historical demand patterns for predictive pricing

In [18]:
def extract_historical_patterns(self, df: pd.DataFrame) -> Dict[str, Dict]:
    required_cols = ['SystemCodeNumber', 'LastUpdatedDate', 'LastUpdatedTime', 'Occupancy', 'Capacity', 'QueueLength', 'TrafficConditionNearby']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")

    # Convert datetime with error handling
    try:
        df['datetime'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'])
    except Exception as e:
        self.logger.error(f"Date parsing error: {e}")
        raise ValueError("Unable to parse datetime columns")

    df['hour'] = df['datetime'].dt.hour
    df['day_of_week'] = df['datetime'].dt.dayofweek

    patterns = {}

    for space_id in df['SystemCodeNumber'].unique():
        space_data = df[df['SystemCodeNumber'] == space_id].copy()
        if len(space_data) == 0:
            continue

        # Hourly demand patterns
        hourly_occupancy = space_data.groupby('hour')['Occupancy'].agg(['mean', 'std', 'max']).fillna(0)
        daily_occupancy = space_data.groupby('day_of_week')['Occupancy'].mean()
        queue_patterns = space_data.groupby('hour')['QueueLength'].agg(['mean', 'std', 'max']).fillna(0)
        traffic_impact = space_data.groupby('TrafficConditionNearby')['Occupancy'].mean()

        capacity = space_data['Capacity'].iloc[0] if len(space_data) > 0 else 1
        avg_occupancy_rate = space_data['Occupancy'].mean() / max(capacity, 1)

        patterns[str(space_id)] = {
            'hourly_occupancy': hourly_occupancy,
            'daily_occupancy': daily_occupancy,
            'queue_patterns': queue_patterns,
            'traffic_impact': traffic_impact,
            'capacity': capacity,
            'avg_occupancy_rate': avg_occupancy_rate
        }

    self.historical_patterns = patterns
    self.logger.info(f"Historical patterns extracted for {len(patterns)} spaces")
    return patterns

Core dynamic pricing algorithm with competitive intelligence

In [19]:
def calculate_dynamic_price(self, space_id: str, current_state: Dict[str, Any], 
                              current_prices: Optional[Dict[str, float]] = None) -> Tuple[float, List[ReroutingSuggestion]]:
        space_id = str(space_id)
        
               # Validate required state data
        required_fields = ['Capacity', 'Occupancy', 'QueueLength', 'TrafficConditionNearby', 
                          'IsSpecialDay', 'VehicleType', 'LastUpdatedDate', 'LastUpdatedTime']
        
        missing_fields = [field for field in required_fields if field not in current_state]
        if missing_fields:
            self.logger.warning(f"Missing state fields for space {space_id}: {missing_fields}")
            return self.base_price, []
        
              # Extract state variables
        capacity = max(current_state['Capacity'], 1)  # Prevent division by zero
        occupancy = max(current_state['Occupancy'], 0)
        queue_length = max(current_state['QueueLength'], 0)
        traffic_level = current_state['TrafficConditionNearby']
        is_special_day = bool(current_state['IsSpecialDay'])
        vehicle_type = str(current_state['VehicleType']).lower()
        
        try:
            current_hour = pd.to_datetime(
                current_state['LastUpdatedDate'] + ' ' + current_state['LastUpdatedTime']
            ).hour
        except:
            current_hour = datetime.now().hour
        
                     # Calculate occupancy rate with bounds checking
        occupancy_rate = min(occupancy / capacity, 1.0)
        
                    # 1. Demand-based pricing multipliers
        occupancy_multiplier = self._calculate_occupancy_multiplier(occupancy_rate)
        queue_multiplier = self._calculate_queue_multiplier(queue_length)
        traffic_multiplier = self.traffic_multipliers.get(traffic_level, 1.0)
        special_day_multiplier = 1.2 if is_special_day else 1.0
        vehicle_multiplier = self.vehicle_premiums.get(vehicle_type, 1.0)
        
                     # 2. Historical pattern multiplier
        time_multiplier = self._calculate_time_multiplier(space_id, current_hour, capacity)
        
                      
                      
                       # 3. Competitive pricing intelligence
        competitive_multiplier = 1.0
        rerouting_suggestions = []
        
        if current_prices:
            comp_analysis = self.analyze_competitor_landscape(space_id, current_prices)
            competitive_multiplier, rerouting_suggestions = self._calculate_competitive_strategy(
                comp_analysis, occupancy_rate, queue_length
            )
        
                 
                 
                 # Combine factors with weights
        total_weight = sum(vars(self.factors).values())
        final_multiplier = (
            occupancy_multiplier * self.factors.occupancy_weight +
            queue_multiplier * self.factors.queue_weight +
            traffic_multiplier * self.factors.traffic_weight +
            special_day_multiplier * self.factors.special_day_weight +
            vehicle_multiplier * self.factors.vehicle_type_weight +
            competitive_multiplier * self.factors.competitor_weight +
            time_multiplier * self.factors.time_of_day_weight
        ) / total_weight
        
                    
                    
                    # Calculate final price with smoothing
        new_price = self.base_price * final_multiplier
        new_price = self._apply_price_smoothing(space_id, new_price)
        new_price = np.clip(new_price, self.MIN_PRICE, self.MAX_PRICE)
        new_price = round(new_price, 2)
        
               
               
               # Store pricing history
        self._record_pricing_decision(space_id, new_price, occupancy_rate, queue_length, {
            'occupancy_mult': occupancy_multiplier,
            'queue_mult': queue_multiplier,
            'traffic_mult': traffic_multiplier,
            'special_day_mult': special_day_multiplier,
            'vehicle_mult': vehicle_multiplier,
            'time_mult': time_multiplier,
            'competitive_mult': competitive_multiplier
        })
        
        return new_price, rerouting_suggestions

Calculate occupancy-based pricing multiplier

In [20]:
def _calculate_occupancy_multiplier(self, occupancy_rate: float) -> float:
       
        if occupancy_rate >= 0.9:
            return 2.0
        elif occupancy_rate >= 0.7:
            return 1.4 + (occupancy_rate - 0.7) * 3
        elif occupancy_rate >= 0.5:
            return 1.1 + (occupancy_rate - 0.5) * 1.5
        else:
            return 0.8 + occupancy_rate * 0.6

Calculate queue-based pricing multiplier

In [21]:
 def _calculate_queue_multiplier(self, queue_length: int) -> float:

        if queue_length > 5:
            return 1.3
        elif queue_length > 2:
            return 1.1
        elif queue_length > 0:
            return 1.05
        else:
            return 1.0

Calculate time-based pricing multiplier from historical patterns

In [22]:
def _calculate_time_multiplier(self, space_id: str, current_hour: int, capacity: int) -> float:
    
        if space_id not in self.historical_patterns:
            return 1.0
        
        hist_data = self.historical_patterns[space_id]
        if current_hour in hist_data['hourly_occupancy'].index:
            hist_occupancy_rate = hist_data['hourly_occupancy'].loc[current_hour, 'mean'] / capacity
            return 0.9 + hist_occupancy_rate * 0.3
        return 1.0

Calculate competitive pricing strategy

In [23]:
def _calculate_competitive_strategy(self, comp_analysis: CompetitorAnalysis, 
                                      occupancy_rate: float, queue_length: int) -> Tuple[float, List[ReroutingSuggestion]]:
    
        competitive_multiplier = 1.0
        rerouting_suggestions = []
        
        if not comp_analysis.competitors:
            return competitive_multiplier, rerouting_suggestions
        
        
        
                            # High demand + queue: check for rerouting opportunities
        if occupancy_rate >= 0.9 and queue_length > 0:
            cheapest_alternatives = sorted(
                comp_analysis.competitors.items(),
                key=lambda x: x[1]['price']
            )[:3]
            
            for alt_id, alt_data in cheapest_alternatives:
                if alt_data['price'] < self.base_price * 1.2:
                    rerouting_suggestions.append(ReroutingSuggestion(
                        space_id=alt_id,
                        price=alt_data['price'],
                        distance_km=alt_data['distance_km']
                    ))
            
            competitive_multiplier = 0.95 if comp_analysis.avg_competitor_price < self.base_price else 1.15
        
        
        
                           # Low utilization: be more competitive
        elif occupancy_rate < 0.5:
            if comp_analysis.competitive_position == CompetitivePosition.EXPENSIVE:
                competitive_multiplier = 0.85
            else:
                competitive_multiplier = max(0.9, comp_analysis.min_competitor_price / self.base_price * 0.95)
        
        
        
                           
                           # Normal occupancy: strategic positioning
        else:
            position_multipliers = {
                CompetitivePosition.EXPENSIVE: 0.95,
                CompetitivePosition.PRICE_LEADER: 1.05,
                CompetitivePosition.COMPETITIVE: 1.0,
                CompetitivePosition.PREMIUM: 1.02
            }
            competitive_multiplier = position_multipliers.get(comp_analysis.competitive_position, 1.0)
        
        return competitive_multiplier, rerouting_suggestions

Apply price smoothing to prevent erratic changes,to avoid confussion

In [24]:
def _apply_price_smoothing(self, space_id: str, new_price: float) -> float:
        if space_id in self.pricing_history and self.pricing_history[space_id]:
            last_price = self.pricing_history[space_id][-1]['price']
            max_change = last_price * self.MAX_PRICE_CHANGE_PCT
            
            if abs(new_price - last_price) > max_change:
                new_price = last_price + (max_change if new_price > last_price else -max_change)
        
        return new_price
    

Record pricing decision in history

In [25]:
def _record_pricing_decision(self, space_id: str, price: float, occupancy_rate: float, 
                               queue_length: int, factors: Dict[str, float]) -> None:
    
        if space_id not in self.pricing_history:
            self.pricing_history[space_id] = []
        
        record = {
            'timestamp': datetime.now(),
            'price': price,
            'occupancy_rate': occupancy_rate,
            'queue_length': queue_length,
            'factors': factors
        }
        
        self.pricing_history[space_id].append(record)
        
        
                # Keep only last 1000 records per space for better performance and memory management
        if len(self.pricing_history[space_id]) > 1000:
            self.pricing_history[space_id] = self.pricing_history[space_id][-1000:]

Execute real-time pricing simulation with competitive intelligence

In [26]:
def simulate_real_time_pricing(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    
        self.logger.info("Starting real-time competitive pricing simulation")
        
              # Initialize system
        self.extract_historical_patterns(df)
        self.build_spatial_index(df)
        
             # Prepare data
        df['datetime'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'])
        df_sorted = df.sort_values('datetime').reset_index(drop=True)
        
            # Simulation data structures
        current_prices = {}
        simulation_results = []
        rerouting_events = []
        competitive_insights = []
        
        self.logger.info(f"Processing {len(df_sorted)} records across {df_sorted['SystemCodeNumber'].nunique()} spaces")
        
             # Process by timestamp batches
        for timestamp, batch_df in df_sorted.groupby('datetime'):
            for _, row in batch_df.iterrows():
                space_id = str(row['SystemCodeNumber'])
                current_state = row.to_dict()
                
                # Calculate dynamic price
                new_price, rerouting_suggestions = self.calculate_dynamic_price(
                    space_id, current_state, current_prices
                )
                current_prices[space_id] = new_price
                
                # Store results
                simulation_results.append({
                    'timestamp': timestamp,
                    'space_id': space_id,
                    'price': new_price,
                    'occupancy': row['Occupancy'],
                    'capacity': row['Capacity'],
                    'occupancy_rate': row['Occupancy'] / max(row['Capacity'], 1),
                    'queue_length': row['QueueLength'],
                    'traffic_condition': row['TrafficConditionNearby'],
                    'vehicle_type': row['VehicleType'],
                    'is_special_day': row['IsSpecialDay']
                })
                
                # Handle rerouting events
                if rerouting_suggestions and row['Occupancy'] >= row['Capacity'] * 0.9:
                    for suggestion in rerouting_suggestions:
                        rerouting_events.append({
                            'timestamp': timestamp,
                            'from_space': space_id,
                            'from_price': new_price,
                            'to_space': suggestion.space_id,
                            'to_price': suggestion.price,
                            'distance_km': suggestion.distance_km,
                            'savings': new_price - suggestion.price
                        })
                
                # Competitive analysis
                if current_prices and len(current_prices) > 1:
                    comp_analysis = self.analyze_competitor_landscape(space_id, current_prices)
                    if comp_analysis.competitors:
                        competitive_insights.append({
                            'timestamp': timestamp,
                            'space_id': space_id,
                            'price': new_price,
                            'competitive_position': comp_analysis.competitive_position.value,
                            'price_advantage': comp_analysis.price_advantage,
                            'competitor_count': comp_analysis.competitor_count,
                            'avg_competitor_price': comp_analysis.avg_competitor_price
                        })
        
        # Convert to DataFrames
        results_df = pd.DataFrame(simulation_results)
        rerouting_df = pd.DataFrame(rerouting_events)
        competitive_df = pd.DataFrame(competitive_insights)
        
        self.logger.info(f"Simulation complete: {len(results_df)} pricing decisions, "
                        f"{len(rerouting_df)} rerouting events, {len(competitive_df)} competitive analyses")
        
        return results_df, rerouting_df, competitive_df

Generate comprehensive business analytics report

In [27]:
def generate_business_analytics_report(self, results_df: pd.DataFrame, 
                                         rerouting_df: Optional[pd.DataFrame] = None,
                                         competitive_df: Optional[pd.DataFrame] = None) -> Dict[str, Any]:
        
        
        if results_df.empty:
            self.logger.warning("Empty results DataFrame provided")
            return {}
        
        report = {}
        
                  # Pricing performance metrics
        report['pricing_performance'] = {
            'avg_price': results_df['price'].mean(),
            'price_range': (results_df['price'].min(), results_df['price'].max()),
            'price_volatility': results_df['price'].std(),
            'price_stability_index': 1 - (results_df['price'].std() / results_df['price'].mean())
        }
        
                      # Utilization metrics
        avg_occupancy_rate = results_df['occupancy_rate'].mean()
        report['utilization_metrics'] = {
            'avg_occupancy_rate': avg_occupancy_rate,
            'peak_occupancy_rate': results_df['occupancy_rate'].max(),
            'avg_queue_length': results_df['queue_length'].mean(),
            'high_demand_instances': (results_df['occupancy_rate'] > 0.8).sum()
        }
        
                  
                  
                  # Revenue optimization
        total_capacity_hours = results_df['capacity'].sum()
        total_occupied_hours = results_df['occupancy'].sum()
        estimated_revenue = (results_df['occupancy'] * results_df['price']).sum()
        
        report['revenue_optimization'] = {
            'estimated_total_revenue': estimated_revenue,
            'revenue_per_occupied_hour': estimated_revenue / max(total_occupied_hours, 1),
            'capacity_utilization': total_occupied_hours / max(total_capacity_hours, 1),
            'revenue_efficiency_score': estimated_revenue / (total_capacity_hours * self.base_price)
        }
        
                  
                  
                   # Competitive intelligence
        if competitive_df is not None and not competitive_df.empty:
            position_dist = competitive_df['competitive_position'].value_counts()
            report['competitive_intelligence'] = {
                'position_distribution': position_dist.to_dict(),
                'avg_price_advantage': competitive_df['price_advantage'].mean(),
                'competitive_pressure_events': (competitive_df['price_advantage'] < -2).sum(),
                'avg_nearby_competitors': competitive_df['competitor_count'].mean()
            }
        
                
                
                
                 # Rerouting intelligence
        if rerouting_df is not None and not rerouting_df.empty:
            report['rerouting_intelligence'] = {
                'total_suggestions': len(rerouting_df),
                'avg_customer_savings': rerouting_df['savings'].mean(),
                'avg_rerouting_distance': rerouting_df['distance_km'].mean()
            }
        
      
      
      
                    # Space performance analysis
        space_perf = results_df.groupby('space_id').agg({
            'price': ['mean', 'std'],
            'occupancy_rate': ['mean', 'max'],
            'occupancy': 'sum',
            'queue_length': 'mean'
        })
        
        space_perf['revenue_score'] = (
            space_perf[('occupancy', 'sum')] * space_perf[('price', 'mean')]
        )
        
        report['space_performance'] = space_perf.sort_values('revenue_score', ascending=False).to_dict()
        
                  
                  
                  
                   # Strategic recommendations
        report['recommendations'] = self._generate_strategic_recommendations(
            results_df, competitive_df, rerouting_df
        )
        
        self.logger.info("Business analytics report generated successfully")
        return report

Generate strategic business recommendations

In [28]:
def _generate_strategic_recommendations(self, results_df: pd.DataFrame,
                                          competitive_df: Optional[pd.DataFrame],
                                          rerouting_df: Optional[pd.DataFrame]) -> List[str]:
    
        recommendations = []
        
        # Utilization-based recommendations
        space_perf = results_df.groupby('space_id')['occupancy_rate'].mean()
        low_util_spaces = (space_perf < 0.4).sum()
        if low_util_spaces > 0:
            recommendations.append(f"Implement promotional pricing for {low_util_spaces} underutilized spaces")
        
        # Competitive positioning
        if competitive_df is not None and not competitive_df.empty:
            expensive_ratio = (competitive_df['competitive_position'] == 'expensive').sum() / len(competitive_df)
            if expensive_ratio > 0.2:
                recommendations.append("Review pricing strategy - high proportion of expensive positioning")
        
        # Queue management
        high_queue_ratio = (results_df['queue_length'] > 2).sum() / len(results_df)
        if high_queue_ratio > 0.1:
            recommendations.append("Implement surge pricing for frequently congested spaces")
        
        # Rerouting optimization
        if rerouting_df is not None and not rerouting_df.empty:
            avg_savings = rerouting_df['savings'].mean()
            if avg_savings > 2:
                recommendations.append(f"Enhance rerouting system - ${avg_savings:.2f} average savings available")
        
        if not recommendations:
            recommendations = [
                "Continue monitoring competitive landscape",
                "Implement dynamic surge pricing during peak periods",
                "Consider loyalty programs for frequent users"
            ]
        
        return recommendations


Initialize enterprise parking pricing engine

In [29]:
def initialize_pricing_engine(base_price: float = 10.0) -> DynamicParkingPricingEngine:
    
    logger = logging.getLogger('ParkingPricingSystem')
    logger.setLevel(logging.INFO)
    
    engine = DynamicParkingPricingEngine(base_price=base_price, logger=logger)
    logger.info(f"Enterprise pricing engine initialized with base price: ${base_price}")
    return engine

Analytics module for pricing performance evaluation

In [32]:
class PricingAnalytics:

    
    @staticmethod
    def calculate_price_elasticity(results_df: pd.DataFrame) -> Dict[str, float]:
        #Calculate price elasticity of demand by space
        elasticity_metrics = {}
        
        for space_id in results_df['space_id'].unique():
            space_data = results_df[results_df['space_id'] == space_id].copy()
            
            if len(space_data) < 10:  # Need sufficient data points
                continue
                
            # Calculate price changes and occupancy changes
            space_data = space_data.sort_values('timestamp')
            space_data['price_change'] = space_data['price'].pct_change()
            space_data['occupancy_change'] = space_data['occupancy_rate'].pct_change()
            
            # Filter out extreme values and zeros
            valid_data = space_data[
                (abs(space_data['price_change']) < 0.3) & 
                (abs(space_data['occupancy_change']) < 0.5) &
                (space_data['price_change'] != 0)
            ]
            
            if len(valid_data) >= 5:
                try:
                    # Price elasticity = % change in quantity / % change in price
                    elasticity = (valid_data['occupancy_change'] / valid_data['price_change']).mean()
                    elasticity_metrics[space_id] = elasticity
                except:
                    continue
        
        return elasticity_metrics

Identify revenue optimization opportunities

In [None]:
@staticmethod
def analyze_revenue_optimization_opportunities(results_df: pd.DataFrame, 
                                                 competitive_df: Optional[pd.DataFrame] = None) -> Dict[str, Any]:
    
        opportunities = {}
        
        # Underpriced spaces with high demand
        high_demand_low_price = results_df[
            (results_df['occupancy_rate'] > 0.8) & 
            (results_df['price'] < results_df['price'].quantile(0.3))
        ]
        
        if not high_demand_low_price.empty:
            opportunities['underpriced_high_demand'] = {
                'space_count': high_demand_low_price['space_id'].nunique(),
                'avg_occupancy': high_demand_low_price['occupancy_rate'].mean(),
                'avg_price': high_demand_low_price['price'].mean(),
                'revenue_potential': high_demand_low_price['occupancy'].sum() * 
                                   (results_df['price'].median() - high_demand_low_price['price'].mean())
            }
        
        # Overpriced spaces with low demand
        low_demand_high_price = results_df[
            (results_df['occupancy_rate'] < 0.4) & 
            (results_df['price'] > results_df['price'].quantile(0.7))
        ]
        
        if not low_demand_high_price.empty:
            opportunities['overpriced_low_demand'] = {
                'space_count': low_demand_high_price['space_id'].nunique(),
                'avg_occupancy': low_demand_high_price['occupancy_rate'].mean(),
                'avg_price': low_demand_high_price['price'].mean(),
                'utilization_improvement_potential': (0.6 - low_demand_high_price['occupancy_rate'].mean()) * 100
            }
        
        # Competitive pricing gaps
        if competitive_df is not None and not competitive_df.empty:
            significant_gaps = competitive_df[abs(competitive_df['price_advantage']) > 3]
            if not significant_gaps.empty:
                opportunities['competitive_gaps'] = {
                    'space_count': significant_gaps['space_id'].nunique(),
                    'avg_price_gap': significant_gaps['price_advantage'].mean(),
                    'repositioning_required': (significant_gaps['price_advantage'] > 3).sum()
                }
        
        return opportunities

IndentationError: unexpected indent (3890460531.py, line 2)

Basic demand forecast using historical patterns

In [34]:
@staticmethod
    def generate_demand_forecast(results_df: pd.DataFrame, forecast_hours: int = 24) -> pd.DataFrame:
    
        forecast_data = []
        
        # Extract hourly patterns
        results_df['hour'] = pd.to_datetime(results_df['timestamp']).dt.hour
        hourly_patterns = results_df.groupby(['space_id', 'hour']).agg({
            'occupancy_rate': ['mean', 'std'],
            'price': 'mean'
        }).reset_index()
        
        # Generate forecast
        base_time = pd.to_datetime(results_df['timestamp'].max())
        
        for space_id in results_df['space_id'].unique():
            space_patterns = hourly_patterns[hourly_patterns['space_id'] == space_id]
            
            for h in range(forecast_hours):
                forecast_time = base_time + timedelta(hours=h+1)
                hour = forecast_time.hour
                
                # Find historical pattern for this hour
                hour_pattern = space_patterns[space_patterns['hour'] == hour]
                
                if not hour_pattern.empty:
                    predicted_occupancy = hour_pattern[('occupancy_rate', 'mean')].iloc[0]
                    occupancy_std = hour_pattern[('occupancy_rate', 'std')].iloc[0]
                    suggested_price = hour_pattern[('price', 'mean')].iloc[0]
                else:
                    # Use daily average if no pattern found
                    space_data = results_df[results_df['space_id'] == space_id]
                    predicted_occupancy = space_data['occupancy_rate'].mean()
                    occupancy_std = space_data['occupancy_rate'].std()
                    suggested_price = space_data['price'].mean()
                
                forecast_data.append({
                    'space_id': space_id,
                    'forecast_time': forecast_time,
                    'predicted_occupancy_rate': predicted_occupancy,
                    'confidence_interval': occupancy_std,
                    'suggested_price': suggested_price
                })
        
        return pd.DataFrame(forecast_data)

IndentationError: unexpected indent (1360050488.py, line 2)

Report generation

In [None]:
class ReportGenerator:
   
    
    @staticmethod
    def generate_executive_summary(report_data: Dict[str, Any]) -> str:
        # executive summary 
        
        summary = []
        summary.append("EXECUTIVE SUMMARY - DYNAMIC PARKING PRICING ANALYTICS")
        summary.append("=" * 60)
        
        # Key performance indicators
        pricing_perf = report_data.get('pricing_performance', {})
        revenue_opt = report_data.get('revenue_optimization', {})
        util_metrics = report_data.get('utilization_metrics', {})
        
        summary.append(f"\nKEY PERFORMANCE INDICATORS:")
        summary.append(f"• Average Parking Price: ${pricing_perf.get('avg_price', 0):.2f}")
        summary.append(f"• Revenue Efficiency: {revenue_opt.get('revenue_efficiency_score', 0):.1%}")
        summary.append(f"• Capacity Utilization: {util_metrics.get('avg_occupancy_rate', 0):.1%}")
        summary.append(f"• Price Stability Index: {pricing_perf.get('price_stability_index', 0):.1%}")
        
        # Competitive positioning
        comp_intel = report_data.get('competitive_intelligence', {})
        if comp_intel:
            summary.append(f"\nCOMPETITIVE POSITIONING:")
            summary.append(f"• Average Price Advantage: ${comp_intel.get('avg_price_advantage', 0):.2f}")
            summary.append(f"• Competitive Pressure Events: {comp_intel.get('competitive_pressure_events', 0):,}")
        
        # Strategic priorities
        recommendations = report_data.get('recommendations', [])
        if recommendations:
            summary.append(f"\nSTRATEGIC PRIORITIES:")
            for i, rec in enumerate(recommendations[:3], 1):
                summary.append(f"• {rec}")
        
        return "\n".join(summary)
    

Exporting analysis results to CSV files

In [None]:
@staticmethod
    def export_to_csv(results_df: pd.DataFrame, competitive_df: Optional[pd.DataFrame] = None,
                     rerouting_df: Optional[pd.DataFrame] = None, filename_prefix: str = "parking_analytics") -> List[str]:
        
        
        exported_files = []
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        # Main results
        results_filename = f"{filename_prefix}_results_{timestamp}.csv"
        results_df.to_csv(results_filename, index=False)
        exported_files.append(results_filename)
        
        # Competitive analysis
        if competitive_df is not None and not competitive_df.empty:
            comp_filename = f"{filename_prefix}_competitive_{timestamp}.csv"
            competitive_df.to_csv(comp_filename, index=False)
            exported_files.append(comp_filename)
        
        # Rerouting events
        if rerouting_df is not None and not rerouting_df.empty:
            rerouting_filename = f"{filename_prefix}_rerouting_{timestamp}.csv"
            rerouting_df.to_csv(rerouting_filename, index=False)
            exported_files.append(rerouting_filename)
        
        return exported_files


Function executions

In [None]:
def main():

    
    # Initialize engine
    engine = initialize_pricing_engine(base_price=12.0)
    
    print("DYNAMIC PARKING PRICING SYSTEM")
    print(" Analytics & Competitive Intelligence Platform")
    print("\nSYSTEM CAPABILITIES:")
    print("✓ Real-time competitive price monitoring")
    print("✓ Geographic proximity analysis (Haversine formula)")
    print("✓ Intelligent customer rerouting for capacity optimization")
    print("✓ Historical demand pattern recognition")
    print("✓ Revenue optimization with price elasticity analysis")
    print("✓ Executive reporting and business intelligence")
    
    print(f"\nTo run analysis:")
    print(f"  df = pd.read_csv('parking_data.csv')")
    print(f"  results, rerouting, competitive = engine.simulate_real_time_pricing(df)")
    print(f"  report = engine.generate_business_analytics_report(results, rerouting, competitive)")
    print(f"  summary = ReportGenerator.generate_executive_summary(report)")
    
    return engine


if __name__ == "__main__":
    pricing_engine = main()