# Sensitivity Analysis

This notebook generates images in Figures 10-15 in the Methods section.

## Imports

In [None]:
import numpy as np
import numpy.matlib

from matplotlib import pyplot as plt
import pandas as pd
import pymc3 as pm

## Agents

Timeseries data are generated by simulating interactions between multiple `Agent` instances representing the policy and the epidemics model. This section includes the core Agent code used throughout the sensitivity analysis.

### Agent (abstract class)

In [None]:
class Agent():
    """ Implements the gym.Env interface. 
    
    Each agent in a Closed Loop Data Science simulation acts on the union 
    of observations made available by other agents, and returns an object 
    representing what can be observed of this agent by others.
    
    Most documentation below is copied, for convenience, from 
    https://github.com/openai/gym/blob/master/gym/core.py
    
    The main API methods that users of this class need to know are:
        step
        reset
    """
    
    def reset(self):
        """Resets the state of the agent and returns an initial observable.
        Returns:
            observation (object): the initial observation.
        """
        raise NotImplementedError
        
    def step(self, action):
        """Run one timestep of the environment's dynamics.
        
        Args:
            action (object): observations made available by other agents.
            
        Returns:
            observation (object): information the agent makes observable to others
            reward (float) : amount of reward returned after previous action
            done (bool): whether the episode has ended, in which case further step() calls will return undefined results
            info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)
        """
        raise NotImplementedError
        
    @staticmethod
    def _get_input(var, action):
        """ Facilitates the implementation of agents that support both
        (a) parameters of the step function remain constant from __init__, and 
        (b) parameters are read from the `action` input.
        
        If var is a string, then action[var]. 
        If var is a function, the function is applied to the action.
        By default, var is returned.
        """
        if isinstance(var, str):
            return action[var]
        elif callable(var):
            return var(action)
        else: 
            return var    

### Composite Agent

Simulations are represented as `Composite` agents.

In [None]:
class Composite(Agent):

    def __init__(self, order='concurrent'):
        """
        Composite agent that calls child agents' step, provides a global store 
        of observables, applies postprocessing to agent outputs and 
        preprocessing of agent inputs.
        
        Args:
            order (str): Execution order of child agents. 
                With 'concurrent' order all agents observe outputs of all other
                agents from the previous timestep.
                With 'sequential' order agent i observes outputs of agents 
                0,..i-1 from the current time step and outputs of agents 
                i,..N-1 from the previous time step.
        """
        self.order = order
        self.agents = []
        self.observable = None
    
    class Internal(Agent):
        """ Agent wrapper augmenting instances with input preprocessing and output 
           postprocessing.
        
        Args:
            agent (Agent): agent to be wrapped.
            out (str): agent output channel name.
            pre (callable): function applied to observable input before `step`
                is called.
            post (callable): function applied to agent output before sending it
                to `out`.
        """
        
        def __init__(self, agent, out, pre=None, post=None):
            assert isinstance(agent, Agent)
            assert isinstance(out, str)
            self.agent = agent
            self.out = out
            self.pre = pre
            self.post = post
            
        def reset(self):
            return self._wrap_output(self.agent.reset())
            
        def step(self, observable):
            o, r, d, i = self.agent.step(self._wrap_input(observable))
            return self._wrap_output(o), r, d, i
        
        def _wrap_input(self, observable):
            return observable if self.pre is None else self.pre(observable)
        
        def _wrap_output(self, output):
            return {self.out: output if self.post is None else self.post(output)}  
    
    def add(self, agent, out, pre=None, post=None):
        self.agents.append(self.Internal(agent=agent, out=out, pre=pre, post=post))
    
    def reset(self):
        """ Resets observable by resetting all agents. """
        self.observable = {}
        for agent in self.agents:
            o = agent.reset()
            for k, v in o.items():
                self.observable[k] = v
            
        return self.observable
        
    
    def step(self, action=None):
        """ Updates joint observable by stepping all agents once.
        
        Args:
            action (dict): (key, value) pairs added to observable before 
                agent.step is called.
        
        Returns:
            observation (object): dictionary of agent observations.
            reward (float) : dictionary of rewards (if any).
            done (bool): dictionary of done indicators.
            info (dict): dictionary of auxiliary diagnostic information returned 
                from all agents.
        """
        if action is not None:
            for k, v in action.items():
                self.observable[k] = v
                
        if self.order == 'concurrent':
            observable, reward, done, info = {}, 0, False, None
            for agent in self.agents:
                o, r, d, i = agent.step(self.observable)
                for k, v in o.items():
                    observable[k] = v
                    
        if self.order == 'sequential':
            # initialize current step's output with previous step's output.
            observable, reward, done, info = dict(self.observable), 0, False, None
            for agent in self.agents:
                # agents receive latest outputs from all agents
                o, r, d, i = agent.step(observable) 
                for k, v in o.items():
                    observable[k] = v
            
        self.observable = observable
        return observable, reward, done, info

### SIQR Agent

The BatchSIQR Agent implements the SIQR mass-action model. It simulates timeseries for a batch parameters at once.

In [None]:
# clock-driven SIQR model that solves batches of Monte Carlo samples at once
class BatchSIQR(Agent):
    def __init__(self,
                 batch_size=1,
                 beta=.373,
                 alpha=0.067,
                 eta=0.067,
                 delta=0.036,
                 I0=100,
                 N=6e7,
                 round_state=False, step_size=0.1):
        """Class for SIQR dynamics of the environment

        Parameters
        ----------
        beta (float, default=0.373): the infection rate
        alpha (float, default=0.067): the transition rate from I to R (recovery or death rate for non-quarantined population)
        eta (float, default=0.067): the transition rate from I to Q
        delta (float, default=0.036): the transition rate from Q to R (recovery or death rate for quarantined population)
        N (int, default=10000): the total siwe of the population
        epsilon (float, default=0.01): initial fraction of infected and recovered members of the
            population.
        round_state(bool): round state to nearest integer after each step.
        """
    
        self.batch_size = batch_size  
        self.beta = self._to_batch(beta)
        self.alpha = self._to_batch(alpha)
        self.eta = self._to_batch(eta)
        self.delta = self._to_batch(delta)
        self.I0 = self._to_batch(I0)
        self.N = self._to_batch(N)
        self.round_state = round_state
        self.step_size = step_size

    def reset(self):
        """returns initial state (s0,  i0, r0)"""
        self.state = np.array([
            self._to_batch(self.N - self.I0), # S
            self._to_batch(self.I0), # I
            self._to_batch(0), # Q
            self._to_batch(0)]).T # R
        if self.round_state:
            self.state = self._pround(self.state)

        return self.state

    def step(self, action=None):
        """performs integration step"""
        beta = self._get_input(self.beta, action)
        beta_batch = self._to_batch(beta)
        
        self.state = self.euler_step(self.state, dt=1, beta=beta_batch)
        if self.round_state:
            self.state = self._pround(self.state)
            
        return self.state, 0, False, None

    @staticmethod
    def _pround(x):
        dx = np.random.uniform(size=x.shape) < (x-x.astype(np.int32))
        return x + dx
        
    # variable should have shape (batch, ) + shape
    def _to_batch(self, x, shape=()):
        # return placeholder key or callable
        if isinstance(x, str) or callable(x):
            return x
        
        x_arr = np.array(x)
        target_shape = (self.batch_size, ) + shape

        if x_arr.shape == target_shape:
            return x_arr
        elif (x_arr.shape == shape):
            return np.matlib.repmat(x_arr.reshape(shape), self.batch_size,1).reshape(target_shape)
        elif len(x_arr.shape) > 0 and x_arr.shape[0] == target_shape:
            return x_arr.reshape(target_shape)
        else:
            print("Warning: unable to convert to target shape", x, target_shape)
            return x
    
    def euler_step(self, X, dt, beta):
        
        X_ = np.array(X)
        n_steps = int(1/self.step_size)
        for _ in range(n_steps):
            dxdt = self._ode(X_, dt/n_steps, beta, self.alpha, self.eta, self.delta, self.N)
            X_ = X_ + dxdt
        return X_

    @staticmethod
    def _ode(Y, dt, beta, alpha, eta, delta, N, f=0):
        """Y = (S, I, R)^T """
        S, I, Q, R = Y[:,0], Y[:,1], Y[:,2], Y[:,3]
        
        dS = - beta * (1/N) * I * S
        dI = beta * (1/N) * I * S - (alpha + eta) * I
        dQ = eta * I - delta * Q
        dR = delta * Q + alpha * I

        return np.array([dS, dI, dQ, dR]).T * dt

