# Advanced PWSA Features

This notebook demonstrates advanced Prompt Weapon System Awareness (PWSA) capabilities including:

1. Multi-sensor fusion
2. Pixel-level IR processing
3. Trajectory prediction
4. Threat prioritization
5. Real-time tracking

## Prerequisites

```bash
pip install requests numpy pandas matplotlib opencv-python scipy
```

In [None]:
import requests
import json
import time
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import display, clear_output
import cv2
from typing import List, Dict, Tuple

# Configuration
API_KEY = "your-api-key-here"
BASE_URL = "http://localhost:8080"
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

print("✓ Imports loaded")

## 1. Multi-Sensor Fusion

Combine data from multiple sensors (IR, radar, optical) for improved detection accuracy.

In [None]:
# Generate synthetic multi-sensor data
def generate_sensor_data(sv_id: int, num_targets: int = 3) -> Dict:
    """Generate synthetic sensor readings with multiple targets."""
    targets = []
    for i in range(num_targets):
        # Random position in sensor FOV
        azimuth = np.random.uniform(-30, 30)  # degrees
        elevation = np.random.uniform(-15, 15)  # degrees
        range_km = np.random.uniform(5, 50)  # km
        
        targets.append({
            "id": f"target_{i}",
            "azimuth": azimuth,
            "elevation": elevation,
            "range": range_km,
            "velocity": np.random.uniform(200, 800),  # m/s
            "ir_signature": np.random.uniform(0.6, 0.95),
            "radar_cross_section": np.random.uniform(0.1, 2.0),  # m²
        })
    
    return {
        "sv_id": sv_id,
        "timestamp": int(time.time()),
        "sensors": {
            "ir": {
                "frame_id": np.random.randint(1000, 9999),
                "targets": targets
            },
            "radar": {
                "scan_id": np.random.randint(1000, 9999),
                "targets": targets
            }
        }
    }

# Fuse sensor data
sensor_data = generate_sensor_data(sv_id=42, num_targets=5)

response = requests.post(
    f"{BASE_URL}/api/v1/pwsa/fuse",
    headers=headers,
    json=sensor_data
)

if response.status_code == 200:
    result = response.json()['data']
    print(f"Sensor Fusion Results:")
    print(f"  Fused Tracks: {result['num_tracks']}")
    print(f"  Fusion Quality: {result['fusion_quality']:.2%}")
    print(f"  Processing Time: {result['processing_time_ms']:.1f}ms\n")
    
    print("Track Details:")
    for track in result['tracks']:
        print(f"  Track {track['track_id']}:")
        print(f"    Position: ({track['position'][0]:.1f}, {track['position'][1]:.1f}, {track['position'][2]:.1f})")
        print(f"    Velocity: {track['velocity']:.0f} m/s")
        print(f"    Confidence: {track['confidence']:.2%}")
        print(f"    Threat Level: {track['threat_level']}\n")
else:
    print(f"Error: {response.status_code}")
    print(response.text)

## 2. Pixel-Level IR Processing

Process raw IR frame pixels for detailed thermal analysis.

In [None]:
# Generate synthetic IR frame
def generate_ir_frame(width: int = 640, height: int = 480, num_hotspots: int = 3) -> np.ndarray:
    """Generate synthetic IR frame with thermal hotspots."""
    # Base thermal noise
    frame = np.random.normal(loc=128, scale=20, size=(height, width))
    
    # Add hotspots (potential threats)
    for _ in range(num_hotspots):
        cx = np.random.randint(50, width - 50)
        cy = np.random.randint(50, height - 50)
        size = np.random.randint(10, 30)
        intensity = np.random.uniform(200, 255)
        
        # Gaussian hotspot
        y, x = np.ogrid[-cy:height-cy, -cx:width-cx]
        mask = x*x + y*y <= size*size
        frame[mask] = intensity * np.exp(-(x[mask]**2 + y[mask]**2) / (2 * (size/2)**2))
    
    return np.clip(frame, 0, 255).astype(np.uint8)

# Generate and visualize IR frame
ir_frame = generate_ir_frame(num_hotspots=5)

