In [None]:
#Paquetes iniciales para el funcionamiento de todo el cuaderno
import numpy as np
from torch import nn
import asyncio
from torch.optim import SGD

## Bitácora desarrollo de red neuronal con entrenamiento distribuído asíncrono
En este cuaderno se encuentra el proceso de desarrollo y el paso desde nuestra versión semiasíncrona hasta llegar a la versión completamente asíncrona y las consideraciones y retos dentro del proceso

### Planteamiento inicial
Desde el punto de partida contábamos con una versión que utilizaba Docker para el despliegue con el dataset CIFAR10, realizando pruebas locales desplegando hasta 4 contenedores que realizaban el entrenamiento comunicando todo mediante sockets. Dentro de esta versión contábamos con un parameter server que de forma asíncrona esperaba solicitudes por parte de workers, para entregar a estos lotes de trabajo, sin embargo la optimización la realizaba de forma sincrónica ya que acumulaba los gradientes de todos los workers activos antes de optimizar, para así evitar gradientes obsoletos. Este fue nuestro punto de partida, ya que el propósito de esta segunda implementación era llegar a una versión completamente asincrónica y primero recurrimos a la literatura, basándonos así en los siguientes recursos como punto de partida:
- Entrenamiento asíncrono con SGD (https://arxiv.org/abs/1511.05950)
- Comportamiento del momentum en ASGD (https://arxiv.org/abs/1907.11612)

### Primeros pasos
Después de recurrir a la literatura vimos una pequeña ventaja desde el punto de partida. Nuestra versión previa ya calculaba los gradientes de forma asíncrona pero perdía el factor asíncrono en el momento en que acumulaba los gradientes antes de optimizar, así que las modificaciones para la versión asíncrona eran simples. Partimos de modificar primero nuestro modelo de parameter server, teniendo los siguientes atributos:

In [None]:
class ParameterServer:
    def __init__(self, id, host, port, model, longitude,
                 epochs=50, batch_size=128, lr=0.08, alpha=1.0):
        self.id = id
        self.host = host
        self.port = port
        self.model = model
        self.longitude = longitude
        self.indexes = np.random.permutation(longitude).tolist()
        self.batch_size = batch_size
        self.batch_pointer = 0
        self.base_lr = lr
        self.alpha = alpha
        self.optimizer = SGD(self.model.parameters(), lr, momentum=0.0)
        self.epochs = epochs
        self.current_epoch = 0
        self.server = None
        self.connections = {}
        self.model_version = 0
        self.time = None
        self.lock = asyncio.Lock()
        self.active_workers = 0

Vemos una variable llamada model_version, que en este caso será la encargada de conservar la versión del modelo, ya que este valor será esencial para nuestro entrenamiento asíncrono. Primero es importante entender como funciona este entrenamiento asíncrono. Para entrenamiento asíncronos hablamos de **staleness** para referirnos al desfase que genera actualizar un modelo con gradientes obsoletos, por lo tanto la manera en que se controla dicho desfase, es manejando la versión del modelo y calculando el **staleness** como la versión global del modelo menos la versión local que traigan los gradientes a la hora de optimizar. En base a este valor obtenemos la siguiente fórmula para el control de dicho desfase:
$$
\text{async\_lr} = \frac{\text{base\_lr}}{1 + \alpha \cdot \text{staleness}}
$$
Básicamente lo que hacemos es tomar el valor de learning rate y entre mayor *staleness* tengamos, menor será el learning rate, disminuyendo así la manera en que los gradientes afectaran, controlando así el desfase. Ahora bien también tenemos un parámetro $\alpha$, que definimos como la severidad que tendrá el **staleness** que para nuestro caso lo mantenemos en 1 como se puede ver desde los atributos del parameter server.

Para tener mayor claridad veremos en código la manera en que se actualizan los gradientes dentro de nuestra implementación:

In [None]:
from models import MSG_GRADIENTS, msg_type, message, self

if msg_type == MSG_GRADIENTS:
    worker_version = message.get("model_version", 0)
    grads = message.get("data")
    staleness = max(0, self.model_version - worker_version)
    async_lr = self.base_lr / (1.0 + self.alpha * staleness)

    async with self.lock:
        self.optimizer.param_groups[0]['lr'] = async_lr
        self.optimizer.zero_grad()
        for param, grad in zip(self.model.parameters(), grads):
            param.grad = grad.clone().to(param.device)
        self.optimizer.step()
        self.model_version += 1

Básicamente tenemos un tipo de mensaje que el parameter server distingue al recibir gradientes por parte de un worker, donde obtiene la versión del modelo para calcular el **staleness** y mediante este desfase actualiza el learning rate para el optimizador que teníamos definido desde los atributos que vimos previamente. En base a esta actualización del learning rate carga los gradientes recibidos y realiza el step con el optimizador preparado, aumentando también la versión del modelo. Si nos fijamos a la hora de realizar estos procesos lo hace mediante un self.lock, esto para que no se generen condiciones de carrera con los workers a la hora de querer optimizar el modelo global, ya que este lock básicamente se encarga de bloquear dicho segmento para ser accesible solo por un worker al tiempo.

Ahora teniendo aplicado el funcionamiento observado en el primer paper, pasamos al segundo paper, donde se habla del momentum. El momentum en nuestra primera implementación tenía valores altos, entre 0.8 y 0.9, sin embargo el paper establece que para modelos asíncronos los valores de momentum altos complican el entrenamiento debido al comportamiento asíncrono, lo cual pudimos comprobar al punto de dejar nuestro momentum en 0.0, ya que probando valores de hasta 0.1 afectaba en gran medida el accuracy final del modelo.

### Despliegue de la implementación
En nuestra primera implementación usamos Docker, sin embargo no fue tan sencillo, ya que para poder usar GPU's fue necesario primero instalar el toolkit de NVIDIA en el kernel de Docker, que para nuestro caso al ser en Windows, debimos instalar primero un WSL, en este WSL instalar el toolkit y usar Docker basado en dicho WSL. Para nuestra segunda implementación intentamos usar **Conda**, sin embargo el primer problema fue el funcionamiento de **Conda** en Windows, el cual tiene una gran cantidad de errores gracias a la incompatibilidad con ciertos DLL's. Probamos también mediante el uso del clásico **venv** para así usar entornos virtuales simples, que en un inicio eran completamente funcionales pero al usar **venv** en Windows no se puede establecer ningún control sobre el uso de RAM y demás, por lo tanto las pruebas terminaban siendo bastante complejas porque los entornos virtuales comenzaban a consumir recursos sin control. Después de todo esto optamos por continuar con Docker inicialmente, ya que parte de esta segunda implementación era el cambio de dataset, lo cual veremos a continuación.

### Cambio de Dataset a ImageNet1k
Uno de los mayores retos para nosotros fue el cambio de dataset, ya que en un inicio, cuando realizamos los primeros cambios implementando el manejo del **staleness** y el momentum, todo fue probado con el dataset de CIFAR10, obteniendo valores en accuracy por encima el 40%, lo cual en un inicio nos pareció un buen punto de partida. Sin embargo a la hora de cambiar el dataset corrimos con varias complicaciones; primero la magnitud del dataset era mucho mayor, teniendo presente que CIFAR10 tiene un peso menor a 3GB e ImageNet1k pesaba entre 60-70GB en su versión más liviana que se encuentra en Kaggle. Igualmente descargamos el dataset y modificamos nuestras capas convolucionales para poder adaptarlas a este problema. A continuación veremos nuestro modelo para CIFAR10 y el modelo que adaptamos para ImageNet1k:

In [None]:
class Convolutional_ImageNet(nn.Module):
    def __init__(self, num_classes=1000):
        super().__init__()

        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d((7, 7))
        )
        
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512 * 7 * 7, 8192),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(8192, 4096),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(4096, 2048),
            nn.ReLU(),
            nn.Dropout(0.2),

            nn.Linear(2048, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x


class Convolutional_CIFAR10(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128*4*4, 1024),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.15),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