### SIDARTHE Agent

The BatchSIDARTHE Agent implements the SIDARTHE mass-action model. It simulates timeseries for a batch of parameters at once.

In [None]:
# S: Susceptibles
# I : (infected) asymptomatic, infected, undetected
# D: (detected) asymptomatic, infected, detected
# A: (ailing) symptomatic, infected, undetected
# R: (recognized) symptomatic, infected, detected
# T: (threatened) acutely symptomatoc, infected, detected
# H: (healed)
# E: (extinct)
class BatchSIDARTHE(Agent):
    def __init__(self,
                 s0, # initial state
                 batch_size=1,
                 alpha=0.57, # contagion from I
                 beta=0.011, # contagion from D
                 gamma=0.456, # contagion from A
                 delta=0.011, # contagion from R
                 epsilon=0.171, # diagnosis
                 zeta=0.125, # developing symptoms while undiagnosed
                 eta=0.125, # developing symptoms after diagnosis
                 theta=0.371, # diagnosis after symptoms
                 kappa=0.017, # A -> H
                 h=0.034, # I -> H
                 mu=0.012, # A -> T
                 nu=0.027, # R -> T
                 xi=0.017, # R -> H
                 rho=0.034, # D -> H
                 sigma=0.017, # T-> H
                 tau=0.003, # T -> E
                 N=6e7,
                 round_state=False, 
                 step_size=0.1):
        
        """Class for SIDARTHE dynamics of the environment
        https://arxiv.org/abs/2003.09861

        Parameters
        ----------
        
        N (int, default=10000): the total siwe of the population
        round_state(bool): round state to nearest integer after each step.

        Attributes
        ----------
        observation_space (gym.spaces.Box, shape=(3,)): at each step, the
            environment only returns the true values S, I, R
        action_space (gym.spaces.Box, shape=(1)): the value beta

        #TODO necessary to wrap gym.Env?

        """
    
        self.batch_size = batch_size  
        self.s0 = s0
        self.N = self._to_batch(N)
        assert (np.sum(self.s0) == self.N).all()
        
        self.alpha = self._to_batch(alpha)
        self.beta = self._to_batch(beta)
        self.gamma = self._to_batch(gamma)
        self.delta = self._to_batch(delta)
        self.epsilon = self._to_batch(epsilon)
        self.zeta = self._to_batch(zeta)
        self.eta = self._to_batch(eta)
        self.theta = self._to_batch(theta)
        self.kappa = self._to_batch(kappa)
        self.h = self._to_batch(h)
        self.mu = self._to_batch(mu)
        self.nu = self._to_batch(nu)
        self.xi = self._to_batch(xi)
        self.rho = self._to_batch(rho)
        self.sigma = self._to_batch(sigma)
        self.tau = self._to_batch(tau)
        
        self.round_state = round_state
        self.step_size = step_size

    def reset(self):
        """returns initial state (s0,  i0, r0)"""
        self.state = self._to_batch(self.s0, (8,))
        if self.round_state:
            self.state = self._pround(self.state)

        return self.state

    def step(self, action=None):
        """performs integration step"""
        
        alpha = self._to_batch(self._get_input(self.alpha, action))
        beta = self._to_batch(self._get_input(self.beta, action))
        gamma = self._to_batch(self._get_input(self.gamma, action))
        delta = self._to_batch(self._get_input(self.delta, action))
        
        self.state = self.euler_step(self.state, 
                                     dt=1, 
                                     alpha=alpha,
                                     beta=beta, 
                                     gamma=gamma, 
                                     delta=delta)
        if self.round_state:
            self.state = self._pround(self.state)
            
        return self.state, 0, False, None

    @staticmethod
    def _pround(x):
        dx = np.random.uniform(size=x.shape) < (x-x.astype(np.int32))
        return x + dx
        
    # variable should have shape (batch, ) + shape
    def _to_batch(self, x, shape=()):
        # return placeholder key or callable
        if isinstance(x, str) or callable(x):
            return x
        
        x_arr = np.array(x)
        target_shape = (self.batch_size, ) + shape

        if x_arr.shape == target_shape:
            return x_arr
        elif (x_arr.shape == shape):
            return np.matlib.repmat(x_arr.reshape(shape), self.batch_size,1).reshape(target_shape)
        elif len(x_arr.shape) > 0 and x_arr.shape[0] == target_shape:
            return x_arr.reshape(target_shape)
        else:
            print("Warning: unable to convert to target shape", x, target_shape)
            return x
    
    def euler_step(self, X, dt, alpha, beta, gamma, delta):
        
        X_ = np.array(X)
        n_steps = int(1/self.step_size)
        for _ in range(n_steps):
            dxdt = self._ode(X_, 
                             dt/n_steps, 
                             self.N, 
                             alpha,
                             beta,
                             gamma,
                             delta,
                             self.epsilon,
                             self.zeta,
                             self.eta,
                             self.theta,
                             self.kappa,
                             self.h,
                             self.mu,
                             self.nu,
                             self.xi,
                             self.rho,
                             self.sigma,
                             self.tau)
            X_ = X_ + dxdt
        return X_

    @staticmethod
    def _ode(Y, dt, N,
             alpha,
             beta, 
             gamma, 
             delta, 
             epsilon, 
             zeta,
             eta, 
             theta,
             kappa,
             h,
             mu,
             nu,
             xi,
             rho,
             sigma,
             tau):
        """Y = (S, I, R)^T """
        keys = ['S', 'I', 'D', 'A', 'R', 'T', 'H', 'E']
        S, I, D, A, R, T, H, E = [Y[:,i] for i in range(8)]
        
        newly_infected = (alpha*I + beta*D + gamma*A + delta*R)
        dS = -S/N * newly_infected
        dI = S/N * newly_infected - (epsilon + zeta + h)*I
        dD = epsilon*I - (eta + rho)*D
        dA = zeta*I - (theta + mu + kappa)*A
        dR = eta*D + theta*A - (nu + xi)*R
        dT = mu*A + nu*R - (sigma + tau)*T
        dH = h*I + rho*D + kappa*A + xi*R + sigma*T
        dE = tau*T

        return np.array([dS, dI, dD, dA, dR, dT, dH, dE]).T * dt
    
    def R0(self):
        r1 = self.epsilon + self.zeta + self.h
        r2 = self.eta + self.rho
        r3 = self.theta + self.mu + self.kappa
        r4 = self.nu + self.xi
        r5 = self.sigma + self.tau
        
        r0 = self.alpha / r1
        r0 += self.beta * self.epsilon / (r1 * r2)
        r0 += self.gamma * self.zeta / (r1 * r3)
        r0 += self.delta * self.eta * self.epsilon / (r1 * r2 * r4)
        r0 += self.delta * self.zeta * self.theta / (r1 * r3 * r4)
        return r0
    
    def render(self, mode='human'):
        pass

    def close(self):
        pass

