Toy functions to simulate infection and recovery events given number of inf/rec events and starting states.

First, simulate forward and then reject sampling. 

In [2]:
## the basis: Gillespie algorithm for forward simulation....
def simulate_sir_gillespie(beta, gamma, N, t0, t1, S0, I0, rng=None):
    """Simulate CTMC SIR on [t0,t1] starting from (S0,I0) using Gillespie.
    Returns event_times (strictly inside (t0,t1)), event_types list 'inf'/'rem',
    and a list of states at each event (S,I) including start and end.
    Also returns full times array including t0 and t1 and states at those times.
    """
    if rng is None:
        rng = np.random.default_rng()
    t = float(t0)
    S = int(S0); I = int(I0); R = N - S - I
    times = [t]
    states = [(S,I,R)]
    event_times = []
    event_types = []
    while t < t1 and I > 0:
        a_inf = beta * S * I
        a_rem = gamma * I
        a0 = a_inf + a_rem
        if a0 <= 0:
            break
        dt = rng.exponential(1.0 / a0)
        t_next = t + dt
        if t_next > t1:
            break
        u = rng.random()
        if u < a_inf / a0:
            # infection
            S -= 1; I += 1
            event_types.append('inf')
        else:
            # removal
            I -= 1; R += 1
            event_types.append('rem')
        event_times.append(t_next)
        t = t_next
        times.append(t)
        states.append((S,I,R))
    # append final time
    times.append(float(t1)); states.append((S,I,R))
    return np.array(times), event_times, event_types, states

In [5]:
def sample_event_times_given_counts(beta, gamma, N, t0, t1, S0, I0, Ninf_target, Nrem_target, max_tries=200, rng=None):
    """Attempt to sample an exact CTMC path on [t0,t1] with counts equal to targets.

    Method: rejection sampling — simulate Gillespie paths and accept ones whose
    counts match the targets. This is exact but may be inefficient if acceptance probability low.

    Returns a dict with keys: 'times', 'event_times', 'event_types', 'states', 'Ninf', 'Nrem', 'T_SI', 'T_I'
    or None if sampling failed after max_tries.
    """
    if rng is None:
        rng = np.random.default_rng()
    for attempt in range(max_tries):
        times, event_times, event_types, states = simulate_sir_gillespie(beta, gamma, N, t0, t1, S0, I0, rng=rng)
        # count events inside interval
        Ninf = sum(1 for et in event_types if et == 'inf')
        Nrem = sum(1 for et in event_types if et == 'rem')
        if Ninf == Ninf_target and Nrem == Nrem_target:
            # compute exact integrals T_SI and T_I
            T_SI = 0.0; T_I = 0.0
            # states array includes states at event times and the final state
            # times array aligns with states
            for j in range(len(times)-1):
                dt = float(times[j+1] - times[j])
                S_j, I_j, _ = states[j]
                T_SI += dt * (S_j * I_j)
                T_I += dt * I_j
            return {
                'times': times,
                'event_times': event_times,
                'event_types': event_types,
                'states': states,
                'Ninf': Ninf,
                'Nrem': Nrem,
                'T_SI': T_SI,
                'T_I': T_I,
                'attempts': attempt+1
            }
    return None

A small working example of this: 

In [16]:
rng = np.random.default_rng(42)
N=50
S0=45
I0=5
tmax=4.0
beta=0.02
gamma=0.5

# Example run
res1 = sample_event_times_given_counts(beta,gamma,N,0.0,tmax,S0,I0,Ninf_target=18,Nrem_target=17,max_tries=500,rng=rng)

print('Rejection sampler example:')

if res1: print('T_SI:',res1['T_SI'],'T_I:',res1['T_I'],'Ninf:',res1['Ninf'],'Nrem:',res1['Nrem'])

Rejection sampler example:
T_SI: 987.3500582106637 T_I: 29.24265554552101 Ninf: 18 Nrem: 17


2. Another function that uses uniformization and forward filtering and backward sampling (FFBS) technique

It works, roughly speaking, in the following steps: 

1. Construct the full state space `[(S,I)]` for `0 ≤ I ≤ N`, `0 ≤ S ≤ N-I`.
2. Compute the **uniformization rate** `Omega = 1.05 * max_total_exit_rate`.
3. Sample a Poisson number `M ~ Poisson(Omega * (t1-t0))` of uniformized jump times within the interval.
4. Construct a dense transition matrix `P` for the uniformized chain.
5. **Forward filtering:** propagate probabilities across the uniformized times starting from `start_state`.
6. **Backward sampling:** sample a trajectory consistent with the filtered distributions.
7. Compute trajectory statistics:
   - `Ninf` = number of infections
   - `Nrem` = number of recoveries
   - `T_SI` = integral of `S(t)*I(t)` over the interval
   - `T_I` = integral of `I(t)` over the interval


