In [1]:
!pip install importnb 
!pip install ipywidgets 
#!jupyter nbextension enable --py widgetsnbextension
#!jupyter labextension install @jupyter-widgets/jupyterlab-manager




In [2]:
from importnb import Notebook
with __import__('importnb').Notebook():
    from scripts import *



In [3]:
%%capture
from tqdm.notebook import tqdm as tqdm

In [4]:
import json
import os
import sys
import pprint
import traceback
from datetime import datetime
import requests
import time

In [5]:
def get_current_job_id(flink_address):
    """Return the IDs of the jobs listed by a Flink REST endpoint.

    Fetches ``{flink_address}/jobs/`` with curl and reads the ``id`` field of
    every entry in the response's ``jobs`` array.

    Args:
        flink_address: Base URL of the Flink REST endpoint; ``/jobs/`` is
            appended to it.

    Returns:
        The job IDs as one string, one ID per line (empty string when the
        ``jobs`` array is empty).

    Raises:
        subprocess.CalledProcessError: if curl exits non-zero (connection
            failure, or an HTTP error status because of ``--fail``).
        ValueError: if the response body is not valid JSON.
    """
    # Imported locally so the function does not depend on names that only a
    # star-import from another notebook might provide.
    import json
    import subprocess

    print("Running get_current_job_id in " + flink_address)
    # The previous shell pipeline started with `set -e pipefail`, which does not
    # enable pipefail, so a failing curl silently produced an empty job ID.
    # Running curl with an argument list (no shell) and --fail makes errors
    # surface, and parsing the JSON here removes the jq/tr dependency.
    out = subprocess.check_output(
        ["curl", "-sS", "--fail", "{flink_address}/jobs/".format(flink_address=flink_address)]
    )
    print(out)

    jobs = json.loads(out.decode("UTF-8")).get("jobs", [])
    job_ids = "\n".join(str(job["id"]) for job in jobs)
    print("Finished get_current_job_id is %s" % job_ids)

    return job_ids

def rescale(number):
    """Restart the Flink deployments with `number` task-manager replicas.

    Scales flink-jobmanager and flink-taskmanager (namespace ``default``) down
    to zero, waits 5 seconds, scales the job manager back to one replica and
    the task managers to `number`, then waits 60 seconds. Returns None.
    """
    scale_template = "kubectl scale deployment -n default {} --replicas={}"

    print("Clean logs and old job artifacts")

    # Phase 1: stop everything.
    for deployment in ("flink-jobmanager", "flink-taskmanager"):
        run_command(scale_template.format(deployment, 0), shell=False)
    time.sleep(5)

    # Phase 2: bring the job manager back, then the requested task managers.
    for deployment, replicas in (("flink-jobmanager", 1), ("flink-taskmanager", number)):
        run_command(scale_template.format(deployment, replicas), shell=False)
    time.sleep(60)


In [6]:
def install_chart(path, name, params, timeout="180s"):
    """Run ``helm install`` for the chart located in `path`.

    Every entry of `params` becomes a ``--set key=value`` flag; "!" in keys and
    values is replaced by "." so nested chart values can be written as
    ``"image!tag"``.

    Args:
        path: Chart directory; the command runs from inside it.
        name: Helm release name.
        params: Mapping of chart values to set.
        timeout: Value passed to helm's ``--timeout`` flag.

    Returns:
        Whatever ``run_command`` returns for the helm invocation.
    """
    with cd(path):
        set_flags = "".join(
            "--set {}={} ".format(str(key).replace("!", "."), str(value).replace("!", "."))
            for key, value in params.items()
        )
        helm_command = "helm install {}  . {} --wait  --namespace default --timeout {} ".format(
            name, set_flags, timeout
        )
        return run_command(helm_command, log = shell_log)
