In [None]:
import mechanize
import numpy
import os
import queue
import random
import shutil
import socket
import string
import threading
import time
import urllib.request
from abc import ABC, abstractmethod
import pandas
import libcloud
import paramiko
from dataclasses import dataclass
from libcloud.compute.providers import get_driver
from libcloud.compute.types import Provider
from paramiko.buffered_pipe import PipeTimeout

In [None]:
# Experiment configuration. All values are blank placeholders and must be
# filled in before the cells below are run.
SSH_USER = ''  # Username used for the SSH logins; also interpolated into the /home/<user>/... paths used below
GCLOUD_ACCOUNT = ''  # First positional argument passed to the libcloud GCE driver
GCLOUD_KEY_PATH = ''  # The path to the Service Account Key (a JSON file)
GCLOUD_PROJECT = ''  # GCloud project id
DESIGN_CSV = ''  # The CSV with the experiment design

In [None]:
ComputeEngine = get_driver(Provider.GCE)

driver = ComputeEngine(GCLOUD_ACCOUNT, GCLOUD_KEY_PATH,
                       project=GCLOUD_PROJECT)


def _find_by_id(resources, wanted_id, kind):
    """Return the first element of ``resources`` whose ``id`` equals ``wanted_id``.

    Raises:
        IndexError: if no element matches. The message names the missing
            resource instead of the anonymous "list index out of range".
    """
    match = next((res for res in resources if res.id == wanted_id), None)
    if match is None:
        raise IndexError(f"No {kind} with id {wanted_id!r} was returned by the driver")
    return match


location = _find_by_id(driver.list_locations(), '2000', 'location')
network = _find_by_id(driver.ex_list_networks(), '6096184313360012863', 'network')

# To prevent problems with GCloud reusing IP addresses, drop the cached host
# keys. This stays best-effort: a missing file is fine, any other OS-level
# failure is reported but does not abort the notebook.
known_hosts_path = f'/home/{SSH_USER}/.ssh/known_hosts'
try:
    os.remove(known_hosts_path)
except FileNotFoundError:
    pass
except OSError as err:
    print(f"Warning: could not remove {known_hosts_path}: {err}")

# Random 8-letter experiment id; used in VM names and in the raw/<id>/ output directory.
ex_id = ''.join(random.choice(string.ascii_lowercase) for _unused in range(8))
EX_RUNTIME = 300  # seconds
print(f"Experiment ID: {ex_id}")

In [None]:


class Node(ABC):
    """A cloud VM reachable over SSH that plays one role in the Spark cluster.

    Constructing a Node creates a boot volume and a VM through ``driver``,
    waits until the VM is running, opens an SSH connection to its public IP
    and finally calls the subclass hook ``start_type()``.

    NOTE(review): if the SSH connection cannot be established, the already
    created disk and VM are not destroyed here.
    """

    SSH_ATTEMPTS = 5      # how many times the constructor tries to connect
    SSH_RETRY_DELAY = 5   # seconds to wait between two attempts

    def __init__(self, driver, name, master=False, masterNode=None):
        """Provision the VM and connect to it.

        Args:
            driver: libcloud compute driver used to create the volume and node.
            name: name of the VM; the boot volume is named ``boot-<name>``.
            master: True when this node is itself the master.
            masterNode: the master this node attaches to; required unless
                ``master`` is True.

        Raises:
            ValueError: a non-master node was given no ``masterNode``.
            RuntimeError: SSH could not be opened after all attempts.
        """
        # Defined first so that close_ssh()/__del__ are safe even when
        # construction fails part-way through.
        self.ssh = None
        self.connected = False
        self.driver = driver
        self.name = name
        if not master and masterNode is None:
            raise ValueError("Slave nodes need a master")
        self.master = masterNode
        self.disk = self.driver.create_volume(40, f"boot-{self.name}", image='opendl-2', location=location)
        self.node = self.driver.create_node(
            name, 'n1-standard-1', None, location=location, ex_boot_disk=self.disk)
        self.driver.wait_until_running([self.node])
        self.pubip = self.node.public_ips[0]
        self.privip = self.node.private_ips[0]

        for attempt in range(self.SSH_ATTEMPTS):
            try:
                self.open_ssh()
                break
            except Exception as e:
                print(e)
                # No point in waiting after the final failed attempt.
                if attempt < self.SSH_ATTEMPTS - 1:
                    time.sleep(self.SSH_RETRY_DELAY)
        if not self.connected:
            raise RuntimeError(f"Can't connect to node {self.name}")
        self.start_type()

    def __del__(self):
        self.close_ssh()

    def open_ssh(self):
        """Open a new SSH connection to the node's public IP as ``SSH_USER``.

        On failure the half-open client is closed and the exception is
        re-raised; ``self.ssh`` and ``self.connected`` are left untouched.
        """
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.WarningPolicy())
        try:
            client.connect(self.pubip, port=22, username=SSH_USER)
        except Exception:
            client.close()
            raise
        previous = getattr(self, 'ssh', None)
        self.ssh = client
        self.connected = True
        if previous is not None:
            # Re-connecting: release the client that is being replaced.
            previous.close()

    def close_ssh(self):
        """Close the SSH connection if there is one; safe to call repeatedly."""
        self.connected = False
        client = getattr(self, 'ssh', None)
        if client is not None:
            client.close()

    @abstractmethod
    def start_type(self):
        """Start the role-specific service on the node (called at the end of ``__init__``)."""
        pass