### Lockdown Agent

In [None]:
class BatchLockdown(Agent):
    """ Lockdown policy.
    
    Agent returns 
        [0, ... suppression start]: beta_high
        [suppression_start, ..., suppression_end): beta_low
        [suppression_end, ..., END]: beta_high
    """
    def __init__(self, beta_high=1, beta_low=0, batch_size=1, suppression_start=0, suppression_end=None):
        self.batch_size = batch_size
        self.beta_high = self._to_batch(beta_high)
        self.beta_low = self._to_batch(beta_low)
        self.suppression_start = suppression_start
        self.suppression_end = suppression_end
        
    # variable should have shape (batch, ) + shape
    def _to_batch(self, x, shape=()):
        # return placeholder key or callable
        if isinstance(x, str) or callable(x):
            return x
        
        x_arr = np.array(x)
        target_shape = (self.batch_size, ) + shape

        if x_arr.shape == target_shape:
            return x_arr
        elif (x_arr.shape == shape):
            return np.matlib.repmat(x_arr.reshape(shape), self.batch_size,1).reshape(target_shape)
        elif len(x_arr.shape) > 0 and x_arr.shape[0] == target_shape:
            return x_arr.reshape(target_shape)
        else:
            print("Warning: unable to convert to target shape", x, target_shape)
            return x
        
    def reset(self):
        self.steps = 1
        return self.beta_high
        
    def step(self, x):
        y = self.beta_high
        if (self.steps >= self.suppression_start):
            y = self.beta_low
        if (self.suppression_end is not None) and (self.steps >= self.suppression_end):
            y = self.beta_high
        self.steps += 1
        return y, 0, False, None

### FPSP Agent

Implementation of the proposed Fast Periodic Switching Policy (FPSP).

In [None]:
class BatchFPSP(Agent):
    """ Fast Periodic Switching between high and low beta policy.
    
    Implementation of https://robertshorten.files.wordpress.com/2020/03/fpsr_title.pdf
    
    Agent returns 
        [0, ... suppression start]: beta_high
        [suppression_start, ..., switching_start): beta_low
        [switching_start, ..., END]: beta_high for steps_high followed by beta_low for steps_low steps.
    
    """
    def __init__(self, beta_high=1, beta_low=0, steps_high=1, steps_low=1, batch_size=1, suppression_start=0, switching_start=0):
        self.batch_size = batch_size
        self.beta_high = self._to_batch(beta_high)
        self.beta_low = self._to_batch(beta_low)
        self.steps_high = steps_high
        self.steps_low = steps_low
        self.suppression_start = suppression_start
        self.switching_start = switching_start
        
    # variable should have shape (batch, ) + shape
    def _to_batch(self, x, shape=()):
        # return placeholder key or callable
        if isinstance(x, str) or callable(x):
            return x
        
        x_arr = np.array(x)
        target_shape = (self.batch_size, ) + shape

        if x_arr.shape == target_shape:
            return x_arr
        elif (x_arr.shape == shape):
            return np.matlib.repmat(x_arr.reshape(shape), self.batch_size,1).reshape(target_shape)
        elif len(x_arr.shape) > 0 and x_arr.shape[0] == target_shape:
            return x_arr.reshape(target_shape)
        else:
            print("Warning: unable to convert to target shape", x, target_shape)
            return x
        
    def reset(self):
        self.steps = 1
        self.cycle_length = None
        return self.beta_high
        
    def step(self, x):
        y = self.beta()
        self.steps += 1
        return y, 0, False, None
    
    def beta(self):
        is_phase_3 = self.steps >= self.switching_start
        is_phase_2 = (self.steps >= self.suppression_start) * (1-is_phase_3)
        is_phase_1 = (1-is_phase_3)*(1-is_phase_2)
        
        cycle_length = self.steps_high + self.steps_low
        cycle_step = np.maximum(0, (self.steps - self.switching_start)).astype(np.int32)
        cycle_step = np.mod(cycle_step, cycle_length)
        is_high_cycle = cycle_step < self.steps_high
        
        out = is_phase_1 * self.beta_high + \
              is_phase_2 * self.beta_low + \
              is_phase_3 * (is_high_cycle * self.beta_high + (1-is_high_cycle) * self.beta_low)
        return out

## Sensitivity to Uncertainty in Model Parameters

### Sample Parameters

A batch of 1000 sets of parameters are sampled from their joint prior distribution.

#### Sample SIQR

In [None]:
n_samples = 1000
default_sd = 0.1 # default relative standard deviation
outfile = 'siqr_sensitivity_r0_samples.npz'

with pm.Model() as model:    
    # Distribution parameters taken from meta analysis in [1]
    R0 = pm.TruncatedNormal('R0', mu=2.675739, sd=0.5719293, lower=0)
    
    # alpha and eta depend on 
    # p_symptomatic:            the probability of developing symptoms
    # p_detection:              the probability of symptomatic patients being tested and confirmed positive
    # undetected_recovery_rate: the rate at which asymptomatic and undetected symptomatic individuals recover
    # test_confirmation_rate:   the rate at which infected individuals get tested and results confirmed
    
    # mean from https://www.niid.go.jp/niid/en/2019-ncov-e/9417-covid-dp-fe-02.html
    p_symptomatic = 0.5
    p_symptomatic = pm.TruncatedNormal('p_symptomatic', mu=p_symptomatic, sd=p_symptomatic*default_sd, lower=0, upper=1)
    
    # assumption from [3]
    p_detection = 0.66 
    p_detection = pm.TruncatedNormal('p_detection', mu=p_detection, sd=p_detection*default_sd, lower=0, upper=1)    
    p_quarantine = p_symptomatic * p_detection
    
    # assumption from [3]
    undetected_recovery_rate = 0.1 
    undetected_recovery_rate = pm.TruncatedNormal('undetected_recovery_rate', 
                                                  mu=undetected_recovery_rate, 
                                                  sd=undetected_recovery_rate*default_sd, 
                                                  lower=0)
    
    alpha = pm.Deterministic('alpha', (1-p_quarantine) * undetected_recovery_rate)
    
    # relates to time from infection to confirmed test result
    # assumption taken from [3]
    test_confirmation_rate = 0.2 
    test_confirmation_rate = pm.TruncatedNormal('test_confirmation_rate', 
                                                  mu=test_confirmation_rate, 
                                                  sd=test_confirmation_rate*default_sd, 
                                                  lower=0)
    
    eta = pm.Deterministic('eta', p_quarantine * test_confirmation_rate)

    beta = pm.Deterministic('beta', R0 * ( alpha + eta))

    # recovery rate of quarantined individuals
    # estimate taken from [3]
    delta = pm.TruncatedNormal('delta', 
                               mu=0.036, 
                               sd=0.003, 
                               lower=0)
    
    prior = pm.sample_prior_predictive(n_samples)

