##### Modules and Libraries

In [None]:
# import necessary modules 
import matplotlib 
matplotlib.use('TkAgg') 
from pylab import *
import random as rd
import matplotlib.pyplot as plt
import pycxsimulator

#### Epidemic parameters

In [None]:
states = ["S", "I", "R"] ## Susceptible, Infected, Recovered/Removed

alpha = 0.3   # infection probability when in range
beta = 0.1 # recovery probability per time step
gamma = 0.07 ## if simple SIR model, gamma set to 0. (once recovered, not susceptible again)
isolation_freq = 50 #isolate step


### Evaluation metrics for comparison

In [None]:
def average_n(G):
    countl = []
    for i in list(G.nodes):
        count=0
        for j in G.neighbors(i):
            count+=1
        countl.append(count)
    return mean(countl)

def central_n(G):
    countl = []
    for i in list(G.nodes):
        count=0
        for j in G.neighbors(i):
            count+=1
        countl.append(count)
    return max(countl)

In [None]:
import pandas as pd

def peak_infection_time(df):
    """
    Calculate the time step of peak infection.
    """
    peak_time = df['I'].idxmax()  # Index of the maximum infected count
    return peak_time

def peak_infection_proportion(df, total_population):
    """
    Calculate the proportion of the population infected at the peak.
    """
    peak_infected = df['I'].max()
    return peak_infected / total_population

def total_attack_rate(df, total_population):
    """
    Calculate the total attack rate (TAR).
    """
    final_recovered = df['R'].iloc[-1]  # Total recovered at the last step
    return final_recovered / total_population

def basic_reproduction_number(df):
    """
    Approximate R0 based on the rate of new infections.
    """
    # Calculate changes in the number of infected over steps
    df['delta_infected'] = df['I'].diff().fillna(0)
    R0 = df['delta_infected'].max()
    return R0

def epidemic_duration(df):
    """
    Calculate the duration of the epidemic.
    """
    active_period = df[df['I'] > 0]
    return active_period.index.max() - active_period.index.min() + 1



In [None]:
import networkx as nx
import matplotlib.pyplot as plt
import random

# Create the graph
G = nx.erdos_renyi_graph(n=400, p=0.05)

# Initialize SIR states for each node
def initialize():
    for node in G.nodes:
        G.nodes[node]['state'] = 'S'  # Start all nodes as Susceptible
    initial_infected = random.choice(list(G.nodes))
    G.nodes[initial_infected]['state'] = 'I'  # Infect one random node

# Observe function to plot nodes with colors based on their SIR state
def observe():
    plt.clf()
    color_map = []
    for node in G.nodes:
        if G.nodes[node]['state'] == 'S':
            color_map.append('blue')  # Susceptible nodes are blue
        elif G.nodes[node]['state'] == 'I':
            color_map.append('red')   # Infected nodes are red
        elif G.nodes[node]['state'] == 'R':
            color_map.append('green') # Recovered nodes are green

    # Draw the network with node colors
    nx.draw(G, node_color=color_map, with_labels=False, node_size=50)
    plt.title("SIR Epidemic Simulation on a Network")
#     plt.show()

# Update function to change states based on SIR model rules
def update():
    new_states = {}

    for node in G.nodes:
        if G.nodes[node]['state'] == 'I':  # Currently infected nodes
            # Attempt to infect susceptible neighbors
            for neighbor in G.neighbors(node):
                if G.nodes[neighbor]['state'] == 'S' and random.random() < alpha:
                    new_states[neighbor] = 'I'  # Mark for infection

            # Attempt recovery
            if random.random() < beta:
                new_states[node] = 'R'  # Mark for recovery

        elif G.nodes[node]['state'] == 'R':  # Recovered nodes
            # Attempt to lose immunity and become susceptible again
            if random.random() < gamma:
                new_states[node] = 'S'  # Mark for susceptibility

    # Apply state changes after all updates are determined
    for node, new_state in new_states.items():
        G.nodes[node]['state'] = new_state
        
def isolate_reinstate():
    # First, sever edges between infected and non-infected nodes
    for edge in list(G.edges):
        u, v = edge
        if (G.nodes[u]['state'] == 'I' and G.nodes[v]['state'] != 'I') or (G.nodes[v]['state'] == 'I' and G.nodes[u]['state'] != 'I'):
            G.remove_edge(u, v)
