In [None]:
# Software Name : HSLinUCB
# SPDX-FileCopyrightText: Copyright (c) 2021 Orange
# SPDX-License-Identifier: GPL-2.0
#
# This software is distributed under the GNU General Public License v2.0 license
#
# Author: David DELANDE <david.delande@orange.com> et al.

In [None]:
%%javascript
// Replace the output area's private `_should_scroll` hook with a version that
// always answers "no", whatever line count it is given.
// NOTE(review): presumably this stops long cell outputs from being collapsed
// into a scrollable box in the classic notebook front end - confirm.
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}

## Import libraries

In [None]:
%load_ext autoreload
%autoreload 2
import time
import seaborn as sns
import json
import numpy as np
import math
from math import *
import scipy.stats as stats
from numpy.linalg import inv
%matplotlib notebook
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from IPython.display import clear_output
import matplotlib
import random
import numpy.linalg
import copy
import pandas as pd
pd.set_option('display.max_columns', None)
from tqdm import trange
plt.rcParams.update({'figure.max_open_warning': 0})
import sys
np.set_printoptions(threshold=sys.maxsize)
np.random.seed(1)

import pickle
from collections import deque , OrderedDict
from numpy import asarray
from numpy import save
import h5py
from lib.CogscalingLib import Orchestrator

## Dataset

In [None]:
# Dataset file handed to the orchestrator.
# If the file does not exist and the environment is configured with
# record == True (see the Environment section below), a new dataset is created.
sessionFile = "my_new_dataset.h5"

# Single orchestrator client shared by every cell of this notebook.
environment_orchestrator = Orchestrator(sessionFile=sessionFile, debug=False)

## Orchestrator set up

In [None]:
# Configure the orchestrator threads before running any experiment.
# Every call returns a (status, message) pair; the message is echoed so the
# outcome of each step is visible in the cell output.

print("Activate debug on orchestrator thread")
status, message = environment_orchestrator.activateDebug(component='orchestrator')
print("message:", message)

print("Set front-dynamic-component deployment limit to min=1 and max=15")
status, message = environment_orchestrator.setDeploymentLimit(deployment="front-dynamic-component", min=1, max=15)
print("message:", message)

print("set zipkin thread to collect latency from istio ingress and from front-dynamic-component on namespace default")
status, message = environment_orchestrator.setZipkinService(services=['istio-ingressgateway','front-dynamic-component.default'])
print("message:", message)

print("set prometheus thread to collect system metric for front-dynamic-component deployment")
status, message = environment_orchestrator.setPrometheusService(deployments=['front-dynamic-component'])
print("message:", message)

print("set prometheus thread history buffer size to keep 1 system metrics history")
status, message = environment_orchestrator.changePrometheusTraceNumber(trace_number=1)
print("message:", message)

print("Set the number of measures taken from prometheus before raising the event scaling done")
status, message = environment_orchestrator.setSampleNumberBeforeAcceptScaling(sampleNumber=1)
print("message:", message)

print("set zipkin thread history buffer size to keep 4000 latency metrics history")
status, message = environment_orchestrator.changeZipkinTraceNumber(trace_number=4000)
print("message:", message)

# The log line used to announce "10 seconds" while 12 was sent to the
# orchestrator. The value is kept as it was; the message is now built from it
# so the two cannot drift apart again.
zipkin_lookback = 12
print(f"set zipkin thread lookback time window to {zipkin_lookback} seconds")
status, message = environment_orchestrator.setZipkinLookback(lookback=zipkin_lookback)
print("message:", message)

print("Display orchestrator configuration")
status, message = environment_orchestrator.getConfig()
print("configuration:", message)

## Environment

