# Stability Criterion

In [1]:
# Import libraries
import numpy as np
import math
import branchpro
import scipy.stats
from branchpro.apps import ReproductionNumberPlot
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import pandas as pd
import stan
import arviz as az
import nest_asyncio
import seaborn as sns
nest_asyncio.apply()

num_timepoints = 1000 # number of days for incidence data
num_categories = 3

## Parametrisation of the renewal model

In [2]:
# Build the serial interval w_s
serial_intervals_range = []

ws_mean_range = [
    # [150.3, 7, 25],
    [150.3, 7, 25],
    [150.3, 7, 25],
    [150.3, 7, 25],
    # [150.3, 7, 25],
    [150.3, 7, 25],
    [150.3, 7, 25],
    [150.3, 7, 25],
    [150.3, 7, 25],
    # [150.3, 7, 25]
]
ws_std_range = [
    # [9.3, 5, 10],
    [9.3, 5, 10],
    [9.3, 5, 10],
    [9.3, 5, 10],
    # [9.3, 5, 10],
    [9.3, 5, 10],
    [9.3, 5, 10],
    [9.3, 5, 10],
    [9.3, 5, 10],
    # [9.3, 5, 10]
]

for ws_mean_cat, ws_std_cat in zip(ws_mean_range, ws_std_range):
    serial_intervals = []
    for ws_mean, ws_std in zip(ws_mean_cat, ws_std_cat):
        theta = ws_std**2 / ws_mean
        k = ws_mean / theta
        w_dist = scipy.stats.gamma(k, scale=theta)
        disc_w = w_dist.pdf(np.arange(30))

        serial_intervals.append(disc_w)
    serial_intervals_range.append(np.array(serial_intervals))

# Simulate incidence data
initial_r = 1.2

# Select a range of different contact matrices
contact_matrix_range = [
    # np.array([[6., 3., 2.], [3., 3., 2.], [2., 1., 6.]]),
    0.2*np.array([[6., 3., 2.], [3., 3., 2.], [2., 1., 6.]]),
    np.array([[6., 3., 2.], [3., 3., 2.], [2., 1., 6.]]),
    1.1*np.array([[0.5, 1., 1], [1., 1.2, 1], [0.2, 1.2, 0.5]]),
    # 3*np.array([[2.3, 0.1, 3], [0.2, 1.2, 0.5], [1., 1.2, 1]]),
    np.array([[12, 0.6, 1], [6, 4.8, 5], [1, 5, 10]]),
    np.array([[0, 6, 0], [0, 0, 0.1], [2, 0, 0]]),
    np.array([[0, 6, 0], [0, 0, 0.1], [2, 0, 0]]),
    np.array([[0, 6, 0], [0, 0, 0.1], [2, 0, 0]]),
    # np.array([[0, 6, 0], [0, 0, 0.1], [2, 0, 0]])
]

# Choose unique transmissibilityt vector for ease so
# effective contact matrix = contact matrix
transmissibility = [1, 1, 1]

# Select a range of different growth rates
new_rs_range = [
    # [0.25, 0.25],
    [0.25, 0.25],
    [0.25, 0.05],
    [0.25, 0.05],
    # [0.25, 0.15],
    [0.044, 0.044],
    [0.01, 0.01],
    [1, 1],
    [1/np.cbrt(1.2), 1/np.cbrt(1.2)],
    # [2, 2]
]

# Choose no differences in the r_t profile after day 20
start_times = [0, 20]

# Initial number of cases
parameters = [50, 50, 50] 

## Compute Stability Criterion

In [3]:
stability_criterion = []

for (contact_matrix, new_rs) in zip(contact_matrix_range, new_rs_range):   
    # Compute the eigenvalues of the maximum effective contact matrix
    eff_contact_matrix = np.matmul(contact_matrix, np.diag(transmissibility))

    spec_radius = np.max(np.absolute(np.linalg.eigvals(eff_contact_matrix)))

    # Compute stability criterion using last change in growth rate
    stability_criterion.append(new_rs[-1] * spec_radius)

print(stability_criterion)

[0.4782937212891897, 0.4782937212891894, 0.1438685641790379, 0.6481068909785518, 0.010626585691826111, 1.0626585691826111, 1.0]


## Deterministic Model

In [4]:
# Define Deterministic Model