def uninstall_chart(path, name, timeout="120s"):
    """Delete a Helm release and, when the delete succeeded, wait for cleanup.

    Args:
        path: Chart directory; the command runs from inside it.
        name: Helm release name to delete from the ``default`` namespace.
        timeout: How long to wait after a successful delete. Accepts a number
            of seconds or a string such as ``"120s"`` / ``"120"``.

    Returns:
        Whatever ``run_command`` returns for the ``helm delete`` invocation.

    Raises:
        ValueError: if `timeout` cannot be interpreted as seconds.
    """
    with cd(path):
        result = run_command("helm delete {} --namespace default".format(name), log = shell_log)
        print(result)
        # run_command's return type is not visible here; comparing through str()
        # makes the success check work for both 0 and "0". The previous
        # `result == "0"` never matched an integer return code.
        if str(result).strip() == "0":
            # The default is the string "120s", which time.sleep() rejects;
            # strip the unit suffix and convert to seconds first.
            wait_seconds = float(str(timeout).strip().rstrip("s"))
            time.sleep(wait_seconds)
    return result

In [7]:
def reinstall(install_name, version, params=None):
    """Uninstall the Flink chart release, wait 60 seconds, and install it again.

    Args:
        install_name: Helm release name.
        version: Image tag; only used to build the chart values when `params`
            is None.
        params: Chart values passed to ``install_chart``; defaults to
            ``{"image!tag": version}``.
    """
    chart_dir = "../charts/flink"

    uninstall_chart(chart_dir, install_name)
    sleep(60)

    chart_values = {"image!tag": version} if params is None else params
    install_chart(chart_dir, install_name, chart_values)

In [8]:
def install(version, params=None):
    """(Re)install the ``flink`` release with the given image tag.

    Args:
        version: Image tag, stored under the ``"image!tag"`` chart value.
        params: Chart values to install with; defaults to a copy of
            ``default_params_flink``. The mapping passed in is not modified.
    """
    install_name = "flink"
    # Always work on a copy: previously a caller-supplied dict had "image!tag"
    # written into it as a side effect.
    params = dict(default_params_flink if params is None else params)
    params["image!tag"] = version
    reinstall(install_name, version, params)
    
def test(chart_name,bucket=None, default_params=None, specific_params={}, taskmanager_quantity=30, no_rescale=False):
   
    if default_params is None:
        return None
    
    params = default_params.copy()
    for name in specific_params:
        params[name] = specific_params[name]
    print(params)
    params["bucket"] = bucket

    uninstall_chart("../charts/flink-job", chart_name)
    if not no_rescale:
        rescale(taskmanager_quantity)
        sleep(60)
    
    install_chart("../charts/flink-job", chart_name, params)
    ret = run_command("kubectl -n default wait --for=condition=complete --timeout={}s job/{}".format(600, chart_name + "-flink-job-retrieve"))
    
    return ret

In [9]:
# Passed as `log=` to run_command by the chart install/uninstall helpers.
shell_log = False

# Value used for "job!duration" in both job parameter sets below.
duration = 120

# Chart values for the Flink cluster chart. "!" in a key stands for "." and is
# translated when the --set flags are built.
default_params_flink = {
    "taskmanager!replicaCount": 1,
    "hdfs!persistence!nameNode!storageClass": "local-path",
    "hdfs!persistence!dataNode!storageClass": "local-path",
    "hdfs!persistence!nameNode!size": "5Gi",
    "hdfs!persistence!dataNode!size": "5Gi",
    "zookeeper!persistence!storageClass": "local-path",
    "kafka!persistence!storageClass": "local-path",
    "hdfs!enabled": "false",
}

# Base job-chart values for runs on the "1.7.2-ar" image.
default_params_ar = {
    "image!tag": "1.7.2-ar",
    "installKafka": "false",
    "job!jobClass": "me.florianschmidt.microbenchmark.jobs.IncreasingLengthJob",
    "job!length": 1,
    "job!duration": duration,
    "job!mapParallelism": "2",
    "algorithm!checkpointing": "false",
    "algorithm!stateBackend": "rocksdb",
    "algorithm!idleMarks": "true",
    "job!sharingGroup": "true",
    "installHdfs": "true",
    "job!bufferTimeout": 5,
}

