# Phase-3.5: Decision Logic - Threat Classification

**Objective:** Convert quantum-inspired threat probabilities into binary attack/normal classifications with multi-tier severity levels and confidence-based thresholds.

**Input:** ThreatHypothesis from Phase-3.4 (probability, confidence, attack_type)  
**Output:** Binary classification + severity level + actionable decision

**Key Features:**
- Multi-threshold decision framework (critical/high/medium/low)
- Confidence-aware classification (high confidence → lower threshold)
- Severity scoring based on attack type and probability
- Actionable recommendations for each decision

In [1]:
import numpy as np
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
from enum import Enum

# Mock ThreatHypothesis for standalone execution
@dataclass
class ThreatHypothesis:
    attack_type: str
    amplitude: float
    probability: float
    confidence: float
    evidence_count: int
    recurrence_score: float

## 1. Decision Framework Data Structures

In [2]:
class SeverityLevel(Enum):
    """Threat severity classification"""
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    BENIGN = "BENIGN"

class AttackCategory(Enum):
    """Attack type categorization for severity mapping"""
    # Tuples keep member values immutable; each attack type appears in one category only.
    EXFILTRATION = ("backdoor", "injection", "password", "xss")
    RECONNAISSANCE = ("scanning", "fingerprinting")
    DENIAL_OF_SERVICE = ("dos", "ddos")
    MALWARE = ("ransomware", "mitm")
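
The category groups are defined here but never queried elsewhere in this notebook. A minimal sketch of a reverse lookup, assuming a plain dictionary in place of the enum and `ddos` listed under denial of service only (both `CATEGORY_MEMBERS` and `category_of` are hypothetical names introduced for illustration):

```python
from typing import Optional

# Hypothetical mapping that mirrors the category groups; not part of the notebook.
CATEGORY_MEMBERS = {
    "EXFILTRATION": ("backdoor", "injection", "password", "xss"),
    "RECONNAISSANCE": ("scanning", "fingerprinting"),
    "DENIAL_OF_SERVICE": ("dos", "ddos"),
    "MALWARE": ("ransomware", "mitm"),
}

def category_of(attack_type: str) -> Optional[str]:
    """Return the name of the first category containing attack_type, else None."""
    needle = attack_type.lower()
    for name, members in CATEGORY_MEMBERS.items():
        if needle in members:
            return name
    return None

print(category_of("DDoS"))      # DENIAL_OF_SERVICE
print(category_of("zero-day"))  # None
```

Keeping every attack type in exactly one group makes the lookup unambiguous; if a type legitimately belongs to several categories, the function would need to return a list instead.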

@dataclass
class ThreatDecision:
    """Final decision output"""
    is_attack: bool                    # Binary classification
    severity: SeverityLevel            # Severity tier
    attack_type: Optional[str]         # Predicted attack type (if attack)
    probability: float                 # Threat probability from Phase-3.4
    confidence: float                  # Confidence score from Phase-3.4
    decision_threshold: float          # Threshold used for classification
    recommendation: str                # Actionable recommendation
    evidence_summary: str              # Brief evidence description

## 2. Core Decision Logic Functions

In [3]:
def get_adaptive_threshold(confidence: float, 
                           base_threshold: float = 0.50,
                           confidence_adjustment: float = 0.20) -> float:
    """
    Compute adaptive decision threshold based on confidence level.
    
    High confidence → lower threshold (strong evidence needs less probability)
    Low confidence → threshold stays close to the base (weak evidence gets little discount)
    
    Args:
        confidence: Confidence score [0, 1]
        base_threshold: Base probability threshold (default 0.50)
        confidence_adjustment: Maximum reduction, reached at confidence=1.0 (default 0.20)
    
    Returns:
        Adaptive threshold in [base - adjustment, base], clamped to [0.20, 0.80]
    
    Example:
        confidence=0.9 → threshold = 0.50 - 0.9*0.20 = 0.32 (easier to classify)
        confidence=0.3 → threshold = 0.50 - 0.3*0.20 = 0.44 (harder to classify)
    """
    # The threshold only moves down from the base: the reduction grows linearly
    # with confidence and is zero at confidence=0.
    adjustment = confidence * confidence_adjustment
    adaptive_threshold = base_threshold - adjustment
    
    # Clamp to reasonable range [0.2, 0.8]
    adaptive_threshold = np.clip(adaptive_threshold, 0.20, 0.80)
    
    return float(adaptive_threshold)
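
To make the behaviour of the threshold concrete, the sketch below re-implements the same arithmetic in pure Python (using `min`/`max` in place of `np.clip`, and a hypothetical function name `adaptive_threshold`) and prints the threshold for a few confidence values:

```python
def adaptive_threshold(confidence: float,
                       base: float = 0.50,
                       adjustment: float = 0.20) -> float:
    """Lower the base threshold in proportion to confidence, then clamp to [0.20, 0.80]."""
    raw = base - confidence * adjustment
    return max(0.20, min(0.80, raw))

for conf in (0.0, 0.3, 0.5, 0.9, 1.0):
    print(f"confidence={conf:.1f} -> threshold={adaptive_threshold(conf):.2f}")
# confidence=0.0 -> threshold=0.50
# confidence=0.3 -> threshold=0.44
# confidence=0.5 -> threshold=0.40
# confidence=0.9 -> threshold=0.32
# confidence=1.0 -> threshold=0.30
```

With the default arguments the result always lies in [0.30, 0.50], so the clamp never engages; it only matters for other settings, for example `base=0.30`, where full confidence would otherwise push the threshold down to 0.10.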

In [4]:
def compute_severity_score(attack_type: str, 
                           probability: float, 
                           confidence: float,
                           recurrence_score: float) -> float:
    """
    Compute severity score combining multiple factors.
    
    Formula: 0.4×probability + 0.3×confidence + 0.2×recurrence + 0.1×attack_weight
    
    Args:
        attack_type: Type of attack detected
        probability: Threat probability [0, 1]
        confidence: Confidence score [0, 1]
        recurrence_score: Temporal recurrence [0, 1]
    
    Returns:
        Severity score [0, 1]
    """
    # Attack type severity weights
    attack_weights = {
        # Critical attacks (data exfiltration, system compromise)
        'backdoor': 1.0,
        'ransomware': 1.0,
        'injection': 0.95,
        'password': 0.95,
        'mitm': 0.90,
        
        # High severity attacks
        'ddos': 0.85,
        'xss': 0.80,
        
        # Medium severity
        'scanning': 0.60,
        'fingerprinting': 0.55,
        
        # Default
        'unknown': 0.70
    }
    
    attack_weight = attack_weights.get(attack_type.lower(), 0.70)
    
    # Weighted severity score
    severity_score = (
        0.4 * probability +
        0.3 * confidence +
        0.2 * recurrence_score +
        0.1 * attack_weight
    )
    
    return float(severity_score)
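
As a worked example of the formula, the sketch below breaks the score into its four weighted terms for the backdoor hypothesis used in Demo 1 (probability 0.78, confidence 0.88, recurrence 0.80, attack weight 1.0). The names `WEIGHTS` and `severity_terms` are hypothetical and exist only for this illustration:

```python
# Weights from the severity formula; they sum to 1.0, so the score stays in [0, 1].
WEIGHTS = {
    "probability": 0.4,
    "confidence": 0.3,
    "recurrence": 0.2,
    "attack_weight": 0.1,
}

def severity_terms(probability: float, confidence: float,
                   recurrence: float, attack_weight: float) -> dict:
    """Return each weighted term of the severity score separately."""
    inputs = {
        "probability": probability,
        "confidence": confidence,
        "recurrence": recurrence,
        "attack_weight": attack_weight,
    }
    return {name: WEIGHTS[name] * inputs[name] for name in WEIGHTS}

terms = severity_terms(0.78, 0.88, 0.80, 1.0)
for name, value in terms.items():
    print(f"{name:<14}{value:.3f}")
print(f"{'total':<14}{sum(terms.values()):.3f}")  # total 0.836
```

Because the attack weight carries only a 0.1 coefficient, the attack type alone shifts the score by at most 0.045 between the heaviest listed type (`backdoor`, 1.0) and the lightest (`fingerprinting`, 0.55); probability and confidence dominate the result.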

In [5]:
def classify_severity(severity_score: float) -> SeverityLevel:
    """
    Map severity score to severity level tier.
    
    Tiers:
        CRITICAL: ≥0.80
        HIGH:     ≥0.65
        MEDIUM:   ≥0.45
        LOW:      ≥0.25
        BENIGN:   <0.25
    """
    if severity_score >= 0.80:
        return SeverityLevel.CRITICAL
    elif severity_score >= 0.65:
        return SeverityLevel.HIGH
    elif severity_score >= 0.45:
        return SeverityLevel.MEDIUM
    elif severity_score >= 0.25:
        return SeverityLevel.LOW
    else:
        return SeverityLevel.BENIGN
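
The tier boundaries are inclusive on the lower edge, which is easy to get wrong when the cut-offs change. One way to check them is a table-driven equivalent; the sketch below assumes the same cut-offs and uses the standard library's `bisect_right` (the names `CUTOFFS`, `TIERS`, and `tier_for` are hypothetical, and plain strings stand in for `SeverityLevel`):

```python
from bisect import bisect_right

# Lower bounds of LOW, MEDIUM, HIGH, CRITICAL; anything below 0.25 is BENIGN.
CUTOFFS = [0.25, 0.45, 0.65, 0.80]
TIERS = ["BENIGN", "LOW", "MEDIUM", "HIGH", "CRITICAL"]

def tier_for(score: float) -> str:
    """Map a severity score to its tier; a score equal to a cut-off gets the higher tier."""
    return TIERS[bisect_right(CUTOFFS, score)]

for score in (0.10, 0.25, 0.634, 0.65, 0.7736, 0.80, 0.836):
    print(f"{score:.4f} -> {tier_for(score)}")
```

`bisect_right` counts how many cut-offs are less than or equal to the score, which reproduces the `>=` comparisons of the if/elif chain.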

In [6]:
def generate_recommendation(severity: SeverityLevel, 
                            attack_type: Optional[str],
                            probability: float) -> str:
    """Generate actionable recommendation based on severity level."""
    
    recommendations = {
        SeverityLevel.CRITICAL: (
            f"🚨 IMMEDIATE ACTION REQUIRED: "
            f"Block traffic, isolate affected systems, initiate incident response. "
            f"Detected {attack_type} with {probability*100:.1f}% probability."
        ),
        SeverityLevel.HIGH: (
            f"⚠️ HIGH PRIORITY: "
            f"Investigate traffic patterns, enable enhanced monitoring, "
            f"prepare containment measures. {attack_type} detected ({probability*100:.1f}%)."
        ),
        SeverityLevel.MEDIUM: (
            f"⚡ MONITOR CLOSELY: "
            f"Log traffic for analysis, increase alert sensitivity, "
            f"validate with additional sensors. Potential {attack_type} ({probability*100:.1f}%)."
        ),
        SeverityLevel.LOW: (
            f"👀 WATCH: "
            f"Continue normal monitoring, log suspicious patterns. "
            f"Low-confidence {attack_type} indicator ({probability*100:.1f}%)."
        ),
        SeverityLevel.BENIGN: (
            f"✅ NORMAL: "
            f"No immediate action required. Traffic appears benign."
        )
    }
    
    return recommendations.get(severity, "Unknown severity level")

## 3. Main Decision Function

In [7]:
def make_threat_decision(threat_hypotheses: Dict[str, ThreatHypothesis],
                         base_threshold: float = 0.50,
                         min_confidence: float = 0.40) -> ThreatDecision:
    """
    Make final attack/normal classification based on threat hypotheses.
    
    Decision Logic:
    1. Select highest probability threat
    2. Compute adaptive threshold based on confidence
    3. Compare probability vs. threshold
    4. If probability > threshold AND confidence ≥ min_confidence → ATTACK
    5. Compute severity score and classify
    6. Generate recommendation
    
    Args:
        threat_hypotheses: Output from Phase-3.4 quantum fusion
        base_threshold: Base probability threshold (default 0.50)
        min_confidence: Minimum confidence required (default 0.40)
    
    Returns:
        ThreatDecision with classification, severity, and recommendation
    """
    # Edge case: No threats detected
    if not threat_hypotheses:
        return ThreatDecision(
            is_attack=False,
            severity=SeverityLevel.BENIGN,
            attack_type=None,
            probability=0.0,
            confidence=0.0,
            decision_threshold=base_threshold,
            recommendation="✅ No threats detected. Traffic appears normal.",
            evidence_summary="No behavioral matches found in knowledge base."
        )
    
    # Step 1: Select highest probability threat
    top_threat = max(
        threat_hypotheses.values(),
        key=lambda t: t.probability
    )
    
    # Step 2: Compute adaptive threshold
    adaptive_threshold = get_adaptive_threshold(
        top_threat.confidence,
        base_threshold
    )
    
    # Step 3: Binary classification
    is_attack = (
        top_threat.probability > adaptive_threshold and
        top_threat.confidence >= min_confidence
    )
    
    # Step 4: Compute severity
    if is_attack:
        severity_score = compute_severity_score(
            top_threat.attack_type,
            top_threat.probability,
            top_threat.confidence,
            top_threat.recurrence_score
        )
        severity = classify_severity(severity_score)
        attack_type = top_threat.attack_type
        
        evidence_summary = (
            f"{top_threat.evidence_count} matches, "
            f"{top_threat.recurrence_score*100:.1f}% recurrence, "
            f"{top_threat.confidence*100:.1f}% confidence"
        )
    else:
        severity = SeverityLevel.BENIGN
        attack_type = None
        evidence_summary = (
            f"Probability {top_threat.probability*100:.1f}% below "
            f"threshold {adaptive_threshold*100:.1f}% or confidence too low"
        )
    
    # Step 5: Generate recommendation
    recommendation = generate_recommendation(
        severity,
        attack_type,
        top_threat.probability
    )
    
    return ThreatDecision(
        is_attack=is_attack,
        severity=severity,
        attack_type=attack_type,
        probability=float(top_threat.probability),
        confidence=float(top_threat.confidence),
        decision_threshold=float(adaptive_threshold),
        recommendation=recommendation,
        evidence_summary=evidence_summary
    )
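
When a hypothesis is rejected, the evidence summary reports a single combined message ("below threshold ... or confidence too low") even when only one of the two gates failed. A minimal sketch of how the two conditions could be reported separately, assuming the same comparisons as the function above (`rejection_reasons` is a hypothetical helper, not part of the notebook):

```python
from typing import List

def rejection_reasons(probability: float,
                      confidence: float,
                      threshold: float,
                      min_confidence: float = 0.40) -> List[str]:
    """List which gate(s) blocked an attack classification; empty if none did."""
    reasons = []
    if probability <= threshold:
        reasons.append(
            f"probability {probability*100:.1f}% does not exceed "
            f"threshold {threshold*100:.1f}%"
        )
    if confidence < min_confidence:
        reasons.append(
            f"confidence {confidence*100:.1f}% is below the "
            f"{min_confidence*100:.1f}% floor"
        )
    return reasons

# Values from Demo 2: the probability clears the threshold, only confidence fails.
print(rejection_reasons(probability=0.55, confidence=0.35, threshold=0.43))
```

For the Demo 2 values this returns only the confidence reason, which makes it clear that the 55% probability was above the 43% threshold and the classification was blocked by the confidence floor alone.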

## 4. Utility Functions

In [8]:
def print_decision_summary(decision: ThreatDecision):
    """Pretty-print threat decision."""
    
    print("=" * 70)
    print("THREAT DECISION SUMMARY")
    print("=" * 70)
    
    # Classification
    status = "🔴 ATTACK DETECTED" if decision.is_attack else "🟢 NORMAL TRAFFIC"
    print(f"\nStatus:           {status}")
    print(f"Severity:         {decision.severity.value}")
    
    if decision.attack_type:
        print(f"Attack Type:      {decision.attack_type.upper()}")
    
    print(f"\nProbability:      {decision.probability*100:.2f}%")
    print(f"Confidence:       {decision.confidence*100:.2f}%")
    print(f"Threshold Used:   {decision.decision_threshold*100:.2f}%")
    
    print(f"\nEvidence:         {decision.evidence_summary}")
    print(f"\nRecommendation:")
    print(f"  {decision.recommendation}")
    
    print("=" * 70)

In [9]:
def batch_decision(threat_hypotheses_list: List[Dict[str, ThreatHypothesis]],
                   base_threshold: float = 0.50) -> List[ThreatDecision]:
    """
    Process multiple threat hypotheses in batch.
    
    Useful for:
    - Real-time stream processing (multiple flows)
    - Batch evaluation of time windows
    - Performance benchmarking
    """
    decisions = []
    for threat_hypotheses in threat_hypotheses_list:
        decision = make_threat_decision(threat_hypotheses, base_threshold)
        decisions.append(decision)
    
    return decisions
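
For benchmarking or stream monitoring, the list returned by a batch run usually needs to be aggregated. The sketch below shows one possible summary using `collections.Counter`; `Decision` is a hypothetical stand-in carrying only the two `ThreatDecision` fields the summary reads, and `summarize` is likewise an illustrative name:

```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class Decision:
    """Hypothetical stand-in for ThreatDecision (only the fields used here)."""
    is_attack: bool
    severity: str

def summarize(decisions: List[Decision]) -> Dict[str, object]:
    """Count total decisions, attack decisions, and decisions per severity tier."""
    return {
        "total": len(decisions),
        "attacks": sum(d.is_attack for d in decisions),
        "by_severity": dict(Counter(d.severity for d in decisions)),
    }

batch = [
    Decision(True, "CRITICAL"),
    Decision(False, "BENIGN"),
    Decision(True, "MEDIUM"),
    Decision(False, "BENIGN"),
]
print(summarize(batch))
```

With real `ThreatDecision` objects the severity key would be `decision.severity.value` rather than a plain string.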

## 5. Demonstration

In [10]:
# Demo 1: High-confidence backdoor attack (should classify as CRITICAL)
print("=" * 70)
print("DEMO 1: High-Confidence Backdoor Attack")
print("=" * 70)

threat_demo1 = {
    'backdoor': ThreatHypothesis(
        attack_type='backdoor',
        amplitude=0.85,
        probability=0.78,
        confidence=0.88,
        evidence_count=25,
        recurrence_score=0.80
    ),
    'scanning': ThreatHypothesis(
        attack_type='scanning',
        amplitude=0.35,
        probability=0.22,
        confidence=0.65,
        evidence_count=8,
        recurrence_score=0.40
    )
}

decision1 = make_threat_decision(threat_demo1, base_threshold=0.50)
print_decision_summary(decision1)
print()

DEMO 1: High-Confidence Backdoor Attack
THREAT DECISION SUMMARY

Status:           🔴 ATTACK DETECTED
Severity:         CRITICAL
Attack Type:      BACKDOOR

Probability:      78.00%
Confidence:       88.00%
Threshold Used:   32.40%

Evidence:         25 matches, 80.0% recurrence, 88.0% confidence

Recommendation:
  🚨 IMMEDIATE ACTION REQUIRED: Block traffic, isolate affected systems, initiate incident response. Detected backdoor with 78.0% probability.



In [11]:
# Demo 2: Low-confidence potential attack (should classify as BENIGN)
print("=" * 70)
print("DEMO 2: Low-Confidence Attack Signal")
print("=" * 70)

threat_demo2 = {
    'ddos': ThreatHypothesis(
        attack_type='ddos',
        amplitude=0.60,
        probability=0.55,
        confidence=0.35,  # Below min_confidence threshold
        evidence_count=5,
        recurrence_score=0.25
    )
}

decision2 = make_threat_decision(threat_demo2, base_threshold=0.50, min_confidence=0.40)
print_decision_summary(decision2)
print()

DEMO 2: Low-Confidence Attack Signal
THREAT DECISION SUMMARY

Status:           🟢 NORMAL TRAFFIC
Severity:         BENIGN

Probability:      55.00%
Confidence:       35.00%
Threshold Used:   43.00%

Evidence:         Probability 55.0% below threshold 43.0% or confidence too low

Recommendation:
  ✅ NORMAL: No immediate action required. Traffic appears benign.



In [12]:
# Demo 3: Medium-severity scanning activity
print("=" * 70)
print("DEMO 3: Medium-Severity Reconnaissance")
print("=" * 70)

threat_demo3 = {
    'scanning': ThreatHypothesis(
        attack_type='scanning',
        amplitude=0.68,
        probability=0.62,
        confidence=0.72,
        evidence_count=15,
        recurrence_score=0.55
    ),
    'fingerprinting': ThreatHypothesis(
        attack_type='fingerprinting',
        amplitude=0.45,
        probability=0.38,
        confidence=0.68,
        evidence_count=10,
        recurrence_score=0.45
    )
}

decision3 = make_threat_decision(threat_demo3, base_threshold=0.50)
print_decision_summary(decision3)
print()

DEMO 3: Medium-Severity Reconnaissance
THREAT DECISION SUMMARY

Status:           🔴 ATTACK DETECTED
Severity:         MEDIUM
Attack Type:      SCANNING

Probability:      62.00%
Confidence:       72.00%
Threshold Used:   35.60%

Evidence:         15 matches, 55.0% recurrence, 72.0% confidence

Recommendation:
  ⚡ MONITOR CLOSELY: Log traffic for analysis, increase alert sensitivity, validate with additional sensors. Potential scanning (62.0%).



In [13]:
# Demo 4: Empty threats (benign traffic)
print("=" * 70)
print("DEMO 4: No Threats Detected")
print("=" * 70)

threat_demo4 = {}

decision4 = make_threat_decision(threat_demo4, base_threshold=0.50)
print_decision_summary(decision4)
print()

DEMO 4: No Threats Detected
THREAT DECISION SUMMARY

Status:           🟢 NORMAL TRAFFIC
Severity:         BENIGN

Probability:      0.00%
Confidence:       0.00%
Threshold Used:   50.00%

Evidence:         No behavioral matches found in knowledge base.

Recommendation:
  ✅ No threats detected. Traffic appears normal.



## 6. Threshold Analysis

In [14]:
# Demonstrate adaptive threshold behavior
print("=" * 70)
print("ADAPTIVE THRESHOLD ANALYSIS")
print("=" * 70)

confidences = [0.30, 0.50, 0.70, 0.90]
base_threshold = 0.50

print(f"\nBase Threshold: {base_threshold*100:.0f}%")
print("Confidence Adjustment Range: up to -20%\n")

for conf in confidences:
    adaptive_thresh = get_adaptive_threshold(conf, base_threshold)
    print(f"Confidence {conf*100:.0f}% → Threshold {adaptive_thresh*100:.1f}%")

print("\n💡 Insight: Higher confidence allows easier classification (lower threshold)")
print("   Lower confidence requires stronger evidence (higher threshold)")
print()

ADAPTIVE THRESHOLD ANALYSIS

Base Threshold: 50%
Confidence Adjustment Range: up to -20%

Confidence 30% → Threshold 44.0%
Confidence 50% → Threshold 40.0%
Confidence 70% → Threshold 36.0%
Confidence 90% → Threshold 32.0%

💡 Insight: Higher confidence allows easier classification (lower threshold)
   Lower confidence requires stronger evidence (higher threshold)



## 7. Integration with Phase-3.4

In [15]:
# Full pipeline example: Phase-3.4 → Phase-3.5
print("=" * 70)
print("FULL PIPELINE: Phase-3.4 Quantum Fusion → Phase-3.5 Decision")
print("=" * 70)

# Simulate Phase-3.4 output
quantum_output = {
    'backdoor': ThreatHypothesis(
        attack_type='backdoor',
        amplitude=0.861,
        probability=0.742,
        confidence=0.756,
        evidence_count=15,
        recurrence_score=0.75
    ),
    'ddos': ThreatHypothesis(
        attack_type='ddos',
        amplitude=0.508,
        probability=0.258,
        confidence=0.684,
        evidence_count=8,
        recurrence_score=0.40
    )
}

print("\n📥 INPUT (Phase-3.4 Quantum Fusion):")
for attack_type, hyp in quantum_output.items():
    print(f"  {attack_type}: prob={hyp.probability*100:.1f}%, conf={hyp.confidence*100:.1f}%")

# Apply decision logic
final_decision = make_threat_decision(quantum_output, base_threshold=0.50)

print("\n📤 OUTPUT (Phase-3.5 Decision Logic):")
print_decision_summary(final_decision)

FULL PIPELINE: Phase-3.4 Quantum Fusion → Phase-3.5 Decision

📥 INPUT (Phase-3.4 Quantum Fusion):
  backdoor: prob=74.2%, conf=75.6%
  ddos: prob=25.8%, conf=68.4%

📤 OUTPUT (Phase-3.5 Decision Logic):
THREAT DECISION SUMMARY

Status:           🔴 ATTACK DETECTED
Severity:         HIGH
Attack Type:      BACKDOOR

Probability:      74.20%
Confidence:       75.60%
Threshold Used:   34.88%

Evidence:         15 matches, 75.0% recurrence, 75.6% confidence

Recommendation:
  ⚠️ HIGH PRIORITY: Investigate traffic patterns, enable enhanced monitoring, prepare containment measures. backdoor detected (74.2%).


## 8. Summary

**Phase-3.5 Decision Logic: Key Features**

✅ **Adaptive Thresholding**: Confidence-based threshold reduction (up to 0.20 below the base threshold)  
✅ **Multi-Tier Severity**: CRITICAL/HIGH/MEDIUM/LOW/BENIGN classification  
✅ **Attack-Type Weighting**: Different attacks have different severity weights  
✅ **Actionable Recommendations**: Severity-specific response guidance  
✅ **Batch Processing**: Support for real-time stream and batch evaluation  

**Next Steps:**
- Phase-3.6: Explainability Module (generate human-readable explanations)
- Phase-3.7: End-to-End Pipeline Integration
- Performance benchmarking with real ChromaDB data