fig, axes = plt.subplots(2, 3, figsize=(16, 8))

ax = axes[0][0]
ax.hist(prior['R0'], density=True, bins=20);
ax.set_title('R0')

ax = axes[0][1]
ax.hist(prior['beta'], density=True, bins=20);
ax.set_title('S->I (beta)')

ax = axes[0][2]
ax.hist(prior['eta'], density=True, bins=20);
ax.set_title('I -> Q (eta)')

ax = axes[1][0]
ax.hist(prior['alpha'], density=True, bins=20);
ax.set_title('I -> R (alpha)')

ax = axes[1][1]
ax.hist(prior['delta'], density=True, bins=20);
ax.set_title('Q->R (delta)')

np.savez(outfile, **prior)

#### Sample SIDARTHE

In [None]:
### SIQR Sensitivity to Uncertainty
# default latent parameters
alpha_ = 0.570
beta_ = 0.011
gamma_ = 0.456
delta_ = 0.011
abcd_ = alpha_ + beta_ + gamma_ + delta_

epsilon_ = 0.171
theta_ = 0.371
zeta_ = 0.125
eta_ = 0.125
mu_ = 0.012
nu_ = 0.027
tau_ = 0.003
h_ = 0.034
rho_ = 0.034
kappa_ = 0.017
xi_ = 0.017
sigma_ = 0.017

n_samples = 1000
default_sd = 0.1 # default relative standard deviation
outfile = 'sidarthe_sensitivity_r0_samples.npz'

with pm.Model() as model:    
    # Distribution parameters taken from meta analysis in [1]
    R0_target = pm.TruncatedNormal('R0', mu=2.675739, sd=0.5719293, lower=0)
    
    # Noisy samples around parameters taken from [4]
    alpha = pm.TruncatedNormal('alpha_', mu=alpha_, sd=alpha_*default_sd, lower=0)
    beta = pm.TruncatedNormal('beta_', mu=beta_, sd=beta_*default_sd, lower=0)
    gamma = pm.TruncatedNormal('gamma_', mu=gamma_, sd=gamma_*default_sd, lower=0)
    delta = pm.TruncatedNormal('delta_', mu=delta_, sd=delta_*default_sd, lower=0)
    epsilon = pm.TruncatedNormal('epsilon', mu=epsilon_, sd=epsilon_*default_sd, lower=0)
    theta = pm.TruncatedNormal('theta', mu=theta_, sd=theta_*default_sd, lower=0)
    zeta = pm.TruncatedNormal('zeta', mu=zeta_, sd=zeta_*default_sd, lower=0)
    eta = pm.TruncatedNormal('eta', mu=eta_, sd=eta_*default_sd, lower=0)
    mu = pm.TruncatedNormal('mu', mu=mu_, sd=mu_*default_sd, lower=0)
    nu = pm.TruncatedNormal('nu', mu=nu_, sd=nu_*default_sd, lower=0)
    tau = pm.TruncatedNormal('tau', mu=tau_, sd=tau_*default_sd, lower=0)
    h = pm.TruncatedNormal('h', mu=h_, sd=h_*default_sd, lower=0)
    rho = pm.TruncatedNormal('rho', mu=rho_, sd=rho_*default_sd, lower=0)
    kappa = pm.TruncatedNormal('kappa', mu=kappa_, sd=kappa_*default_sd, lower=0)
    xi = pm.TruncatedNormal('xi', mu=xi_, sd=xi_*default_sd, lower=0)
    sigma = pm.TruncatedNormal('sigma', mu=sigma_, sd=sigma_*default_sd, lower=0)
    
    # rescale R0 to target
    r1 = epsilon + zeta + h
    r2 = eta + rho
    r3 = theta + mu + kappa
    r4 = nu + xi
    r5 = sigma + tau
    r0 = alpha / r1
    r0 += beta * epsilon / (r1 * r2)
    r0 += gamma * zeta / (r1 * r3)
    r0 += delta * eta * epsilon / (r1 * r2 * r4)
    r0 += delta * zeta * theta / (r1 * r3 * r4)
    
    alpha = pm.Deterministic('alpha', alpha/r0 * R0_target)
    beta = pm.Deterministic('beta', beta/r0 * R0_target)
    gamma = pm.Deterministic('gamma', gamma/r0 * R0_target)
    delta = pm.Deterministic('delta', delta/r0 * R0_target)
    
    prior = pm.sample_prior_predictive(n_samples)
    
np.savez(outfile, **prior)

### Simulate

In [None]:
# function used across simulations in this section to plot quantile infecteds 
def plot_stats(env_name, stats, plot_steps, logy=False):
    plt.rcParams.update({'font.size': 16})
    fig, ax = plt.subplots(figsize=(10, 8))
    if logy:
        ax.semilogy(stats[0,:plot_steps,-1]/N*100, label='median, {}'.format(env_name))
    else:
        ax.plot(stats[0,:plot_steps,-1]/N*100, label='median, {}'.format(env_name))
    ax.plot(stats[1,:plot_steps,-1]/N*100, label=' 75%, {}'.format(env_name))
    ax.plot(stats[2,:plot_steps,-1]/N*100, label=' 95%, {}'.format(env_name))
    ax.set_xlabel('time (days)')
    ax.set_ylabel('Total Infected People [%]')
    
    if logy:
        ylim = ax.get_ylim()
        ax.set_ylim(10e-3, ylim[1])
    ax.grid(True, axis='y')
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.legend()
    plt.tight_layout()

#### SIQR with Lockdown

Simulations with the lockdown policy are used to determine the start time of the FPSP such that the observable number of infected individuals (Q) has fallen below its median value after 50 days.

In [None]:
# SIQR - Lockdown

# REMARK: note that the percentile graphs do not correspond to a single policy, 
# but instead show the x-percentile value at each timestep t

# REMARK: policy (0, Y) map to the same policy ()

# load samples
source_file = 'siqr_sensitivity_r0_samples.npz'
# load samples
samples = {}
with np.load(source_file) as f:
    for x in f:
        samples[x] = f[x]
        
batch_size = samples['R0'].shape[0]
n_steps = 350
N = 1e7
I0 = 500/6
step_size = 0.001

# Lockdown policy
suppression_start = 20
suppression_end = None #150
lockdown_effectiveness = 0.175

beta_high = samples['beta']
beta_low = samples['beta']*lockdown_effectiveness

siqr = BatchSIQR(N=N, I0=I0,
                             alpha=samples['alpha'],
                             beta='policy', 
                             eta=samples['eta'], 
                             delta=samples['delta'],
                             batch_size=batch_size,
                             step_size=step_size)

policy = BatchLockdown(beta_high=beta_high,
                             beta_low=beta_low, 
                             suppression_start=suppression_start,
                             suppression_end=suppression_end,
                             batch_size=batch_size)

env = Composite()
env.add(siqr, out='siqr')
env.add(policy, out='policy')