# Base job-chart values for runs on the "1.7.2-baseline" image.
default_params_vanilla = {
    "image!tag": "1.7.2-baseline",
    "installKafka": "false",
    "algorithm!type": "VANILLA",
    "job!jobClass": "me.florianschmidt.microbenchmark.jobs.IncreasingLengthJob",
    "job!length": 1,
    "job!duration": duration,
    "job!mapParallelism": "2",
    "algorithm!checkpointing": "true",
    "algorithm!stateBackend": "rocksdb",
    "algorithm!idleMarks": "false",
    "job!sharingGroup": "true",
    "killTaskManager!waitForKill": "True",
    "installHdfs": "true",
    "job!bufferTimeout": 5,
}

## Parameter initialization

In [10]:
# Each variant is an independent copy of a base parameter set with its
# overrides applied in one update() call. "name_algorithm" is the short tag the
# experiment cells embed in release names.

# VANILLA, checkpointing disabled
params_vanilla_nocp = dict(default_params_vanilla)
params_vanilla_nocp.update({
    "name_algorithm": "nc",
    "algorithm!type": "VANILLA_NOCP",
    "algorithm!checkpointing": "false",
})

# VANILLA, checkpointing enabled
params_vanilla = dict(default_params_vanilla)
params_vanilla.update({
    "name_algorithm": "cp",
    "algorithm!type": "VANILLA",
    "installHdfs": "true",
})

# ACTIVE_REPLICATION, NO_ORDERING
params_ar_noordering = dict(default_params_ar)
params_ar_noordering.update({
    "name_algorithm": "no",
    "algorithm!type": "NO_ORDERING",
})

# ACTIVE_REPLICATION, BETTER_BIAS with an idle-marks interval
params_ar_betterbias = dict(default_params_ar)
params_ar_betterbias.update({
    "name_algorithm": "tm",
    "algorithm!type": "BETTER_BIAS",
    "algorithm!idleMarksInterval": 25,
})

# ACTIVE_REPLICATION, BETTER_BIAS with idle marks switched off
# (shares the "tm" tag with params_ar_betterbias)
params_ar_betterbias_nosp = dict(default_params_ar)
params_ar_betterbias_nosp.update({
    "name_algorithm": "tm",
    "algorithm!type": "BETTER_BIAS",
    "algorithm!idleMarks": "false",
})

# ACTIVE_REPLICATION, LEADER_KAFKA
# NOTE(review): this section used to repeat
# params_ar_betterbias_nosp["algorithm!idleMarks"] = "false", which had no
# effect; possibly params_ar_leader_kafka was the intended target - confirm.
params_ar_leader_kafka = dict(default_params_ar)
params_ar_leader_kafka.update({
    "name_algorithm": "ka",
    "algorithm!type": "LEADER_KAFKA",
    "installKafka": "true",
})

# LEADER_KAFKA variants that select which task manager gets killed
params_ar_leader_kafka_kill_leader = dict(params_ar_leader_kafka)
params_ar_leader_kafka_kill_leader.update({"killTaskManager!kazooTarget": "leader"})
params_ar_leader_kafka_kill_follower = dict(params_ar_leader_kafka)
params_ar_leader_kafka_kill_follower.update({"killTaskManager!kazooTarget": "follower"})

# ACTIVE_REPLICATION, LIVE_ROBIN
params_ar_liverobin = dict(default_params_ar)
params_ar_liverobin.update({
    "algorithm!type": "LIVE_ROBIN",
    "algorithm!liveRobinMarks": "true",
    "algorithm!idleMarks": "false",
    "name_algorithm": "lr",
    "installKafka": "true",
    "algorithm!heartbeatEmitter!enabled": "true",
    "algorithm!liveRobinMarksInterval": 10,
})

