In [1]:
!pip install gitpython

Collecting gitpython
[?25l  Downloading https://files.pythonhosted.org/packages/bc/91/b38c4fabb6e5092ab23492ded4f318ab7299b19263272b703478038c0fbc/GitPython-3.1.18-py3-none-any.whl (170kB)
[K     |████████████████████████████████| 174kB 5.1MB/s 
Collecting gitdb<5,>=4.0.1
[?25l  Downloading https://files.pythonhosted.org/packages/ea/e8/f414d1a4f0bbc668ed441f74f44c116d9816833a48bf81d22b697090dba8/gitdb-4.0.7-py3-none-any.whl (63kB)
[K     |████████████████████████████████| 71kB 7.1MB/s 
[?25hCollecting smmap<5,>=3.0.1
  Downloading https://files.pythonhosted.org/packages/68/ee/d540eb5e5996eb81c26ceffac6ee49041d473bc5125f2aa995cf51ec1cf1/smmap-4.0.0-py2.py3-none-any.whl
Installing collected packages: smmap, gitdb, gitpython
Successfully installed gitdb-4.0.7 gitpython-3.1.18 smmap-4.0.0


In [2]:
from git import Repo
import os
access_token = 'f7c5ce60b5af24da91aa3df0855044dd3906b33c' # enter your github access token here. delete after cloning!
HTTPS_REMOTE_URL = f'https://{access_token}:x-oauth-basic@github.com/Ecological-Complexity-Lab/socio_epi_networks'
if not os.path.exists('./repo'):
  cloned_repo = Repo.clone_from(HTTPS_REMOTE_URL, to_path='./repo')

In [1]:
import sys
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
import time
import networkx as nx
import itertools
import random
import math
import numpy as np
import seaborn as sns
sys.path.insert(0, "./repo/stochastic_models")
from graphing_utils import *
from capsule_utils import *

device = "cuda" if torch.cuda.is_available() else "cpu"
#device = "cpu"
print(device)

cuda


In [2]:
class S_I(torch.nn.Module):
    def __init__(self, contact_net, pinf, I_name, susceptiveness_name, infectiveness_name):
        super(S_I,self).__init__()
        self._contact_net = contact_net
        self._pinf = pinf
        self._n = self._contact_net.size()[0]
        self._states_names = {'I': I_name, 'susceptiveness': susceptiveness_name, 'infectiveness': infectiveness_name}
        
    def forward(self, states):
        I = states[self._states_names['I']] # boolean state of being infected
        susceptiveness = states[self._states_names['susceptiveness']] # floating point, number between 0 and 1 means probability of being infected
        infectiveness = states[self._states_names['infectiveness']] # floating point, number between 0 and 1 means probability of infecting another
        dI = I*infectiveness
        dI = 1 - (1 - self._contact_net.to_dense()*dI.unsqueeze(dim=1).matmul(susceptiveness.unsqueeze(dim=1).t())).prod(dim=0) # probability of being infected in that day
        dI = torch.rand(self._n).to(device) < dI.squeeze() # whether or not infected that day
        I = torch.max(I, dI.float()) 
        states[self._states_names['I']] = I
        return states

In [3]:
class S_E(torch.nn.Module):
    def __init__(self, contact_net, pinf, E_name, susceptiveness_name, infectiveness_name, incubation_name, infective_name=None):
        super(S_E, self).__init__()
        self._contact_net = contact_net
        self._pinf = pinf
        self._n = self._contact_net.size()[0]
        self._states_names = {'E': E_name, 'susceptiveness': susceptiveness_name, 'infectiveness': infectiveness_name, 'incubation': incubation_name}
        if infective_name: self._states_names['infective'] = infective_name # if those that are infective are more selective than just having E == 0
            
    def forward(self, states):
        E = states[self._states_names['E']] # int state of how much time left for incubation (until fully infected). inf->not infected at all, 0->infective, other->incubation
        susceptiveness = states[self._states_names['susceptiveness']] # floating point, number between 0 and 1 means probability of being infected
        infectiveness = states[self._states_names['infectiveness']] # floating point, number between 0 and 1 means probability of infecting another
        incubation = states[self._states_names['incubation']] # floating point, number at least 0 means time for person to become fully infected after exposure

        E = F.relu(E-1)
        infective = (E==0) if 'infective' not in self._states_names else states[self._states_names['infective']]
        susceptible = (E==math.inf)
        
        dI = infectiveness*infective
        dI = 1 - (1 - self._contact_net.to_dense()*dI.unsqueeze(dim=1).matmul(susceptiveness.unsqueeze(dim=1).t())).prod(dim=0) # probability of being infected in that day
        dI = torch.rand(self._n).to(device) < dI.squeeze() # whether or not infected that day
        dI = dI*susceptible
        
        E = torch.where(dI, states[self._states_names['incubation']], E) # got infected
        states[self._states_names['E']] = E
        return states

In [4]:
class E_I(torch.nn.Module):
    def __init__(self, I_name, E_name):
        super(E_I, self).__init__()
        self._states_names = {'E': E_name, 'I': I_name}
    
    def forward(self, states):
        states[self._states_names['I']] = (states[self._states_names['E']]==0).float()
        return states

In [5]:
class I_S(torch.nn.Module):
    def __init__(self, contact_net, I_name, R_name, recover_time_name):
        super(I_S,self).__init__()
        self._contact_net = contact_net
        self._n = self._contact_net.size()[0]
        self._states_names = {'I': I_name, 'R': R_name, 'recover_time': recover_time_name}
        
    def forward(self, states):
        R = states[self._states_names['R']]
        I = states[self._states_names['I']]
        recover_time = states[self._states_names['recover_time']]
        R = F.relu(R-1)
        R = torch.where((I==1) & (R==math.inf), recover_time, R) # setting recovery time for those newly infected  
        I = torch.where((R==0), torch.zeros(self._n).to(device), I)
        R = torch.where((R==0), torch.ones(self._n).to(device)*math.inf, R) # reset recovery times for those who are susceptible again
        states[self._states_names['R']] = R
        states[self._states_names['I']] = I
        return states

In [6]:
class I_R(torch.nn.Module):
    def __init__(self, I_name, R_name, recover_time_name):
        super(I_R,self).__init__()
        self._states_names = {'I': I_name, 'R': R_name, 'recover_time': recover_time_name}
        
    def forward(self, states):
        R = states[self._states_names['R']]
        I = states[self._states_names['I']]
        recover_time = states[self._states_names['recover_time']]
        n = R.size()[0]
        
        R = F.relu(R-1)
        R = torch.where((I==1) & (R==math.inf), recover_time, R) # setting recovery time for those newly infected  
        I = torch.where((R==0), torch.zeros(n).to(device), I)
        
        states[self._states_names['R']] = R
        states[self._states_names['I']] = I
        return states

In [7]:
class I_D(torch.nn.Module):
    def __init__(self, I_name, D_name, fatality_name):
        super(I_D, self).__init__()
        self._states_names = {'I': I_name, 'D': D_name, 'fatality': fatality_name}

    def forward(self, states):
        D = states[self._states_names['D']]
        I = states[self._states_names['I']]
        fatality = states[self._states_names['fatality']]
        n = D.size()[0]
        
        D = F.relu(D-1)
        D = torch.where((I==1) & (D==math.inf), fatality, D) # setting fatality for those newly infected  
        I = torch.where((D==0), torch.zeros(n).to(device), I)
        
        states[self._states_names['D']] = D
        states[self._states_names['I']] = I
        return states

In [8]:
class ControllingState_SusceptivenessInfectiveness(torch.nn.Module):
    def __init__(self, controlling_state_name, susceptiveness_name, infectiveness_name, susceptiveness_prob, infectiveness_prob, base_susceptiveness_prob, base_infectiveness_prob):
        super(ControllingState_SusceptivenessInfectiveness, self).__init__()
        self._susceptiveness_prob = susceptiveness_prob
        self._infectiveness_prob = infectiveness_prob
        self._base_susceptiveness_prob = base_susceptiveness_prob
        self._base_infectiveness_prob = base_infectiveness_prob
        
        # if a person is in controlling_state, that person's infectiveness and susceptiveness will change from the default
        self._states_names = {"controlling_state": controlling_state_name, "susceptiveness": susceptiveness_name, "infectiveness": infectiveness_name}
            
    def forward(self, states):
        controlling_state = states[self._states_names['controlling_state']]
        susceptiveness = states[self._states_names['susceptiveness']]
        infectiveness = states[self._states_names['infectiveness']]
        
        susceptiveness[(controlling_state==1)] = self._susceptiveness_prob
        infectiveness[(controlling_state==1)] = self._infectiveness_prob
        
        susceptiveness[(controlling_state==0)] = self._base_susceptiveness_prob
        infectiveness[(controlling_state==0)] = self._base_infectiveness_prob
        return states

In [9]:
class ControllingState_ControlledState(torch.nn.Module):
    def __init__(self, controlling_state_name, controlled_state_name, controlling_state_value, controlled_state_value, pinf):
        super(ControllingState_ControlledState, self).__init__()
        self._controlling_state_value = controlling_state_value
        self._controlled_state_value = controlled_state_value # the value that people with controlling_state_value in controlling_state will take on in controlled_state
        self._pinf = pinf # the probability of a person's controlled_state to follow what they are in controlling_state
        self._states_names = {"controlling_state": controlling_state_name, "controlled_state": controlled_state_name}
            
    def forward(self, states):
        controlling_state = states[self._states_names['controlling_state']]
        controlled_state = states[self._states_names['controlled_state']]
        n = controlled_state.size()[0]
        controlled_state[((torch.rand(n).to(device)<(torch.ones(n).to(device)*self._pinf))&(controlling_state==self._controlling_state_value))] = self._controlled_state_value
        states[self._states_names['controlled_state']] = controlled_state
        return states

In [10]:
class Contacts_InfectingState(torch.nn.Module):
    def __init__(self, contact_net, contacts_state_name, susceptible_state_name, pinf, infected_value):
        super(Contacts_InfectingState, self).__init__()
        self._contact_net = contact_net
        self._pinf = pinf # probability of a contact "infecting" 
        self._infected_value = infected_value # the value people "infected" by this transition take on in susceptible_state
        self._n = self._contact_net.size()[0]

        # contacts can "infect" and change the state of someone in susceptible_state to be infected_value
        self._states_names = {"contacts_state": contacts_state_name, "susceptible_state": susceptible_state_name}
        
    def forward(self, states):
        contacts_state = states[self._states_names['contacts_state']]
        susceptible_state = states[self._states_names['susceptible_state']]
        
        dI = self._contact_net.mm(contacts_state.unsqueeze(dim=1))
        dI = 1 - (1-self._pinf)**dI.squeeze()
        dI = torch.rand(self._n).to(device) < dI

        susceptible_state = torch.where(dI, torch.ones(self._n).to(device)*self._infected_value, susceptible_state)

        states[self._states_names['susceptible_state']] = susceptible_state
        return states

In [11]:
class Contacts_SwitchState(torch.nn.Module):
    def __init__(self, preevaluate, contact_net, state_name, active_state_name=None):
        super(Contacts_SwitchState, self).__init__()
        self._preevaluate = preevaluate # the probability with which a person is willing to reconsider their position in state
        self._contact_net = contact_net
        self._n = self._contact_net.size()[0]
        self._states_names = {"state": state_name} # the state in which contacts can influence their contacts to switch to or stay in their state
        if active_state_name: self._states_names["active_state"] = active_state_name # the state which tells which people can be influenced and can influence for state - the ones without a zero in active state
        
    def forward(self, states):
        state = states[self._states_names['state']]
        active_state = states[self._states_names['active_state']] if 'active_state' in self._states_names else torch.ones(self._n).to(device)
        
        active = (active_state!=0) # those with state 0 in mask_state cannot influence of be influenced
        idx_reevaluate = torch.rand(self._n).to(device)<self._preevaluate # the people who could change their mind
        idx_reevaluate = (idx_reevaluate&active)
        contacts_instate = self._contact_net.mm(((state==1)&active).float().unsqueeze(dim=1)).squeeze() # number of contacts in the state (value 1)
        total_contacts = self._contact_net.mm(active.float().unsqueeze(dim=1)).squeeze()
        choose_state = torch.rand(self._n).to(device) < (contacts_instate/total_contacts) # if a person has A contacts in the state and B contacts not, then there is probability A/(A+B) that this person will choose state A (symmetric is true with B)
        state[(idx_reevaluate&choose_state)] = 1 
        state[(idx_reevaluate&(~choose_state))] = 0 
        return states

In [12]:
n = 2000
pinf_mask_bio = 0.1
psus_mask_bio = 0.1
pinf_nomask_bio = 0.2
psus_nomask_bio = 0.2
recover_time_bio = 9.3
incubation_time_bio = 6.4
fatality_bio = 0.1
I_name_bio = "infected_covid"
R_name_bio = "remaining_recovery_time"
D_name_bio = "remaining_death_time"
E_name_bio = "remaining_incubation_time"
susceptiveness_name_bio = "susceptiveness_covid"
infectiveness_name_bio = "infectiveness_covid"
recover_time_name_bio = "recovery_times"
fatality_name_bio = "fatality_times"
incubation_name_bio = "incubation_times"
seeds_bio = 3

I_name_socio = "antimask"
preevaluate_socio = 0.3
pinf_socio_biocontacts = 0.01
pinf_socio_bioself = 0.5
seeds_proportion_socio = 0.5
maxt = 150

def initializations(pinf, psus, avg_recover_time, avg_fatality, avg_incubation_time, E_name, I_name, R_name, D_name, susceptiveness_name, infectiveness_name, recover_time_name, incubation_name, fatality_name, seeds=1):
    E = torch.ones(n).to(device) * math.inf
    I = torch.zeros(n).to(device)
    R = torch.ones(n).to(device) * math.inf
    D = torch.ones(n).to(device) * math.inf
    E[random.sample(range(0, n-1), seeds)] = 0
    susceptiveness = torch.ones(n).to(device)*psus
    infectiveness = torch.ones(n).to(device)*pinf
    
    # destiny in case of infection is predetermined
    incubation = F.relu(torch.distributions.normal.Normal(avg_incubation_time,2).rsample((n,)).to(device))
    fatality = torch.where((torch.rand(n).to(device) < avg_fatality), incubation+14*torch.ones(n).to(device), math.inf*torch.ones(n).to(device))        
    recover_time = F.relu(torch.distributions.normal.Normal(avg_recover_time,2).rsample((n,)).to(device))+incubation
    
    return  {E_name: E, I_name: I, R_name: R, D_name: D, recover_time_name: recover_time, susceptiveness_name: susceptiveness, infectiveness_name: infectiveness, incubation_name: incubation, fatality_name: fatality}            

In [19]:
def sim(repeats=1):
    with torch.no_grad():
        final_log = []
        
        for _ in range(repeats):
            # contact nets generation
            m_bio = 10*n # number of connections in contact matrix
            idx = torch.randint(n,(2,m_bio)).to(device)
            con_bio = torch.sparse.FloatTensor(idx,torch.ones(m_bio).to(device),torch.Size([n,n])).to(device)
            con_bio = con_bio + con_bio.t()

            m_socio = 10*n
            idx = torch.randint(n,(2,m_socio)).to(device)
            con_socio = torch.sparse.FloatTensor(idx,torch.ones(m_socio).to(device),torch.Size([n,n])).to(device)
            con_socio = con_socio + con_socio.t()
            
            # states initialization
            states = initializations(pinf=pinf_mask_bio, psus=psus_mask_bio, avg_recover_time=recover_time_bio, avg_fatality=fatality_bio, 
                                     avg_incubation_time=incubation_time_bio, recover_time_name=recover_time_name_bio, I_name=I_name_bio, E_name=E_name_bio,
                                     R_name=R_name_bio, D_name=D_name_bio, susceptiveness_name=susceptiveness_name_bio, infectiveness_name=infectiveness_name_bio, 
                                     incubation_name=incubation_name_bio, fatality_name=fatality_name_bio, seeds = seeds_bio) # bio states
            I_socio = torch.zeros(n).to(device)
            I_socio[random.sample(range(0, n-1), int(seeds_proportion_socio*n))] = 1
            states[I_name_socio] = I_socio
            assert len(states) == 10 # check against duplicate names
            
            # transitions between states
            se_bio = S_E(contact_net=con_bio, pinf=pinf_mask_bio, E_name=E_name_bio, susceptiveness_name=susceptiveness_name_bio, infectiveness_name=infectiveness_name_bio, incubation_name=incubation_name_bio, infective_name=I_name_bio)
            ei_bio = E_I(E_name=E_name_bio,I_name=I_name_bio)
            ir_bio = I_R(I_name=I_name_bio, R_name=R_name_bio, recover_time_name=recover_time_name_bio)
            id_bio = I_D(I_name=I_name_bio, D_name=D_name_bio, fatality_name=fatality_name_bio)
            seird_bio = torch.nn.Sequential(se_bio, ei_bio, ir_bio, id_bio)
            
            symmetric_socio = Contacts_SwitchState(preevaluate=preevaluate_socio, contact_net=con_socio, state_name=I_name_socio, active_state_name=D_name_bio)
            
            # transitions between layers
            socio_to_bio = ControllingState_SusceptivenessInfectiveness(controlling_state_name=I_name_socio, susceptiveness_name=susceptiveness_name_bio, 
                                                                        infectiveness_name=infectiveness_name_bio, susceptiveness_prob=psus_nomask_bio, infectiveness_prob=pinf_nomask_bio,
                                                                        base_susceptiveness_prob=psus_mask_bio, base_infectiveness_prob=pinf_mask_bio)
            
            covidinfection_means_maskwearing = ControllingState_ControlledState(controlling_state_name=I_name_bio, controlled_state_name=I_name_socio, controlling_state_value=1, controlled_state_value=0, pinf=pinf_socio_bioself)
            infected_contacts_leads_to_mask = Contacts_InfectingState(contact_net=con_socio, contacts_state_name=I_name_bio, susceptible_state_name=I_name_socio, pinf=pinf_socio_biocontacts, infected_value=0)

            # two layer models
            seird_bio_symmetric_socio = torch.nn.Sequential(seird_bio, symmetric_socio, socio_to_bio)#, infected_contacts_leads_to_mask)#covidinfection_means_maskwearing, infected_contacts_leads_to_mask)
            
            log = []
            for _ in range(maxt):
                states = seird_bio_symmetric_socio(states)
                log.append(torch.stack((((states[E_name_bio] > 0)&(states[E_name_bio] < math.inf)), states[I_name_bio], ((states[R_name_bio]==0)&(states[D_name_bio]>0)).float(), (states[D_name_bio]==0).float(), states[I_name_socio])))
                
            final_log.append(torch.stack(log))
        final_log = torch.stack(final_log)
    return final_log

In [None]:
colors = [("yellow", "exposed_covid"), ("red", I_name_bio), ("green", "recovered_covid"), ("black", "dead_covid"), ("blue", "no_mask")]
age_sizes = [n]
log = sim(20)
plot(log, colors, age_sizes, by_age=False, title=f"SEIRD bio, symmetric socio (social to bio transition only)", percent_extremes_remove=0.1)

In [None]:
matrix_area = []
matrix_peak = []
matrix_time = []
for pinf_socio_biocontacts in np.arange(0, 1, 0.1):
    row_area = []
    row_peak = []
    row_time = []
    for preevaluate_socio in np.arange(0, 1, 0.1):
        log = sim(5)
        row_area.append(log.mean(dim=0)[:, 1].sum().item())
        row_peak.append(torch.max(log.mean(dim=0)[:, 1].sum(dim=1)).item())
        row_time.append(torch.where(log.mean(dim=0)[:, 1].sum(dim=1)==row_peak[-1])[0][0].item())
    matrix_area.append(row_area)
    matrix_peak.append(row_peak)
    matrix_time.append(row_time)

In [None]:
fig, ax = plt.subplots()
sns.heatmap(np.array(matrix_time)[::-1])
ax.set_title("Time until peak biological infection")
ax.set_xticklabels(range(10))
ax.set_yticklabels(range(10)[::-1])
plt.xlabel("probability of reevaluating mask decision (x10^-1)")
plt.ylabel("probability of contact of infected to wear mask (x10^-1)")

In [None]:
fig, ax = plt.subplots()
sns.heatmap((np.array(matrix_area)[::-1][:-1]/n))
ax.set_title("Normalized area under biological infection curve")
ax.set_xticklabels(range(10))
ax.set_yticklabels(range(10)[::-1])
plt.xlabel("probability of reevaluating mask decision (x10^-1)")
plt.ylabel("probability of contact of infected to wear mask (x10^-1)")

In [None]:
fig, ax = plt.subplots()
sns.heatmap((np.array(matrix_peak)[::-1][:-1]/n))
ax.set_title("Peak bio infection (proportion)")
ax.set_xticklabels(range(10))
ax.set_yticklabels(range(10)[::-1])
plt.xlabel("probability of reevaluating mask decision (x10^-1)")
plt.ylabel("probability of contact of infected to wear mask (x10^-1)")