In [None]:
class EnvTest():
    """Scaling environment driven live through the orchestrator or replayed from a dataset.

    The environment exposes three arms (``armStay``, ``armUp``, ``armDown``),
    builds the agent context from the orchestrator's aggregated state and scores
    every action against the latency reference ``latency_ref``.

    It relies on two notebook-level globals: ``environment_orchestrator`` (the
    orchestrator client) and ``sessionFile`` (the HDF5 dataset path).
    """

    # Deployment scaled by the arms (also the component group name in the dataset)
    _DEPLOYMENT = "front-dynamic-component"
    # Zipkin service whose traces are aggregated together with the deployment metrics
    _ZIPKIN_SERVICE = "front-dynamic-component-service.default.svc.cluster.local:80/*"

    def __init__(self, narms, level=1,max_level = 25, latency_ref = 600, debug = False):
        """Create the environment.

        Args:
            narms: number of arms; stored but not otherwise used by this class.
            level: initial scaling level.
            max_level: highest level ``armUp`` is allowed to reach.
            latency_ref: latency threshold used by ``computeReward`` and ``computeOracle``
                (same unit as the ``duration`` field of the orchestrator state).
            debug: when True, print a trace of every step.
        """
        self.latency_ref = latency_ref
        self.min_level = 1
        self.max_level = max_level
        self.level = level
        self.debug = debug
        self.narms = narms
        # Loads and levels looked up in the dataset by computeOracle()
        self.loads = [5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,100]
        self.levels = [1,2,3,4,5,6,7,8,9,10]
        self.sessionFile=sessionFile  # notebook global defined in the Dataset cell
        # Oracle table indexed by [load, level]; every entry starts at 2 and is
        # refined by computeOracle().
        self.oracle_cheat = np.full((np.max(self.loads) +1, len(self.levels) + 1),2)
        self.previous_states = []
        self.current_states = []
        self.previous_duration = 0
        self.current_duration = 0
        self.previous_rq = 0
        self.current_rq = 0
        self.targetPattern = []
        self.rounds = 0
        self.replay = False # Set to True to read context from the dataset. replay and record are mutually exclusive.
        self.record = False # Set to True to record context in the dataset. replay and record are mutually exclusive.
        super().__init__()

    def reset(self, narms, target, level,max_level, latency_ref):
        """Restart an episode at ``level``.

        Only ``level``, ``rounds`` and ``step`` are reset. ``narms``, ``target``,
        ``max_level`` and ``latency_ref`` are accepted but ignored.
        """
        self.level = level
        self.rounds = 0
        self.step = 0

    def moving_mean(self,measure,order):
        """Return the centred moving average of ``measure``.

        For an odd ``order`` each output is the plain mean of ``order`` consecutive
        values. For an even ``order`` two adjacent windows of ``order`` values are
        averaged (a 2 x ``order`` centred mean). Positions without a full window
        are dropped, so the result is shorter than ``measure``.

        Args:
            measure: indexable sequence of numbers.
            order: window size; expected to be a positive integer.

        Returns:
            list of averaged values.
        """
        result = []
        if order%2 == 0:
            m = int(order / 2)
            for index in range(0, len(measure)):
                # Skip positions whose window would run off either end
                if index < m or index + m + 1 > len(measure):
                    continue
                sum1 = 0
                sum2 = 0
                # Two windows of 2*m values, the second shifted right by one
                for index_value in range(index -m, index + m):
                    sum1 = sum1 + measure[index_value]
                    sum2 = sum2 + measure[index_value + 1]
                mean1 = (1/(2*m)) * sum1
                mean2 = (1/(2*m)) * sum2
                result.append((mean1 + mean2) / 2)
        else:
            m = int((order - 1) / 2)
            for index in range(0, len(measure)):
                if index < m or index + m + 1 > len(measure):
                    continue
                sum1 = 0
                for index_value in range(index - m, index + m + 1):
                    sum1 = sum1 + measure[index_value]
                result.append((1/((2*m)+1)) * sum1)
        return result

    def save_PatternModel(self,file = ''):
        """Save the target pattern to ``<file>.json`` (``pattern.json`` when ``file`` is '')."""
        if file == '':
            file = 'pattern'
        with open(file + '.json', 'w') as filehandle:
            json.dump(self.targetPattern, filehandle)

    def load_PatternModel(self, file = ''):
        """Load the target pattern from ``<file>.json`` (``pattern.json`` when ``file`` is '').

        Returns:
            the loaded pattern, also stored in ``self.targetPattern``.
        """
        if file == '':
            file = 'pattern'
        with open(file + '.json') as json_file:
            self.targetPattern = json.load(json_file)
        return self.targetPattern

    def display_PatternModel(self):
        """Plot the target pattern (number of users per step)."""
        f = plt.figure()
        patternGraph = f.add_subplot(111)
        patternGraph.plot(self.targetPattern)
        plt.ylabel("Number of users",fontsize=16)
        plt.xlabel("Steps",fontsize=16)
        plt.title("Workload injection pattern")
        plt.grid()
        plt.show()

    def generateRealProgressiveTargetPattern(self,nrounds,step,target_list):
        """Build a stair-shaped target pattern that walks up and down ``target_list``.

        The target is held for ``step`` rounds, then moves to the next entry of
        ``target_list``. When an end of the list is reached the direction flips
        and the end value is held for one extra plateau before moving back.

        Args:
            nrounds: total number of rounds to generate.
            step: number of rounds per plateau.
            target_list: ordered list of target values.

        Returns:
            the generated pattern (list of ``nrounds`` values), also stored in
            ``self.targetPattern``.
        """
        self.targetPattern = []
        patternChangeNumber = math.floor(nrounds/step)

        target_list_index = 0
        target = target_list[target_list_index]
        increasing = True
        p_c = 1  # index of the plateau currently being emitted
        for n in range(nrounds):
            # A plateau boundary has been crossed: move to the next target
            if n >= (p_c * step) and p_c <= patternChangeNumber:
                p_c = p_c + 1
                if increasing:
                    if target_list_index < (len(target_list) - 1):
                        target_list_index += 1
                    else:
                        increasing = False
                else:
                    if target_list_index > 0:
                        target_list_index -= 1
                    else:
                        increasing = True
                target = target_list[target_list_index]
            self.targetPattern.append(target)
        return self.targetPattern

    def changeInjector(self):
        """Set the injector to the user count scheduled for the current round.

        In replay mode nothing is sent to the orchestrator.
        """
        user_level = self.targetPattern[self.rounds]
        if self.debug:
            print("injector level:", user_level)
        if not self.replay:
            status, message = environment_orchestrator.setLocustUser(user=int(user_level),spawn_rate=1)
            print("message:", message)

    def _fetch_state(self):
        """Query the orchestrator for the aggregated state of the current round."""
        return environment_orchestrator.getAgregatedState(
            components=[{'prometheus': self._DEPLOYMENT, 'zipkin': self._ZIPKIN_SERVICE}],
            replay=self.replay,
            record=self.record,
            load=self.targetPattern[self.rounds],
            level=self.level,
            useMetricServer=True,
        )

    @staticmethod
    def _first_value(state, key):
        """Return the first entry of column ``key`` of ``state`` as a float."""
        return state[key].to_numpy().astype(float)[0]

    def getContext(self):
        """Read the current state and return it as the agent context.

        Also rotates the previous/current bookkeeping (states, duration, request
        rate). Outside replay mode ``self.level`` is resynchronised with the
        component count reported by the orchestrator.

        Returns:
            1 x 2 ``numpy`` matrix ``[[lastcomponentNumber, cpu_usage_mean]]``.
        """
        if len(self.previous_states) == 0:
            # Very first call: seed the "previous" values with a dedicated read.
            # NOTE(review): this seed stores 4 values while later states store 2.
            state = self._fetch_state()
            self.previous_states.append(self._first_value(state, 'lastcomponentNumber'))
            self.previous_duration = self._first_value(state, 'duration')
            self.previous_states.append(self.previous_duration)
            self.previous_rq = self._first_value(state, 'req_perc_sec')
            self.previous_states.append(self.previous_rq)
            self.previous_states.append(self._first_value(state, 'cpu_usage_mean'))
        elif len(self.current_states) != 0:
            # Later calls: what was current becomes previous
            self.previous_states = self.current_states
            self.previous_duration = self.current_duration
            self.previous_rq = self.current_rq

        state = self._fetch_state()
        self.current_duration = self._first_value(state, 'duration')
        self.current_rq = self._first_value(state, 'req_perc_sec')
        self.current_states = [
            self._first_value(state, 'lastcomponentNumber'),
            self._first_value(state, 'cpu_usage_mean'),
        ]

        states = np.asmatrix(self.current_states)
        if not self.replay:
            self.level = int(self.current_states[0])
        if self.debug:
            print("state returned:", states)
        return states

    def armStay(self,context):
        """Keep the current level.

        Returns:
            ``(reward, context)`` with the context unchanged.
        """
        if self.debug:
            print("In action stay function")
            print("level in armStay:", self.level)
        reward = self.computeReward(previous_context=context,context = context, action = "stay",actionStep = 0)
        return reward, context

    def armUp(self,context,actionStep = 1):
        """Scale up by ``actionStep`` levels.

        If the move would exceed ``max_level`` nothing is done and the reward is 0.

        Returns:
            ``(reward, context)``; the context is re-read after a successful move.
        """
        previous_context = context
        if self.debug:
            print("In action up function")
            print("actionStep:", actionStep)
            print("level in armUp:", self.level)
        if self.level + actionStep > self.max_level:
            if self.debug:
                print("upper limit reached")
            reward = 0
        else:
            if self.replay:
                self.level += actionStep
            else:
                status, message = environment_orchestrator.incrementalKubernetesDeploymentScale(deployment=self._DEPLOYMENT,step=actionStep,waitKubernetes=True,waitPrometheus=True,useMetricServer = False)
                print("message:", message)
            context = self.getContext()
            reward = self.computeReward(previous_context = previous_context,context = context, action = "up",actionStep = actionStep)
        return reward,context

    def armDown(self,context, actionStep = 1):
        """Scale down by ``actionStep`` levels.

        ``actionStep`` is a signed delta that is *added* to the level, so callers
        must pass a negative value to scale down (the default of 1 would scale up).
        If the move would go below ``min_level`` nothing is done and the reward is 0.

        Returns:
            ``(reward, context)``; the context is re-read after a successful move.
        """
        previous_context = context
        if self.debug:
            print("In action down function")
            print("actionStep:", actionStep)
            print("level in armDown:", self.level)
        if self.level + actionStep < self.min_level:
            if self.debug:
                print("lower limit reached")
            reward = 0
        else:
            if self.debug:
                print("lower limit not reached")
            if self.replay:
                self.level += actionStep
            else:
                status, message = environment_orchestrator.incrementalKubernetesDeploymentScale(deployment=self._DEPLOYMENT,step=actionStep,waitKubernetes=True,waitPrometheus=True,useMetricServer = False)
                print("message:", message)
            context = self.getContext()
            reward = self.computeReward(previous_context = previous_context,context = context, action = "down",actionStep = actionStep)
        return reward,context

    def computeReward(self,previous_context,context, action,actionStep = 1):
        """Return the binary reward of ``action``.

        * ``"up"``   -> 1 when the latency *before* the action exceeded ``latency_ref``.
        * ``"down"`` -> 1 when the latency *after* the action is within ``latency_ref``.
        * ``"stay"`` -> 1 when the current latency is within ``latency_ref``.

        The latencies come from ``self.previous_duration`` / ``self.current_duration``;
        ``previous_context``, ``context`` and ``actionStep`` are not used.

        Raises:
            ValueError: if ``action`` is not ``"up"``, ``"down"`` or ``"stay"``.
        """
        if self.debug:
            print("in compute reward function")
        if action == "up":
            if self.debug:
                print("compute reward for action up")
                print("previous latency for reward computation:", self.previous_duration)
                print("latency for reward computation:", self.current_duration)
            reward = 1 if self.previous_duration > self.latency_ref else 0
        elif action == "down":
            if self.debug:
                print("compute reward for action down")
                print("previous latency for reward computation:", self.previous_duration)
                print("latency for reward computation:", self.current_duration)
            reward = 1 if self.current_duration <= self.latency_ref else 0
        elif action == "stay":
            if self.debug:
                print("Compute reward for action stay")
                print("previous latency for reward computation:", self.previous_duration)
                print("latency for reward computation:", self.current_duration)
            reward = 1 if self.current_duration <= self.latency_ref else 0
        else:
            # Previously fell through and crashed with an unbound `reward`
            raise ValueError("unknown action %r, expected 'up', 'down' or 'stay'" % (action,))
        if self.debug:
            print("returned reward:", reward)
        return reward

    def computeOracle(self):
        """Fill ``oracle_cheat`` from the measures recorded in the session file.

        For every load, levels are scanned in increasing order. The first level
        whose recorded durations (column 3 of the ``measure`` table) have both
        mean and max within ``latency_ref`` is marked 1, every further level that
        also qualifies is marked 0, and all other entries keep their initial 2.
        These values line up with the 3-arm indices used by ``Evaluate``
        (0 = down, 1 = stay, 2 = up).
        """
        with h5py.File(self.sessionFile, "r") as f:
            for load in self.loads:
                # Group lookups do not depend on the level: do them once per load
                component_grp = f.get(str(load)).get(self._DEPLOYMENT)
                bestset = False
                for level in self.levels:
                    data = np.asarray(component_grp.get(str(level)).get('measure'))
                    duration = data[:,3].astype(float)

                    if np.mean(duration) <= self.latency_ref and np.max(duration) <= self.latency_ref:
                        if not bestset:
                            self.oracle_cheat[load,level] = 1
                            bestset = True
                        else:
                            self.oracle_cheat[load,level] = 0
            if self.debug:
                print("oracle cheat:", self.oracle_cheat)

    def oracle(self):
        """Return the oracle entry for the current round's load and the current level."""
        return self.oracle_cheat[int(self.targetPattern[self.rounds]),int(self.current_states[0])]

    def oracleOptimalLevel(self):
        """Return the oracle's optimal level for the current round's load.

        This is the first level marked 1 in ``oracle_cheat`` for that load.
        Raises ``IndexError`` when no level is marked 1.
        """
        return np.where(self.oracle_cheat[int(self.targetPattern[self.rounds])] == 1)[0][0]