plt.figure(figsize=(14, 5))
plt.subplot(1, 2, 1)
plt.imshow(ir_frame, cmap='hot', interpolation='nearest')
plt.title('Raw IR Frame')
plt.colorbar(label='Thermal Intensity')
plt.xlabel('X (pixels)')
plt.ylabel('Y (pixels)')

# Process frame via API
pixel_request = {
    "frame_id": int(time.time()),
    "width": ir_frame.shape[1],
    "height": ir_frame.shape[0],
    "pixels": ir_frame.flatten().tolist(),  # Flattened pixel array
    "processing_options": {
        "detect_hotspots": True,
        "compute_entropy": True,
        "apply_tda": True
    }
}

response = requests.post(
    f"{BASE_URL}/api/v1/pixels/process",
    headers=headers,
    json=pixel_request
)

if response.status_code == 200:
    result = response.json()['data']
    
    # Visualize processed frame
    processed_pixels = np.array(result['processed_pixels']).reshape(ir_frame.shape)
    plt.subplot(1, 2, 2)
    plt.imshow(processed_pixels, cmap='hot', interpolation='nearest')
    plt.title('Processed IR Frame')
    plt.colorbar(label='Enhanced Intensity')
    plt.xlabel('X (pixels)')
    plt.ylabel('Y (pixels)')
    
    # Mark detected hotspots
    for hotspot in result['hotspots']:
        plt.plot(hotspot['x'], hotspot['y'], 'c+', markersize=15, markeredgewidth=2)
    
    plt.tight_layout()
    plt.show()
    
    print(f"\nProcessing Results:")
    print(f"  Hotspots Detected: {len(result['hotspots'])}")
    print(f"  Frame Entropy: {result['entropy']:.3f}")
    print(f"  TDA Features: {len(result['tda_features'])} topological features")
    print(f"  Processing Time: {result['processing_time_ms']:.1f}ms")
    
    print("\nHotspot Details:")
    for i, hotspot in enumerate(result['hotspots']):
        print(f"  Hotspot {i+1}: ({hotspot['x']}, {hotspot['y']}) - Intensity: {hotspot['intensity']:.1f}")
else:
    print(f"Error: {response.status_code}")
    print(response.text)

## 3. Trajectory Prediction

Predict future trajectory of detected threats.

In [None]:
# Historical track data
def generate_track_history(num_points: int = 10) -> List[Dict]:
    """Generate synthetic track history for a ballistic trajectory."""
    t = np.linspace(0, 100, num_points)  # Time in seconds
    
    # Ballistic trajectory (simplified)
    v0 = 800  # m/s initial velocity
    angle = np.radians(45)  # Launch angle
    g = 9.81  # m/s²
    
    x = v0 * np.cos(angle) * t
    y = v0 * np.sin(angle) * t - 0.5 * g * t**2
    z = np.zeros_like(t)  # Assuming 2D for simplicity
    
    # Add noise
    x += np.random.normal(0, 50, size=len(t))
    y += np.random.normal(0, 50, size=len(t))
    
    history = []
    for i in range(num_points):
        history.append({
            "timestamp": int(time.time()) - (num_points - i) * 10,
            "position": [float(x[i]), float(y[i]), float(z[i])],
            "velocity": [float(v0 * np.cos(angle)), float(v0 * np.sin(angle) - g * t[i]), 0.0]
        })
    
    return history

track_history = generate_track_history(15)

# Predict trajectory
prediction_request = {
    "track_id": "threat_001",
    "history": track_history,
    "prediction_horizon": 30,  # seconds
    "model": "kalman_filter"  # or "neural_network"
}

response = requests.post(
    f"{BASE_URL}/api/v1/pwsa/predict",
    headers=headers,
    json=prediction_request
)