De entrada basta con solo mirar las capas convolucionales para ver la forma en que aumentó la amplitud del modelo, ya que en ImageNet1k procesamos imágenes de 224x224, siendo CIFAR10 de imágenes de solo 32x32. Gracias a esto la capa de clasificación también aumentó bastante, generando todo esto bloqueos en la máquina sobre la cual realizamos las pruebas, teniendo que disminuir el tamaño de los batches bastante para poder al menos correr el código sin tener bloqueos. Después de tener varios inconvenientes con el manejo del dataset, podíamos empezar a ver que teníamos un problema relacionado con la capacidad de cómputo y no con el planteamiento del algoritmo, ya que en CIFAR10 veíamos una base correcta como punto de partida. Decidimos explorar más datasets que se encontraran en un punto medio entre CIFAR10 e ImageNet1k y encontramos Food101, el cual clasifica alimentos, pero también tiene imágenes de tamaños variados, que recomiendan usar a 224x224, teniendo un tamaño igual al anterior pero reducciones la capa de clasificación como lo veremos a continuación:

In [None]:
class Convolutional_Food101(nn.Module):
    def __init__(self, num_classes=101):
        super().__init__()

        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),

            nn.AdaptiveAvgPool2d((7, 7))
        )
        
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.1), 
            nn.Linear(4096, 1024), 
            nn.ReLU(), 
            nn.Dropout(0.1), 
            nn.Linear(1024, 512), 
            nn.ReLU(), 
            nn.Dropout(0.1), 
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

En este modelo nuestra capa de clasificación es un poco más pequeña pero la capa convolucional mantiene la misma complejidad. Con estos arreglos intentamos correr pruebas, y para este caso si fue posible hacerlo pero con tiempos de 40-50 minutos por época, además teniendo presente el hecho de que todas las pruebas se corrieron usando GPU (NVIDIA RTX 3060 12GB), sin embargo el uso de la GPU al ser todo mediante pruebas locales no era el más óptimo debido a que el uso de la GPU se hace mediante una cola de ejecución.

### Conclusiones
A partir de todas las pruebas realizadas pudimos concluir distintas cosas, entre ellas las siguientes:
1. El planteamiento para el control del **staleness** en modelos asíncronos fue adecuado, ya que en las primeras pruebas con CIFAR10 vimos una base adecuada reflejada en el accuracy probando distintos hiperparámetros.
2. En las pruebas realizadas con CIFAR10 observamos que el learning rate con valores más altos obtiene mejores resultados y tiene bastante sentido, ya que si de entrada el learning rate tiene valores muy bajos no tendrá tantas diferencias el dividirlo sobre el **staleness**
3. En entrenamiento asíncrono el se debe tener mucho cuidado con el dropout y con el momentum, ya que en modelos síncronos nos pueden ayudar mucho pero en modelos asíncronos pueden generar pérdida en la tendencia que necesitamos captar siendo esto primordial para estos casos.
4. El uso de datasets más grandes si requiere el uso de cómputo distribuido real sea en la nube o de forma física para tener pruebas adecuadas, sin embargo la limitante mayormente por esto, por el acceso a capacidad de cómputo.


- *Santiago Martínez Varón - 1004521370*
- *Luis Miguel Salazar Londoño - 1192725457*