<a href="https://colab.research.google.com/github/Emivk/Control_MPC_en_Simulacion_de_epidemias/blob/main/MPC_SIR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Algoritmo de control MPC aplicado en un modelo SIR sin demografía con modulación en amplitud y frecuencia de las variaciones ($SIR_{real}$) usando como modelo de sustitución $SIR_{ideal}$.


In [1]:

import numpy as np
from scipy.integrate import odeint
from scipy.optimize import minimize
import matplotlib.pyplot as plt
from math import sin as sen

class paramsv:
  def __init__(self,beta):
    self.beta = beta
    self.ret = 0
  def reiniciar(self):
    self.beta = 0
    self.ret = 0

def SIR(y, t, N, beta, gamma):
    """
    Definición del Modelo SIR sin demografía (SIR_ideal) usado como modelo de sustitución.
    Parámetros
    y(list(1,3)): valores iniciales de las variables SIR
    t(list): tiempo de simulación
    N(int): población total
    beta(float): tasa de transmisión beta
    gamma(float): tasa de recuperación gamma
    """
    S, I, R, C = y
    dSdt = -beta * S * I / N
    dIdt = beta * S * I / N - gamma * I
    dRdt = gamma * I
    dCdt = beta* S * I / N
    return dSdt, dIdt, dRdt, dCdt

def SIR_real(y, t, N, beta, gamma, alpha, b):
    """Definición del Modelo SIR sin demografía con modulación en la tasa de transmisión.
    Parámetros
    y(list(1,4)): valores iniciales de las variables SIRC
    t(list): tiempo de simulación
    N(int): población total
    beta(float): tasa de transmisión beta
    gamma(float): tasa de recuperación gamma
    alpha(float): factor de modulación de amplitud de la modulación
    b(int): variación de la frecuencia temporal en la modulación del modelo SIR_real (1/b)
    """
    S, I, R, C = y
    dSdt = -(beta*(1+sen(t/b)*alpha)) * S * I / N
    dIdt = beta*(1+sen(t/b)*alpha) * S * I / N - gamma * I
    dRdt = gamma * I
    dCdt = beta*(1+sen(t/b)*alpha) * S * I / N
    return dSdt, dIdt, dRdt, dCdt

def objectivo(actuacion,t,hc,hdp,I_0,R_0,N,cI,clase,gamma,c,fce,fcac):
    """ Función para definir el valor de la función de costo en cada horizonte de predicción
    """
    u=float(actuacion[0])
    S_0 = N - I_0 - R_0 # calculo de individuos susceptibles inicial en el
                        # periodo del Horizonte de predicción actual

    # control de la transmisión de la enfermedad dentro de la función objetivo
    # para el problema de optimización
    beta_n = clase.beta * (1 - u)

    t1=[i for i in range(t,t+hc+hdp+1,1)] # tiempo del Horizonte de predicción

    # Solución del modelo de sustitución en el periodo actual del horizonte usando el control calculado
    clase.ret = odeint(SIR, [S_0, I_0, R_0, cI], t1, args=(N, beta_n, gamma))

     # criterio de costo para infectados
    if fce=='M':
      infectados = max(clase.ret[:, 1]) # Maximo de infectados en el periodo actual de la simulación
    elif fce=='C':
      infectados = clase.ret[-1,3] # Conteo acumulado de infectados en el periodo actual de la simulacón

    # criterio de funcion de costo por infectados
    # aplicación de control, costo de aplicación temporal
    # y peso c
    if fcac=='B':
      return infectados + u*(hc+hdp)*c #básico(suma simple)
    elif fcac=='C':
      return infectados**2 + u**2*(hc+hdp)*c #cuadrado
    elif fcac=='R':
      return infectados + (u/(1.1-u))*(hc+hdp)*c # racional
    elif fcac=='M':
      return infectados**2 + (u/(1.1-u))*(hc+hdp)*c # mixto

def const(actuacion):
    """función que definine las restricciones del problema de optimización de forma que 1-u<=0
    """
    return 1 - actuacion[0]

