In [None]:
import pandas as pd
import numpy as np
import random
import matplotlib.pyplot as plt

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_percentage_error, accuracy_score, r2_score
from sklearn.model_selection import train_test_split

In [None]:
include_metrics = ['cpu_app', 'memory_app', 'cpu_db', 'memory_db', 'rows_fetched', 'rows_returned', 'db_commits', 'response_time']
exclude_from_calls = ['timestamp_begin', 'timestamp_end', 'total_calls'] + include_metrics
test_sample = []

Definindo o caminho para os arquivos CSV contendo os resultados dos testes

In [None]:
DATA_CSV_PATH_BASE = '/dados/data_versao_boa_1.csv'
DATA_CSV_PATH_TARGET = '/dados/data_versao_boa_2.csv'

Definindo a função que irá transformar os dados do arquivo CSV em arrays para utilizar as funções so scikit-learn.

In [None]:
def recupera_outliers(df, target='cpu'):
  outliers_indexes = []

  q1 = df[target].quantile(0.25)
  q3 = df[target].quantile(0.75)
  iqr = q3-q1
  maximum = q3 + (1.5 * iqr)
  minimum = q1 - (1.5 * iqr)

  outlier_samples = df[(df[target] < minimum) | (df[target] > maximum)]
  outliers_indexes.extend(outlier_samples.index.tolist())

  outliers_indexes = list(set(outliers_indexes))

  return outliers_indexes

def populate_arrays(data_csv):

    metrics = data_csv[include_metrics].values.tolist()
    calls = data_csv.drop(exclude_from_calls, axis=1).values.tolist()

    return [calls, metrics]

def handle_test(dados_csv):
    result = populate_arrays(dados_csv)
    requisicoes = result[0]
    metricas_conjuntas = result[1]

    florestas_aleatorias = []
    metricas= []

    for index in range(len(include_metrics)):
      metrica = []
      for linha in metricas_conjuntas:
        metrica.append(linha[index])

      metricas.append(metrica)
      floresta_aleatoria = RandomForestRegressor(random_state=0)
      floresta_aleatoria = floresta_aleatoria.fit(requisicoes, metrica)
      florestas_aleatorias.append(floresta_aleatoria)

    return [florestas_aleatorias, requisicoes, metricas]

In [None]:
def get_metrics(random_forest_base, calls_alvo):
  metrics_predict = []

  for index in range(len(include_metrics)):
    metrics_predict.append(random_forest_base[index].predict(calls_alvo))

  return metrics_predict

In [None]:
def calculate_mape(random_forest_base, metrics_alvo, calls_alvo):
  mapes = []

  metrics_predicted = get_metrics(random_forest_base, calls_alvo)

  for index in range(len(include_metrics)):
    mapes.append(mean_absolute_percentage_error(metrics_predicted[index], metrics_alvo[index]))

  return mapes

def print_mape(index, mapes):
  for metric in range(len(include_metrics)):
    print(f"[MAPE TESTE {index}] - {include_metrics[metric]}: {round(mapes[metric]*100,3)}%")
  print("")


In [None]:
def calculate_cliff_delta(metrics_base, metrics_alvo):
    threshold = {'pequena': 0.147, 'media': 0.33, 'grande': 0.474}

    size = len(metrics_base)
    sum = 0

    for index_base in range(len(metrics_base)):
      for index_alvo in range(len(metrics_base)):
        value = 0

        if metrics_base[index_base] < metrics_alvo[index_alvo]:
          value = -1

        elif metrics_base[index_base] > metrics_alvo[index_alvo]:
          value = 1

        sum += value

    delta = sum / (size*size)
    size = eval_cliff_delta(delta, threshold)
    return delta, size

def eval_cliff_delta(delta: float, threshold: dict) -> str:
    delta = abs(delta)
    if delta < threshold['pequena']:
        return 'trivial'
    if threshold['pequena'] <= delta < threshold['media']:
        return 'pequena'
    if threshold['media'] <= delta < threshold['grande']:
        return 'media'
    if delta >= threshold['grande']:
        return 'grande'

def print_cliff_delta(random_forest_base, metrics_target, calls_alvo):
  good = 'POSITIVA'
  bad = 'NEGATIVA'

  metrics_predicted = get_metrics(random_forest_base, calls_alvo)

  for index in range(len(include_metrics)):
    cliff = calculate_cliff_delta(metrics_predicted[index], metrics_target[index])
    result = cliff[0] > 0 and good or bad

    if index == 2:
      print(f"metric: {include_metrics[index].upper()} \t\t\t| cliff delta: {round(cliff[0],3)} \t\t| DIFERENÇA: {result} {cliff[1].upper()}\n")
    else:
      print(f"metric: {include_metrics[index].upper()} \t\t| cliff delta: {round(cliff[0],3)} \t\t| DIFERENÇA: {result} {cliff[1].upper()}\n")

