In [1]:
### get current timestamp
import datetime
from pytz import timezone

curent_timestamp = datetime.datetime.now().astimezone(timezone('US/Pacific')).strftime("%Y_%m_%d_%H_%M_%S")

### Create this run's local log and model folders, plus the final log folder under the mounted s3 bucket
import os
final_output_dir = "/dbfs/mnt/wendao_test2/horovod_benchmark/logs/"
output_dir = 'logs/logs/horovod_logs/'

# Each run gets its own timestamped (US/Pacific) sub-folder.
output_dir = output_dir + f"{curent_timestamp}/"
model_output_dir = output_dir + "model/"

for _path in [output_dir, final_output_dir, model_output_dir]:
  try:
    os.makedirs(_path)
    print(f"{_path} - file created successfully")
  except FileExistsError:
    # Only an already-existing folder is benign (e.g. final_output_dir from an earlier run).
    # Any other OSError (permissions, missing mount) propagates instead of being hidden.
    print(f"{_path} - file already exist")

In [2]:
import logging
import sys
import datetime
import glob


### TODO: add SINGLE_INSTANCE / REPEAT into the logger string format

# Handles on the notebook's real streams, so reset_stdout() can restore them after redirect_stdout().
# NOTE: re-running this cell while a redirect is active would capture the logger streams instead.
original_stdout, original_stderr = sys.stdout, sys.stderr

def redirect_stdout(log_filename):
  """Send everything printed to stdout/stderr into ``log_filename`` via the root logger.

  Configures the root logger with ``logging.basicConfig`` (level DEBUG, file opened with
  mode 'w'). It then replaces ``sys.stdout``/``sys.stderr`` with file-like objects that log
  each non-empty line: stdout goes to the ``STDOUT`` logger at INFO, stderr goes to the
  ``STDERR`` logger at ERROR. Undo with ``reset_stdout()``.

  Side effects / caveats:
    * ``logging.basicConfig`` does nothing if the root logger already has handlers, so call
      ``reset_stdout()`` before redirecting to a new file.
    * ``logging.Formatter.converter`` is replaced globally, so formatters render timestamps
      in US/Pacific (the same zone used for the run folder name).

  Args:
    log_filename: path of the log file to create (overwritten if it exists).
  """
  from pytz import timezone as pytz_timezone

  ### Databricks needs the redirect to be set up again in every cell. -- Need follow up with databricks
  ### https://stackoverflow.com/questions/34248908/how-to-prevent-logging-of-pyspark-answer-received-and-command-to-send-messag
  ### py4j floods the DEBUG-level root logger, so only let its ERROR records through.
  ### The pyspark/matplotlib equivalents are kept (commented out) in case they become noisy too.
  logging.getLogger("py4j").setLevel(logging.ERROR)
  ##logging.getLogger('pyspark').setLevel(logging.ERROR)
  ##matplotlib_logger = logging.getLogger("matplotlib").setLevel(logging.ERROR)

  class StreamToLogger:
    """Minimal file-like object that turns written text into log records, one per line."""

    def __init__(self, logger, level):
      self.level = level
      self.logger = logger

    def write(self, message):
      # print() typically writes the text and its trailing newline separately; skip
      # empty/newline-only writes so they do not become blank log records.
      if message != '\n' and message != '':
        for mess in message.splitlines():
          self.logger.log(self.level, mess.rstrip())

    def flush(self):
      # Part of the file-like interface; nothing is buffered here.
      pass

  pacific = pytz_timezone('US/Pacific')

  def pdt_timezone(_formatter, created):
    # Installed as a class attribute below, so logging calls it as a bound method with
    # (formatter, record.created). Convert the record's own timestamp (not "now") and let
    # pytz apply daylight saving instead of a fixed UTC-8 offset.
    return datetime.datetime.fromtimestamp(created, pacific).timetuple()
  logging.Formatter.converter = pdt_timezone

  logging.basicConfig(
     level=logging.DEBUG,
     format=('%(asctime)s - %(name)s - %(levelname)s - %(message)s'),
     datefmt="%Y-%m-%d %H:%M:%S",
     filename = log_filename,
     filemode = 'w'
  )

  stdout_logger = logging.getLogger('STDOUT')
  sl = StreamToLogger(stdout_logger, logging.INFO)
  sys.stdout = sl

  stderr_logger = logging.getLogger('STDERR')
  sle = StreamToLogger(stderr_logger, logging.ERROR)
  sys.stderr = sle
  
