In [None]:
#@title Please input your project id
import pandas as pd
import numpy as np
import altair as alt
from google.cloud import bigquery
# Provide credentials to the runtime
from google.colab import auth
from google.cloud.bigquery import magics

auth.authenticate_user()
print('Authenticated')
project_id = 'kafka-lb' #@param {type: "string"}
# Set the default project id for %bigquery magic
magics.context.project = project_id

# Use the client to run queries constructed from a more complicated function.
client = bigquery.Client(project=project_id)

In [None]:
#@title Select a cell and plot its per-machine resource utilization CDFs

# Functions to plot CDFs using Altair
def pick_quantiles_from_tall_dataframe(data, qcol, name=""):
  quantiles = pd.DataFrame([x for x in data[qcol]]).transpose()
  if name != "":
    quantiles.columns = data[name]
  return quantiles

# - data: a dataframe with one row and one or more columns of quantiles (results
#   returned from APPROX_QUANTILES)
# - qcols: a list of names of the quantiles
# - names: the names of each returned quantiles' columns.
def pick_quantiles_from_wide_dataframe(data, qcols, names=[]):
  quantiles = {}
  i = 0
  for qcol in qcols:
    col_name = qcol
    if i < len(names):
      col_name = names[i]
    quantiles[col_name] = data[qcol][0]
    i+=1
  return pd.DataFrame(quantiles)

# - quantiles: a dataframe where each column contains the quantiles of one
#   data set. The index (i.e. row names) of the dataframe is the quantile. The
#   column names are the names of the data set.
def plot_cdfs(quantiles, xlab="Value", ylab="CDF",
              legend_title="dataset", labels=[],
              interactive=False,
              title=''):
  dfs = []
  label = legend_title
  yval = range(quantiles.shape[0])
  esp = 1.0/(len(quantiles)-1)
  yval = [y * esp for y in yval]
  while label == xlab or label == ylab:
    label += '_'
  for col_idx, col in enumerate(quantiles.columns):
    col_label = col
    if col_idx < len(labels):
      col_label = labels[col_idx]
    dfs.append(pd.DataFrame({
        label: col_label,
        xlab: quantiles[col],
        ylab: yval
    }))
  cdfs = pd.concat(dfs)
  lines = alt.Chart(cdfs).mark_line().encode(
    # If you can draw a CDF, it has to be continuous real-valued
    x=xlab+":Q",
    y=ylab+":Q",
    color=label+":N"
  ).properties(
    title=title
  )
  if not interactive:
    return lines
  # Create a selection that chooses the nearest point & selects based on x-value
  nearest = alt.selection(type='single', nearest=True, on='mouseover',
                        fields=[ylab], empty='none')
  # Transparent selectors across the chart. This is what tells us
  # the y-value of the cursor
  selectors = alt.Chart(cdfs).mark_point().encode(
    y=ylab+":Q",
    opacity=alt.value(0),
  ).properties(
    selection=nearest
  )

  # Draw text labels near the points, and highlight based on selection
  text = lines.mark_text(align='left', dx=5, dy=-5).encode(
    text=alt.condition(nearest,
                       alt.Text(xlab+":Q", format=".2f"),
                       alt.value(' '))
  )

  # Draw a rule at the location of the selection
  rules = alt.Chart(cdfs).mark_rule(color='gray').encode(
    y=ylab+":Q",
  ).transform_filter(
    nearest.ref()
  )
  # Draw points on the line, and highlight based on selection
  points = lines.mark_point().encode(
    opacity=alt.condition(nearest, alt.value(1), alt.value(0))
  )
  # Put the five layers into a chart and bind the data
  return alt.layer(lines, selectors, rules, text, points).interactive(
      bind_y=False)

# Functions to create the query

def query_machine_capacity(cell):
  return '''
SELECT machine_id, MAX(capacity.cpus) AS cpu_cap,
  MAX(capacity.memory) AS memory_cap
FROM `google.com:google-cluster-data`.clusterdata_2019_{cell}.machine_events
GROUP BY 1
  '''.format(cell=cell)

def query_top_level_instance_usage(cell):
  return '''
SELECT CAST(FLOOR(start_time/(1e6 * 300)) * (1000000 * 300) AS INT64) AS time,
  collection_id,
  instance_index,
  machine_id,
  average_usage.cpus AS cpu_usage,
  average_usage.memory AS memory_usage,
  start_time,
  end_time
FROM `google.com:google-cluster-data`.clusterdata_2019_{cell}.instance_usage
WHERE (alloc_collection_id IS NULL OR alloc_collection_id = 0)
  AND (end_time - start_time) >= (5 * 60 * 1e6)
  '''.format(cell=cell)