states = [env.reset()] + [env.step(None)[0] for _ in range(n_steps)]
s_siqr = np.array([s['siqr'] for s in states]).swapaxes(1, 0)
s_policy = np.array([s['policy'] for s in states]).swapaxes(1,0)


infected = np.sum(s_siqr[:,:,1:3], axis=2) # sum over I and Q
s_siqr = np.concatenate([s_siqr, infected.reshape(infected.shape + (1,))], axis=2)
s_siqr_stats = np.quantile(s_siqr, q=[0.5, 0.75, 0.95], axis=0)

filename = 'siqr_ldp_{}_{}_{}_sensitivity_batch_{}_step{}'.format(suppression_start, 
                                                                  suppression_end,
                                                                 lockdown_effectiveness,
                                                                 batch_size, 
                                                                 step_size)


plot_stats('LDP', s_siqr_stats, n_steps, logy=False)
ylim = plt.ylim()
plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
#plt.plot([50, 50], ylim, '--', c='blue')
#plt.plot([suppression_end, suppression_end], ylim, '--', c='black')
#plt.savefig(filename+'.png')

plot_stats('LDP', s_siqr_stats, n_steps, logy=True)
ylim = plt.ylim()
plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
#plt.plot([50, 50], ylim, '--', c='blue')
#plt.plot([suppression_end, suppression_end], ylim, '--', c='black')
#plt.savefig(filename+'_logy.png')

# select switching start by number of quarantined
# remark: median quarantined at step 50: Q(50)=20042 ~ 0.0334 %
switching_start=50
median_quarantined = s_siqr_stats[0,switching_start,2]
print('Q(50) median', median_quarantined, median_quarantined/N*100)
switching_start = np.argmax(s_siqr[:,switching_start:,2] <=median_quarantined, axis=1)+switching_start
plt.subplots(figsize=(10, 8))
plt.hist(switching_start, bins=10, density=True)
plt.xlabel('lockdown end (day)');
#plt.savefig(filename+'_end_day_histogram.png')

np.savez(filename+'.npz', states=s_siqr, stats=s_siqr_stats, betas=s_policy, switching_start=switching_start)

#### SIDARTHE with lockdown

In [None]:
source_file = 'sidarthe_sensitivity_r0_samples.npz'
# load samples
samples = {}
with np.load(source_file) as f:
    for x in f:
        samples[x] = f[x]
        
# ODE
batch_size = samples['R0'].shape[0]
alpha = samples['alpha']
beta = samples['beta']
gamma = samples['gamma']
delta = samples['delta']

# initial condition
N=1e7
I = 500/6
D = 20
A = 1
R = 2
T = H = E = 0
S = N - I - D - A - R - T - H - E
s0 = np.array([S, I, D, A, R, T, H, E])
step_size=0.001

lockdown_effectiveness = 0.175
suppression_start = 20
suppression_end = None
beta_high = 1.

model = BatchSIDARTHE(s0=s0, 
                    alpha='a',
                    beta='b',
                    gamma='g',
                    delta='d',
                    epsilon=samples['epsilon'],
                    theta=samples['theta'],
                    zeta=samples['zeta'],
                    eta=samples['eta'],
                    mu=samples['mu'],
                    nu=samples['nu'],
                    tau=samples['tau'],
                    h=samples['h'],
                    rho=samples['rho'],
                    kappa=samples['kappa'],
                    xi=samples['xi'],
                    sigma=samples['sigma'],
                    N=N, 
                    batch_size=batch_size,
                    step_size=step_size)

policy = BatchLockdown(beta_high=beta_high,
                 beta_low=lockdown_effectiveness*beta_high, 
                 suppression_start=suppression_start,
                 suppression_end=suppression_end,
                 batch_size=1)

env = Composite()
env.add(model, 
        pre=lambda x: {'a': x['policy']*alpha,
                       'b': x['policy']*beta,
                       'g': x['policy']*gamma,
                       'd': x['policy']*delta},
        out='model')
env.add(policy, out='policy')

labels = ['S', 'I', 'D', 'A', 'R', 'T', 'H', 'E']
n_steps = 350
env_out = [env.reset()] + [env.step()[0] for _ in range(n_steps)]
s_policy = np.array([o['policy'] for o in env_out]).swapaxes(0,1)
env_out = np.array([o['model'] for o in env_out]).swapaxes(0,1)
infected = np.sum(env_out[:,::,1:6], axis=2) # sum over I D A R T
detected = np.sum(env_out[:,::,[2, 4, 5]], axis=2) # sum over D R T

env_out = np.concatenate([env_out, infected.reshape(infected.shape + (1,))], axis=2)
env_out = np.concatenate([env_out, detected.reshape(detected.shape + (1,))], axis=2)
env_stats = np.quantile(env_out, q=[0.5, 0.75, 0.95], axis=0)

filename = 'sidarthe_ldp_{}_{}_{}_sensitivity_batch_{}_step{}'.format(suppression_start, 
                                                                  suppression_end,
                                                                 lockdown_effectiveness,
                                                                 batch_size, 
                                                                 step_size)


plot_stats('LDP', env_stats, n_steps, logy=True)
ylim = plt.ylim()
plt.plot([suppression_start, suppression_start], ylim, '--', c='red')

np.savez(filename+'.npz', states=env_out, stats=env_stats)
#plt.savefig(filename+'_steps_{}.png'.format(n_steps))

# select switching start by number of quarantined
# remark: median quarantined at step 50: Q(50)=20042 ~ 0.0334 %
switching_start=50
median_detected = env_stats[0,switching_start,-1]
print('detected(50) median', median_detected, median_detected/N*100)
switching_start = np.argmax(env_out[:,switching_start:,-1] <=median_detected, axis=1)+switching_start
plt.subplots(figsize=(10, 8))
plt.hist(switching_start, bins=10, density=True)
plt.xlabel('lockdown end (day)');
#plt.savefig(filename+'_end_day_histogram.png')
np.savez(filename+'.npz', states=env_out, stats=env_stats, betas=s_policy, switching_start=switching_start)

# detected(50) median 212432.9703987951 2.124329703987951

#### SIQR with FPSP (variable switching start)

In [None]:
# SIQR - variable switching start

# REMARK: note that the percentile graphs do not correspond to a single policy, 
# but instead show the x-percentile value at each timestep t

# load samples
source_file = 'siqr_sensitivity_r0_samples.npz'
# load samples
samples = {}
with np.load(source_file) as f:
    for x in f:
        samples[x] = f[x]
        
batch_size = samples['R0'].shape[0]
n_steps = 1000
N = 1e7
I0 = 500/6
step_size = 0.001

# FPSP
suppression_start = 20
filename = 'siqr_ldp_20_None_0.175_sensitivity_batch_1000_step0.001.npz'
with np.load(filename) as f:
    switching_start = f['switching_start']
    
lockdown_effectiveness = 0.175

beta_high = samples['beta']
beta_low = samples['beta']*lockdown_effectiveness

policies = [#(1, 6), (2, 5), (3, 4), 
            (2, 12), (3, 11), (4, 10),
            (4, 24), (6, 22), (8, 20),
            (8, 48), (12, 44), (16, 40), 
            (16, 96), (24, 88), (32, 80)]

