In [1]:
import os
import pandas as pd
import numpy as np
import seaborn as sns
from math import pi
import time
import datetime
import sys
import os
utils_dir = '../../src/utils'
sys.path.insert(0, utils_dir) # add utils dir to path
import testbed_utils as tu

from sklearn import preprocessing
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import normalize, MinMaxScaler, RobustScaler
from scipy.stats import pearsonr
from sklearn.model_selection import train_test_split

import json
import math
import random

%matplotlib inline
# Module-level MinMaxScaler instance (not referenced by the cells shown in this notebook section).
min_max_scaler = preprocessing.MinMaxScaler()

# Seaborn look-and-feel: "ticks" base style, then inward-pointing ticks drawn on
# all four sides of the axes, with the grid enabled.
sns.set(style="ticks")
sns.set_style({"xtick.direction": "in","ytick.direction": "in","xtick.top":True,"ytick.right":True,"axes.grid":True})

In [6]:
# configure the MACROs here

# --- experiment identity and paths ---
EXPERIMENT = 'atc-classification'
DATA_DIR = '../../data/{}'.format(EXPERIMENT)
USER = 'yzy'  # user name used in the ssh commands built below
GLOBAL_CONF = tu.common.json_read_file('../../config/global_conf.json')
ROOT_DIR = GLOBAL_CONF['dir']['root']

# --- physical servers ---
# local machine hostname
# LOCAL_MACHINE = LOCAL_MACHINE[0]
LOCAL_MACHINE = '10.60.16.17'
SERVER_IPS = GLOBAL_CONF['net']['physical_server_ip']
# local machine ID (raises ValueError when LOCAL_MACHINE is not listed in SERVER_IPS)
LOCAL_ID = SERVER_IPS.index(LOCAL_MACHINE)
# IDs of every server whose address differs from the local machine
REMOTE_IDS = [
    idx for idx, ip in enumerate(SERVER_IPS)
    if ip != LOCAL_MACHINE
]

# --- feature names derived from the shared-memory layout description ---
SHM_LAYOUT = tu.common.json_read_file("../../src/lb/dev/shm_layout_base.json")
# counter features gathered for each AS in as_stat_t (first entry skipped)
FEATURE_AS_CNT = [
    field[1] for field in SHM_LAYOUT["vpp"]["struct"]["as_stat"][1:]
]
# features gathered for each AS w/ reservoir sampling
FEATURE_AS_RES = [
    field[1] for field in SHM_LAYOUT["vpp"]["struct"]["reservoir_as"]
]
# suffixes combined with every reservoir feature name
RES_FEATURE_ENG = ["avg", "90", "std", "avg_decay", "90_decay"]
# counters first, then "<reservoir feature>_<suffix>" for every combination
FEATURE_AS_ALL = FEATURE_AS_CNT + [
    "_".join((feature, suffix))
    for feature in FEATURE_AS_RES
    for suffix in RES_FEATURE_ENG
]
GT = ["cpu", "memory", "apache", "asid"]

