# Entrenamiento Multi-worker con Keras

## Resumen

Este tutorial demuestra el entrenamiento distribuido de múltiples trabajadores con el modelo Keras usando la API `tf.distribute.Strategy`, específicamente `tf.distribute.MultiWorkerMirroredStrategy`. Con la ayuda de esta estrategia, un modelo Keras que fue diseñado para ejecutarse en un solo trabajador puede trabajar sin problemas en múltiples trabajadores con un cambio mínimo de código.


## Setup

Primero, algunas importaciones necesarias.

In [None]:
import json
import os
import sys

Antes de importar TensorFlow, haz algunos cambios en el entorno.

Desactiva todas las GPUs. Esto evita los errores causados por los trabajadores que intentan utilizar la misma GPU. Para una aplicación real cada trabajador estaría en una máquina diferente.

In [None]:
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Restablece la variable de entorno `TF_CONFIG`, verás más sobre esto más adelante.

In [None]:
os.environ.pop('TF_CONFIG', None)

Asegúrese de que el directorio actual está en la ruta de python. Esto permite al cuaderno importar los archivos escritos por `%%writefile` más tarde.


In [None]:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Ahora importamos TensorFlow.

In [None]:
import tensorflow as tf

### Conjunto de datos y definición del modelo

A continuación, cree un archivo `mnist.py` con un modelo simple y la configuración del conjunto de datos. Este archivo python será utilizado por los procesos de los trabajadores en este tutorial:

In [None]:
%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model

Writing mnist.py


Prueba a entrenar el modelo durante un pequeño número de épocas y observa los resultados de un solo trabajador para asegurarte de que todo funciona correctamente. A medida que avanza el entrenamiento, la pérdida debería disminuir y la precisión debería aumentar.

In [None]:
import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x7fdc45609a90>

## Configuración Multi-worker

Ahora vamos a entrar en el mundo del entrenamiento en múltiples máquinas. En TensorFlow, la variable de entorno `TF_CONFIG` es necesaria para entrenar en múltiples máquinas, cada una de las cuales posiblemente tenga un rol diferente. `TF_CONFIG` es una cadena JSON utilizada para especificar la configuración del cluster en cada trabajador que forma parte del cluster.
Here is an example configuration:

In [None]:
tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Aquí está el mismo `TF_CONFIG` serializado como una cadena JSON:

In [None]:
json.dumps(tf_config)

'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0}}'

Hay dos componentes de `TF_CONFIG`: `cluster` y `task`.

* El `cluster` es el mismo para todos los trabajadores y proporciona información sobre el cluster de entrenamiento, que es un dictado que consiste en diferentes tipos de trabajos como `worker`. En el entrenamiento multitrabajador con `MultiWorkerMirroredStrategy`, normalmente hay un `trabajador` que asume un poco más de responsabilidad como guardar el punto de control y escribir el archivo de resumen para TensorBoard además de lo que hace un `trabajador` normal. A este trabajador se le denomina "trabajador jefe", y es habitual que el "trabajador" con "índice" 0 sea designado como "trabajador jefe" (de hecho, así es como se implementa "tf.distribute.Strategy").

* `task` proporciona información de la tarea actual y es diferente en cada trabajador. Especifica el `tipo` y el `índice` de ese trabajador. 

In this example, you set the task `type` to `"worker"` and the task `index` to `0`. This machine is the first worker and will be appointed as the chief worker and do more work than the others. Note that other machines will need to have the `TF_CONFIG` environment variable set as well, and it should have the same `cluster` dict, but different task `type` or task `index` depending on what the roles of those machines are.


A modo de ilustración, este tutorial muestra cómo se puede establecer un `TF_CONFIG` con 2 trabajadores en `localhost`.  En la práctica, los usuarios crearían múltiples trabajadores en direcciones IP/puertos externos, y configurarían el `TF_CONFIG` en cada trabajador apropiadamente.

En este ejemplo se utilizarán 2 workers, el "TF_CONFIG" del primer worker se muestra arriba. Para el segundo trabajador, se establecerá `tf_config['task']['index']=1`.

Arriba, `tf_config` es sólo una variable local en python. Para usarlo realmente para configurar el entrenamiento, este diccionario necesita ser serializado como JSON, y colocado en la variable de entorno `TF_CONFIG`.

### Variables de entorno y subprocesos en los cuadernos