def reset_stdout():
  """Undo redirect_stdout(): restore the saved streams and detach and close all root-logger handlers.

  Closing the handlers releases the log file handle. Without it, each run would leak an open file.
  Emptying the root handlers also lets the next redirect_stdout() call configure a new file.
  """
  sys.stdout = original_stdout
  sys.stderr = original_stderr
  root_logger = logging.getLogger()
  # Iterate over a copy: removeHandler mutates root_logger.handlers.
  for handler in list(root_logger.handlers):
    root_logger.removeHandler(handler)
    handler.close()
  
  
def move_log_to_s3():
  """Copy this run's local log folder to the mounted s3 bucket.

  Every path matched by ``glob.glob(output_dir)`` is copied recursively to
  ``final_output_dir + <run folder name>``. This is normally just the timestamped run folder,
  whose path ends with "/". Files that already exist at the destination are overwritten, so
  this can be called again after more logs are written.
  """
  import glob
  import shutil
  try:
    # distutils' copy_tree merges into an existing destination and overwrites files.
    from distutils import dir_util
  except ImportError:
    # distutils was removed in Python 3.12 (PEP 632); fall back to shutil below.
    dir_util = None

  for path in glob.glob(output_dir):
    # With a trailing "/", dirname() only drops the slash, so basename() yields the run folder name.
    dir_name = os.path.basename(os.path.dirname(path))
    dest = final_output_dir + dir_name
    if dir_util is not None:
      dir_util.copy_tree(path, dest)
    else:
      # Python 3.8+: dirs_exist_ok merges into an existing destination, overwriting files.
      shutil.copytree(path, dest, dirs_exist_ok=True)

def save_model_single(filename):
  """Copy a saved single-instance model file into ``<output_dir><np_setup>/``.

  ``filename`` is used both as the source path and as the name under the destination
  folder, so it is expected to be a bare file name in the current working directory.
  The destination folder is created on first use.
  """
  import shutil
  target_dir = "".join([output_dir, str(np_setup), "/"])
  if not os.path.exists(target_dir):
    os.makedirs(target_dir)
  target_path = target_dir + filename
  shutil.copy(filename, target_path)
  
def save_horovod_model():
  """Copy every file in ``checkpoint_dir`` into ``<model_output_dir>NP<np_setup>/``.

  Reads the module-level ``checkpoint_dir``, ``model_output_dir`` and ``np_setup``.
  The destination folder is only created when there is at least one checkpoint to copy.
  """
  import shutil
  sources = glob.glob(checkpoint_dir + "/*")
  if not sources:
    return

  target_dir = model_output_dir + "NP" + str(np_setup) + "/"
  if not os.path.exists(target_dir):
    os.makedirs(target_dir)
  for src in sources:
    shutil.copy(src, target_dir + os.path.basename(src))

In [3]:
# Notebook parameters as Databricks text widgets (name, default value); parsed in the next cell.
dbutils.widgets.removeAll()
for _widget_name, _widget_default in (
    ("batch_size", "128"),
    ("learning_rate", "0.1"),
    ("epochs", "5"),
    ("repeat", "3"),
):
  dbutils.widgets.text(name = _widget_name, defaultValue = _widget_default)

In [4]:
# Widget values always come back as strings; convert them to the numeric types the training code uses.
_read_widget = dbutils.widgets.get
batch_size = int(_read_widget("batch_size"))
epochs = int(_read_widget("epochs"))
repeat = int(_read_widget("repeat"))
learning_rate = float(_read_widget("learning_rate"))

In [5]:
import time
import os
import datetime
import time
import pandas as pd 
import numpy as np

