In [None]:
# ATTENTION: If using non-distributed training, comment out this entire notebook
# cell. It is only relevant to distributed training.

# ATTENTION: Beaker engineers have suggested they will expose the hostnames of
# all nodes in a future update, so this service discovery will no longer be
# needed. When they add this we will remove the service discovery referenced in
# the sample notebook. Removing this is ideal as we hardcode ports which may
# cause port synchronization issues with many running experiments. Until they
# add this, you will need to replicate the service discovery method implemented
# in this cell, but only if your distributed training
# jobs require explicit hostnames for all nodes. This reference service
# discovery implementation is set up for TensorFlow; if you use something else,
# you only need to prepare the environment variable differently.

# ATTENTION: Change the below to your information.
# Passed to Beaker.from_env() as the default workspace.
BEAKER_WORKSPACE = ""
# Used as the `author` filter when listing experiments.
BEAKER_USERNAME = ""

from beaker import Beaker
import json
import os
import time
import random

# Pause before querying the Beaker API so that Beaker has time to start up.
print("Sleeping for 60 seconds to allow Beaker to start up")
time.sleep(60)

beaker = Beaker.from_env(default_workspace=BEAKER_WORKSPACE)
experiments = beaker.experiment.list(author=BEAKER_USERNAME)
if not experiments:
    # Fail with an actionable message instead of a bare IndexError below.
    raise RuntimeError(
        f"No Beaker experiments found for author {BEAKER_USERNAME!r}; "
        "check BEAKER_USERNAME and BEAKER_WORKSPACE."
    )
# NOTE(review): assumes the first listed experiment is the one this notebook is
# running in -- confirm the ordering returned by the Beaker API.
experiment = experiments[0]

# Resolve the node of every job in the experiment to its hostname, keeping the
# order in which the jobs are listed.
node_hostnames = []
for job in experiment.jobs:
    node_id = job.node
    print("job node: ", node_id)
    if node_id is None:
        # presumably the job has not been scheduled onto a node yet -- verify
        raise RuntimeError(
            "A job in the experiment has no node assigned yet; "
            "re-run this cell once all jobs are running."
        )
    node = beaker.node.get(node_id)
    node_hostnames.append(node.hostname)

# Hard-coded ports; the i-th discovered hostname is paired with ports[i].
ports = [32916, 43727, 49122, 19253, 60973, 15021, 47468, 23282, 63942, 54812]

# zip() stops at the shorter input, so with more nodes than ports some workers
# would silently be left out of the address list. Fail loudly instead.
if len(node_hostnames) > len(ports):
    raise ValueError(
        f"Found {len(node_hostnames)} nodes but only {len(ports)} ports are "
        "configured; add more entries to `ports`."
    )

# One "host:port" address per discovered node.
worker_addresses = [f"{host}:{port}" for host, port in zip(node_hostnames, ports)]

# Rank of this replica, taken from the BEAKER_REPLICA_RANK environment
# variable; falls back to 0 when the variable is unset.
replica_rank = int(os.environ.get("BEAKER_REPLICA_RANK", 0))

# ATTENTION: This is tensorflow specific. Change if using something different.
# All addresses are listed under the "worker" job, and this process's task
# index is its replica rank.
tf_config = dict(
    cluster=dict(worker=worker_addresses),
    task=dict(type="worker", index=replica_rank),
)

# Serialize to a single-line JSON string (default json.dumps formatting).
tf_config_json = json.dumps(tf_config)

# Show the equivalent shell export command, then set the variable for this
# process.
print(f"export TF_CONFIG='{tf_config_json}'")
os.environ["TF_CONFIG"] = tf_config_json

In [None]:
# Placeholder workload: ten iterations, each announcing this worker's rank and
# the iteration number, followed by a 60-second pause.
for step in range(10):
    print(f"Worker {replica_rank} is running iteration {step}")
    time.sleep(60)