In [None]:
import logging
from typing import List, Dict, Tuple, Any
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
import json

In [None]:
class RiskAssessmentModel:
    """
    Holds the state used for risk scoring: a logger, a ``StandardScaler``
    instance, and a ``risk_config`` dictionary optionally loaded from JSON.
    """

    def __init__(self, config_path: str = None):
        """
        Initializes the RiskAssessmentModel.

        ``risk_config`` is always a dictionary after construction. It stays empty
        when no path is given, when the file is missing, when the file is not
        valid UTF-8 JSON, or when the JSON document is not an object.

        Args:
            config_path (str, optional): Path to a JSON configuration file defining risk parameters.
                                         Defaults to None.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.info("Initializing Risk Assessment Model")

        # Initialize any required components (e.g., scalers, models)
        self.scaler = StandardScaler()

        # Start from the empty configuration so every failure path below can
        # simply return without having to reset it.
        self.risk_config = {}

        if not config_path:
            self.logger.warning("No configuration file provided. Using default risk parameters.")
            return

        try:
            # JSON text is UTF-8; do not depend on the platform's locale encoding.
            with open(config_path, 'r', encoding='utf-8') as file:
                loaded_config = json.load(file)
        except FileNotFoundError:
            self.logger.error("Configuration file not found at %s", config_path)
            return
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            self.logger.error("Error decoding JSON from config file: %s", e)
            return

        # A list, string or number is valid JSON but has no ``.get``; accepting
        # it would only postpone the failure to the first parameter lookup.
        if not isinstance(loaded_config, dict):
            self.logger.error(
                "Configuration in %s must be a JSON object, got %s",
                config_path,
                type(loaded_config).__name__,
            )
            return

        self.risk_config = loaded_config
        self.logger.info("Risk configuration loaded from %s", config_path)


In [None]:
def assess(self, enemy_coas: List[str], terrain_features: Dict[str, Any], weather_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Assesses the risk associated with predicted enemy COAs.

        Every COA is scored with ``self.calculate_risk`` using the same terrain
        and weather inputs. An empty ``enemy_coas`` yields an empty list.

        Args:
            enemy_coas (List[str]): Predicted enemy courses of action.
            terrain_features (Dict[str, Any]): Extracted terrain features.
            weather_data (Dict[str, Any]): Current and forecast weather data.

        Returns:
            List[Dict[str, Any]]: One dictionary per COA, in input order, with the
                                   keys "coa", "risk_score" and "risk_factors".
        """
        self.logger.info("Assessing Risks for COAs")
        risk_assessments = []

        for coa in enemy_coas:
            risk_score, risk_factors = self.calculate_risk(coa, terrain_features, weather_data)
            risk_assessments.append(
                {
                    "coa": coa,
                    "risk_score": risk_score,
                    "risk_factors": risk_factors,
                }
            )

        # Return only after the loop has finished: returning from inside the
        # loop would stop after the first COA and give None for an empty input.
        return risk_assessments

In [None]:
def calculate_risk(self, coa: str, terrain_features: Dict[str, Any], weather_data: Dict[str, Any]) -> Tuple[float, Dict[str, float]]:
        """
        Calculates the risk score for a specific COA.

        Three component risks are evaluated (terrain, weather, COA-specific),
        each is multiplied by its weight, the weighted values are summed, and the
        sum is passed through ``self.normalize_risk``. Weights are read from
        ``self.risk_config`` under "terrain_risk_weight", "weather_risk_weight"
        and "coa_risk_weight", defaulting to 0.5, 0.3 and 0.2.

        Args:
            coa (str): The enemy course of action.
            terrain_features (Dict[str, Any]): Terrain features relevant to the COA.
            weather_data (Dict[str, Any]): Weather data relevant to the COA.

        Returns:
            Tuple[float, Dict[str, float]]: (risk_score, risk_factors)
                - risk_score (float): The value returned by ``normalize_risk``.
                - risk_factors (Dict[str, float]): The weighted contributions, keyed
                  "terrain_risk", "weather_risk" and "coa_risk".
        """
        self.logger.debug("Calculating risk for COA: %s", coa)
        risk_factors = {}

        # Load risk parameters from configuration or use defaults
        terrain_risk_weight = self.risk_config.get("terrain_risk_weight", 0.5)
        weather_risk_weight = self.risk_config.get("weather_risk_weight", 0.3)
        coa_risk_weight = self.risk_config.get("coa_risk_weight", 0.2)

        # 1. Terrain Risk
        terrain_risk = self.evaluate_terrain_risk(coa, terrain_features)
        risk_factors["terrain_risk"] = terrain_risk * terrain_risk_weight

        # 2. Weather Risk
        weather_risk = self.evaluate_weather_risk(weather_data)
        risk_factors["weather_risk"] = weather_risk * weather_risk_weight

        # 3. COA-Specific Risk
        coa_risk = self.evaluate_coa_specific_risk(coa)
        risk_factors["coa_risk"] = coa_risk * coa_risk_weight

        # --- Combine Risk Factors ---
        total_risk = sum(risk_factors.values())

        # --- Normalize Risk Score (0-1) ---
        risk_score = self.normalize_risk(total_risk)

        self.logger.debug("Risk Assessment for COA '%s': Score=%.4f, Factors=%s", coa, risk_score, risk_factors)

        return risk_score, risk_factors