# Default checkpoint folder, made unique by the epoch-seconds timestamp.
# The Horovod loop further down replaces it with one folder per process count.
checkpoint_dir = f'/dbfs/ml/MNISTDemo/train/{time.time()}/'
os.makedirs(checkpoint_dir)

In [6]:
# batch_size and epochs are read from the notebook widgets above.
num_classes = 10  # MNIST digit classes 0-9

In [7]:
#tf.keras
from tensorflow import keras
from tensorflow.keras import models
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense,Dropout,GlobalAveragePooling2D

import horovod.tensorflow.keras as hvd
from tensorflow.keras import backend as K
import tensorflow as tf
#np.set_printoptions(threshold=np.inf)

#https://pypi.org/project/keras-rectified-adam/
# from keras_radam import RAdam

In [8]:


def get_dataset(num_classes, rank=0, size=1):
  """Load MNIST and return this worker's shard as model-ready arrays.

  Keeps every ``size``-th example starting at index ``rank``, so ``size`` workers get
  disjoint shards. Images are reshaped to ``(N, 28, 28, 1)``, cast to float32 and divided
  by 255 (the raw pixels are presumably 0-255). Labels are one-hot encoded into
  ``num_classes`` columns.

  Returns:
    ``((x_train, y_train), (x_test, y_test))``
  """
  def _prepare(images, labels):
    # Shard first, then convert only the rows this worker keeps.
    shard = images[rank::size]
    shard = shard.reshape(shard.shape[0], 28, 28, 1).astype('float32')
    shard /= 255
    return shard, keras.utils.to_categorical(labels[rank::size], num_classes)

  train_split, test_split = keras.datasets.mnist.load_data('MNIST-data-%d' % rank)
  return _prepare(*train_split), _prepare(*test_split)

In [9]:
from tensorflow.keras import models
from tensorflow.keras import layers

def get_model(num_classes):
  """Build the (uncompiled) MNIST CNN.

  The network has two convolutions, max-pooling and dropout, then a dense softmax head.

  Args:
    num_classes: number of units in the final softmax layer.

  Returns:
    A ``models.Sequential`` model whose first layer declares ``input_shape=(28, 28, 1)``.
  """
  return models.Sequential([
      # Convolutional feature extractor
      layers.Conv2D(32, kernel_size=(3, 3),
                    activation='relu',
                    input_shape=(28, 28, 1)),
      layers.Conv2D(64, (3, 3), activation='relu'),
      layers.MaxPooling2D(pool_size=(2, 2)),
      layers.Dropout(0.25),
      # Classifier head
      layers.Flatten(),
      layers.Dense(128, activation='relu'),
      layers.Dropout(0.5),
      layers.Dense(num_classes, activation='softmax'),
  ])

In [10]:
def train(learning_rate=1.0):
  """Train the MNIST CNN in a single process and return the fitted Keras model.

  Uses the module-level ``num_classes``, ``batch_size`` and ``epochs``. The full test split
  is passed as ``validation_data``.

  Args:
    learning_rate: Adadelta learning rate (default 1.0).

  Returns:
    The trained model.
  """
  (x_train, y_train), (x_test, y_test) = get_dataset(num_classes)
  model = get_model(num_classes)

  # Use `learning_rate=` rather than the deprecated `lr=` alias. Recent tf.keras optimizers drop
  # `lr` with only a warning, which would silently train with the default rate.
  optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

  model.compile(optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['accuracy'])

  model.fit(x_train, y_train,
            batch_size=batch_size,
            epochs=epochs,
            verbose=2,
            validation_data=(x_test, y_test))

  return model


In [11]:
### Labels that go into the output log file name
env = "PROD"
instance_type = 'SINGLE_INSTANCE'
dataset = 'MNIST'
model_name = 'CNN'

# Load the full (unsharded) dataset; this cell only uses it to record the train/validation sizes.
(x_train, y_train), (x_test, y_test) = get_dataset(num_classes)
train_shape, validation_shape = len(x_train), len(x_test)

