In [259]:
import math
import numpy as np
import time
import ipywidgets as widgets
import pandas as pd
import matplotlib.pyplot as plt 
import ipywidgets as widgets
from ipywidgets import interactive
import os
import seaborn as sns
import glob
import moviepy.editor as mpy
from IPython.display import Image, clear_output

# Begin with example w/ a simple graph of a line, with `m` and `b` as parameters

In [287]:
def line_model(m, b):
    plt.figure(2)
    x = np.linspace(-10, 10, num=1000)
    plt.plot(x, m * x + b)
    plt.ylim(-5, 5)
    plt.show()

In [288]:
interactive_plot = interactive(line_model, m=(-2.0, 2.0), b=(-3, 3, 0.5))
output = interactive_plot.children[-1]
interactive_plot

interactive(children=(FloatSlider(value=0.0, description='m', max=2.0, min=-2.0), FloatSlider(value=0.0, descr…

In [289]:
def update_progress(progress):
    bar_length = 20
    if isinstance(progress, int):
        progress = float(progress)
    if not isinstance(progress, float):
        progress = 0
    if progress < 0:
        progress = 0
    if progress >= 1:
        progress = 1

    block = int(round(bar_length * progress))

    clear_output(wait = True)
    text = "Creating Plots: [{0}] {1:.1f}%".format( "#" * block + "-" * (bar_length - block), progress * 100)
    print(text)

In [268]:
np.random.seed(int(time.time()))

In [269]:
class person:
    def __init__(self, _id,  x, y, v, status, time_infected):
        self.x = x
        self.y = y
        self.v = v
        self.status = status
        self._id = _id 
        self.time_infected = time_infected
    
    def healthy(self):
        self.status = "naive"
    def infect(self):
        self.status = "infected"
    def recover(self):
        self.status = "immune"
        
    def step_x(self, dist):
        self.x += dist
    def step_y(self, dist):
        self.y += dist
        
    def random_walk(self):
        step_x = np.random.random() - 0.5
        step_y = np.random.random() - 0.5
        self.step_x(self.v * step_x)
        self.step_y(self.v * step_y)
    
    def periodic_conditions(self, grid_x, grid_y):
        if self.get_x() < 0:
            self.step_x(grid_x)
        if self.get_x() > grid_x:
            self.step_x(-grid_x)
        if self.get_y() < 0:
            self.step_y(grid_y)
        if self.get_y() > grid_y:
            self.step_y(-grid_y)
        
    def add_time_infected(self, time):
        self.time_infected += time
    
    def get_x(self):
        return self.x
    def get_y(self):
        return self.y
    def get_v(self):
        return self.v
    def get_status(self):
        return self.status
    def get_time_infected(self):
        return self.time_infected
    def _get_id(self):
        return self._id    

helper functions for populations

In [270]:
def healthy_people(list_of_people):
    """returns a list of healthy people"""
    healthy_people = []
    for person in list_of_people:
        if person.status == "naive":
            healthy_people.append(person)
    return healthy_people

def infected_people(list_of_people):
    """returns a list of infected people"""
    infected_people = []
    for person in list_of_people:
        if person.status == "infected":
            infected_people.append(person)
    return infected_people

def immune_people(list_of_people):
    """returns a list of immune people"""
    immune_people = []
    for person in list_of_people:
        if person.status == "immune":
            immune_people.append(person)
    return immune_people

make people

In [271]:
grid_size = 50
grid_x = grid_size
grid_y = grid_size
num_people = 200
print((grid_x * grid_y) / num_people)

12.5


In [272]:
def create_people(num_people, x_max, y_max, num_infected_start = 1):
    list_of_people = []
    infected_people = np.random.randint(0, num_people, num_infected_start)
    for num in range(0, num_people):
        x = np.random.random() * x_max
        y = np.random.random() * y_max
        v = abs(np.random.normal(loc=0.0, scale=2.5, size=None))
        list_of_people.append(person(num, x, y, v, "naive", 0))
    for infection in infected_people:
        list_of_people[infection].infect()
    return list_of_people

move people

In [273]:
def move_people(people, grid_x, grid_y):
    list_of_people = people.copy()
    for person in list_of_people:
        person.random_walk()
        person.periodic_conditions(grid_x, grid_y)
    return list_of_people

In [274]:
def append_population(healthy_population, infected_population, immune_population):
    total_population = []
    for person in healthy_population:
        total_population.append(person)
    for person in infected_population:
        total_population.append(person)
    for person in immune_population:
        total_population.append(person)
    return total_population

infections

In [275]:
def infect_population(people, infection_range = 3):
    list_of_people = people.copy()
    healthy_population = healthy_people(list_of_people)
    infected_population = infected_people(list_of_people)
    immune_population = immune_people(list_of_people)
    
    for person in healthy_population:
        for neighbor in infected_population:
            distance_between = calculate_distance(person.get_x(), neighbor.get_x(), person.get_y(), neighbor.get_y())
            infected = infection_event(distance_between, infection_range)
            if infected:
                person.infect()
                #print(f"infection: {distance_between}")
    
    #if len(infected_people(healthy_population)) > 0:
        #print(f"newly infected: {len(infected_people(healthy_population))}")
    
    return append_population(healthy_population, infected_population, immune_population)


In [276]:
def infected_to_immune(people, time_to_heal):
    list_of_people = people.copy()
    healthy_population = healthy_people(list_of_people)
    infected_population = infected_people(list_of_people)
    immune_population = immune_people(list_of_people)
    
    for person in infected_population:
        if person.get_time_infected() >= time_to_heal:
            person.recover()
        else:
            person.add_time_infected(1)
            
    return append_population(healthy_population, infected_population, immune_population)

In [277]:
def infection_event(distance_between, infection_range = 3):
    """return 1 if infection occurs, 0 if safe"""
    infected = 0
    if distance_between < infection_range:
        infected = 1
    return infected

In [278]:
def calculate_distance(x1, x2, y1, y2):
    return math.sqrt((x1 - x2)**2 + (y1 - y2)**2)

In [279]:
def collect_data(people):
    list_of_people = people.copy()
    healthy_population = healthy_people(list_of_people)
    infected_population = infected_people(list_of_people)
    immune_population = immune_people(list_of_people)
    
    
    num_healthy = len(healthy_population)
    num_infected = len(infected_population)
    num_immune = len(immune_population)
    
    x_positions = [person.get_x() for person in people]
    y_positions = [person.get_y() for person in people]
    status = [person.get_status() for person in people]
    return num_healthy, num_infected, num_immune, x_positions, y_positions, status

In [303]:
def run_model(infection_range, time_to_recover):
    global last_day
    print("Generating People")
    try:
        del people
    except:
        pass
    # create people
    people = create_people(num_people, grid_x, grid_y)
    # collect initial stats
    num_healthy, num_infected, num_immune, x_positions, y_positions, status = collect_data(people)
    df_infections = pd.DataFrame()
    row_infections = pd.DataFrame([pd.Series([0, num_healthy, num_infected, num_immune])])
    df_infections = pd.concat([row_infections, df_infections], ignore_index=True)
    
    df_positions_today = pd.DataFrame()
    df_positions_today["x_position"] = x_positions
    df_positions_today["y_position"] = y_positions
    df_positions_today["status"] = status
    df_positions_today["day"] = 0
    df_positions = df_positions_today.copy()
    
    print("Simulating Days")
    last_day = 1e4
    day = 1
    while (day != last_day) and (day < 365):
        people = move_people(people, grid_x, grid_y)
        people = infect_population(people, infection_range = np.sqrt(infection_range))
        people = infected_to_immune(people, time_to_recover)
        num_healthy, num_infected, num_immune, x_positions, y_positions, status = collect_data(people)
        row_infections = pd.DataFrame([pd.Series([day, num_healthy, num_infected, num_immune])])
        df_infections = pd.concat([row_infections, df_infections], ignore_index=True)
        df_positions_today["x_position"] = x_positions
        df_positions_today["y_position"] = y_positions
        df_positions_today["status"] = status
        df_positions_today["day"] = day
        df_positions = df_positions.append(df_positions_today)
        if num_healthy == 0 or num_infected == 0:
            last_day = day
        else: 
            day = day + 1
    print(f"{last_day} day(s) Simulated")
    df_infections = df_infections.rename({0: "days", 1: "healthy", 2: "infected", 3: "immune"}, axis='columns')
    return df_infections, df_positions, last_day

In [304]:
def make_plots(df_infections, df_positions, last_day):
    os.system("rm -rf plots")
    os.mkdir("plots")
    print("Generating Plots")
    for day in range(0, min(365, last_day)):
        sns.scatterplot(data = df_positions[df_positions["day"] == day], x = "x_position", y = "y_position", hue = "status", hue_order = ["naive", "infected", "immune"])
        frame1 = plt.gca()
        for xlabel_i in frame1.axes.get_xticklabels():
            xlabel_i.set_visible(False)
            xlabel_i.set_fontsize(0.0)
        for xlabel_i in frame1.axes.get_yticklabels():
            xlabel_i.set_fontsize(0.0)
            xlabel_i.set_visible(False)
        for tick in frame1.axes.get_xticklines():
            tick.set_visible(False)
        for tick in frame1.axes.get_yticklines():
            tick.set_visible(False)
        plt.xlabel("")
        plt.ylabel("")
        num_infected = df_infections[df_infections["days"] == day].infected.iloc[0]
        plt.title(f"Day: {day}")
        plt.legend(loc=1)
        plt.savefig(f"plots/day_{day}.png", dpi = 100)
        plt.savefig(f"plots/day_{day}.pdf")
        plt.clf()
        update_progress(day / min(365, last_day))

        
    gif_name = 'infections'
    fps = 8
    file_list = glob.glob('plots/*.png') # Get all the pngs in the current directory
    list.sort(file_list, key=lambda x: int(x.split('_')[1].split('.png')[0])) # Sort the images by #, this may need to be tweaked for your use case
    clip = mpy.ImageSequenceClip(file_list, fps=fps)
    clip.write_gif('{}.gif'.format(gif_name), fps=fps)
    
    sns.scatterplot(data = df_infections, x = "days", y = "infected", label = "infected")
    sns.scatterplot(data = df_infections, x = "days", y = "healthy", label = "naive")
    sns.scatterplot(data = df_infections, x = "days", y = "immune", label = "immune")
    plt.legend()
    plt.ylabel("People")
    plt.xlabel("Days")
    plt.savefig("infections_over_time.png")
    plt.show()
    
    return Image("infections.gif")

In [305]:
style = {'description_width': 'initial'}

In [306]:
def run(infection_range, time_to_recover):
    df_infections, df_positions, last_day = run_model(infection_range, time_to_recover)
    gif = make_plots(df_infections, df_positions, last_day)
    return gif

In [308]:
x = widgets.interact_manual(run, infection_range = widgets.IntSlider(min = 1, max = 50, step = 1, value = 10, description = "Infectivity", style=style), time_to_recover = widgets.IntSlider(min = 1, max = 28, step = 1, value = 14, description = "Recovery Time", style=style))

interactive(children=(IntSlider(value=10, description='Infectivity', max=50, min=1, style=SliderStyle(descript…

In [309]:
def show_day(Day):
    return Image(f"plots/day_{Day}.png")

In [312]:
widgets.interact(show_day, Day = (0,last_day - 1))

interactive(children=(IntSlider(value=27, description='Day', max=55), Output()), _dom_classes=('widget-interac…

<function __main__.show_day(Day)>