# Extending Folio with Custom Models

This notebook demonstrates how to extend Folio with custom surrogate models,
acquisition functions, and recommenders. We cover two approaches:

1. **Part 1: Inline implementation** - Self-contained in the notebook for rapid prototyping
2. **Part 2: Modular approach** - Production-ready code in external modules

**Target audience**: ML researchers and advanced users who want to plug in custom models.

**Prerequisites**: Familiarity with PyTorch and basic Bayesian optimization concepts.

## Setup

In [None]:
import tempfile
import numpy as np
import torch
import torch.nn as nn
from torch import Tensor
from torch.distributions import Normal

from botorch.acquisition import AcquisitionFunction
from botorch.models.model import Model
from botorch.optim.optimize import optimize_acqf

from folio.api import Folio
from folio.core.config import TargetConfig
from folio.core.schema import InputSpec, OutputSpec
from folio.surrogates.base import Surrogate
from folio.recommenders.base import Recommender
from folio.recommenders.acquisitions.base import Acquisition
from folio.exceptions import NotFittedError

## Test Function

We'll use the same 2D quadratic function from earlier demos:
- **Inputs**: `x1`, `x2` in [0, 10]
- **Output**: yield = 100 - (x1 - 7)^2 - (x2 - 3)^2
- **True optimum**: (7, 3) with yield = 100

In [None]:
def synthetic_experiment(x1: float, x2: float) -> float:
    """Simulated experiment: 2D quadratic with optimum at (7, 3)."""
    noise = np.random.normal(0, 0.5)
    return 100 - (x1 - 7)**2 - (x2 - 3)**2 + noise

TRUE_OPTIMUM = {"x1": 7.0, "x2": 3.0}
TRUE_YIELD = 100.0
BOUNDS = np.array([[0.0, 0.0], [10.0, 10.0]])  # shape (2, n_features)

---

# Part 1: Inline Implementation

This section shows how to implement custom models directly in your notebook. Useful for:
- Rapid prototyping
- One-off experiments
- Understanding the interfaces before creating reusable modules

## 1.1 Custom Surrogate: Neural Network Ensemble

### Interface Requirements

To create a custom surrogate, subclass `folio.surrogates.base.Surrogate` and implement:

```python
class Surrogate(ABC):
    def __init__(self):
        super().__init__()  # Sets self._is_fitted = False
    
    @abstractmethod
    def fit(self, X: np.ndarray, y: np.ndarray) -> "Surrogate":
        """
        X: shape (n_samples, n_features), dtype float64
        y: shape (n_samples,) or (n_samples, 1), dtype float64
        Returns: self (for method chaining)
        Must set: self._is_fitted = True
        """
    
    @abstractmethod
    def predict(self, X: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """
        X: shape (n_candidates, n_features)
        Returns: (mean, std) both shape (n_candidates,)
        Must raise: NotFittedError if not fitted
        """
```

### Implementation: NN Ensemble

We'll create an ensemble of small MLPs. Uncertainty comes from prediction disagreement.

In [None]:
class _MLP(nn.Module):
    """Simple 2-layer MLP for ensemble members."""
    
    def __init__(self, input_dim: int, hidden_dim: int = 32):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )
    
    def forward(self, x: Tensor) -> Tensor:
        return self.net(x)