## Display Wikibench workload pattern

In [None]:
# Build a 3-arm environment starting at level 1 and capped at level 4.
Environment = EnvTest(
    narms=3,
    level=1,
    max_level=4,
)

# Read the target pattern from pattern.json. An environment has no pattern
# until this is called, so it must be done for every new environment.
Environment.load_PatternModel()

# Plot the loaded target pattern.
Environment.display_PatternModel()

In [None]:
def _plot_history(series, ylabel, title, show_legend):
    """Draw and show one 6x4 figure containing the given ``(values, label)`` series."""
    fig, ax = plt.subplots(figsize=(6, 4), nrows=1, ncols=1)
    plt.xlabel('steps',fontsize=16)
    plt.ylabel(ylabel,fontsize=16)
    plt.xticks(fontsize=13,fontweight='normal')
    plt.yticks(fontsize=13,fontweight='normal')
    for values, label in series:
        ax.plot(values,label=label)
    ax.grid()
    ax.set_title(title)
    plt.tight_layout()
    if show_legend:
        plt.legend(loc = 'upper right',prop={'size':20})
    plt.show()


def Evaluate(Environment, Agent, nrounds=None, tie_break_mode = "random",DisplayCumulativeRewardGraph = False, debug = False):
    """Run ``Agent`` against ``Environment`` for ``nrounds`` rounds.

    Each round sets the injector to the scheduled load, reads the context, asks
    the agent for an arm and a desired replica count, and applies the matching
    arm: index below ``(K-1)/2`` scales down, equal stays, above scales up.

    Args:
        Environment: environment exposing ``targetPattern``, ``changeInjector``,
            ``getContext``, ``armUp`` / ``armDown`` / ``armStay``,
            ``current_states``, ``current_duration`` and ``latency_ref``.
        Agent: agent exposing ``_K`` (odd number of arms) and
            ``select(context, tie_break_mode) -> (action, desiredReplicas)``.
        nrounds: number of rounds; defaults to the full length of the target pattern.
        tie_break_mode: forwarded to ``Agent.select``.
        DisplayCumulativeRewardGraph: when True, redraw the injection, level and
            latency graphs every 5 rounds (requires an IPython kernel).
        debug: when True, print the arm/action map.

    Returns:
        ``(injector_level, latency_ref_history, latency_history, environment_level)``:
        a numpy array followed by three lists, each holding one value per round.

    Raises:
        ValueError: if ``nrounds`` exceeds the length of the target pattern.
    """
    # nrounds=None used to crash at range(None); default to the whole pattern
    if nrounds is None:
        nrounds = len(Environment.targetPattern)
    # Fail before touching the real environment rather than in the middle of a run
    if nrounds > len(Environment.targetPattern):
        raise ValueError("nrounds (%d) exceeds the target pattern length (%d)"
                         % (nrounds, len(Environment.targetPattern)))
    assert Agent._K%2 != 0, 'Arms number must be impair'

    stay_arm = int((Agent._K-1)/2)  # middle arm; lower indices scale down, higher scale up
    if debug:
        # Signed step associated with each arm index, e.g. [-1. 0. 1.] for 3 arms
        arm_action_map = np.arange(-stay_arm, stay_arm + 1, dtype=float)
        print("arm action map:", arm_action_map)

    environment_level = []
    latency_history = []
    latency_ref_history = []
    latency_error = 0
    injector_level = np.zeros(nrounds)
    pattern = Environment.targetPattern[0:nrounds]

    for i in range(nrounds):
        Environment.rounds = i
        Environment.changeInjector()
        context = Environment.getContext()
        action, desiredReplicas = Agent.select(context,tie_break_mode)
        # Signed difference between the desired and the current replica count
        desiredActionStep = desiredReplicas - Environment.current_states[0]
        if action < stay_arm:
            Environment.armDown(context = context,actionStep = int(desiredActionStep))
        elif action == stay_arm:
            Environment.armStay(context = context)
        else:
            Environment.armUp(context = context,actionStep = int(desiredActionStep))

        latency_history.append(Environment.current_duration)
        injector_level[i] = Environment.targetPattern[i]
        latency_ref_history.append(Environment.latency_ref)
        if Environment.current_duration > Environment.latency_ref:
            latency_error += 1
        print("latency error:", latency_error)
        environment_level.append(Environment.current_states[0])

        if DisplayCumulativeRewardGraph and (i + 1) % 5 == 0:
            # Plain-Python equivalent of the `%matplotlib inline` line magic
            get_ipython().run_line_magic('matplotlib', 'inline')
            clear_output(True)
            _plot_history([(pattern, "pattern"), (injector_level, 'current')],
                          "Number of users", 'Real injection pattern', False)
            _plot_history([(environment_level, 'Threshold')],
                          "Number of containers", 'Real environment level', True)
            _plot_history([(latency_history, 'Threshold'), (latency_ref_history, r'$l^{*}$')],
                          "Latency(ms)", 'Real latency', True)

    return injector_level,latency_ref_history,latency_history, environment_level

