In [1]:
import os
import sys
import json
import time

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0'

In [2]:
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

if '.' not in sys.path:
  sys.path.insert(0, '.')

In [3]:
import tensorflow as tf

tf.get_logger().setLevel('ERROR')

In [4]:
%%writefile mnist.py

import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset


def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])

  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])

  return model

Writing mnist.py


In [5]:
!ls *.py

mnist.py


In [6]:
import mnist
batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.src.callbacks.History at 0x7fca19fcefb0>

In [7]:
tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

In [8]:
json.dumps(tf_config)

'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0}}'

In [9]:
strategy = tf.distribute.MultiWorkerMirroredStrategy()

In [10]:
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()

In [11]:
%%writefile main.py

import os
import json

import tensorflow as tf
import mnist
per_worker_batch_size = 64

tf_config = json.loads(os.environ['TF_CONFIG'])

num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers

multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

Writing main.py


In [12]:
!ls *.py

main.py  mnist.py


In [13]:
os.environ['TF_CONFIG'] = json.dumps(tf_config)

In [14]:
%killbgscripts

All background processes were killed.


In [15]:
!lsof -i :12345

In [16]:
%%bash --bg
python main.py &> job_0.log

In [17]:
time.sleep(10)

In [18]:
!lsof -i :12345

COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
python3 2736 root    5u  IPv4 100847      0t0  TCP *:12345 (LISTEN)
python3 2736 root   12u  IPv4 101840      0t0  TCP localhost:49630->localhost:12345 (ESTABLISHED)
python3 2736 root   13u  IPv4 101841      0t0  TCP localhost:12345->localhost:49630 (ESTABLISHED)


In [19]:
%%bash
cat job_0.log

2024-07-12 05:29:17.599158: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-12 05:29:17.599260: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-12 05:29:17.601819: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-12 05:29:17.615552: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-07-12 05:29:21.404556: I tensorflow/core/distrib

In [20]:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

In [21]:
%%bash
cat job_0.log

2024-07-12 05:29:17.599158: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-12 05:29:17.599260: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-12 05:29:17.601819: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-12 05:29:17.615552: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-07-12 05:29:21.404556: I tensorflow/core/distrib