#     random_infect = 
    # Reconnect edges where non-infected nodes (S or R) have non-infected neighbors (S or R) but no edge exists
    for node in G.nodes():
        if G.nodes[node]['state'] != 'I':  # Check only non-infected nodes
            for neighbor in G.neighbors(node):
                if G.nodes[neighbor]['state'] != 'I' and not G.has_edge(node, neighbor):  # Ensure neighbor is non-infected and edge doesn't exist
                    G.add_edge(node, neighbor)



In [None]:
print(f"Graph_properties:\nNo.of nodes: {len(G.nodes)}\nNo.of edges: {len(G.edges)}\nDensity: {len(G.nodes)/len(G.edges)}\nCentral node nieghbours: {central_n(G)}\nAverage neigbours: {average_n(G)}")

In [None]:
# Simulation example
pycxsimulator.GUI().start(func=[initialize, observe, update])

In [None]:
susceptible_counts = []
infected_counts = []
recovered_counts = []
time_steps = 0
sir_df = pd.DataFrame(columns=["S", "I", "R", "InfNodes"])
initialize()
plot_no = 1
fig, axs = plt.subplots(2, 2, figsize=(10, 8))
for step in range(1, 1000):
    susceptible = sum(1 for node in G.nodes if G.nodes[node]['state'] == 'S')
    infected = sum(1 for node in G.nodes if G.nodes[node]['state'] == 'I')
    recovered = sum(1 for node in G.nodes if G.nodes[node]['state'] == 'R')

    susceptible_counts.append(susceptible)
    infected_counts.append(infected)
    recovered_counts.append(recovered)
    infected_nodes = [node for node in G.nodes if G.nodes[node]['state'] == 'I']
    
    observe()
    update()
    sir_df.loc[step] = [susceptible, infected, recovered, infected_nodes]
    
    ## 
    if step%isolation_freq==0:
        isolate_reinstate()
        if alpha != 0.00:
            infected = random.choice(list(G.nodes))
            G.nodes[infected]['state'] = 'I'  
 
    if infected == 0:
        break



In [None]:
sir_df
plt.figure(figsize=(10, 6))
plt.plot(sir_df["S"], label="Susceptible", color="blue")
plt.plot(sir_df["I"], label="Infected", color="red")
plt.plot(sir_df["R"], label="Recovered", color="green")
plt.xlabel('Time steps')
plt.ylabel('Number of nodes')
plt.title('SIR Epidemic Simulation: Susceptible, Infected, and Recovered Over Time')
plt.legend()
plt.show()

In [None]:
total_population = 200
print("Peak Infection Time:", peak_infection_time(sir_df))
print("Peak Infection Proportion:", peak_infection_proportion(sir_df, total_population))
print("Total Attack Rate:", total_attack_rate(sir_df, total_population))
print("Basic Reproduction Number (R0):", basic_reproduction_number(sir_df))
print("Epidemic Duration:", epidemic_duration(sir_df))

### Real-world graph

In [None]:
import itertools
import random

def sub_graph(gml_file):
    G = nx.read_gml(gml_file)
    random.seed(42)
    sub_edges = random.sample(list(G.edges), 300)
    sub_nodes = {element for tuple_ in sub_edges for element in tuple_}
    g_sub = nx.subgraph(G, sub_nodes)
    return g_sub

In [None]:
G_frozen = sub_graph('nn/as-22july06.gml')
G = nx.Graph(G_frozen)

# Initialize SIR states for each node
def initialize():
    for node in G.nodes:
        G.nodes[node]['state'] = 'S'  # Start all nodes as Susceptible
#         random.seed(123)
    initial_infected = random.choice(list(G.nodes))
    G.nodes[initial_infected]['state'] = 'I'  # Infect one random node

# Observe function to plot nodes with colors based on their SIR state
def observe():
    plt.clf()
    color_map = []
    for node in G.nodes:
        if G.nodes[node]['state'] == 'S':
            color_map.append('blue')  # Susceptible nodes are blue
        elif G.nodes[node]['state'] == 'I':
            color_map.append('red')   # Infected nodes are red
        elif G.nodes[node]['state'] == 'R':
            color_map.append('green') # Recovered nodes are green

    # Draw the network with node colors
    nx.draw(G, node_color=color_map, with_labels=False, node_size=50)
    plt.title("SIR Epidemic Simulation on a Network")