## Hpa

In [None]:
class Hpa():
    """Threshold-based scaling agent.

    The desired replica count is
    ``ceil(current_replicas * current_metric / alpha)``, the same form as the
    Kubernetes Horizontal Pod Autoscaler rule, where ``alpha`` is the target
    (threshold) value of the metric.
    """

    def __init__(self,K, alpha = 1):
        """
        Args:
            K: number of arms (odd); the middle arm means "stay".
            alpha: metric threshold used as the divisor in the replica formula.
        """
        self._K = K
        self._alpha = alpha

    def select(self, context, tie_break_mode = "random"):
        """Choose an arm from the current context.

        Args:
            context: 2-D indexable where ``context[0,0]`` is the current replica
                count and ``context[0,1]`` the current metric value.
            tie_break_mode: accepted for interface compatibility; not used.

        Returns:
            ``(arm, desiredReplicas)``. The arm is 0 for down, ``(K-1)/2`` for
            stay and ``K-1`` for up, i.e. 0 / 1 / 2 when ``K == 3``.
        """
        current_replicas = context[0,0]
        desiredReplicas = math.ceil(current_replicas * ( (context[0,1]) / self._alpha ))
        print("desired replica:", desiredReplicas)

        # Derive the indices from K instead of hard-coding 1 and 2, so the
        # answer stays on the right side of the middle arm for any odd K.
        stay_arm = int((self._K - 1) / 2)
        if desiredReplicas < current_replicas:
            print("choose down")
            return 0,desiredReplicas
        if desiredReplicas > current_replicas:
            print("choose up")
            return self._K - 1,desiredReplicas
        print("choose stay")
        return stay_arm,desiredReplicas


