# In [1]:
# symbolic_rules.py
# Symbolic Reasoning Engine for Countermeasure Suggestion (adapted for simplified NeSy project)

class SymbolicCountermeasureSuggester:
    """Rule-based ranking of countermeasures for a forecast threat.

    The knowledge base is a mapping that is read under these keys:

    * ``"threat_types_details"`` -- threat id -> dict with ``"urgency"`` and
      ``"potential_impact"``.
    * ``"countermeasures"`` -- countermeasure id -> dict with ``"name"``,
      ``"mitigates_threats"``, and optionally ``"effectiveness_score"``,
      ``"ethical_impact"``, ``"legality_check"``, ``"resources_needed"``.
    * ``"rules_of_engagement"`` -- list whose first entry may carry a
      ``"priority_modifier"`` used for critical/severe threats.
    """

    # Forecasts strictly below this probability trigger the low-intensity filter.
    LOW_PROB_THRESHOLD = 0.6
    # Ethical-impact labels accepted by the low-intensity filter.
    LOW_ETHICAL_IMPACTS = ("low", "very_low")

    def __init__(self, knowledge_base):
        """Store the knowledge base; it is read on every call, never copied."""
        self.kb = knowledge_base

    def get_threat_details(self, threat_type_id):
        """Return the details dict for ``threat_type_id``, or ``None`` if unknown."""
        return self.kb["threat_types_details"].get(threat_type_id)

    def _priority_modifier(self, threat_details):
        """Return the score multiplier for a threat (1.0 unless critical & severe)."""
        is_critical_severe = (
            threat_details.get("urgency") == "critical"
            and threat_details.get("potential_impact") == "severe"
        )
        if not is_critical_severe:
            return 1.0
        # The modifier is taken from the FIRST rule of engagement; a missing or
        # empty rule list falls back to a neutral modifier instead of raising.
        rules = self.kb.get("rules_of_engagement") or []
        if not rules:
            return 1.0
        return rules[0].get("priority_modifier", 1.0)

    def _is_low_intensity(self, cm_data):
        """Return True if a countermeasure passes the low-probability filter.

        It must have a low/very_low ethical impact and its first listed
        resource must contain the substring ``"low"``. A missing or empty
        resource list is treated as not low-resource.
        """
        if cm_data.get("ethical_impact") not in self.LOW_ETHICAL_IMPACTS:
            return False
        resources = cm_data.get("resources_needed", ["personnel_high"])
        return bool(resources) and "low" in resources[0]

    def suggest_countermeasures(self, threat_forecast):
        """Rank the countermeasures that mitigate the forecast threat.

        Args:
            threat_forecast: mapping with ``"threat_type_id"`` and, optionally,
                a numeric ``"predicted_prob"``.

        Returns:
            ``{"error": message}`` when the threat id is missing or unknown.
            Otherwise ``{"suggestions": [...], "explanation": str}`` with the
            suggestions sorted by ``"calculated_score"``, highest first.
        """
        threat_type_id = threat_forecast.get("threat_type_id")
        if not threat_type_id:
            return {"error": "Threat type ID missing in forecast."}

        threat_details = self.get_threat_details(threat_type_id)
        if not threat_details:
            return {"error": f"Unknown threat type: {threat_type_id}"}

        # Read the probability once. When it is absent the forecast is treated
        # neutrally: the score weight becomes 1.0 (0.5 + 0.5) and the
        # low-probability filter is not applied.
        prob = threat_forecast.get("predicted_prob")
        has_prob = prob is not None
        prob_weight = (prob if has_prob else 0.5) + 0.5
        low_probability = has_prob and prob < self.LOW_PROB_THRESHOLD

        # Loop-invariant: depends only on the threat, not on the countermeasure.
        priority_mod = self._priority_modifier(threat_details)

        suggestions = []
        for cm_id, cm_data in self.kb["countermeasures"].items():
            if threat_type_id not in cm_data.get("mitigates_threats", []):
                continue
            # For unlikely threats keep only low-intensity countermeasures.
            if low_probability and not self._is_low_intensity(cm_data):
                continue

            base_score = cm_data.get("effectiveness_score", 0.5)
            cm_score = base_score * prob_weight * priority_mod
            suggestions.append({
                "id": cm_id,
                "name": cm_data["name"],
                "base_effectiveness": base_score,
                "calculated_score": round(cm_score, 3),
                "ethical_impact": cm_data.get("ethical_impact"),
                "legality_check": cm_data.get("legality_check"),
                "resources": cm_data.get("resources_needed"),
            })

        suggestions.sort(key=lambda x: x["calculated_score"], reverse=True)

        # Only apply the float format when a probability exists; formatting the
        # "N/A" placeholder with ".2f" would raise ValueError.
        prob_text = f"{prob:.2f}" if has_prob else "N/A"
        explanation = f"Countermeasures for '{threat_type_id}' (Prob: {prob_text}). "
        explanation += (
            f"Threat urgency: {threat_details.get('urgency')}, "
            f"Potential Impact: {threat_details.get('potential_impact')}. "
        )
        if low_probability:
            explanation += "Low probability forecast, favoring precautionary measures."

        return {"suggestions": suggestions, "explanation": explanation.strip()}