#     plt.show()

## to document steps uncomment the next two lines 
#     if step%10==0:
#         plt.savefig(f'report/isolation/{step}.png')

# Update function to change states based on SIR model rules
def update():
    new_states = {}

    for node in G.nodes:
        if G.nodes[node]['state'] == 'I':  # Currently infected nodes
            # Attempt to infect susceptible neighbors
            for neighbor in G.neighbors(node):
                if G.nodes[neighbor]['state'] == 'S' and random.random() < alpha:
                    new_states[neighbor] = 'I'  # Mark for infection

            # Attempt recovery
            if random.random() < beta:
                new_states[node] = 'R'  # Mark for recovery

        elif G.nodes[node]['state'] == 'R':  # Recovered nodes
            # Attempt to lose immunity and become susceptible again
            if random.random() < gamma:
                new_states[node] = 'S'  # Mark for susceptibility

    # Apply state changes after all updates are determined
    for node, new_state in new_states.items():
        G.nodes[node]['state'] = new_state

def isolate_reinstate():
    # First, sever edges between infected and non-infected nodes
    for edge in list(G.edges):
        u, v = edge
        if (G.nodes[u]['state'] == 'I' and G.nodes[v]['state'] != 'I') or (G.nodes[v]['state'] == 'I' and G.nodes[u]['state'] != 'I'):
            G.remove_edge(u, v)
#     random_infect = 
    # Reconnect edges where non-infected nodes (S or R) have non-infected neighbors (S or R) but no edge exists
    for node in G.nodes():
        if G.nodes[node]['state'] != 'I':  # Check only non-infected nodes
            for neighbor in G.neighbors(node):
                if G.nodes[neighbor]['state'] != 'I' and not G.has_edge(node, neighbor):  # Ensure neighbor is non-infected and edge doesn't exist
                    G.add_edge(node, neighbor)


In [None]:
print(f"Graph_properties:\nNo.of nodes: {len(G.nodes)}\nNo.of edges: {len(G.edges)}\nDensity: {len(G.nodes)/len(G.edges)}\nCentral node nieghbours: {central_n(G)}\nAverage neigbours: {average_n(G)}")

In [None]:
## simulator
pycxsimulator.GUI().start(func=[initialize, observe, update])

In [None]:
susceptible_counts = []
infected_counts = []
recovered_counts = []
time_steps = 0
sir_df = pd.DataFrame(columns=["S", "I", "R", "InfNodes"])
initialize()
for step in range(1, 1000):
    susceptible = sum(1 for node in G.nodes if G.nodes[node]['state'] == 'S')
    infected = sum(1 for node in G.nodes if G.nodes[node]['state'] == 'I')
    recovered = sum(1 for node in G.nodes if G.nodes[node]['state'] == 'R')

    susceptible_counts.append(susceptible)
    infected_counts.append(infected)
    recovered_counts.append(recovered)
    infected_nodes = [node for node in G.nodes if G.nodes[node]['state'] == 'I']
    
    observe()
    update()
    sir_df.loc[step] = [susceptible, infected, recovered, infected_nodes]
    
    ## 
    if step%isolation_freq==15:
        isolate_reinstate()
        if alpha != 0.00:
            infected = random.choice(list(G.nodes))
            G.nodes[infected]['state'] = 'I'  
        
    if infected == 0:
        break

        
        

In [None]:
sir_df
plt.figure(figsize=(10, 6))
plt.plot(sir_df["S"], label="Susceptible", color="blue")
plt.plot(sir_df["I"], label="Infected", color="red")
plt.plot(sir_df["R"], label="Recovered", color="green")
plt.xlabel('Time steps')
plt.ylabel('Number of nodes')
plt.title('SIR Epidemic Simulation: Susceptible, Infected, and Recovered Over Time')
plt.legend()
plt.show()

In [None]:
total_population = len(G_frozen.nodes)
print("Peak Infection Time:", peak_infection_time(sir_df))
print("Peak Infection Proportion:", peak_infection_proportion(sir_df, total_population))
print("Total Attack Rate:", total_attack_rate(sir_df, total_population))
print("Basic Reproduction Number (R0):", basic_reproduction_number(sir_df))
print("Epidemic Duration:", epidemic_duration(sir_df))