Los subprocesos heredan las variables de entorno de su padre. Así que si usted establece una variable de entorno en este proceso `jupyter notebook`:

In [None]:
os.environ['GREETINGS'] = 'Hello TensorFlow!'

You can access the environment variable from a subprocesses:

In [None]:
%%bash
echo ${GREETINGS}

Hello TensorFlow!


En la siguiente sección, usarás esto para pasar el `TF_CONFIG` a los subprocesos de los trabajadores. Realmente nunca lanzarás tus trabajos de esta manera, pero es suficiente para los propósitos de este tutorial: Demostrar un ejemplo mínimo de multi-trabajadores.

## Elegir la estrategia adecuada

En TensorFlow hay dos formas principales de entrenamiento distribuido:

* Formación sincrónica, en la que los pasos de la formación se sincronizan entre los trabajadores y las réplicas, y
* Entrenamiento asíncrono, donde los pasos de entrenamiento no están estrictamente sincronizados.

En esta guía se mostrará `MultiWorkerMirroredStrategy`, que es la estrategia recomendada para el entrenamiento síncrono de varios trabajadores.
Para entrenar el modelo, utilice una instancia de `tf.distribute.MultiWorkerMirroredStrategy`.

MultiWorkerMirroredStrategy" crea copias de todas las variables en las capas del modelo en cada dispositivo a través de todos los trabajadores.  Utiliza `CollectiveOps`, una operación de TensorFlow para la comunicación colectiva, para agregar gradientes y mantener las variables sincronizadas.  

In [None]:
strategy = tf.distribute.MultiWorkerMirroredStrategy()

INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO


Note: `TF_CONFIG` is parsed and TensorFlow's GRPC servers are started at the time `MultiWorkerMirroredStrategy()` is called, so the `TF_CONFIG` environment variable must be set before a `tf.distribute.Strategy` instance is created. Since `TF_CONFIG` is not set yet the above strategy is effectively single-worker training.

