In [1]:
#Mount my drive- run the code, go to the link, accept.
from google.colab import drive
drive.mount('/content/gdrive')

#Change working directory to make it easier to access the files
import os
os.chdir("/content/gdrive/My Drive/Colab Notebooks/dinn")
os.getcwd() 

Mounted at /content/gdrive


'/content/gdrive/My Drive/Colab Notebooks/dinn'

In [2]:
import torch
from torch.autograd import grad
import torch.nn as nn
from numpy import genfromtxt
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F

tSI_vaccination_data = genfromtxt('tSI_vaccination_data.csv', delimiter=',') #in the form of [t, S, I]

torch.manual_seed(1234)

<torch._C.Generator at 0x7f27efdaccf0>

In [3]:
%%time

PATH = 'vacc_complex_higher_e_lr.pt' 

class DINN(nn.Module):
    def __init__(self, t, S_data, I_data): #t, S_data, I_data
        super(DINN, self).__init__()
        self.t = torch.tensor(t).float()
        self.S = torch.tensor(S_data)
        self.I = torch.tensor(I_data)

        self.losses = []

        #learnable parameters
        self.alpha1_tilda = torch.nn.Parameter(torch.rand(1, requires_grad=True))
        self.alpha2_tilda = torch.nn.Parameter(torch.rand(1, requires_grad=True))
        self.beta_tilda = torch.nn.Parameter(torch.rand(1, requires_grad=True))
        self.mu_tilda = torch.nn.Parameter(torch.rand(1, requires_grad=True))
        self.u_tilda = torch.nn.Parameter(torch.rand(1, requires_grad=True))
        self.tao_tilda = torch.nn.Parameter(torch.rand(1, requires_grad=True))

        #NN
        self.net_si = self.Net_si()
        self.params = list(self.net_si.parameters())
        self.params.extend(list([self.mu_tilda, self.beta_tilda, self.alpha1_tilda, self.alpha2_tilda, self.u_tilda, self.tao_tilda]))

        #get values for normalization
        self.t_min = torch.min(self.t)
        self.t_max = torch.max(self.t)
        self.S_min = torch.min(self.S)
        self.S_max = torch.max(self.S)
        self.I_min = torch.min(self.I)
        self.I_max = torch.max(self.I)

        #normalize 
        self.S_hat = (self.S - self.S_min) / (self.S_max - self.S_min)
        self.I_hat = (self.I - self.I_min) / (self.I_max - self.I_min)
        self.t_hat = (self.t - self.t_min) / (self.t_max - self.t_min)

    #force parameters to be in a range
    @property
    def alpha1(self):
        return torch.tanh(self.alpha1_tilda) * 2

    @property
    def alpha2(self):
        return torch.tanh(self.alpha2_tilda)

    @property
    def beta(self):
        return torch.tanh(self.beta_tilda)*0.01

    @property
    def mu(self):
        return torch.tanh(self.mu_tilda)*6

    @property
    def u(self):
        return torch.tanh(self.u_tilda)

    @property
    def tao(self):
        return torch.tanh(self.tao_tilda)

    #nets
    class Net_si(nn.Module): # input = [t]
        def __init__(self):
            super(DINN.Net_si, self).__init__()
            self.fc1=nn.Linear(1, 32) #takes t
            self.fc2=nn.Linear(32, 32)
            self.fc3=nn.Linear(32, 64)
            self.fc4=nn.Linear(64, 64)
            self.fc5=nn.Linear(64, 128)
            self.fc6=nn.Linear(128, 128)
            self.fc7=nn.Linear(128, 64)
            self.fc8=nn.Linear(64, 32)
            self.out=nn.Linear(32, 2) #outputs S, I

        def forward(self, t):
            si=F.relu(self.fc1(t))
            si=F.relu(self.fc2(si))
            si=F.relu(self.fc3(si))
            si=F.relu(self.fc4(si))
            si=F.relu(self.fc5(si))
            si=F.relu(self.fc6(si))
            si=F.relu(self.fc7(si))
            si=F.relu(self.fc8(si))
            si=self.out(si)
            return si    

    def net_f(self, t_hat):  
        # our NN take as input normalized data and outputs normalized data
        si_hat = self.net_si(t_hat) 

        S_hat, I_hat = si_hat[:,0], si_hat[:,1]

        S = self.S_min + (self.S_max - self.S_min) * S_hat
        I = self.I_min + (self.I_max - self.I_min) * I_hat
        t = self.t_min + (self.t_max - self.t_min) * t_hat

        S_hat_t_hat = grad(S_hat, t_hat, retain_graph=True)[0][0] 
        I_hat_t_hat = grad(I_hat, t_hat, retain_graph=True)[0][0] 
        
        f1_hat = S_hat_t_hat  - (- self.beta * S * I  + self.u * torch.nn.Sigmoid()(-1e3*(self.tao - t)) * (- self.alpha1) * S) * ((self.t_max - self.t_min)/(self.S_max - self.S_min))
        f2_hat = I_hat_t_hat  - (self.beta * S * I  - self.mu * I + self.u * torch.nn.Sigmoid()(-1e3*(self.tao - t)) * (-self.alpha2) * I)* ((self.t_max - self.t_min)/(self.I_max - self.I_min))
        return f1_hat, f2_hat, S_hat, I_hat
    
    def load(self):
      # Load checkpoint
      try:
        checkpoint = torch.load(PATH) 
        print('\nloading pre-trained model...')
        self.load_state_dict(checkpoint['model'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.scheduler.load_state_dict(checkpoint['scheduler'])
        epoch = checkpoint['epoch']
        loss = checkpoint['loss']
        self.losses = checkpoint['losses']
        print('loaded previous loss: ', loss)
      except RuntimeError :
          print('changed the architecture, ignore')
          pass
      except FileNotFoundError:
          pass

    def train(self, n_epochs):
      #try loading
      self.load()

      #train
      print('\nstarting training...\n')
      
      for epoch in range(n_epochs):
        #lists to hold the output (maintain only the final epoch)
        S_pred_list= []
        I_pred_list= []

        self.optimizer.zero_grad()
        for time_step in range(len(self.t_hat)):
            
            t_value = self.t_hat[time_step]
            t_value = torch.tensor(t_value, requires_grad=True).unsqueeze(0).unsqueeze(1)
            
            f1_hat, f2_hat, S_pred, I_pred = self.net_f(t_value)

            S_pred_list.append(S_pred) #these are the normalized predictions. Don't forget to denorm. them before graphing
            I_pred_list.append(I_pred)
            
            loss = (torch.mean(torch.square(self.S_hat[time_step]-S_pred))+torch.mean(torch.square(self.I_hat[time_step]-I_pred)) \
            +torch.mean(torch.square(f1_hat)) + torch.mean(torch.square(f2_hat))) #/100


            loss.backward(retain_graph=True)
        self.optimizer.step()
        self.scheduler.step() #scheduler

        self.losses.append(loss)

        #loss + model parameters update
        if epoch % 100 == 0:
          #checkpoint save every 1000 epochs if the loss is lower
          print('\nSaving model... Loss is: ', loss)
          torch.save({
              'epoch': epoch,
              'model': self.state_dict(),
              'optimizer_state_dict': self.optimizer.state_dict(),
              'scheduler': self.scheduler.state_dict(),
              'loss': loss,
              'losses': self.losses,
              }, PATH)

          #old_loss = loss #change old loss to current loss

        if epoch % 100 == 0:
          print('epoch: ', epoch)
          print('loss: ' ,loss)
          print('alpha1: (goal 1)', self.alpha1)
          print('\nalpha2: (goal 0)', self.alpha2)
          print('\nbeta: (goal 0.0075): ', self.beta)
          print('\nmu (goal 5): ', self.mu)
          print('\nu: (goal 0.515151515): ', self.u)
          print('\ntao (goal 0.58): ', self.tao)
          print('#################################')                

        
      #plot
      plt.plot(self.losses, color = 'teal')
      plt.xlabel('Epochs')
      plt.ylabel('Loss')
      return S_pred_list, I_pred_list

    def test(self):
        # Load checkpoint
        try:
            print('\nLoading trained model...')
            checkpoint = torch.load(PATH)        
            #self.net_si.load_state_dict(checkpoint['net_si_state_dict'])
            self.load_state_dict(checkpoint['model'])
            self.losses = checkpoint['losses']

        except RuntimeError :
            print('changed the architecture, ignore')
            pass
        except FileNotFoundError:
            pass
        
        S_pred_list= []
        I_pred_list= []

        for time_step in range(len(self.t)):

            t_value = self.t[time_step]
            t_value = torch.tensor(t_value, requires_grad=True).unsqueeze(0).unsqueeze(1)
            
            f1, f2, S_pred, I_pred = self.net_f(t_value)
            S_pred_list.append(S_pred)
            I_pred_list.append(I_pred)

        return S_pred_list, I_pred_list

CPU times: user 79 µs, sys: 0 ns, total: 79 µs
Wall time: 82.7 µs


In [None]:
%%time
dinn = DINN(tSI_vaccination_data[0], tSI_vaccination_data[1], tSI_vaccination_data[2]) #t, S_data, I_data

learning_rate = 0.02
optimizer = optim.Adam(dinn.params, lr = learning_rate)
dinn.optimizer = optimizer

scheduler = torch.optim.lr_scheduler.CyclicLR(dinn.optimizer, base_lr=1e-5, max_lr=1e-3, step_size_up=3000, mode="triangular2", cycle_momentum=False)

dinn.scheduler = scheduler

S_pred_list, I_pred_list = dinn.train(70000) #train
#S_pred_list, I_pred_list = dinn.test() #test


loading pre-trained model...
loaded previous loss:  tensor(0.0604, requires_grad=True)

starting training...






Saving model... Loss is:  tensor(0.0609, grad_fn=<AddBackward0>)
epoch:  0
loss:  tensor(0.0609, grad_fn=<AddBackward0>)
alpha1: (goal 1) tensor([0.3206], grad_fn=<MulBackward0>)

alpha2: (goal 0) tensor([-0.7085], grad_fn=<TanhBackward>)

beta: (goal 0.0075):  tensor([0.0029], grad_fn=<MulBackward0>)

mu (goal 5):  tensor([2.9102], grad_fn=<MulBackward0>)

u: (goal 0.515151515):  tensor([0.6505], grad_fn=<TanhBackward>)

tao (goal 0.58):  tensor([0.6050], grad_fn=<TanhBackward>)
#################################

Saving model... Loss is:  tensor(0.0568, grad_fn=<AddBackward0>)
epoch:  100
loss:  tensor(0.0568, grad_fn=<AddBackward0>)
alpha1: (goal 1) tensor([0.3183], grad_fn=<MulBackward0>)

alpha2: (goal 0) tensor([-0.7088], grad_fn=<TanhBackward>)

beta: (goal 0.0075):  tensor([0.0030], grad_fn=<MulBackward0>)

mu (goal 5):  tensor([2.9050], grad_fn=<MulBackward0>)

u: (goal 0.515151515):  tensor([0.6501], grad_fn=<TanhBackward>)

tao (goal 0.58):  tensor([0.6049], grad_fn=<TanhBac

In [None]:
#normalize the predictions
fig = plt.figure(facecolor='w')
ax = fig.add_subplot(111, facecolor='#dddddd', axisbelow=True)
ax.plot(tSI_vaccination_data[0], tSI_vaccination_data[1], 'navy', alpha=0.9, lw=2, label='Susceptible')
ax.plot(tSI_vaccination_data[0], S_pred_list, 'violet', alpha=0.9, lw=2, label='Susceptible Prediction', linestyle='dashed')
ax.plot(tSI_vaccination_data[0], tSI_vaccination_data[2], 'dodgerblue', alpha=0.9, lw=2, label='Infected')
ax.plot(tSI_vaccination_data[0], I_pred_list, 'darkgreen', alpha=0.9, lw=2, label='Infected Prediction', linestyle='dashed')

ax.set_xlabel('Time /days')
ax.set_ylabel('Number')
ax.yaxis.set_tick_params(length=0)
ax.xaxis.set_tick_params(length=0)
ax.grid(b=True, which='major', c='w', lw=2, ls='-')
legend = ax.legend()
legend.get_frame().set_alpha(0.5)
for spine in ('top', 'right', 'bottom', 'left'):
    ax.spines[spine].set_visible(False)
plt.show()

In [None]:
print('alpha1: (goal 1)', round(dinn.alpha1.item(),2))
print('\nalpha2: (goal 0)', round(dinn.alpha2.item(),2))
print('\nbeta: (goal 0.0075): ', round(dinn.beta.item(),4))
print('\nmu (goal 5): ', round(dinn.mu.item(),2))
print('\nu: (goal 0.515151515): ', round(dinn.u.item(),2))
print('\ntao (goal 0.58): ', round(dinn.tao.item(),2))


print('\nerror:')
print('alpha1: ', round((1-round(dinn.alpha1.item(),2))/1,2)*100,'%')
print('alpha2: ', round((0-round(dinn.alpha2.item(),2))/1e-20,2)*100,'%')
print('beta: ', round((0.0075-round(dinn.beta.item(),4))/0.0075,2)*100,'%')
print('mu: ', round((5-round(dinn.mu.item(),2))/5,2)*100,'%')
print('u: ', round((0.515151515-round(dinn.u.item(),2))/0.515151515,2)*100,'%')
print('tao: ', round((0.58-round(dinn.tao.item(),2))/0.58,2)*100,'%')

In [None]:
#vaccination! 

import numpy as np
from scipy.integrate import odeint
import matplotlib.pyplot as plt

# Initial number of infected individuals, I0
I0 = 1
# Everyone else, S0, is susceptible to infection initially.
S0 = 2000
# Contact rate, beta, and mean recovery rate, mu.
beta, mu = dinn.beta, dinn.mu
# A grid of time points (in days)
t = np.linspace(0, 3, 100) 
#parameters
u = dinn.u
tao = dinn.tao
alpha1 = dinn.alpha1
alpha2 = dinn.alpha2

# The SIR model differential equations.
def deriv(y, t, beta, mu, u, tao, alpha1, alpha2):
    S, I = y
    dSdt = -beta * S * I + u * (t > tao) * alpha1 * (-S)
    dIdt = beta * S * I - mu * I + u * (t > tao) * alpha2 * (-I)
    return dSdt, dIdt

#add u = 0.5, get the corresponding tao, generate the SI data
# learn u (self.u), the corresponding tao (self.tao)

# Initial conditions vector
y0 = S0, I0
# Integrate the SIR equations over the time grid, t.
ret = odeint(deriv, y0, t, args=(beta, mu, u, tao, alpha1, alpha2))
S, I = ret.T

# Plot the data on two separate curves for S(t), I(t)
fig = plt.figure(facecolor='w')
ax = fig.add_subplot(111, facecolor='#dddddd', axisbelow=True)
ax.plot(tSI_vaccination_data[0], tSI_vaccination_data[1], 'navy', alpha=0.9, lw=2, label='Actual Susceptible')
ax.plot(t, S, 'violet', alpha=0.5, lw=2, label='Learnable Param Susceptible', linestyle='dashed')
ax.plot(tSI_vaccination_data[0], tSI_vaccination_data[2], 'dodgerblue', alpha=0.9, lw=2, label='Actual Infected')
ax.plot(t, I, 'darkgreen', alpha=0.5, lw=2, label='Learnable Param Infected', linestyle='dashed')
ax.set_xlabel('Time /days')
#ax.set_ylabel('Number (1000s)')
#ax.set_ylim(0,1.2)
ax.yaxis.set_tick_params(length=0)
ax.xaxis.set_tick_params(length=0)
ax.grid(b=True, which='major', c='w', lw=2, ls='-')
legend = ax.legend()
legend.get_frame().set_alpha(0.5)
for spine in ('top', 'right', 'bottom', 'left'):
    ax.spines[spine].set_visible(False)
plt.show()