In [11]:
# helm dep update


# Test: high 99th percentile

In [14]:
# Single run of the checkpointing VANILLA configuration on 2 task managers.
# The names assigned here (bucket, length, interval, test_map, index) remain in
# the kernel namespace after the cell finishes.
bucket = "test"
length = 1

test_maps = [params_vanilla]

interval=0
for test_map in tqdm(test_maps):
    # The cluster chart's kafka/zookeeper/hdfs switches follow the job's
    # installKafka / installHdfs settings.
    flink_params = dict(default_params_flink)
    flink_params["kafka!enabled"] = test_map["installKafka"]
    flink_params["zookeeper!enabled"] = test_map["installKafka"]
    flink_params["hdfs!enabled"] = test_map["installHdfs"]    
    install(test_map["image!tag"], flink_params)
    # Note: this loop variable rebinds the module-level `length` set above.
    for length in tqdm([1]):    
        for index in tqdm(range(1)):
            # Release name: test-<algorithm tag>-<length>-<interval>-<index>
            test("test-{}-{}-{}-{}".format(test_map["name_algorithm"], length, interval, index),
                 bucket,
                 test_map,
                 {
                   "job!length":length,
                    "algorithm!liveRobinMarksInterval" : interval,
                    "job!latencyTrackingInterval": "25",
                 },
                 no_rescale=False,
                 taskmanager_quantity=2
                )  
        

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))

helm delete flink --namespace default
0
helm install flink  . --set taskmanager.replicaCount=1 --set hdfs.persistence.nameNode.storageClass=local-path --set hdfs.persistence.dataNode.storageClass=local-path --set hdfs.persistence.nameNode.size=5Gi --set hdfs.persistence.dataNode.size=5Gi --set zookeeper.persistence.storageClass=local-path --set kafka.persistence.storageClass=local-path --set hdfs.enabled=true --set kafka.enabled=false --set zookeeper.enabled=false --set image.tag=1.7.2-baseline  --wait  --namespace default --timeout 180s 


HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))

{'image!tag': '1.7.2-baseline', 'installKafka': 'false', 'algorithm!type': 'VANILLA', 'job!jobClass': 'me.florianschmidt.microbenchmark.jobs.IncreasingLengthJob', 'job!length': 1, 'job!duration': 120, 'job!mapParallelism': '2', 'algorithm!checkpointing': 'true', 'algorithm!stateBackend': 'rocksdb', 'algorithm!idleMarks': 'false', 'job!sharingGroup': 'true', 'killTaskManager!waitForKill': 'True', 'installHdfs': 'true', 'job!bufferTimeout': 5, 'name_algorithm': 'cp', 'algorithm!liveRobinMarksInterval': 0, 'job!latencyTrackingInterval': '25'}
helm delete test-cp-1-0-0 --namespace default
1
Clean logs and old job artifacts
kubectl scale deployment -n default flink-jobmanager --replicas=0
kubectl scale deployment -n default flink-jobmanager --replicas=0
Log 0 - 2021-08-09 20:22:39 : deployment.apps/flink-jobmanager scaled
kubectl scale deployment -n default flink-taskmanager --replicas=0
kubectl scale deployment -n default flink-taskmanager --replicas=0
Log 0 - 2021-08-09 20:22:39 : deploym

# LiveRobin test