class NNEnsembleSurrogate(Surrogate):
    """Neural network ensemble for uncertainty estimation.
    
    Parameters
    ----------
    n_members : int
        Number of ensemble members.
    hidden_dim : int
        Hidden layer size for each MLP.
    n_epochs : int
        Training epochs per member.
    lr : float
        Learning rate.
    """
    
    def __init__(
        self,
        n_members: int = 5,
        hidden_dim: int = 32,
        n_epochs: int = 200,
        lr: float = 0.01,
    ):
        # REQUIRED: call super().__init__() to set _is_fitted = False
        super().__init__()
        self.n_members = n_members
        self.hidden_dim = hidden_dim
        self.n_epochs = n_epochs
        self.lr = lr
        self._members: list[_MLP] = []
        self._y_mean = 0.0
        self._y_std = 1.0
    
    def fit(self, X: np.ndarray, y: np.ndarray) -> "NNEnsembleSurrogate":
        """Fit ensemble to training data."""
        input_dim = X.shape[1]
        
        X_t = torch.tensor(X, dtype=torch.float32)
        y_t = torch.tensor(y, dtype=torch.float32).reshape(-1, 1)
        
        # Normalize targets for stability
        self._y_mean = float(y_t.mean())
        self._y_std = float(y_t.std()) + 1e-6
        y_norm = (y_t - self._y_mean) / self._y_std
        
        self._members = []
        for i in range(self.n_members):
            mlp = _MLP(input_dim, self.hidden_dim)
            opt = torch.optim.Adam(mlp.parameters(), lr=self.lr)
            loss_fn = nn.MSELoss()
            
            for _ in range(self.n_epochs):
                opt.zero_grad()
                loss = loss_fn(mlp(X_t), y_norm)
                loss.backward()
                opt.step()
            
            mlp.eval()
            self._members.append(mlp)
        
        # REQUIRED: set _is_fitted = True
        self._is_fitted = True
        
        # REQUIRED: return self for method chaining
        return self
    
    def predict(self, X: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """Predict mean and std from ensemble disagreement."""
        # REQUIRED: check fitted state
        if not self._is_fitted:
            raise NotFittedError("Call fit() before predict()")
        
        X_t = torch.tensor(X, dtype=torch.float32)
        
        with torch.no_grad():
            preds = torch.stack([m(X_t) for m in self._members])
        
        # Shape: (n_members, n_candidates, 1) -> (n_members, n_candidates)
        preds = preds.squeeze(-1)
        
        # Denormalize
        preds = preds * self._y_std + self._y_mean
        
        mean = preds.mean(dim=0).numpy()
        std = preds.std(dim=0).numpy()
        std = np.maximum(std, 1e-6)  # Ensure non-zero
        
        return mean, std

### Test the Surrogate

In [None]:
# Generate some training data
np.random.seed(42)
X_train = np.random.uniform(0, 10, size=(10, 2))
y_train = np.array([synthetic_experiment(x[0], x[1]) for x in X_train])

# Fit and predict
surrogate = NNEnsembleSurrogate(n_members=5, n_epochs=100)
surrogate.fit(X_train, y_train)

# Test on new points
X_test = np.array([[7.0, 3.0], [0.0, 0.0], [5.0, 5.0]])
mean, std = surrogate.predict(X_test)

print("Test predictions:")
for i, (x, m, s) in enumerate(zip(X_test, mean, std)):
    print(f"  x={x} -> mean={m:.2f}, std={s:.2f}")

## 1.2 Custom Recommender: Using NN Ensemble with Folio API

### Interface Requirements

To create a custom recommender that works with `folio.suggest()`, subclass
`folio.recommenders.base.Recommender` and implement:

```python
class Recommender(ABC):
    def __init__(self, project: Project):
        self.project = project
    
    @abstractmethod
    def recommend_from_data(
        self,
        X: np.ndarray,           # shape (n_samples, n_features)
        y: np.ndarray,           # shape (n_samples, n_objectives)
        bounds: np.ndarray,      # shape (2, n_features)
        maximize: list[bool],    # one bool per objective
    ) -> np.ndarray:             # shape (n_features,)
        """Return suggested next input values."""
```

The base class provides:
- `recommend(observations)` - High-level method that extracts data and calls `recommend_from_data`
- `random_sample_from_bounds(bounds)` - Helper for random sampling

### Implementation: NNEnsembleRecommender

In [None]:
class NNEnsembleRecommender(Recommender):
    """Custom recommender using NN ensemble + UCB acquisition.
    
    This recommender can be used with Folio's high-level API by
    injecting it into the recommender cache.
    
    Parameters
    ----------
    project : Project
        The project defining inputs, outputs, and targets.
    n_members : int
        Number of ensemble members.
    n_initial : int
        Number of random samples before using the model.
    n_candidates : int
        Number of random candidates to evaluate.
    beta : float
        UCB exploration parameter.
    """
    
    def __init__(
        self,
        project,
        n_members: int = 5,
        n_initial: int = 3,
        n_candidates: int = 100,
        beta: float = 2.0,
    ):
        # REQUIRED: call super().__init__(project)
        super().__init__(project)
        self.n_members = n_members
        self.n_initial = n_initial
        self.n_candidates = n_candidates
        self.beta = beta
        self._surrogate = None
    
    @property
    def surrogate(self):
        """Access the fitted surrogate (for inspection/visualization)."""
        return self._surrogate
    
    def recommend_from_data(
        self,
        X: np.ndarray,
        y: np.ndarray,
        bounds: np.ndarray,
        maximize: list[bool],
    ) -> np.ndarray:
        """Suggest next experiment using NN ensemble + UCB."""
        # Random sampling phase
        if len(X) < self.n_initial:
            self._surrogate = None
            return self.random_sample_from_bounds(bounds)
        
        # Handle multi-output (use first objective for simplicity)
        y_flat = y[:, 0] if y.ndim == 2 else y
        
        # Fit NN ensemble
        self._surrogate = NNEnsembleSurrogate(
            n_members=self.n_members,
            n_epochs=200,
        )
        self._surrogate.fit(X, y_flat)
        
        # Generate random candidates
        candidates = np.random.uniform(
            bounds[0], bounds[1],
            size=(self.n_candidates, X.shape[1])
        )
        
        # Predict and compute UCB
        mean, std = self._surrogate.predict(candidates)
        
        if maximize[0]:
            ucb = mean + self.beta * std
        else:
            ucb = -mean + self.beta * std
        
        # Return best candidate
        return candidates[np.argmax(ucb)]

### Using Custom Recommender with Folio API

To use a custom recommender with `folio.suggest()`, inject it into Folio's recommender cache:

In [None]:
# Create Folio instance and project
db_path = tempfile.mktemp(suffix=".db")
folio = Folio(db_path=db_path)

folio.create_project(
    name="nn_ensemble_demo",
    inputs=[
        InputSpec("x1", "continuous", bounds=(0.0, 10.0)),
        InputSpec("x2", "continuous", bounds=(0.0, 10.0)),
    ],
    outputs=[OutputSpec("yield")],
    target_configs=[TargetConfig(objective="yield", objective_mode="maximize")],
)

# Inject custom recommender
project = folio.get_project("nn_ensemble_demo")
folio._recommenders["nn_ensemble_demo"] = NNEnsembleRecommender(
    project,
    n_members=5,
    n_initial=3,
    beta=2.0,
)

print("Custom recommender injected!")
print(f"Recommender type: {type(folio._recommenders['nn_ensemble_demo']).__name__}")

In [None]:
# Run optimization loop using folio.suggest()
np.random.seed(42)
N_ITERATIONS = 10

print("Optimization with custom NNEnsembleRecommender via Folio API:")
print("=" * 60)

for i in range(N_ITERATIONS):
    # Get suggestion using Folio's high-level API
    suggestion = folio.suggest("nn_ensemble_demo")[0]
    x1, x2 = suggestion["x1"], suggestion["x2"]
    
    # Run experiment
    result = synthetic_experiment(x1, x2)
    
    # Record observation
    folio.add_observation(
        project_name="nn_ensemble_demo",
        inputs={"x1": x1, "x2": x2},
        outputs={"yield": result},
    )
    
    print(f"Iter {i+1:2d}: x1={x1:.2f}, x2={x2:.2f} -> yield={result:.2f}")

# Check best result
observations = folio.get_observations("nn_ensemble_demo")
yields = [obs.outputs["yield"] for obs in observations]
best_idx = np.argmax(yields)
best_obs = observations[best_idx]

print("=" * 60)
print(f"Best result: yield={yields[best_idx]:.2f}")
print(f"  at x1={best_obs.inputs['x1']:.2f}, x2={best_obs.inputs['x2']:.2f}")
print(f"True optimum: yield={TRUE_YIELD:.2f} at x1={TRUE_OPTIMUM['x1']:.2f}, x2={TRUE_OPTIMUM['x2']:.2f}")

In [None]:
# Access the fitted surrogate for inspection
recommender = folio.get_recommender("nn_ensemble_demo")
if recommender.surrogate is not None:
    print("Surrogate is fitted!")
    print(f"  Ensemble members: {len(recommender.surrogate._members)}")
    
    # Make predictions at the optimum
    X_opt = np.array([[7.0, 3.0]])
    mean, std = recommender.surrogate.predict(X_opt)
    print(f"  Prediction at optimum: mean={mean[0]:.2f}, std={std[0]:.2f}")

## 1.3 Custom Acquisition: Probability of Improvement

### Interface Requirements

For acquisition functions that work with BoTorch models (e.g., GPs), you need **two classes**:

1. **Builder** (subclass `folio.recommenders.acquisitions.base.Acquisition`):
```python
class Acquisition(ABC):
    @abstractmethod
    def build(self, model: Model, best_f: float, maximize: bool) -> AcquisitionFunction:
        """Return a BoTorch-compatible acquisition function."""
```

2. **Inner Function** (subclass `botorch.acquisition.AcquisitionFunction`):
```python
class MyAcquisition(AcquisitionFunction):
    def __init__(self, model: Model, ...):
        super().__init__(model=model)  # REQUIRED
        # Use self.register_buffer() for tensor parameters
    
    def forward(self, X: Tensor) -> Tensor:
        # X shape: (batch, q, d)
        # Returns: shape (batch,)
```

### Implementation: Probability of Improvement (PI)

In [None]:
class _PIAcquisition(AcquisitionFunction):
    """Inner BoTorch-compatible PI acquisition function."""
    
    def __init__(self, model: Model, best_f: float, xi: float, maximize: bool):
        super().__init__(model=model)
        self.register_buffer("best_f", torch.as_tensor(best_f))
        self.register_buffer("xi", torch.as_tensor(xi))
        self.maximize = maximize
    
    def forward(self, X: Tensor) -> Tensor:
        posterior = self.model.posterior(X)
        mean = posterior.mean.squeeze(-1)
        std = posterior.variance.sqrt().squeeze(-1)
        
        if self.maximize:
            improvement = mean - self.best_f - self.xi
        else:
            improvement = self.best_f - mean - self.xi
        
        Z = improvement / (std + 1e-9)
        norm = Normal(0, 1)
        pi = norm.cdf(Z)
        pi = torch.where(std > 1e-6, pi, torch.zeros_like(pi))
        
        return pi.sum(dim=-1)


class ProbabilityOfImprovement(Acquisition):
    """PI acquisition function builder."""
    
    def __init__(self, xi: float = 0.0):
        if xi < 0:
            raise ValueError("xi must be non-negative")
        self.xi = xi
    
    def build(self, model: Model, best_f: float, maximize: bool) -> AcquisitionFunction:
        return _PIAcquisition(model, best_f, self.xi, maximize)

---

# Part 2: Modular Approach

For production use, put custom models in external modules:

```
demos/
  extensions/
    __init__.py
    custom_models.py  <- Your custom classes
```

This approach is better when you want to:
- Reuse models across notebooks
- Write proper tests
- Version control your implementations

## 2.1 Importing from External Module

In [None]:
import sys
from pathlib import Path

# Add demos directory to path
demos_dir = Path(".").resolve()
if str(demos_dir) not in sys.path:
    sys.path.insert(0, str(demos_dir))

# Import custom models
from extensions.custom_models import (
    NNEnsembleRecommender as ModularRecommender,
    NNEnsembleSurrogate as ModularSurrogate,
    ProbabilityOfImprovement as ModularPI,
)

print("Imported from extensions/custom_models.py:")
print(f"  - NNEnsembleRecommender: {ModularRecommender}")
print(f"  - NNEnsembleSurrogate: {ModularSurrogate}")
print(f"  - ProbabilityOfImprovement: {ModularPI}")

## 2.2 Complete Optimization with Imported Recommender

In [None]:
# Create new project
folio.create_project(
    name="modular_demo",
    inputs=[
        InputSpec("x1", "continuous", bounds=(0.0, 10.0)),
        InputSpec("x2", "continuous", bounds=(0.0, 10.0)),
    ],
    outputs=[OutputSpec("yield")],
    target_configs=[TargetConfig(objective="yield", objective_mode="maximize")],
)

# Inject modular recommender
project = folio.get_project("modular_demo")
folio._recommenders["modular_demo"] = ModularRecommender(
    project,
    n_members=5,
    n_initial=3,
    beta=2.0,
)

print(f"Injected: {type(folio._recommenders['modular_demo']).__name__}")

In [None]:
# Run optimization
np.random.seed(789)
N_ITERATIONS = 10

print("Optimization with imported NNEnsembleRecommender:")
for i in range(N_ITERATIONS):
    suggestion = folio.suggest("modular_demo")[0]
    x1, x2 = suggestion["x1"], suggestion["x2"]
    result = synthetic_experiment(x1, x2)
    
    folio.add_observation(
        project_name="modular_demo",
        inputs={"x1": x1, "x2": x2},
        outputs={"yield": result},
    )
    
    print(f"  Iter {i+1:2d}: x1={x1:.2f}, x2={x2:.2f} -> yield={result:.2f}")

observations = folio.get_observations("modular_demo")
yields = [obs.outputs["yield"] for obs in observations]
best_idx = np.argmax(yields)
best_obs = observations[best_idx]
print(f"\nBest: yield={yields[best_idx]:.2f} at ({best_obs.inputs['x1']:.2f}, {best_obs.inputs['x2']:.2f})")

---

## Summary

### Extension Points

| Component | Interface | Use Case |
|-----------|-----------|----------|
| **Surrogate** | `fit()`, `predict()` | Custom uncertainty models (ensembles, BNNs) |
| **Acquisition** | `build()` returning `AcquisitionFunction` | Custom acquisition functions for BoTorch models |
| **Recommender** | `recommend_from_data()` | Complete custom optimization strategy |

### Custom Recommender Template

```python
from folio.recommenders.base import Recommender

class MyRecommender(Recommender):
    def __init__(self, project, **kwargs):
        super().__init__(project)  # REQUIRED
        # Store parameters
    
    def recommend_from_data(
        self,
        X: np.ndarray,        # (n_samples, n_features)
        y: np.ndarray,        # (n_samples, n_objectives)
        bounds: np.ndarray,   # (2, n_features)
        maximize: list[bool], # per-objective
    ) -> np.ndarray:          # (n_features,)
        # Your optimization logic here
        return next_x

# Usage with Folio
project = folio.get_project("my_project")
folio._recommenders["my_project"] = MyRecommender(project)
suggestion = folio.suggest("my_project")  # Uses your recommender!
```

### Key Points

1. **Recommender** is the main extension point for using custom models with Folio's API
2. **Inject** custom recommenders via `folio._recommenders[project_name] = ...`
3. **Surrogate** and **Acquisition** are useful for BoTorch-based approaches
4. Start **inline** for prototyping, move to **modules** for production

In [None]:
# Cleanup
folio.delete_project("nn_ensemble_demo")
folio.delete_project("modular_demo")
print("Demo complete!")