In [1]:
# ---------------------------------------------------------------------------
# Estimation campaigns fed to the regression. Only the uncommented assignment
# is active; the others are earlier campaigns kept for reference.
# ---------------------------------------------------------------------------
#results_estimation=["variable-results/q5-20230513114908"]
#results_estimation=["search-results/q5-20230604063557"]
#results_estimation=["variable-results/q11-final-20230423060007", "variable-results/q11-final-20230424142915"]
#results_estimation=["variable-results/q8-final-20230422200802"]
results_estimation = ["search-results/query-20230607203251"]

# Platform switches: each flag gates the matching streambed.init_* call.
kind = False
g5k = True

# --- Infrastructure / Kafka ---
job_managers_qty = 8
kafka_qty = 8
kafka_partitions = 32
kafka_replicas = 8
kafka_node_selector = "kafka"
reset_kafka_data = True

# --- Regression model used for sizing ---
model_type = "log"
# When False, the source count falls back to kafka_partitions during sizing.
flink_limit_sources = True

# --- Task manager shape ---
cpu = 16
task_slots_per_task_manager = 16
task_managers_qty = 32
# Divisor applied to the computed task manager count (ceil(tm_qty / ratio_tm)).
ratio_tm = 80 / 128

# --- Sources and memory ---
source_parallelism = 32
source_capacity = 150000
memory_range = [65536]

# --- Data generator job ---
datagen_configuration = {
    "parallelism": 32,
    "cpu": 8,
    "memory": 16384,
    "task_managers_qty": 8,
    "task_slots": 8,
    "timeout": 600,
    "notebook": "/xp_datagen",
}
# Multipliers applied to each target throughput when building the experiments.
throughput_ratios = [1., 1.2, 1.5]

# --- Experiment definition ---
results_path = "regression-results/"
notebook = "/xp_intro_q5_kafka"
xp_name = "query"
filter_data = ""
warmup = 120
sensitivity = 0.01
ratio = 1
task_slots_limits = []
timeout = 1800
throughputs = [2500000]
compute_reg = 15

# Safety margin applied to the target throughput before sizing.
ratio_overhead_throughput = 1.10

In [None]:
import json
import yaml
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import PolynomialFeatures, FunctionTransformer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import LeaveOneOut, cross_val_score
from sklearn.pipeline import make_pipeline
import seaborn as sns
from matplotlib import pyplot as plt
import logging
import numpy as np
from time import sleep
import sys
%run ../common/scripts.ipynb
# Make the sibling streambed package importable; the membership check keeps
# re-running this cell from adding the same entry twice.
if sys.path.count("../streambed") == 0:
    sys.path.append("../streambed")
import streambed

# Logging is configured from ./logging.yaml, with WARN passed as the default level.
streambed.setup_logging(default_level=logging.WARN, default_path="./logging.yaml")
logger = logging.getLogger("streambed")

# Initialise whichever platform(s) the parameters cell enabled.
if kind:
    streambed.init_kind()
if g5k:
    streambed.init_g5k()

# Emitted on the root logger, not on the 'streambed' logger created above.
logging.info("Execute notebook")

In [None]:
reg = streambed.InfrastructureRegression()

# Every campaign contributes a measurements file and a companion details file.
for campaign in results_estimation:
    reg.load(f"{campaign}.csv")
    reg.load_details(f"{campaign}-details.yaml")
reg.filter_data(filter_data)

# Evaluated but not displayed (it is not the last expression of the cell).
reg.data[["used_task_slots", "memory", "observed_source_rate"]]

reg.heatmap()

# Candidate models: same linear regression, different feature transforms.
sqrt_regression = make_pipeline(FunctionTransformer(np.sqrt, validate=True), LinearRegression())
logarithmic_regression = make_pipeline(FunctionTransformer(np.log1p, validate=True), LinearRegression())
linear_regression = make_pipeline(PolynomialFeatures(degree=1), LinearRegression())