def query_machine_usage(cell):
  return '''
SELECT u.time AS time,
  u.machine_id AS machine_id,
  SUM(u.cpu_usage) AS cpu_usage,
  SUM(u.memory_usage) AS memory_usage,
  MAX(m.cpu_cap) AS cpu_capacity,
  MAX(m.memory_cap) AS memory_capacity,
  u.start_time AS start_time,
  u.end_time AS end_time
FROM ({instance_usage}) AS u JOIN
 ({machine_capacity}) AS m
ON u.machine_id = m.machine_id
GROUP BY 1, 2
  '''.format(instance_usage = query_top_level_instance_usage(cell),
             machine_capacity = query_machine_capacity(cell))

def query_machine_utilization_distribution(cell):
  return '''
SELECT APPROX_QUANTILES(IF(cpu_usage > cpu_capacity, 1.0, cpu_usage / cpu_capacity), 100) AS cpu_util_dist,
  APPROX_QUANTILES(IF(memory_usage > memory_capacity, 1.0, memory_usage / memory_capacity), 100) AS memory_util_dist
FROM ({table})
  '''.format(table = query_machine_usage(cell))

# cell = 'd' #@param ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
# query = query_machine_utilization_distribution(cell)
# machine_util_dist = client.query(query).to_dataframe()
# plot_cdfs(pick_quantiles_from_wide_dataframe(machine_util_dist, ['cpu_util_dist', 'memory_util_dist'], ['CPU', 'Memory']), xlab='x - resource utilization (%)', ylab="Probability (resource utilization < x)", interactive=True)

In [None]:
def query_top_level_instance_usage(cell):
  return '''
SELECT CAST(FLOOR(start_time/(1e6 * 300)) * (1000000 * 300) AS INT64) AS time,
  collection_id,
  instance_index,
  machine_id,
  average_usage.cpus AS cpu_usage,
  average_usage.memory AS memory_usage,
  start_time,
  end_time
FROM `google.com:google-cluster-data`.clusterdata_2019_{cell}.instance_usage

  '''.format(cell=cell)

cell = 'd' #@param ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
query = query_top_level_instance_usage(cell)
instance_usage_df = client.query(query).to_dataframe()

In [None]:
import json
from pandas import to_datetime
import joblib


df = instance_usage_df.copy()
# Helper function to calculate weighted usage
def weighted_usage_cpus(row, resource):
    # print("row", row['average_usage'][resource])
    avg = row['average_usage'][resource]
    max = row['maximum_usage'][resource]
    random = row['random_sample_usage'][resource]
    return (avg + max + 2*random) / 4

def weighted_usage_memory(row, resource):
    # print("row", row['average_usage'][resource])
    avg = row['average_usage'][resource]
    max = row['maximum_usage'][resource]
    return (avg + max) / 2

# Extract CPU and memory usage as weighted averages
df['cpu_usage'] = df.apply(lambda row: weighted_usage_cpus(row, 'cpus'), axis=1)
df['memory_usage'] = df.apply(lambda row: weighted_usage_memory(row, 'memory'), axis=1)

# Calculate duration in hours
df['start_time'] = to_datetime(df['start_time'])
df['end_time'] = to_datetime(df['end_time'])
df['time_duration'] = (df['end_time'] - df['start_time']).dt.total_seconds()

# Directly use 'priority', 'type', 'memory_accesses_per_instruction', and 'cycles_per_instruction' as features


def classify_load_state(row):
    if row['cpu_usage'] < 0.25 and row['memory_usage'] < 0.25:
        return 'under-loaded'
    elif row['cpu_usage'] <= 0.5 and row['memory_usage'] <= 0.5:
        return 'optimally loaded'
    else:
        return 'over-loaded'

df['load_state'] = df.apply(classify_load_state, axis=1)

def save_model(model, save_path):
    try:
        joblib.dump(model, save_path)
        print(f"Model saved at {save_path}")
    except Exception as e:
        print(e)

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Selecting features and target variable
# print(df.columns)

feature_names = ['cpu_usage', 'memory_usage'] #, 'time_duration', 'memory_accesses_per_instruction'] #, 'cycles_per_instruction']
class_names = ['under-loaded', 'optimally loaded', 'over-loaded']
X = df[feature_names]
y = df['load_state']

# Splitting the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Training the RandomForestClassifier
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Predicting and evaluating the model
predictions = model.predict(X_test)
print(f'Accuracy: {accuracy_score(y_test, predictions)}')

#Saving the model
save_model(model, save_path = f'{datadir}trained_model.pkl')

Accuracy: 0.99995
Model saved at /content/drive/My Drive/ADM/Kafka-LB-ML/trained_model.pkl