if response.status_code == 200:
    result = response.json()['data']
    
    # Visualize trajectory
    plt.figure(figsize=(12, 6))
    
    # Historical track
    hist_x = [p['position'][0] for p in track_history]
    hist_y = [p['position'][1] for p in track_history]
    plt.plot(hist_x, hist_y, 'bo-', label='Historical Track', linewidth=2, markersize=6)
    
    # Predicted trajectory
    pred_x = [p['position'][0] for p in result['predictions']]
    pred_y = [p['position'][1] for p in result['predictions']]
    plt.plot(pred_x, pred_y, 'r--', label='Predicted Trajectory', linewidth=2)
    
    # Uncertainty bounds
    if 'uncertainty' in result:
        std_x = [u['std'][0] for u in result['uncertainty']]
        std_y = [u['std'][1] for u in result['uncertainty']]
        plt.fill_between(
            pred_x,
            np.array(pred_y) - 2*np.array(std_y),
            np.array(pred_y) + 2*np.array(std_y),
            alpha=0.3,
            color='red',
            label='95% Confidence'
        )
    
    # Impact point
    if result['impact_point']:
        impact = result['impact_point']
        plt.plot(impact['position'][0], impact['position'][1], 'kx', 
                markersize=20, markeredgewidth=3, label='Predicted Impact')
    
    plt.xlabel('X Position (m)')
    plt.ylabel('Y Position (m)')
    plt.title('Threat Trajectory Prediction')
    plt.legend()
    plt.grid(alpha=0.3)
    plt.axis('equal')
    plt.show()
    
    print(f"\nTrajectory Prediction Results:")
    print(f"  Model Used: {result['model']}")
    print(f"  Prediction Confidence: {result['confidence']:.2%}")
    print(f"  Time to Impact: {result['time_to_impact']:.1f}s")
    if result['impact_point']:
        print(f"  Impact Position: ({result['impact_point']['position'][0]:.0f}, {result['impact_point']['position'][1]:.0f}, {result['impact_point']['position'][2]:.0f})")
    print(f"  Computation Time: {result['computation_time_ms']:.1f}ms")
else:
    print(f"Error: {response.status_code}")
    print(response.text)

## 4. Threat Prioritization

Prioritize multiple threats based on risk assessment.

In [None]:
# Multiple threats
threats = [
    {
        "threat_id": "T001",
        "type": "ballistic_missile",
        "position": [50000, 30000, 20000],
        "velocity": 2500,
        "time_to_impact": 180,
        "confidence": 0.95
    },
    {
        "threat_id": "T002",
        "type": "cruise_missile",
        "position": [30000, 15000, 5000],
        "velocity": 800,
        "time_to_impact": 120,
        "confidence": 0.88
    },
    {
        "threat_id": "T003",
        "type": "aircraft",
        "position": [80000, 40000, 10000],
        "velocity": 600,
        "time_to_impact": 300,
        "confidence": 0.72
    },
    {
        "threat_id": "T004",
        "type": "hypersonic",
        "position": [100000, 80000, 30000],
        "velocity": 5000,
        "time_to_impact": 90,
        "confidence": 0.82
    }
]

prioritization_request = {
    "threats": threats,
    "prioritization_strategy": "time_weighted_risk",
    "defensive_assets": {
        "interceptors_available": 10,
        "max_engagement_range": 100000
    }
}

response = requests.post(
    f"{BASE_URL}/api/v1/pwsa/prioritize",
    headers=headers,
    json=prioritization_request
)

if response.status_code == 200:
    result = response.json()['data']
    
    print("Threat Prioritization Results:\n")
    print(f"{'Rank':<6} {'Threat ID':<12} {'Type':<20} {'Risk Score':<12} {'TTI (s)':<10} {'Action':<20}")
    print("=" * 95)
    
    for i, threat in enumerate(result['prioritized_threats'], 1):
        print(f"{i:<6} {threat['threat_id']:<12} {threat['type']:<20} "
              f"{threat['risk_score']:>6.3f}      {threat['time_to_impact']:>6.0f}    "
              f"{threat['recommended_action']:<20}")
    
    print(f"\n{'='*95}")
    print(f"Prioritization Strategy: {result['strategy_used']}")
    print(f"Total Threats: {result['total_threats']}")
    print(f"Engageable Threats: {result['engageable_threats']}")
    print(f"Processing Time: {result['processing_time_ms']:.1f}ms")
    
    # Visualize threat priorities
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
    
    # Risk scores
    threat_ids = [t['threat_id'] for t in result['prioritized_threats']]
    risk_scores = [t['risk_score'] for t in result['prioritized_threats']]
    colors = ['red' if s > 0.8 else 'orange' if s > 0.6 else 'yellow' for s in risk_scores]
    
    ax1.barh(threat_ids, risk_scores, color=colors, edgecolor='black')
    ax1.set_xlabel('Risk Score')
    ax1.set_title('Threat Risk Scores')
    ax1.set_xlim(0, 1)
    ax1.grid(axis='x', alpha=0.3)
    
    # Time to impact
    tti_values = [t['time_to_impact'] for t in result['prioritized_threats']]
    ax2.barh(threat_ids, tti_values, color='steelblue', edgecolor='black')
    ax2.set_xlabel('Time to Impact (s)')
    ax2.set_title('Threat Timeline')
    ax2.grid(axis='x', alpha=0.3)
    
    plt.tight_layout()
    plt.show()