@dataclass
class JobOptions:
    """Per-job training parameters that are placed on the spark-submit command line."""
    batch_size: int  # forwarded to the job as --batchSize
    max_epochs: int  # forwarded to the job as --endTriggerNum


class MasterNode(Node):
    """Node that runs the Spark master and accepts job submissions."""

    def _run_and_report(self, command):
        """Run ``command`` over SSH; if it wrote to stderr, print stdout and stderr.

        Each channel stream can only be consumed once, so stderr is read a
        single time and the captured bytes are reused for the report.
        """
        stdin, stdout, stderr = self.ssh.exec_command(command)
        err = stderr.read()
        if len(err) > 0:
            print(stdout.read())
            print(err)

    def start_type(self):
        """Record this node's private IP as SPARK_MASTER_HOST and start the Spark master."""
        self._run_and_report(f'echo "SPARK_MASTER_HOST=\'{self.privip}\'" >> /home/{SSH_USER}/bd/spark/conf/spark-env.sh')
        # This literal needs the f-prefix; without it the shell receives the
        # text "{SSH_USER}" instead of the user name.
        self._run_and_report(f'/home/{SSH_USER}/bd/spark/sbin/start-master.sh')

    def submit(self, options: JobOptions):
        """Submit the bi-rnn training job to this master with a single 1G executor.

        Args:
            options: batch size and epoch limit forwarded to the training script.
        """
        # The master URL uses the same spark://<ip>:7077 form the workers
        # register with. The application script follows the --conf options
        # as its own argument rather than being part of a quoted conf value.
        self._run_and_report(f"/home/{SSH_USER}/bd/spark/bin/spark-submit --master spark://{self.privip}:7077 --driver-cores 1 "
                             f"--driver-memory 1G --total-executor-cores 1 --executor-cores 1 --executor-memory 1G "
                             f"--py-files \"/home/{SSH_USER}/bd/spark/lib/bigdl-0.8.0-python-api,/home/{SSH_USER}/bd/codes/bi-rnn.py\" "
                             f"--properties-file \"/home/{SSH_USER}/bd/spark/conf/spark-bigdl.conf\" "
                             f"--jars \"/home/{SSH_USER}/bd/spark/lib/bigdl-SPARK_2.3-0.8.0-jar-with-dependencies.jar\" "
                             f"--conf \"spark.driver.extraClassPath=/home/{SSH_USER}/bd/spark/lib/bigdl-SPARK_2.3-0.8.0-jar-with-dependencies.jar\" "
                             f"--conf \"spark.executor.extraClassPath=bigdl-SPARK_2.3-0.8.0-jar-with-dependencies.jar\" /home/{SSH_USER}/bd/codes/bi-rnn.py "
                             f"--action train --dataPath \"/tmp/mnist\" --batchSize {options.batch_size} --endTriggerNum {options.max_epochs} "
                             f"--learningRate 0.01 --learningrateDecay 0.0002")

    def cancel(self):
        """Kill the running application by submitting the kill form on the master web UI (port 8080).

        If the kill form cannot be selected, the problem is printed and
        nothing is submitted.
        """
        br = mechanize.Browser()
        br.open(f"http://{self.pubip}:8080")

        def is_kill_form(form):
            return form.attrs.get('action', None) == 'app/kill/'
        try:
            br.select_form(predicate=is_kill_form)
        except mechanize._mechanize.FormNotFoundError:
            print("FormNotFoundError")
            return
        except Exception as e:
            print("An error occurred during cancelling.")
            print(e)
            return
        # Only reached when a form was actually selected.
        br.submit()


class SlaveNode(Node):
    """Node that runs a Spark worker attached to ``self.master``."""

    # NOTE(review): the experiment loop calls slave.start_failure_worker(...)
    # and slave.stop_failure_worker(), which are not defined on this class in
    # the visible notebook - confirm they are provided elsewhere.

    def start_type(self):
        """Start the Spark worker and register it with the master on port 7077."""
        stdin, stdout, stderr = self.ssh.exec_command(f'/home/{SSH_USER}/bd/spark/sbin/start-slave.sh spark://{self.master.privip}:7077')
        # A channel stream can only be consumed once: capture stderr and
        # reuse it, otherwise the second read() yields nothing.
        err = stderr.read()
        if len(err) > 0:
            print(stdout.read())
            print(err)

