# Tutorial 09: Adding Custom Devices

**Goal:** Create custom device agents with domain-specific behavior.

**Time:** ~15 minutes

---

## Why Custom Devices?

HERON's `DeviceAgent` class provides:
- **Standardized lifecycle**: reset → observe → act → update
- **Automatic registration**: Devices integrate with environments seamlessly
- **Feature-based observation**: Use `FeatureProvider` for observable state
- **Action abstraction**: Define action spaces declaratively

## Step 1: Device Agent Lifecycle

Every device agent follows this lifecycle:

```
┌─────────────────────────────────────────────────────────┐
│                    Device Lifecycle                     │
├─────────────────────────────────────────────────────────┤
│  __init__()    → Configure device parameters            │
│       ↓                                                 │
│  reset()       → Initialize state for new episode       │
│       ↓                                                 │
│  ┌─────────────────────────────────────┐               │
│  │ STEP LOOP:                          │               │
│  │  observe()      → Get features      │               │
│  │       ↓                             │               │
│  │  act(action)    → Apply control     │               │
│  │       ↓                             │               │
│  │  update_state() → Physics update    │               │
│  │       ↓                             │               │
│  │  update_cost_safety() → Metrics     │               │
│  └─────────────────────────────────────┘               │
└─────────────────────────────────────────────────────────┘
```
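
The loop in the diagram can be sketched as a small driver function. This is a minimal sketch rather than HERON's API: `CounterDevice`, `run_episode`, and the `policy` callable are hypothetical names introduced here, and the only assumption is that a device exposes the five lifecycle methods shown above.

```python
from typing import Callable, Iterable, List

import numpy as np


class CounterDevice:
    """Toy device (hypothetical) that records the order of lifecycle calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []
        self.setpoint = 0.0
        self.level = 0.0

    def reset(self) -> None:
        self.calls = ["reset"]
        self.setpoint = 0.0
        self.level = 0.0

    def observe(self) -> np.ndarray:
        self.calls.append("observe")
        return np.array([self.level], dtype=np.float32)

    def act(self, action: np.ndarray) -> None:
        self.calls.append("act")
        self.setpoint = float(action[0])

    def update_state(self, ext: float) -> None:
        self.calls.append("update_state")
        # Illustrative dynamics only: scale the external input by the setpoint.
        self.level = self.setpoint * ext

    def update_cost_safety(self) -> None:
        self.calls.append("update_cost_safety")


def run_episode(
    device: CounterDevice,
    policy: Callable[[np.ndarray], np.ndarray],
    ext_inputs: Iterable[float],
) -> None:
    """Drive one episode in the order shown in the lifecycle diagram."""
    device.reset()
    for ext in ext_inputs:
        obs = device.observe()
        device.act(policy(obs))
        device.update_state(ext)
        device.update_cost_safety()


device = CounterDevice()
run_episode(device, policy=lambda obs: np.array([1.0]), ext_inputs=[2.0, 3.0])
print(device.calls)
```

Recording the call order this way gives a quick check that a new device is being driven in the intended sequence.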

In [None]:
import numpy as np
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from gymnasium import spaces

# For demonstration, we'll create simplified versions
# In practice, import from heron and powergrid

## Step 2: Define Custom Feature

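In HERON, custom features extend `FeatureProvider` to define observable state. The simplified version below is a plain dataclass that mirrors that interface.

**Key members:**
- `vector()`: Returns the observation as a numpy array
- `names()`: Returns the feature names, in the same order as `vector()`, for debugging
- `visibility`: Lists who may observe this feature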

In [None]:
@dataclass
class WindTurbineFeature:
    """Custom feature for wind turbine state.
    
    Visibility levels determine who can observe this feature:
    - 'owner': Only the device itself
    - 'upper_level': Parent coordinator can see
    - 'system': All coordinators can see
    - 'public': All agents can see
    """
    visibility: List[str] = field(default_factory=lambda: ['owner', 'upper_level'])
    
    # Device parameters
    p_rated_MW: float = 2.0      # Rated power capacity
    cut_in_speed: float = 3.0    # Minimum wind speed (m/s)
    rated_speed: float = 12.0    # Wind speed at rated power (m/s)
    cut_out_speed: float = 25.0  # Maximum safe wind speed (m/s)
    
    # Dynamic state
    wind_speed_ms: float = 8.0   # Current wind speed
    curtailment: float = 1.0     # Curtailment factor (0-1)
    p_output_MW: float = 0.0     # Current power output
    
    def vector(self) -> np.ndarray:
        """Convert to observation vector (normalized)."""
        return np.array([
            self.wind_speed_ms / self.rated_speed,  # Normalized wind
            self.curtailment,                        # Already 0-1
            self.p_output_MW / self.p_rated_MW,     # Normalized power
        ], dtype=np.float32)
    
    def names(self) -> List[str]:
        """Feature names for debugging."""
        return ['wind_speed_norm', 'curtailment', 'power_output_norm']


# Test the feature
feature = WindTurbineFeature(wind_speed_ms=10.0, p_output_MW=1.5)
print(f"Feature vector: {feature.vector()}")
print(f"Feature names: {feature.names()}")
print(f"Visibility: {feature.visibility}")
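
One way the `visibility` list could be used is to filter which features a given observer receives. The sketch below is an assumption about how such filtering might work, not HERON's implementation: `ToyFeature` and `collect_visible` are hypothetical names, and the visibility strings are the ones listed in the docstring above.

```python
from dataclasses import dataclass, field
from typing import List

import numpy as np


@dataclass
class ToyFeature:
    """Hypothetical stand-in exposing the same vector()/visibility members."""
    values: List[float]
    visibility: List[str] = field(default_factory=lambda: ["owner"])

    def vector(self) -> np.ndarray:
        return np.asarray(self.values, dtype=np.float32)


def collect_visible(features: List[ToyFeature], viewer_level: str) -> np.ndarray:
    """Concatenate the vectors of all features the viewer is allowed to see."""
    parts = [f.vector() for f in features if viewer_level in f.visibility]
    if not parts:
        # Nothing visible at this level: return an empty observation.
        return np.zeros(0, dtype=np.float32)
    return np.concatenate(parts)


private = ToyFeature([0.1, 0.2], visibility=["owner"])
shared = ToyFeature([0.9], visibility=["owner", "upper_level"])

owner_obs = collect_visible([private, shared], "owner")         # both features
coord_obs = collect_visible([private, shared], "upper_level")   # shared only
public_obs = collect_visible([private, shared], "public")       # nothing visible

print(owner_obs.shape, coord_obs.shape, public_obs.shape)
```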

## Step 3: Implement Device Agent

A custom device agent must implement:

1. **`get_action_space()`**: Define the action space
2. **`update_state(ext_state)`**: Apply physics/dynamics
3. **`update_cost_safety()`**: Calculate cost and safety metrics

In [None]:
@dataclass
class WindTurbineConfig:
    """Configuration for wind turbine."""
    bus: str
    p_rated_MW: float = 2.0
    cut_in_speed: float = 3.0
    rated_speed: float = 12.0
    cut_out_speed: float = 25.0


class WindTurbine:
    """Wind turbine device agent.
    
    Action: Curtailment factor [0, 1]
    - 0 = Fully curtailed (no power)
    - 1 = Maximum available power
    
    Physics:
    - Power depends on wind speed (cubic relationship)
    - Curtailment reduces output below available
    """
    
    def __init__(self, agent_id: str, config: Dict[str, Any]):
        self.agent_id = agent_id
        self.config = WindTurbineConfig(**config)
        
        # Initialize feature
        self.feature = WindTurbineFeature(
            p_rated_MW=self.config.p_rated_MW,
            cut_in_speed=self.config.cut_in_speed,
            rated_speed=self.config.rated_speed,
            cut_out_speed=self.config.cut_out_speed,
        )
        
        # Metrics
        self.cost = 0.0
        self.safety = 0.0
        self.available_power = 0.0
    
    def get_action_space(self) -> spaces.Box:
        """Define action space: curtailment factor [0, 1]."""
        return spaces.Box(
            low=np.array([0.0], dtype=np.float32),
            high=np.array([1.0], dtype=np.float32),
            dtype=np.float32,
        )
    
    def get_observation_space(self) -> spaces.Box:
        """Define observation space based on feature."""
        return spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(len(self.feature.vector()),),
            dtype=np.float32,
        )
    
    def reset(self, wind_speed: float = 8.0):
        """Reset for new episode."""
        self.feature.wind_speed_ms = wind_speed
        self.feature.curtailment = 1.0
        self.feature.p_output_MW = 0.0
        self.cost = 0.0
        self.safety = 0.0
        self._compute_available_power()
    
    def _compute_available_power(self):
        """Compute available power from wind speed.
        
        Wind turbine power curve:
        - Below cut-in: 0
        - Cut-in to rated: Cubic relationship
        - Rated to cut-out: Constant at rated power
        - Above cut-out: 0 (safety shutdown)
        """
        v = self.feature.wind_speed_ms
        v_in = self.config.cut_in_speed
        v_rated = self.config.rated_speed
        v_out = self.config.cut_out_speed
        p_rated = self.config.p_rated_MW
        
        if v < v_in or v > v_out:
            self.available_power = 0.0
        elif v < v_rated:
            # Cubic relationship in the partial load region
            self.available_power = p_rated * ((v - v_in) / (v_rated - v_in)) ** 3
        else:
            # At rated power
            self.available_power = p_rated
    
    def observe(self) -> np.ndarray:
        """Get current observation."""
        return self.feature.vector()
    
    def act(self, action: np.ndarray):
        """Apply action (curtailment)."""
        self.feature.curtailment = float(np.clip(action[0], 0.0, 1.0))
    
    def update_state(self, wind_speed: float):
        """Update state based on new wind speed.
        
        Args:
            wind_speed: New wind speed from environment/dataset
        """
        self.feature.wind_speed_ms = wind_speed
        self._compute_available_power()
        
        # Apply curtailment to available power
        self.feature.p_output_MW = self.available_power * self.feature.curtailment
    
    def update_cost_safety(self):
        """Calculate cost and safety metrics.
        
        Cost: Opportunity cost of curtailment (lost revenue)
        Safety: Penalty for operating outside safe bounds
        """
        # Opportunity cost: curtailed power × price (simplified)
        curtailed_MW = self.available_power - self.feature.p_output_MW
        price_per_MWh = 30.0  # $/MWh
        self.cost = curtailed_MW * price_per_MWh  # $ for a 1-hour timestep
        
        # Safety: Penalty if wind speed near cut-out
        if self.feature.wind_speed_ms > self.config.cut_out_speed * 0.9:
            self.safety = 10.0  # Near cut-out warning
        else:
            self.safety = 0.0


# Test the device
turbine = WindTurbine(
    agent_id='wind_1',
    config={'bus': 'Bus_1', 'p_rated_MW': 2.0}
)

print(f"Action space: {turbine.get_action_space()}")
print(f"Observation space: {turbine.get_observation_space()}")
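
The power curve implemented in `_compute_available_power()` can be written as a piecewise function. The symbols are shorthand introduced here for the config fields: $v$ is `wind_speed_ms`, $v_{\text{in}}$ is `cut_in_speed`, $v_{\text{rated}}$ is `rated_speed`, $v_{\text{out}}$ is `cut_out_speed`, and $P_{\text{rated}}$ is `p_rated_MW`.

$$
P_{\text{avail}}(v) =
\begin{cases}
0, & v < v_{\text{in}} \ \text{or}\ v > v_{\text{out}} \\
P_{\text{rated}} \left( \dfrac{v - v_{\text{in}}}{v_{\text{rated}} - v_{\text{in}}} \right)^{3}, & v_{\text{in}} \le v < v_{\text{rated}} \\
P_{\text{rated}}, & v_{\text{rated}} \le v \le v_{\text{out}}
\end{cases}
$$

With the default parameters and $v = 10$ m/s, this gives $2.0 \times (7/9)^3 \approx 2.0 \times 0.4705 \approx 0.94$ MW. Note that the cubic term acts on the normalized distance above cut-in, so the curve rises smoothly from 0 at cut-in to rated power at rated speed.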

## Step 4: Device Lifecycle Demo

Simulate the complete device lifecycle:

In [None]:
# Simulate wind profile (hourly for one day)
np.random.seed(42)
hours = 24
base_wind = 8.0  # Base wind speed m/s
wind_profile = base_wind + 4 * np.sin(np.linspace(0, 2*np.pi, hours)) + np.random.randn(hours)
wind_profile = np.clip(wind_profile, 0, 30)

# Create turbine
turbine = WindTurbine('wind_1', {'bus': 'Bus_1', 'p_rated_MW': 2.0})

# Episode simulation
print("Wind Turbine Simulation")
print("=" * 60)

# Reset
turbine.reset(wind_speed=wind_profile[0])
print(f"Initial state: wind={wind_profile[0]:.1f} m/s")

total_energy = 0.0
total_cost = 0.0

for t in range(hours):
    # 1. Observe
    obs = turbine.observe()
    
    # 2. Policy decides action (simple heuristic: curtail in high wind)
    if wind_profile[t] > 20:  # Near cut-out, reduce load
        action = np.array([0.5])
    else:
        action = np.array([1.0])  # Full output
    
    # 3. Act
    turbine.act(action)
    
    # 4. Update state (environment provides new wind)
    turbine.update_state(wind_profile[t])
    
    # 5. Update cost/safety
    turbine.update_cost_safety()
    
    # Accumulate metrics
    total_energy += turbine.feature.p_output_MW  # MWh (1 hour timestep)
    total_cost += turbine.cost
    
    if t % 6 == 0:  # Print every 6 hours
        print(f"Hour {t:2d}: wind={wind_profile[t]:5.1f} m/s, "
              f"available={turbine.available_power:.2f} MW, "
              f"output={turbine.feature.p_output_MW:.2f} MW, "
              f"curtail={turbine.feature.curtailment:.1f}")

print("=" * 60)
print(f"Total energy: {total_energy:.1f} MWh")
print(f"Capacity factor: {total_energy / (hours * turbine.config.p_rated_MW) * 100:.1f}%")
print(f"Curtailment cost: ${total_cost:.2f}")
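
The capacity factor printed above is the energy delivered divided by the energy the turbine would deliver at rated power for the whole period. In the notation below (introduced here for illustration), $P_t$ is `p_output_MW` at step $t$, $\Delta t$ is the 1-hour timestep, and $N$ is `hours`:

$$
\text{CF} = \frac{\sum_{t=1}^{N} P_t \, \Delta t}{P_{\text{rated}} \, N \, \Delta t}
$$

As a purely hypothetical example, a 2 MW turbine that delivers 12 MWh over 24 hours has $\text{CF} = 12 / (2 \times 24) = 25\%$.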

## Step 5: Device Registry Pattern

Register devices for factory-based creation:

In [None]:
# Device registry for factory pattern
DEVICE_REGISTRY = {}


def register_device(name: str):
    """Decorator to register device classes."""
    def decorator(cls):
        DEVICE_REGISTRY[name] = cls
        return cls
    return decorator


def create_device(device_type: str, agent_id: str, config: Dict) -> Any:
    """Factory function to create devices."""
    if device_type not in DEVICE_REGISTRY:
        raise ValueError(f"Unknown device type: {device_type}")
    return DEVICE_REGISTRY[device_type](agent_id, config)


# Register our wind turbine
DEVICE_REGISTRY['WindTurbine'] = WindTurbine


# Example: Create from configuration
device_configs = [
    {'type': 'WindTurbine', 'name': 'wind_1', 'config': {'bus': 'Bus_1', 'p_rated_MW': 2.0}},
    {'type': 'WindTurbine', 'name': 'wind_2', 'config': {'bus': 'Bus_2', 'p_rated_MW': 3.0}},
]

devices = []
for cfg in device_configs:
    device = create_device(cfg['type'], cfg['name'], cfg['config'])
    devices.append(device)
    print(f"Created {cfg['type']}: {device.agent_id} ({device.config.p_rated_MW} MW)")

print(f"\nRegistered devices: {list(DEVICE_REGISTRY.keys())}")
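
The `register_device` decorator defined in this step can also be applied when a class is defined, which keeps the registration next to the class it registers. The sketch below is self-contained and uses separate names (`EXAMPLE_REGISTRY`, `register_example_device`) that mirror the tutorial's registry so that running it does not overwrite `DEVICE_REGISTRY`. The `Battery` class is a hypothetical placeholder.

```python
from typing import Any, Dict

# Separate registry for this sketch; it mirrors DEVICE_REGISTRY above.
EXAMPLE_REGISTRY: Dict[str, type] = {}


def register_example_device(name: str):
    """Decorator that stores the decorated class under `name`."""
    def decorator(cls):
        EXAMPLE_REGISTRY[name] = cls
        return cls  # Return the class unchanged so it can be used normally
    return decorator


@register_example_device("Battery")
class Battery:
    """Hypothetical placeholder device, used only to show registration."""

    def __init__(self, agent_id: str, config: Dict[str, Any]):
        self.agent_id = agent_id
        self.capacity_MWh = config.get("capacity_MWh", 1.0)


# The factory lookup works the same way as create_device() above.
battery = EXAMPLE_REGISTRY["Battery"]("bat_1", {"capacity_MWh": 4.0})
print(battery.agent_id, battery.capacity_MWh)
```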

## Step 6: Adding Another Device Type

Let's add a Solar Panel device to demonstrate the pattern:

In [None]:
@dataclass
class SolarPanelFeature:
    """Feature for solar panel."""
    visibility: List[str] = field(default_factory=lambda: ['owner', 'upper_level'])
    
    p_rated_kW: float = 100.0
    irradiance: float = 0.0      # 0-1 normalized
    temperature_c: float = 25.0
    curtailment: float = 1.0
    p_output_kW: float = 0.0
    
    def vector(self) -> np.ndarray:
        return np.array([
            self.irradiance,
            self.temperature_c / 50.0,  # Normalize
            self.curtailment,
            self.p_output_kW / self.p_rated_kW,
        ], dtype=np.float32)
    
    def names(self) -> List[str]:
        return ['irradiance', 'temperature_norm', 'curtailment', 'power_output_norm']


class SolarPanel:
    """Solar panel with irradiance-based generation.
    
    Action: Curtailment factor [0, 1]
    Physics: P = rated × irradiance × temp_factor × curtailment
    """
    
    def __init__(self, agent_id: str, config: Dict[str, Any]):
        self.agent_id = agent_id
        self.p_rated_kW = config.get('p_rated_kW', 100.0)
        self.efficiency = config.get('efficiency', 0.20)  # Stored only; not used in update_state()
        self.temp_coef = config.get('temp_coef', -0.004)  # -0.4%/°C above 25°C
        
        self.feature = SolarPanelFeature(p_rated_kW=self.p_rated_kW)
        self.cost = 0.0
        self.safety = 0.0
    
    def get_action_space(self) -> spaces.Box:
        return spaces.Box(
            low=np.array([0.0], dtype=np.float32),
            high=np.array([1.0], dtype=np.float32),
        )
    
    def reset(self):
        self.feature.irradiance = 0.0
        self.feature.temperature_c = 25.0
        self.feature.curtailment = 1.0
        self.feature.p_output_kW = 0.0
    
    def observe(self) -> np.ndarray:
        return self.feature.vector()
    
    def act(self, action: np.ndarray):
        self.feature.curtailment = float(np.clip(action[0], 0.0, 1.0))
    
    def update_state(self, irradiance: float, temperature: float):
        """Update state with environmental conditions."""
        self.feature.irradiance = irradiance
        self.feature.temperature_c = temperature
        
        # Temperature derating
        temp_factor = 1.0 + self.temp_coef * (temperature - 25.0)
        temp_factor = np.clip(temp_factor, 0.5, 1.1)
        
        # Power output
        available = self.p_rated_kW * irradiance * temp_factor
        self.feature.p_output_kW = available * self.feature.curtailment
    
    def update_cost_safety(self):
        # Simplified: zero cost for renewables
        self.cost = 0.0
        self.safety = 0.0


# Register solar panel
DEVICE_REGISTRY['SolarPanel'] = SolarPanel

# Test
solar = create_device('SolarPanel', 'solar_1', {'p_rated_kW': 100})
solar.reset()
solar.act(np.array([1.0]))
solar.update_state(irradiance=0.8, temperature=35.0)

print(f"Solar panel at 80% irradiance, 35°C:")
print(f"  Output: {solar.feature.p_output_kW:.1f} kW / {solar.p_rated_kW} kW rated")
print(f"  Observation: {solar.observe()}")
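
The test values can be checked by hand against `update_state()`. Using $c_T$ for `temp_coef`, $T$ for the temperature in °C, $G$ for the normalized irradiance, and $c$ for the curtailment factor (symbols introduced here for illustration):

$$
f_T = 1 + c_T \,(T - 25) = 1 + (-0.004)(35 - 25) = 0.96
$$

$$
P = P_{\text{rated}} \cdot G \cdot f_T \cdot c = 100 \times 0.8 \times 0.96 \times 1.0 = 76.8 \ \text{kW}
$$

The corresponding observation vector is $[0.8,\ 35/50,\ 1.0,\ 76.8/100] = [0.8,\ 0.7,\ 1.0,\ 0.768]$.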

## Step 7: Integration with Environment

Devices integrate with the environment through the coordinator:

In [None]:
class SimpleCoordinator:
    """Coordinator managing multiple devices."""
    
    def __init__(self, coord_id: str, device_configs: List[Dict]):
        self.coord_id = coord_id
        self.devices = {}
        
        # Create devices from config
        for cfg in device_configs:
            device = create_device(cfg['type'], cfg['name'], cfg.get('config', {}))
            self.devices[cfg['name']] = device
    
    def reset(self):
        """Reset all devices."""
        for device in self.devices.values():
            device.reset()
    
    def get_total_power(self) -> float:
        """Sum power from all devices."""
        total = 0.0
        for device in self.devices.values():
            if hasattr(device.feature, 'p_output_MW'):
                total += device.feature.p_output_MW
            elif hasattr(device.feature, 'p_output_kW'):
                total += device.feature.p_output_kW / 1000.0  # Convert to MW
        return total
    
    def step(self, actions: Dict[str, np.ndarray], ext_state: Dict):
        """Step all devices."""
        for name, device in self.devices.items():
            # Apply action
            if name in actions:
                device.act(actions[name])
            
            # Update state based on device type
            if isinstance(device, WindTurbine):
                device.update_state(ext_state.get('wind_speed', 8.0))
            elif isinstance(device, SolarPanel):
                device.update_state(
                    ext_state.get('irradiance', 0.5),
                    ext_state.get('temperature', 25.0)
                )
            
            device.update_cost_safety()


# Create a microgrid with mixed devices
mg = SimpleCoordinator('mg_1', [
    {'type': 'WindTurbine', 'name': 'wind_1', 'config': {'bus': 'B1', 'p_rated_MW': 2.0}},
    {'type': 'SolarPanel', 'name': 'solar_1', 'config': {'p_rated_kW': 500}},
])

mg.reset()

# Simulate one step
actions = {
    'wind_1': np.array([1.0]),   # Full wind output
    'solar_1': np.array([0.8]),  # Curtail solar to 80%
}

ext_state = {
    'wind_speed': 10.0,    # Good wind
    'irradiance': 0.9,     # Sunny
    'temperature': 30.0,   # Warm
}

mg.step(actions, ext_state)

print(f"Microgrid {mg.coord_id} output:")
for name, device in mg.devices.items():
    if hasattr(device.feature, 'p_output_MW'):
        print(f"  {name}: {device.feature.p_output_MW:.2f} MW")
    elif hasattr(device.feature, 'p_output_kW'):
        print(f"  {name}: {device.feature.p_output_kW:.1f} kW")
print(f"  Total: {mg.get_total_power():.2f} MW")
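
`SimpleCoordinator.step()` dispatches on device type with `isinstance`, so each new device type needs another branch. One alternative, sketched below under assumptions, is to give every device the same `update_state(ext_state)` signature and let it read only the keys it needs. `ToyWind`, `ToySolar`, and `step_all` are hypothetical names, and the linear power models are placeholders rather than the physics defined earlier.

```python
from typing import Any, Dict


class ToyWind:
    """Hypothetical device that reads only the keys it needs."""

    def __init__(self) -> None:
        self.p_output_MW = 0.0

    def update_state(self, ext_state: Dict[str, Any]) -> None:
        # Placeholder linear model, not the turbine power curve.
        self.p_output_MW = 0.1 * ext_state.get("wind_speed", 8.0)


class ToySolar:
    """Hypothetical device with the same update_state signature."""

    def __init__(self) -> None:
        self.p_output_MW = 0.0

    def update_state(self, ext_state: Dict[str, Any]) -> None:
        # Placeholder linear model, not the derating formula.
        self.p_output_MW = 0.5 * ext_state.get("irradiance", 0.5)


def step_all(devices: Dict[str, Any], ext_state: Dict[str, Any]) -> float:
    """Update every device with the shared state dict and return total MW."""
    for device in devices.values():
        device.update_state(ext_state)  # No isinstance checks needed
    return sum(d.p_output_MW for d in devices.values())


devices = {"wind_1": ToyWind(), "solar_1": ToySolar()}
total = step_all(devices, {"wind_speed": 10.0, "irradiance": 0.9})
print(f"Total: {total:.2f} MW")
```

Keeping all outputs in one unit (MW here) would also remove the need for the `hasattr` checks in `get_total_power()`.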

## Key Takeaways

1. **Device Agent Structure**
   ```python
   class MyDevice(DeviceAgent):
       def __init__(self, agent_id, config):
           self.feature = MyFeature(...)  # Observable state
       
       def get_action_space(self) -> spaces.Box:
           ...  # Define control inputs
       
       def update_state(self, ext_state):
           ...  # Physics/dynamics
       
       def update_cost_safety(self):
           ...  # Calculate metrics
   ```

2. **Feature Provider Pattern**
   ```python
   @dataclass
   class MyFeature(FeatureProvider):
       visibility: List[str] = field(default_factory=lambda: ['owner', 'upper_level'])
       
       def vector(self) -> np.ndarray:
           ...  # Observation vector
   ```

3. **Device Registry**
   ```python
   DEVICE_REGISTRY['MyDevice'] = MyDevice
   device = DEVICE_REGISTRY[device_type](agent_id, config)
   ```

4. **Lifecycle Methods**
   - `reset()`: Initialize state
   - `observe()`: Get observation
   - `act(action)`: Apply control
   - `update_state(ext)`: Physics update
   - `update_cost_safety()`: Metrics

---

**Next:** Explore the full power grid case study in `examples/04_custom_device.py`