In [7]:
def run_pipeline(
    lb_method = 'aquarius_ecmp',
    trace='wiki',
    experiment='offline',
    sample='hour0.csv',
    from_orig=False,
    config_file_prefix='1lb-36core',
    clip_n=20000,
    n_episode=1,
    episode_base=0,
    remote_servers=[0, 2, 3],
    clt_server=0,
    lb_gather_usage=False,
):
    '''
    @brief: run a set of experiments with a specific setup
    @params:
        lb_method: method name defined in config/lb_methods.json
        trace: trace type as in data/trace/*
        experiment: experiment name, forwarded to tu.init_task_info and to the
            remote scripts as --experiment
        sample: trace sample file name, forwarded as --sample
        from_orig: forwarded unchanged to tu.prepare_img
        config_file_prefix: prefix of the cluster config files; the file used
            for server i is '<prefix>-<i>.json'
        clip_n: when truthy, ' -n <clip_n>' is appended to the run_server.py
            command of server 0 only
        n_episode: number of episodes to run
        episode_base: index of the first episode; episodes run over
            range(episode_base, episode_base + n_episode)
        remote_servers: indices into SERVER_IPS of the remote servers to start
            and shut down; must be a subset of REMOTE_IDS (the default list is
            only read, never mutated)
        clt_server: index into SERVER_IPS of the host that runs run_traffic.py
        lb_gather_usage: when True, lb.gather_usage() is called on every LB node
    @raises:
        AssertionError if remote_servers contains an ID that is not in REMOTE_IDS
    '''
    # USER, ROOT_DIR, LOCAL_ID, REMOTE_IDS and SERVER_IPS are module-level
    # constants that are only read here, so no `global` declaration is needed.
    assert set(remote_servers) - set(REMOTE_IDS) == set()
    config_file = config_file_prefix+'-'+str(LOCAL_ID)
    config_file_remote = {i: config_file_prefix+'-{}'.format(i) for i in REMOTE_IDS}

    for episode in range(episode_base, episode_base+n_episode):

        # third element of the returned triple is not used in this function
        task_name, task_dir, _ = tu.init_task_info(
            experiment=experiment,
            lb_method=lb_method,
            trace=trace,
            sample=sample,
            cluster_config=config_file+'.json',
            alias=config_file_prefix,
        )

        print(">> run task {} - episode {}".format(task_name, episode))

        #--- spin up ---#
        for server_id in remote_servers:
            # the clipping option is only passed to server 0
            if clip_n and server_id == 0:
                clip_n_option = ' -n {}'.format(clip_n)
            else:
                clip_n_option = ''

            cmd = 'ssh -t {}@{} "python3 {}/src/utils/run_server.py --experiment {} -m {} --tr {} --sample {} -f {}.json{}"'.format(
                USER, SERVER_IPS[server_id], ROOT_DIR, experiment, lb_method, trace, sample, config_file_remote[server_id], clip_n_option)

            # started in the background; the process is not waited on here
            tu.subprocess.Popen(cmd, shell=True)


        tu.prepare_img(lb_method=lb_method, from_orig=from_orig, debug_node=False)

        tu.runall()
        time.sleep(10)

        #--- check network ---#

        # Retry once per second until tu.gt_socket_check() stops raising.
        net_ok = False
        while not net_ok:
            try:
                tu.gt_socket_check()
                net_ok = True
            except Exception:
                # Catch Exception rather than using a bare `except:` so that
                # KeyboardInterrupt / SystemExit still propagate and this
                # otherwise unbounded loop can be interrupted.
                print('error')
                time.sleep(1)

        time.sleep(3)

        # start gathering at LB node
        for lb in tu.NODES['lb']:
            lb.run_init_bg()
            if lb_gather_usage:
                lb.gather_usage()

        # run traffic
        t0 = time.time()
        # NOTE(review): the -f config is always config_file_remote[0], regardless
        # of clt_server — confirm this is intended.
        cmd = 'ssh -t {}@{} "python3 {}/src/utils/run_traffic.py --experiment {} -m {} --tr {} --sample {} -f {}.json"'.format(
            USER, SERVER_IPS[clt_server], ROOT_DIR, experiment, lb_method, trace, sample, config_file_remote[0])
        tu.subprocess_cmd(cmd)
        print("Trace replay over w/ total time: {:.3f}s".format(time.time()-t0))
        time.sleep(5)

        # fetch results from nodes
        for server_id in remote_servers:
            cmd = 'ssh -t {}@{} "python3 {}/src/utils/shutdown_server.py --experiment {} -m {} --tr {} --sample {} -f {}.json --episode {}"'.format(
                USER, SERVER_IPS[server_id], ROOT_DIR, experiment, lb_method, trace, sample, config_file_remote[server_id], episode)
            tu.subprocess.Popen(cmd, shell=True)

        for lb in tu.NODES['lb']:
            lb.fetch_result(task_dir, episode)

        tu.shutall()

In [8]:
# --- what to run ---
method = 'aquarius_ecmp'
trace = 'wiki'
# one sample file name per selected hour
samples = ['hour{}.csv'.format(hour) for hour in (8, 12)]
# one cluster-config prefix per selected configuration index
config_prefix = ['conf0{}'.format(conf_idx) for conf_idx in (1,)]

# --- how to run it ---
from_orig = None
clip_n = 200000
n_episode = 2
episode_base = 0

# --- where to run it (indices into SERVER_IPS) ---
remote_servers = [0]
clt_server = 0

In [11]:
# Bind, at notebook level, the same names run_pipeline() takes as parameters so
# that the following cells can execute the pipeline body one step at a time.

# names that take their value from differently-named notebook variables
lb_method = method
experiment = EXPERIMENT
sample = samples[0]
config_file_prefix = config_prefix[0]
lb_gather_usage = False

# names that already exist under the same name; re-bound to themselves
trace = trace
from_orig = from_orig
clip_n = clip_n
n_episode = n_episode
episode_base = episode_base
remote_servers = remote_servers
clt_server = clt_server

In [12]:
# every requested remote server must be one of the known non-local server IDs
assert set(remote_servers).issubset(set(REMOTE_IDS))

# config file stem for the local machine, and one stem per remote server ID
config_file = config_file_prefix + '-' + str(LOCAL_ID)
config_file_remote = {
    remote_id: config_file_prefix + '-{}'.format(remote_id)
    for remote_id in REMOTE_IDS
}

# single episode when stepping through the pipeline by hand
episode = 0

In [13]:
# Initialise the task through testbed_utils; the call returns a
# (task_name, task_dir, nodes) triple. The cluster config is the local
# machine's '<prefix>-<LOCAL_ID>.json' file built in the previous cell.
task_name, task_dir, nodes = tu.init_task_info(
    experiment=experiment,
    lb_method=lb_method,
    trace=trace,
    sample=sample,
    cluster_config=config_file+'.json',
    alias=config_file_prefix,
)

# announce which task / episode is about to run
print(">> run task {} - episode {}".format(task_name, episode))