In [None]:
# Provision the master VM first; every worker is constructed with a reference to it.
master = MasterNode(
    driver=driver,
    name=f"master-{ex_id}-1",
    master=True,
)

In [None]:
experiments = pandas.read_csv(DESIGN_CSV)
# Name the first column 'Index' through rename(); writing into
# columns.values mutates the column Index's backing array in place and is
# not a supported way to rename a column.
# 'Index' intentionally stays a regular column (it is not made the frame's
# index) because the experiment loop reads row['Index'].
experiments = experiments.rename(columns={experiments.columns[0]: 'Index'})

# Designs without failure injection may omit these columns; default them to 0.
for optional_column in ('failure_rate', 'failure_duration'):
    if optional_column not in experiments:
        experiments[optional_column] = 0

if experiments.empty:
    raise ValueError(f"Experiment design {DESIGN_CSV!r} contains no rows")

print(experiments)
# The cluster is sized for the largest experiment in the design.
num_slaves = experiments['num_nodes'].max()
print(f"Number of slaves: {num_slaves}")

# Keep a copy of the design next to the raw results of this run.
os.makedirs(f"raw/{ex_id}", exist_ok=True)
shutil.copyfile(DESIGN_CSV, f"raw/{ex_id}/design.csv")

In [None]:
# One worker VM per node of the largest experiment, numbered from 1.
slaves = [
    SlaveNode(driver, f"slave-{ex_id}-{worker_no}", masterNode=master)
    for worker_no in range(1, int(num_slaves) + 1)
]

In [None]:
# Run every row of the design as one Spark job: start failure injection on the
# workers, submit the job with a read timeout of EX_RUNTIME seconds, cancel
# it, download its log into raw/<ex_id>/ and stop failure injection again.
for idx, row in experiments.iterrows():
    print(f"Experiment {row['Index']}")
    options = JobOptions(int(row['batch_size']), 50)

    # NOTE(review): start_failure_worker/stop_failure_worker are not defined
    # on the SlaveNode class visible in this notebook - confirm they exist.
    for slave in slaves:
        slave.start_failure_worker(
            row['failure_rate'], row['failure_duration'])

    try:
        filename = f"{int(row.Index)}-nodes{int(row.num_nodes)}-batch{options.batch_size}-epochs{options.max_epochs}-frate{row.failure_rate}-duration{row.failure_duration}-time{EX_RUNTIME}.log"

        # The job's stdout is redirected into a log file on the master.
        stdin, stdout, stderr = master.ssh.exec_command(f"/home/{SSH_USER}/bd/spark/bin/spark-submit --master spark://{master.privip}:7077 --driver-cores 1 "
                                                        f"--driver-memory 1G --total-executor-cores {int(row.num_nodes)} --executor-cores 1 --executor-memory {int(row.memory_size)}M "
                                                        f"--py-files /home/{SSH_USER}/bd/spark/lib/bigdl-0.8.0-python-api,/home/{SSH_USER}/bd/codes/bi-rnn.py "
                                                        f"--properties-file /home/{SSH_USER}/bd/spark/conf/spark-bigdl.conf "
                                                        f"--jars /home/{SSH_USER}/bd/spark/lib/bigdl-SPARK_2.3-0.8.0-jar-with-dependencies.jar "
                                                        f"--conf spark.driver.extraClassPath=/home/{SSH_USER}/bd/spark/lib/bigdl-SPARK_2.3-0.8.0-jar-with-dependencies.jar "
                                                        f"--conf spark.executor.extraClassPath=bigdl-SPARK_2.3-0.8.0-jar-with-dependencies.jar /home/{SSH_USER}/bd/codes/bi-rnn.py "
                                                        f"--action train --dataPath /tmp/mnist --batchSize {options.batch_size} --endTriggerNum {options.max_epochs} "
                                                        f"--learningRate 0.01 --learningrateDecay 0.0002 > {ex_id}-{filename}", timeout=EX_RUNTIME)

        # Reading blocks until the job ends or the channel timeout fires;
        # a timeout is the expected way for a run to be cut off.
        try:
            print(stdout.read())
            print(stderr.read())
        except PipeTimeout:
            print("PipeTimeout")
        except socket.timeout:
            print("Socket timeout")
        master.cancel()

        # Fetch the log and always release the SFTP session, so one session
        # is not leaked per experiment.
        sftp = master.ssh.open_sftp()
        try:
            sftp.get(f'{ex_id}-{filename}', f'raw/{ex_id}/{filename}')
        finally:
            sftp.close()
    finally:
        # Stop failure injection even if the experiment itself raised.
        for slave in slaves:
            slave.stop_failure_worker()