In [1]:
%load_ext nb_black

<IPython.core.display.Javascript object>

In [138]:
import plotly
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio

pio.templates.default = "plotly_white"
import pandas as pd
import numpy as np

from ipywidgets import interact, interact_manual
import ipywidgets as widgets

<IPython.core.display.Javascript object>

### exponential growth figure

In [59]:
def get_series(t, i):
    """
    Build two series of length t + 1 from the growth base <i> over the
    periods 0..t.

    Returns a tuple (x, c):
    x -- running (cumulative) sum of i ** z for z = 0..t
    c -- per-period values: i ** z for the first 14 periods (z <= 14); from
         period 15 onwards the entry recorded 14 periods earlier is
         subtracted (i ** z - c[z - 14]), reflecting the assumed 14-day
         recovery period.
    """
    running_total = 0  # cumulative sum of i ** z so far
    cumulative = []  # returned as x
    per_period = []  # returned as c
    for period in range(t + 1):
        new_cases = i ** period
        running_total += new_cases
        cumulative.append(running_total)
        if period > 14:
            # subtract the value stored 14 periods ago
            per_period.append(new_cases - per_period[period - 14])
        else:
            per_period.append(new_cases)

    return (cumulative, per_period)


def get_frame(t, vals):
    """
    Call get_series(t, i) for every growth base i in <vals> and collect the
    results side by side in one pandas DataFrame.

    For each i the frame gets two columns, "cum_<i>" (first series returned by
    get_series) and "ind_<i>" (second series). The period number is exposed as
    the "index" column via reset_index().

    t    -- number of periods passed through to get_series
    vals -- iterable of growth bases; may be empty, in which case an empty
            frame (after reset_index) is returned
    """
    # Build all per-base frames first and concatenate once; concatenating
    # inside the loop re-copies the accumulated frame on every iteration.
    frames = [
        pd.DataFrame(get_series(t, i))
        .transpose()
        .rename(columns={0: "cum_{}".format(i), 1: "ind_{}".format(i)})
        for i in vals
    ]
    if not frames:
        # pd.concat refuses an empty list; keep returning an empty frame.
        return pd.DataFrame().reset_index()
    return pd.concat(frames, axis=1, join="outer", ignore_index=False).reset_index()

<IPython.core.display.Javascript object>

In [286]:
t = 10
vals_cont = list(np.linspace(1.5, 3, 5))

df = get_frame(t, vals_cont)

# One cumulative curve per growth factor; the "ind_*" columns that get_frame
# also returns are not plotted here.
fig = go.Figure()
for i in vals_cont:
    cumulative_trace = go.Scatter(
        x=df["index"],
        y=df["cum_{}".format(i)],
        mode="lines",
        name="Growth factor of {}".format(i),
        line_shape="spline",
        legendgroup="cum",
    )
    fig.add_trace(cumulative_trace)

layout_options = dict(
    title="Exponential growth of infected population (cumulative sum)",
    xaxis_title="Number of periods",
    yaxis_title="Number of infected",
)
fig.update_layout(**layout_options)
fig.show()
# Static export of the figure next to the notebook.
fig.write_image("exp_growth.svg", width=700, height=400)

<IPython.core.display.Javascript object>

### Compartmental model

In [61]:
# Default parameters of the SECIRD compartment model. They are packed into
# SECIRD_params below in the order SECIRD_model unpacks them.
rho = 0.7  # social distancing rate (1 == no control; 0 == complete control); scales beta
beta = 1.75  # contact rate
teta = 0.5  # probability of being infected when in contact with somebody infectious
alpha = 0.2  # inverse of incubation period
gamma = 0.5  # inverse of infectious period after symptoms
zeta = 1 / 6  # inverse of infectious period before symptoms
epsilon = 0.02  # death rate (in case one is infected and without intensive care)
eta = 0.7  # rate of symptomatic disease
# NOTE(review): 1 / 100 reads as a mean of 100 time units, which does not match
# the "half a year" in the original comment -- confirm the intended value.
delta = 1 / 100  # immunity constant (Half a year of immunity)

t_max = 2 * 365  # total number of days
dt = 1  # time step
N = 100  # total population
I = 5  # initially infected

# Initial shares in the order S, E, C, I_a, I_s, R, D; the initially infected
# share I / N is placed in the second slot (Exposed).
SECIRD_vals = [1 - I / N, I / N, 0, 0, 0, 0, 0]  # normalized inputs
SECIRD_params = [alpha, beta, gamma, delta, epsilon, eta, zeta, rho, teta]
# Evenly spaced time grid from 0 to t_max with int(t_max / dt) + 1 points.
SECIRD_t = np.linspace(0, t_max, int(t_max / dt) + 1)