# (report template, model name registered in reg, estimator), fitted in this order.
candidate_models = (
    ("Sqrt: {} - {}", "sqrt", sqrt_regression),
    ("Log: {} - {}", "log", logarithmic_regression),
    ("Linear: {} - {}", "linear", linear_regression),
)
for report_template, model_name, estimator in candidate_models:
    _, score = reg.generic_model(model_name, estimator)
    # Prediction at the full cluster size (task_managers_qty * cpu) with 65536 memory.
    throughput = reg.predict(model_name, task_managers_qty * cpu, 65536)
    print(report_template.format(score, throughput))


In [None]:
# Candidate pipelines, keyed by the model name under which the selected fit
# is registered in `reg`.
pipelines = {
    "linear": make_pipeline(PolynomialFeatures(degree=1),  LinearRegression()),
    "log": make_pipeline(FunctionTransformer(np.log1p, validate=True), LinearRegression()),
    "sqrt": make_pipeline(FunctionTransformer(np.sqrt, validate=True), LinearRegression())
}

for pipeline in pipelines:
    print("******** {} *******".format(pipeline))
    # NOTE(review): the meaning of the positional arguments 5 and 20 is defined
    # by streambed's generic_models and is not visible here.
    (models, scores) = reg.generic_models(pipelines[pipeline], 5, 20)

    # Keep the candidate with the HIGHEST score.
    # NOTE(review): the names say "low" but the comparison keeps the maximum;
    # confirm that a higher score is better for generic_models.
    low_score = None
    low_index = None
    for i in range(len(models)):
        if (low_score is None) or (low_score < scores[i]):
            low_index = i
            low_score = scores[i]
            reg.set_model(pipeline, models[i])
        # compute_x uses the model currently registered for this pipeline,
        # i.e. the best one seen so far, not necessarily candidate i.
        logger.debug("{} {} {}".format(i, scores[i], reg.compute_x(pipeline, throughputs[0], range(10,1000), [65536])))

    if low_index is None:
        # No candidate came back: report it instead of failing on scores[None].
        logger.warning("No model returned for pipeline {}".format(pipeline))
        continue

    # Report the selection once, using the pipeline that was just fitted.
    # The previous version always queried "sqrt" and passed three arguments to
    # a two-placeholder format string, so the computed value was never shown.
    selected_x = reg.compute_x(pipeline, throughputs[0], range(10,1000), [65536])
    selection_report = "Selected: {} - {} - {}".format(low_index, scores[low_index], selected_x)
    print(selection_report)
    logger.info(selection_report)

In [None]:
from sklearn.base import clone

# Split on task-slot count: rows below half of the largest observed value are
# used for training, the remaining rows for testing.
limit_ts_train = reg.data["used_task_slots"].max() // 2
data = reg.data[["used_task_slots", "memory", "observed_source_rate"]]
train = data.query(f"(used_task_slots < {limit_ts_train})")
test = data.query(f"(used_task_slots >= {limit_ts_train})")

# Features are every column but the last; the target is the last column.
X_train = train.drop(columns="observed_source_rate").values
y_train = train["observed_source_rate"].values
X_test = test.drop(columns="observed_source_rate").values
y_test = test["observed_source_rate"].values

for name in reg.models:
    print(name)
    # Work on an unfitted copy so the model registered in reg is left untouched.
    model = clone(reg.models[name])

    # Leave-one-out cross-validation on the training rows.
    scores = cross_val_score(model, X_train, y_train, cv=LeaveOneOut(), scoring='neg_root_mean_squared_error')
    display(scores.mean())

    model.fit(X_train, y_train)

    # RMSE on the rows the model was fitted on.
    y_train_pred = model.predict(X_train)
    trained_rmse = np.sqrt(mean_squared_error(y_train, y_train_pred))
    print(f"Trained RMSE: {trained_rmse}")

    # RMSE on the held-out rows with the larger task-slot counts.
    y_test_pred = model.predict(X_test)
    final_rmse = np.sqrt(mean_squared_error(y_test, y_test_pred))
    print(f"Final RMSE: {final_rmse}")
    logger.info(f"Model {name} - Trained RMSE: {trained_rmse} - Final RMSE: {final_rmse}")