In [None]:
def evaluate_terrain_risk(self, coa: str, terrain_features: Dict[str, Any]) -> float:
        """
        Scores the terrain contribution to risk for one COA.

        When ``risk_config["terrain_risk_params"]`` has an entry for the COA, the
        score is ``(slope / slope_max) * slope_weight``. ``slope`` is read from
        ``terrain_features`` (0 when absent); ``slope_max`` and ``slope_weight``
        come from that entry and default to 10 and 0.5. Without an entry, or when
        the computation raises, the configured "default_terrain_risk" is returned
        (0.1 when unset).

        Args:
            coa (str): The enemy course of action.
            terrain_features (Dict[str, Any]): Terrain features relevant to the COA.

        Returns:
            float: Terrain risk score.
        """
        try:
            per_coa_params = self.risk_config.get("terrain_risk_params", {})

            # No COA-specific parameters: use the configured baseline.
            if coa not in per_coa_params:
                return self.risk_config.get("default_terrain_risk", 0.1)

            settings = per_coa_params[coa]
            slope = terrain_features.get("slope", 0)
            slope_ratio = slope / settings.get("slope_max", 10)
            return slope_ratio * settings.get("slope_weight", 0.5)
        except Exception as err:
            # Any failure (e.g. a zero slope_max) is logged and replaced by the baseline.
            self.logger.error("Error evaluating terrain risk for COA '%s': %s", coa, err)
            return self.risk_config.get("default_terrain_risk", 0.1)

In [None]:
def evaluate_weather_risk(self, weather_data: Dict[str, Any]) -> float:
        """
        Scores the weather contribution to risk.

        The score is 0.2 when ``weather_data["rain"]`` is truthy and 0.05
        otherwise (a missing "rain" key counts as no rain). If reading the input
        raises, the error is logged and the configured "default_weather_risk" is
        returned (0.05 when unset).

        Args:
            weather_data (Dict[str, Any]): Current and forecast weather data.

        Returns:
            float: Weather risk score.
        """
        try:
            is_raining = weather_data.get("rain", False)
            # Visibility is read (default 10) but does not influence the score yet.
            _visibility = weather_data.get("visibility", 10)

            # Additional weather factors can be incorporated here

            return 0.2 if is_raining else 0.05
        except Exception as err:
            self.logger.error("Error evaluating weather risk: %s", err)
            return self.risk_config.get("default_weather_risk", 0.05)

In [None]:
def evaluate_coa_specific_risk(self, coa: str) -> float:
        """
        Looks up the risk configured for a specific COA.

        The value comes from ``risk_config["coa_specific_risks"][coa]`` and is
        0.1 when the table or the COA entry is missing. If the lookup raises, the
        error is logged and the configured "default_coa_risk" is returned (0.1
        when unset).

        Args:
            coa (str): The enemy course of action.

        Returns:
            float: COA-specific risk score.
        """
        try:
            risk_table = self.risk_config.get("coa_specific_risks", {})
            return risk_table.get(coa, 0.1)
        except Exception as err:
            self.logger.error("Error evaluating COA-specific risk for COA '%s': %s", coa, err)
            return self.risk_config.get("default_coa_risk", 0.1)

In [None]:
def normalize_risk(self, total_risk: float) -> float:
        """
        Rescales a combined risk score onto the 0-1 range.

        The lower bound is 0.0. The upper bound is the sum of the three factor
        weights read from ``risk_config`` ("terrain_risk_weight",
        "weather_risk_weight", "coa_risk_weight"; defaults 0.5, 0.3, 0.2). The
        scaled value is clamped to [0, 1]. If the upper bound is not greater
        than the lower bound, 0.0 is returned.

        Args:
            total_risk (float): The combined risk score.

        Returns:
            float: Normalized risk score.
        """
        weight_defaults = (
            ("terrain_risk_weight", 0.5),
            ("weather_risk_weight", 0.3),
            ("coa_risk_weight", 0.2),
        )
        floor = 0.0
        ceiling = sum(self.risk_config.get(key, fallback) for key, fallback in weight_defaults)

        # Degenerate range (e.g. all weights zero): nothing to scale against.
        if not ceiling > floor:
            return 0.0

        scaled = (total_risk - floor) / (ceiling - floor)
        capped = min(scaled, 1.0)
        return max(capped, 0.0)