# DKW Controller Implementation Demo

**Artifact:** experiment_001 - method.py

This notebook demonstrates a DKW (Dvoretzky-Kiefer-Wolfowitz) guided fusion/fission controller. The controller adds a DKW confidence margin to the observed error rate and switches between fusion and fission modes depending on where the resulting upper bound falls relative to a target error rate.

## Overview
- **DKW Controller**: Makes adaptive fusion/fission decisions with statistical guarantees
- **Experiment**: Compares the proposed method against a conservative baseline
- **Self-contained**: All data is inlined, no external files required

## 1. Imports and Setup

In [None]:
import json
import numpy as np
from dataclasses import dataclass, field

# Set random seed for reproducible results
np.random.seed(42)
print("Setup complete!")

## 2. DKW Controller Class

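The DKW Controller implements a statistically guided decision rule with four parameters:

- **epsilon_target**: Target error rate threshold (default 0.10)
- **delta**: Failure probability of the DKW bound (default 0.05, i.e. 95% confidence)
- **min_samples**: Minimum number of observations before the controller may change state (default 100)
- **hysteresis**: Margin around `epsilon_target` that prevents oscillation between states (default 0.05)

The controller uses the Dvoretzky-Kiefer-Wolfowitz inequality, which bounds the largest gap between an empirical distribution function and the true one, to obtain a margin `epsilon = sqrt(ln(2 / delta) / (2 * n))` for `n` observations. It adds this margin to the empirical error rate to form an upper confidence bound.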
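To make the size of the bound concrete, the sketch below evaluates the DKW margin for a few sample counts and inverts it to find how many observations a given margin requires. The helper names `dkw_width` and `min_samples_for_width` are hypothetical and not part of the notebook; they only restate the formula used by the controller.

```python
import math

def dkw_width(n: int, delta: float = 0.05) -> float:
    """Two-sided DKW margin for n samples at confidence 1 - delta."""
    if n < 1:
        raise ValueError("n must be at least 1")
    return math.sqrt(math.log(2 / delta) / (2 * n))

def min_samples_for_width(width: float, delta: float = 0.05) -> int:
    """Smallest n whose DKW margin is at most `width` (hypothetical helper)."""
    return math.ceil(math.log(2 / delta) / (2 * width ** 2))

for n in (100, 200, 1000):
    print(f"n={n:5d}  epsilon={dkw_width(n):.4f}")

# Observations needed before the margin alone fits under 0.05
print(min_samples_for_width(0.05))  # 738
```

With the default `delta`, the margin is roughly 0.136 at 100 samples and 0.096 at 200, so it shrinks slowly: halving it takes four times as many observations.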

In [None]:
@dataclass
class DKWController:
    """DKW-guided fusion/fission controller."""
    epsilon_target: float = 0.10
    delta: float = 0.05
    min_samples: int = 100
    hysteresis: float = 0.05

    samples: list = field(default_factory=list)
    current_state: str = "fission"

    def dkw_epsilon(self, n: int) -> float:
        """Compute DKW epsilon for n samples."""
        if n < 2:
            return 1.0
        return np.sqrt(np.log(2 / self.delta) / (2 * n))

    def add_observation(self, error: float) -> None:
        """Add error observation for calibration."""
        self.samples.append(error)

    def decide(self) -> str:
        """Make fusion/fission decision with DKW guarantee."""
        n = len(self.samples)
        if n < self.min_samples:
            return self.current_state

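        # DKW margin computed from all n observations seen so far.
        epsilon = self.dkw_epsilon(n)
        # Error rate over the most recent `min_samples` observations only.
        # Caveat: the margin uses n while the mean uses a fixed-size window,
        # so the bound is only strictly justified when the two coincide.
        empirical_error = np.mean(self.samples[-self.min_samples:])
        error_upper_bound = empirical_error + epsilon

        # Hysteresis: leave fusion only above target + margin,
        # enter fusion only below target - margin.
        if self.current_state == "fusion":
            if error_upper_bound > self.epsilon_target + self.hysteresis:
                self.current_state = "fission"
        else:
            if error_upper_bound < self.epsilon_target - self.hysteresis:
                self.current_state = "fusion"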

        return self.current_state

print("DKWController class defined successfully!")

## 3. Sample Data

Because this demo is self-contained, it builds its sample data in code instead of reading external files: ten hand-written examples followed by 190 randomly generated ones. Each example carries a difficulty in [0, 1], which the experiment later uses as that example's error probability.
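As a rough guide to what error rate this data implies, the sketch below looks at the Beta(2, 8) distribution used for the generated difficulties. It uses its own local generator (an assumption made here so that the notebook's global seed is left untouched) and a much larger sample than the notebook draws.

```python
import numpy as np

# Local generator: does not affect np.random.seed(42) used by the notebook
rng = np.random.default_rng(0)

a, b = 2, 8
draws = rng.beta(a, b, size=100_000)

# The mean of Beta(a, b) is a / (a + b)
theoretical_mean = a / (a + b)
print(f"theoretical mean difficulty: {theoretical_mean:.3f}")
print(f"sample mean difficulty:      {draws.mean():.3f}")
print(f"share of draws above 0.10:   {(draws > 0.10).mean():.3f}")
```

Since an example's difficulty is its error probability, generated examples err about 20% of the time on average, which is above the default `epsilon_target` of 0.10.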

In [None]:
# Sample input data (replaces reading from "../dataset_001/data_out.json")
# Each example has an id and difficulty level (0-1) that affects error probability
sample_data = [
    {"id": "example_000", "difficulty": 0.05},
    {"id": "example_001", "difficulty": 0.08},
    {"id": "example_002", "difficulty": 0.15},
    {"id": "example_003", "difficulty": 0.03},
    {"id": "example_004", "difficulty": 0.12},
    {"id": "example_005", "difficulty": 0.20},
    {"id": "example_006", "difficulty": 0.07},
    {"id": "example_007", "difficulty": 0.09},
    {"id": "example_008", "difficulty": 0.18},
    {"id": "example_009", "difficulty": 0.04},
]

# Extend with more examples for a meaningful experiment
for i in range(10, 200):
    # Generate examples with varying difficulty (higher difficulty = more errors)
    # Beta(2, 8) favors lower difficulties (mean 0.2); float() yields a built-in float
    difficulty = float(np.random.beta(2, 8))
    sample_data.append({
        "id": f"example_{i:03d}",
        "difficulty": difficulty
    })

print(f"Generated {len(sample_data)} sample examples")
print(f"Difficulty range: {min(ex['difficulty'] for ex in sample_data):.3f} - {max(ex['difficulty'] for ex in sample_data):.3f}")
print(f"Average difficulty: {np.mean([ex['difficulty'] for ex in sample_data]):.3f}")

## 4. Experiment Function

The experiment compares two approaches:
- **Baseline**: Always uses conservative "fission" mode
- **Proposed**: Uses the adaptive DKW controller

In [None]:
def run_experiment(data):
    """Run DKW controller experiment."""
    controller = DKWController()
    results = {"baseline": [], "proposed": []}

    for example in data:
        # Simulate error occurrence based on difficulty.
        # bool() converts NumPy's np.bool_ to a built-in bool so the record is JSON-serializable.
        error = bool(np.random.random() < example["difficulty"])
        controller.add_observation(float(error))
        decision = controller.decide()

        results["proposed"].append({
            "id": example["id"],
            "decision": decision,
            "error": error,
            "difficulty": example["difficulty"]
        })
        results["baseline"].append({
            "id": example["id"],
            "decision": "fission",  # Always conservative
            "error": error,
            "difficulty": example["difficulty"]
        })

    return results

print("Experiment function defined successfully!")

## 5. Run the Experiment

In [None]:
# Run the experiment
results = run_experiment(sample_data)

print("Experiment completed!")
print(f"Total examples processed: {len(results['proposed'])}")
print(f"Baseline decisions: {len(results['baseline'])}")
print(f"Proposed method decisions: {len(results['proposed'])}")

## 6. Analyze Results

In [None]:
# Analyze the results
def analyze_results(results):
    """Analyze and display experiment results."""
    
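    # Count decisions and errors for each method.
    # Both methods record the same simulated error per example, so the error
    # counts are identical by construction; only the decisions can differ.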
    baseline_fusion = sum(1 for r in results['baseline'] if r['decision'] == 'fusion')
    baseline_fission = sum(1 for r in results['baseline'] if r['decision'] == 'fission')
    baseline_errors = sum(1 for r in results['baseline'] if r['error'])
    
    proposed_fusion = sum(1 for r in results['proposed'] if r['decision'] == 'fusion')
    proposed_fission = sum(1 for r in results['proposed'] if r['decision'] == 'fission')
    proposed_errors = sum(1 for r in results['proposed'] if r['error'])
    
    total = len(results['baseline'])
    
    print("=== EXPERIMENT RESULTS ===")
    print(f"\nBaseline (Always Fission):")
    print(f"  Fusion decisions: {baseline_fusion}/{total} ({baseline_fusion/total*100:.1f}%)")
    print(f"  Fission decisions: {baseline_fission}/{total} ({baseline_fission/total*100:.1f}%)")
    print(f"  Total errors: {baseline_errors}/{total} ({baseline_errors/total*100:.1f}%)")
    
    print(f"\nProposed (DKW Controller):")
    print(f"  Fusion decisions: {proposed_fusion}/{total} ({proposed_fusion/total*100:.1f}%)")
    print(f"  Fission decisions: {proposed_fission}/{total} ({proposed_fission/total*100:.1f}%)")
    print(f"  Total errors: {proposed_errors}/{total} ({proposed_errors/total*100:.1f}%)")
    
    # Calculate efficiency (fusion rate)
    baseline_efficiency = baseline_fusion / total * 100
    proposed_efficiency = proposed_fusion / total * 100
    
    print("\n=== EFFICIENCY COMPARISON ===")
    print(f"Baseline efficiency: {baseline_efficiency:.1f}%")
    print(f"Proposed efficiency: {proposed_efficiency:.1f}%")
    # `:+.1f` prints the sign explicitly instead of hard-coding a "+"
    print(f"Improvement: {proposed_efficiency - baseline_efficiency:+.1f} percentage points")
    
    return {
        'baseline_efficiency': baseline_efficiency,
        'proposed_efficiency': proposed_efficiency,
        'baseline_errors': baseline_errors,
        'proposed_errors': proposed_errors
    }

analysis = analyze_results(results)

## 7. Visualize Decision Patterns

In [None]:
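# Show decisions for the first 20 examples. These all precede `min_samples` (100),
# so the proposed controller still reports its initial state here.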
print("=== DECISION TIMELINE (First 20 examples) ===")
print("ID\t\tDifficulty\tError\tBaseline\tProposed")
print("-" * 60)

for i in range(min(20, len(results['proposed']))):
    baseline = results['baseline'][i]
    proposed = results['proposed'][i]
    
    print(f"{baseline['id']}\t{proposed['difficulty']:.3f}\t\t{baseline['error']}\t{baseline['decision'][:4]}\t\t{proposed['decision'][:4]}")

# Show when controller switches to fusion mode
first_fusion = None
for i, r in enumerate(results['proposed']):
    if r['decision'] == 'fusion':
        first_fusion = i
        break

if first_fusion is not None:
    print(f"\nFirst fusion decision at example #{first_fusion}: {results['proposed'][first_fusion]['id']}")
else:
    print("\nController never switched to fusion mode.")

## 8. Export Results (Optional)

The original script saved results to a JSON file. Here we show what that output would look like:
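One thing to watch when exporting: NumPy scalar types such as `np.bool_` are not accepted by `json.dumps` on their own, so any record that still holds one needs converting. A minimal sketch of one way to handle this uses the `default` hook of `json.dumps`; the helper name `to_builtin` and the sample record are hypothetical.

```python
import json
import numpy as np

def to_builtin(value):
    """Convert NumPy scalars to built-in Python types for json.dumps."""
    if isinstance(value, np.generic):
        return value.item()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

# Hypothetical record holding NumPy scalars
record = {"id": "example_000", "error": np.bool_(True), "difficulty": np.float64(0.05)}

text = json.dumps(record, default=to_builtin)
print(text)
```

Converting values to built-in types when the record is created avoids the need for the hook altogether.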

In [None]:
# Display sample of results in JSON format (first 3 examples)
sample_output = {
    "baseline": results['baseline'][:3],
    "proposed": results['proposed'][:3]
}

print("Sample output (first 3 examples):")
print(json.dumps(sample_output, indent=2))

# Optionally save full results to file (uncomment to enable)
# with open("method_out.json", "w") as f:
#     json.dump(results, f, indent=2)
# print("\nResults saved to method_out.json")

## Summary

This notebook demonstrates the DKW Controller implementation:

1. **Statistical Guarantees**: Uses DKW inequality for confidence bounds
2. **Adaptive Behavior**: Switches between fusion/fission based on observed errors
3. **Hysteresis**: Prevents oscillation between modes
4. **Performance**: Aims to increase efficiency (fusion rate) while maintaining error bounds
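The hysteresis point above can be illustrated in isolation. The sketch below restates the controller's switching rule as a standalone function (the name `next_state` is hypothetical) and feeds it a made-up sequence of upper bounds that crosses the 0.10 target several times.

```python
def next_state(state: str, upper_bound: float,
               epsilon_target: float = 0.10, hysteresis: float = 0.05) -> str:
    """Apply the switching rule once and return the new state."""
    if state == "fusion":
        # Leave fusion only when the bound exceeds target + hysteresis
        if upper_bound > epsilon_target + hysteresis:
            return "fission"
    elif upper_bound < epsilon_target - hysteresis:
        # Enter fusion only when the bound drops below target - hysteresis
        return "fusion"
    return state

# Illustrative upper bounds, not taken from the experiment
bounds = [0.04, 0.09, 0.11, 0.14, 0.16, 0.12, 0.08, 0.04]

state = "fission"
trace = []
for upper_bound in bounds:
    state = next_state(state, upper_bound)
    trace.append(state)

print(trace)
```

The state changes only at 0.04, 0.16, and the final 0.04; the crossings of 0.10 in between (0.09 to 0.11, and 0.12 to 0.08) leave it unchanged.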

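The controller starts conservatively in fission mode and switches to fusion only once the upper confidence bound on the error rate falls below `epsilon_target - hysteresis`. With the default parameters this requires a DKW margin below 0.05, which takes at least 738 observations, so on the 200-example dataset used here the controller remains in fission and matches the baseline. Any efficiency gain over the always-conservative baseline therefore depends on a longer stream with a sufficiently low error rate.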
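As a check on how long the controller needs before it can switch at all, the sketch below replays the fission-to-fusion rule on an error-free stream, the most favorable case. It is a compact restatement of the notebook's logic under the default parameters, not the `DKWController` class itself, and the function name `first_fusion_step` is hypothetical.

```python
import math

def first_fusion_step(errors, epsilon_target=0.10, delta=0.05,
                      min_samples=100, hysteresis=0.05):
    """Return the 1-based observation count at which fusion is first chosen, or None."""
    samples = []
    for error in errors:
        samples.append(float(error))
        n = len(samples)
        if n < min_samples:
            continue
        # DKW margin from all n observations, mean over the most recent window
        epsilon = math.sqrt(math.log(2 / delta) / (2 * n))
        window = samples[-min_samples:]
        upper_bound = sum(window) / len(window) + epsilon
        if upper_bound < epsilon_target - hysteresis:
            return n
    return None

print(first_fusion_step([0.0] * 200))   # None
print(first_fusion_step([0.0] * 1000))  # 738
```

Even with no errors at all, 200 observations are not enough for a switch; any observed errors push the first possible switch later.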