def SECIRD_model(init_vals, params, t):
    """
    Integrate the SECIRD compartment model with explicit (forward) Euler steps.

    init_vals -- initial values in the order
                 S (susceptible), E (exposed), C (carrier),
                 I_a (infective asymptomatic), I_s (infective symptomatic),
                 R (recovered), D (dead)
    params    -- alpha, beta, gamma, delta, epsilon, eta, zeta, rho, teta
                 (beta is multiplied by rho before use)
    t         -- evenly spaced time grid; the step size is t[1] - t[0].
                 A grid with a single point returns just the initial state.

    Returns a DataFrame with one row per grid point and the columns
    "index" (row number), "Susceptibles", "Exposed", "Carrier",
    "Infective_asymptomatic", "Infective_symptomatic", "Recovered", "Dead"
    and "test_sum" (sum of all compartments, useful as a conservation check).
    """
    S_0, E_0, C_0, I_a_0, I_s_0, R_0, D_0 = init_vals
    test_sum = [S_0 + E_0 + C_0 + I_a_0 + I_s_0 + R_0 + D_0]
    S, E, C, I_a, I_s, R, D = [S_0], [E_0], [C_0], [I_a_0], [I_s_0], [R_0], [D_0]
    alpha, beta, gamma, delta, epsilon, eta, zeta, rho, teta = params
    beta = rho * beta
    # With fewer than two grid points there is no step to take (the loop below
    # does not run), so avoid indexing t[1].
    dt = t[1] - t[0] if len(t) > 1 else 0
    for _ in t[1:]:
        # Flow from S to E: contacts with anybody who can transmit (I_a, I_s, C).
        new_exposed = beta * teta * S[-1] * (I_a[-1] + I_s[-1] + C[-1])

        next_S = S[-1] - new_exposed * dt + delta * R[-1] * dt
        next_E = E[-1] + (new_exposed - alpha * E[-1]) * dt
        next_C = C[-1] + (alpha * E[-1] - zeta * C[-1]) * dt
        # Carriers split into asymptomatic (1 - eta) and symptomatic (eta).
        next_I_a = I_a[-1] + (zeta * C[-1] * (1 - eta) - (gamma * I_a[-1])) * dt
        next_I_s = I_s[-1] + (zeta * C[-1] * (eta) - (gamma * I_s[-1])) * dt
        # Infectives leave at rate gamma: share epsilon dies, the rest recovers;
        # recovered return to S at rate delta.
        next_R = (
            R[-1] + (gamma * (1 - epsilon) * (I_a[-1] + I_s[-1]) - delta * R[-1]) * dt
        )
        next_D = D[-1] + (gamma * (epsilon) * (I_a[-1] + I_s[-1])) * dt
        next_test_sum = next_S + next_E + next_C + next_I_a + next_I_s + next_R + next_D

        S.append(next_S)
        E.append(next_E)
        C.append(next_C)
        I_a.append(next_I_a)
        I_s.append(next_I_s)
        R.append(next_R)
        D.append(next_D)
        test_sum.append(next_test_sum)

    column_names = [
        "Susceptibles",
        "Exposed",
        "Carrier",
        "Infective_asymptomatic",
        "Infective_symptomatic",
        "Recovered",
        "Dead",
        "test_sum",
    ]
    return pd.DataFrame(
        np.stack([S, E, C, I_a, I_s, R, D, test_sum]).T, columns=column_names
    ).reset_index()

<IPython.core.display.Javascript object>

