In [None]:
import boto3

# boto3 must already be configured with credentials for your AWS account; see
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html
# Two handles to the same service are used throughout the notebook:
client = boto3.client("ec2")   # client API (used for spot-request calls)
ec2 = boto3.resource("ec2")    # resource API (used for instances)

In [None]:
# Configuration for the whole notebook. Values below are substituted into the
# launch scripts and EC2 calls in later cells, so re-run those cells after editing.

# Value of the 'experiment' tag attached to every instance and spot request
# created by this notebook; the lookup / cleanup cells filter on it.
experiment_name = 'demo-run'
# ^-- must be unique per experiment

# Instance type of the coordinator and the port it listens on
# (passed to run_first_peer.py via --listen_on and used in the coordinator endpoint).
coordinator_type = "c5n.xlarge"
dht_port = 31337
# Instance type of each GPU worker.
worker_type = "g4dn.xlarge"

# Used as both MinCount and MaxCount when launching workers.
num_workers = 64  # adjust per your limits
# note: you will be able to add more workers after the fact

# Passed as CpuOptions['CoreCount'] for workers (ThreadsPerCore is fixed to 2 there).
worker_cpus = 2
# Device name and size (VolumeSize) of the workers' block device mapping.
root_disk_name = "/dev/xvda"
disk_size_gib = 125

# PLACEHOLDER: replace with a Python string, it is passed as KeyName to EC2.
aws_key_name = <YOUR AWS KEY NAME>
# Passed as ImageId / SubnetId / SecurityGroupIds when creating instances.
# NOTE(review): presumably specific to the original authors' account and region —
# replace with your own values.
image_id = "ami-0db67995cd75f5a9f"
subnet = "subnet-18f5b654"
security_group = "sg-a75591d4"

# Git repository cloned inside the docker container on every instance.
repo_path = "https://github.com/icml2021-submit/moshpit-sgd"
# PLACEHOLDER: replace with a Python string. Workers download this URL with wget
# and extract the result with `tar xvfz`, so it should point to a gzipped tarball.
data_path = <URL TO YOUR REPLICA OF PREPROCESSED BOOKCORPUS>

In [None]:
# Uniqueness check for the experiment name: look up running instances that
# already carry this experiment's tag.
# Skip this check if you intend to add more instances to an existing experiment.
experiment_filters = [
    {'Name': 'instance-state-name', 'Values': ['running']},
    {'Name': 'tag:experiment', 'Values': [experiment_name]},
]
existing_instances = ec2.instances.filter(Filters=experiment_filters)

In [None]:
# Warn if instances tagged with this experiment name are already running.
# The boto3 collection is lazy and re-queries EC2 on every iteration, so it is
# materialized once: the list that is tested is exactly the list that is printed.
running_instances = list(existing_instances)
if running_instances:
    print(f"Already running {experiment_name}: {running_instances}")

In [None]:
# To remove all instances and spot requests of this experiment, run this cell.
#
# Spot requests are cancelled BEFORE the instances are terminated: workers are
# launched through persistent spot requests, and terminating an instance whose
# persistent request is still live lets EC2 launch a replacement for it.
requests_to_shutdown = []
for request in client.describe_spot_instance_requests()['SpotInstanceRequests']:
    # 'open' requests have no instance yet but would still launch one later,
    # so they are cancelled together with the 'active' ones.
    if request['State'] not in ('open', 'active'):
        continue
    # .get(): do not fail on requests whose description carries no 'Tags' key.
    if any(tag['Key'] == 'experiment' and tag['Value'] == experiment_name
           for tag in request.get('Tags', [])):
        requests_to_shutdown.append(request['SpotInstanceRequestId'])
if requests_to_shutdown:
    client.cancel_spot_instance_requests(
        SpotInstanceRequestIds=requests_to_shutdown)

# NOTE: existing_instances is filtered on instance-state-name == 'running', so
# instances of this experiment in any other state are not terminated here.
existing_instances.terminate()

### Stage 1: run coordinator

The coordinator is an instance that welcomes new peers into a decentralized training run. If the coordinator is down, new peers can still join by initializing with one of the existing peers.

In [None]:
# User-data script for the coordinator instance (passed as UserData when the
# coordinator is created). On the instance it:
#   1. redirects all output to /var/log/user-command.log (and the system logger);
#   2. starts a docker container that clones {repo_path}, installs it with pip,
#      and runs albert/run_first_peer.py listening on port {dht_port}.
# This is an f-string: repo_path and dht_port are substituted when this cell
# runs, so re-run the cell after changing either of them.
coordinator_script = f'''#!/bin/bash -ex
exec > >(tee /var/log/user-command.log|logger -t user-data -s 2>/dev/console) 2>&1


# NOTE: docker run must be called without --it as there is no tty
# check machine's /var/log/user-command.log for details

docker run --name trainer_run --ipc=host --net=host anonymoussubmit/moshpit-sgd bash -c """
set -euxo pipefail

git clone {repo_path} moshpit-sgd
cd moshpit-sgd
pip install -r requirements.txt
pip install -e .


cd albert
pip install whatsmyip
python run_first_peer.py --listen_on [::]:{dht_port}

"""
'''