for steps_high, steps_low in policies:
    if (steps_low == 0) and (steps_high == 0):
        continue
    if (steps_low == 0) and (steps_high > 1):
        continue
    if (steps_high == 0) and (steps_low > 1):
        continue

    env_name = 'FPSP-({}, {})'.format(steps_high, steps_low)
    print('simulating', env_name)    
    siqr = BatchSIQR(N=N, I0=I0,
                                 alpha=samples['alpha'],
                                 beta='fpsp', 
                                 eta=samples['eta'], 
                                 delta=samples['delta'],
                                 batch_size=batch_size,
                                 step_size=step_size)

    fpsp = BatchFPSP(beta_high=beta_high,
                                 beta_low=beta_low, 
                                 steps_high=steps_high,
                                 steps_low=steps_low, 
                                 suppression_start=suppression_start, 
                                 switching_start=switching_start, 
                                 batch_size=batch_size)

    env = Composite()
    env.add(siqr, out='siqr')
    env.add(fpsp, out='fpsp')

    states = [env.reset()] + [env.step(None)[0] for _ in range(n_steps)]
    s_siqr = np.array([s['siqr'] for s in states]).swapaxes(1, 0)
    s_fpsp = np.array([s['fpsp'] for s in states]).swapaxes(1,0)

    filename = 'siqr_fpsp_{}_var_{}_{}_{}_sensitivity_batch_{}_step{}x{}'.format(suppression_start, 
                                                                     steps_high+steps_low, 
                                                                     steps_high, 
                                                                     lockdown_effectiveness,
                                                                     batch_size, 
                                                                     n_steps,
                                                                     step_size)

    infected = np.sum(s_siqr[:,:,1:3], axis=2) # sum over I and Q
    s_siqr = np.concatenate([s_siqr, infected.reshape(infected.shape + (1,))], axis=2)
    s_siqr_stats = np.quantile(s_siqr, q=[0.5, 0.75, 0.95], axis=0)
    #np.savez(filename+'.npz', states=s_siqr, stats=s_siqr_stats, betas=s_fpsp)
    
    plot_stats(env_name, s_siqr_stats, n_steps)
    ylim = plt.ylim()
    plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
    plt.plot([50, 50], ylim, '--', c='blue')
    plt.savefig(filename+'.eps', dpi=1200)

    plot_stats(env_name, s_siqr_stats, n_steps, logy=True)
    ylim = plt.ylim()
    plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
    plt.plot([50, 50], ylim, '--', c='blue')
    plt.savefig(filename+'_logy.eps', dpi=1200)

#### SIDARTHE with FPSP (variable switching start)

In [None]:
source_file = 'sidarthe_sensitivity_r0_samples.npz'
# load samples
samples = {}
with np.load(source_file) as f:
    for x in f:
        samples[x] = f[x]
        
# FPSP
suppression_start = 20
filename = 'sidarthe_ldp_20_None_0.175_sensitivity_batch_1000_step0.001.npz'
with np.load(filename) as f:
    switching_start = f['switching_start']
        
# ODE
batch_size = samples['R0'].shape[0]
alpha = samples['alpha']
beta = samples['beta']
gamma = samples['gamma']
delta = samples['delta']

# initial condition
N=1e7
I = 500/6
D = 20
A = 1
R = 2
T = H = E = 0
S = N - I - D - A - R - T - H - E
s0 = np.array([S, I, D, A, R, T, H, E])
step_size=0.001

lockdown_effectiveness = 0.175
suppression_start = 20
n_steps = 1000

policies = [#(1, 6), (2, 5), (3, 4), 
            (4, 10), (5,9), (6, 8),
            (8, 20), (10, 18), (12, 16),
            (16, 40), (20, 36), (24, 32),
            (32, 80), (40, 72), (48, 64)]

for steps_high, steps_low in policies:
    if (steps_low == 0) and (steps_high == 0):
        continue
    if (steps_low == 0) and (steps_high > 1):
        continue
    if (steps_high == 0) and (steps_low > 1):
        continue

    env_name = 'FPSP-({}, {})'.format(steps_high, steps_low)
    print('simulating', env_name)  

    model = BatchSIDARTHE(s0=s0, 
                        alpha='a',
                        beta='b',
                        gamma='g',
                        delta='d',
                        epsilon=samples['epsilon'],
                        theta=samples['theta'],
                        zeta=samples['zeta'],
                        eta=samples['eta'],
                        mu=samples['mu'],
                        nu=samples['nu'],
                        tau=samples['tau'],
                        h=samples['h'],
                        rho=samples['rho'],
                        kappa=samples['kappa'],
                        xi=samples['xi'],
                        sigma=samples['sigma'],
                        N=N, 
                        batch_size=batch_size,
                        step_size=step_size)

    policy = BatchFPSP(beta_high=1,
                         beta_low=lockdown_effectiveness, 
                         steps_high=steps_high,
                         steps_low=steps_low, 
                         suppression_start=suppression_start, 
                         switching_start=switching_start, 
                         batch_size=batch_size)

    env = Composite()
    env.add(model, 
            pre=lambda x: {'a': x['policy']*alpha,
                           'b': x['policy']*beta,
                           'g': x['policy']*gamma,
                           'd': x['policy']*delta},
            out='model')
    env.add(policy, out='policy')

    labels = ['S', 'I', 'D', 'A', 'R', 'T', 'H', 'E']
    
    env_out = [env.reset()] + [env.step()[0] for _ in range(n_steps)]
    s_policy = np.array([o['policy'] for o in env_out]).swapaxes(0,1)
    env_out = np.array([o['model'] for o in env_out]).swapaxes(0,1)
    infected = np.sum(env_out[:,::,1:6], axis=2) # sum over I D A R T
    env_out = np.concatenate([env_out, infected.reshape(infected.shape + (1,))], axis=2)
    env_stats = np.quantile(env_out, q=[0.5, 0.75, 0.95], axis=0)

    filename = 'sidarthe_fpsp_{}_var_{}_{}_{}_sensitivity_batch_{}_step{}x{}'.format(suppression_start, 
                                                                         steps_high, 
                                                                         steps_low, 
                                                                         lockdown_effectiveness,
                                                                         batch_size, 
                                                                         n_steps,
                                                                         step_size)

    #np.savez(filename+'.npz', states=env_out, stats=env_stats, betas=s_policy)
    
    plot_stats(env_name, env_stats, n_steps)
    ylim = plt.ylim()
    plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
    plt.plot([50, 50], ylim, '--', c='blue')
    plt.savefig(filename+'.eps', dpi=1200)

    plot_stats(env_name, env_stats, n_steps, logy=True)
    ylim = plt.ylim()
    plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
    plt.plot([50, 50], ylim, '--', c='blue')
    plt.savefig(filename+'_logy.eps', dpi=1200)

## Compensatory and Anticipatory Behavior

So far, it has been assumed that the rate of infection during working days would revert back to the rate observed pre-quarantine. The rate of infection may increase above pre-quarantine level if the population mixes with higher frequency during working days. This may occur initially in response to prolonged lockdown and subsequently in anticipation of the following quarantine period.

### SIQR Compensatory Behavior

In [None]:
# SIQR - FPSP (post-quarantine compensation)

# REMARK: note that the percentile graphs do not correspond to a single policy, 
# but instead show the x-percentile value at each timestep t