class MultiCatDeterministicBranchProModel(branchpro.BranchProModel):
    r"""MultiCatDeterministicBranchProModel Class:
    Class for the models following a Branching Processes behaviour with
    no distribution noise and multiple population categories.
    It inherits from the ``BranchProModel`` class.

    In the branching process model, we track the number of cases
    registered for each category and each day, I_{t,i}, also known as the
    "incidence" at time t.

    The incidence at time t depends on previous number of cases, according to
    the following formula:

    .. math::
        I_{t, i}^{\text(local)}|I_0, I_1, \dots I_{t-1}, w_{s}, R_{t}) =
            \sum_{j}R_{t}C{i,j}T_{j}\sum_{s=1}^{t}I_{t-s,j}w_{s}

    Always apply method :meth:`set_r_profile` before calling
    :meth:`NegBinBranchProModel.simulate` for a change of R_t profile!

    Parameters
    ----------
    initial_r
        (list) List of reproduction numbers per category at the beginning
        of the epidemic.
    serial_interval
        (list) Unnormalised probability distribution of that the recipient
        first displays symptoms s days after the infector first displays
        symptoms for each category.
    num_cat
        (int) Number of categories in which the population is split.
    contact_matrix
        (array) Matrix of contacts between the different categories in which
        the population is split.
    transm
        (list) List of overall reductions in transmissibility per category.
    multipleSI
        (boolean) Different serial intervals used for categories.

    """

    def __init__(self, initial_r, serial_interval, num_cat, contact_matrix,
                 transm, multipleSI=False):
        if not isinstance(initial_r, (int, float)):
            raise TypeError('Value of R must be integer or float.')

        if not isinstance(num_cat, int):
            raise TypeError('Number of population categories must be integer.')
        if num_cat <= 0:
            raise ValueError('Number of population categories must be > 0.')

        if np.asarray(contact_matrix).ndim != 2:
            raise ValueError(
                'Contact matrix values storage format must be 2-dimensional')
        if np.asarray(contact_matrix).shape[0] != num_cat:
            raise ValueError(
                'Wrong number of rows in contact matrix values storage')
        if np.asarray(contact_matrix).shape[1] != num_cat:
            raise ValueError(
                'Wrong number of columns in contact matrix values storage')
        for c in np.asarray(contact_matrix):
            for _ in c:
                if _ < 0:
                    raise ValueError('Contact matrix values must be >= 0.')
                if not isinstance(_, (int, float)):
                    raise TypeError(
                        'Contact matrix values must be integer or float.')

        if np.asarray(transm).ndim != 1:
            raise ValueError(
                'Transmissiblity storage format must be 1-dimensional')
        if np.asarray(transm).shape[0] != num_cat:
            raise ValueError(
                'Wrong number of categories in transmissibility storage')
        for _ in transm:
            if _ < 0:
                raise ValueError('Transmissiblity values must be >= 0.')
            if not isinstance(_, (int, float)):
                raise TypeError(
                    'Transmissiblity values must be integer or float.')

        # Invert order of serial intervals for ease in _normalised_daily_mean
        self._num_cat = num_cat
        self._contact_matrix = np.asarray(contact_matrix)
        self._transm = np.asarray(transm)

        if multipleSI is False:
            if np.asarray(serial_interval).ndim != 1:
                raise ValueError(
                    'Serial interval values storage format must be\
                    1-dimensional')
            if np.sum(serial_interval) < 0:
                raise ValueError('Sum of serial interval values must be >= 0.')
            self._serial_interval = np.tile(
                np.asarray(serial_interval)[::-1], (num_cat, 1))
        else:
            if np.asarray(serial_interval).ndim != 2:
                raise ValueError(
                    'Serial interval values storage format must be\
                    2-dimensional')
            if np.asarray(serial_interval).shape[0] != num_cat:
                raise ValueError(
                    'Serial interval values storage format must match\
                    number of categories')
            for _ in range(num_cat):
                if np.sum(serial_interval[_, :]) < 0:
                    raise ValueError(
                        'Sum of serial interval values must be >= 0.')
            self._serial_interval = np.asarray(serial_interval)[:, ::-1]

        self._r_profile = np.array([initial_r])
        self._normalizing_const = np.sum(self._serial_interval, axis=1)

    def set_transmissibility(self, contact_matrix):
        """
        Updates contact matrix for the model.

        Parameters
        ----------
        contact_matrix
            New matrix of contacts between the different categories in which
            the population is split.

        """
        if np.asarray(contact_matrix).ndim != 2:
            raise ValueError(
                'Contact matrix values storage format must be 2-dimensional')
        if np.asarray(contact_matrix).shape[0] != self._num_cat:
            raise ValueError(
                'Wrong number of rows in contact matrix values storage')
        if np.asarray(contact_matrix).shape[1] != self._num_cat:
            raise ValueError(
                'Wrong number of columns in contact matrix values storage')
        for c in np.asarray(contact_matrix):
            for _ in c:
                if _ < 0:
                    raise ValueError('Contact matrix values must be >= 0.')
                if not isinstance(_, (int, float)):
                    raise TypeError(
                        'Contact matrix values must be integer or float.')

        self._contact_matrix = np.asarray(contact_matrix)

    def get_transmissibility(self):
        """
        Returns transmissibility vector for the model.

        """
        return self._transm

    def get_contact_matrix(self):
        """
        Returns contact matrix for the model.

        """
        return self._contact_matrix

    def get_serial_intervals(self):
        """
        Returns serial intervals for the model.

        """
        # Reverse inverting of order of serial intervals
        return self._serial_interval[:, ::-1]

    def set_serial_intervals(self, serial_intervals, multipleSI=False):
        """
        Updates serial intervals for the model.

        Parameters
        ----------
        serial_intervals
            New unnormalised probability distribution of that the recipient
            first displays symptoms s days after the infector first displays
            symptoms for each category.
        multipleSI
            (boolean) Different serial intervals used for categories.

        """
        # Invert order of serial intervals for ease in _effective_no_infectives
        if multipleSI is False:
            if np.asarray(serial_intervals).ndim != 1:
                raise ValueError(
                    'Serial interval values storage format must be\
                    1-dimensional')
            if np.sum(serial_intervals) < 0:
                raise ValueError('Sum of serial interval values must be >= 0.')
            self._serial_interval = np.tile(
                np.asarray(serial_intervals)[::-1], (self._num_cat, 1))
        else:
            if np.asarray(serial_intervals).ndim != 2:
                raise ValueError(
                    'Serial interval values storage format must be\
                    2-dimensional')
            if np.asarray(serial_intervals).shape[0] != self._num_cat:
                raise ValueError(
                    'Serial interval values storage format must match\
                    number of categories')
            for _ in range(self._num_cat):
                if np.sum(serial_intervals[_, :]) < 0:
                    raise ValueError(
                        'Sum of serial interval values must be >= 0.')
            self._serial_interval = np.asarray(serial_intervals)[:, ::-1]

        self._normalizing_const = np.sum(self._serial_interval, axis=1)

    def _effective_no_infectives(self, t, incidences):
        """
        Computes expected number of new cases at time t, using previous
        incidences and serial intervals at a rate of 1:1 reproduction.

        Parameters
        ----------
        t
            evaluation time
        incidences
            sequence of incidence numbers
        """
        if t > self._serial_interval.shape[1]:
            start_date = t - self._serial_interval.shape[1]
            mean = np.divide(np.diag(
                np.matmul(self._serial_interval, incidences[start_date:t, :])),
                self._normalizing_const)
            return mean

        mean = np.divide(np.diag(
            np.matmul(self._serial_interval[:, -t:], incidences[:t, :])),
            self._normalizing_const)
        return mean

    def simulate(
            self, parameters, times, var_contacts=False, neg_binom=False,
            niu=0.1):
        """
        Runs a forward simulation with the given ``parameters`` and returns a
        time-series with incidence numbers per population category
        corresponding to the given ``times``.

        Parameters
        ----------
        parameters
            Initial number of cases per population category.
        times
            The times at which to evaluate. Must be an ordered sequence,
            without duplicates, and without negative values.
            All simulations are started at time 0, regardless of whether this
            value appears in ``times``.
        var_contacts
            (boolean) Wheteher there exists noise in number of contacts.
        neg_binom
            (boolean) Wheteher the noise in number of contacts is Negative
            Binomial distributed.
        niu
            (float) Accepance probability.

        """
        initial_cond = parameters
        last_time_point = np.max(times)

        # Repeat final r if necessary
        # (r_1, r_2, ..., r_t)
        if len(self._r_profile) < last_time_point:
            missing_days = last_time_point - len(self._r_profile)
            last_r = self._r_profile[-1]
            repeated_r = np.full(shape=missing_days, fill_value=last_r)
            self._r_profile = np.append(self._r_profile, repeated_r)

        incidences = np.empty(shape=(last_time_point + 1, self._num_cat))
        incidences[0, :] = initial_cond

        # Construct simulation times in steps of 1 unit time each
        simulation_times = np.arange(start=1, stop=last_time_point+1, step=1)

        # Compute normalised daily means for full timespan
        # and draw samples for the incidences
        self.exact_contact_matrix = [np.random.poisson(self._contact_matrix)]

        for t in simulation_times:
            if var_contacts is False:
                contact_matrix = self._contact_matrix
            else:
                if neg_binom is False:
                    contact_matrix = np.random.poisson(self._contact_matrix)
                else:
                    contact_matrix = np.random.negative_binomial(
                        self._contact_matrix, niu)
                self.exact_contact_matrix.append(contact_matrix)
            norm_daily_mean = self._r_profile[t-1] * np.matmul(
                contact_matrix,
                np.multiply(
                    self._transm,
                    self._effective_no_infectives(t, incidences)
                ))
            incidences[t, :] = norm_daily_mean

        mask = np.in1d(np.append(np.asarray(0), simulation_times), times)
        return incidences[mask]