## Launch Hpa threshold on real environment

In [None]:
# ---- Environment parameters ----
debug = False
latency_ref = 600
starting_level = 1  # Level the environment starts at
max_level = 10      # Number of levels that can be managed in the environment

# ---- Agent parameters ----
nrounds = 912
Arms = 3                   # Number of actions (must be odd)
exploration = 470          # Threshold level handed to the Hpa agent as alpha
tie_break_mode = "random"  # Action selection mode when tied. Possible values: "random", "min", "max"
# If True, the graphs are redrawn dynamically during the run. This dramatically
# increases the simulation time. Changing this value requires a full restart of
# the notebook because the graphic rendering engine changes.
displayDynamicGraph = True

# ---- Result buffers: one row per experiment, one column per round ----
ExperimentNumber = 10
environment_level_exp, latency_exp, latency_reference_exp, injector_level_exp = (
    np.empty((ExperimentNumber, nrounds)) for _ in range(4)
)

for experiment in trange(ExperimentNumber):
    # Fresh environment and agent for every experiment
    Environment = EnvTest(Arms, level=starting_level, max_level=max_level, latency_ref=latency_ref, debug=debug)
    Environment.load_PatternModel()
    Agent = Hpa(Arms, alpha=exploration)

    # Bring the platform back to the same starting point before each run
    print("set current locust user to 45 (injection is started automatically if currently stopped)")
    status, message = environment_orchestrator.setLocustUser(user=45, spawn_rate=1)
    print("message:", message)
    print("set current container number to 5")
    status, message = environment_orchestrator.setKubernetesDeploymentScale(
        deployment="front-dynamic-component",
        number=5,
        waitKubernetes=True,
        waitPrometheus=True,
        useMetricServer=False,
    )
    print("message:", message)
    print("Wait 5 seconds for injector to send some requests..")
    time.sleep(5)

    # Run the agent and keep this experiment's histories
    injector_level, latency_ref_history, latency, environment_level = Evaluate(
        Environment,
        Agent,
        nrounds=nrounds,
        tie_break_mode=tie_break_mode,
        DisplayCumulativeRewardGraph=displayDynamicGraph,
        debug=debug,
    )
    injector_level_exp[experiment] = injector_level
    latency_reference_exp[experiment] = latency_ref_history
    latency_exp[experiment] = latency
    environment_level_exp[experiment] = environment_level

# Persist the four result arrays in numpy .npy format (the files keep a .bin suffix)
for file_name, results in (
    ("hpa_injector_level.bin", injector_level_exp),
    ("hpa_latency_reference.bin", latency_reference_exp),
    ("hpa_latency.bin", latency_exp),
    ("hpa_environment_level.bin", environment_level_exp),
):
    with open(file_name, 'wb') as f:
        np.save(f, results)

