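## Distributed training

Launch distributed tensor2tensor training runs (by default `img2img_transformer` on `img2img_allen_brain_dim8to32`) on the Kubeflow cluster via `tk.experiment.T2TExperiment`. Each run stages a copy of the code to NFS and writes its checkpoints to GCS.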

In [2]:
import shutil
import os
import json
import shutil
import logging
import numpy
import pprint
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

from tensor2tensor.utils import trainer_lib
from tensor2tensor import problems
from tensor2tensor.utils import registry
from tensor2tensor.data_generators import allen_brain
from tensor2tensor.data_generators import allen_brain_utils

import tensorflow as tf

from tensorflow.contrib.eager.python import tfe
tfe.enable_eager_execution()
Modes = tf.estimator.ModeKeys

from tk.util import hack_dict_to_cli_args
from tk import experiment
from tk import util

from tensorboard.backend.event_processing import event_file_loader
from protobuf_to_dict import protobuf_to_dict

logging.getLogger().setLevel(logging.INFO)


Instructions for updating:
Use the retry module or similar alternatives.


### Utils

In [3]:

def _stage(local_app_root, remote_app_root):
    
    if not os.path.exists(local_app_root):
        raise ValueError("Can't stage from a non-existent source, "
                         "saw %s" % local_app_root)

    shutil.copytree(local_app_root, remote_app_root)


def _write_job_scripts(local_app_root, remote_app_root, data_dir, train_args,
                       job_types=("master", "ps")):
    """Write one ``<job_type>-job.sh`` launcher per job type into ``local_app_root``.

    Every job type currently receives the same script: install the staged
    code, print device info, then run ``python -m tk.experiment`` with
    ``train_args`` rendered by ``hack_dict_to_cli_args``.

    Args:
        local_app_root: Directory the scripts are written into (it is staged
            afterwards, so the scripts travel with the code).
        remote_app_root: Path of the staged copy that the scripts ``cd``
            into and ``pip install`` from.
        data_dir: Dataset directory copied to ``/mnt/ssd0/`` by the script.
        train_args: Trainer flags, converted to CLI arguments.
        job_types: Job types to emit a script for.
    """
    # TODO: master and ps run the identical command (schedule="train").
    # Confirm whether tk.experiment derives the task type from TF_CONFIG, or
    # whether the ps script needs schedule="run_std_server".
    cmd = ["python", "-m", "tk.experiment"]
    cmd.extend(hack_dict_to_cli_args(train_args))

    script_lines = [
        "ls /mnt",
        # NOTE(review): /mnt/ssd0 is presumably the container mount of the
        # local SSD requested via num_local_ssd; confirm against the job spec.
        "cp -r %s/* /mnt/ssd0/" % data_dir,
        "pip install -e %s/vendor/tensor2tensor" % remote_app_root,
        "pip install -e %s" % remote_app_root,
        "nvidia-smi",
        "python -c 'from tensorflow.python.client import device_lib; print(device_lib.list_local_devices())'",
        "echo ${TF_CONFIG}",
        "cd %s" % remote_app_root,
        " ".join(cmd),
        "nvidia-smi",
    ]
    # The script body does not depend on job_type, so build it once.
    script = "".join(line + "\n" for line in script_lines)

    for job_type in job_types:
        script_path = os.path.join(local_app_root, "%s-job.sh" % job_type)
        with open(script_path, "w") as f:
            f.write(script)
        logging.info("Wrote %s", script_path)


def _configure_experiment(base_name, num_gpu_per_worker=1,
                          problem="img2img_allen_brain_dim8to32",
                          model="img2img_transformer",
                          hparams_set="img2img_transformer2d_tiny",
                          batch_size=4,
                          num_steps=100000,
                          num_workers=0,
                          num_ps=0,
                          ps_gpu=1,
                          app_root="/mnt/nfs-east1-d/work/tk",
                          data_dir="/mnt/nfs-east1-d/data",
                          comparisons_root="/mnt/nfs-east1-d/comparisons",
                          checkpoint_root="gs://kubeflow-rl-checkpoints/comparisons"):
    """Build the ``T2TExperiment`` kwargs for one distributed training run.

    Side effects: writes ``master-job.sh`` and ``ps-job.sh`` into
    ``app_root``, then copies all of ``app_root`` to a per-job directory
    ``<comparisons_root>/<base_name>/<job_name>`` (via ``_stage``).

    Args:
        base_name: Name of the comparison group; seeds the generated job name
            and the staging / checkpoint sub-directory.
        num_gpu_per_worker: GPUs per replica; also selects the node pool.
        problem: tensor2tensor problem name.
        model: tensor2tensor model name.
        hparams_set: tensor2tensor hparams-set name.
        batch_size: Passed through as a ``batch_size`` hparams override.
        num_steps: Value for the trainer's ``train_steps`` flag.
        num_workers: Number of worker replicas.
        num_ps: Number of parameter-server replicas.
        ps_gpu: GPUs per parameter server.
        app_root: Local source tree to stage.
        data_dir: Dataset directory given to the trainer and copied by the
            job script.
        comparisons_root: NFS directory under which code is staged.
        checkpoint_root: Prefix under which checkpoints are written.

    Returns:
        dict of keyword arguments for ``experiment.T2TExperiment``, with
        ``app_root`` pointing at the staged copy and ``batch`` set to True.
    """
    job_name = util.generate_job_name(base_name)

    # Code is staged per job on NFS; checkpoints go under checkpoint_root.
    remote_app_root = "%s/%s/%s" % (comparisons_root, base_name, job_name)
    output_dir = os.path.join("%s/%s" % (checkpoint_root, base_name),
                              job_name)

    train_args = {
        "problem": problem,
        "model": model,
        "hparams_set": hparams_set,
        "data_dir": data_dir,
        "output_dir": output_dir,
        "train_steps": num_steps,
        "schedule": "train",
        "profile": False,
        "log_device_placement": False,
        "worker_gpu": num_gpu_per_worker,
        "ps_gpu": ps_gpu,
        "save_checkpoints_secs": 1800,
        "dbgprofile": False,  # Saves profiling timelines, viewable in chrome://tracing
        "ssd_mount_path": "/mnt/disks/ssd0",
        "worker_gpu_memory_fraction": 0.95,
        # Single-quoted because the value is written into a shell script.
        "hparams": "'batch_size=%s'" % batch_size
    }

    args = {
        "job_name": job_name,
        "volume_claim_id": "nfs-east1-d",
        "app_root": app_root,
        "gcp_project": "foo",  # NOTE(review): looks like a placeholder value.
        "namespace": "kubeflow",
        "image": "tensorflow/tensorflow:latest-gpu",
        "smoke": True,
        "batch": False,
        "train_args": train_args,
        "cpu": 7,
        "memory": "40Gi",
        "num_gpu": num_gpu_per_worker,

        # DEV: per-role GPU counts (duplicated from train_args).
        "master_gpu": num_gpu_per_worker,
        "ps_gpu": ps_gpu,
        "worker_gpu": num_gpu_per_worker,

        "num_local_ssd": 1,
        "no_wait": True,
        "num_worker_replicas": num_workers,
        "num_ps_replicas": num_ps,
        "selector_labels": {
            "cloud.google.com/gke-nodepool": "train-gpu-preemptible-%sx-hm" % num_gpu_per_worker,
            "cloud.google.com/gke-accelerator": "nvidia-tesla-k80"
        }
    }

    _write_job_scripts(app_root, remote_app_root, data_dir, train_args)
    _stage(app_root, remote_app_root)

    # From here on the job runs from the staged copy.
    args["app_root"] = remote_app_root
    args["batch"] = True

    return args


### Training

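Previous run, kept for reference: 1 GPU per replica, no extra workers, 2 parameter servers, batch size 1, 1,000 steps.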

In [4]:
# Baseline topology: 1 GPU per replica, no extra workers, 2 parameter
# servers, batch size 1, short 1k-step run.
problem_name = "img2img_allen_brain_dim8to32"
bsize = 1

args = _configure_experiment(
    "dist-gcs-shared-3",
    problem=problem_name,
    batch_size=bsize,
    num_steps=1000,
    num_gpu_per_worker=1,
    num_workers=0,
    num_ps=2,
)

job = experiment.T2TExperiment(**args)
job.run()

[2018-08-23 20:29:08,444] /mnt/nfs-east1-d/work/tk
[2018-08-23 20:29:08,454] /mnt/nfs-east1-d/work/tk
[2018-08-23 20:30:36,732] smoke: True


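Experimenting with more GPUs per worker. Note: the cell below still sets `num_gpu_per_worker=1`; it differs from the previous run only in step count (10,000), base name and an explicit `ps_gpu=1`. Raise `num_gpu_per_worker` to actually test multi-GPU workers.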

In [4]:
# Same topology as the previous run, but 10k steps and an explicit ps_gpu.
# The base name encodes the batch size, so bsize must be set first.
problem_name = "img2img_allen_brain_dim8to32"
bsize = 1

args = _configure_experiment(
    "dist-b%s" % bsize,
    problem=problem_name,
    batch_size=bsize,
    num_steps=10000,
    num_gpu_per_worker=1,
    num_workers=0,
    num_ps=2,
    ps_gpu=1,
)

job = experiment.T2TExperiment(**args)
job.run()

[2018-08-30 16:46:43,486] /mnt/nfs-east1-d/work/tk
[2018-08-30 16:46:43,520] /mnt/nfs-east1-d/work/tk
[2018-08-30 16:48:44,419] smoke: True