>> run task wiki-aquarius_ecmp-hour8-conf01 - episode 0


In [14]:
#--- spin up ---#
# Start run_server.py over ssh on each selected remote server.
for server_id in remote_servers:
    # only server 0 gets the " -n <clip_n>" option, and only when clip_n is truthy
    clip_n_option = ' -n {}'.format(clip_n) if (clip_n and server_id == 0) else ''

    cmd = 'ssh -t {}@{} "python3 {}/src/utils/run_server.py --experiment {} -m {} --tr {} --sample {} -f {}.json{}"'.format(
        USER,
        SERVER_IPS[server_id],
        ROOT_DIR,
        experiment,
        lb_method,
        trace,
        sample,
        config_file_remote[server_id],
        clip_n_option,
    )

    # echo the command, then launch it without waiting for it to finish
    print(cmd)
    tu.subprocess.Popen(cmd, shell=True)

ssh -t yzy@10.60.16.12 "python3 /home/yzy/aquarius/src/utils/run_server.py --experiment atc-classification -m aquarius_ecmp --tr wiki --sample hour8.csv -f conf01-0.json -n 200000"


In [15]:
# Prepare the image for the chosen LB method (the recorded output shows a base
# image being created when none exists).
tu.prepare_img(lb_method=lb_method, from_orig=from_orig, debug_node=False)

# Bring all nodes up (the recorded output lists each node_lb_* / node_server_*
# as ready), then pause 10 s before the network check.
tu.runall()
time.sleep(10)

base img for aquarius_ecmp does not exist, create one...
node_lb_0 ready: ssh -p 8900 cisco@localhost
node_lb_1 ready: ssh -p 8901 cisco@localhost
node_server_0 ready: ssh -p 9000 cisco@localhost
node_server_1 ready: ssh -p 9001 cisco@localhost
node_server_2 ready: ssh -p 9002 cisco@localhost
node_server_3 ready: ssh -p 9003 cisco@localhost
node_server_4 ready: ssh -p 9004 cisco@localhost
node_server_5 ready: ssh -p 9005 cisco@localhost
node_server_6 ready: ssh -p 9006 cisco@localhost


In [16]:
#--- check network ---#
# Retry once per second until tu.gt_socket_check() stops raising.

net_ok = False
while not net_ok:
    try:
        tu.gt_socket_check()
        net_ok = True
    except Exception:
        # Catch Exception rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still propagate and this otherwise
        # unbounded loop can be interrupted from the notebook.
        print('error')
        time.sleep(1)

# short pause once the check has passed
time.sleep(3)

LB Node 0: pass
LB Node 1: pass


In [17]:
# start gathering at LB node
# For every LB node registered in tu.NODES['lb']: call run_init_bg(), and
# additionally gather_usage() when lb_gather_usage is set.
for lb in tu.NODES['lb']:
    lb.run_init_bg()
    if lb_gather_usage:
        lb.gather_usage()

In [18]:
# run traffic
# Run run_traffic.py over ssh on SERVER_IPS[clt_server] and report the elapsed
# wall-clock time measured from t0.
t0 = time.time()
# NOTE(review): the -f config is always config_file_remote[0], regardless of
# clt_server — confirm this is intended.
cmd = 'ssh -t {}@{} "python3 {}/src/utils/run_traffic.py --experiment {} -m {} --tr {} --sample {} -f {}.json"'.format(
    USER, SERVER_IPS[clt_server], ROOT_DIR, experiment, lb_method, trace, sample, config_file_remote[0])
# presumably blocks until the remote command finishes, since the elapsed time is printed right after — TODO confirm
tu.subprocess_cmd(cmd)
print("Trace replay over w/ total time: {:.3f}s".format(time.time()-t0))
# pause 5 s before fetching results
time.sleep(5)

KeyboardInterrupt: 

In [19]:
# fetch results from nodes
# Launch shutdown_server.py over ssh on each selected remote server, tagging the
# run with the current episode number.
for server_id in remote_servers:
    cmd = 'ssh -t {}@{} "python3 {}/src/utils/shutdown_server.py --experiment {} -m {} --tr {} --sample {} -f {}.json --episode {}"'.format(
        USER, SERVER_IPS[server_id], ROOT_DIR, experiment, lb_method, trace, sample, config_file_remote[server_id], episode)
    # NOTE(review): the Popen handle is discarded, so nothing here waits for the
    # remote shutdown script to finish — confirm that is acceptable.
    tu.subprocess.Popen(cmd, shell=True)

# have each LB node fetch its results into task_dir for this episode
for lb in tu.NODES['lb']:
    lb.fetch_result(task_dir, episode)

# shut everything down through testbed_utils
tu.shutall()

In [20]:
# display the most recently built command string (here: the last shutdown_server.py command)
cmd

'ssh -t yzy@10.60.16.12 "python3 /home/yzy/aquarius/src/utils/shutdown_server.py --experiment atc-classification -m aquarius_ecmp --tr wiki --sample hour8.csv -f conf01-0.json --episode 0"'