In [14]:
def uniformize_and_ffbs_prior(t0, t1, start_state, N, beta, gamma, rng=None):
    """Draw a path on [t0,t1] from the CTMC prior conditional on start_state.
    Returns a dict with sampled_states (list of (S,I)), times array including t0,t1,
    Ninf, Nrem, T_SI, T_I, Omega, M
    """
    if rng is None:
        rng = np.random.default_rng()
    Delta = float(t1 - t0)
    if Delta < 0:
        raise ValueError('t1 must be >= t0')
    states = [(S, I) for I in range(0, N+1) for S in range(0, N - I + 1)]
    idx = {s:i for i,s in enumerate(states)}
    Ssize = len(states)
    # Omega
    maxq = 0.0
    for S,I in states:
        q = beta * S * I + gamma * I
        if q > maxq: maxq = q
    Omega = max(1e-8, maxq * 1.05)
    mean_M = Omega * Delta
    M = int(rng.poisson(mean_M)) if mean_M > 0 else 0
    taus = np.sort(rng.uniform(t0, t1, size=M)) if M > 0 else np.array([])
    times = np.concatenate(([float(t0)], taus.astype(float), [float(t1)]))
    L = M + 1
    # dense P
    P = np.zeros((Ssize, Ssize))
    for i,(S,I) in enumerate(states):
        q_inf = beta * S * I
        q_rem = gamma * I
        q_sum = q_inf + q_rem
        p_stay = max(0.0, 1.0 - q_sum / Omega)
        P[i,i] = p_stay
        if S>=1:
            s2 = (S-1, I+1)
            if s2 in idx: P[i, idx[s2]] = q_inf / Omega
        if I>=1:
            s2 = (S, I-1)
            if s2 in idx: P[i, idx[s2]] = q_rem / Omega
    # forward filter
    alpha = np.zeros(Ssize, dtype=float)
    if start_state not in idx:
        raise ValueError('start_state not feasible')
    alpha[idx[start_state]] = 1.0
    filtered = [alpha.copy()]
    for j in range(1, L+1):
        alpha = alpha.dot(P)
        ssum = alpha.sum()
        if ssum <= 0.0:
            alpha = np.ones_like(alpha) * 1e-12
            ssum = alpha.sum()
        alpha = alpha / ssum
        filtered.append(alpha.copy())
    # backward sample
    X_idx = [None]*(L+1)
    probs_final = filtered[L] / filtered[L].sum()
    X_idx[L] = int(rng.choice(len(probs_final), p=probs_final))
    for j in range(L-1, -1, -1):
        col = P[:, X_idx[j+1]]
        wt = filtered[j] * col
        ssum = wt.sum()
        if ssum <= 0.0:
            wt = np.ones_like(wt) / len(wt)
        else:
            wt = wt / ssum
        X_idx[j] = int(rng.choice(len(wt), p=wt))
    sampled_states = [states[i] for i in X_idx]
    # compute exact integrals and counts
    deltas = np.diff(times)
    Ninf = 0; Nrem = 0; T_SI = 0.0; T_I = 0.0
    for j in range(1, L+1):
        S_prev, I_prev = sampled_states[j-1]
        S_cur, I_cur = sampled_states[j]
        dt = float(deltas[j-1])
        T_SI += dt * (S_prev * I_prev)
        T_I += dt * I_prev
        if (S_cur == S_prev - 1) and (I_cur == I_prev + 1):
            Ninf += 1
        elif (S_cur == S_prev) and (I_cur == I_prev - 1):
            Nrem += 1
    return {
        'times': times,
        'sampled_states': sampled_states,
        'Ninf': Ninf,
        'Nrem': Nrem,
        'T_SI': T_SI,
        'T_I': T_I,
        'Omega': Omega,
        'M': M
    }


In [15]:
res2 = uniformize_and_ffbs_prior(0.0,tmax,(S0,I0),N,beta,gamma,rng=rng)
print('Uniformization+FFBS example:')
print('T_SI:',res2['T_SI'],'T_I:',res2['T_I'],'Ninf:',res2['Ninf'],'Nrem:',res2['Nrem'])

Uniformization+FFBS example:
T_SI: 814.4212461522379 T_I: 22.830778812622604 Ninf: 18 Nrem: 17