def get_cluster_info(hvd_run=True):
  """Return ``[driver_type, worker_type, num_workers]`` read from the Spark conf.

  The worker type and worker count are only looked up when ``hvd_run`` is truthy. Otherwise
  they are the string ``"None"``, which ends up verbatim in the log file name.
  """
  # Read the Spark conf once instead of once per key.
  conf = sc.getConf()
  driver_type = conf.get("spark.databricks.driverNodeTypeId")
  worker_type = "None"
  num_workers = "None"
  if hvd_run:
    worker_type = conf.get("spark.databricks.workerNodeTypeId")
    num_workers = conf.get("spark.databricks.clusterUsageTags.clusterWorkers")
  return [driver_type, worker_type, num_workers]

# Worker type/count are looked up (hvd_run=True) even for the single-instance label, so they appear in its log name too.
driver_type, worker_type, num_workers = get_cluster_info(hvd_run=True)
np_setup = "NA"  # no Horovod process count for the single-instance run

In [12]:
# Log file name: the run's metadata as pipe-separated fields, ending in ".log".
filename = "|".join(f"{field}" for field in [
    "benchmark", env, dataset, model_name, train_shape, validation_shape, instance_type,
    driver_type, worker_type, num_workers, np_setup, repeat, epochs, learning_rate, batch_size,
]) + ".log"

In [13]:
# Restore the saved streams and clear root-logger handlers before starting a new logged run.
reset_stdout()

In [14]:
### Single-instance benchmark: train `repeat` times, logging everything printed to output_dir+filename.
print(output_dir+filename)
# Every repeat saves to the same .h5 file, so only the last repeat's model is kept.
model_file_name = filename.split(".log")[0] + ".h5"

redirect_stdout(output_dir+filename)
try:
  for i in range(repeat):
    print(f"REPEAT {i+1}")
    # Pass the widget learning rate explicitly. train()'s own default (1.0) would not match
    # the learning_rate recorded in the log file name.
    model = train(learning_rate=learning_rate)
    model.save(model_file_name)
  save_model_single(model_file_name)
finally:
  # Restore notebook output even if training fails, then ship whatever was logged.
  reset_stdout()
  move_log_to_s3()


In [15]:
# Manual recovery cell: repeats the post-training steps of the previous cell in case that cell was interrupted.
# The steps are: copy the last saved model, copy logs to s3, restore notebook output.
# Relies on model_file_name defined by the previous cell.
save_model_single(model_file_name)
move_log_to_s3()
reset_stdout()

In [16]:
%sh ls /dbfs/mnt/wendao_test2/horovod_benchmark/logs/2019_11_12_14_56_52

In [17]:
#Horovod version of train(), based on:
#https://github.com/horovod/horovod/blob/master/examples/keras_mnist_advanced.py
#(the advanced example also uses Adadelta, plus extra callbacks that are disabled below)
#https://docs.databricks.com/applications/deep-learning/distributed-training/mnist-tensorflow-keras.html
#HorovodRunner runs a single function, so the whole Horovod training script lives inside train_hvd.