In [62]:
@interact_manual
def int_graph(
    rho=widgets.FloatSlider(
        min=0, max=1, step=0.01, value=1, description="social distancing rate"
    ),
    beta=widgets.FloatSlider(
        min=0, max=5, step=0.1, value=1.75, description="contact rate"
    ),
    teta=widgets.FloatSlider(
        min=0, max=1, step=0.1, value=1, description="prob of infection"
    ),
    alpha=widgets.FloatSlider(
        min=0, max=1, step=0.01, value=0.2, description="inv of incubation"
    ),
    gamma=widgets.FloatSlider(
        min=0,
        max=1,
        step=0.01,
        value=0.5,
        description="inv of infectious period after symptoms",
    ),
    zeta=widgets.FloatSlider(
        min=0,
        max=1,
        step=0.01,
        value=1 / 6,
        description="inv of infectious period before symptoms",
    ),
    epsilon=widgets.FloatSlider(
        min=0, max=1, step=0.01, value=0.02, description="death rate"
    ),
    eta=widgets.FloatSlider(
        min=0, max=1, step=0.01, value=0.7, description="rate of symptomatic disease"
    ),
    delta=widgets.FloatSlider(
        min=0, max=1, step=0.01, value=0.01, description="immunity constant"
    ),
    t_max=widgets.FloatSlider(
        min=1, max=500, step=1, value=100, description="length of observation"
    ),
    # The lower bound is one slider step, not 0: a zero time step would make
    # t_max / dt below divide by zero.
    dt=widgets.FloatSlider(
        min=0.1, max=10, step=0.1, value=0.1, description="time steps"
    ),
):
    """
    Run SECIRD_model with the slider values and plot every compartment
    (plus the "test_sum" conservation check) against simulated time.

    The simulation starts with one exposed individual in a population of
    82,000,000, expressed as shares of the population.

    Raises ValueError if called directly with a non-positive dt.
    """
    if dt <= 0:
        raise ValueError("dt must be positive, got {}".format(dt))

    N = 82000000  # total population
    I = 1  # Initially infected

    SECIRD_vals = [1 - I / N, I / N, 0, 0, 0, 0, 0]  # normalized inputs
    SECIRD_params = [alpha, beta, gamma, delta, epsilon, eta, zeta, rho, teta]
    # At least one step (two grid points): SECIRD_model derives its step size
    # from the first two grid points, so dt > t_max must not yield a 1-point grid.
    n_steps = max(int(t_max / dt), 1)
    SECIRD_t = np.linspace(0, t_max, n_steps + 1)
    SECIRD_results = SECIRD_model(SECIRD_vals, SECIRD_params, SECIRD_t)

    fig = go.Figure()
    for col in SECIRD_results.columns:
        if col == "index":
            continue
        fig.add_trace(
            go.Scatter(
                # Plot against simulated time rather than the row number, so
                # the axis does not stretch with 1 / dt.
                x=SECIRD_t,
                y=SECIRD_results[col],
                mode="lines",
                name=col,
                line_shape="spline",
            )
        )
    fig.update_layout(xaxis_title="Time (days)", yaxis_title="Share of population")
    fig.show()