compensation = np.array([0.05*i for i in range(0, 21)])
batch_size = compensation.shape[0]# number of different values for d
n_steps = 350
N = 1e7
I0 = 500/6
step_size = 0.001

# FPSP
suppression_start = 20
switching_start = 50
lockdown_effectiveness = 0.175

beta_high = 0.373
beta_low = beta_high*lockdown_effectiveness
beta_compensation = beta_high*(1+ compensation)

alpha = 0.067
eta = 0.067
delta = 0.036

steps_high=1
steps_low=6
          
print('FPSP-(', steps_high, ',', steps_low, ')')    
siqr = BatchSIQR(N=N, I0=I0,
                             alpha=alpha,
                             beta='fpsp', 
                             eta=eta, 
                             delta=delta,
                             batch_size=batch_size,
                             step_size=step_size)

fpsp = BatchFPSP(beta_high=beta_high,
                             beta_low=beta_low, 
                             steps_high=steps_high,
                             steps_low=steps_low, 
                             suppression_start=suppression_start, 
                             switching_start=switching_start, 
                             batch_size=batch_size)

env = Composite()
env.add(siqr, out='siqr')
env.add(fpsp, out='fpsp')

states = [env.reset()]
for i in range(n_steps):
    if i > suppression_start:
        fpsp.beta_high = beta_compensation
    states.append(env.step(None)[0])
s_siqr = np.array([s['siqr'] for s in states]).swapaxes(1, 0)
s_fpsp = np.array([s['fpsp'] for s in states]).swapaxes(1,0)

filename = 'fpsp_compensatory_{}_{}_{}_{}_{}_sensitivity_batch_{}_step{}'.format(suppression_start, 
                                                                 switching_start, 
                                                                 steps_high, 
                                                                 steps_low, 
                                                                 lockdown_effectiveness,
                                                                 batch_size, 
                                                                 step_size)

infected = np.sum(s_siqr[:,:,1:3], axis=2) # sum over I and Q
s_siqr = np.concatenate([s_siqr, infected.reshape(infected.shape + (1,))], axis=2)
s_siqr_stats = np.quantile(s_siqr, q=[0.5, 0.75, 0.95], axis=0)
#np.savez(filename+'.npz', states=s_siqr, stats=s_siqr_stats, betas=s_fpsp)

def plot_comp(env_name, stats, compensation, plot_steps, logy=False):
    plt.rcParams.update({'font.size': 16})
    fig, ax = plt.subplots(figsize=(10, 8))
    #ax.plot(stats[0,:,1]/N*100, label='I')
    #ax.plot(stats[0,:,2]/N*100, label='Q')
    
    for i in range(compensation.shape[0]):
        if logy:
            ax.semilogy(stats[i,:plot_steps,4]/N*100, label='d={:.2f}, {}'.format(compensation[i], env_name))
        else:
            ax.plot(stats[i,:plot_steps,4]/N*100, label='d={:.2f}, {}'.format(compensation[i], env_name))
    ax.set_xlabel('time (days)')
    ax.set_ylabel('Total Infected People [%]')
    ylim = ax.get_ylim()
    if logy:
        ax.set_ylim(10e-3, ylim[1])
    ax.grid(True)
    ax.legend()
    plt.tight_layout()

idx = [0, 2, 4, 6, 8, 10, 12]    
    
plot_comp('FPSP-({}, {})'.format(steps_high, steps_low), s_siqr[idx], compensation[idx], n_steps)
ylim = plt.ylim()
plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
plt.plot([switching_start, switching_start], ylim, '--', c='blue')
plt.savefig(filename+'.eps', dpi=1200)

In [None]:
### SIDARTHE Compensatory Behavior

# SIDARTHE with FPSP-(1, 6)
#
compensation = np.array([0.1*i for i in range(0, 20)])
batch_size = compensation.shape[0]# number of different values for d

lockdown_effectiveness = 0.175
steps_high = 1
steps_low = 6
suppression_start = 20
switching_start = 50

# ODE
step_size = 0.0005
n_steps = 350
    
# initial condition
N=1e7
I = 500/6
D = 20
A = 1
R = 2
T = H = E = 0
S = N - I - D - A - R - T - H - E
s0 = np.array([S, I, D, A, R, T, H, E])

# default latent parameters
alpha = 0.570
beta = 0.011
gamma = 0.456
delta = 0.011
epsilon = 0.171
theta = 0.371
zeta = 0.125
eta = 0.125
mu = 0.012
nu = 0.027
tau = 0.003
h = 0.034
rho = 0.034
kappa = 0.017
xi = 0.017
sigma = 0.017

model = BatchSIDARTHE(s0=s0, 
                    alpha='a', # variable
                    beta='b', # variable
                    gamma='g', # variable
                    delta='d', #variable
                    epsilon=epsilon,
                    theta=theta,
                    zeta=zeta,
                    eta=eta,
                    mu=mu,
                    nu=nu,
                    tau=tau,
                    h=h,
                    rho=rho,
                    kappa=kappa,
                    xi=xi,
                    sigma=sigma,
                    N=N, 
                    batch_size=batch_size,
                    step_size=step_size)

fpsp = BatchFPSP(beta_high=1,
                             beta_low=lockdown_effectiveness, 
                             steps_high=steps_high,
                             steps_low=steps_low, 
                             suppression_start=suppression_start, 
                             switching_start=switching_start, 
                             batch_size=batch_size)

env = Composite()
env.add(model, 
        pre=lambda x: {'a': x['fpsp']*alpha,
                       'b': x['fpsp']*beta,
                       'g': x['fpsp']*gamma,
                       'd': x['fpsp']*delta},
        out='model')
env.add(fpsp, out='fpsp')

o = [env.reset()]
for i in range(n_steps):
    if i > suppression_start:
        fpsp.beta_high = (1+compensation)
    o.append(env.step()[0])
    
s = np.array([x['model'] for x in o]).swapaxes(0, 1)

plt.rcParams.update({'font.size': 16})
fig, ax = plt.subplots(figsize=(10, 8))

labels = ['S', 'I', 'D', 'A', 'R', 'T', 'H', 'E']
for i in range(s.shape[0]):
    plt.plot(np.sum(s[i,::,1:6], axis=1)/N*100, label='d={:.2f}, FPSP({},{})'.format(compensation[i], steps_high, steps_low))
ylim = plt.ylim()
plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
plt.plot([switching_start, switching_start], ylim, '--', c='blue')
ax.set_xlabel('time (days)')
ax.set_ylabel('Total Infected People [%]')
plt.legend()
plt.grid(True)
plt.tight_layout()
#plt.savefig('sidarthe_fpsp_{}_{}_{}_0.175_compensation.eps'.format(steps_high, steps_low, n_steps), dpi=1200)

In [None]:
plt.rcParams.update({'font.size': 16})
fig, ax = plt.subplots(figsize=(10, 8))

labels = ['S', 'I', 'D', 'A', 'R', 'T', 'H', 'E']

indices = [0, 4, 6, 8, 10, 11]
for i in indices:
    plt.plot(np.sum(s[i,::,1:6], axis=1)/N*100, label='d={:.2f}, FPSP({},{})'.format(compensation[i], steps_high, steps_low))
ylim = plt.ylim()
plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
plt.plot([switching_start, switching_start], ylim, '--', c='blue')
ax.set_xlabel('time (days)')
ax.set_ylabel('Total Infected People [%]')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig('sidarthe_fpsp_{}_{}_{}_0.175_compensation.eps'.format(steps_high, steps_low, n_steps), dpi=1200)