def train_hvd(learning_rate = learning_rate):
  """Horovod training function, intended to be run by HorovodRunner (see the benchmark loop below).

  Loads this process's shard of MNIST (``hvd.rank()`` / ``hvd.size()``). Builds the CNN with an
  Adadelta optimizer whose learning rate is scaled by ``hvd.size()`` and wrapped in
  ``hvd.DistributedOptimizer``, then trains it. Only rank 0 writes checkpoints into
  ``checkpoint_dir``. The wall-clock time of each step is printed as ``step - <name> - <seconds>``.

  Args:
    learning_rate: base learning rate before scaling; defaults to the global ``learning_rate``
      captured when this cell was run.
  """
  # Horovod: initialize Horovod.
  hvd.init()

  # Horovod: pin GPU to be used to process local rank (one GPU per process). TF1 API, currently disabled.
  #   config = tf.ConfigProto()
  #   config.gpu_options.allow_growth = True
  #   config.gpu_options.visible_device_list = str(hvd.local_rank())
  #   K.set_session(tf.Session(config=config))

  # Step 1: load this process's shard of the data.
  start_time = time.time()
  (x_train, y_train), (x_test, y_test) = get_dataset(num_classes, hvd.rank(), hvd.size())
  elapsed = round((time.time() - start_time), 3)
  print(f"step - get_data - {elapsed}")

  # Step 2: build and compile the model.
  start_time = time.time()
  model = get_model(num_classes)

  # Horovod: adjust learning rate based on number of processes.
  # `learning_rate=` rather than the deprecated `lr=` alias, which recent tf.keras optimizers drop with only a warning.
  optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate * hvd.size())

  # Horovod: add Horovod Distributed Optimizer.
  optimizer = hvd.DistributedOptimizer(optimizer)

  model.compile(optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['accuracy'])

  callbacks = [
      # Horovod: broadcast initial variable states from rank 0 to all other processes.
      # This is necessary to ensure consistent initialization of all workers when
      # training is started with random weights or restored from a checkpoint.
      hvd.callbacks.BroadcastGlobalVariablesCallback(0),
      # Optional extras from the advanced Horovod example (disabled):
      # hvd.callbacks.MetricAverageCallback(),
      # hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1),
      # keras.callbacks.ReduceLROnPlateau(patience=10, verbose=1),
  ]

  # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
  if hvd.rank() == 0:
    callbacks.append(keras.callbacks.ModelCheckpoint(checkpoint_dir + 'checkpoint-{epoch}.ckpt', save_weights_only = True))

  elapsed = round((time.time() - start_time), 3)
  print(f"step - prep_model - {elapsed}")

  # Step 3: train.
  start_time = time.time()
  model.fit(x_train, y_train,
            batch_size=batch_size,
            callbacks=callbacks,
            epochs=epochs,
            verbose=2,
            validation_data=(x_test, y_test))
  elapsed = round((time.time() - start_time), 3)
  print(f"step - train_model - {elapsed}")
    

In [18]:
from sparkdl import HorovodRunner
# Label for the Horovod runs' log file names (the single-instance runs above used 'SINGLE_INSTANCE').
instance_type = "HOROVOD_CLUSTER"

In [19]:
# Full sweep of HorovodRunner process counts.
# NOTE(review): -8 presumably selects HorovodRunner's local (driver-only) mode; confirm against the sparkdl docs.
np_list = [-8, 1, 2, 4, 8, 16, 32]

In [20]:
# Overrides the full sweep in the previous cell: only benchmark the larger process counts in this run.
np_list = [8, 16, 32]

In [21]:
### Horovod benchmark: for each process count, run `repeat` HorovodRunner trainings logged to their
### own file, then copy the checkpoints and logs to s3.
for np_setup in np_list:

  start_time = time.time()
  reset_stdout()

  # Fresh checkpoint folder per process count; train_hvd reads the global checkpoint_dir when it runs.
  checkpoint_dir = '/dbfs/ml/MNISTDemo/train/{}/{}/'.format(np_setup, time.time())
  os.makedirs(checkpoint_dir)

  filename = f"benchmark|{env}|{dataset}|{model_name}|{train_shape}|{validation_shape}|{instance_type}|{driver_type}|{worker_type}|{num_workers}|{np_setup}|{repeat}|{epochs}|{learning_rate}|{batch_size}.log"
  print(output_dir+filename)
  redirect_stdout(output_dir+filename)

  try:
    for i in range(repeat):
      print(f"{instance_type} - REPEAT {i+1}")
      hr = HorovodRunner(np = np_setup)
      hr.run(train_hvd,  learning_rate=learning_rate)
  finally:
    # Restore notebook output even if a HorovodRunner run fails.
    reset_stdout()

  end_time = round((time.time() - start_time),3)
  print(f"np{np_setup} - finshed in {end_time}")

  print("Saving to s3....")
  save_horovod_model()
  move_log_to_s3()
  print("Saving to s3 finshed!")