In [None]:
# Prediction grid: for every registered model, every task manager count from
# 1 up to (but excluding) task_managers_qty, and every memory size.
# Each entry is (model name, task slots = cpu * task manager count, memory).
prediction_grid = [
    (model_name, cpu * tm_count, mem)
    for model_name in reg.models
    for tm_count in range(1, task_managers_qty)
    for mem in memory_range
]

df = pd.DataFrame({
    'ts': np.array([slots for _, slots, _ in prediction_grid]),
    'mem': np.array([mem for _, _, mem in prediction_grid]),
    # reg.predict returns an indexable result; only its first element is plotted.
    'tp' : np.array([reg.predict(model_name, slots, mem)[0] for model_name, slots, mem in prediction_grid]),
    'model' : [model_name for model_name, _, _ in prediction_grid]
})

# One panel per model, predicted throughput against task slots, coloured by memory.
sns.relplot(data=df, x="ts", y="tp", hue="mem", col="model")

# Last expression of the cell: the sizing for the first target throughput is displayed.
reg.get_parallelism_throughput(throughputs[0], memory_range[0], source_capacity, model_name=model_type)

In [None]:
import math

# Size the deployment for the first target throughput with the selected model.
task_parallelism, task_slots, needed_sources = reg.get_parallelism_throughput(
    throughputs[0], memory_range[0], source_capacity, model_name=model_type
)

# Without source limiting, the source count falls back to the number of Kafka partitions.
if not flink_limit_sources:
    needed_sources = kafka_partitions

# Slots for tasks and sources together, converted to task managers, then
# inflated by ratio_tm.
total_slots = task_slots + needed_sources
tm_qty = math.ceil(total_slots / task_slots_per_task_manager)
actual_needed_task_managers_qty = math.ceil(tm_qty / ratio_tm)

sizing = (total_slots, needed_sources, task_slots, tm_qty, actual_needed_task_managers_qty)
logger.info("Needed: {} ({} + {}) - needed TM : {} - needed TM with load: {}".format(*sizing))
# Last expression of the cell: shown as the cell output.
"Needed: {} ({} + {}) - needed TM : {} - needed not fully loaded TM : {}".format(*sizing)

In [None]:
#papermill_description=Reset_Kafka
# Label the cluster nodes for the requested job manager and Kafka counts.
# NOTE(review): init_label_nodes, run_command and get_label_nodes are not defined
# in the visible cells — presumably they come from ../common/scripts.ipynb; confirm.
init_label_nodes(jobmanagers_qty=job_managers_qty, kafka_qty=kafka_qty)
if reset_kafka_data:
    #papermill_description=Initialize libraries
 
    # address and port are only used to build the Kowl URL printed below.
    address = "127-0-0-1"
    port=30081

    # Delete the existing Kafka resource, then pause 10 seconds before redeploying.
    !kubectl delete -n kafka kafka/my-cluster
    sleep(10)
    # With kafka_qty == 0 the cluster is deleted and nothing is redeployed.
    if kafka_qty > 0:
        #run_command("kubectl create -f ./kafka.yaml", shell=False)
        streambed.deploy_kafka(kafka_partitions, kafka_replicas, node_selector=kafka_node_selector, antiaffinity=g5k, retention_duration=5) # set to 5 minutes retention
        # Wait (up to 300 s) for the Kafka resource to report the Ready condition.
        run_command("kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka", shell=False)
        # Kafka UI
        run_command("helm install -f values-kowl.yaml -n kafka kowl cloudhut/kowl", shell=False)
        #(ip_url, dns_url) = get_service_public_address("kafka", "manager", "kowl", 80)
        # Only manager_node is used below; the other two values are unpacked but unused here.
        (manager_node, jobmanager_node, taskmanager_node) = get_label_nodes()
        print("Kafka Kowl: {} - Ingress: {}".format(manager_node, "http://kowl.{}.sslip.io:{}".format(address,port)))    
        # Apply the Kafka bridge manifests.
        !kubectl apply -f ./kafka-bridge.yaml
        !kubectl apply -f ./kafka-bridge-service.yaml