In [5]:
deterministic_cases = []

for (contact_matrix, new_rs, serial_intervals) in zip(contact_matrix_range, new_rs_range, serial_intervals_range):    
    # Initialise the determinsitc model
    m = MultiCatDeterministicBranchProModel(
        initial_r, serial_intervals, num_categories, contact_matrix, transmissibility, multipleSI=True)

    # Simulate the incidence
    m.set_r_profile(new_rs, start_times)
    times = np.arange(num_timepoints)

    desagg_cases = m.simulate(parameters, times, var_contacts=False)
    deterministic_cases.append(desagg_cases)

### Plot disagreggated local incidence numbers

In [6]:
for _, desagg_cases in enumerate(deterministic_cases):
    # Identify fate of epidemic
    if stability_criterion[_] < 1:
        fate = '(Epidemic Decays)'
    else:
        fate = '(Epidemic Grows)'

    # Plot (bar chart cases each day)
    fig = go.Figure()

    for cat in range(num_categories):
        # Plot of incidences for category
        fig.add_trace(
            go.Scatter(
                x=times,
                y=desagg_cases[:, cat],
                name='Incidences category {}'.format(cat+1)
            )
        )

    # Add axis labels
    fig.update_layout(
        width=600, 
        height=400,
        plot_bgcolor='white',
        title='Stability criterion: {:.2f} '.format(stability_criterion[_])  + fate,
        xaxis=dict(linecolor='black'),
        xaxis_title='Time (days)',
        yaxis=dict(linecolor='black'),
        yaxis_title='New cases'
    )

    fig.show()