In [12]:
# Sweep of the LIVE_ROBIN configuration: job lengths 1-4, four marks
# intervals, 5 repetitions each, on 29 task managers.
bucket = "liverobin-testt"
length = 1
test_maps = [params_ar_liverobin]
#test_maps = [params_ar_leader_kafka]
for test_map in tqdm(test_maps):
    # The cluster chart's kafka/zookeeper/hdfs switches follow the job's
    # installKafka / installHdfs settings.
    flink_params = dict(default_params_flink)
    flink_params["kafka!enabled"] = test_map["installKafka"]
    flink_params["zookeeper!enabled"] = test_map["installKafka"]
    flink_params["hdfs!enabled"] = test_map["installHdfs"]    
    install(test_map["image!tag"], flink_params)
    # Note: this loop variable rebinds the module-level `length` set above.
    for length in tqdm(range(1,5)):    
        for interval in tqdm([5,10,50,100]):
            for index in tqdm(range(5)):
                # Release name: mb1-<algorithm tag>-<length>-<interval>-<index>
                test("mb1-{}-{}-{}-{}".format(test_map["name_algorithm"], length, interval, index),
                     bucket,
                     test_map,
                     {
                       "job!length":length,
                        # Overrides the interval stored in the test map.
                        "algorithm!liveRobinMarksInterval" : interval                     
                     },
                     no_rescale=False,
                     taskmanager_quantity=29
                    )  
        

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))

helm delete flink --namespace default
1
helm install flink  . --set taskmanager.replicaCount=1 --set hdfs.persistence.nameNode.storageClass=local-path --set hdfs.persistence.dataNode.storageClass=local-path --set hdfs.persistence.nameNode.size=5Gi --set hdfs.persistence.dataNode.size=5Gi --set zookeeper.persistence.storageClass=local-path --set kafka.persistence.storageClass=local-path --set hdfs.enabled=true --set kafka.enabled=true --set zookeeper.enabled=true --set image.tag=1.7.2-ar  --wait  --namespace default --timeout 180s 


HBox(children=(FloatProgress(value=0.0, max=4.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=4.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=5.0), HTML(value='')))

{'image!tag': '1.7.2-ar', 'installKafka': 'true', 'job!jobClass': 'me.florianschmidt.microbenchmark.jobs.IncreasingLengthJob', 'job!length': 1, 'job!duration': 120, 'job!mapParallelism': '2', 'algorithm!checkpointing': 'false', 'algorithm!stateBackend': 'rocksdb', 'algorithm!idleMarks': 'false', 'job!sharingGroup': 'true', 'installHdfs': 'true', 'job!bufferTimeout': 5, 'algorithm!type': 'LIVE_ROBIN', 'algorithm!liveRobinMarks': 'true', 'name_algorithm': 'lr', 'algorithm!heartbeatEmitter!enabled': 'true', 'algorithm!liveRobinMarksInterval': 5}
helm delete mb1-lr-1-5-0 --namespace default
1
Clean logs and old job artifacts
kubectl scale deployment -n default flink-jobmanager --replicas=0
kubectl scale deployment -n default flink-jobmanager --replicas=0
Log 0 - 2021-06-29 22:30:19 : deployment.apps/flink-jobmanager scaled
kubectl scale deployment -n default flink-taskmanager --replicas=0
kubectl scale deployment -n default flink-taskmanager --replicas=0
Log 0 - 2021-06-29 22:30:19 : deplo

KeyboardInterrupt: 

In [84]:
# One-off rerun without rescaling the cluster.
# NOTE(review): test_map, length, index and bucket are not defined in this
# cell; it only works when an earlier experiment cell has left them in the
# kernel namespace.
test("lr-{}-{}-{}".format(test_map["name_algorithm"], length, index),
     bucket,
     test_map,
     {
       "job!length":length,
       "job!rate":1000,
     },
     no_rescale=True,
     taskmanager_quantity=29
    )   

