## Analysis File

This file requires the following libraries: <b>numpy, scipy, pickle, itertools, typing, collections</b> and <b>functools</b> (plus <b>import_ipynb</b> to import the other notebooks). It also needs the self-implemented functions from <b>rk4</b>, <b>awareness</b> and <b>input_output</b>.

This file is used to run and evaluate all simulations. This includes the simulations for <b>rungekutta4, default SIR</b> and <b>information</b>. For the evaluation, data has to be loaded from the saved files. The results of the simulations are saved in additional files, which are stored in the <b>data</b> folder. The naming pattern includes the corresponding function names and parameters. <br>
Detailed aspects and code-design decisions are explained in the functions themselves.


In [None]:
# library imports
import numpy as np
import pickle
from itertools import product
from typing import Union
from collections.abc import Callable
from functools import partial
import import_ipynb
from scipy.interpolate import interp1d

In [None]:
# local imports
from rk4 import rk4
from awareness import awareness_SIR, g_awareness
from input_output import str_name, save_data, load_data, parse_params

importing Jupyter notebook from rk4.ipynb


In [None]:
def awareness_param_names() -> list:
    """
    Provides the names of the awareness parameters in their fixed order.

    returns: a new list of names, used as keys when parameter lists are turned into dictionaries
    """
    ordered_names = ('alpha', 'omega', 'lam', 'rho', 'kappa')
    return list(ordered_names)

In [None]:
def set_model_parameters(model, t, **kwargs) -> Callable:
    """
    Binds (possibly time dependent) parameters to a model.

    Every keyword argument is turned into a callable of the time:
    - numpy arrays are treated as samples on the grid t and are linearly interpolated
      (because Runge-Kutta also evaluates between the time steps),
    - every other value becomes a function which always returns that value.

    model: callable which accepts the parameters as keyword arguments
    t: time grid on which array valued parameters are sampled
    kwargs: parameter values, either numpy arrays or constants
    returns: the model with all given parameters bound as keyword arguments
    """
    def constant(t, v):
        # ignores the time and hands back the frozen value
        return v

    for name in kwargs:
        value = kwargs[name]
        if not isinstance(value, np.ndarray):
            # partial freezes the current value into the constant function
            kwargs[name] = partial(constant, v=value)
            continue
        kwargs[name] = interp1d(t, value, kind='linear')

    return partial(model, **kwargs)

In [None]:
def simulate_rk4(model: Callable[[float, np.ndarray], np.ndarray], y0: np.ndarray, params_dict: dict,
    t: np.ndarray, fname='test_data.txt', save=True) -> tuple[np.ndarray, np.ndarray]:
    """
    Simulates using the Runge-Kutta method and by default saves results.

    model: model function; the entries of params_dict are bound to it as keyword arguments
    y0: initial state handed to the integrator
    params_dict: parameter names mapped to constants or numpy arrays sampled on t
    t: time grid for the integration
    fname: file the result tuple (t, y) is written to, only used if save is True
    save: whether the result is stored via save_data
    returns: time and y (grid with compartments over time)
    """
    # show the parameter set of this run (also completes progress lines printed by callers)
    print(params_dict)

    p_model = set_model_parameters(model, t, **params_dict)
    t, y = rk4(p_model, y0, t)

    if save:
        save_data((t, y), fname)
    return t, y

In [None]:
def check_probability_conserved(y: np.ndarray, tolerance: float = 1e-6) -> None:
    """
    Checks whether the sum of all compartments is one for each grid cell and point in time.

    y: simulation result; the first two axes index the grid cells, axes 2 and 3 are the
       compartments that get summed, any further axes (e.g. time) are checked element-wise
    tolerance: largest absolute deviation from one that still counts as conserved
    Prints the result instead of returning it.
    """
    # sum over the two compartment axes; this leaves one total per grid cell (and time step)
    totals = np.sum(y, axis=(2, 3))
    # compare against one explicitly: a plain truthiness test would accept any non-zero total
    conserved = np.allclose(totals, 1.0, rtol=0.0, atol=tolerance)
    print(f'probability conserved: {conserved}')

