# Experimentation scripts for the PProx macrobenchmark evaluation

## Global parameters

In [1]:
# shell_log: echo every shell command run by the helpers below, together with its output.
# hostAddress: public host address handed to the recsys chart
# (presumably a nip.io wildcard name resolving to 10.0.0.33).
shell_log, hostAddress = True, "10-0-0-33.nip.io"

### MongoDB client & collection initialization

In [2]:
from pymongo import MongoClient
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import seaborn as sns

# Notebook-wide handles on the experiment-tracking database.
# `db` and `collection_experiment` are bound once here; helpers such as
# rename_db / fusion_experiments_db open their own client instead of using `client`.
client = MongoClient("manager-mongo-manager")
db = client.get_database("private-recsys")
collection_experiment = db.get_collection("experiments")



### Low-level scripting functions

In [3]:
import subprocess
import shlex
import datetime
from dateutil.tz import tzlocal
import pandas as pd
import glob
import time
from time import sleep 
def run_command_async(command):
    """Start one or several commands without waiting for them to finish.

    *command* is either a single command string or a list of command strings.
    Each string is tokenised with ``shlex.split`` (no shell is involved), and
    each process gets its stderr merged into a piped stdout.

    Returns:
        A list of ``subprocess.Popen`` objects, one per command, in order.
        Hand it to ``wait_for_command_async`` to stream output and wait.
    """
    command_list = command if isinstance(command, list) else [command]
    return [
        subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        for cmd in command_list
    ]

def wait_for_command_async(processes):
    """Stream the output of already-started processes until all of them exit.

    Every line read from a process's (stdout+stderr) pipe is printed as
    ``Log <index> - <timestamp> : <line>``. ``<index>`` is the process's
    position in *processes*. It stays stable after other processes finish.

    Args:
        processes: list of ``subprocess.Popen`` objects whose ``stdout`` is a
            pipe, e.g. as returned by ``run_command_async``. The list itself
            is not modified.

    Returns:
        0 if every process exited successfully (or *processes* is empty),
        otherwise the first non-zero return code in *processes* order.
    """
    # Keep each process paired with its original index so log labels do not
    # shift when finished processes are dropped from the active set.
    active = list(enumerate(processes))
    while active:
        for entry in active:
            index, process = entry
            # errors="replace": tool output is not guaranteed to be valid UTF-8.
            output = process.stdout.readline().decode(errors="replace")
            if output == '' and process.poll() is not None:
                # EOF reached and the process has been reaped: stop polling it.
                process.stdout.close()
                active.remove(entry)
                break
            if output:
                strnow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print("Log {0} - {1} : ".format(index, strnow) + output.strip())
    # Report a failure from any process, not just the last one to finish.
    return next((p.returncode for p in processes if p.returncode != 0), 0)
def run_command(command, shell=False, log=True):
    """Run one or several commands concurrently and wait for all of them.

    Args:
        command: a command string, or a list of command strings. Each string
            is tokenised with ``shlex.split`` unless *shell* is true, in which
            case it is handed to the shell verbatim.
        shell: run each command through the system shell.
        log: when true, echo each command once before starting it, and print
            every output line as ``Log <index> - <timestamp> : <line>``.

    Returns:
        0 if every command exited successfully (or no command was given),
        otherwise the first non-zero exit code in *command* order.
    """
    commands = command if isinstance(command, list) else [command]
    processes = []
    for cmd in commands:
        if log:
            print(cmd)
        # With shell=True, Popen wants one string: given a list it would run
        # only the first element as the script and pass the rest as $0, $1...
        args = cmd if shell else shlex.split(cmd)
        processes.append(
            subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=shell)
        )

    # Pair processes with their original index so log labels stay stable.
    active = list(enumerate(processes))
    while active:
        for entry in active:
            index, process = entry
            # errors="replace": tool output is not guaranteed to be valid UTF-8.
            output = process.stdout.readline().decode(errors="replace")
            if output == '' and process.poll() is not None:
                # EOF reached and the process has been reaped: stop polling it.
                process.stdout.close()
                active.remove(entry)
                break
            if output and log:
                strnow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print("Log {0} - {1} : ".format(index, strnow) + output.strip())
    # Report a failure from any command, not just the last one to finish.
    return next((p.returncode for p in processes if p.returncode != 0), 0)
import os

class cd:
    """Context manager that temporarily switches the working directory.

    ``~`` in the target path is expanded once, at construction time. Entering
    records the current directory and moves to the target. Leaving moves back,
    whether the body finished normally or raised. Exceptions are not
    suppressed.
    """

    def __init__(self, newPath):
        expanded = os.path.expanduser(newPath)
        self.newPath = expanded

    def __enter__(self):
        origin = os.getcwd()
        self.savedPath = origin
        os.chdir(self.newPath)

    def __exit__(self, etype, value, traceback):
        home_base = self.savedPath
        os.chdir(home_base)
def get_strnow():
    """Return the current local time as a compact ``YYYYmmddHHMMSS`` string.

    Used to make experiment / release names unique.
    """
    return datetime.datetime.now(tzlocal()).strftime("%Y%m%d%H%M%S")


### Module scope functions

In [4]:
def wait_for_http_status(url, code, duration):
    """Poll *url* with HTTP HEAD until it answers with status *code*.

    A request is sent roughly once per second. Connection failures and other
    request errors count as "not ready yet" and are retried.

    Args:
        url: URL to probe.
        code: expected HTTP status code (e.g. 200).
        duration: maximum number of seconds to keep polling.

    Returns:
        0 as soon as *url* responds with *code*, 1 if *duration* elapses first.
    """
    import requests
    import time

    start_time = time.time()
    elapsed = 0.0
    while elapsed < duration:
        time.sleep(1)
        try:
            # Bounded per-request timeout so one hung request cannot eat the budget.
            req_code = requests.head(url, timeout=5).status_code
        except requests.RequestException:
            req_code = None
        elapsed = time.time() - start_time
        if req_code == code:
            print()
            return 0
        print("\rWaiting status {} for {:.0f} seconds            ".format(code, elapsed), end="\r", flush=True)
    print("\nTimed out waiting for status {} on {}".format(code, url))
    return 1

def wait_for_job(name, duration):
    """Wait for job *name* via the ``wait_for_job.sh`` script one directory up.

    *duration* is passed as the script's second argument. It is presumably a
    timeout in seconds; confirm against the script.

    Returns:
        The script's exit code.
    """
    script_call = "./wait_for_job.sh {} {}".format(name, duration)
    with cd(".."):
        return run_command(script_call, log = shell_log)
        
def install_recsys(params):
    """Install the ``recsys`` Helm release from ../../charts/private-recsys.

    ``kubectl apply -f ..`` is run first from the chart directory, and its exit
    code is not checked. Every entry of *params* becomes a ``--set key=value``
    flag, with ``!`` replaced by ``.`` in both key and value. Callers spell
    nested chart keys with ``!``, presumably because these dicts are also
    stored in MongoDB, where dotted field names are problematic.
    ``params["timeout"]`` (seconds) is also the ``helm install --wait`` timeout.

    NOTE: chart dependencies are not fetched here. Run ``helm repo add`` /
    ``helm dep update`` manually on the node beforehand (see
    https://github.com/helm/helm/issues/2998).

    Returns:
        Exit code of ``helm install`` (0 on success).
    """
    with cd("../../charts/private-recsys"):
        run_command("kubectl apply -f ..", log = shell_log)
        set_flags = "".join(
            "--set {}={} ".format(str(key).replace("!", "."), str(value).replace("!", "."))
            for key, value in params.items()
        )
        helm_cmd = "helm install recsys . --namespace default {} --wait --timeout {}s ".format(
            set_flags, params["timeout"]
        )
        return run_command(helm_cmd, log = shell_log)
    
def uninstall_recsys():
    """Delete the ``recsys`` Helm release, then pause 120 s if that succeeded.

    The pause presumably lets the cluster finish tearing the release down
    before anything is redeployed.

    Returns:
        Exit code of ``helm delete``. It is non-zero e.g. when the release
        does not exist.
    """
    with cd("../../charts/private-recsys"):
        status = run_command("helm --namespace default delete recsys", log = shell_log)
        if status != 0:
            return status
        sleep(120)
    return status

def install_stub(params):
    """Install the ``stub`` Helm release from ../../charts/stub-recsys.

    Each entry of *params* is passed as ``--set key=value``, with ``!``
    replaced by ``.`` in both key and value. ``params["timeout"]`` (seconds)
    is also the ``helm install --wait`` timeout. Unlike ``install_recsys``, no
    ``kubectl apply`` is run first.

    Returns:
        Exit code of ``helm install`` (0 on success).
    """
    chart_dir = "../../charts/stub-recsys"
    with cd(chart_dir):
        flags = ""
        for key, value in params.items():
            flags += "--set {}={} ".format(str(key).replace("!", "."), str(value).replace("!", "."))
        return run_command(
            "helm install stub . --namespace default {} --wait --timeout {}s ".format(flags, params["timeout"]),
            log = shell_log,
        )

def uninstall_stub():
    """Delete the ``stub`` Helm release, then pause 30 s if that succeeded.

    Returns:
        Exit code of ``helm delete``. It is non-zero e.g. when the release
        does not exist.
    """
    with cd("../../charts/stub-recsys"):
        status = run_command("helm --namespace default delete stub", log = shell_log)
        if status != 0:
            return status
        sleep(30)
    return status

def launch_injector(params):
    """Install the ``injector`` Helm release and wait for its job to finish.

    ``--set`` flags are built from *params* with ``!`` replaced by ``.``. If
    ``helm install`` succeeds, ``wait_for_job`` is called for the
    ``injector-injector`` job with ``params["timeout"] + 600`` seconds.

    Returns:
        The non-zero exit code of ``helm install``. On success, the result of
        ``wait_for_job`` instead.
    """
    with cd("../../charts/injector"):
        set_args = "".join(
            ["--set {}={} ".format(str(k).replace("!", "."), str(v).replace("!", ".")) for k, v in params.items()]
        )
        status = run_command(
            "helm install injector . --namespace default {} --wait --timeout {}s".format(set_args, params["timeout"]),
            log = shell_log,
        )

    if status != 0:
        return status
    # The job gets 600 s on top of the helm --wait timeout to complete its run.
    return wait_for_job("injector-injector", params["timeout"] + 600)
    
def uninstall_injector():
    """Delete the ``injector`` Helm release, then pause 30 s if that succeeded.

    Returns:
        Exit code of ``helm delete``. It is non-zero e.g. when the release
        does not exist.
    """
    with cd("../../charts/injector"):
        status = run_command("helm --namespace default delete injector", log = shell_log)
        if status != 0:
            return status
        sleep(30)
    return status


### Test scope functions

In [5]:
def is_harness_result_ok(collection):
    """Return True when *collection* holds a 201 response with a non-empty result.

    One document matching ``{"http_status": 201}`` is fetched, and its
    ``content.result`` field must be non-empty. "Empty db" is printed when no
    such document exists.
    """
    row = collection.find_one({"http_status":201})
    if row is None:
        print("Empty db")
        return False
    return len(row["content"]["result"]) > 0
def deploy_test(params):
    """Tear down previous releases and install the recsys (or stub) release.

    Side effects:
        * uninstalls the stub, injector and recsys Helm releases;
        * appends the current timestamp to ``params["name"]`` and to the nested
          ``params_recsys`` / ``params_injector`` names, and labels both with
          the experiment name;
        * records the experiment in ``collection_experiment`` and keeps its
          ``status`` up to date.

    ``params["stub"] == "1"`` deploys the stub chart instead of the full recsys.
    The install is attempted up to 5 times, uninstalling after every failure.

    Args:
        params: experiment description with keys ``name``, ``stub``,
            ``params_recsys`` and ``params_injector``. *params* itself is
            updated in place (names, status, and the ``_id`` added by
            ``insert_one``). The nested dicts are copied first, so the
            caller's originals are never modified.

    Returns:
        ``"recsys"`` on success, ``"recsys_error"`` if every attempt failed.
    """
    # Remove anything left over from previous launches.
    print(uninstall_stub())
    print(uninstall_injector())
    print(uninstall_recsys())

    strnow = get_strnow()
    # Copy the nested dicts: callers (e.g. loop_run_test) share them between
    # experiments, and the in-place renames below would otherwise accumulate.
    params_recsys = dict(params["params_recsys"])
    params_injector = dict(params["params_injector"])
    params["params_recsys"] = params_recsys
    params["params_injector"] = params_injector

    params["name"] = params["name"] + strnow
    params_injector["name"] = params_injector["name"] + strnow
    params_recsys["name"] = params_recsys["name"] + strnow

    # Label every component with the experiment name ('!' becomes '.' in --set).
    params_injector["experiment"] = params["name"]
    params_recsys["experiment"] = params["name"]
    params_recsys["proxy-sgx!experiment"] = params["name"]
    params_recsys["harness!experiment"] = params["name"]

    mongo_id = collection_experiment.insert_one(params).inserted_id

    def save():
        # Mirror the current state of params into the experiment document.
        collection_experiment.update_one({'_id':mongo_id}, {"$set": params}, upsert=False)

    params_recsys["start_time"] = time.time()
    params["status"] = "recsys"
    save()

    if params["stub"] != "1":
        install, uninstall = install_recsys, uninstall_recsys
    else:
        install, uninstall = install_stub, uninstall_stub

    err = None
    for _attempt in range(5):
        err = install(params_recsys)
        print("err={}".format(err))
        if err == 0:
            break
        # Clean up the half-installed release before retrying.
        uninstall()

    params_recsys["end_time"] = time.time()
    params["status"] = "recsys" if err == 0 else "recsys_error"
    save()
    return params["status"]
def run_test(params):
    """Run one injector (load-generation) job against the current deployment.

    Uninstalls any previous injector release. It then appends the current
    timestamp to the experiment / component names, records the experiment in
    ``collection_experiment``, and launches the injector. Unless
    ``params["stub"] == "1"``, it first sleeps 120 s, presumably to let the
    freshly deployed recsys settle. The final status, error code and end
    times are always written back to Mongo, including on failure.

    Args:
        params: experiment description with keys ``name``, ``stub``,
            ``params_recsys`` and ``params_injector``. *params* itself is
            updated in place. The nested dicts are copied first, so the
            caller's originals are never modified.

    Returns:
        ``"finished"`` when the injector job succeeded (code 0), or
        ``"harness_error"`` when it failed with code 1. Any other failure
        returns ``"injector"``.
    """
    # Remove the injector left over from a previous launch.
    print(uninstall_injector())

    strnow = get_strnow()
    # Copy the nested dicts so the in-place renames below stay local.
    params_recsys = dict(params["params_recsys"])
    params_injector = dict(params["params_injector"])
    params["params_recsys"] = params_recsys
    params["params_injector"] = params_injector

    params["name"] = params["name"] + strnow
    params_injector["name"] = params_injector["name"] + strnow
    params_recsys["name"] = params_recsys["name"] + strnow

    # Label every component with the experiment name ('!' becomes '.' in --set).
    params_injector["experiment"] = params["name"]
    params_recsys["experiment"] = params["name"]
    params_recsys["proxy-sgx!experiment"] = params["name"]
    params_recsys["harness!experiment"] = params["name"]

    mongo_id = collection_experiment.insert_one(params).inserted_id

    def save():
        # Mirror the current state of params into the experiment document.
        collection_experiment.update_one({'_id':mongo_id}, {"$set": params}, upsert=False)

    params_injector["start_time"] = time.time()
    params["status"] = "injector"
    save()

    if params["stub"] != "1":
        time.sleep(120)

    err = launch_injector(params_injector)
    print("Injector deployment err : {}".format(err))

    params_injector["error"] = err
    params_injector["end_time"] = time.time()
    params["end_time"] = time.time()
    if err == 0:
        params["status"] = "finished"
    elif err == 1:
        # Failed injector: we assume the harness is at fault.
        params["status"] = "harness_error"
    # Any other code keeps status "injector" (generic failure).
    # NOTE: no sanity check of the stored results is done here; one could use
    # is_harness_result_ok(db["private-recsys.<injector name>.locust_request_result"]).
    save()
    return params["status"]


### Test functions

In [18]:
import itertools
import collections
def unpack_dict(target):
    """Expand the list-valued entries of *target* into a Cartesian product of dicts.

    Every key whose value is a list is a sweep dimension. All other entries are
    copied unchanged into each generated dict. The sweep keys are printed,
    followed by every combination.

    Returns:
        ``(result, ar)`` where:

        * *result* is the list of expanded dicts (shallow copies of *target*),
          in ``itertools.product`` order. If no value is a list, it is
          ``[copy of target]``.
        * *ar* is an ``OrderedDict`` mapping each sweep key to its value list,
          in *target*'s key order.
    """
    ar = collections.OrderedDict(
        (key, value) for key, value in target.items() if isinstance(value, list)
    )
    print(ar.keys())
    sweep_keys = list(ar.keys())
    result = []
    for values in itertools.product(*ar.values()):
        combo = dict(zip(sweep_keys, values))
        print(combo)
        expanded = target.copy()
        expanded.update(combo)
        result.append(expanded)
    return result, ar

def rename_db(old, new):
    """Copy every collection of database *old* into *new*, then drop *old*.

    MongoDB removed the ``copydb`` admin command in 4.2, so documents are
    copied collection by collection instead. Indexes other than ``_id`` are
    not copied, and empty collections are not recreated in *new*. *old* is
    dropped only if every copy succeeded.

    A dedicated client is opened (and closed) here rather than reusing the
    notebook-level ``client``.
    """
    from pymongo import MongoClient

    mongo = MongoClient("manager-mongo-manager")
    try:
        source, target = mongo[old], mongo[new]
        for coll_name in source.list_collection_names():
            documents = list(source[coll_name].find())
            # insert_many rejects an empty document list.
            if documents:
                target[coll_name].insert_many(documents)
        mongo.drop_database(old)
    finally:
        mongo.close()
    
def loop_run_test(params_packed, retry=5):
    """Run the full parameter sweep described by *params_packed*.

    List-valued entries of ``params_packed["params_recsys"]`` and
    ``params_packed["params_injector"]`` are expanded with ``unpack_dict``.
    For every recsys combination, the system is (re)deployed with
    ``deploy_test``. Every injector combination is then run with
    ``run_test``, up to *retry* times each:

    * the first run on a fresh deployment keeps ``harness`` as configured
      (init/train). Each failure of it triggers a redeployment;
    * once it has succeeded, later runs on the same deployment set
      ``harness`` to False and are retried only on ``"harness_error"``.

    The sweep stops at the first recsys combination that fails to deploy.

    Args:
        params_packed: experiment description (see the experiment cells).
            It is not modified.
        retry: maximum number of attempts per injector combination.
    """
    list_params_recsys, arrays_params_recsys = unpack_dict(params_packed["params_recsys"])
    list_params_injector, arrays_params_injector = unpack_dict(params_packed["params_injector"])

    strnow = get_strnow()
    print("db:" + "-".join(list(arrays_params_recsys))+"_"+"-".join(list(arrays_params_injector))+"_"+strnow)

    print(list_params_injector)

    def make_params(params_recsys, params_injector):
        # Fresh nested dicts for every call: deploy_test / run_test rename them
        # in place, so sharing them would stack timestamp suffixes.
        params = params_packed.copy()
        params["params_recsys"] = params_recsys.copy()
        params["params_injector"] = params_injector.copy()
        return params

    for params_recsys in list_params_recsys:
        result = deploy_test(make_params(params_recsys, params_packed["params_injector"]))
        if result != "recsys":
            break
        # A fresh deployment has no initialised harness yet.
        first = True
        for params_injector in list_params_injector:
            for i in range(retry):
                params = make_params(params_recsys, params_injector)
                if first:
                    print("First launch")
                    result = run_test(params)
                    if result == "finished":
                        first = False
                        break
                    print("Launch {} in error, redeploying recsys.".format(i))
                    deploy_test(make_params(params_recsys, params_injector))
                else:
                    print("Not first, no init/train needed")
                    params["params_injector"]["harness"] = False
                    result = run_test(params)
                    if result != "harness_error":
                        break
    # NOTE: the "db:" name printed above is presumably meant as the target of
    # rename_db("private-recsys", <that name>), which is currently not called.

def fusion_experiments_db(name_from, name_to, drop = False):
    """Merge every database whose name contains *name_from* into *name_to*.

    Each matching database's collections are copied document by document
    (``experiments`` first, then the others) into same-named collections of
    *name_to*. Matching is by substring, so ``"private-recsys"`` also matches
    e.g. ``"private-recsys-old"``. *name_to* itself is always skipped, so it
    is never copied into itself. Empty or missing collections are skipped.

    Args:
        name_from: substring selecting the source databases.
        name_to: destination database name.
        drop: if True, drop *name_to* before copying.
    """
    from pymongo import MongoClient

    mongo = MongoClient("manager-mongo-manager")
    try:
        if drop:
            mongo.drop_database(name_to)
        for database_name in mongo.list_database_names():
            if name_from not in database_name or database_name == name_to:
                continue
            print(database_name)
            source = mongo[database_name]
            coll_names = ["experiments"] + [
                n for n in source.list_collection_names() if n != "experiments"
            ]
            for coll_name in coll_names:
                elements = list(source[coll_name].find())
                # insert_many raises on an empty list (e.g. missing collection).
                if elements:
                    mongo[name_to][coll_name].insert_many(elements)
    finally:
        mongo.close()
    
def remove_working_db():
    """Drop the working ``private-recsys`` database.

    A dedicated client is opened for this and closed afterwards, so its
    connection pool and monitor threads are not leaked.
    """
    from pymongo import MongoClient

    mongo = MongoClient("manager-mongo-manager")
    try:
        mongo.drop_database("private-recsys")
    finally:
        mongo.close()


## Remove deployment

In [7]:
# Reset: remove any stub, injector and recsys releases left over from earlier
# runs, printing each helm exit code (non-zero e.g. when the release is absent).
for teardown in (uninstall_stub, uninstall_injector, uninstall_recsys):
    print(teardown())

helm --namespace default delete stub
helm --namespace default delete stub
Log 0 - 2021-08-06 16:20:42 : Error: uninstall: Release not loaded: stub: release: not found
1
helm --namespace default delete injector
helm --namespace default delete injector
Log 0 - 2021-08-06 16:20:42 : Error: uninstall: Release not loaded: injector: release: not found
1
helm --namespace default delete recsys
helm --namespace default delete recsys
Log 0 - 2021-08-06 16:20:43 : release "recsys" uninstalled


KeyboardInterrupt: 

# Experiments

The actual experiments begin here.
If a run is successful, its output contains the following lines:
```
Job finished
Injector deployment err : 0
private-recsys
```

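In case of errors, please check the logs of all pods in the default namespace. Keep in mind that with this version of Elastic, the Harness deployment fails about half of the time when Elastic runs with 1 (or any odd number of) replicas. On errors, the script retries automatically: the recsys deployment (which includes Harness) is attempted up to 5 times, and each injector run is retried up to `retry` times (3 or 5 in the cells below), with the recsys redeployed after each failed first run.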

For the first tests, we advise keeping an eye on the deployments with a command such as:
```
watch kubectl get pod
```

# Test Harness (without PProx)

In [36]:
remove_working_db()

# Baseline: the injector targets Harness directly (PProx is not in the request
# path) with encryption disabled.
# The loop variable is `n_clients`, not `client`, so that running this cell does
# not overwrite the module-level MongoClient named `client`.
for debug in ["1"]:
    for encryption in ["0"]:
        for n_clients in ["50"]:
            for identifier in range(1):
                for proxy in ["localhost:32000/sgx-proxy"]:
                    name = "proxy{}_{}".format(n_clients, identifier)
                    params_recsys = {
                        "hostAddress": hostAddress,
                        "elastic!replicas":1,
                        "harness!replicaCount":1,
                        "proxy-sgx!replicaCount":1,
                        "proxy-sgx!proxyType1":"1",
                        "proxy-sgx!proxyType2":"2",
                        "proxy-sgx!targetPort":"9090",
                        "proxy-sgx!enableEncryption":encryption,
                        "proxy-sgx!bufferShuffling":"10",
                        "proxy-sgx!responseTimeout":[999999],
                        "proxy-sgx!debug":debug,
                        "proxy-sgx!image!repository":proxy,
                        "mongo!replicaSet!enabled": "false",
                        "elastic!esJavaOpts":'"-Xmx16g -Xms16g"',
                        "elastic!resources!requests!cpu":"1500m",
                        "elastic!resources!requests!memory":"17Gi",
                        "elastic!resources!limits!cpu":"2000m",
                        "elastic!resources!limits!memory":"17Gi",
                        "name":name,
                        "timeout":450,
                    #    "mongo!clusterDomain":"nuc.local"  # Louvain
                        "mongo!clusterDomain":"cluster.local" # Bordeaux
                    }
                    params_injector={
                        "duration":300,
                        "clients":[n_clients],
                        "replicaCount":1,
                        #"targetHost":["http://stub-stub-recsys:8080"],   # stub
                        "targetHost":["http://recsys-harness:9090"], # harness, no pprox
                        "concurrency":10,
                        "timeout":1200,
                        "asyncFluentd":"false",
                        "enableEncryption":encryption,
                        "randomShift":"1",
                        "name":name,
                        "harness":"true" # false if stub
                    }
                    params = {
                        "start_time":time.time(),
                        "params_recsys":params_recsys,
                        "params_injector":params_injector,
                        # NOTE(review): stray "name:" key (with a colon) is stored
                        # in Mongo as-is; kept in case analysis code reads it.
                        "name:":name,
                        "error_module":"",
                        "error":0,
                        "status":"created",
                        "name":name,
                        "stub":"0"
                    }

                    loop_run_test(params, retry=3)

# Save the working db (private-recsys) to debug-harness-nopprox.
fusion_experiments_db("private-recsys", "debug-harness-nopprox", drop=True)


odict_keys(['proxy-sgx!responseTimeout'])
{'proxy-sgx!responseTimeout': 999999}
odict_keys(['clients', 'targetHost'])
{'clients': '50', 'targetHost': 'http://recsys-harness:9090'}
db:proxy-sgx!responseTimeout_clients-targetHost_20210808145836
[{'duration': 300, 'clients': '50', 'replicaCount': 1, 'targetHost': 'http://recsys-harness:9090', 'concurrency': 10, 'timeout': 1200, 'asyncFluentd': 'false', 'enableEncryption': '0', 'randomShift': '1', 'name': 'proxy50_0', 'harness': 'true'}]
helm --namespace default delete stub
helm --namespace default delete stub
Log 0 - 2021-08-08 14:58:37 : Error: uninstall: Release not loaded: stub: release: not found
1
helm --namespace default delete injector
helm --namespace default delete injector
Log 0 - 2021-08-08 14:58:37 : release "injector" uninstalled
0
helm --namespace default delete recsys
helm --namespace default delete recsys
Log 0 - 2021-08-08 14:59:08 : release "recsys" uninstalled
0
kubectl apply -f ..
kubectl apply -f ..
Log 0 - 2021-08-08

# Test Harness with PProx

In [34]:
remove_working_db()

# PProx experiment: the injector goes through the first PProx layer
# (recsys-proxy-sgx-1) with encryption enabled.
# The loop variable is `n_clients`, not `client`, so that running this cell does
# not overwrite the module-level MongoClient named `client`.
for debug in ["1"]:
    for encryption in ["1"]:
        for n_clients in ["50"]:
            for identifier in range(1):
                for proxy in ["localhost:32000/sgx-proxy"]:
                    name = "proxy{}_{}".format(n_clients, identifier)
                    params_recsys = {
                        "hostAddress": hostAddress,
                        "elastic!replicas":1,
                        "harness!replicaCount":1,
                        "proxy-sgx!replicaCount":1,
                        "proxy-sgx!proxyType1":"1",
                        "proxy-sgx!proxyType2":"2",
                        "proxy-sgx!targetPort":"9090",
                        "proxy-sgx!enableEncryption":encryption,
                        "proxy-sgx!bufferShuffling":"10",
                        "proxy-sgx!responseTimeout":[999999],
                        "proxy-sgx!debug":debug,
                        "proxy-sgx!image!repository":proxy,
                        "mongo!replicaSet!enabled": "false",
                        "elastic!esJavaOpts":'"-Xmx16g -Xms16g"',
                        "elastic!resources!requests!cpu":"1500m",
                        "elastic!resources!requests!memory":"17Gi",
                        "elastic!resources!limits!cpu":"2000m",
                        "elastic!resources!limits!memory":"17Gi",
                        "name":name,
                        "timeout":450,
                    #    "mongo!clusterDomain":"nuc.local"  # Louvain
                        "mongo!clusterDomain":"cluster.local" # Bordeaux
                    }
                    params_injector={
                        "duration":300,
                        "clients":[n_clients],
                        "replicaCount":1,
                        #"targetHost":["http://stub-stub-recsys:8080"],   # stub
                        "targetHost":["http://recsys-proxy-sgx-1:8080"], # pprox
                        "concurrency":10,
                        "timeout":1200,
                        "asyncFluentd":"false",
                        "enableEncryption":encryption,
                        "randomShift":"1",
                        "name":name,
                        "harness":"true" # false if stub
                    }
                    params = {
                        "start_time":time.time(),
                        "params_recsys":params_recsys,
                        "params_injector":params_injector,
                        # NOTE(review): stray "name:" key (with a colon) is stored
                        # in Mongo as-is; kept in case analysis code reads it.
                        "name:":name,
                        "error_module":"",
                        "error":0,
                        "status":"created",
                        "name":name,
                        "stub":"0"
                    }

                    loop_run_test(params, retry=5)

# Save the working db (private-recsys) to debug-harness.
fusion_experiments_db("private-recsys", "debug-harness", drop=True)


odict_keys(['proxy-sgx!responseTimeout'])
{'proxy-sgx!responseTimeout': 999999}
odict_keys(['clients', 'targetHost'])
{'clients': '50', 'targetHost': 'http://recsys-proxy-sgx-1:8080'}
db:proxy-sgx!responseTimeout_clients-targetHost_20210808132840
[{'duration': 300, 'clients': '50', 'replicaCount': 1, 'targetHost': 'http://recsys-proxy-sgx-1:8080', 'concurrency': 10, 'timeout': 1200, 'asyncFluentd': 'false', 'enableEncryption': '1', 'randomShift': '1', 'name': 'proxy50_0', 'harness': 'true'}]
helm --namespace default delete stub
helm --namespace default delete stub
Log 0 - 2021-08-08 13:28:40 : Error: uninstall: Release not loaded: stub: release: not found
1
helm --namespace default delete injector
helm --namespace default delete injector
Log 0 - 2021-08-08 13:28:40 : Error: uninstall: Release not loaded: injector: release: not found
1
helm --namespace default delete recsys
helm --namespace default delete recsys
Log 0 - 2021-08-08 13:28:41 : release "recsys" uninstalled
0
kubectl apply 