In [None]:
import mechanize
import numpy
import os
import queue
import random
import shutil
import socket
import string
import threading
import time
import urllib.request
from abc import ABC, abstractmethod
import pandas
import libcloud
import paramiko
from dataclasses import dataclass
from libcloud.compute.providers import get_driver
from libcloud.compute.types import Provider
from paramiko.buffered_pipe import PipeTimeout

### System config

In [None]:
from system_config import \
    SSH_USER, GCLOUD_ACCOUNT, GCLOUD_KEY_PATH, GCLOUD_PROJECT, IMAGE_NAME, LOCAL_SSH_PUBLIC_KEY_PATH, SSH_KEY_USERNAME

### Experiment config

In [None]:
# Tab-separated CSV holding the experiment design; its first column is used as the run index.
DESIGN_CSV = 'design.csv'

# Cluster layout: how many worker ("slave") nodes to start, and the GCE machine types to use.
NUM_NODES = 3  # number of slave nodes
MACHINE_TYPE_MASTER = 'e2-standard-4'
MACHINE_TYPE_SLAVES = 'e2-standard-16'

# Training / run limits.
BATCH_SIZE = 128  # per-node batch size; the run loop multiplies it by NUM_NODES
EX_RUNTIME = 60 * 60  # SSH timeout per experiment, in seconds (one hour)

# Start ComputeEngine Driver

In [None]:
# id of the GCE location to deploy into, as reported by driver.list_locations()
GCE_LOCATION_ID = '2210'

ComputeEngine = get_driver(Provider.GCE)
driver = ComputeEngine(GCLOUD_ACCOUNT, GCLOUD_KEY_PATH, project=GCLOUD_PROJECT)

# Fail with an explicit message instead of a bare IndexError when the location is unavailable.
location = next((loc for loc in driver.list_locations() if loc.id == GCE_LOCATION_ID), None)
if location is None:
    raise LookupError(f"GCE location with id {GCE_LOCATION_ID!r} not found for project {GCLOUD_PROJECT!r}")
# network = [n for n in driver.ex_list_networks() if n.id == '8043342384481294734'][0]

# Random 8-letter lowercase experiment ID; it is embedded in node names and the raw/ output folder.
ex_id = ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
print(f"Experiment ID: {ex_id}")

# Start MasterNode instance

In [None]:
from nodes import MasterNode

# Provision the master node; its instance name embeds this run's experiment ID.
master_name = f"master-{ex_id}-1"
master = MasterNode(driver, location, master_name, MACHINE_TYPE_MASTER, master=True)

# Import Design

In [None]:
# Read the experiment design and keep a copy of it in raw/<ex_id>/, next to the run logs.
experiments = pandas.read_csv(DESIGN_CSV, sep='\t')
# Name the first column 'Index' (presumably unnamed in the file). rename() is used instead of
# writing into columns.values, which mutates the Index's backing array behind pandas' back.
# 'Index' deliberately stays a regular column: the run loop reads row['Index'] / row.Index.
experiments = experiments.rename(columns={experiments.columns[0]: 'Index'})

os.makedirs(f"raw/{ex_id}", exist_ok=True)
shutil.copyfile(DESIGN_CSV, f"raw/{ex_id}/design.csv")

# Start Slave instances

In [None]:
from nodes import SlaveNode

print(f"Starting {NUM_NODES} slaves")
# One worker per index 0..NUM_NODES-1; each is constructed with a reference to the master node.
slaves = []
for node_idx in range(NUM_NODES):
    node_name = f"slave-{ex_id}-{node_idx}"
    slaves.append(SlaveNode(driver, location, node_name, MACHINE_TYPE_SLAVES, masterNode=master))

# Run Experiments

In [None]:
# Remote locations of the Spark/BigDL installation on the master node (loop-invariant).
SPARK_DIR = f"/home/{SSH_USER}/bd/spark"
CODES_DIR = f"/home/{SSH_USER}/bd/codes"
BIGDL_JAR = "bigdl-SPARK_2.3-0.11.0-jar-with-dependencies.jar"

for _, row in experiments.iterrows():
    max_epochs = int(row['max_epochs'])
    model = str(row['model'])
    executor_memory = int(row['executor_memory'])
    executor_cores = int(row['executor_cores'])
    total_executor_cores = NUM_NODES * executor_cores  # to keep 1 executor per node
    print(f"Experiment {row['Index']}/{ex_id} - epochs: {max_epochs}, model: {model}, ex. mem: {executor_memory}, ex. cores: {executor_cores}, tot. ex. cores: {total_executor_cores}")

    # The Export cell rebuilds this exact name to locate the log.
    filename = f"{int(row.Index)}-model{model}-epochs{max_epochs}-executor_memory{executor_memory}-executor_cores{executor_cores}.log"
    # Relative path: resolved against the remote shell / SFTP default directory (presumably the SSH user's home).
    remote_log = f"{ex_id}-{filename}"

    # NOTE(review): `model` comes from the design CSV and is interpolated into the shell command
    # unquoted — only run trusted design files.
    command = " ".join([
        f"sudo {SPARK_DIR}/bin/spark-submit",
        f"--master spark://{master.privip}:7077",
        "--driver-cores 2",
        "--driver-memory 2G",
        f"--total-executor-cores {total_executor_cores}",
        f"--executor-cores {executor_cores}",
        f"--executor-memory {executor_memory}G",
        f"--py-files {SPARK_DIR}/lib/bigdl-0.11.0-python-api.zip,{CODES_DIR}/{model}.py",
        f"--properties-file {SPARK_DIR}/conf/spark-bigdl.conf",
        f"--jars {SPARK_DIR}/lib/{BIGDL_JAR}",
        f"--conf spark.driver.extraClassPath={SPARK_DIR}/lib/{BIGDL_JAR}",
        # Spark does not reject unknown `spark.*` keys, so this must be spelled exactly
        # `spark.executor.extraClassPath` to take effect.
        f"--conf spark.executor.extraClassPath={BIGDL_JAR}",
        f"{CODES_DIR}/{model}.py",
        "--action train",
        "--dataPath /tmp/mnist",
        f"--batchSize {BATCH_SIZE * NUM_NODES}",
        f"--endTriggerNum {max_epochs}",
        "--learningRate 0.01",
        "--learningrateDecay 0.0002",
        f"> {remote_log}",
    ])
    stdin, stdout, stderr = master.ssh.exec_command(command, timeout=EX_RUNTIME)

    try:
        # stdout of spark-submit is redirected into remote_log, so this shows only what was not redirected.
        # NOTE(review): stdout is drained before stderr; if the remote writes a lot to stderr this
        # may stall until the timeout — consider reading stderr concurrently. Verify with real runs.
        print(stdout.read().decode(errors='replace'))
        print(stderr.read().decode(errors='replace'))
    except PipeTimeout:
        print("PipeTimeout")
    except socket.timeout:
        print("Socket timeout")

    # Fetch the log (it may be incomplete if the run hit the timeout); the context manager closes the SFTP session.
    with master.ssh.open_sftp() as sftp:
        sftp.get(remote_log, f'raw/{ex_id}/{filename}')

