# Notebook for first run through of fragility code
1. All .py files sent will be included in first few cells 
2. The actual example files will use edfs from intracranial data I am working with (#example-code)

All code on Python 3.12.9

## Fragility code

Sysid file

In [2]:
import numpy as np
import scipy
from scipy.linalg import sqrtm
from scipy.sparse import linalg

SUPPORTED_HANKEL_VECTORIZATIONS = ["coo", "dok", "csr"]


class SIDBase(object):
    """System Identification base class.

    Helper functions for linear system identification and common
    statistical methods for de-biasing the estimation. For example:

    1. Forward-backwards
    2. Total least-squares
    """

    def _construct_snapshots(self, snapshots, order, n_times):
        snaps = np.concatenate(
            [snapshots[:, i : n_times - order + i + 1] for i in range(order)],
            axis=0,
        )
        return snaps

    @staticmethod
    def _col_major_2darray(X):
        """Store snapshots into a 2D matrix, by column.

        If the input data is already formatted as 2D
        array, the method saves it, otherwise it also saves the original
        snapshots shape and reshapes the snapshots.

        Parameters
        ----------
        X : int or numpy.ndarray
            the input snapshots.

        Returns
        -------
        snapshots : np.ndarray
            the 2D matrix that contains the flatten snapshots
        snapshots_shape : tuple
            the shape of original snapshots
        """
        # If the data is already 2D ndarray
        if isinstance(X, np.ndarray) and X.ndim == 2:
            snapshots = X
            snapshots_shape = None
        else:
            input_shapes = [np.asarray(x).shape for x in X]

            if len(set(input_shapes)) != 1:
                raise ValueError("Snapshots have not the same dimension.")

            snapshots_shape = input_shapes[0]
            snapshots = np.transpose([np.asarray(x).flatten() for x in X])


        return snapshots, snapshots_shape

    def _solve_regression(
        self,
        X,
        Y,
        l2_penalty,
        solver="auto",
        fit_intercept=False,
        normalize=False,
        random_state=123456,
        sample_weight=None,
        ch_weight=None,
    ):
        from sklearn.linear_model import Ridge

        clf = Ridge(
            alpha=l2_penalty,
            fit_intercept=fit_intercept,
            normalize=normalize,
            solver=solver,
            random_state=random_state,
        )

        # n_samples X n_features and n_samples X n_targets
        clf.fit(X.T, Y.T, sample_weight=sample_weight)

        # n_targets X n_features
        A = clf.coef_
        return A

    def _pinv_solve(self, X, Y, l2_penalty, solver="svd", backend="numpy"):
        """Solves linear regression using pseudoinverse.

        Can either use standard pseudoinverse solve, or
        performs l2-regularization [1].

        Parameters
        ----------
        X : np.ndarray
            the first matrix;
        Y : np.ndarray
            the second matrix;
        l2_penalty : float | None
            The l2 penalty coefficient in L2 regularization.
        solver : str
            The solver strategy for linear regression. Either uses
            ``'lstsq'`` (calls :func:`np.linalg.pinv`, or :func:`np.linalg.lstsq`),
             or ``'svd'`` (uses SVD methods).
        backend : str
            Either ``'numpy'`` (default), or ``'scipy'``.

        Returns
        -------
        adjmat : np.ndarray
            The corresponding ``A`` matrix.

        References
        ----------
        .. [1] https://www.cs.princeton.edu/courses/archive/spring07/cos424/scribe_notes/0403.pdf  # noqa
        """
        if backend not in ["scipy", "numpy"]:
            msg = f'Only "scipy", or "numpy" backend is accepted, not {backend}.'
            raise ValueError(msg)
        elif backend == "scipy":
            backend_func = scipy
        elif backend == "numpy":
            backend_func = np

        n_chs = X.shape[0]

        if l2_penalty is None or l2_penalty == 0:
            if solver == "lstsq":
                adjmat = Y.dot(scipy.linalg.pinv(X))
            elif solver == "svd":
                # or use numpy linalg
                adjmat = Y.dot(backend_func.linalg.pinv(X))
        else:  # pragma: no cover

            # apply perturbation on diagonal of X^TX
            tikhonov_regularization = l2_penalty * np.eye(n_chs)

            if solver == "lstsq":
                adjmat = backend_func.linalg.solve(
                    X.T.dot(X) + tikhonov_regularization, X.T.dot(Y)
                )
            elif solver == "svd":
                inner_regularized = X.dot(X.T) + tikhonov_regularization
                _tikhonov_pinv_mat = X.T.dot(np.linalg.inv(inner_regularized))

                # solve analytical solution for regularized L2 regression
                adjmat = Y.dot(_tikhonov_pinv_mat)
        return adjmat

    @staticmethod
    def _compute_tlsq(X, Y, tlsq_rank):
        """Compute Total Least Square.

        Assumes the model is the following:
        ::

            Y = AX

        where ``X`` and ``Y`` are passed in arrays.

        Parameters
        ----------
        X : np.ndarray
            the first matrix;
        Y : np.ndarray
            the second matrix;
        tlsq_rank : int
            the rank for the truncation; If 0, the method
            does not compute any noise reduction; if positive number, the
            method uses the argument for the SVD truncation used in the TLSQ
            method.

        Returns
        -------
        X : np.ndarray
            the denoised matrix X,
        Y : np.ndarray
            the denoised matrix Y

        References
        ----------
        .. [1] https://arxiv.org/pdf/1703.11004.pdf
        .. [2] https://arxiv.org/pdf/1502.03854.pdf
        """
        # Do not perform tlsq
        if tlsq_rank == 0:
            return X, Y

        V = np.linalg.svd(np.append(X, Y, axis=0), full_matrices=False)[-1]
        rank = min(tlsq_rank, V.shape[0])
        VV = V[:rank, :].conj().T.dot(V[:rank, :])

        return X.dot(VV), Y.dot(VV)


class PostHocMixin:
    """Mixin class for performing post-hoc modifications to identified systems."""

    @staticmethod
    def stabilize_matrix(A, eigval):
        """
        Stabilize the matrix, A based on its eigenvalues.

        Assumes discrete-time linear system.

        Parameters
        ----------
        A : np.ndarray
             CxC matrix
        eigval : float
            the maximum eigenvalue to shift all large evals to
        Returns
        -------
        A : np.ndarray
            the stabilized matrix w/ eigenvalues <= eigval
        """
        # check if there are unstable eigenvalues for this matrix
        if np.sum(np.abs(np.linalg.eigvals(A)) > eigval) > 0:
            # perform eigenvalue decomposition
            eigA, V = scipy.linalg.eig(A)

            # get magnitude of these eigenvalues
            abs_eigA = np.abs(eigA)

            # get the indcies of the unstable eigenvalues
            unstable_inds = np.where(abs_eigA > eigval)[0]

            # move all unstable evals to magnitude eigval
            for ind in unstable_inds:
                # extract the unstable eval and magnitude
                unstable_eig = eigA[ind]
                this_abs_eig = abs_eigA[ind]

                # compute scaling factor and rescale eval
                k = eigval / this_abs_eig
                eigA[ind] = k * unstable_eig

            # recompute A
            eigA_mat = np.diag(eigA)
            Aprime = np.dot(V, eigA_mat)
            A = scipy.linalg.lstsq(V.T, Aprime.T)[0].T
        return A





Perturbation model file

In [3]:
from typing import Union, List
import numpy as np
import scipy

import utils.dataconfig as constants

SUPPORTED_PERTURBATION_OPT_METHODS = ["dc", "grid"]
PERTURBATION_STRATEGIES = ["univariate", "bivariate"]


def ensure_list(arg):
    if not (isinstance(arg, list)):
        try:  # if iterable
            if isinstance(arg, (str, dict)):
                arg = [arg]
            elif arg is None:
                arg = []
            else:
                arg = list(arg)
        except BaseException:  # if not iterable
            arg = [arg]
    return arg



def _create_standard_basis_vector(size, index, as_vector=False):
    arr = np.zeros(size)
    arr[index] = 1.0
    if as_vector:  # turn into 1 vector in R^d, not a list
        arr = arr[:, np.newaxis]
    return arr


def compute_brauer_rank_one(
    A: np.ndarray, radius: complex, eigval_idx: int = 0, rank: int = 1
):
    """Applies Brauer's rank one perturbation algorithm.

    Perturbs the leading eigenvalue of matrix A to obtain eigenvalue
    with value at ``radius``. Will perform a perturbation on the
    eigenvalue at index ``eigval_idx``, which is the index after
    sorting the eigenvalues from largest to smallest in absolute value.

    Parameters
    ----------
    %(A)s
    %(radius)s
    eigval_idx : int
        The eigenvalue to apply perturbation to.
    rank : int
        The rank of the perturbation to apply. (Default = 1).
        Results in ``rank`` number of eigenvalues that are
        changed.

    Returns
    -------
    delta_vec : np.ndarray
        The (N, N) rank-1 matrix that when applied to ``A`` will
        result in an eigenvalue with value ``radius``.

    References
    ----------
    .. [1] A. Brauer, Limits for the characteristic roots of a matrix IV: Applications to stochastic matrices, Duke Math. J. 19
        (1952) 75–91.
    .. [2] H. Perfect, Methods of constructing certain stochastic matrices II, Duke Math. J. 22 (1955) 305–311.
    """
    # perform eigenvalue decomposition of A
    W, V = scipy.linalg.eig(A)

    # get max to min of eigenvalues
    sorted_inds = np.argsort(W)[::-1]

    # take eigenvalue as the first entry
    eigval = W[sorted_inds[eigval_idx]]
    eigvec = V[:, sorted_inds[eigval_idx]]

    # compute the perturbation of lambda
    eigval_diff = radius - eigval

    # format into 2D arrays to perform least squares
    eigvec_arr = eigvec[None, :]
    eigval_diff_arr = np.array([radius - eigval])

    # solve least squares sense
    eigval_perturbation_vec, res, rnk, s = scipy.linalg.lstsq(
        eigvec_arr, eigval_diff_arr
    )
    eigval_perturbation = eigval_perturbation_vec.conj().T.dot(eigvec)
    assert np.allclose(eigval_diff, eigval_perturbation)

    # compute the delta vector
    delta_vec = np.outer(eigvec, eigval_perturbation_vec.conj().T)

    return delta_vec


def compute_bauer_fike_bound(A, E, p=2):
    """Compute the Bauer-Fike bound.

    Computes the bound of eigenvalue movement
    based on the condition number of the
    eigenvector matrix, and the p-norm of the
    perturbation matrix.

    Parameters
    ----------
    A : np.ndarray
        The matrix. Must be diagonalizable.
    E : np.ndarray
    p : str | int
        The p-norm. Corresponds to to the ``'p'``
        in :func:`np.linalg.cond` and the
        ``'ord'`` in :func:`np.linalg.norm`.

    Returns
    -------
    bound : float
        The Bauer-Fike theorem bound.

    References
    ----------
    .. [1] https://en.wikipedia.org/wiki/Bauer%E2%80%93Fike_theorem
    """
    # compute eigenvalue decomposition of A
    W, V = scipy.linalg.eig(A)

    # compute the condition number of V
    cond_num = np.linalg.cond(V, p=p)

    # compute the norm of the perturbation matrix
    pert_norm = np.linalg.norm(E, ord=p)

    return cond_num * pert_norm


class StructuredPerturbationModel(object):
    r"""Structured rank-1 perturbations of column/rows of a matrix.

    :math:`x(t+1) = (A + {\\Delta}) x(t)`

    The algorithm takes in a np.ndarray matrix of data that is NxN and computes the
    minimum 2-norm required to have one eigenvalue of the matrix A at "r".

    Parameters
    ----------
    %(radius)s
    %(perturb_type)s
    %(perturbation_strategy)s
    %(n_jobs)s
    %(verbose)s

    Notes
    -----
    Model can either perform grid search over all possible frequencies of perturbation (i.e. placements
    along the circle specified by "radius". Application at only the DC frequency (a+jb = radius; b = 0)
    seems to work sufficiently and is only one search.
    """

    def __init__(
        self,
        radius: Union[float, str],
        perturb_type: str,
        perturbation_strategy: str = "univariate",
        n_jobs: int = 1,
        on_error: str = "raise",
        verbose: bool = False,
    ):
        # ensure perturbation type is capitalized
        perturb_type = perturb_type.upper()

        if perturb_type not in [
            constants.PERTURBATIONTYPES.COLUMN_PERTURBATION.value,
            constants.PERTURBATIONTYPES.ROW_PERTURBATION.value,
        ]:
            msg = (
                "Perturbation type can only be {}, or {} for now. You "
                "passed in {}.".format(
                    constants.PERTURBATIONTYPES.COLUMN_PERTURBATION,
                    constants.PERTURBATIONTYPES.ROW_PERTURBATION,
                    perturb_type,
                )
            )
            raise AttributeError(msg)
        if perturbation_strategy not in PERTURBATION_STRATEGIES:
            msg = (
                f"Only perturbation strategies {PERTURBATION_STRATEGIES} "
                f"are accepted, not {perturbation_strategy}."
            )
            raise AttributeError(msg)

        self._radius = radius
        self._perturb_type = perturb_type
        self.perturbation_strategy = perturbation_strategy
        self.n_jobs = n_jobs
        self.verbose = verbose
        self.on_error = on_error

        self._min_frequency = None
        self._delta_vec_arr = None
        self._min_norms = None

    def __str__(self):
        """Representation of model and its parameters."""
        return (
            f"Structured Perturbation Model | "
            f"radius={self.radius}, perturb_type={self.perturb_type}"
        )

    @property
    def radius(self):
        return self._radius

    @property
    def perturb_type(self):
        return self._perturb_type

    @property
    def minimum_freqs(self) -> Union[List, int]:
        """Corresponding frequencies that minimum norm perturbations occur."""
        return self._min_frequency

    @property
    def minimum_delta_vectors(self) -> np.ndarray:
        """Corresponding perturbation vectors with minimum norm.

        The delta vectors are N x N, where N is the number of channels.
        The rows correspond to the perturbation vector. For example,
        ``self._delta_vec_arr[0, :]``
        """
        return self._delta_vec_arr

    @property
    def minimum_norms_vector(self):
        """Compute the l2 norms of the minimum-norm delta vectors.

        This will result in a N x 1 vector.
        """
        return np.linalg.norm(self.minimum_delta_vectors, ord=2, axis=1)

    def _compute_min_norm_delta_vec(
        self,
        A: np.ndarray,
        radius: complex,
        eks: List[int],
        orthogonal_constraints: List[int] = None,
    ):
        r"""Compute a $\Delta$ matrix that has minimum 2-norm.

        Based on the radius (``r = a + ib``, where ``a`` is the real
        component and ``b`` is the imaginary component) on the complex plane,
        a matrix, $\Delta$ is computed such that $A + \Delta$ has an
        eigenvalue at ``radius``.

        The $\Delta$ matrix is a rank-1 matrix that comprises of the same
        vector at the column/row indices in ``eks`` list.

        Note that the $\Delta$ matrix may be generalized to have multiple
        non-zero columns, depending on the passed in indices ``eks``.
        The perturbation is either a Row, or Column.

        Parameters
        ----------
        %(A)s
        radius : np.complex
            The perturbation radius (``np.abs(radius)``), which may have a real and
            imaginary component.
        eks : List[int]
            A list of the column/row indices to compute a perturbation on.
        %(orthogonal_constraints)s

        Returns
        -------
        delta_vec : np.ndarray
            The perturbation vector that is applied at rows/columns in ``eks``,
            which as size ``(n_chs,)``.

        Notes
        -----
        To re-compute the $\Delta$ matrix, one would add the ``delta_vec``
        vector along the rows/columns of a (n_chs X n_chs) zero matrix.
        For example: ..

            delta_mat = np.zeros((n_chs, n_chs))

            for edx in eks:
                # if we used a row perturbation
                delta_mat[edx, :] = delta_vec

                # if we used a column perturbation
                delta_mat[:, edx] = delta_vec

            # now the perturbed A matrix should have
            # eigenvalue at location radius
            print(np.linalg.eigs(A + delta_mat))

        """
        if orthogonal_constraints is None:
            orthogonal_constraints = []
        perturb_type = self._perturb_type

        # ensure all indices of the unit vector are a list
        eks = ensure_list(eks)

        if isinstance(A, np.ndarray):
            if A.shape[0] != A.shape[1]:
                msg = (
                    f"State matrix must be a square matrix. The state matrix "
                    f"passed in has shape {A.shape}."
                )
                raise AttributeError(msg)

        # initialize function parameters
        len_perturbation = A.shape[0]

        # initialize vector to desired eigenvalue
        sigma = radius.real
        omega = radius.imag
        desired_eig = sigma + 1j * omega

        # generate list of the unit vectors
        unit_vecs = [
            _create_standard_basis_vector(len_perturbation, idx, as_vector=True)
            for idx in eks
        ]

        # generate array of constraints
        constraint_vec = np.array([0, -1])
        bvec = np.tile(constraint_vec, len(unit_vecs))

        # generate the "characteristic" equation for desired eigenvalue
        characteristic_eqn = A - desired_eig * np.eye((len_perturbation))

        # generate the data matrix for running least squares
        Hmat = []  # will be 2*len(indices) X n_chs
        for idx, ek in enumerate(unit_vecs):
            # determine if to compute row, or column perturbation
            if (
                perturb_type == constants.PERTURBATIONTYPES.ROW_PERTURBATION.value
            ):  # C = inv(A)*ek
                Cmat = np.linalg.lstsq(characteristic_eqn, ek, rcond=-1)[0]
            elif perturb_type == constants.PERTURBATIONTYPES.COLUMN_PERTURBATION.value:
                Cmat = np.linalg.lstsq(characteristic_eqn.T, ek, rcond=-1)[0].T

            # extract real and imaginary components to create vector of constraints
            Cimat = np.imag(Cmat)
            Crmat = np.real(Cmat)

            # store these in vector
            Hmat.append(Cimat)
            Hmat.append(Crmat)
        Hmat = np.array(Hmat).squeeze()
        # original least squares w/ constraints
        if np.imag(desired_eig) == 0:
            Bmat = Hmat[1::2, :]
            bvec = bvec[1::2]
        else:
            Bmat = Hmat
            bvec = bvec.copy()

        # adding orthogonality constraints
        # for each orthogonal constraint will make sure the Delta vector(s) are
        # 0 at that index. For example, if Delta vector(s) are bi-columnar, then
        # orthogonal_constraints of 0 and 50 would make sure the Delta vector
        # has value 0 at indices 0 and 50 in both columns
        if len(orthogonal_constraints) > 0:
            for ekidx in orthogonal_constraints:
                ek = _create_standard_basis_vector(
                    len_perturbation, ekidx, as_vector=True
                )

                # orthogonality constraints based on row/column
                if (
                    self.perturbtype
                    == constants.PERTURBATIONTYPES.ROW_PERTURBATION.value
                ):
                    Orthmat = ek
                    Bmat = np.concatenate((Bmat.T, Orthmat), axis=1).T
                    bvec = np.hstack((bvec, [0]))
                else:
                    Orthmat = ek.T
                    Bmat = np.concatenate((Bmat, Orthmat), axis=0)
                    bvec = np.hstack((bvec, [0])).T

        # compute the delta vector that will solve the argmin problem
        # print(Bmat.shape)
        # print(np.linalg.cond(Bmat))
        # print(np.linalg.svd(Bmat))
        delvec = np.linalg.pinv(Bmat, rcond=-1).dot(bvec)

        # return a 1D vector (real component). Note the
        # imaginary component is 0
        return delvec.squeeze()

    def compute_perturbations_at_indices(
        self,
        A: np.ndarray,
        radius: Union[complex, float, str],
        perturbation_strategy: str,
        n_perturbations: int = None,
        start: int = None,
        orthogonal_constraints=None,
    ):
        """Compute single column/row perturbation of an ``A`` matrix.

        Note the ``perturb_type`` is set on instantiation of the method.

        Parameters
        ----------
        %(A)s
        radius : np.complex
            The perturbation radius (``np.abs(radius)``), which may have a real and
            imaginary component.
        %(perturbation_strategy)s
        n_perturbations : int
            The number of channels in the original dataset.
        %(orthogonal_constraints)s

        Returns
        -------
        minperturbnorm : np.ndarray
            Norms of the perturbation vectors (n_chs, 1).
        delvecs_node : np.ndarray
            Delta perturbation vectors (n_chs, n_chs). Each row
            is a new perturbation vector.

        Notes
        -----
        ``univariate`` perturbation strategy:
        Perturbs ``A`` with a delta vector at one column/row indices at a time
        to obtain an eigenvalue with value at ``radius``. This will compute
        a delta vector and corresponding norms of the delta vectors over
        all columns, or rows one at a time.

        ``bivariate`` perturbation strategy:
        Perturbs ``A`` with a delta vector at two column/row indices at a time
        to obtain an eigenvalue with value at ``radius``. This will compute
        a delta vector and corresponding norms of the delta vectors over
        all columns, or rows one at a time.
        """
        if n_perturbations is None:
            n_chs = A[0].shape[0]
            if self.perturbation_strategy == "univariate":
                n_perturbations = n_chs
            elif self.perturbation_strategy == "bivariate":
                n_perturbations = n_chs // 2
        n_perturbations = int(n_perturbations)
        # initialize array to store all the minimum euclidean norms
        minperturbnorm = np.zeros((n_perturbations,))

        # initialize NxN array to store the delta vectors with each row being
        # a delta vector computed for that specific `ek` unit vector
        delvecs_node = np.zeros((n_perturbations, A.shape[0]))


        if start is None:
            start = 0

        for idx, ek in enumerate(range(n_perturbations)):
            if perturbation_strategy == "univariate":
                ek = [ek + start]
            elif perturbation_strategy == "bivariate":
                ek = [ek, ek + (n_perturbations // 2)]  # + start

            delta_vec = self._compute_min_norm_delta_vec(
                A, radius, ek, orthogonal_constraints=orthogonal_constraints
            )

            # store the l2 norm of the perturbation vector
            min_norm = np.linalg.norm(delta_vec)
            # store the minimum norm perturbation and vector
            minperturbnorm[idx] = min_norm
            # store the vector corresponding to minimum norm perturbation
            delvecs_node[idx, :] = delta_vec

        self._delta_vec_arr = delvecs_node
        self._min_frequency = 0
        return minperturbnorm, delvecs_node

    def fit(self, A: np.ndarray, **kwargs):
        r"""Compute and setup of the least squares soln.

        Assumes, given an A matrix, |lambda| value, and unit vector ek.

        Parameters
        ----------
        %(A)s
        %(orthogonal_constraints)s

        Returns
        -------
        minperturbnorm : np.ndarray
            the l2-norm of the perturbation vector
        """
        radius = self.radius
        perturbation_strategy = self.perturbation_strategy

        if isinstance(A, np.ndarray):
            if A.shape[0] != A.shape[1]:
                msg = (
                    f"State matrix must be a square matrix. The state matrix "
                    f"passed in has shape {A.shape}."
                )
                raise AttributeError(msg)

        e_val = np.abs(np.linalg.eigvals(A)).max()
        if radius == "adaptive":
            # get the spectral radii of Amat
            radius_ = e_val + 0.1
        else:
            radius_ = radius
        if e_val >= np.abs(radius_):
            msg = (
                f"The largest eigenvalue of A has "
                f"absolute value of {e_val}, so "
                f"perturbation to {np.abs(radius)} "
                f"is ill-defined."
            )
            if self.on_error == "raise":
                raise ValueError(msg)

        # make sure A passed in is a list of A matrices
        # if passed in a list, assume it is an order p model
        if isinstance(A, np.ndarray):
            A = [A]

        # check that all A's have same shape
        assert all([x.shape == A[0].shape for x in A])

        if self.perturbation_strategy == "bivariate":
            assert np.mod(A[0].shape[0], 2) == 0

        # check the order of the model imposed
        # from state estimation
        order = len(A)

        # perturbation norms and delta vector list
        min_norms = []
        delta_vecs_list = []

        # run lti perturbation method
        for Amat in A:
            # This method, only perturbs at :math:`\omega=0`
            minperturbnorm, delta_vecs = self.compute_perturbations_at_indices(
                Amat,
                radius=radius_,
                perturbation_strategy=perturbation_strategy,
                **kwargs,
            )
            min_norms.append(minperturbnorm)
            delta_vecs_list.append(delta_vecs)
            self._min_norms = min_norms
            self._delta_per_node_arr = delta_vecs_list

        if order == 1:
            return min_norms[0]
        return min_norms


class ConjugateStructurePerturbation(StructuredPerturbationModel):
    """Structured rank-1 perturbations that occur over the entire complex disc.

    Compared to ``StructuredPerturbationModel``, this model will
    perturb over all eigenvalues with absolute value of ``radius``.
    The number of points to perturb to will be set by ``mesh_size``.

    For example, if ``radius=1.5`` and ``mesh_size=3``, then between
    ``r=1.5`` to ``r=-1.5``, there will be 51 points discretized along
    the complex disc with radius of 1.5. They will be ``r=1.5j``. We
    do not need to perturb at the conjugate pairs, since every
    real perturbation vector on a real matrix will produce conjugate
    pairs. Therefore we only apply perturbations on the top half
    of the complex disc.

    Parameters
    ----------
    %(radius)s
    %(perturb_type)s
    mesh_size : int
        Number of discrete points (default=51) on the real/imaginary
        circle w/ ``np.abs(radius)`` to search over.
    %(perturbation_strategy)s
    %(n_jobs)s
    %(verbose)s

    """

    def __init__(
        self,
        radius: float,
        perturb_type: str,
        mesh_size: int,
        perturbation_strategy: str = "univariate",
        n_jobs: int = 1,
        verbose: bool = False,
    ):
        super(ConjugateStructurePerturbation, self).__init__(
            radius=radius,
            perturb_type=perturb_type,
            perturbation_strategy=perturbation_strategy,
            n_jobs=n_jobs,
            verbose=verbose,
        )

        self._mesh_size = mesh_size

    @property
    def mesh_size(self):
        return self._mesh_size

    def _compute_gridsearch_perturbation(self, A, searchnum: int = 51):
        r"""
        Generate the minimum norm perturbation model.

        This method, does a grid search over combinations of real, or imaginary eigenvalues
        that have magnitude of "radius". And perturbs at :math:`r = \sigma + \omega j`

        Reference:
        - Sritharan 2014, et al.
        - Li 2017, et al.

        Parameters
        ----------
        %(A)s
        searchnum : int
            the number of discrete points to split the circle w/ specified radius into

        Returns
        -------
        (minperturbnorm, delvecs_node) : tuple[float, np.ndarray]

        minperturbnorm : float
            the l2 norm of the perturbation vector
        delvecs_node: np.ndarray
            [c, c'] is an array of perturbation vectors that achieve minimum norm at each row

        """
        # initialize search parameters to compute minimum norm perturbation
        top_wspace = np.linspace(-self.radius, self.radius, num=searchnum)
        wspace = np.append(top_wspace, top_wspace[1 : len(top_wspace) - 1], axis=0)
        sigma_space = np.sqrt(self.radius ** 2 - top_wspace ** 2)
        sigma_space = np.append(
            -sigma_space, sigma_space[1 : len(top_wspace) - 1], axis=0
        )

        # N x N, A matrix
        numchans, _ = A.shape

        # to store all the euclidean norms of the perturbation vectors
        # frequency x channel (F x N)
        minnorm_mat = np.zeros((wspace.size, numchans))

        # to store all the euclidean norms of the perturbation vectors
        # frequency x perturbed index x channel (F x N x N)
        delvecs_mat = np.zeros((wspace.size, numchans, numchans), dtype="complex")

        # delvecs_list = []

        # to store all the frequencies at which perturbation is applied
        freqs_mat = np.zeros((wspace.size, 1), dtype="complex")

        # compute the min-norm perturbation for omega specified
        for i, (sigma, omega) in enumerate(zip(sigma_space, wspace)):
            # run lti perturbation method for all contacts
            perturbation_results = self._compute_perturbation_on_system(A, sigma, omega)
            min_norm_perturb, delvecs_node = perturbation_results

            # find index of minimum norm perturbation for channel
            minnorm_mat[i, ...] = min_norm_perturb
            # delvecs_list.append(delvecs_node)
            delvecs_mat[i, ...] = delvecs_node.squeeze()
            freqs_mat[i] = sigma + 1j * omega

        # store the minimum norms that are achievable for each contact
        # -> allows for sensitivity to different frequencies, although majority will be
        # sensitive to omega == 0
        # determine argmin of the perturbation norms over all frequencies
        min_indices = np.argmin(minnorm_mat, axis=0)

        # get the minimum perturbation norms, frequencies,
        # and delta vectors
        min_norm_perturb = np.min(minnorm_mat, axis=0)
        min_freqs = sigma_space[min_indices] + 1j * wspace[min_indices]
        # delvecs_node = [delvecs_list[i] for i in min_indices]
        print(
            f"The minimum indices of {minnorm_mat.shape} "
            f"has {len(min_indices)} indices."
        )
        print("Shape of actual minimum norm perturbation ", min_norm_perturb.shape)
        print("Shape of minimum norm perturbation freqs ", min_freqs.shape)
        print("Delta vecs node squeezed: ", delvecs_node.squeeze().shape)
        print(delvecs_mat.shape)
        delvecs_node = delvecs_mat[min_indices, ...]

        self._min_frequency = min_freqs
        self._delta_per_node_arr = delvecs_node
        self._min_norms = min_norm_perturb
        return min_norm_perturb, delvecs_node

    def fit(self, A: np.ndarray, orthogonal_constraints: List[int] = None):
        """Compute perturbation of a square matrix.

        Parameters
        ----------
        %(A)s
        %(orthogonal_constraints)s

        Returns
        -------
        minperturbnorm : np.ndarray
            the l2-norm of the perturbation vector
        """
        pass


Mvar file

In [4]:
import numpy as np
import shutil
import os

from mvar_model import SystemIDModel


def compute_samplepoints(winsamps, stepsamps, numtimepoints):
    # Creates a [n,2] array that holds the sample range of each window that
    # is used to index the raw data for a sliding window analysis
    samplestarts = np.arange(0, numtimepoints - winsamps + 1.0, stepsamps).astype(int)
    sampleends = np.arange(winsamps, numtimepoints + 1, stepsamps).astype(int)

    samplepoints = np.append(
        samplestarts[:, np.newaxis], sampleends[:, np.newaxis], axis=1
    )
    return samplepoints


def compute_statelds_func(eegwin, **model_params):
    mvar_model = SystemIDModel(**model_params)
    # 2: compute state transition matrix using mvar model
    mvar_model.fit(eegwin)
    A_mat = mvar_model.state_array
    return A_mat


def state_lds_array(
    arr: np.ndarray,
    winsize: int = 250,
    stepsize: int = 125,
    l2penalty: float = None,
    method_to_use: str = "pinv",
    n_jobs: int = -1,
    memmap: bool = False,
    verbose: bool = False,
) -> np.ndarray:
    """Compute A state matrix from numpy array.

    If you have a ``mne.io.Raw`` object, and you want to
    run state-lds estimation, then use the ``state_lds_derivative``
    function instead.

    Parameters
    ----------
    arr : np.ndarray
        ndarray containing EEG data; nxT where n is the number of channels and T is the number of timepoints
    sfreq : float
        Sampling frequency of the EEG
    winsize : int
        Window size to sample A matrices in samples
    stepsize : int
        Increment size, in samples, for sliding window for A matrices
    l2penalty : float
        penalty for the A matrix estimation - Adds noise when channels are too similar
    method_to_use : str
        Either 'pinv' or 'hankel'
    %(n_jobs)s
    %(verbose)s

    Returns
    -------
    A_mats : np.ndarray
        nxnxW ndarray where n is the number of channels and W is the number of windows.
        
    """
    # data array should be C x T
    n_chs, n_signals = arr.shape

    # set parameters
    model_params = {
        "l2penalty": l2penalty,
        "method_to_use": method_to_use,
    }

    # compute time and sample windows array
    sample_points = compute_samplepoints(winsize, stepsize, n_signals)
    n_wins = sample_points.shape[0]

    # initialize storage container
    A_mats = np.zeros((n_chs, n_chs, n_wins))

    for idx in range(n_wins):
        data = arr[:, sample_points[idx, 0] : sample_points[idx, 1]].T
        A_mats[..., idx] = compute_statelds_func(data, **model_params)

    return A_mats


Mvar model file

In [5]:
import numpy as np
from scipy.linalg import sqrtm
from sklearn.utils.validation import check_array, check_is_fitted
from sklearn.base import RegressorMixin, MultiOutputMixin
from sklearn.linear_model._base import LinearModel

from sysid import SIDBase, PostHocMixin

SUPPORTED_SYSID_METHODS = ["pinv"]


def inner_transpose_multicompanion(mat, order):
    """Transpose inner matrices of a multi-companion matrix.

    ``mat`` comprises of a sequence of square matrices,
    ``A_1``, ..., ``A_order`` along with corresponding
    identity matrices where they should be. This function
    will transpose the ``A_1``, ..., ``A_order`` matrices.

    Parameters
    ----------
    mat : np.ndarray
    order : int

    Returns
    -------
    mat : np.ndarray
        Inner

    Notes
    -----
    ``mat`` has exactly (n_chs * order, n_chs * order) shape,
    so ``n_chs`` can be obtained from the shape of ``mat``.


    References
    ----------
    https://core.ac.uk/download/pdf/82792651.pdf
    """
    mat = mat.copy()  # create a copy
    n_chs = mat.shape[0] / order

    assert np.mod(mat.shape[0], order) == 0

    start_col = n_chs * (order - 1)
    start_row = 0
    for idx in range(order):
        grid = np.ix_(
            np.arange(start_row, start_row + n_chs, dtype=int),
            np.arange(start_col, start_col + n_chs, dtype=int),
        )
        # get the subset mat matrix
        submat = mat[grid]
        mat[grid] = submat.T
        start_row += n_chs
    return mat


class SystemIDModel(
    SIDBase,
    PostHocMixin,
    LinearModel,
    MultiOutputMixin,
    RegressorMixin,
):
    """Multivariate-autoregressive style algorithm used for estimating a linear system.

    The model is of the style:

        $x(t+1) = Ax(t)$

    The algorithm takes in a window of data that is CxT, where C is typically the
    number of channels and T is the number of samples for a specific window to generate
    the linear system.

    It uses either:
     1. a dynamic mode decomposition SVD based algorithm,
     2. SVD truncated algorithm
     3. Pseudoinverse with regularization

    to estimate A.

    Attributes
    ----------
    l2penalty : float
        The l2-norm regularization if 'regularize' is turned on. Only used
        if ``method_to_use`` is ``'pinv'``.
    method_to_use : str
        svd as the method to compute A matrix
    svd_rank : float
        Passed to :func:`eztrack.embed.svd.computeSVD`. If ``None``,
        will be the number of channels in the data matrix.
        Only used if ``method_to_use`` is ``'svd'``.
    tlsq_rank : int
        rank truncation computing Total Least Square. Default
        is 0, that means no truncation. See [1].
    fb : bool
        Whether to apply the Forward-Backwards algorithm. See Notes and [2].
    order : int
        The order of the model to estimate (default=1).
    solver : str
        Only used if ``method_to_use='sklearn'``. See :func:`sklearn.linear_model.Ridge` parameters.
    fit_intercept : bool
        Only used if ``method_to_use='sklearn'``. See :func:`sklearn.linear_model.Ridge` parameters.
    normalize : bool
        Only used if ``method_to_use='sklearn'``. See :func:`sklearn.linear_model.Ridge` parameters.

    Notes
    -----
    When the size of the data is too large (e.g. N > 180, W > 1000), then right now the construction of the csr
    matrix scales up. With more efficient indexing, we can perhaps decrease this.

    References
    ----------
    .. [1] De-biasing the dynamic mode decomposition for applied Koopman
        spectral analysis of noisy datasets. https://arxiv.org/pdf/1703.11004.pdf
    .. [2] Characterizing and correcting for the effect of sensor noise in the
        dynamic mode decomposition. https://arxiv.org/pdf/1507.02264.pdf

    """

    def __init__(
        self,
        l2penalty=0.0,
        method_to_use="pinv",
        svd_rank=None,
        tlsq_rank=0,
        fb: bool = False,
        solver="auto",
        fit_intercept=False,
        normalize=False,
        order=1,
        weighted: bool = False,
    ):
        super(SystemIDModel, self).__init__()
        if method_to_use not in SUPPORTED_SYSID_METHODS:
            raise AttributeError(
                f"System ID with {method_to_use} "
                f"is not supported. Please use "
                f"one of {SUPPORTED_SYSID_METHODS}."
            )

        self.l2penalty = l2penalty
        self.method_to_use = method_to_use
        self.svd_rank = svd_rank
        self.tlsq_rank = tlsq_rank
        self.fb = fb
        self.order = order
        self.weighted = weighted

        self.solver = solver
        self.fit_intercept = fit_intercept
        self.normalize = normalize

    @property
    def state_array(self):
        """The model tensor CxC samples by chan by chan."""
        return self.state_array_

    @property
    def eigs(self):
        return np.linalg.eigvals(self.state_array)

    def _forward_multiply(self, X, t_steps):
        # 2D array of initial conditions (C x samples)
        n_samples, n_chs = X.shape

        X_pred = []
        # generate predictions for many initial conditions
        for isamp in range(n_samples):
            x0 = X[isamp, :]
            X_hat = [x0]

            # reconstruct over possibly multiple time steps
            for it in range(t_steps - 1):
                X_hat.append(self.state_array.dot(X_hat[-1]))

            # store the predictions
            X_hat = np.array(X_hat).T
            X_pred.append(X_hat)
        # print('inside forward multiply...', np.array(X_pred).shape, n_samples, t_steps)
        return np.asarray(X_pred)

    def predict(self, X, t_steps: int = 1):
        """Predict state dynamics from initial conditions as a 2D array.

        Parameters
        ----------
        X : np.ndarray (n_samples, n_features)
            Samples of initial conditions. ``n_features`` should match the
            number of channels used in the data to fit model (in ``fit()``).
        t_steps : int
            The number of time steps to predict. Default = 1.

        Returns
        -------
        y_pred : array-like of shape (n_samples,) or (n_samples, n_outputs)
            True values for X.
        """
        check_is_fitted(self)
        X = check_array(
            X, dtype=[np.float64, np.float32], ensure_2d=True, allow_nd=False
        )

        # forward multiply linear operator
        y_pred = self._forward_multiply(X=X, t_steps=t_steps)
        return y_pred

    def score(self, X, y, sample_weight=None, t_steps=1):
        """Score fitted model.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Test samples. For some estimators this may be a
            precomputed kernel matrix or a list of generic objects
             instead with shape (n_samples, n_samples_fitted),
             where n_samples_fitted is the number of samples
             used in the fitting for the estimator.
        y : array-like of shape (n_samples,) or (n_samples, n_outputs)
            True values for X.
        sample_weight : array-like of shape (n_samples,), default=None
            Sample weights.
        t_steps : int
            The number of time steps to predict. Default = 1.

        Returns
        -------
        score : float
            R^2 of self.predict(X) w.r.t. y.
        """
        from sklearn.metrics import r2_score

        check_is_fitted(self)

        X, y = self._validate_data(
            X, y, y_numeric=True, dtype=[np.float64, np.float32], multi_output=True
        )
        if y.ndim == 1:
            raise ValueError(
                "y must have at least two dimensions for "
                "multi-output regression but has only one."
            )

        n_samples, _ = X.shape

        # run predictions for possibly multiple initial conditions
        y_pred = self.predict(X, t_steps=t_steps)


        # get the state prediction and score it
        scores = []
        for idx in range(n_samples):
            X_pred = y_pred[idx, :, :]
            X_true = y[idx, :]

            score = r2_score(X_true, X_pred, sample_weight=sample_weight)
            scores.append(score)
        print("Finally")
        return np.mean(scores)

    def fit(self, X, y=None):
        """
        Generate adjacency matrix for each window for a certain winsize and stepsize.

        Sets the ``state_array`` property, which is the linear operator that
        operates over the data matrix ``X``.

        Additional hyperparameters are defined in the ``__init__`` function.

        Parameters
        ----------
        X : np.ndarray (n_samples, n_features)
            Samples of multivariate data. ``n_features`` should match the
            number of channels used in the data . ``n_samples`` should match
            the number of time points in the data window to regress.

        Notes
        -----
        This function will fit a linear operator, ``A`` on the data as such::

            X[1:, :] = X[:-1, :].dot(A)

        representing the equation ``x(t+1) = Ax(t)``.
        """
        # check data
        if y is None:
            X = self._validate_data(X)
        else:
            check_kwargs = dict(
                dtype=[np.float64, np.float32],
                y_numeric=True,
                force_all_finite=True,
                multi_output=True,
            )
            X, y = self._validate_data(X, y, **check_kwargs)

        # check condition number of the array
        cond_num = np.linalg.cond(X)

        # 1. determine shape of the window of data
        n_times, n_chs = X.shape

        # determine svd_rank if it is None
        if self.svd_rank is None:
            svd_rank = min(n_chs, n_times)
        else:
            svd_rank = self.svd_rank

        # allow for multiple order matrix estimation
        # first make sure data matrix is 2D
        # also swap channels and times to make it backwards compatible
        # with our change to sklearn API
        self._snapshots, self._snapshots_shape = self._col_major_2darray(X.T)

        # create large snapshot with time-lags of order specified by
        # ``order`` value
        snaps = self._construct_snapshots(
            self._snapshots, order=self.order, n_times=n_times
        )
        X, Y = snaps[:, :-1], snaps[:, 1:]

        # compute total-least squares if necessary
        X, Y = self._compute_tlsq(X, Y, self.tlsq_rank)

        # compute inverse of variance weights
        if self.weighted:
            # compute along the samples dimension
            sample_var = np.var(X, axis=1)
            sample_weight = 1.0 / sample_var

            eps = 1e-8
            if np.any(sample_var < eps):
                sample_weight = None
        else:
            sample_weight = None

        # compute the forward linear operator from X -> Y
        if self.method_to_use == "pinv":
            A = self._pinv_solve(X, Y, self.l2penalty, solver="svd", backend="numpy")

        # if using forward-backwards algorithm, then
        # compute the backwards operator and combine the two
        if self.fb:
            # compute backward linear operator
            bA = self._pinv_solve(Y, X, l2_penalty=self.l2penalty)
            A = sqrtm(A.dot(np.linalg.inv(bA)))

        self.state_array_ = A
        return self



Fragility file

In [6]:
from typing import Tuple, Union
import numpy as np
import os
from pathlib import Path

def _get_tempfilename(x):
    """Hardcoded temporary file name for storing temporary results."""
    return "temp_{}.npz".format(x)


def _compute_fragility_func(
    shared_mvarmodel: SystemIDModel,
    shared_pertmodel: StructuredPerturbationModel,
    raw_data: np.ndarray,
    samplepoints: np.ndarray,
    tempdir: Union[str, Path],
    win: int,
    # type_indices: Dict[str, List],
    **kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:  # pragma: no cover
    """Parallel computation of network.

    Computes for a single window, the A matrix and
    corresponding perturbation vector with minimum 2-norm.

    Note: The `raw_data` and `win` parameters can be replaced with `raw`
        of the MNE data structure in the future if mem-mapping is desired.

    Parameters
    ----------
    shared_mvarmodel : eztrack.network.MvarModel
        The model used to compute the A state matrix for a window of time.
    shared_pertmodel : eztrack.network.MinNormPerturbModel
        The model used to compute the vector of perturbation norms for each
        column/row perturbed in the A matrix.
    raw_data : np.ndarray
        The raw EEG data C x T (channels by time)
    samplepoints : np.ndarray
    tempdir : str | pathlib.Path
    win : int
        The specific window that `raw_data` was obtained from within
        the larger EEG snapshot.

    Returns
    -------
    pert_mat : np.ndarray
    A_mat : np.ndarray
    delta_vecs : np.ndarray
    """
    # Avoid circular import

    # 1: fill matrix of all channels' next EEG data over window
    win_begin = samplepoints[win, 0]
    win_end = samplepoints[win, 1] + 1
    eegwin = raw_data[:, win_begin:win_end].T  # samples x channels

    # 2: compute state transition matrix using mvar model
    shared_mvarmodel.fit(eegwin)
    A_mat = shared_mvarmodel.state_array

    # in higher-order models, one needs to apply perturbations
    # to a multi-companion matrix
    if shared_mvarmodel.order > 1 and shared_pertmodel.perturb_type == "C":
        # flip matrix
        to_perturb_A = inner_transpose_multicompanion(
            A_mat.copy().T, order=shared_mvarmodel.order
        )
    else:
        to_perturb_A = A_mat.copy()

    # 3: compute perturbation model
    pert_mat = shared_pertmodel.fit(to_perturb_A, **kwargs)
    delta_vecs = shared_pertmodel.minimum_delta_vectors

    if tempdir is not None:
        # save adjacency matrix
        tempfilename = os.path.join(tempdir, _get_tempfilename(win))
        try:
            np.savez(tempfilename, A=A_mat, pertmat=pert_mat, delta_vecs=delta_vecs)
        except BaseException as e:
            return (None, None, None)
    return pert_mat, A_mat, delta_vecs


def _compute_perturbation_func(
    A_mat: np.ndarray, radius: float, perturb_type: str, order: int, **kwargs
) -> Tuple[np.ndarray, np.ndarray]:  # pragma: no cover
    """Parallel computation of network.

    Computes for a single window, the A matrix and
    corresponding perturbation vector with minimum 2-norm.

    Note: The `raw_data` and `win` parameters can be replaced with `raw`
        of the MNE data structure in the future if mem-mapping is desired.

    Parameters
    ----------
    radius : float
        The radius to make perturbed eigenvalue go to.
    perturb_type : str
        The type of structured perturbation to take. Either ``'R'``,
        or ``'C'``.
    A_mat : np.ndarray
        The state matrix (C x C) for x(t+1) = Ax(t).

    Returns
    -------
    pert_mat : np.ndarray
    delta_vecs : np.ndarray
    """
    # Avoid circular import

    pertmodel_kwargs = {
        "radius": radius,
        "perturb_type": perturb_type,
    }
    pertmodel_kwargs.update(kwargs)
    pert_model = StructuredPerturbationModel(**pertmodel_kwargs)

    # which column/row to start perturbations at
    n_perturbations = A_mat.shape[0] / order
    if order > 1:
        start_idx = int((order - 1) * n_perturbations)
    else:
        start_idx = 0

    # in higher-order models, one needs to apply perturbations
    # to a multi-companion matrix
    if order > 1 and perturb_type == "C":
        # flip matrix
        to_perturb_A = inner_transpose_multicompanion(A_mat.copy().T, order=order)
    else:
        to_perturb_A = A_mat.copy()

    # 3: compute perturbation model
    pert_mat = pert_model.fit(
        to_perturb_A, start=start_idx, n_perturbations=n_perturbations
    )
    delta_vecs = pert_model.minimum_delta_vectors

    return pert_mat, delta_vecs


def state_perturbation_array(
    arr: np.ndarray,
    order: int = 1,
    radius: float = 1.5,
    perturb_type: str = "C",
):
    """Compute network on state-lds dataset.

    This function operates on a numpy array that is assumed
    to be structured channels X channels X time.

    If you have a ``StateLDSDerivative`` object, then
    use ``state_perturbation_derivative`` function
    instead.

    Parameters
    ----------
    arr : np.ndarray
        The state matrices in the form (C x C x T)
    radius : float
        The radius to make perturbed eigenvalue go to.
    perturb_type : str
        The type of structured perturbation to take. Either ``'R'``,
        or ``'C'``.
    %(verbose)s

    Returns
    -------
    pert_mats : np.ndarray
    delta_vecs_arr : np.ndarray
    """
    if arr.shape[0] != arr.shape[1]:
        raise RuntimeError(
            f"State matrix data must be in a " f"channel X channel X time shape."
        )
    n_chs, n_chs_, n_wins = arr.shape

    model_params = {
        "order": order,
        "radius": radius,
        "perturb_type": perturb_type,
    }

    if order > 1:
        n_perturbations = int(arr.shape[0] / order)
    else:
        n_perturbations = n_chs

    # initialize numpy arrays to return results
    pert_mats = np.zeros((n_perturbations, n_wins))
    delta_vecs_arr = np.zeros((n_perturbations, n_chs, n_wins), dtype=complex)

    for idx in range(n_wins):
        pert_mat, delta_vecs = _compute_perturbation_func(arr[..., idx], **model_params)
        pert_mats[:, idx] = pert_mat
        delta_vecs_arr[..., idx] = delta_vecs

    return pert_mats, delta_vecs_arr


In [7]:
from scipy.signal import detrend
import mne
def outlier_repeat(data: np.ndarray, sd: float, rounds: int = np.inf,
                   axis: int = 0) -> tuple[tuple[int, int]]:
    """ Remove outliers from data and repeat until no outliers are left.

    This function removes outliers from data and repeats until no outliers are
    left. Outliers are defined as any data point that is more than sd standard
    deviations from the mean. The function returns a tuple of tuples containing
    the index of the outlier and the round in which it was removed.

    Parameters
    ----------
    data : np.ndarray
        Data to remove outliers from.
    sd : float
        Number of standard deviations from the mean to consider an outlier.
    rounds : int
        Number of times to repeat outlier removal. If None, the function will
        repeat until no outliers are left.
    axis : int
        Axis of data to remove outliers from.

    Returns
    -------
    tuple[tuple[int, int]]
        Tuple of tuples containing the index of the outlier and the round in
        which it was removed."""
    inds = list(range(data.shape[axis]))

    # Square the data and set zeros to small positive number
    R2 = np.square(data)
    R2[np.where(R2 == 0)] = 1e-9

    # find all axes that are not channels (example: time, trials)
    axes = tuple(i for i in range(data.ndim) if not i == axis)

    # Initialize stats loop
    sig = np.std(R2, axes)  # take standard deviation of each channel
    cutoff = (sd * np.std(sig)) + np.mean(sig)  # outlier cutoff
    i = 1

    # remove bad channels and re-calculate variance until no outliers are left
    while np.any(np.where(sig > cutoff)) and i <= rounds:

        # Pop out names to bads output using comprehension list
        for j, out in enumerate(np.where(sig > cutoff)[0]):
            yield inds.pop(out - j), i

        # re-calculate per channel variance
        R2 = R2[..., np.where(sig < cutoff)[0], :]
        sig = np.std(R2, axes)
        cutoff = (sd * np.std(sig)) + np.mean(sig)
        i += 1

def channel_outlier_marker(input_raw, outlier_sd=3,
                           max_rounds=np.inf, axis=0,
                            verbose=True
                           ) -> list[str]:
    """Identify bad channels by variance.

    Parameters
    ----------
    input_raw : Signal
        Raw data to be analyzed.
    outlier_sd : int, optional
        Number of standard deviations above the mean to be considered an
        outlier, by default 3
    max_rounds : int, optional
        Maximum number of variance estimations, by default runs until no
        more bad channels are found.
    axis : int, optional
        Axis to calculate variance over, by default 0

    Returns
    -------
    list[str]
        List of bad channel names.
    """ 

    names = input_raw.copy().pick('data').ch_names
    data = detrend(input_raw.get_data('data'))  # channels X time
    bads = []  # output for bad channel names
    desc = []  # output for bad channel descriptions

    # Pop out names to bads output using comprehension list
    for ind, i in outlier_repeat(data, outlier_sd, max_rounds, axis):
        bads.append(names[ind])
        desc.append(f'outlier round {i} more than {outlier_sd} SDs above mean')
        # log channels excluded per round
        if verbose:
            mne.utils.logger.info(f'outlier round {i} channels: {bads}')


    return bads

## Preprocessing and implementing fragility 

In [43]:
def preprocess(raw):
    # Drop bad channels
    raw.drop_channels(raw.info['bads'])
    raw.load_data()
    # Notch filter at 60 Hz to remove line noise
    notch_filtered = raw.notch_filter(60, notch_widths=2)
    # Extract bandapss 0.5 to 300 Hz using fourth order Butterworth filter
    final_raw = notch_filtered.filter(0.5, 300, 
                                    method='iir')

    outliers = channel_outlier_marker(final_raw, 3, 2)
    final_raw.drop_channels(outliers)
    # Set common average reference 
    final_raw.set_eeg_reference()

    return final_raw

In [9]:
from mne.io import read_raw_edf
from mne.filter import notch_filter, filter_data
from pathlib import Path



def run_fragility(raw, model_params, events=None, tmin=-10, tmax=10):
    '''
    Runs fragility code on given raw file. 

    -------------------------------------
    Input parameters:
    raw: MNE raw file to run fragility code on
    model_params: dictionary, we are defaulting to prior lab's normal values
    events: List of strings corresponding to annotations to run; if no event is specified
    it will run fragility code on entire file 
    tmin: start time to run code with respect to annotation, defaults to 0 (only needed if 
    specific events are specficied)
    tmax: end time in seconds to run code, defaults to 10 (only needed if specific events
    are specified)

    returns:
    pert_mats: pertubation matrices corresponding to fragility scores with dimensions
    channels, time points. Saved as a dictionary with keys corresponding to input event strings.
    delta_vecs_arr: array of delta vectors used to make pertubation array
    '''
    
    # raw.info['bads'] = channel_outlier_marker(raw, 3, 2)
    sfreq = raw.info["sfreq"]
    
    # Remove non-SEEG channels
    bad_words = ['C', 'TRIG', 'OSAT', 'PR', 'Pleth', 'EKG']
    for ch in raw.ch_names:
        if any(word in ch for word in bad_words):
            raw.info['bads'].append(ch)
        # Remove EEG channels based off length of string (EEG channels are no longer than 3 chars)
        if len(ch) < 4:
            raw.info['bads'].append(ch)


    winsize_sec = model_params.get("winsize_sec")
    stepsize_sec = model_params.get("stepsize_sec")
    l2penalty = model_params.get("l2penalty")

    # Calculate how many time points are in each window
    winsize_samps = int(round(winsize_sec * sfreq))
    stepsize_samps = int(round(stepsize_sec * sfreq))

    # Do preprocessing on raw data
    final_raw = preprocess(raw)
  
    # Initialize dictionaries containing fragility metrics 
    pert_mats = {}
    delta_vecs_arrs = {}

    # Use whole file if events is None
    if events == None:
        epoch_data = final_raw.get_data()
        A_mats = state_lds_array(epoch_data, winsize=winsize_samps, stepsize=stepsize_samps, l2penalty=l2penalty)
        pert_mats['whole'], delta_vecs_arrs['whole'] = state_perturbation_array(A_mats)

    # If events is specfified, make epochs from given annotations
    else:
        _, event_dict = mne.events_from_annotations(final_raw)
        events_of_interest = {}
        for ee in events:
            events_of_interest[np.str_(ee)] = event_dict[np.str_(ee)]

        epochs = mne.Epochs(final_raw, tmin=tmin, tmax=tmax, event_id=events_of_interest, baseline=(0, 0))


        for ee in events:
            print(f'Running on epoch labeled {ee}')
            epoch_data = epochs[ee].get_data().squeeze()
            A_mats = state_lds_array(epoch_data, winsize=winsize_samps, stepsize=stepsize_samps, l2penalty=l2penalty)
            pert_mats[ee], delta_vecs_arrs[ee] = state_perturbation_array(A_mats)
    
    return pert_mats, delta_vecs_arrs

## Run fragility on input file
Repalce edf_fpath with desired file path

In [41]:
model_params = {
    "winsize_sec": 0.5,
    "stepsize_sec": 0.5,
    "l2penalty": None
}

edf_fpath = Path.home() / 'Downloads' / 'fragility_code' / 'DATA' / 'MC01_1.edf'
raw = read_raw_edf(edf_fpath)
print(f'Events in raw: {raw.annotations.description}')


Extracting EDF parameters from /Users/dsexton/Downloads/fragility_code/DATA/MC01_1.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Events in raw: ['NFIED1' 'Spike' 'X']


  raw = read_raw_edf(edf_fpath)


## Data visualization

Takes values from pert_mats output from run_fragility and plots heatmap for each channel corresponding to fragility scores using plot_heatmap function. 

The second technique for analysis involves specifying a fragility score threshold (between 0 and 1) and counting how many times each channel crosses this threshold. 
Uses plot_threshold function. 

In [52]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

%matplotlib qt

def plot_fragility(raw, pert_mats, output_file, threshold='auto'):
    ch_num = len(raw.ch_names)
    ch_inds = [0, ch_num // 4, ch_num // 2, 3 * ch_num // 4, ch_num]
    
    # Loop through each event in pert_mat and plot heatmap and thresholds 
    for kk in pert_mats.keys():
        fig, axes = plt.subplots(2, 2)
        fig.set_size_inches([16, 12])
        cax = fig.add_axes([0.92, 0.1, 0.02, 0.8])
        fig.suptitle(kk, size=26)
        for ii, ax in enumerate(axes.flatten()):
            ch_slice = slice(ch_inds[ii], ch_inds[ii+1])
            sns.heatmap(pert_mats[kk][ch_slice, :], vmax=1, cmap='viridis', ax=ax, yticklabels=raw.ch_names[ch_slice], cbar=ii == 0, cbar_ax=None if ii != 0 else cax)
            ax.tick_params(axis='y', labelsize=6)
        fig.savefig(f'{output_file}_{kk}_heatmap.png')

        

        # Plot thresholds 
        if threshold == 'auto':
            threshold = pert_mats[kk].mean() + pert_mats[kk].std()
        threshold_mat = np.where(pert_mats[kk] >= threshold, 1, 0)
        
        # Count number of times fragility scores crosses threshold for each channel
        ch_sum = threshold_mat.sum(axis=1)

        # Create dataframe with threshold values, sort, and remove channels with no threshold crossing        
        threshold_df = pd.DataFrame({'Count': ch_sum, 'Name': raw.ch_names}).sort_values(by=['Count'])
        final_df = threshold_df[threshold_df['Count'] != 0]

        fig, axes = plt.subplots()
        fig.set_size_inches([16, 12])
        fig.suptitle(kk, size=26)

        sns.barplot(data=final_df, x='Name', y='Count', ax=axes)
        axes.tick_params(axis='x', labelsize=10, rotation=90)
        fig.savefig(f'{output_file}_{kk}_thresholds.png')
        



In [21]:
plot_fragility(raw, pert_mats, output_file='MC01')

### Look at accuracy of model (A matrix) fitting of data

In [65]:
model_params = {
    "winsize_sec": 0.5,
    "stepsize_sec": 0.5,
    "l2penalty": None
}

edf_fpath = Path.home() / 'Downloads' / 'fragility_code' / 'DATA' / 'MC01_1.edf'
raw = read_raw_edf(edf_fpath)

Extracting EDF parameters from /Users/dsexton/Downloads/fragility_code/DATA/MC01_1.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...


  raw = read_raw_edf(edf_fpath)


In [66]:
sfreq = raw.info["sfreq"]

# Remove non-SEEG channels
bad_words = ['C', 'TRIG', 'OSAT', 'PR', 'Pleth', 'EKG']
for ch in raw.ch_names:
    if any(word in ch for word in bad_words):
        raw.info['bads'].append(ch)
    # Remove EEG channels based off length of string (EEG channels are no longer than 3 chars)
    if len(ch) < 4:
        raw.info['bads'].append(ch)


winsize_sec = model_params.get("winsize_sec")
stepsize_sec = model_params.get("stepsize_sec")
l2penalty = model_params.get("l2penalty")

# Calculate how many time points are in each window
winsize_samps = int(round(winsize_sec * sfreq))
stepsize_samps = int(round(stepsize_sec * sfreq))

# Do preprocessing on raw data
final_raw = preprocess(raw)

Reading 0 ... 322111  =      0.000 ...   157.281 secs...
Filtering raw data in 1 contiguous segment
Setting up band-stop filter from 58 - 62 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandstop filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 58.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 58.25 Hz)
- Upper passband edge: 61.50 Hz
- Upper transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 61.75 Hz)
- Filter length: 13517 samples (6.600 s)



[Parallel(n_jobs=1)]: Done  17 tasks      | elapsed:    0.1s
[Parallel(n_jobs=1)]: Done  71 tasks      | elapsed:    0.5s


Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 3e+02 Hz

IIR filter parameters
---------------------
Butterworth bandpass zero-phase (two-pass forward and reverse) non-causal filter:
- Filter order 16 (effective, after forward-backward)
- Cutoffs at 0.50, 300.00 Hz: -6.02, -6.02 dB

outlier round 1 channels: ['ROAM8']
outlier round 2 channels: ['ROAM8', 'RLS5']
outlier round 2 channels: ['ROAM8', 'RLS5', 'RLS6']
outlier round 2 channels: ['ROAM8', 'RLS5', 'RLS6', 'ROAM7']
EEG channel type selected for re-referencing
Applying average reference.
Applying a custom ('EEG',) reference.


In [67]:
final_raw_cropped = final_raw.crop(tmax=10)

In [68]:
winsize_sec = 0.5
stepsize_sec = 0.5
final_raw_data = final_raw_cropped.get_data()
winsize_samps = int(round(winsize_sec * final_raw_cropped.info['sfreq']))
stepsize_samps = int(round(stepsize_sec * final_raw_cropped.info['sfreq']))
A_mats = state_lds_array(final_raw_data, winsize=winsize_samps, stepsize=stepsize_samps, l2penalty=None)



In [50]:
print(A_mats.shape)

# Initialize x_hat list to collect x_hat values across windows
x_hat_cross_windows = []
# Loop across windows 
for win in range(20):
    x_hat_lst = []
    # Initialize x_hat(0) to initial EEG value
    x_hat_lst.append(final_raw_data[:, win*winsize_samps])
    
    # Loop through each sample and calcualte x_hat using A matrix
    for ii in range(1, winsize_samps):
        x_hat_lst.append(A_mats[:, :, win] @ x_hat_lst[ii-1])
    x_hat = np.stack(x_hat_lst)
    x_hat_cross_windows.append(x_hat)

# Create array of all x_hat values
x_hat_all = np.stack(x_hat_cross_windows)
print(x_hat_all.shape)

(147, 147, 20)
(20, 1024, 147)


In [59]:
import pandas as pd
corr_dict = {}
corr_dict['Ch'] =final_raw_cropped.ch_names
for win in range(20):
    x_true = final_raw_data[:, winsize_samps*win:winsize_samps*(win+1)]
    corr_list = []
    for ch in range(len(final_raw_cropped.ch_names)):
        corr_list.append(np.corrcoef(x_true[ch, :], x_hat_all[win, :, ch])[0, 1])
    corr_dict[f'Window {win}'] = corr_list

corr_df = pd.DataFrame(corr_dict)

In [62]:
final_df = pd.melt(corr_df, id_vars=['Ch'])

In [69]:
import seaborn as sns
import matplotlib.pyplot as plt

fig, axes = plt.subplots()
fig.set_size_inches([16, 12])
fig.suptitle(r'Correlation between $x$ and $\hat{x}$', size=26)


sns.boxplot(data=final_df, x='Ch', y='value', ax=axes)
axes.tick_params(axis='x', labelsize=10, rotation=90)