In [10]:
import numpy as np
import pandas as pd
import os
from datetime import timedelta, datetime

In [11]:
# =============================================================================
# 1. Load the target dataset. (COVID hospitalizations)
# =============================================================================
dataset_fname = "https://raw.githubusercontent.com/european-modelling-hubs/RespiCast-Covid19/refs/heads/main/target-data/latest-hospital_admissions.csv"
df = pd.read_csv(dataset_fname, low_memory=False, delimiter=',', header=0, encoding='ascii')


In [12]:
# =============================================================================
# 2. Determine the most recent date from the dataset.
# Set the start and end dates for data selection.
# =============================================================================
most_recent_date = df['truth_date'].max()
print('Most recent date:', most_recent_date)
start_date = '2024-09-15'
end_date = most_recent_date
start_date_model = '2024-09-15'

Most recent date: 2025-03-02


In [None]:
# =============================================================================
# 3. Define the stochastic SEIR model.
# This function simulates the SEIR dynamics with a stochastic evolution for beta.
# =============================================================================
def stochastic_seir(S0, E0, I0, R0, beta0, sigma, gamma, dt, num_steps, N):
    S_vals = np.zeros(num_steps + 1)
    E_vals = np.zeros(num_steps + 1)
    I_vals = np.zeros(num_steps + 1)
    R_vals = np.zeros(num_steps + 1)
    beta_vals = np.zeros(num_steps + 1)

    # Initialize compartments
    S_vals[0], E_vals[0], I_vals[0], R_vals[0] = S0, E0, I0, R0
    beta_vals[0] = beta0

    for t in range(1, num_steps + 1):
        # Random walk in log-space for beta
        log_beta = np.log(beta_vals[t - 1]) + np.random.normal(0, 0.01)
        beta = np.exp(log_beta)
        beta = np.clip(beta, 0.1, 1.0)
        beta_vals[t] = beta

        # SEIR equations
        dS = -beta * S_vals[t - 1] * I_vals[t - 1] / N * dt
        dE = (beta * S_vals[t - 1] * I_vals[t - 1] / N - sigma * E_vals[t - 1]) * dt
        dI = (sigma * E_vals[t - 1] - gamma * I_vals[t - 1]) * dt
        dR = gamma * I_vals[t - 1] * dt

        # Update compartments (non-negative values)
        S_vals[t] = max(S_vals[t - 1] + dS, 0)
        E_vals[t] = max(E_vals[t - 1] + dE, 0)
        I_vals[t] = max(I_vals[t - 1] + dI, 0)
        R_vals[t] = max(R_vals[t - 1] + dR, 0)

    return S_vals, E_vals, I_vals, R_vals, beta_vals

In [14]:
# =============================================================================
# 4. Define the simulation function for a given country.
# =============================================================================
def generate_simulations_for_country(country_name, N):
    # Filter the dataset for the given country and convert the date column
    country = df[df.location == country_name]
    filtered_country = country[(country['truth_date'] >= start_date) & (country['truth_date'] <= end_date)].copy()
    
    if filtered_country.empty:
        print(f"No data available for country {country_name}. Skipping.")
        return None, None

    filtered_country['truth_date'] = pd.to_datetime(filtered_country['truth_date'])
    filtered_country = filtered_country.sort_values(by='truth_date')


    I_data = filtered_country['value'].values
    if len(I_data) == 0:
        print(f"No observed values for country {country_name}. Skipping.")
        return None, None

    # Fixed model parameters
    p_h = 0.002 # Probability of hospitalization
    infectious_period = 2.5 # Average infectious period in days
    # Calculation of rate from I to R using fixed probability.
    gamma = (1 - p_h) / infectious_period # Recovery rate (I -> R)
    sigma = 1 / 1.5 # Incubation rate (E -> I)

    start_simulation_date = pd.to_datetime(start_date_model) - pd.Timedelta(weeks=1)
    extended_end_date = pd.to_datetime(end_date) + pd.DateOffset(weeks=6)
    num_days = (extended_end_date - start_simulation_date).days

    initial_I_data_absolute = I_data[0]
    all_simulations = []
    num_simulations = 1_000_000

       # Run all simulations and store each simulation's trajectory
    for i in range(num_simulations):
        initial_beta = np.random.uniform(0.1, 1.0)
        lower_bound = max(0.01, initial_I_data_absolute)
        upper_bound = initial_I_data_absolute
        E0 = np.random.uniform(lower_bound, upper_bound * 2)
        I0 = np.random.uniform(lower_bound, upper_bound * 2)
        R0 = np.random.uniform(0.1 * N, 0.3 * N)
        S0 = N - E0 - I0 - R0

        S_vals, E_vals, I_vals, R_vals, beta_vals = stochastic_seir(S0, E0, I0, R0, initial_beta, sigma, gamma, 1, num_days, N)
        hospitalization = np.diff(R_vals)
        hospitalization = np.insert(hospitalization, 0, hospitalization[0])
        
        # Align simulation output to the observed dates
        simulated_df = pd.DataFrame({'hospitalization': hospitalization},
                                      index=pd.date_range(start=start_simulation_date, end=extended_end_date))
        simulated_hospitalization = simulated_df.loc[filtered_country['truth_date']].values.flatten()
        all_simulations.append(simulated_hospitalization)

    simulations_array = np.array(all_simulations)
    return simulations_array, filtered_country

In [None]:
# =============================================================================
# 5. Main - Process all available countries.
# - Load population data from "population_data.csv" (columns: "Country_Code", "Population").
# - For each country present in the target data, run the simulation and save results in "simulations_data" folder.
# =============================================================================

if __name__ == '__main__':
    output_folder = "simulations_data"
    os.makedirs(output_folder, exist_ok=True)

    population_df = pd.read_csv('population_data.csv')
    population_dict = dict(zip(population_df['Country_Code'], population_df['Population']))

    unique_countries = sorted(df['location'].unique())
    print("Countries to process:", unique_countries)

    for country in unique_countries:
        N = population_dict.get(country)
        if N is None:
            print(f"Population not found for country {country}. Skipping.")
            continue
        print(f"Processing country {country} with population {N}.")
        simulations_array, filtered_country = generate_simulations_for_country(country, N)
        if simulations_array is None:
            continue
        # Save all simulation outputs for the country into the output folder
        np.save(os.path.join(output_folder, f'simulations_{country}.npy'), simulations_array)

Countries to process: ['AT', 'BE', 'BG', 'CY', 'CZ', 'DE', 'DK', 'EE', 'ES', 'FI', 'FR', 'GR', 'HU', 'IE', 'IS', 'IT', 'LI', 'LT', 'LU', 'LV', 'MT', 'NL', 'PL', 'PT', 'RO', 'SE', 'SI', 'SK']
Processing country AT with population 9118631.
No data available for country AT. Skipping.
Processing country BE with population 11744683.
No data available for country BE. Skipping.
Processing country BG with population 6741923.
No data available for country BG. Skipping.
Processing country CY with population 1362806.
Processing country CZ with population 10708981.
Population not found for country DE. Skipping.
Processing country DK with population 5818553.
No data available for country DK. Skipping.
Processing country EE with population 1326535.
No data available for country EE. Skipping.
Population not found for country ES. Skipping.
Processing country FI with population 5540720.
No data available for country FI. Skipping.
Processing country FR with population 65273511.
Processing country GR wit