## Lockdown effectiveness

### SIQR Lockdown Effectiveness

In [None]:
# SIQR - FPSP (lockdown effectiveness)

lockdown_effectiveness = np.array([0.175 + 0.02*i for i in range(0, 6)])
print(lockdown_effectiveness)
batch_size = lockdown_effectiveness.shape[0]# number of different values for d
n_steps = 350
N = 1e7
I0 = 500/6
step_size = 0.001

# FPSP
suppression_start = 20
switching_start = 50
default_effectiveness = 0.175

beta_high = 0.373
beta_low = beta_high*default_effectiveness

alpha = 0.067
eta = 0.067
delta = 0.036

steps_high=1
steps_low=6

print('FPSP-(', steps_high, ',', steps_low, ')')    
siqr = BatchSIQR(N=N, I0=I0,
                             alpha=alpha,
                             beta='fpsp', 
                             eta=eta, 
                             delta=delta,
                             batch_size=batch_size,
                             step_size=step_size)

fpsp = BatchFPSP(beta_high=beta_high,
                             beta_low=beta_low, 
                             steps_high=steps_high,
                             steps_low=steps_low, 
                             suppression_start=suppression_start, 
                             switching_start=switching_start, 
                             batch_size=batch_size)

env = Composite()
env.add(siqr, out='siqr')
env.add(fpsp, out='fpsp')

states = [env.reset()]
for i in range(n_steps):
    if i >= switching_start:
        fpsp.beta_low = beta_high*lockdown_effectiveness
    states.append(env.step(None)[0])
s_siqr = np.array([s['siqr'] for s in states]).swapaxes(1, 0)
s_fpsp = np.array([s['fpsp'] for s in states]).swapaxes(1,0)

filename = 'fpsp_effectiveness_{}_{}_{}_{}_sensitivity_batch_{}_step{}'.format(suppression_start, 
                                                                 switching_start, 
                                                                 steps_high, 
                                                                 steps_low,
                                                                 batch_size, 
                                                                 step_size)

infected = np.sum(s_siqr[:,:,1:3], axis=2) # sum over I and Q
s_siqr = np.concatenate([s_siqr, infected.reshape(infected.shape + (1,))], axis=2)
s_siqr_stats = np.quantile(s_siqr, q=[0.5, 0.75, 0.95], axis=0)

#np.savez(filename+'.npz', states=s_siqr, stats=s_siqr_stats, betas=s_fpsp)

def plot_eff(env_name, stats, effectiveness, plot_steps, logy=False):
    plt.rcParams.update({'font.size': 16})
    fig, ax = plt.subplots(figsize=(10, 8))
    #ax.plot(stats[0,:,1]/N*100, label='I')
    #ax.plot(stats[0,:,2]/N*100, label='Q')
    
    for i in range(effectiveness.shape[0]):
        if logy:
            ax.semilogy(stats[i,:plot_steps,4]/N*100, label='q={:.3f}, {}'.format(effectiveness[i], env_name))
        else:
            ax.plot(stats[i,:plot_steps,4]/N*100, label='q={:.3f}, {}'.format(effectiveness[i], env_name))
    ax.set_xlabel('time (days)')
    ax.set_ylabel('Total Infected People [%]')
    if logy:
        ylim = ax.get_ylim()
        ax.set_ylim(10e-3, ylim[1])
    ax.grid(True)
    ax.legend()
    plt.tight_layout()  
    
plot_eff('FPSP-({}, {})'.format(steps_high, steps_low), s_siqr[:], lockdown_effectiveness[:], n_steps)
ylim = plt.ylim()
plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
plt.plot([switching_start, switching_start], ylim, '--', c='blue')
plt.savefig(filename+'.eps', dpi=1200)
print('saved to', filename)

### SIDHARTE Lockdown Effectiveness

In [None]:
# SIDARTHE with FPSP-(1, 6)
#
# reduce alpha, beta, gamma, and delta by fixed coefficient
lockdown_effectiveness_start = 0.175
lockdown_effectiveness = np.array([0.175 + 0.04*i for i in range(0, 6)])
batch_size = lockdown_effectiveness.shape[0]

beta_high = 1.
lockdown_effectiveness_start *= beta_high
lockdown_effectiveness *= beta_high

steps_high = 1
steps_low = 6
suppression_start = 20
switching_start = 50

# ODE
step_size = 0.0005
n_steps = 350
    
# initial condition
N=1e7
I = 500/6
D = 20
A = 1
R = 2
T = H = E = 0
S = N - I - D - A - R - T - H - E
s0 = np.array([S, I, D, A, R, T, H, E])

# default latent parameters
alpha = 0.570
beta = 0.011
gamma = 0.456
delta = 0.011
epsilon = 0.171
theta = 0.371
zeta = 0.125
eta = 0.125
mu = 0.012
nu = 0.027
tau = 0.003
h = 0.034
rho = 0.034
kappa = 0.017
xi = 0.017
sigma = 0.017

model = BatchSIDARTHE(s0=s0, 
                    alpha='a', # variable
                    beta='b', # variable
                    gamma='g', # variable
                    delta='d', #variable
                    epsilon=epsilon,
                    theta=theta,
                    zeta=zeta,
                    eta=eta,
                    mu=mu,
                    nu=nu,
                    tau=tau,
                    h=h,
                    rho=rho,
                    kappa=kappa,
                    xi=xi,
                    sigma=sigma,
                    N=N, 
                    batch_size=batch_size,
                    step_size=step_size)

fpsp = BatchFPSP(beta_high=beta_high,
                             beta_low=lockdown_effectiveness_start, 
                             steps_high=steps_high,
                             steps_low=steps_low, 
                             suppression_start=suppression_start, 
                             switching_start=switching_start, 
                             batch_size=batch_size)

env = Composite()
env.add(model, 
        pre=lambda x: {'a': x['fpsp']*alpha,
                       'b': x['fpsp']*beta,
                       'g': x['fpsp']*gamma,
                       'd': x['fpsp']*delta},
        out='model')
env.add(fpsp, out='fpsp')

o = [env.reset()]
for i in range(n_steps):
    if i >= switching_start:
        fpsp.beta_low = lockdown_effectiveness
    o.append(env.step()[0])
    
s = np.array([x['model'] for x in o]).swapaxes(0, 1)

plt.rcParams.update({'font.size': 16})
fig, ax = plt.subplots(figsize=(10, 8))

labels = ['S', 'I', 'D', 'A', 'R', 'T', 'H', 'E']
for i in range(s.shape[0]):
    plt.plot(np.sum(s[i,::,1:6], axis=1)/N*100, label='q={:.3f}, FPSP({},{})'.format(lockdown_effectiveness[i], steps_high, steps_low))
ylim = plt.ylim()
plt.plot([suppression_start, suppression_start], ylim, '--', c='red')
plt.plot([switching_start, switching_start], ylim, '--', c='blue')
ax.set_xlabel('time (days)')
ax.set_ylabel('Total Infected People [%]')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig('sidarthe_fpsp_{}_{}_{}_lockdown_effectiveness_0.175.eps'.format(steps_high, steps_low, n_steps), dpi=1200)