
# Ungraded lab: Distributed Strategies with TF and Keras
------------------------



Welcome! In this ungraded lab you will run distributed training with TensorFlow and Keras, specifically with [`tf.distribute.MultiWorkerMirroredStrategy`](https://www.tensorflow.org/api_docs/python/tf/distribute/MultiWorkerMirroredStrategy).

With this strategy, a Keras model that was designed to run on a single worker can run on multiple workers with minimal code changes. In particular you will:


1. Perform training with a single worker.
2. Understand the requirements for a multi-worker setup (the `TF_CONFIG` environment variable) and use context managers to implement distributed strategies.
3. Use magic commands to simulate different machines.
4. Perform a multi-worker training strategy.

This notebook is based on the official [Multi-worker training with Keras](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras) notebook, which covers some additional topics in case you want a deeper dive.

The [Distributed Training with TensorFlow](https://www.tensorflow.org/guide/distributed_training) guide gives an overview of the distribution strategies TensorFlow supports, for those interested in a deeper understanding of the `tf.distribute.Strategy` APIs.

Let's get started!

## Setup

First, some necessary imports.

In [1]:
import os
import sys
import json
import time

# Log additional outputs from TF's C++ backend
# '0' prints all log messages (info, warnings, and errors), which exposes more low-level detail for debugging.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0'

Before importing TensorFlow, make a few changes to the environment.

- Disable all GPUs. This prevents errors caused by the workers all trying to use the same GPU. **For a real application each worker would be on a different machine.**


- Add the current directory to Python's path so modules in this directory can be imported.

In [2]:
# Disable GPUs
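# Hiding every GPU avoids conflicts: in this lab all workers run on the same machine,
#       so they would otherwise all try to use the same GPU and fail.
# In a real application each worker would sit on a different machine with its own GPU resources.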
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

# Add current directory to path
if '.' not in sys.path:
  sys.path.insert(0, '.')

The previous step is important because this notebook writes files with the `%%writefile` magic command and then imports them as modules. Each `%%writefile` cell creates a `.py` file in the current directory, which is now on `sys.path`.
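
As a rough, notebook-free illustration of why the path entry matters, the sketch below writes a tiny module to a temporary directory, puts that directory on `sys.path`, and imports it. The module name `demo_module` and its `greet` function are hypothetical and exist only for this example.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Write a small module to disk, much like `%%writefile` does for a cell.
module_dir = Path(tempfile.mkdtemp())
(module_dir / "demo_module.py").write_text(
    "def greet(name):\n"
    "    return f'hello, {name}'\n"
)

# Python only finds the file once its directory is on the import path.
if str(module_dir) not in sys.path:
    sys.path.insert(0, str(module_dir))

importlib.invalidate_caches()
demo_module = importlib.import_module("demo_module")
greeting = demo_module.greet("worker")
print(greeting)
```

In the notebook the directory is simply `'.'`, so no temporary directory is involved.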

Now that the environment configuration is ready, import TensorFlow.


In [3]:
import tensorflow as tf

# Ignore warnings
tf.get_logger().setLevel('ERROR')

### Dataset and model definition

Next create an `mnist.py` file with a simple model and dataset setup. This Python file will be used by the worker processes in this tutorial.

The file is named after the dataset you will be using, [mnist](https://keras.io/api/datasets/mnist/), whose training set consists of 60,000 28x28 grayscale images of the digits 0 through 9 (a separate test set holds 10,000 more).

In [4]:
%%writefile mnist.py

# import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  # Load the dataset, normalize it, cast x_train to float32 and y_train to int64,
  # build a tf.data.Dataset with from_tensor_slices, shuffle it, repeat it indefinitely, and batch it
  # Load the data
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # Normalize pixel values for x_train and cast to float32
  x_train = x_train / np.float32(255)
  # Cast y_train to int64
  y_train = y_train.astype(np.int64)
  # Define repeated and shuffled dataset
  train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset


def build_and_compile_cnn_model():
  # Define simple CNN model using Keras Sequential
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])

  # Compile model
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])

  return model

Writing mnist.py


Check that the file was successfully created:

In [26]:
# Shell command: confirm that the .py file has been created
!ls mnist.py

mnist.py


Import the `mnist` module you just created and train the model for a few epochs on a single worker to make sure everything works correctly.

In [6]:
# Import your mnist model
import mnist

# Set batch size
batch_size = 64

# Load the dataset
single_worker_dataset = mnist.mnist_dataset(batch_size)

# Load compiled CNN model
single_worker_model = mnist.build_and_compile_cnn_model()

# As training progresses, the loss should drop and the accuracy should increase.
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.src.callbacks.History at 0x7f3b37da7cd0>

Everything is working as expected!

Next you will see how to distribute this training across multiple workers.

## Multi-worker Configuration

Now let's enter the world of multi-worker training. In TensorFlow, the `TF_CONFIG` environment variable is required for training on multiple machines, each of which possibly has a different role. `TF_CONFIG` is a JSON string used to specify the cluster configuration on each worker that is part of the cluster.

There are two components of `TF_CONFIG`: `cluster` and `task`.

Let's dive into how they are used:

`cluster`:
- **It is the same for all workers** and provides information about the training cluster, which is a dict consisting of different types of jobs such as `worker`.

- In multi-worker training with `MultiWorkerMirroredStrategy`, there is usually one `worker` that takes on a little more responsibility, such as saving checkpoints and writing summary files for TensorBoard, in addition to what a regular `worker` does.

- Such a worker is referred to as the `chief` worker, and it is customary that the `worker` with `index` 0 is appointed as the chief `worker` (in fact this is how `tf.distribute.Strategy` is implemented).

`task`:
- Provides information about the current task and is different on each worker. It specifies the `type` and `index` of that worker.

Here is an example configuration:

In [7]:
tf_config = {
    # Cluster configuration: two workers
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    # Task configuration for this process: type worker, index 0, i.e. the first worker
    'task': {'type': 'worker', 'index': 0}
}

Here is the same `TF_CONFIG` serialized as a JSON string:

In [8]:
json.dumps(tf_config)

'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0}}'

### Explaining the TF_CONFIG example

In this example you set a `TF_CONFIG` with 2 workers on `localhost`. In practice, users would create multiple workers on external IP addresses/ports, and set `TF_CONFIG` on each worker appropriately.

Since you set the task `type` to `"worker"` and the task `index` to `0`, **this machine is the first worker and will be appointed as the chief worker**.
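
The chief convention described above can be sketched in plain Python. The helper `is_chief` below is hypothetical and is not TensorFlow's implementation; it also assumes, beyond what this lab uses, that a cluster may define a dedicated `chief` job, in which case worker 0 is no longer treated as chief.

```python
def is_chief(tf_config):
    """Return True if the task described by tf_config acts as the chief."""
    cluster = tf_config.get("cluster", {})
    task = tf_config.get("task", {})
    task_type = task.get("type")
    task_index = task.get("index", 0)

    # No task information: a plain single-process run is its own chief.
    if task_type is None:
        return True
    # A dedicated chief job, if the cluster defines one (assumption, see above).
    if task_type == "chief":
        return True
    # Otherwise the convention from the text: worker 0 is the chief.
    return task_type == "worker" and task_index == 0 and "chief" not in cluster


cluster = {"worker": ["localhost:12345", "localhost:23456"]}
first = {"cluster": cluster, "task": {"type": "worker", "index": 0}}
second = {"cluster": cluster, "task": {"type": "worker", "index": 1}}
print(is_chief(first), is_chief(second))
```

A check like this is typically used to decide which process writes checkpoints and TensorBoard summaries.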

Note that other machines will need to have the `TF_CONFIG` environment variable set as well, and it should have the same `cluster` dict, but different task `type` or task `index` depending on what the roles of those machines are. For instance, for the second worker you would set `tf_config['task']['index']=1`.
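
One way to produce a consistent `TF_CONFIG` string for every worker is sketched below. The helper `tf_config_for_worker` is a hypothetical name introduced for illustration; it keeps the `cluster` dict identical and changes only the task `index`.

```python
import copy
import json


def tf_config_for_worker(base_config, index):
    """Return a JSON TF_CONFIG string for the worker at `index`."""
    num_workers = len(base_config["cluster"]["worker"])
    if not 0 <= index < num_workers:
        raise ValueError(f"index {index} is outside 0..{num_workers - 1}")
    config = copy.deepcopy(base_config)  # leave the caller's dict untouched
    config["task"] = {"type": "worker", "index": index}
    return json.dumps(config)


base_config = {
    "cluster": {"worker": ["localhost:12345", "localhost:23456"]},
    "task": {"type": "worker", "index": 0},
}
per_worker = [tf_config_for_worker(base_config, i) for i in range(2)]
for line in per_worker:
    print(line)
```

Each string would be exported as `TF_CONFIG` on the machine that hosts the corresponding worker.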


### Quick Note on Environment variables and subprocesses in notebooks

Above, `tf_config` is just a local variable in Python. To actually use it to configure training, this dictionary needs to be serialized as JSON and placed in the `TF_CONFIG` environment variable.

In the next section, you'll spawn new subprocesses for each worker using the `%%bash` magic command. Subprocesses inherit environment variables from their parent, so they can access `TF_CONFIG`.

You would never really launch your jobs this way (as subprocesses of an interactive Python runtime), but it's how you will do it for the purposes of this tutorial.
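
The inheritance described above can be observed with the standard library alone. This is a minimal sketch: it passes a copy of the environment through `env=` so that the notebook's own environment stays unchanged, whereas a child started without `env=` would simply inherit `os.environ` as it is.

```python
import json
import os
import subprocess
import sys

tf_config = {
    "cluster": {"worker": ["localhost:12345", "localhost:23456"]},
    "task": {"type": "worker", "index": 0},
}

# Copy the parent's environment and add TF_CONFIG to the copy only.
child_env = dict(os.environ, TF_CONFIG=json.dumps(tf_config))

# The child reads the variable the same way a worker script would.
child_code = (
    "import json, os; "
    "print(json.loads(os.environ['TF_CONFIG'])['task']['index'])"
)
result = subprocess.run(
    [sys.executable, "-c", child_code],
    env=child_env, capture_output=True, text=True, check=True,
)
child_index = int(result.stdout.strip())
print(child_index)
```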

## Choose the right strategy

In TensorFlow there are two main forms of distributed training:

* Synchronous training, where the steps of training are synced across the workers and replicas, and
* Asynchronous training, where the training steps are not strictly synced.

You will be using `MultiWorkerMirroredStrategy`, which is the recommended strategy for synchronous multi-worker training.

To train the model, use an instance of `tf.distribute.MultiWorkerMirroredStrategy`.



In [9]:
strategy = tf.distribute.MultiWorkerMirroredStrategy()

`MultiWorkerMirroredStrategy` creates copies of all variables in the model's layers on each device across all workers.  It uses `CollectiveOps`, a TensorFlow op for collective communication, to aggregate gradients and keep the variables in sync.  The [official TF distributed training guide](https://www.tensorflow.org/guide/distributed_training) has more details about this.
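
To make the idea of aggregating gradients concrete, here is a toy, plain-Python stand-in for one synchronous step. It does not use TensorFlow or `CollectiveOps`; the one-parameter model `y = w * x`, the data, and the learning rate are all made up for illustration. Each simulated worker computes a gradient on its own half of the global batch, the gradients are averaged, and every replica applies the same update.

```python
import math


def mean_gradient(w, examples):
    """Gradient of the mean squared error for the model y = w * x."""
    return sum(2.0 * (w * x - y) * x for x, y in examples) / len(examples)


# One global batch of (x, y) pairs, split evenly between two workers.
global_batch = [(1.0, 2.0), (2.0, 4.5), (3.0, 5.5), (4.0, 8.5)]
shards = [global_batch[:2], global_batch[2:]]

w = 0.0  # every replica starts from the same weight
learning_rate = 0.01

# Each worker computes a gradient on its own shard only.
local_grads = [mean_gradient(w, shard) for shard in shards]

# Aggregation step: average the local gradients so every worker sees the same value.
synced_grad = sum(local_grads) / len(local_grads)

# Every replica applies the identical update, so the copies stay in sync.
replica_weights = [w - learning_rate * synced_grad for _ in shards]

# Reference: a single worker processing the whole global batch.
single_worker_weight = w - learning_rate * mean_gradient(w, global_batch)
in_sync = all(math.isclose(rw, single_worker_weight) for rw in replica_weights)
print(replica_weights, single_worker_weight, in_sync)
```

Because the shards have equal size, the averaged gradient matches the gradient of the full global batch, which is why the replicas agree with the single-worker reference.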


### Implement Distributed Training via Context Managers

To distribute the training across multiple workers, all you need to do is enclose the model building and the `model.compile()` call inside `strategy.scope()`.

The distribution strategy's scope dictates how and where the variables are created, and in the case of `MultiWorkerMirroredStrategy`, the variables created are `MirroredVariable`s, and they are replicated on each of the workers.


In [10]:
# Implementing distributed strategy via a context manager
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()

Note: `TF_CONFIG` is parsed and TensorFlow's gRPC servers are started when `MultiWorkerMirroredStrategy()` is called, so the `TF_CONFIG` environment variable must be set before a `tf.distribute.Strategy` instance is created.

**Since `TF_CONFIG` is not set yet the above strategy is effectively single-worker training**.
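
The fallback to a single worker can be mimicked without TensorFlow. The helper `describe_cluster` below is a hypothetical sketch of how a script might read `TF_CONFIG` defensively; it is not how TensorFlow resolves the cluster internally.

```python
import json
import os


def describe_cluster(environ=None):
    """Return (num_workers, task_type, task_index) from TF_CONFIG, if set."""
    environ = os.environ if environ is None else environ
    raw = environ.get("TF_CONFIG")
    if not raw:
        # No cluster description: treat the process as a lone worker.
        return 1, None, 0
    config = json.loads(raw)
    workers = config.get("cluster", {}).get("worker", [])
    task = config.get("task", {})
    return max(len(workers), 1), task.get("type"), task.get("index", 0)


unset = describe_cluster({})
two_workers = describe_cluster({
    "TF_CONFIG": json.dumps({
        "cluster": {"worker": ["localhost:12345", "localhost:23456"]},
        "task": {"type": "worker", "index": 1},
    })
})
print(unset, two_workers)
```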



## Train the model

### Create training script

To actually run with `MultiWorkerMirroredStrategy` you'll need to run worker processes and pass a `TF_CONFIG` to them.

Like the `mnist.py` file written earlier, here is the `main.py` that each of the workers will run:

In [11]:
%%writefile main.py

import os
import json

import tensorflow as tf
import mnist # Your module

# Define the batch size for each worker
per_worker_batch_size = 64

# Read TF_CONFIG from the environment and parse it from JSON
tf_config = json.loads(os.environ['TF_CONFIG'])

# Infer the number of workers from tf_config
# (two, in the configuration used in this lab)
num_workers = len(tf_config['cluster']['worker'])

# Define the distribution strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Define the global batch size
global_batch_size = per_worker_batch_size * num_workers

# Load dataset
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

# Create and compile the model within the distribution strategy's scope
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()

# Train the model
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

Writing main.py


In the code snippet above note that the `global_batch_size`, which gets passed to `Dataset.batch`, is set to `per_worker_batch_size * num_workers`. This ensures that each worker processes batches of `per_worker_batch_size` examples regardless of the number of workers.
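
The arithmetic is small enough to check by hand. The helper `batch_plan` is a hypothetical name; the values 64 and 70 are the `per_worker_batch_size` and `steps_per_epoch` used in `main.py`.

```python
def batch_plan(per_worker_batch_size, num_workers, steps_per_epoch):
    """Return the global batch size and the examples consumed per epoch."""
    global_batch_size = per_worker_batch_size * num_workers
    examples_per_epoch = global_batch_size * steps_per_epoch
    return global_batch_size, examples_per_epoch


for num_workers in (1, 2, 4):
    global_batch_size, examples_per_epoch = batch_plan(64, num_workers, 70)
    # Splitting the global batch evenly recovers the per-worker size.
    assert global_batch_size // num_workers == 64
    print(num_workers, global_batch_size, examples_per_epoch)
```

With the two workers configured in this lab, each step consumes 128 examples, so an epoch of 70 steps covers 8,960 examples.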

The reason is that in synchronous multi-worker training the global batch is divided evenly among the workers at every step, so each worker must receive the same number of examples for the steps to stay in lockstep. Scaling the global batch size with the number of workers keeps each worker's share at `per_worker_batch_size` as workers are added.

The two values differ only in scope: `per_worker_batch_size` is the number of examples a single worker processes per step, while `global_batch_size` is the number processed per step by the whole cluster. Setting both consistently keeps the data evenly balanced across workers.

The current directory should now contain both Python files:

In [27]:
!ls *.py

main.py  mnist.py


### Set TF_CONFIG environment variable

Now serialize `tf_config` to JSON and store it in the `TF_CONFIG` environment variable. As a reminder, this is the dictionary defined earlier:

```
tf_config = {
    # Cluster configuration: two workers
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    # Task configuration for this process: type worker, index 0, i.e. the first worker
    'task': {'type': 'worker', 'index': 0}
}
```

In [13]:
# Set TF_CONFIG env variable
os.environ['TF_CONFIG'] = json.dumps(tf_config)

And terminate all background processes:

In [14]:
# first kill any previous runs
%killbgscripts

All background processes were killed.


Before launching the first worker you can check that port `12345` is free at the time:

In [15]:
# This should not print anything at the moment
!lsof -i :12345
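
If `lsof` is not available, a comparable check can be sketched with Python's `socket` module. The helper `port_in_use` is a hypothetical name, and it is narrower than `lsof`: it only reports whether something accepts TCP connections on the port. The demonstration uses a throwaway listener on a port chosen by the operating system rather than port `12345`.

```python
import socket


def port_in_use(port, host="127.0.0.1", timeout=0.5):
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.settimeout(timeout)
        # connect_ex returns 0 on success and an error code otherwise.
        return probe.connect_ex((host, port)) == 0


# Demonstrate on a throwaway listener bound to a port picked by the OS.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
demo_port = listener.getsockname()[1]

busy_while_listening = port_in_use(demo_port)
listener.close()
busy_after_close = port_in_use(demo_port)
print(busy_while_listening, busy_after_close)
```

Before the first worker is launched, `port_in_use(12345)` would be expected to return `False`.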

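In a distributed training setup, the `TF_CONFIG` environment variable carries the configuration each worker needs, including its role and its address. A typical job has several workers, each identified by a unique index within the cluster.

When you are ready to launch the workers:

For the first worker (usually treated as the chief), `TF_CONFIG` sets `'index': 0`. When this worker starts and reads `TF_CONFIG`, it recognizes itself as the first node in the cluster.

To launch the second or any later worker, update the task `index` in `tf_config` to that worker's own index and write the updated value to the `TF_CONFIG` environment variable again before starting the process. Each worker then reads its own configuration at startup and knows its role and index in the cluster.

Every worker, and in particular every worker after the first, therefore needs an explicitly assigned index and a matching `TF_CONFIG`. This is the key step that lets the workers coordinate during distributed training.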

### Launch the first worker

Now, you can launch a worker process that will run the `main.py` and use the `TF_CONFIG`:

In [16]:
%%bash --bg
python main.py &> job_0.log

There are a few things to note about the above command:

1. It uses `%%bash`, a [notebook "magic"](https://ipython.readthedocs.io/en/stable/interactive/magics.html), to run some bash commands.
2. It uses the `--bg` flag to run the `bash` process in the background, because this worker will not terminate on its own: it waits for all the other workers before it starts training.

The backgrounded worker process won't print output to this notebook, so `&>` redirects its output (both stdout and stderr) to a file where you can see what happened.
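
Outside a notebook, roughly the same effect can be had with `subprocess.Popen`. This is a sketch under assumptions: the short child below merely stands in for `python main.py`, and `job_demo.log` is a made-up file name.

```python
import subprocess
import sys
import tempfile
from pathlib import Path

log_path = Path(tempfile.mkdtemp()) / "job_demo.log"

# Stand-in for `python main.py`: a child that writes to stdout and stderr.
child_code = "import sys; print('to stdout'); print('to stderr', file=sys.stderr)"

with open(log_path, "w") as log_file:
    # Popen returns immediately, so the child runs in the background.
    # Sending stderr to stdout mirrors bash's `&>` redirection.
    process = subprocess.Popen(
        [sys.executable, "-c", child_code],
        stdout=log_file, stderr=subprocess.STDOUT,
    )
    # A real worker would keep running; this short child is simply awaited.
    return_code = process.wait()

log_text = log_path.read_text()
print(return_code)
print(log_text)
```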

So, wait a few seconds for the process to start up:

In [17]:
# Wait for logs to be written to the file
time.sleep(10)

Now you can check the status of port `12345` again:

In [18]:
!lsof -i :12345
# The output should show a python3 process listening on port 12345.

COMMAND PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
python3 659 root    5u  IPv4  34125      0t0  TCP *:12345 (LISTEN)
python3 659 root   12u  IPv4  34131      0t0  TCP localhost:35472->localhost:12345 (ESTABLISHED)
python3 659 root   13u  IPv4  34132      0t0  TCP localhost:12345->localhost:35472 (ESTABLISHED)




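This output shows a Python process communicating over port 12345. Specifically:

- A Python process (PID 659), started by the `root` user, is listening on port 12345, that is, waiting for other processes to connect.
- The same process also holds an established TCP connection between port 12345 and local port 35472.
  - The last two lines are the two ends of that single connection. Both are listed under PID 659, so the worker is connected to its own server on port 12345, and the connection state is `ESTABLISHED`.

In short, one Python process (PID 659), running as root, is listening on port 12345 and has an established local TCP connection involving port 35472. This is common in distributed training and other network-based applications, where processes and services need to communicate with each other.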

Now look what's been output to the worker's logfile so far using the `cat` command:

In [19]:
%%bash
cat job_0.log

2024-04-05 05:15:09.174309: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-05 05:15:09.174432: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-05 05:15:09.176157: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-04-05 05:15:09.186289: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-04-05 05:15:12.650015: I tensorflow/core/distrib

The last line of the log file should say: `Started server with target: grpc://localhost:12345`. The first worker is now ready, and is waiting for all the other worker(s) to be ready to proceed.
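
Instead of sleeping for a fixed number of seconds, a script could poll the log until that message appears. The helper `wait_for_line` below is a hypothetical sketch; the demonstration writes the message to a temporary file rather than reading the real `job_0.log`.

```python
import tempfile
import time
from pathlib import Path


def wait_for_line(log_path, needle, timeout=30.0, poll_interval=0.5):
    """Poll log_path until its contents include needle; return True if found."""
    path = Path(log_path)
    deadline = time.monotonic() + timeout
    while True:
        if path.exists() and needle in path.read_text(errors="replace"):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Self-contained demonstration on a temporary file rather than job_0.log.
demo_log = Path(tempfile.mkdtemp()) / "demo.log"
demo_log.write_text("Started server with target: grpc://localhost:12345\n")
found = wait_for_line(demo_log, "Started server with target", timeout=2.0)
missing = wait_for_line(demo_log, "no such message", timeout=0.2, poll_interval=0.1)
print(found, missing)
```

In this lab, `wait_for_line("job_0.log", "Started server with target")` could be called before launching the second worker.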

### Launch the second worker

Now update the `tf_config` for the second worker's process to pick up:

In [20]:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Now launch the second worker. This will start the training since all the workers are active (so there's no need to background this process):

In [21]:
%%bash
python main.py

Epoch 1/3
Epoch 2/3
Epoch 3/3


2024-04-05 05:15:19.284948: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-05 05:15:19.285145: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-05 05:15:19.286540: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-04-05 05:15:19.294786: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-04-05 05:15:22.763820: I tensorflow/core/distrib

Now if you recheck the logs written by the first worker you'll see that it participated in training that model:

In [22]:
%%bash
cat job_0.log

2024-04-05 05:15:09.174309: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-05 05:15:09.174432: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-05 05:15:09.176157: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-04-05 05:15:09.186289: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-04-05 05:15:12.650015: I tensorflow/core/distrib

Unsurprisingly, this ran _slower_ than the test run at the beginning of this tutorial. **Running multiple workers on a single machine only adds overhead**. The goal here was not to improve the training time, but only to give an example of multi-worker training.

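### What workers 0 and 1 do during training (from the logs)

1. **The first worker becomes ready**:
   - After the first worker (`worker index = 0`) starts, it launches a gRPC server on port `12345` of `localhost` and is ready to accept connections from the other workers. At this point the first worker is ready and waiting for the others to join.
   - The log entry `/job:worker/replica:0/task:0 has connected to coordination service` shows that the first worker has connected to the coordination service and is ready for distributed training.

2. **The second worker starts**:
   - When the second worker (`worker index = 1`) starts and runs `main.py`, it also connects to the coordination service (as shown by `/job:worker/replica:0/task:1 has connected to coordination service` in the log). The second worker is now ready too, so every worker in the cluster is available.

3. **Distributed training begins**:
   - Once all workers are ready, distributed training starts. During training, the loss and accuracy for each epoch are logged, showing how training progresses.

In summary, TensorFlow's distributed training starts only after all workers are ready. The first worker starts and then waits until every other worker is available. In this example, the second worker joining triggered the start of training, and the model was then trained for the specified number of epochs.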

-----------------------------
**Congratulations on finishing this ungraded lab!** Now you should have a clearer understanding of how to implement distributed strategies with TensorFlow and Keras.

This tutorial didn't show the true power of a distributed strategy, since that would require multiple machines operating on the same network, but you now know what the process looks like at a high level.

In practice, and especially with very big models, distributed strategies are commonly used because they make better use of the available resources for time-consuming tasks: training can finish in a fraction of the time it would take without the strategy.

**Keep it up!**