In [4]:
import networkx as nx
import ndlib.models.ModelConfig as mc
import ndlib.models.epidemics as ep
from ndlib.utils import multi_runs
import numpy as np
import torch
import scipy
import matplotlib.pyplot as plt
from ndlib.viz.mpl.TrendComparison import DiffusionTrendComparison
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (20,8)

class Model:
    '''Class for modeling pandemics. It's made to test parameters and how the simulations behaves given these parameters.'''
    def __init__(self,infection_probability ,latent_period ,removal_probability, N, iteration, epn, topology, plot,repeats,raw):
        # set model parameters #0.0025
        self.infection_probability = infection_probability #0.025
        self.removal_probability = removal_probability #0.01
        self.latent_period = latent_period #0.033333
        self.N = N
        self.edges_per_node = epn #0.015
        self.iteration = iteration
        self.topologies = topology
        self.trends = []
        self.what_to_plot = plot
        self.ranger = repeats
        self.raw = raw

    def build(self):
        '''After initializing, call build method to build model given the pre-initialized parameter'''
        self.sus,self.exp,self.inf,self.rem = (np.empty((self.iteration,self.ranger,len(self.topologies))) for i in range(4))
        for i,topology in enumerate(self.topologies):
            # Network topology
            if topology == 'CG':
                g = nx.generators.community.gaussian_random_partition_graph(n=self.N,s= 15,v= 10,p_in= self.edges_per_node/self.N*10,p_out=self.edges_per_node/self.N) #(self.N, cluster_size, cluster_std, intra_cluster_edge_prob, inter_cluster_edge_prob)
            elif topology == 'ER':
                g = nx.erdos_renyi_graph(self.N,self.edges_per_node/self.N)
            elif topology == 'WS':
                g = nx.watts_strogatz_graph(self.N,self.edges_per_node*2,self.edges_per_node/self.N*4)
            elif topology == 'BA':
                g = nx.barabasi_albert_graph(self.N, self.edges_per_node//2)
            else:
                raise AssertionError('topology not found.')
            # Model selection
            model = ep.SEIRModel(g)

            # Model Configuration
            cfg = mc.Configuration()
            cfg.add_model_parameter('alpha', self.latent_period)
            cfg.add_model_parameter('beta', self.infection_probability)
            cfg.add_model_parameter('gamma', self.removal_probability)
            cfg.add_model_parameter("fraction_infected", 1/self.N)
            model.set_initial_status(cfg)

        # Simulation execution
            self.trends.append(multi_runs(model, execution_number=self.ranger, iteration_number=self.iteration, infection_sets=None, nprocesses=8))
            
            for j,model in enumerate(self.trends[i]):
                self.sus[:,j,i] = np.array(model['trends']['node_count'][0])
                self.exp[:,j,i] = np.array(model['trends']['node_count'][2])
                self.inf[:,j,i] = np.array(model['trends']['node_count'][1])
                self.rem[:,j,i] = np.array(model['trends']['node_count'][3])

    def status(self):
        '''Print status of current model.'''
        for i,topology in enumerate(self.topologies):
            status = np.vstack((self.sus[-1,:,i].squeeze(),self.exp[-1,:,i].squeeze(),self.inf[-1,:,i].squeeze(),self.rem[-1,:,i].squeeze()))
            maxrem = np.max(status[3,:])
            status = np.mean(status, axis=1)
            status = np.append(status,maxrem)
            print(f'\nStatus for topology: {topology}')
            if self.raw == False:
                print(f'#Susceptible = {status[0]}\n#Exposed = {status[1]}\n#Infected = {status[2]}\n#Removed = {status[3]}\nmax # of removed = {status[4]}')
            else:
                return [status[0], status[1], status[2], status[3]]
        
    def plot(self):
        '''Show a graphical plot of the current model.'''
        linestyle = [str('-'), str('-.'), str('--'), str('o')]
        plt.xlabel('iterations')
        plt.ylabel('#nodes')
        plt.title(f'Diffusion plot of nodes status over iteration (averaged over {self.ranger} simulations)')
        for i,topology in enumerate(self.topologies):
            if self.what_to_plot[0] == True: plt.plot(np.mean(self.sus[:,:,i],axis=1).squeeze(), label=f'topology: {topology}, Susceptible',linestyle=linestyle[i] )
            if self.what_to_plot[1] == True: plt.plot(np.mean(self.exp[:,:,i],axis=1).squeeze(), label=f'topology: {topology}, Exposed',linestyle=linestyle[i] )
            if self.what_to_plot[2] == True: plt.plot(np.mean(self.inf[:,:,i],axis=1).squeeze(), label=f'topology: {topology}, Infected',linestyle=linestyle[i] )
            if self.what_to_plot[3] == True: plt.plot(np.mean(self.rem[:,:,i],axis=1).squeeze(), label=f'topology: {topology}, Removed',linestyle=linestyle[i] )
        plt.legend()
        plt.show()
        plt.savefig('plot.png')
    
    def interactive():
        '''Start interactive model builder using ndlib-rest.'''
        !rm -rf ndlib-rest
        !git clone https://github.com/GiulioRossetti/ndlib-rest.git > /dev/null
        %cd ndlib-rest/
        import subprocess as sub
        sub.call('./gunicorn.sh', shell=True)

In [None]:
#set model parameters #0.0025
infection_probability = 1/30 #0.025
removal_probability = 1/100 #0.01
latent_period = 1/30 #0.033333
N = 1000
edges_per_node = 6 #0.015
iteration = 200
topologies = ['ER'] #['CG','ER','WS','BA']
what_to_plot = [True, False, True, True] #[Susceptible, Exposed, Infected, Recovered]
repeats = 100
raw = True

model = Model(infection_probability,latent_period,removal_probability,
              N, iteration,edges_per_node, topologies, what_to_plot, repeats, raw)
model.build()
model.plot()
model.status()