def simulacion(poblacion: int, Beta:float, Gamma:float, Iinicial:int, Rinicial:int, uinicial:float, rin:float, rfin:float, Ht:int, Hc:int, Hdp:int, c:float, b:float, graficar:bool,fce,fcac):
  """Función para Simular una epidemia usando un control predictivo (MPC) en un modelo SIR sin demografía con variaciones.
  Parámetros:
  poblacion(int): numerp total de individuos
  Beta(float): tasa de transmision
  Gamma(float): tasa de recuperacion
  Iinicial(int): numero inicial de infectados
  Rinicial(int): numero inicial de recuperados
  uinicial(float): acción(valor del control) inical propuesto
  rin(int): restricción inferior del control
  rfin(int): restricción superior del control
  ht(int): tiempo de simulación
  hc(int): horizonte de control
  hdp(int): diferencia del horizonte de predicción con el horizonte de control (hp=hc+hdp)
  c(float): peso usado en el costo por aplicación de control
  b=(int): variación de la frecuencia temporal en la modulación del modelo SIR_real (1/b)
  graficar(bool): graficar resultados de la simulación
  fce: criterio de costo para infectados 'M' para max(I) y 'C' para conteo acumulado de I
  fcac: criterio de funcion de costo por infectados 'B' para suma simple, 'C' para cuadrado, 'R' para racional y 'M' para mixto
  """
 # parametros iniciales
  pv=paramsv(Beta)
  N = poblacion
  gamma = Gamma
  I0 = Iinicial
  R0 = Rinicial
  cI_0=I0
  S0=N-I0-R0

  # Configuración del problema de optimización
  acinicial = uinicial
  restricciones = [(rin , rfin)]
  horizontet=Ht #Tiempo de simulación en días
  hc=Hc # horizonte de control
  hdp=Hdp # horizonte de predicción

  SIR_0=np.array([[S0],[I0],[R0],[cI_0]]) # almacernar variables del modelo
  t_cI=[0] # almacenar momento correspondiente a cada C

  acciones=[acinicial] # almacenar valor de los controles calculados (acciones)
  tiemposu=[0] # almacenar ubicación temporal de cada aplicación de las acciones
  betas=[pv.beta] #almacernar betas modificadas
  t0 = [i for i in range(0, horizontet + 1)] # establecer tiempos del horizonte temporal
  dts = [i for i in range(0, horizontet-hc+1,hc)] # momento del inicio del horizonte de control
  final=[] # guadar estado de término de la simulación

  # iniciar secuencia de simulación
  for i in t0:
      if i in dts:
        # Resolver el problema de optimización
        result = minimize(objectivo, acinicial, args=(i,hc,hdp,I0,R0,N,cI_0,pv,gamma,c,fce,fcac), bounds=restricciones, constraints={'type': 'ineq', 'fun': const})
        u_opt= result.x[0] # resultado de la minimización (control u optima)
        beta_n = pv.beta * (1 - u_opt) # beta optima

        # pv.beta = beta_vac # Actualizar las betas en cada horizonte de predicción (quitar comentario para activarlo)
        betas.append(beta_n) # Almacenar las beta calculada
        acciones.append(u_opt) # Almacenar acciones calculada
        tiemposu.append(i) # Guardar el tiempo de la aplicación del control

        t2=[j for j in range(i,i+hc+1,1)] # periodo del horizonte de control
        S_0=N-I0-R0 # calcular suceptibles actuales

        pv.ret=odeint(SIR_real, [S_0, I0, R0, cI_0], t2, args=(N, beta_n, gamma, 2, b)) # simular el horizonte de control con el control optimo calculado
        SIR_0=np.hstack((SIR_0,np.round(pv.ret[1:,:].T,9))) # almacenar las variables del sistema

        S,I,R,cI=SIR_0 #separar las variables

        t_cI.append(i+hc+1) #guardar la ubicación temporal de C

        # actualizar valores iniciales
        I0=I[-1]
        R0=R[-1]
        cI_0=cI[-1]
        acinicial=u_opt
      if round(acciones[-1])==0 and round(cI[-1]-R[-1])==0:
          #print('terminó la pandemia',' en el tiempo:', i)
          final='blue'
          break
  if final!='blue':
      #print('no terminó la pandemia')
      final='gray'


  if graficar==True:
    # Graficar simulación
    t0=[i for i in range(0,len(S))]
    plt.plot(t0[:], S, label='Susceptibles')
    plt.plot(t0[:], I, label='Infectados')
    plt.plot(t0[:], R, label='Recuperados')
    plt.xlabel('Tiempo (días)')
    plt.ylabel('Población')
    plt.title('Simulación de epidemia de Varicela SIR-MPC')
    plt.legend()
    #plt.savefig('u cuadrados hc=9 hp=1 - MPC epidemia.png')
    plt.show()
    pv.reiniciar()

    #Graficar acciones (control) aplicados en el tiempo en la simulación
    plt.plot(tiemposu, acciones, label=r'Implementaciones de $u$ en el tiempo', color='red')
    plt.title('Acciones calculadas en la simulación')
    plt.xlabel('Tiempo (días)')
    plt.ylabel('Valor de la acción')
    plt.legend()
    plt.show()

    #Graficar conteo acumulado de infectados
    plt.plot(t0,cI, color='green')
    plt.title('Conteo acumulado de infectados en la simulación')
    plt.xlabel('Tiempo (días)')
    plt.ylabel(r'Conteo acumulado de Infectados $C$')
    plt.scatter(t0[-1], cI[-1], color='r', s=100, label=f'Máximo del conteo de acumulados: ({cI[-1]}')
    plt.legend()
    plt.show()

  # Devolver resultados de la simulación
  if fce=='M':
      return max(I), final # regresar el máximo de Infectados de la epidemia
  elif fce=='C':           # y el estado final de la simulación
    return cI[-1],final # regresar el conteo acumulado de infectados final
                        # y el estado final de la simulación