else:
    print(f"Error: {response.status_code}")
    print(response.text)

## 5. Real-Time Tracking Simulation

Simulate real-time threat tracking with continuous updates.

In [None]:
import threading
import queue

# Tracking simulation
class ThreatTracker:
    def __init__(self, base_url: str, headers: dict):
        self.base_url = base_url
        self.headers = headers
        self.tracks = {}
        self.updates = queue.Queue()
        self.running = False
    
    def update_track(self, sensor_data: Dict) -> Dict:
        """Update track with new sensor data."""
        response = requests.post(
            f"{self.base_url}/api/v1/pwsa/track",
            headers=self.headers,
            json=sensor_data
        )
        if response.status_code == 200:
            return response.json()['data']
        return None
    
    def simulate_tracking(self, duration: int = 10, update_rate: float = 1.0):
        """Simulate tracking for specified duration."""
        print(f"Starting tracking simulation for {duration}s...\n")
        start_time = time.time()
        iteration = 0
        
        while time.time() - start_time < duration:
            iteration += 1
            
            # Generate simulated sensor update
            sensor_data = {
                "sv_id": 42,
                "timestamp": int(time.time()),
                "ir_frame": {
                    "width": 640,
                    "height": 480,
                    "centroid_x": 320.0 + np.random.normal(0, 10),
                    "centroid_y": 240.0 + np.random.normal(0, 10),
                    "hotspot_count": np.random.randint(3, 8)
                },
                "radar": {
                    "range": 25000 + np.random.normal(0, 500),
                    "azimuth": np.random.uniform(-5, 5),
                    "elevation": np.random.uniform(-2, 2)
                }
            }
            
            # Update track
            result = self.update_track(sensor_data)
            
            if result:
                clear_output(wait=True)
                print(f"Tracking Update #{iteration} (t={time.time()-start_time:.1f}s)")
                print("=" * 60)
                print(f"Active Tracks: {result['active_tracks']}")
                print(f"New Detections: {result['new_detections']}")
                print(f"Lost Tracks: {result['lost_tracks']}")
                print(f"Processing Latency: {result['latency_ms']:.1f}ms\n")
                
                if result['tracks']:
                    print(f"{'Track ID':<12} {'Position':<30} {'Velocity':<12} {'Confidence':<12}")
                    print("-" * 70)
                    for track in result['tracks']:
                        pos = track['position']
                        print(f"{track['track_id']:<12} "
                              f"({pos[0]:>7.0f}, {pos[1]:>7.0f}, {pos[2]:>7.0f})  "
                              f"{track['velocity']:>8.0f} m/s  "
                              f"{track['confidence']:>8.1%}")
            
            time.sleep(update_rate)
        
        print(f"\nTracking simulation complete.")

# Run simulation
tracker = ThreatTracker(BASE_URL, headers)
tracker.simulate_tracking(duration=15, update_rate=1.5)

## Summary

This tutorial demonstrated advanced PWSA capabilities:

1. ✓ Multi-sensor fusion for improved detection accuracy
2. ✓ Pixel-level IR processing with hotspot detection
3. ✓ Trajectory prediction with uncertainty quantification
4. ✓ Threat prioritization using risk assessment
5. ✓ Real-time tracking simulation

## Next Steps

- Explore `03_llm_consensus.ipynb` for multi-model AI queries
- See `04_pixel_processing.ipynb` for advanced image analysis
- Check [PWSA Documentation](../docs/API.md#pwsa-endpoints) for all endpoints
- Review [Architecture Guide](../docs/ARCHITECTURE.md) for system design