In [None]:
# Launch exactly one coordinator instance, tagged so that the lookup / cleanup
# cells can find it, and wait until EC2 reports it as running.
coordinator_tags = [
    {'Key': 'experiment', 'Value': experiment_name},
    {'Key': 'role', 'Value': 'first_peer'},
]
[coordinator] = ec2.create_instances(
    ImageId=image_id,
    InstanceType=coordinator_type,
    MinCount=1,
    MaxCount=1,
    KeyName=aws_key_name,
    SubnetId=subnet,
    SecurityGroupIds=[security_group],
    UserData=coordinator_script,
    TagSpecifications=[{'ResourceType': 'instance', 'Tags': coordinator_tags}],
)
coordinator.wait_until_running()

# Fetch the instance again by id (presumably so that attributes assigned after
# launch, such as the public IP, are available), then build the address that
# workers will use as their initial peer.
[coordinator] = list(ec2.instances.filter(InstanceIds=[coordinator.id]))
coordinator_endpoint = f"{coordinator.public_ip_address}:{dht_port}"

In [None]:
print(coordinator.public_ip_address)

In [None]:
import time
import src

probe = await src.DHTNode.create(listen=False)
for i in range(20):
    ping_response = await probe.protocol.call_ping(coordinator_endpoint)
    if ping_response is not None:
        print("Coordinator is now accessible to workers!")
        break
    else:
        print("Coordinator is not accessible yet, will retry in 30s...")
        time.sleep(30)
else:
    print("Coordinator failed to launch for some reason.")
    print("Check /var/log/user-command.log at ec2-user@{coordinator_endpoint}")
    
# this should normally take 7-12 retries depending on the will of Bezos

### Stage 2: run workers

Workers are preemptible GPU instances that compute gradients and perform Moshpit averaging. In this example, each worker is a single Tesla T4 instance.

In [None]:
# User-data script for every GPU worker (passed as UserData when workers are
# created). On the instance it:
#   1. redirects all output to /var/log/user-command.log (and the system logger);
#   2. starts a docker container with GPU access that downloads and unpacks the
#      dataset into ~/data, clones and installs {repo_path}, and runs
#      albert/run_trainer.py with the coordinator as its initial peer.
# This is an f-string: data_path, repo_path and coordinator_endpoint are
# substituted when this cell runs, so it must be (re-)run after the coordinator
# has been launched.
# The download is saved explicitly as archive.tar.gz (wget -O) because the
# extraction step expects that exact file name, whatever the URL ends with.
# The trailing backslashes are consumed by Python (this is not a raw string), so
# the run_trainer.py command reaches bash as a single line.
worker_script = f'''#!/bin/bash -ex
exec > >(tee /var/log/user-command.log|logger -t user-data -s 2>/dev/console) 2>&1

set -euxo pipefail
cd ~

docker run --name new_run --gpus all --ipc=host --net=host anonymoussubmit/moshpit-sgd bash -c """

wget -O archive.tar.gz {data_path}
mkdir -p ~/data
tar xvfz archive.tar.gz -C ~/data

git clone {repo_path} moshpit-sgd
cd moshpit-sgd
pip install -r requirements.txt
pip install -e .

cd albert
ln -s ~/data ./data

python run_trainer.py \
  --output_dir ./outputs --overwrite_output_dir \
  --logging_dir ./logs --logging_first_step --logging_steps 100 \
  --initial_peers {coordinator_endpoint} --seed 0

"""
'''


In [None]:
# Launch num_workers GPU workers as persistent spot instances that are stopped
# (not terminated) on interruption. The same tags are applied to the instances
# and to their spot requests so that the cleanup cell can find both.
worker_tags = [
    {'Key': 'experiment', 'Value': experiment_name},
    {'Key': 'role', 'Value': 'gpu_worker'},
]
worker_launch_spec = dict(
    ImageId=image_id,
    InstanceType=worker_type,
    MinCount=num_workers,
    MaxCount=num_workers,
    KeyName=aws_key_name,
    SubnetId=subnet,
    SecurityGroupIds=[security_group],
    UserData=worker_script,
    BlockDeviceMappings=[
        {"DeviceName": root_disk_name, "Ebs": {"VolumeSize": disk_size_gib}},
    ],
    CpuOptions={'CoreCount': worker_cpus, 'ThreadsPerCore': 2},
    InstanceMarketOptions={
        "MarketType": "spot",
        "SpotOptions": {
            "SpotInstanceType": "persistent",
            "InstanceInterruptionBehavior": "stop",
        },
    },
    TagSpecifications=[
        {'ResourceType': resource_type, 'Tags': worker_tags}
        for resource_type in ('instance', 'spot-instances-request')
    ],
)
# Last expression of the cell, so the created instances are displayed as output.
ec2.create_instances(**worker_launch_spec)

In [None]:
# Print every EC2 instance visible through `ec2` (not only this experiment's),
# with its public IP and current state.
all_instances = list(ec2.instances.all())
for instance in all_instances:
    print(instance, instance.public_ip_address, instance.state)

To check training progress, connect to any running trainer instance via ssh and run `tail -f /var/log/user-command.log`.