interactive(children=(FloatSlider(value=1.0, description='social distancing rate', max=1.0, step=0.01), FloatS…

<IPython.core.display.Javascript object>

### ECDC Data graphs

In [63]:
# Case distribution CSV published by the ECDC; fetched over the network on every run.
ECDC_CASE_DISTRIBUTION_URL = "https://opendata.ecdc.europa.eu/covid19/casedistribution/csv"
df_ECDC = pd.read_csv(ECDC_CASE_DISTRIBUTION_URL)

<IPython.core.display.Javascript object>

In [65]:
# Parse the reporting date. The ECDC file writes dateRep as day/month/year, so
# parse day-first; without it an ambiguous date such as 01/04/2020 can be read
# as January 4th and the cumulative sums below are built in the wrong order.
# NOTE(review): confirm the dd/mm/yyyy layout against the downloaded file.
# (infer_datetime_format is deprecated in pandas 2.x and is no longer passed.)
df_ECDC["dateRep"] = pd.to_datetime(df_ECDC["dateRep"], dayfirst=True)

# Running totals per country in chronological order. The sort is done once;
# the results carry the original row labels, so the assignment aligns them
# back onto the unsorted frame.
ecdc_chronological = df_ECDC.sort_values("dateRep")
for daily_col in ("cases", "deaths"):
    df_ECDC[daily_col + "_cum"] = ecdc_chronological.groupby(
        "countriesAndTerritories"
    )[daily_col].cumsum()

<IPython.core.display.Javascript object>

In [140]:
# Cumulative deaths against cumulative cases for every reporting day,
# one colour per country, both axes logarithmic.
cfr_scatter_options = dict(
    x="cases_cum",
    y="deaths_cum",
    color="countriesAndTerritories",
    log_x=True,
    log_y=True,
)
fig_CFR = px.scatter(df_ECDC, **cfr_scatter_options)
fig_CFR.show()

<IPython.core.display.Javascript object>

In [283]:
# Reduce the per-day table to one row per country using the "max" aggregate;
# the pivot has countries as columns, hence the transpose back to rows.
# Later cells read "cases_cum", "deaths_cum" and "continentExp" from the result.
# NOTE(review): df_ECDC is passed positionally, i.e. as pivot_table's `values`
# argument; presumably this selects every column of the frame -- confirm that
# this (rather than e.g. groupby("countriesAndTerritories").max()) is intended.
df_ECDC_pv = df_ECDC.pivot_table(
    df_ECDC, columns="countriesAndTerritories", aggfunc="max"
).transpose()

<IPython.core.display.Javascript object>

Graph based on https://ourworldindata.org/covid-mortality-risk

In [284]:
# One point per country (rows whose continent is "Other" are left out),
# coloured by continent, on log-log axes.
countries_pv = df_ECDC_pv[df_ECDC_pv["continentExp"] != "Other"].reset_index()
fig_CFR_pv = px.scatter(
    countries_pv,
    x="cases_cum",
    y="deaths_cum",
    color="continentExp",
    log_x=True,
    log_y=True,
    hover_data=["countriesAndTerritories"],
    title="Cumulative cases and deaths in different countries",
)

# Reference lines of constant case fatality rate (deaths / cases).
# v holds the rates, diags the matching cases-per-death ratios.
v = [0.1, 0.025, 0.00625, 0.0015625]
diags = [1 / i for i in v]
cases_min = df_ECDC_pv["cases_cum"].min()
cases_max = df_ECDC_pv["cases_cum"].max()
for diag in diags:
    cfr_line = go.Scatter(
        # Both end points satisfy deaths == cases / diag.
        x=[cases_min * diag, cases_max],
        y=[cases_min, cases_max / diag],
        mode="lines",
        name="CFR of {}%".format(round((1 / diag * 100), 3)),
        line_shape="spline",
        line=dict(width=1, dash="dot"),
        legendgroup="cfr",
    )
    fig_CFR_pv.add_trace(cfr_line)

fig_CFR_pv.update_layout(legend_title_text="")
fig_CFR_pv.update_xaxes(nticks=6, title_text="Log of cumulative confirmed cases")
fig_CFR_pv.update_yaxes(nticks=6, title_text="Log of cumulative confirmed deaths")
fig_CFR_pv.show()
# Static export of the figure next to the notebook.
fig_CFR_pv.write_image("cases_deaths_ecdc.svg", width=700, height=400)

<IPython.core.display.Javascript object>

### Base rate neglect and testing fallacies

Testing is one of our main sources of information about the coronavirus. It helps both in determining how large the infected population is and in finding out which patients, with which symptoms, are connected to the epidemic.
When testing for a new disease, there are two different doctrines with fundamentally different aims and outcomes. The first is "reactive" testing. It is often employed during the early development of an outbreak, when few tests are available and medical resources in general are not yet fully mobilized. Under this approach, mainly people who present typical symptoms, and the people who came into contact with them, are tested. While it can help prioritize resources towards areas with clusters of infection and support early studies of symptoms, this approach has many downsides, which will be discussed later on.
The other doctrine is "representative, random" testing. It requires a large number of tests and is therefore only available later during the epidemic. The aim of this approach is to test a representative, random sample of the population for the disease. This can be especially helpful in finding clusters of disease where symptoms have not yet shown, and in learning more about the broader distribution of symptoms and of the disease in the whole population.

To inform our discussion of these doctrines and their advantages and disadvantages, we will go through a number of simplified mathematical models in the following paragraphs and discuss their outcomes.

In [56]:
# Hand-copied testing figures, taken from the Statista page referenced below.
# NOTE(review): the three values per row look like
# [calendar week, number of tests / test capacity, number of reporting labs]
# -- confirm against the source before relying on the units.
RKI_tests = [
    [10, 7115, 28],
    [11, 31010, 93],
    [12, 64725, 111],
    [13, 103515, 113],
    [14, 116655, 132],
    [15, 123304, 112],
    [16, 136064, 126],
    [17, 141815, 133],
]
# Bare string expression: shown as this cell's output and used as the source reference.
"https://de.statista.com/statistik/daten/studie/1110951/umfrage/testkapazitaeten-fuer-das-coronavirus-covid-19-in-deutschland/abs"

'https://de.statista.com/statistik/daten/studie/1110951/umfrage/testkapazitaeten-fuer-das-coronavirus-covid-19-in-deutschland/abs'

<IPython.core.display.Javascript object>

In [279]:
def testing_errors_model(init_values, change_rates, periods):
    """
    inputs:
    initial values for outputs, rates    
    outputs:
    tot_population, inf_population, test_pos_population, test_neg_population, rem_population, implied_inf_population, period
    """
    (
        sus_pop_0,
        inf_pop_0,
        rem_pop_0,
        test_pos_pop_0,
        test_neg_pop_0,
        implied_inf_pop_0,
    ) = init_values
    (sus_pop, inf_pop, rem_pop, test_pos_pop, test_neg_pop, implied_inf_pop,) = (
        [sus_pop_0],
        [inf_pop_0],
        [rem_pop_0],
        [test_pos_pop_0],
        [test_neg_pop_0],
        [implied_inf_pop_0],
    )
    tot_pop = [(sus_pop_0 + inf_pop_0 + rem_pop_0)]
    r_contact, r_prob_inf, r_removed, r_tests = change_rates
    dt = periods[1] - periods[0]
    for _ in periods[1:]:
        n_tot_pop = sus_pop[-1] + inf_pop[-1] + rem_pop[-1]
        n_sus_pop = (
            sus_pop[-1] - (r_contact * r_prob_inf * sus_pop[-1] * inf_pop[-1]) * dt
        )
        n_inf_pop = (
            inf_pop[-1]
            + (r_contact * r_prob_inf * sus_pop[-1] * inf_pop[-1]) * dt
            - (inf_pop[-1] * r_removed) * dt
        )
        n_rem_pop = rem_pop[-1] + (inf_pop[-1] * r_removed) * dt
        #### end of ode_1
        n_test_pos_pop = test_pos_pop[-1] + n_inf_pop * r_tests * dt
        n_test_neg_pop = (
            test_neg_pop[-1] + n_sus_pop * r_tests * dt + n_rem_pop * r_tests * dt
        )
        n_implied_inf_pop = +n_test_pos_pop / (n_test_neg_pop + n_test_pos_pop)
        ### update of lists
        tot_pop.append(n_tot_pop)
        sus_pop.append(n_sus_pop)
        inf_pop.append(n_inf_pop)
        test_pos_pop.append(n_test_pos_pop)
        test_neg_pop.append(n_test_neg_pop)
        rem_pop.append(n_rem_pop)
        implied_inf_pop.append(n_implied_inf_pop)
    return (
        pd.DataFrame(
            np.stack(
                [
                    tot_pop,
                    sus_pop,
                    inf_pop,
                    rem_pop,
                    test_pos_pop,
                    test_neg_pop,
                    implied_inf_pop,
                ]
            ).T
        ).rename(
            columns={
                0: "Total Population",
                1: "Susceptible",
                2: "Infected",
                3: "Removed",
                4: "Tested positive",
                5: "Tested negative",
                6: "Implied infected",
            }
        )
        #         .reset_index()
    )

<IPython.core.display.Javascript object>

In [282]:
# Initial state as shares of the population: a single infected individual,
# nobody removed and no tests performed yet.
tot_pop = 82000000
inf_pop = 1
init_values = [1 - inf_pop / tot_pop, inf_pop / tot_pop, 0, 0, 0, 0]

r_contact = 1.75
r_prob_inf = 0.2
r_removed = 1 / 31
# 141815 is the figure in the last row of RKI_tests, spread over 7 days and
# expressed per head of population (a flat r_tests = 0.1 was tried before).
r_tests = (141815 / 7) / tot_pop

# Alternative scenario explored earlier:
# r_contact = 1.75, r_prob_inf = 0.01, r_removed = 0, r_tests = 0.001

change_rates = [r_contact, r_prob_inf, r_removed, r_tests]

t_max = 365
dt = 1
periods = np.linspace(0, t_max, int(t_max / dt) + 1)

testing_errors_model_results = testing_errors_model(init_values, change_rates, periods)

# Row number as x axis; computed once and reused for every trace.
results_indexed = testing_errors_model_results.reset_index()
testing_columns = ["Tested positive", "Tested negative", "Implied infected"]

fig = go.Figure()
for col in results_indexed.columns:
    if col not in testing_columns and col == "index":
        continue
    trace_options = dict(
        x=results_indexed["index"],
        y=testing_errors_model_results[col],
        mode="lines",
        name=col,
    )
    if col in testing_columns:
        # Quantities derived from testing are drawn dashed.
        trace_options.update(legendgroup="secondary testing", line=dict(dash="dash"))
    else:
        # Compartments of the underlying epidemic model.
        trace_options.update(legendgroup="primary ode")
    fig.add_trace(go.Scatter(**trace_options))

fig.show()


<IPython.core.display.Javascript object>