In [None]:
def axis_product(params: list, default_params: list) -> list:
    """
    Builds the parameter sets for a sweep in which only one axis is varied at a time.

    params: list containing, per axis, the list of values to try
    default_params: values used for all axes which are currently not varied
    returns: list of parameter sets without repeated entries; the first entry is
             default_params itself (as passed in), all further entries are tuples
    """
    n_axes = len(default_params)

    candidates = [default_params]
    for axis in range(n_axes):
        # pin every axis to its default, then open up the one being varied
        per_axis = [[value] for value in default_params]
        per_axis[axis] = params[axis]
        candidates.extend(product(*per_axis))

    # drop repeated sets while keeping the order of first appearance
    unique = []
    for candidate in candidates:
        if candidate in unique:
            continue
        unique.append(candidate)
    return unique


In [None]:
def get_initial_condition(grid_size: int, info_compartments: int) -> np.ndarray:
    """
    Builds the start state of the simulation.

    grid_size: number of individuals along one axis of the grid
    info_compartments: number of compartments for the awareness
    returns: numpy array of shape (grid_size, grid_size, 3, info_compartments)

    All individuals start as susceptible without any awareness (last awareness
    compartment). The individual in the middle of the grid is the exception:
    it starts as infected, still without awareness.
    """
    susceptible, infected = 0, 1
    unaware = -1

    state = np.zeros(shape=(grid_size, grid_size, 3, info_compartments))
    center = int((grid_size - 1) / 2)

    # whole grid: susceptible and unaware
    state[:, :, susceptible, unaware] = 1

    # the center cell leaves the susceptible pool and becomes the single infection
    state[center, center, susceptible, unaware] = 0
    state[center, center, infected, unaware] = 1

    return state

In [None]:
def run_simulation(infection_params: list, awareness_params: list, simulation_params: list, name: str) -> None:
    """
    Runs one simulation for the given parameters; the result is saved by simulate_rk4.

    infection_params: list of beta and gamma, modeling the disease itself
    awareness_params: list of alpha, omega, lambda, rho and kappa, modelling the information spread
    simulation_params: list with the grid size, the number of awareness compartments and the time t
    name: label which becomes part of the name of the result file in the data folder
    """
    # unpacking also makes sure that every list has the expected number of entries
    beta0, gamma0 = infection_params
    alpha0, omega0, lam0, rho0, kappa0 = awareness_params
    grid_size, info_compartments, t = simulation_params

    # initial state and model with the disease parameters bound
    start_state = get_initial_condition(grid_size, info_compartments)
    disease_model = set_model_parameters(awareness_SIR, t, beta=beta0, gamma=gamma0)

    # awareness parameters by name and the target file of this run
    awareness_by_name = dict(zip(awareness_param_names(), awareness_params))
    file_prefix = f'data/{name}_simulation___.'
    target_file = file_prefix + str_name(awareness_param_names(), parse_params(awareness_params))

    # run (and save) the simulation
    simulate_rk4(disease_model, start_state, awareness_by_name, t, fname=target_file)

    # analysis step, currently disabled:
    # t, y = load_data(filename)
    # data_av = np.average(y[:, :, :, :], axis=(0, 1)) # average over grid
    # awareness_level = g_awareness(rho0, data_av[0])  # everyone is susceptible, there are no infections
    # save_data((t, awareness_level), f'data/{name}_simulation_awareness.pkl')


In [None]:
def run_information_simulation(infection_params: list, awareness_params: list, simulation_params: list) -> None:
    """
    Simulates the spread and decay of information/awareness in the network
    for the case that no infections are present.

    infection_params: list of beta and gamma, modeling the disease itself
    awareness_params: list of alpha, omega, lambda, rho and kappa, modelling the information spread
    simulation_params: list with the grid size, the number of awareness compartments and the time t

    The raw result is saved by simulate_rk4, the derived awareness level over time
    is saved to 'data/awareness.pkl'.
    """
    # unpacking also makes sure that every list has the expected number of entries
    beta0, gamma0 = infection_params
    alpha0, omega0, lam0, rho0, kappa0 = awareness_params
    grid_size, info_compartments, t = simulation_params

    # start state: everyone susceptible and unaware ...
    start_state = np.zeros(shape=(grid_size, grid_size, 3, info_compartments))
    start_state[:, :, 0, -1] = 1

    # ... except the center cell, which is moved into the first awareness compartment
    center = int((grid_size - 1) / 2)
    start_state[center, center, 0, 0] = 1
    start_state[center, center, 0, -1] = 0

    # model with the disease parameters bound
    disease_model = set_model_parameters(awareness_SIR, t, beta=beta0, gamma=gamma0)

    # awareness parameters by name and the target file of this run
    awareness_by_name = dict(zip(awareness_param_names(), awareness_params))
    file_prefix = 'data/information_only___'
    target_file = file_prefix + str_name(awareness_param_names(), awareness_params)

    # run (and save) the simulation
    simulate_rk4(disease_model, start_state, awareness_by_name, t, fname=target_file)

    # read the stored run back and average it over the grid
    t, y = load_data(target_file)
    grid_average = np.average(y[:, :, :, :], axis=(0, 1))

    # only the susceptible compartments are used: there are no infections in this scenario
    susceptible_average = grid_average[0]
    awareness_level = g_awareness(rho0, susceptible_average)

    # store the awareness level over time
    save_data((t, awareness_level), 'data/awareness.pkl')