# Static knowledge base: threat descriptions, the countermeasure catalogue,
# and the rules of engagement, all as plain nested dicts/lists.
KNOWLEDGE_BASE = {
    "threat_types_details": {
        "IED_urban": {
            "urgency": "high",
            "potential_impact": "severe",
            "indicators": ["suspicious_package", "chatter_increase"],
        },
        "armed_assault_gov": {
            "urgency": "critical",
            "potential_impact": "severe",
            "indicators": ["weapon_sighting", "reconnaissance_activity"],
        },
        "cyber_attack_infra": {
            "urgency": "high",
            "potential_impact": "high",
            "indicators": ["phishing_attempts", "network_anomaly"],
        },
        "propaganda_online": {
            "urgency": "medium",
            "potential_impact": "moderate",
            "indicators": ["fake_news_spike", "hate_speech_increase"],
        },
    },
    "countermeasures": {
        "cm_001": {
            "name": "Increased Patrols",
            "resources_needed": ["personnel_low"],
            "mitigates_threats": ["IED_urban", "armed_assault_gov"],
            "effectiveness_score": 0.6,
            "ethical_impact": "low",
            "legality_check": "standard_procedure",
        },
        "cm_002": {
            "name": "EOD Checkpoints",
            "resources_needed": ["eod_team"],
            "mitigates_threats": ["IED_urban"],
            "effectiveness_score": 0.85,
            "ethical_impact": "medium_low",
            "legality_check": "requires_justification",
        },
        "cm_003": {
            "name": "Cybersecurity Monitoring",
            "resources_needed": ["cyber_team"],
            "mitigates_threats": ["cyber_attack_infra"],
            "effectiveness_score": 0.75,
            "ethical_impact": "low",
            "legality_check": "privacy_compliant",
        },
        "cm_004": {
            "name": "Public Awareness Campaign",
            "resources_needed": ["media_team"],
            "mitigates_threats": ["propaganda_online", "IED_urban"],
            "effectiveness_score": 0.5,
            "ethical_impact": "very_low",
            "legality_check": "standard_procedure",
        },
        "cm_005": {
            "name": "Targeted Intel Operation",
            "resources_needed": ["intel_team"],
            "mitigates_threats": ["armed_assault_gov", "IED_urban"],
            "effectiveness_score": 0.9,
            "ethical_impact": "high",
            "legality_check": "strict_oversight",
        },
        "cm_006": {
            "name": "Community Engagement",
            "resources_needed": ["community_liaisons"],
            "mitigates_threats": ["propaganda_online"],
            "effectiveness_score": 0.65,
            "ethical_impact": "positive",
            "legality_check": "standard_procedure",
        },
    },
    "rules_of_engagement": [
        {
            "id": "ROE_01",
            "condition": "critical & severe",
            "action": "High effectiveness prioritized",
            "priority_modifier": 1.5,
        },
        {
            "id": "ROE_02",
            "condition": "propaganda_online",
            "action": "Prefer media & engagement",
            "cm_preference": ["cm_004", "cm_006"],
        },
        {
            "id": "ROE_03",
            "condition": "prob < 0.6",
            "action": "Low-intensity monitoring first",
            "initial_cm_filter": "low_resource_low_ethical_impact",
        },
    ],
}

# Optional test case
def test_suggestion():
    """Print the explanation and ranked countermeasures for a sample forecast.

    Uses the module-level KNOWLEDGE_BASE and an ``IED_urban`` forecast with a
    predicted probability of 0.8.
    """
    sample_forecast = {"threat_type_id": "IED_urban", "predicted_prob": 0.8}
    engine = SymbolicCountermeasureSuggester(KNOWLEDGE_BASE)
    outcome = engine.suggest_countermeasures(sample_forecast)

    print(outcome['explanation'])
    for entry in outcome['suggestions']:
        label = entry['name']
        score = entry['calculated_score']
        print(f"- {label} | Score: {score}")

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    test_suggestion()

# Sample output of test_suggestion():
#
# Countermeasures for 'IED_urban' (Prob: 0.80). Threat urgency: high, Potential Impact: severe.
# - Targeted Intel Operation | Score: 1.17
# - EOD Checkpoints | Score: 1.105
# - Increased Patrols | Score: 0.78
# - Public Awareness Campaign | Score: 0.65