Ejemplo de cómo realizar una simulación

In [None]:
simulacion(poblacion=1000, Beta=0.35, Gamma=0.1, Iinicial=1, Rinicial=0, uinicial=0.1, rin=0, rfin=1, Ht=5000, Hc=95, Hdp=9, c=1, b=1, graficar=True,fce='C',fcac='R')

Configuración de las corridas experimentales

In [None]:
# Experimento: probar diferentes combinaciones de hc y hdp para una misma
# función de costo
hc=[i for i in range(1,100+1)] # valores de hc a probar
hdp=[i for i in range(1,100+1)] # valores de hdp a probar

resultados=[] # guardar resultados de cada simulación
finales=[] # Estados finales de cada simulación
for pi in hdp:
  temp=[] # Guardar temporalmente el resultado de la simulación
  tempc=[] # Guardar temporalmente el estado de la simulación
  for ci in hc:
    r,c=simulacion(1000,0.35,0.1,1,0,0.1,0,1,5000,ci,pi,1.5,1,False,'C','R') # configuración de los experimentos
    temp.append(r)
    tempc.append(c)
  if pi<=hdp[0]: # primera iteración
    resultados=temp
    finales=tempc
  else:
    resultados=np.vstack((resultados,temp))
    finales=np.vstack((finales,tempc))

Graficar experimentos

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.patches as mpatches

# Crear la figura y el objeto Axes3D
fig = plt.figure(figsize=(15,15),dpi=100)

ax = fig.add_subplot(111, projection='3d')
c, p = np.meshgrid(hc, hdp)
# Graficar la superficie
ax.plot_surface(c, p, resultados, facecolors=finales)# cmap='viridis')

# Mostrar el mínimo
z_min = np.min(resultados)
min_index = np.unravel_index(np.argmin(resultados), resultados.shape)

x_min = c[min_index]
y_min = p[min_index]
#print(f'El valor mínimo de z es {z_min} y se encuentra en (c, p) = ({x_min}, {y_min})')

# Configurar etiquetas
ax.set_xlabel(r'Días del Horizonte de control $h_c$')
ax.set_ylabel(r'Días restantes del Horizonte de Predicción $h_{dp}$')
ax.set_zlabel(r'Conteo Acumulado final de Infectados $C$')
#ax.set_zlabel(r'Cantidad Máxima de Infectados $\max (I)$')

ax.scatter(x_min, y_min, z_min, color='r', s=100, label=f'Mínimo: ({x_min:.2f}, {y_min:.2f}, {z_min:.2f})')
#ax.text(x_min, y_min, z_min, f'({x_min:.2f}, {y_min:.2f}, {z_min:.2f})', color='red', fontsize=12)
# Mostrar la gráfica
plt.title('Impacto del Horizonte de control $h_c$ y dias restantes de predicción $h_{dp}$ en la evolución de Infectados en un modelo SIR-MPC')
blue_patch = mpatches.Patch(color='blue', label='Epidemia Erradicada')
grey_patch = mpatches.Patch(color='grey', label='Epidemia no Erradicada')
plt.legend(handles=[blue_patch, grey_patch, ax.scatter(x_min, y_min, z_min, color='r', s=100, label=f'Mínimo: ({x_min:.2f}, {y_min:.2f}, {z_min:.2f})')])

plt.show()