# implementation of get_probabilities_of_infection from this snippet:
Zakł. że $\lambda_i \in [0.05 - 0.15]$
$\lambda_{0, i} = 0.1$
1. Dla każdego index case losuję domek. Powtarzam 10000 razy.
2. Startujemy od $\lambda_{i, 0}$, $i = 1,..5$
3. Liczymy prawdopobieństwa get_probabilities_of_infection dla $\lambda_{i, j}$
    1. dla poprzedniej wersji: -> macierz trójkątna K x K
    2. dla nowej wersji: -> ~~macierz $(K\times 5)\times {K+4\choose{4}} \times 5$.~~ Funkcja która zwraca tupla z 5 prawdopodobieństwami dla zadanych parametrów na wejściu. Jeżeli jakaś grupa jest nieobecna w wylosowanej kombinacji grup wiekowych to oznaczana jest przez NaN.
    (lambda_1, ..., lambda_5, ~~grupa wiekowa index case'a~~, k (rozmiar domku), n_1, ..., n_5) -> (p_1, .., p_5)
    n_i - liczba osób w i-tej grupie wiekowej w domku
4. Zakażanie w domku wg macierzy prawdopodobieństwa z poprzedniego punktu oraz bootstrapu z punktu 1.
5. Dla każdej grupy wiekowej sprawdzamy, czy średnia wylosowanych I/N w tej grupie wiekowej jest mniejsza / większa od obserwowanej I/N w tej grupie wiekowej i w zależności od tego odpowiednio zwiększamy / zmniejszamy lambdę tej grupy wiekowej.
6. Wróć do punktu 3.
    1. Kryterium stopu: max iteracji = 6 został osiągnięty.


In [51]:
import numpy as np

class InfectionProbabilitiesCalculator:
    def __init__(self, repeats=1000, rng=None):
        self.precalculated_values = dict()
        if rng is None:
            self.rng = np.random.default_rng()
        else:
            self.rng = rng
        self.repeats = repeats

    def _calculate_infection_probabilities(self, lambdas, index_age_group, household_size_age_grouped):
        
        probabilities = np.zeros(5)
        for i in range(5):
            if household_size_age_grouped[i] == 0:
                probabilities[i] = None
        k = np.sum(household_size_age_grouped) # secondary household members
        for repeat in np.arange(self.repeats):
            
            # 1. preprocess and setup the job
            simulated = (0, 0, 0, 0, 0)
            infected = np.zeros(k).astype(bool)
            sampled = np.zeros(k).astype(bool)
            age_groups = np.zeros(k)
            start = 0
            for i, how_many in enumerate(household_size_age_grouped):
                stop = start + how_many
                age_groups[start:stop] = i
                start += stop - start
            
            # 2. zero iteration - infect from source
            for i in range(5):
                group_size = household_size_age_grouped[i]
                if group_size == 0:
                    continue
                age_group_probs = np.zeros_like(age_groups)
                age_group_probs[age_groups == i] = 1/group_size
                index_age_group_infections = rng.binomial(group_size, lambdas[i])
                choices = rng.choice(k, index_age_group_infections, replace=False, p=age_group_probs)
                infected[choices] = True
            #print(infected)
            # 3. remaining iterations - infect from infected
            while np.any(infected & ~sampled):
                arr = infected & ~sampled
                infecting_id = arr.nonzero()[0][0]
                sampled[infecting_id] = True
                
                age_group_of_infecting = age_groups[infecting_id]
                for i in range(5):
                    group_size = household_size_age_grouped[i]
                    if age_group_of_infecting == i:
                        group_size -= 1
                    if group_size == 0:
                        continue
                    age_group_probs = np.zeros_like(age_groups)
                    age_group_probs[age_groups == i] = 1/group_size
                    age_group_probs[infecting_id] = 0
                    
                    index_age_group_infections = rng.binomial(group_size, lambdas[i])
                    choices = rng.choice(k, index_age_group_infections, replace=False, p=age_group_probs)
                    infected[choices] = True
            # 4. update probabilities
            for i in range(5):
                if household_size_age_grouped[i] > 0:
                    age_group_infected = sum(infected[age_groups == i])
                    current_simulated_value = age_group_infected / household_size_age_grouped[i]
                    # online average
                    probabilities[i] = (repeat * probabilities[i] + current_simulated_value) / (repeat + 1)

        return probabilities
    
    def get_probabilities_of_infection(self, lambda1, lambda2, lambda3, lambda4, lambda5, index_age_group, 
                                       household_size_age_group1, household_size_age_group2, 
                                       household_size_age_group3, household_size_age_group4, 
                                       household_size_age_group5):

        dict_key = ((lambda1, lambda2, lambda3, lambda4, lambda5), index_age_group,
                   (household_size_age_group1, household_size_age_group2, household_size_age_group3,
                   household_size_age_group4, household_size_age_group5))

        if dict_key not in self.precalculated_values:
            self.precalculated_values[dict_key] = self._calculate_infection_probabilities((lambda1, lambda2, 
                                                                                          lambda3, lambda4, 
                                                                                          lambda5), index_age_group, 
                                                                                          (household_size_age_group1,
                                                                                          household_size_age_group2, 
                                                                                          household_size_age_group3, 
                                                                                          household_size_age_group4, 
                                                                                          household_size_age_group5))
        return self.precalculated_values[dict_key]
    


In [60]:
import time

In [63]:
# test 1 - should infect only 0 group
calculator = InfectionProbabilitiesCalculator()
lambda1 = 0.10
lambda2 = 0.00
lambda3 = 0.00
lambda4 = 0.00
lambda5 = 0.00
index_age_group = 2
household_size_age_group1 = 3
household_size_age_group2 = 0
household_size_age_group3 = 4
household_size_age_group4 = 1
household_size_age_group5 = 5
start = time.time()
ret = calculator.get_probabilities_of_infection(lambda1, lambda2, lambda3, lambda4, lambda5, index_age_group, 
                                                household_size_age_group1, household_size_age_group2, 
                                                household_size_age_group3, household_size_age_group4, 
                                                household_size_age_group5)
end = time.time()
print(f'returned: {ret} - elapsed time: {end - start} s')

returned: [0.11966667        nan 0.         0.         0.        ] - elapsed time: 0.31027984619140625 s


In [64]:
# test 2 - should return result immediately 
start = time.time()
ret = calculator.get_probabilities_of_infection(lambda1, lambda2, lambda3, lambda4, lambda5, index_age_group, 
                                                household_size_age_group1, household_size_age_group2, 
                                                household_size_age_group3, household_size_age_group4, 
                                                household_size_age_group5)
end = time.time()
print(f'returned: {ret} - elapsed time: {end - start} s')

returned: [0.11966667        nan 0.         0.         0.        ] - elapsed time: 9.703636169433594e-05 s


In [65]:
# test 3 - should infect almost same numbers of each age group
calculator = InfectionProbabilitiesCalculator()
lambda1 = 0.10
lambda2 = 0.10
lambda3 = 0.10
lambda4 = 0.10
lambda5 = 0.10
index_age_group = 0
household_size_age_group1 = 3
household_size_age_group2 = 3
household_size_age_group3 = 3
household_size_age_group4 = 3
household_size_age_group5 = 3
start = time.time()
ret = calculator.get_probabilities_of_infection(lambda1, lambda2, lambda3, lambda4, lambda5, index_age_group, 
                                                household_size_age_group1, household_size_age_group2, 
                                                household_size_age_group3, household_size_age_group4, 
                                                household_size_age_group5)
end = time.time()
print(f'returned: {ret} - elapsed time: {end - start} s')

returned: [0.44833333 0.44266667 0.444      0.43866667 0.44133333] - elapsed time: 2.0086779594421387 s


In [66]:
# test 4 - should infect all
calculator = InfectionProbabilitiesCalculator()
lambda1 = 1.0
lambda2 = 1.0
lambda3 = 1.0
lambda4 = 1.0
lambda5 = 1.0
index_age_group = 0
household_size_age_group1 = 3
household_size_age_group2 = 3
household_size_age_group3 = 3
household_size_age_group4 = 3
household_size_age_group5 = 3
start = time.time()
ret = calculator.get_probabilities_of_infection(lambda1, lambda2, lambda3, lambda4, lambda5, index_age_group, 
                                                household_size_age_group1, household_size_age_group2, 
                                                household_size_age_group3, household_size_age_group4, 
                                                household_size_age_group5)
end = time.time()
print(f'returned: {ret} - elapsed time: {end - start} s')

returned: [1. 1. 1. 1. 1.] - elapsed time: 7.062686920166016 s