In [4]:
# Largest throughput multiplier; the experiment cell uses it to scale params.EVENTS_NUM.
max(throughput_ratios)

1.5

In [None]:
# Build one verification experiment per (memory, target throughput) pair.
list_xp = []
for memory in memory_range:
    for throughput in throughputs:
        # Size for the target throughput plus the configured overhead margin.
        task_parallelism, task_slots, needed_sources = reg.get_parallelism_throughput(
            throughput * ratio_overhead_throughput, memory, source_capacity, model_name=model_type
        )
        # Without source limiting, the source count falls back to the number of Kafka partitions.
        if not flink_limit_sources:
            needed_sources = kafka_partitions

        # Slots for tasks and sources together, converted to task managers,
        # then inflated by ratio_tm.
        total_slots = task_slots + needed_sources
        tm_qty = math.ceil(total_slots / task_slots_per_task_manager)
        actual_needed_task_managers_qty = math.ceil(tm_qty / ratio_tm)

        sizing = (total_slots, needed_sources, task_slots, tm_qty, actual_needed_task_managers_qty)
        logger.info("Needed: {} ({} + {}) - needed TM : {} - needed TM with load: {}".format(*sizing))
        display("Needed: {} ({} + {}) - needed TM : {} - needed not fully loaded TM : {}".format(*sizing))

        list_xp.append({
            "cpu": cpu,
            "memory": memory,
            "run": 1,
            "task_slots_per_task_manager": task_slots_per_task_manager,
            "task_managers_qty": actual_needed_task_managers_qty,
            "source_parallelism": needed_sources,
            "parallelism": 1,
            "evenly_spread": "true",
            # Use the notebook parameter rather than a literal, so overriding
            # `warmup` in the parameters cell takes effect.
            "warmup": warmup,
            "custom_memory": None,
            # NOTE(review): both replace() arguments are identical, so this call
            # currently changes nothing. It is kept as-is because the intended
            # escaping cannot be determined from this notebook.
            "task_parallelism": task_parallelism.replace("\\\\\\\"", "\\\\\\\""),
            "dichotomic_mst_tuning": None,
            "task_slots_limit": 0,
            # Throughputs to verify: the target scaled by each configured ratio.
            "throughputs": [int(x * throughput) for x in throughput_ratios],
            "params.TPS": throughput,
            "timeout": timeout,
            # Events for the whole timeout window at the highest scaled throughput.
            "params.EVENTS_NUM": str(int(timeout * throughput * max(throughput_ratios))),
            "params.PERSON_PROPORTION": "1",
            "params.AUCTION_PROPORTION": "3",
            "params.BID_PROPORTION": "46",
            "nb_runs_throughput": 1,
            "nb_runs_parallelism": 0,
            "ratio_overhead_throughput": ratio_overhead_throughput,
            "sensitivity": sensitivity,
            "notebook": notebook,
        })

In [None]:
#papermill_description=Loop_verification
import datetime

# Results go to a directory named after the experiment and the launch time
# (second resolution). results_path and the name are joined with an explicit "/".
now = datetime.datetime.now()
now_str = now.strftime("%Y%m%d%H%M%S")
path = f"{results_path}/{xp_name}-{now_str}"

# Run every experiment built in list_xp, using the data generator configuration.
streambed.loop_verification(
    list_xp,
    datagen_configuration,
    results_path=path,
    detail_metrics=True,
)