In [None]:
def run_sensitivity_sweep(infection_params: list, awareness_params: list,
        awareness_params_variation: list, simulation_params: list, folder: str) -> None:
    """
    Varies one parameter at a time and records the maximum number of simultaneous infections
    (sensitivity analysis).

    infection_params: list of beta and gamma, modeling the disease itself
    awareness_params: list of alpha, omega, lambda, rho and kappa, modelling the information spread
    awareness_params_variation: list with, per awareness parameter, the values which shall be modeled
    simulation_params: list with the grid size, the number of awareness compartments and the time t
    folder: folder in which the grouped results are stored

    Saved files (via save_data):
    - '{folder}/variation_default.pkl': (default parameter set, [maximum of infections])
    - '{folder}/variation_<name>.pkl' per awareness parameter:
      (list of values of the varied parameter, list of the corresponding maxima)

    raises: ValueError if a parameter list does not have the expected number of entries
    """
    names = awareness_param_names()

    # unpack parameters
    beta0, gamma0 = infection_params
    grid_size, info_compartments, t = simulation_params
    if len(awareness_params) != len(names) or len(awareness_params_variation) != len(names):
        raise ValueError(f'expected {len(names)} awareness parameters and variations ({names})')

    def result_key(param_set) -> str:
        # lists and tuples holding the same values must map to the same key
        return str(tuple(param_set))

    # setup model and initial conditions
    y0 = get_initial_condition(grid_size, info_compartments)
    model = set_model_parameters(awareness_SIR, t, beta=beta0, gamma=gamma0)

    # collect the parameter sets to simulate; a set which only differs in its
    # container type (list vs. tuple) is simulated a single time
    pending = {}
    for param_set in axis_product(awareness_params_variation, awareness_params):
        pending.setdefault(result_key(param_set), param_set)

    # simulate different parameter sets
    sim_results = {}
    for i, (key, param_set) in enumerate(pending.items()):
        print(f'{i+1}/{len(pending)}:   ', end='')  # print progress

        # preparate
        params_dict = dict(zip(names, param_set))
        filename = f'{folder}/' + str_name(names, param_set)

        # simulate; the returned time axis is discarded so that every run starts
        # from the same input time grid t
        _, y = simulate_rk4(model, y0, params_dict, t, fname=filename, save=False)

        # analyze: sum the infected (index 1) over the awareness compartments,
        # then average over the grid, which leaves one value per remaining index
        infections = np.average(np.sum(y[:, :, 1, :], axis=2), axis=(0, 1))
        max_infections = np.max(infections)
        if max_infections == infections[-1]:
            print('WARNING: the peak of infections has not been reached, increase simulation time!')
        sim_results[key] = (param_set, [max_infections])

    # group results and save them

    # default params
    save_data(sim_results[result_key(awareness_params)], f'{folder}/variation_default.pkl')

    # combine the various runs into useful chunks of parameter variation,
    # one chunk per varied parameter
    for i, name in enumerate(names):
        options = [awareness_params_variation[j] if j == i else [awareness_params[j]] for j in range(len(names))]

        # place to store the results
        param_variation = []
        concurrent_infections = []

        # get data from each related run
        for param_set in product(*options):
            _, max_infections = sim_results[result_key(param_set)]
            param_variation.append(param_set[i])
            concurrent_infections.append(max_infections[0])

        # save results
        save_data((param_variation, concurrent_infections), f'{folder}/variation_{name}.pkl')

    print('done')



<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=ac0f8ce2-3132-47be-a4d1-6216636e93ff' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>