## Tutorial: Building a problem and data retriever from scratch

In this tutorial, we will build a new optimization problem (which is not present in the examples provided in the toolbox), to show how the classes related to the optimization problem and its data retriever should be set up.

The problem we consider is the spherically constrained quadratic problem (SCQP):

$\text{min}_X\; 0.5\cdot\mathbb{E}[||X^T\mathbf{y}(t)||^2]+\text{trace}(X^TB)$
$\text{s.t. } \text{trace}(X^TX)=1.$ 


In [None]:
import numpy as np

from dasftoolbox import OptimizationProblem
from dasftoolbox import DataRetriever, DataWindowParameters
from dasftoolbox import ProblemInputs
from dasftoolbox.utils import autocorrelation_matrix, make_symmetric, normalize

import pymanopt
from pymanopt.manifolds import Sphere
from pymanopt import Problem
from pymanopt.optimizers import TrustRegions
import autograd

import matplotlib as mpl
import matplotlib.pyplot as plt

# Choose plot backend.
mpl.use("macosx")
# mpl.use('Qt5Agg')
# mpl.use('TkAgg')
# mpl.use("Agg")
from dasftoolbox import NetworkGraph, ConvergenceParameters
from dasftoolbox import DASF

Note that the contraint of the problem corresponds to a sphere, for which a manifold optimization method can be applied. Therefore, we will use the `pymanopt` library (https://pymanopt.org) to solve the SCQP problem with a trust regions solver.

We build the **`SCQPProblem`** class as follows:
- We make it a child class of **`OptimizationProblem`** such that all attributes and methods are inherited.
- The initialization requires the number of filters (or number of columns of `X`), other attributes are optional.
- The solver is implemented inside the **`solve`** method, for which the problem inputs of class **`ProblemInputs`** must be provided.
- The **`evaluate_objective`** method is used for evaluating the objective at a given `X` and for given inputs.
- In the case of the SCQP, we do not need to provide a **`resolve_ambiguity`** function, as the solution is unique.

An implementation is provided below:

In [None]:
class SCQPProblem(OptimizationProblem):
    def __init__(self, nb_filters: int) -> None:
        super().__init__(nb_filters=nb_filters)

    def solve(
        self,
        problem_inputs: ProblemInputs,
        save_solution: bool = False,
        convergence_parameters=None,
        initial_estimate=None,
    ) -> np.ndarray:
        """Solve the SCQP problem min 0.5 * E[||X.T @ y(t)||**2] + trace(X.T @ B) s.t. trace(X.T @ Gamma @ X)=1."""
        Y = problem_inputs.fused_signals[0]
        B = problem_inputs.fused_constants[0]
        Gamma = problem_inputs.fused_quadratics[0]

        manifold = Sphere(np.size(B, 0), np.size(B, 1))

        Ryy = autocorrelation_matrix(Y)

        Gamma = make_symmetric(Gamma)

        L = np.linalg.cholesky(Gamma)
        Ryy_t = np.linalg.inv(L) @ Ryy @ np.linalg.inv(L).T
        Ryy_t = make_symmetric(Ryy_t)
        B_t = np.linalg.inv(L) @ B

        @pymanopt.function.autograd(manifold)
        def cost(X):
            return 0.5 * autograd.numpy.trace(X.T @ Ryy_t @ X) + autograd.numpy.trace(
                X.T @ B_t
            )

        problem = Problem(manifold=manifold, cost=cost)
        problem.verbosity = 0

        solver = TrustRegions(verbosity=0)
        X_star = solver.run(problem).point
        X_star = np.linalg.inv(L.T) @ X_star

        if save_solution:
            self._X_star = X_star

        return X_star

    def evaluate_objective(self, X: np.ndarray, problem_inputs: ProblemInputs) -> float:
        """Evaluate the SCQP objective 0.5 * E[||X.T @ y(t)||**2] + trace(X.T @ B)."""
        Y = problem_inputs.fused_signals[0]
        B = problem_inputs.fused_constants[0]

        Ryy = autocorrelation_matrix(Y)

        f = 0.5 * np.trace(X.T @ Ryy @ X) + np.trace(X.T @ B)

        return f

The next step is to create a data retriever using a class that inherits the **`DataRetriever`** class. Here, we will again use a synthetic dataset that will be constructed inside the **`SCQPDataRetriever`** class. Note that the data retriever's only purpose is to return a window of data, so in cases where a real dataset is used, there is no need to create artificial data, making the functions inside the retriever much smaller than the example we provide, since here we implement the data generation from scratch.

The data retriever should be provided an instance of **`DataWindowParameters`** which will be used to return the correct window of data.

In [None]:
class SCQPDataRetriever(DataRetriever):
    def __init__(
        self,
        data_window_params: DataWindowParameters,
        nb_sensors: int,
        nb_sources: int,
        nb_windows: int,
        nb_filters: int,
        rng: np.random.Generator,
        signal_var: float = 0.5,
        noise_var: float = 0.1,
        mixture_var: float = 0.5,
        diff_var: float = 1,
    ) -> None:
        self.data_window_params = data_window_params
        nb_samples = data_window_params.window_length
        self.nb_sensors = nb_sensors
        self.D = rng.normal(
            loc=0,
            scale=np.sqrt(signal_var),
            size=(nb_sources, nb_samples),
        )
        self.A_0 = rng.normal(
            loc=0, scale=np.sqrt(mixture_var), size=(nb_sensors, nb_sources)
        )
        self.Delta = rng.normal(
            loc=0, scale=np.sqrt(mixture_var), size=(nb_sensors, nb_sources)
        )
        self.Delta = (
            self.Delta
            * np.linalg.norm(self.A_0, "fro")
            * diff_var
            / np.linalg.norm(self.Delta, "fro")
        )
        self.noise = rng.normal(
            loc=0,
            scale=np.sqrt(noise_var),
            size=(nb_sensors, nb_samples),
        )
        self.B = rng.normal(loc=0, scale=1, size=(nb_sensors, nb_filters))

        self.weights = self.weight_function(nb_windows)

    def get_data_window(self, window_id: int) -> ProblemInputs:
        Y_window = (
            self.A_0 + self.Delta * self.weights[window_id]
        ) @ self.D + self.noise
        Y_window = normalize(Y_window)
        scqp_inputs = ProblemInputs(
            fused_signals=[Y_window],
            fused_constants=[self.B],
            fused_quadratics=[np.eye(self.nb_sensors)],
        )
        return scqp_inputs

    def weight_function(self, nb_windows: int) -> np.ndarray:
        if nb_windows < 10:
            weights = np.zeros(nb_windows)
        else:
            segment_1 = np.linspace(0, 1, int(5 * nb_windows / 10), endpoint=False)
            segment_2 = np.linspace(0, 1, int(3 * nb_windows / 10), endpoint=False)
            segment_3 = np.linspace(0, 1, int(2 * nb_windows / 10), endpoint=False)

            weights = np.concatenate([segment_1, segment_2, segment_3])
        return weights