`MultiWorkerMirroredStrategy` provides multiple implementations via the [`CommunicationOptions`](https://www.tensorflow.org/api_docs/python/tf/distribute/experimental/CommunicationOptions) parameter.  `RING` implements ring-based collectives using gRPC as the cross-host communication layer.  `NCCL` uses [Nvidia's NCCL](https://developer.nvidia.com/nccl) to implement collectives.  `AUTO` defers the choice to the runtime.  The best choice of collective implementation depends upon the number and kind of GPUs, and the network interconnect in the cluster.

## Train the model

With the integration of `tf.distribute.Strategy` API into `tf.keras`, the only change you will make to distribute the training to multiple-workers is enclosing the model building and `model.compile()` call inside `strategy.scope()`. The distribution strategy's scope dictates how and where the variables are created, and in the case of `MultiWorkerMirroredStrategy`, the variables created are `MirroredVariable`s, and they are replicated on each of the workers.


In [None]:
with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()



Nota: Actualmente hay una limitación en `MultiWorkerMirroredStrategy` donde las operaciones TensorFlow necesitan ser creadas después de que la instancia de la estrategia sea creada. Si ves `RuntimeError: Collective ops must be configured at program startup`, prueba a crear la instancia de `MultiWorkerMirroredStrategy` al principio del programa y pon el código que pueda crear ops después de que se instancie la estrategia.

Para ejecutar realmente con `MultiWorkerMirroredStrategy` necesitarás ejecutar procesos worker y pasarles un `TF_CONFIG`.

Al igual que el archivo `mnist.py` escrito anteriormente, aquí está el `main.py` que ejecutará cada uno de los trabajadores:

In [None]:
%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

Writing main.py


En el fragmento de código anterior, observe que el `tamaño_de_lote` global, que se pasa a `Dataset.batch`, se establece en `tamaño_de_lote_por_trabajador * número_de_trabajadores`. Esto asegura que cada trabajador procesa lotes de ejemplos de `per_worker_batch_size` independientemente del número de trabajadores.

El directorio actual contiene ahora ambos archivos Python:

In [None]:
%%bash
ls *.py

main.py
mnist.py


Así que json-serializa el `TF_CONFIG` y lo añade a las variables de entorno:

In [None]:
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora, puedes lanzar un proceso trabajador que ejecutará el `main.py` y utilizará el `TF_CONFIG`:

In [None]:
# first kill any previous runs
%killbgscripts

All background processes were killed.


In [None]:
%%bash --bg
python main.py &> job_0.log

Starting job # 0 in a separate thread.


Hay que tener en cuenta algunas cosas sobre el comando anterior:

1. Utiliza el `%%bash` que es un [cuaderno "mágico"](https://ipython.readthedocs.io/en/stable/interactive/magics.html) para ejecutar algunos comandos bash.
2. Utiliza la bandera `--bg` para ejecutar el proceso `bash` en segundo plano, ya que este trabajador no terminará. Espera a todos los trabajadores antes de comenzar.

El proceso trabajador en segundo plano no imprimirá la salida en este cuaderno, por lo que el `&>` redirige su salida a un archivo, para que puedas ver lo que sucedió.

Entonces, espera unos segundos para que el proceso se inicie:

In [None]:
import time
time.sleep(10)

Ahora mira lo que ha salido en el archivo de registro del trabajador hasta ahora:

In [None]:
%%bash
cat job_0.log

2021-06-12 20:10:34.289549: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-12 20:10:35.769201: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-12 20:10:35.786211: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-12 20:10:35.786283: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (e8c0ca859ec5): /proc/driver/nvidia/version does not exist
2021-06-12 20:10:35.786993: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06

La última línea del archivo de registro debería decir: `Started server with target: grpc://localhost:12345`. El primer trabajador está ahora listo, y está esperando que todos los demás trabajadores estén listos para proceder.

Así que actualiza el `tf_config` para que el proceso del segundo trabajador lo recoja:

In [None]:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ahora lance el segundo trabajador. Esto iniciará la formación ya que todos los trabajadores están activos (por lo que no es necesario poner en segundo plano este proceso):

In [None]:
%%bash
python main.py

Epoch 1/3
Epoch 2/3
Epoch 3/3


2021-06-12 20:10:44.358042: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-12 20:10:45.786099: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-12 20:10:45.799037: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-12 20:10:45.799158: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (e8c0ca859ec5): /proc/driver/nvidia/version does not exist
2021-06-12 20:10:45.799871: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06

Ahora, si vuelves a revisar los registros escritos por el primer trabajador, verás que participó en el entrenamiento de ese modelo:

In [None]:
%%bash
cat job_0.log

2021-06-12 20:10:34.289549: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-12 20:10:35.769201: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-12 20:10:35.786211: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-12 20:10:35.786283: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (e8c0ca859ec5): /proc/driver/nvidia/version does not exist
2021-06-12 20:10:35.786993: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06

No es de extrañar que esto funcione mas lento que la prueba realizada al principio de este tutorial. Ejecutar múltiples trabajadores en una sola máquina sólo añade sobrecarga. El objetivo aquí no era mejorar el tiempo de entrenamiento, sino sólo dar un ejemplo de entrenamiento con múltiples trabajadores.

In [None]:
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts

All background processes were killed.


## Entrenamiento profundo del Multi worker

Hasta ahora este tutorial ha demostrado una configuración básica de multi-trabajadores. El resto de este documento examina en detalle otros factores que pueden ser útiles o importantes para casos de uso real.

### Desagregación de conjuntos de datos

En el entrenamiento con varios trabajadores, es necesario fragmentar los conjuntos de datos para garantizar la convergencia y el rendimiento.

El ejemplo de la sección anterior se basa en el autosharding por defecto proporcionado por la API `tf.distribute.Strategy`. Puedes controlar la fragmentación configurando el parámetro `tf.data.experimental.AutoShardPolicy` de la API `tf.data.experimental.DistributeOptions`.

Aquí hay un ejemplo rápido de cómo desactivar la fragmentación automática, para que cada réplica procese cada ejemplo (no se recomienda):


In [None]:
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

### Evaluación

Si pasas `datos_de_validación` a `model.fit`, se alternará entre el entrenamiento y la evaluación para cada época. La evaluación que toma `validation_data` se distribuye entre el mismo conjunto de trabajadores y los resultados de la evaluación se agregan y están disponibles para todos los trabajadores. Al igual que en el caso del entrenamiento, el conjunto de datos de validación se fragmenta automáticamente a nivel de archivo. Es necesario establecer un tamaño de lote global en el conjunto de datos de validación y establecer `validation_steps`. También se recomienda un conjunto de datos repetido para la evaluación.

Como alternativa, también puede crear otra tarea que lea periódicamente los puntos de control y ejecute la evaluación. Esto es lo que hace Estimator. Pero esta no es una forma recomendada de realizar la evaluación y por ello se omiten sus detalles.

### Rendimiento

Ahora tienes un modelo Keras que está configurado para ejecutarse en múltiples trabajadores con `MultiWorkerMirroredStrategy`.  Puedes probar las siguientes técnicas para ajustar el rendimiento del entrenamiento multitrabajador con `MultiWorkerMirroredStrategy`.

*   `MultiWorkerMirroredStrategy` proporciona múltiples implementaciones de comunicación colectiva.  `RING` implementa colectivos basados en anillos utilizando gRPC como capa de comunicación entre hosts.  NCCL" utiliza NCCL de Nvidia para implementar los colectivos.  El programa `AUTO` deja la elección en manos del tiempo de ejecución.  La mejor elección de la implementación colectiva depende del número y tipo de GPUs, y de la interconexión de red en el cluster.  Para anular la elección automática, especifica el parámetro `communication_options` del constructor de `MultiWorkerMirroredStrategy`, por ejemplo `communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)`.
*    Convierta las variables en `tf.float` si es posible.  El modelo oficial de ResNet incluye un ejemplo de cómo hacerlo.


### Tolerancia a los fallos

En el entrenamiento síncrono, el clúster fallaría si uno de los trabajadores falla y no existe ningún mecanismo de recuperación de fallos. El uso de Keras con `tf.distribute.Strategy` tiene la ventaja de la tolerancia a fallos en caso de que los trabajadores mueran o sean inestables. Esto se consigue preservando el estado de entrenamiento en el sistema de archivos distribuido que elijas, de forma que al reiniciar la instancia que previamente ha fallado o se ha adelantado, el estado de entrenamiento se recupera.

Cuando un trabajador no está disponible, otros trabajadores fallarán (posiblemente después de un tiempo de espera). En estos casos, el trabajador no disponible debe ser reiniciado, así como los otros trabajadores que han fallado.

#### ModelCheckpoint callback

El callback `ModelCheckpoint` ya no proporciona la funcionalidad de tolerancia a fallos, por favor, utilice el callback `BackupAndRestore` en su lugar.

La llamada de retorno `ModelCheckpoint` puede seguir utilizándose para guardar los puntos de control. Pero con esto, si el entrenamiento fue interrumpido o terminado con éxito, para continuar el entrenamiento desde el punto de control, el usuario es responsable de cargar el modelo manualmente.

Opcionalmente el usuario puede elegir guardar y restaurar el modelo/pesos fuera del callback `ModelCheckpoint`.

### Model saving and loading

Para guardar tu modelo usando `model.save` o `tf.saved_model.save`, el destino para guardar tiene que ser diferente para cada trabajador. En los trabajadores que no son jefes, tendrás que guardar el modelo en un directorio temporal, y en el jefe, tendrás que guardar en el directorio del modelo proporcionado. Los directorios temporales en el trabajador necesitan ser únicos para evitar errores resultantes de múltiples trabajadores tratando de escribir en la misma ubicación. El modelo guardado en todos los directorios es idéntico y típicamente sólo el modelo guardado por el jefe debe ser referenciado para restaurar o servir. Deberías tener alguna lógica de limpieza que borre los directorios temporales creados por los trabajadores una vez que tu entrenamiento haya terminado.

La razón por la que necesitas guardar en el jefe y los trabajadores al mismo tiempo es porque podrías estar agregando variables durante el checkpointing, lo que requiere que tanto el jefe como los trabajadores participen en el protocolo de comunicación allreduce. Por otro lado, dejar que el jefe y los trabajadores guarden en el mismo directorio del modelo dará lugar a errores debido a la contención.

Con `MultiWorkerMirroredStrategy`, el programa se ejecuta en cada trabajador, y para saber si el trabajador actual es el jefe, aprovecha el objeto resolutor del cluster que tiene los atributos `task_type` y `task_id`. El `task_type` te dice cuál es el trabajo actual (por ejemplo, `trabajador`), y el `task_id` te dice el identificador del trabajador. El trabajador con id 0 es designado como el trabajador principal.

En el siguiente fragmento de código, `write_filepath` proporciona la ruta del archivo a escribir, que depende del id del trabajador. En el caso del jefe (trabajador con id 0), escribe en la ruta de archivo original; para los demás, crea un directorio temporal (con id en la ruta de directorio) para escribir en él:

In [None]:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Con eso, ya estás listo para ahorrar:

In [None]:
multi_worker_model.save(write_model_path)

INFO:tensorflow:Assets written to: /tmp/keras-model/assets


Como se ha descrito anteriormente, más adelante el modelo sólo debe cargarse desde la ruta en la que guardó el jefe, así que vamos a eliminar los temporales que guardaron los trabajadores no jefes:

In [None]:
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Ahora, cuando es el momento de cargar, vamos a utilizar la conveniente API `tf.keras.models.load_model`, y continuar con el trabajo posterior. Aquí, asumimos que sólo se utiliza un trabajador para cargar y continuar el entrenamiento, en cuyo caso no se llama a `tf.keras.models.load_model` dentro de otra `strategy.scope()`.

In [None]:
loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fdc40681450>

### Checkpoint saving and restoring

Por otro lado, el checkpointing te permite guardar los pesos del modelo y restaurarlos sin tener que guardar todo el modelo. Aquí, crearás un `tf.train.Checkpoint` que rastrea el modelo, el cual es gestionado por un `tf.train.CheckpointManager` para que sólo se conserve el último checkpoint. 

In [None]:
checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Una vez que el `CheckpointManager` está configurado, ahora estás listo para guardar, y eliminar los puntos de control que los trabajadores no jefes guardaron.

In [None]:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Ahora, cuando necesites restaurar, puedes encontrar el último punto de control guardado utilizando la práctica función `tf.train.latest_checkpoint`. Después de restaurar el punto de control, puedes continuar con el entrenamiento.

In [None]:
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fdc3fab38d0>

#### BackupAndRestore callback

La llamada de retorno BackupAndRestore proporciona una funcionalidad de tolerancia a fallos haciendo una copia de seguridad del modelo y del número de época actual en un archivo de control temporal bajo el argumento `backup_dir` de `BackupAndRestore`. Esto se hace al final de cada época.

Cuando los trabajos se interrumpen y se reinician, la llamada de retorno restaura el último punto de control, y el entrenamiento continúa desde el principio de la época interrumpida. Cualquier entrenamiento parcial ya realizado en la época inacabada antes de la interrupción será descartado, para que no afecte al estado final del modelo.

Para utilizarlo, proporciona una instancia de `tf.keras.callbacks.experimental.BackupAndRestore` en la llamada `tf.keras.Model.fit()`.

Con MultiWorkerMirroredStrategy, si un trabajador se interrumpe, todo el cluster se detiene hasta que el trabajador interrumpido se reinicie. Otros trabajadores también se reiniciarán, y el trabajador interrumpido se reincorpora al clúster. Todos los trabajadores entonces leen el archivo de puntos de control previamente guardado y recogen su estado anterior, permitiendo que el cluster vuelva a estar sincronizado. La formación continúa entonces.

La llamada de retorno `BackupAndRestore` utiliza el `CheckpointManager` para guardar y restaurar el estado de entrenamiento, que genera un archivo llamado checkpoint que rastrea los puntos de control existentes junto con el más reciente. Por esta razón, `backup_dir` no debe ser reutilizado para almacenar otros puntos de control con el fin de evitar la colisión de nombres.

Actualmente, la llamada de retorno `BackupAndRestore` soporta un solo trabajador sin estrategia, MirroredStrategy, y multitrabajador con MultiWorkerMirroredStrategy.
A continuación se muestran dos ejemplos para la formación de varios trabajadores y de un solo trabajador.

In [None]:
# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x7fdc3f9429d0>

Si inspeccionas el directorio de `backup_dir` que has especificado en `BackupAndRestore`, puedes notar algunos archivos de checkpoint generados temporalmente. Estos archivos son necesarios para recuperar las instancias perdidas anteriormente, y serán eliminados por la biblioteca al final de `tf.keras.Model.fit()` al salir con éxito de su entrenamiento.

Nota: Actualmente BackupAndRestore sólo soporta el modo eager. En el modo gráfico, considere el uso de Save/Restore Model mencionado anteriormente, y proporcionando `initial_epoch` en `model.fit()`.