<a href="https://colab.research.google.com/github/hollberg/CovidPooledTesting/blob/main/COVID_Pooled_testing_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1>COVID Pooled Testing Modeling</h1>
<p>Model testing multiple people at once by merging samples of 'n' people in a single test.
Vary the total population of tested individuals, as well as the base infection rate and
size of merged samples.</p>

In [1]:
# Imports
import random
import pandas as pd
# import matplotlib.pyplot as plt
import plotly
import plotly.express as px

Define functions that will be used to generate populations/infections, and to run tests.


In [2]:
def infect_people(num_folks, pct_inf):
    people = {}
    for i in range(0, num_folks):
        people[i] = [0]

    num_inf = len(people) * pct_inf
    num_inf = round(num_inf, 0)
    num_inf = int(num_inf)
    for i in range(0, num_inf):
        people[i] = [1]

    return (people, pct_inf, population)


def run_test(people: dict, batch_size: int):
    people_list = [i for i in range(0, len(people))]
    random.shuffle(people_list)
    # print(f'People list: {people_list}')

    num_tests = round(len(people) / batch_size, 0)
    # print(f'Number of tests: {str(num_tests)}')
    positive_tests = 0
    for i in range(0, len(people_list), batch_size):
        current_batch = people_list[i:i + batch_size]
        # if i < 100:
        #   print(current_batch)
        sick_people = False
        for j in current_batch:
            person_status = people[j]
            if person_status == [1]:
                sick_people = True

        if sick_people == True:
            positive_tests = positive_tests + 1

    return (len(people), batch_size, positive_tests)


def test_efficiency(population, batch_size, postive_tests):
    initial_tests = population / batch_size
    retests = positive_tests * batch_size
    total_tests = initial_tests + retests
    tests_saved = population - total_tests
    return tests_saved

Define a population of people, the base infection rate, and then "infect" the appopriate number of people

In [3]:
population = 1000

base_infection_rates = [.001, .005, .01, .03, .05, .07, .09, .1, .25, .5, .75]
batch_sizes = range(2, 16)
trials = 100

outcomes = {}

for experiment in range(0, trials):
    for pct_inf in base_infection_rates:
        for batch_size in batch_sizes:
            people, pct_inf, population = infect_people(num_folks=population, pct_inf=pct_inf)
            _, _, positive_tests = run_test(people=people, batch_size=batch_size)
            saved_tests = test_efficiency(population=population, batch_size=batch_size, postive_tests=positive_tests)
            outcomes[(experiment, pct_inf, batch_size)] = population, pct_inf, batch_size, positive_tests, saved_tests
print(outcomes)

{(0, 0.001, 2): (1000, 0.001, 2, 1, 498.0), (0, 0.001, 3): (1000, 0.001, 3, 1, 663.6666666666667), (0, 0.001, 4): (1000, 0.001, 4, 1, 746.0), (0, 0.001, 5): (1000, 0.001, 5, 1, 795.0), (0, 0.001, 6): (1000, 0.001, 6, 1, 827.3333333333334), (0, 0.001, 7): (1000, 0.001, 7, 1, 850.1428571428571), (0, 0.001, 8): (1000, 0.001, 8, 1, 867.0), (0, 0.001, 9): (1000, 0.001, 9, 1, 879.8888888888889), (0, 0.001, 10): (1000, 0.001, 10, 1, 890.0), (0, 0.001, 11): (1000, 0.001, 11, 1, 898.0909090909091), (0, 0.001, 12): (1000, 0.001, 12, 1, 904.6666666666666), (0, 0.001, 13): (1000, 0.001, 13, 1, 910.0769230769231), (0, 0.001, 14): (1000, 0.001, 14, 1, 914.5714285714286), (0, 0.001, 15): (1000, 0.001, 15, 1, 918.3333333333334), (0, 0.005, 2): (1000, 0.005, 2, 5, 490.0), (0, 0.005, 3): (1000, 0.005, 3, 5, 651.6666666666667), (0, 0.005, 4): (1000, 0.005, 4, 5, 730.0), (0, 0.005, 5): (1000, 0.005, 5, 5, 775.0), (0, 0.005, 6): (1000, 0.005, 6, 5, 803.3333333333334), (0, 0.005, 7): (1000, 0.005, 7, 4, 829

Now *Outcomes* is a dictionary with keys (experiment #, % infected, Batch size)

In [4]:
summary_results = {}
for key, value in outcomes.items():
    _, pct_inf, batch_size = key
    _, _, _, _, saved_tests, = value

    if (pct_inf, batch_size) in summary_results:
        summary_results[(pct_inf, batch_size)] = summary_results[(pct_inf, batch_size)] + saved_tests
    else:
        summary_results[(pct_inf, batch_size)] = saved_tests

for key in summary_results.keys():
    summary_results[key] = summary_results[key] / trials
print(summary_results)

{(0.001, 2): 498.0, (0.001, 3): 663.6666666666658, (0.001, 4): 746.0, (0.001, 5): 795.0, (0.001, 6): 827.3333333333331, (0.001, 7): 850.1428571428562, (0.001, 8): 867.0, (0.001, 9): 879.88888888889, (0.001, 10): 890.0, (0.001, 11): 898.0909090909106, (0.001, 12): 904.6666666666673, (0.001, 13): 910.0769230769223, (0.001, 14): 914.5714285714298, (0.001, 15): 918.3333333333327, (0.005, 2): 490.0, (0.005, 3): 651.7266666666657, (0.005, 4): 730.24, (0.005, 5): 775.3, (0.005, 6): 803.4533333333333, (0.005, 7): 822.9828571428562, (0.005, 8): 835.72, (0.005, 9): 844.6088888888901, (0.005, 10): 850.9, (0.005, 11): 855.4109090909105, (0.005, 12): 858.346666666667, (0.005, 13): 859.7669230769222, (0.005, 14): 859.4114285714296, (0.005, 15): 860.583333333333, (0.01, 2): 480.04, (0.01, 3): 637.1166666666658, (0.01, 4): 710.52, (0.01, 5): 751.45, (0.01, 6): 775.3133333333334, (0.01, 7): 790.1528571428562, (0.01, 8): 797.24, (0.01, 9): 803.1188888888901, (0.01, 10): 804.0, (0.01, 11): 804.2609090909

Make a Pandas dataframe from our "Summary" dictionary

In [5]:
df_tests = pd.DataFrame.from_dict(summary_results, orient='index')

# print(f'1st iteration of dataframe \n{df_tests.head(5)}')


df_tests.reset_index(inplace=True)
# print(f'After reseting index of df: \n{df_tests.head(5)}')

# Split the new "index" column, which is a tuple of (pct_infected, batch size) into 2 distinct
df_tests[['Percent Infected', 'Batch Size']] = pd.DataFrame(df_tests['index'].tolist(), index=df_tests.index)
# print(f'after splitting df index into two new columns, "Percent Infected" and "Batch Size": \n{df_tests.head(5)}')

df_tests.rename(columns={0: 'Saved Tests'}, inplace=True)
df_tests.drop(columns=['index'], inplace=True)

# print(df_tests.head(5))

fig = px.line(df_tests, x="Batch Size", y="Saved Tests", color='Percent Infected')
fig.layout = {'title': f'Saved (Wasted) Test Kits by Test Batch Size vs. Base Infection Rate (per {population} tested)',
              "xaxis_title": 'Batch Size',
              'yaxis_title': 'Saved (Wasted) Test Kits'}
fig.show()