## Stochastic Model

In [7]:
stochastic_cases = []

for (contact_matrix, new_rs, serial_intervals) in zip(contact_matrix_range, new_rs_range, serial_intervals_range):    
    # Initialise the stochastic model with Poisson Noise
    m = branchpro.MultiCatPoissonBranchProModel(
        initial_r, serial_intervals, num_categories, contact_matrix, transmissibility, multipleSI=True)

    # Simulate the incidence
    m.set_r_profile(new_rs, start_times)
    times = np.arange(num_timepoints)

    desagg_cases = m.simulate(parameters, times, var_contacts=False)
    stochastic_cases.append(desagg_cases)

### Plot disagreggated local incidence numbers

In [8]:
for _, desagg_cases in enumerate(stochastic_cases):
    # Identify fate of epidemic
    if stability_criterion[_] < 1:
        fate = '(Epidemic Decays)'
    else:
        fate = '(Epidemic Grows)'

    # Plot (bar chart cases each day)
    fig = go.Figure()

    for cat in range(num_categories):
        # Plot of incidences for category
        fig.add_trace(
            go.Scatter(
                x=times,
                y=desagg_cases[:, cat],
                name='Incidences category {}'.format(cat+1)
            )
        )

    # Add axis labels
    fig.update_layout(
        width=600, 
        height=400,
        plot_bgcolor='white',
        title='Stability criterion: {:.2f} '.format(stability_criterion[_])  + fate,
        xaxis=dict(linecolor='black'),
        xaxis_title='Time (days)',
        yaxis=dict(linecolor='black'),
        yaxis_title='New cases'
    )

    fig.show()