# Export

In [None]:
from datetime import datetime
import os
import pandas as pd
import re

FOLDER_NAME = ex_id  # Name of the folder under raw/ to export (defaults to this run's experiment ID)

# Keep only the requested, non-hidden experiment folder. The `with` block closes the scandir
# handle, and `entries` avoids shadowing the builtin `dir`.
with os.scandir('raw/') as entries:
    experiments = [
        entry for entry in entries
        if entry.is_dir() and not entry.name.startswith('.') and entry.name == FOLDER_NAME
    ]
assert len(experiments) > 0, f'The folder {FOLDER_NAME} does not exist!'

def experiment_to_csv(path):
    """Summarise one experiment folder into a per-run results table and CSV file.

    Reads ``{path}/design.csv`` (tab separated; the first column is renamed to ``Index``) and keeps
    the rows with ``Index >= 0``. For each row it opens the log
    ``{Index}-model{model}-epochs{max_epochs}-executor_memory{executor_memory}-executor_cores{executor_cores}.log``
    in ``path`` and extracts:

      accuracy -- last ``accuracy: <float>`` value (0 if none; -1 if the log is missing)
      epochs   -- number of ``accuracy:`` lines (-1 if the log is missing)
      time     -- last ``Wall clock time is <float> ms`` value in seconds
                  (-1 if the log is missing or contains no such line)

    The table is written to ``ex-<design.csv mtime, %Y-%m-%d_%H:%M>.csv`` in the current
    working directory.

    Args:
        path: Directory containing ``design.csv`` and the run logs.

    Returns:
        Tuple ``(jobs, csv_filename, found, not_existing)``, where ``found`` / ``not_existing``
        count the logs that were present / missing.
    """
    design_path = os.path.join(path, 'design.csv')
    jobs = pd.read_csv(design_path, sep='\t')
    jobs = jobs.rename(columns={jobs.columns[0]: 'Index'})
    # .copy() so the column assignments below act on an independent frame, not a filtered view.
    jobs = jobs[jobs['Index'] >= 0].copy()

    jobs['accuracy'] = 0.0
    jobs['epochs'] = 0
    jobs['time'] = 0.0
    pat = re.compile(r"accuracy: (\d+\.\d+)")
    pat_clock = re.compile(r"Wall clock time is (\d+\.\d+) ms")
    found, not_existing = 0, 0
    for idx, row in jobs.iterrows():
        # int() casts mirror the run loop, so e.g. a float-typed column still maps to "epochs2", not "epochs2.0".
        filename = (f"{int(row['Index'])}-model{row['model']}-epochs{int(row['max_epochs'])}"
                    f"-executor_memory{int(row['executor_memory'])}"
                    f"-executor_cores{int(row['executor_cores'])}.log")
        try:
            with open(os.path.join(path, filename), 'r') as file:
                content = file.read()
        except FileNotFoundError:
            jobs.loc[idx, ['time', 'accuracy', 'epochs']] = -1
            not_existing += 1
            continue

        accuracies = pat.findall(content)
        clocks = pat_clock.findall(content)
        # A run cut off by the timeout may never log its wall-clock time.
        jobs.at[idx, 'time'] = float(clocks[-1]) / 1000 if clocks else -1
        jobs.at[idx, 'accuracy'] = float(accuracies[-1]) if accuracies else 0
        jobs.at[idx, 'epochs'] = len(accuracies)
        found += 1

    modified = datetime.fromtimestamp(os.stat(design_path).st_mtime)
    csv_name = f'ex-{modified.strftime("%Y-%m-%d_%H:%M")}.csv'
    jobs.to_csv(csv_name)
    return (jobs, csv_name, found, not_existing)

# Export every selected experiment folder that has a design file; `df` ends up holding the
# summary columns of the last one exported.
summary_columns = ['max_epochs', 'executor_memory', 'executor_cores', 'model', 'accuracy', 'time']
for experiment in experiments:
    if not os.path.exists(f'{experiment.path}/design.csv'):
        continue
    jobs, path, found, not_existing = experiment_to_csv(experiment.path)
    expected = found + not_existing
    print(f'Results of {experiment} stored in {path}')
    print(f'{found} of the {expected} expected log files are present. The other experiments are probably missing.\n')
    df = pd.read_csv(path)[summary_columns]

In [None]:
# Show the summary columns (design parameters, accuracy, time) of the last exported experiment.
df