In [None]:
def print_accuracy(rfs, metrics, calls, test_index):
  for metric in range(len(include_metrics)):
    accuracy = calculate_accuracy(rfs, metrics, calls, metric)
    print(f"Test {test_index} - R² {include_metrics[metric]}: {accuracy}")

  print("")

def r_quadrado(metrica_real, metrica_prevista):
  soma_real = 0
  soma_dif_predict = 0
  soma_dif_media = 0

  for indice in range(len(metrica_real)):
    soma_real += metrica_real[indice]
    aux = (metrica_real[indice] - metrica_prevista[indice])
    soma_dif_predict += (aux * aux)

  media = soma_real/len(metrica_real)

  for indice in range(len(metrica_real)):
    aux = (metrica_real[indice] - media)
    soma_dif_media += (aux * aux)

  return 1 - (soma_dif_predict/soma_dif_media)


def calculate_accuracy(rfs, metrics, calls, metric_index):

  metric_predict = rfs[metric_index].predict(calls)
  r2 = r_quadrado(metrics[metric_index], metric_predict)
  r2_ajustado = 1-(1-r2)*(len(metrics)-1)/(len(metrics)-len(include_metrics)-1)
  return r2

Leitura de dados e remoção de outliers

In [None]:
data_csv_target = pd.read_csv(DATA_CSV_PATH_TARGET)
data_csv_base = pd.read_csv(DATA_CSV_PATH_BASE).head(len(data_csv_target))

outliers_indexes = list(set([*recupera_outliers(data_csv_base, 'total_calls'), *recupera_outliers(data_csv_target, 'total_calls')]))

print(f'Indices dos outliers encontrados: {outliers_indexes}')

data_csv_base.drop(outliers_indexes, inplace=True)
data_csv_base.reset_index(drop=True, inplace=True)

data_csv_target.drop(outliers_indexes, inplace=True)
data_csv_target.reset_index(drop=True, inplace=True)

result_base = handle_test(data_csv_base)
rf_base = result_base[0]
calls_base = result_base[1]
metrics_base = result_base[2]

result_target = handle_test(data_csv_target)
rf_target = result_target[0]
calls_target = result_target[1]
metrics_target = result_target[2]

Calculo de R² ajustado

In [None]:
print_accuracy(rf_base, metrics_target, calls_target, "BASE")

Calculo MAPE comparando:

1.   versão base x versão base
2.   versão base x versão base normalizada



In [None]:
print_mape("BASE", calculate_mape(rf_base, metrics_base, calls_base))
print_mape("TARGET", calculate_mape(rf_base, metrics_target, calls_target))

Interpretação do Cliff Delta:

A métrica prevista pelo Random Forest representa como o sistema se comportaria recebendo aquela quantidade de chamada no ambiente de testes configurado no teste 1. Portanto, se o valor da métrica prevista for maior que o valor da métrica real (capturada no ambiente do teste 2), significa que houve uma melhora no sistema quando comparamos a versão 1 com a versão 2.

Valor **positivo** representa que o **primeiro** teste possui valores **maiores**

Valor **negativo** representa que o **segundo** teste possui valores **maiores**

In [None]:
print_cliff_delta(rf_base, metrics_target, calls_target)

Gráficos ilustrando processo de normalização de escala utilizando Floresta Aleatória.

In [None]:
traduz_metricas = {
    'cpu_app': 'Uso de CPU % - Aplicação',
    'memory_app': 'Uso de Memória RAM % - Aplicação',
    'cpu_db': 'Uso de CPU % - Banco de dados',
    'memory_db': 'Uso de Memória RAM % - Banco de dados',
    'rows_fetched': 'Linhas buscadas',
    'rows_returned': 'Linhas retornadas',
    'db_commits': 'Transações concluídas',
    'response_time': 'Tempo de resposta'
}

def plot_grafico(df_base, df_target, metric, ax, label_base, label_target):
  ax.plot(df_base, 'b-', label=label_base)
  ax.plot(df_target, 'r--', label=label_target)
  ax.legend(loc='lower right')
  ax.set_xlabel('Período de tempo (minuto)', fontsize = 14)
  ax.set_ylabel(f'{traduz_metricas[metric]}', fontsize = 14)

fig, ax = plt.subplots(len(include_metrics), 3, figsize=(15,35))

for index, metric in enumerate(include_metrics):
  metric_true_base = metrics_base[index]
  metric_true_target = metrics_target[index]
  metric_predict_target = rf_base[index].predict(calls_target)

  plot_grafico(metric_true_base, metric_true_target, metric, ax[index, 0], 'Teste base', 'Teste alvo')
  plot_grafico(metric_true_base, metric_predict_target, metric, ax[index, 1], 'Teste base', 'Teste base normalizado')
  plot_grafico(metric_predict_target, metric_true_target, metric, ax[index, 2], 'Teste base normalizado', 'Teste alvo')

plt.tight_layout()
plt.show()