{'image!tag': '1.7.2-ar', 'installKafka': 'true', 'job!jobClass': 'me.florianschmidt.microbenchmark.jobs.IncreasingLengthJob', 'job!length': 1, 'job!duration': 120, 'job!mapParallelism': '2', 'algorithm!checkpointing': 'false', 'algorithm!stateBackend': 'rocksdb', 'algorithm!idleMarks': 'false', 'job!sharingGroup': 'true', 'installHdfs': 'true', 'algorithm!type': 'LIVE_ROBIN', 'algorithm!liveRobinMarks': 'true', 'name_algorithm': 'lr', 'algorithm!heartbeatEmitter!enabled': 'true', 'job!rate': 1000}
helm delete lr-lr-1-0 --namespace default
0
helm install lr-lr-1-0  . --set image.tag=1.7.2-ar --set installKafka=true --set job.jobClass=me.florianschmidt.microbenchmark.jobs.IncreasingLengthJob --set job.length=1 --set job.duration=120 --set job.mapParallelism=2 --set algorithm.checkpointing=false --set algorithm.stateBackend=rocksdb --set algorithm.idleMarks=false --set job.sharingGroup=true --set installHdfs=true --set algorithm.type=LIVE_ROBIN --set algorithm.liveRobinMarks=true --set n

0

In [1]:
# Installs the Flink cluster chart for the LIVE_ROBIN configuration without
# starting any job.
# NOTE(review): needs the parameter-initialization and helper cells to have
# been run first; the recorded NameError below comes from executing this cell
# as the first one of a kernel session.
bucket = "liverobintest"
length = 1
test_maps = [params_ar_liverobin]
#test_maps = [params_ar_leader_kafka]
for test_map in tqdm(test_maps):
    # The cluster chart's kafka/zookeeper/hdfs switches follow the job's
    # installKafka / installHdfs settings.
    flink_params = dict(default_params_flink)
    flink_params["kafka!enabled"] = test_map["installKafka"]
    flink_params["zookeeper!enabled"] = test_map["installKafka"]
    flink_params["hdfs!enabled"] = test_map["installHdfs"]    
    install(test_map["image!tag"], flink_params)

            

NameError: name 'params_ar_liverobin' is not defined

In [63]:
# Single ThreeSourcesJob run (length 2) on 29 task managers.
# NOTE(review): test_map and bucket are not defined in this cell; they must
# already exist in the kernel namespace from an earlier cell.
for length in tqdm(range(2,3)):    
    for index in tqdm(range(1)):
        # Release name: lr-<algorithm tag>-<length>-<index>
        test("lr-{}-{}-{}".format(test_map["name_algorithm"], length, index),
             bucket,
             test_map,
             {
               "job!length":length,
                "retrieveScript!getAllSinks":"true",
                # Replaces the job class stored in the test map.
                "job!jobClass":"me.florianschmidt.microbenchmark.jobs.ThreeSourcesJob",  
                "job!rate":1000,
                "job!rate2":1
             },
             no_rescale=False,
             taskmanager_quantity=29
            )   

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))

{'image!tag': '1.7.2-ar', 'installKafka': 'true', 'job!jobClass': 'me.florianschmidt.microbenchmark.jobs.ThreeSourcesJob', 'job!length': 2, 'job!duration': 120, 'job!mapParallelism': '2', 'algorithm!checkpointing': 'false', 'algorithm!stateBackend': 'rocksdb', 'algorithm!idleMarks': 'false', 'job!sharingGroup': 'true', 'installHdfs': 'true', 'algorithm!type': 'LIVE_ROBIN', 'algorithm!liveRobinMarks': 'true', 'name_algorithm': 'lr', 'retrieveScript!getAllSinks': 'true', 'job!rate': 1, 'job!rate2': 1}
helm delete lr-lr-2-0 --namespace default
0
Clean logs and old job artifacts
kubectl scale deployment -n default flink-jobmanager --replicas=0
kubectl scale deployment -n default flink-jobmanager --replicas=0
Log 0 - 2021-04-07 08:09:51 : deployment.apps/flink-jobmanager scaled
kubectl scale deployment -n default flink-taskmanager --replicas=0
kubectl scale deployment -n default flink-taskmanager --replicas=0
Log 0 - 2021-04-07 08:09:51 : deployment.apps/flink-taskmanager scaled
kubectl sca

