# Laboratorio no evaluado: Estrategias distribuidas con TF y Keras
------------------------



Bienvenidos, durante este laboratorio no calificado van a realizar una estrategia de entrenamiento distribuido usando TensorFlow y Keras, específicamente la [`tf.distribute.MultiWorkerMirroredStrategy`](https://www.tensorflow.org/api_docs/python/tf/distribute/MultiWorkerMirroredStrategy). 

Con la ayuda de esta estrategia, un modelo Keras que fue diseñado para ejecutarse en un solo trabajador puede funcionar sin problemas en múltiples trabajadores con un cambio mínimo de código. En concreto lo hará:


1. Realizar el entrenamiento con un único worker.
2. Comprender los requisitos para una configuración multi-worker (variable `tf_config`) y el uso de gestores de contexto para implementar estrategias distribuidas.
3. Utilizar comandos mágicos para simular diferentes máquinas.
4. Realizar una estrategia de entrenamiento multi-trabajador.

Este cuaderno está basado en el cuaderno oficial [Multi-worker training with Keras](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras), que cubre algunos temas adicionales por si quieres profundizar en este tema.

La guía [Distributed Training with TensorFlow](https://www.tensorflow.org/guide/distributed_training) también está disponible para una visión general de las estrategias de distribución que soporta TensorFlow para aquellos interesados en una comprensión más profunda de las APIs `tf.distribute.Strategy`.

¡Vamos a empezar!

## Setup

First, some necessary imports.

In [1]:
import os
import sys
import json
import time

Antes de importar TensorFlow, haz algunos cambios en el entorno.

- Desactiva todas las GPUs. Esto evita errores causados por los trabajadores todos tratando de utilizar la misma GPU. **Para una aplicación real cada trabajador estaría en una máquina diferente.


- Añade el directorio actual a la ruta de python para que los módulos de este directorio puedan ser importados.

In [2]:
# Disable GPUs
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

# Add current directory to path
if '.' not in sys.path:
  sys.path.insert(0, '.')

El paso anterior es importante ya que este notebook se basa en escribir archivos usando el comando mágico `%%writefile` y luego importarlos como módulos.

Ahora que la configuración del entorno está lista, importa TensorFlow.


In [3]:
import tensorflow as tf

# Ignore warnings
tf.get_logger().setLevel('ERROR')

### Dataset and model definition

A continuación, cree un archivo `mnist.py` con un modelo simple y un conjunto de datos. Este archivo python será utilizado por los procesos de trabajo de este tutorial.

El nombre de este archivo se deriva del conjunto de datos que va a utilizar que se llama [mnist](https://keras.io/api/datasets/mnist/) y consta de 60.000 28x28 imágenes en escala de grises de los 10 primeros dígitos.

In [4]:
%%writefile mnist.py

# import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  # Load the data
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # Normalize pixel values for x_train and cast to float32
  x_train = x_train / np.float32(255)
  # Cast y_train to int64
  y_train = y_train.astype(np.int64)
  # Define repeated and shuffled dataset
  train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset


def build_and_compile_cnn_model():
  # Define simple CNN model using Keras Sequential
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])

  # Compile model
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  
  return model

Overwriting mnist.py


Compruebe que el archivo se ha creado correctamente:

In [5]:
!ls *.py

main.py  mnist.py


Importa el módulo mnist que acabas de crear y prueba a entrenar el modelo durante un pequeño número de epochs para observar los resultados de un único trabajador y asegurarte de que todo funciona correctamente.

In [6]:
# Import your mnist model
import mnist

# Set batch size
batch_size = 64

# Load the dataset
single_worker_dataset = mnist.mnist_dataset(batch_size)

# Load compiled CNN model
single_worker_model = mnist.build_and_compile_cnn_model()

# As training progresses, the loss should drop and the accuracy should increase.
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7f3adcb7b550>

Todo funciona como se esperaba. 

Ahora verás cómo se pueden utilizar múltiples trabajadores como estrategia distribuida.

## Configuración multi-trabajador

Ahora vamos a entrar en el mundo de la formación multi-trabajador. En TensorFlow, la variable de entorno `TF_CONFIG` es necesaria para el entrenamiento en múltiples máquinas, cada una de las cuales posiblemente tenga un rol diferente. TF_CONFIG` es una cadena JSON utilizada para especificar la configuración del cluster en cada trabajador que forma parte del cluster.

Hay dos componentes de `TF_CONFIG`: `cluster` y `task`. 

Veamos cómo se utilizan:

`cluster`:
- **Es el mismo para todos los trabajadores** y proporciona información sobre el cluster de entrenamiento, que es un dictado formado por diferentes tipos de trabajos como `worker`.

- En el entrenamiento multi-trabajador con `MultiWorkerMirroredStrategy`, normalmente hay un `worker` que asume un poco más de responsabilidad, como guardar el punto de control y escribir el archivo de resumen para TensorBoard, además de lo que hace un `worker` normal. 
-A este trabajador se le denomina `trabajador jefe`, y es habitual que el `trabajador` con `índice` 0 sea designado como `trabajador jefe` (de hecho así es como se implementa `tf.distribute.Strategy`).

tarea`:
- Proporciona información de la tarea actual y es diferente en cada trabajador. Especifica el `type` y el `index` de ese trabajador. 

Aquí tienes un ejemplo de configuración:

In [7]:
tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aquí está el mismo `TF_CONFIG` serializado como una cadena JSON:

In [8]:
json.dumps(tf_config)

'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0}}'

### Explicación del ejemplo TF_CONFIG

En este ejemplo estableces un `TF_CONFIG` con 2 workers en `localhost`. En la práctica, los usuarios crearían múltiples trabajadores en direcciones IP/puertos externos, y configurarían `TF_CONFIG` en cada trabajador apropiadamente.

Dado que has establecido la tarea `type` como `"worker"` y la tarea `index` como `0`, **esta máquina es el primer trabajador y será designada como trabajador jefe**. 

Ten en cuenta que otras máquinas necesitarán tener la variable de entorno `TF_CONFIG` también establecida, y debería tener el mismo dict `cluster`, pero diferente `type` de tarea o `index` de tarea dependiendo de cuáles sean los roles de esas máquinas. Por ejemplo, para el segundo trabajador se establecería `tf_config['task']['index']=1`.


### Quick Note on Environment variables and subprocesses in notebooks

Above, `tf_config` is just a local variable in python. To actually use it to configure training, this dictionary needs to be serialized as JSON, and placed in the `TF_CONFIG` environment variable.

In the next section, you'll spawn new subprocesses for each worker using the `%%bash` magic command. Subprocesses inherit environment variables from their parent, so they can access `TF_CONFIG`. 

You would never really launch your jobs this way (as subprocesses of an interactive Python runtime), but it's how you will do it for the purposes of this tutorial.

## Elegir la estrategia adecuada

En TensorFlow hay dos formas principales de entrenamiento distribuido:

* Entrenamiento síncrono, donde los pasos de entrenamiento están sincronizados entre los trabajadores y las réplicas, y
* Entrenamiento asíncrono, donde los pasos de entrenamiento no están estrictamente sincronizados.

La `MultiWorkerMirroredStrategy`, que es la estrategia recomendada para el entrenamiento síncrono multitrabajador, es la que vas a utilizar.

Para entrenar el modelo, utiliza una instancia de `tf.distribute.MultiWorkerMirroredStrategy`.



In [9]:
strategy = tf.distribute.MultiWorkerMirroredStrategy()

MultiWorkerMirroredStrategy crea copias de todas las variables en las capas del modelo en cada dispositivo a través de todos los trabajadores.  Utiliza `CollectiveOps`, una operación de TensorFlow para la comunicación colectiva, para agregar gradientes y mantener las variables sincronizadas.  La [guía oficial de entrenamiento distribuido de TF](https://www.tensorflow.org/guide/distributed_training) tiene más detalles sobre esto.


### Implementar la formación distribuida mediante gestores de contexto

Para distribuir el entrenamiento a múltiples trabajadores todo lo que necesitas hacer es encerrar la construcción del modelo y la llamada a `model.compile()` dentro de `strategy.scope()`. 

El ámbito de la estrategia de distribución dicta cómo y dónde se crean las variables, y en el caso de `MultiWorkerMirroredStrategy`, las variables creadas son `MirroredVariable`s, y se replican en cada uno de los trabajadores.


In [10]:
# Implementing distributed strategy via a context manager
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()

Nota: `TF_CONFIG` se analiza y los servidores GRPC de TensorFlow se inician en el momento en que se llama a `MultiWorkerMirroredStrategy()`, por lo que la variable de entorno `TF_CONFIG` debe estar configurada antes de crear una instancia de `tf.distribute.Strategy`. 

**Dado que la variable `TF_CONFIG` aún no está definida, la estrategia anterior es de hecho un entrenamiento de un único trabajador**.



## Entrenar el modelo

### Crear script de entrenamiento

Para ejecutar realmente con `MultiWorkerMirroredStrategy` necesitarás ejecutar procesos worker y pasarles un `TF_CONFIG`.

Al igual que el fichero `mnist.py` escrito anteriormente, aquí tienes el `main.py` que ejecutará cada uno de los workers:

In [11]:
%%writefile main.py

import os
import json

import tensorflow as tf
import mnist # Your module

# Define batch size
per_worker_batch_size = 64

# Get TF_CONFIG from the env variables and save it as JSON
tf_config = json.loads(os.environ['TF_CONFIG'])

# Infer number of workers from tf_config
num_workers = len(tf_config['cluster']['worker'])

# Define strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Define global batch size
global_batch_size = per_worker_batch_size * num_workers

# Load dataset
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

# Create and compile model following the distributed strategy
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()

# Train the model
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

Overwriting main.py


En el fragmento de código anterior, observe que el `global_batch_size`, que se pasa a `Dataset.batch`, se establece en `per_worker_batch_size * num_workers`. Esto garantiza que cada trabajador procese lotes de ejemplos de `per_worker_batch_size` independientemente del número de trabajadores.

The current directory should now contain both Python files:

In [12]:
!ls *.py

main.py  mnist.py


### Establecer la variable de entorno TF_CONFIG

Ahora json-serializa el `TF_CONFIG` y añádelo a las variables de entorno:

In [13]:
# Set TF_CONFIG env variable
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Y termina todos los procesos en segundo plano:

In [14]:
# first kill any previous runs
%killbgscripts

All background processes were killed.


### Lanzar el primer trabajador

Ahora, puedes lanzar un proceso worker que ejecutará el `main.py` y usará el `TF_CONFIG`:

In [15]:
%%bash --bg
python main.py &> job_0.log

Hay algunas cosas a tener en cuenta sobre el comando anterior:

1. Utiliza el `%%bash` que es un [cuaderno "mágico"](https://ipython.readthedocs.io/en/stable/interactive/magics.html) para ejecutar algunos comandos bash.
2. 2. Utiliza la bandera `--bg` para ejecutar el proceso `bash` en segundo plano, porque este trabajador no terminará. Espera a todos los workers antes de arrancar.

El proceso worker en segundo plano no imprimirá la salida en este cuaderno, así que el `&>` redirige su salida a un fichero, para que puedas ver lo que ha pasado.

Así que espera unos segundos a que el proceso se inicie:

In [16]:
# Wait for logs to be written to the file
time.sleep(10)

Ahora mira lo que se ha producido en el archivo de registro del trabajador hasta el momento utilizando el comando `cat`:

In [17]:
%%bash
cat job_0.log

2023-01-27 22:47:45.030717: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected


La última línea del archivo de registro debería decir: `Started server with target: grpc://localhost:12345`. El primer trabajador ya está listo, y está esperando a que todos los demás trabajadores estén listos para continuar.

### Launch the second worker

Now update the `tf_config` for the second worker's process to pick up:

In [18]:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora lanza el segundo trabajador. Esto iniciará la formación ya que todos los trabajadores están activos (por lo que no hay necesidad de poner en segundo plano este proceso):

In [19]:
%%bash
python main.py

Epoch 1/3
Epoch 2/3
Epoch 3/3


2023-01-27 22:47:55.091586: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2023-01-27 22:47:56.066533: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    

Now if you recheck the logs written by the first worker you'll see that it participated in training that model:

In [21]:
%%bash
cat job_0.log

2023-01-27 22:47:45.030717: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2023-01-27 22:47:56.066533: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    

Unsurprisingly this ran _slower_ than the the test run at the beginning of this tutorial. **Running multiple workers on a single machine only adds overhead**. The goal here was not to improve the training time, but only to give an example of multi-worker training.

-----------------------------
**Congratulations on finishing this ungraded lab!** Now you should have a clearer understanding of how to implement distributed strategies with Tensorflow and Keras. 

Although this tutorial didn't show the true power of a distributed strategy since this will require multiple machines operating under the same network, you now know how this process looks like at a high level. 

In practice and especially with very big models, distributed strategies are commonly used as they provide a way of better managing resources to perform time-consuming tasks, such as training in a fraction of the time that it will take without the strategy.

**Keep it up!**