In [51]:
# Bettercloud example job with a task-manager kill, run on the LIVE_ROBIN
# configuration with 29 task managers.
bucket = "test-bettercloud"
length = 1
test_maps = [params_ar_liverobin]

for test_map in tqdm(test_maps):
    # The cluster chart's kafka/zookeeper/hdfs switches follow the job's
    # installKafka / installHdfs settings.
    flink_params = dict(default_params_flink)
    flink_params.update({
        "kafka!enabled": test_map["installKafka"],
        "zookeeper!enabled": test_map["installKafka"],
        "hdfs!enabled": test_map["installHdfs"],
    })
    install(test_map["image!tag"], flink_params)

    for interval in [25]:
        for user_rate in tqdm([100]):
            for index in tqdm(range(1)):
                # <timestamp>-<bucket>-<algorithm tag>-<index>
                test_name = "{}-{}-{}-{}".format(
                    datetime.today().strftime('%Y-%m-%d-%H-%M-%S'),
                    bucket,
                    test_map["name_algorithm"],
                    index,
                )
                overrides = {
                    "experimentationName": test_name,
                    "workload!length": 1,
                    "workload!replicationFactor": 1,
                    "retrieveScript!getAllSinks": "true",
                    "job!jobClass": "me.florianschmidt.examples.bettercloud.Job",
                    "injector!enabled": "true",
                    "injector!consumerRate": user_rate,
                    "injector!controlRate": 5,
                    "injector!seed": index + 1,
                    "algorithm!idleMarksInterval": interval,
                    "job!duration": 300,
                    "killTaskManager!enabled": "true",
                    "killTaskManager!taskName": "Qualifier",
                    "killTaskManager!killDelay": 150,
                    "killTaskManager!replicaIndex": 0,
                    "killTaskManager!operatorIndex": 0,
                    "killTaskManager!gracePeriod": 0,
                }
                test(
                    "test{}".format(index),
                    bucket,
                    test_map,
                    overrides,
                    no_rescale=False,
                    taskmanager_quantity=29,
                )

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))

helm delete flink --namespace default
0
helm install flink  . --set taskmanager.replicaCount=1 --set hdfs.persistence.nameNode.storageClass=local-path --set hdfs.persistence.dataNode.storageClass=local-path --set hdfs.persistence.nameNode.size=5Gi --set hdfs.persistence.dataNode.size=5Gi --set zookeeper.persistence.storageClass=local-path --set kafka.persistence.storageClass=local-path --set hdfs.enabled=true --set kafka.enabled=false --set zookeeper.enabled=false --set image.tag=1.7.2-ar  --wait  --namespace default --timeout 180s 


HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=1.0), HTML(value='')))

{'image!tag': '1.7.2-ar', 'installKafka': 'false', 'job!jobClass': 'me.florianschmidt.examples.bettercloud.Job', 'job!length': 1, 'job!duration': 300, 'job!mapParallelism': '2', 'algorithm!checkpointing': 'false', 'algorithm!stateBackend': 'rocksdb', 'algorithm!idleMarks': 'false', 'job!sharingGroup': 'true', 'installHdfs': 'true', 'algorithm!type': 'LIVE_ROBIN', 'algorithm!liveRobinMarks': 'true', 'name_algorithm': 'lr', 'experimentationName': '2021-04-06-11-47-08-test-bettercloud-lr-0', 'workload!length': 1, 'workload!replicationFactor': 1, 'retrieveScript!getAllSinks': 'true', 'injector!enabled': 'true', 'injector!consumerRate': 100, 'injector!controlRate': 5, 'injector!seed': 1, 'algorithm!idleMarksInterval': 25, 'killTaskManager!enabled': 'true', 'killTaskManager!taskName': 'Qualifier', 'killTaskManager!killDelay': 150, 'killTaskManager!replicaIndex': 0, 'killTaskManager!operatorIndex': 0, 'killTaskManager!gracePeriod': 0}
helm delete test0 --namespace default
0
Clean logs and old