In [None]:
import kfp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath, create_component_from_func

In [None]:
def create_dataset(output_csv: OutputPath('csv')):
    """Build the metrics training dataset from Prometheus and persist it.

    Queries the Prometheus ``query_range`` endpoint at ``BASE_URL`` for a fixed
    set of cluster / ingress / namespace-level container metrics over the last
    ``days`` days at a 1-minute step, assembles one row per minute, writes the
    table to ``output_csv`` (the KFP output artifact) and uploads a copy to S3
    under ``kubeflow/datasets/<uuid>.csv``.

    KFP serializes only this function's source into the component image, so all
    imports and helpers are defined inside the function body.

    Args:
        output_csv: Path (provided by KFP) where the CSV artifact is written.

    Raises:
        RuntimeError: if any required metric cannot be fetched with at least
            ``total_records`` samples after ``max_attempts`` rounds.
    """
    from datetime import date, timedelta
    import time
    import uuid

    import boto3
    import pandas as pd
    import requests

    BASE_URL = "http://0.0.0.0:0000/api/v1/query_range"
    step = "1m"
    days = 6
    namespace = "cattle-neuvector-system"
    max_attempts = 10
    # Pause between retry rounds to avoid hitting the server's request limit.
    retry_delay_seconds = 60
    request_timeout_seconds = 60

    # Read the clock once so start/end are computed from the same day.
    today = date.today()
    last_day = today - timedelta(days=days)
    start = f"{last_day}T17:00:00.00Z"
    end = f"{today}T17:00:00.00Z"

    # One sample per minute over `days` days, both range endpoints inclusive.
    total_records = 60 * 24 * days + 1

    def namespace_rate(metric):
        """PromQL for the 5m rate of `metric`, summed over `namespace`."""
        return "sum(rate(" + metric + "{namespace=\"" + namespace + "\"}[5m]))"

    # Output column name -> PromQL query, in the order the CSV columns appear.
    queries = {
        "api_server_request_latency": "avg(apiserver_request_duration_seconds_sum / apiserver_request_duration_seconds_count)",
        "api_server_request_rate": "sum(rate(apiserver_request_total[5m]))",
        "scheduling_failed_pods": "sum(kube_pod_status_scheduled{condition=\"false\"})",
        "workqueue_depth": "sum(workqueue_depth)",
        "scheduler_e2e_scheduling_latency": "sum(histogram_quantile(0.99, sum(scheduler_e2e_scheduling_duration_seconds_bucket) by (le, instance)))",
        "scheduler_preemption_attempts_total": "sum(rate(scheduler_preemption_attempts_total[5m]))",
        "active_client_connections": "sum(nginx_ingress_controller_nginx_process_connections{state=\"active\"})",
        "reading_client_connections": "sum(nginx_ingress_controller_nginx_process_connections{state=\"reading\"})",
        "waiting_client_connections": "sum(nginx_ingress_controller_nginx_process_connections{state=\"waiting\"})",
        "writing_client_connections": "sum(nginx_ingress_controller_nginx_process_connections{state=\"writing\"})",
        "container_cpu_cfs_throttled_seconds_total": namespace_rate("container_cpu_cfs_throttled_seconds_total"),
        "container_cpu_usage_seconds_total": namespace_rate("container_cpu_usage_seconds_total"),
        "container_cpu_system_seconds_total": namespace_rate("container_cpu_system_seconds_total"),
        "container_cpu_user_seconds_total": namespace_rate("container_cpu_user_seconds_total"),
        "container_memory_working_set_bytes": "sum(container_memory_working_set_bytes{namespace=\"" + namespace + "\"})",
        "container_network_receive_packets_total": namespace_rate("container_network_receive_packets_total"),
        "container_network_receive_packets_dropped_total": namespace_rate("container_network_receive_packets_dropped_total"),
        "container_network_receive_errors_total": namespace_rate("container_network_receive_errors_total"),
        "container_network_transmit_packets_total": namespace_rate("container_network_transmit_packets_total"),
        "container_network_transmit_packets_dropped_total": namespace_rate("container_network_transmit_packets_dropped_total"),
        "container_network_transmit_errors_total": namespace_rate("container_network_transmit_errors_total"),
        "container_network_receive_bytes_total": namespace_rate("container_network_receive_bytes_total"),
        "container_network_transmit_bytes_total": namespace_rate("container_network_transmit_bytes_total"),
        "container_fs_reads_bytes_total": namespace_rate("container_fs_reads_bytes_total"),
        "container_fs_writes_bytes_total": namespace_rate("container_fs_writes_bytes_total"),
    }

    # Metrics that may legitimately be missing; they are zero-filled instead of
    # forcing another retry round.
    optional_metrics = {"container_cpu_cfs_throttled_seconds_total"}

    # Everything a failed request / malformed response can raise.
    fetch_errors = (requests.RequestException, ValueError, KeyError, IndexError, TypeError)

    def fetch(query):
        """Return the ``[timestamp, value]`` pairs of the first series for `query`."""
        response = requests.post(
            BASE_URL,
            data={"query": query, "start": start, "end": end, "step": step},
            # Passed by keyword: the third positional argument of
            # requests.post is `json`, not `headers`.
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=request_timeout_seconds,
        )
        response.raise_for_status()
        values = response.json()["data"]["result"][0]["values"]
        if len(values) < total_records:
            raise ValueError(f"expected {total_records} samples, got {len(values)}")
        return values

    series = {}
    failures = {}
    for attempt in range(max_attempts):
        if attempt:
            time.sleep(retry_delay_seconds)
        failures = {}
        # Only re-query metrics that have not been fetched successfully yet.
        for column, query in queries.items():
            if column in series:
                continue
            try:
                series[column] = fetch(query)
            except fetch_errors as exc:
                if column in optional_metrics:
                    series[column] = [[0, 0]] * total_records
                else:
                    failures[column] = exc
        if not failures:
            break

    if failures:
        raise RuntimeError(
            f"Could not fetch metrics {sorted(failures)} after {max_attempts} attempts: {failures}"
        )

    # NOTE(review): labels run 12:00 -> 12:00 while the Prometheus range is
    # 17:00Z -> 17:00Z, i.e. they assume a UTC-5 wall clock — confirm.
    timestamps = pd.period_range(start=f"{last_day} 12:00", end=f"{today} 12:00", freq="T")

    columns = {"timestamp": [str(period) for period in timestamps[:total_records]]}
    for column in queries:
        columns[column] = [float(value) for _, value in series[column][:total_records]]
    dataset = pd.DataFrame(columns)

    dataset.to_csv(output_csv, index=False)

    s3 = boto3.resource('s3',
        aws_access_key_id='----',
        aws_secret_access_key='----',
        region_name='----',
    )

    key = f"kubeflow/datasets/{uuid.uuid4()}.csv"
    # to_csv without a path returns the CSV text directly.
    s3.Bucket('----').put_object(Key=key, Body=dataset.to_csv(index=False))

In [None]:
def preprocess_dataset(input_dataset: InputPath('csv'), output_transformed_data: OutputPath('sav')):
    """Split and scale the metrics dataset, publishing the scaler parameters.

    Reads the CSV produced by ``create_dataset``, drops the ``timestamp``
    column, labels every row 1, makes an 80/20 train/test split, fits a
    MinMaxScaler and a StandardScaler on the train split and transforms both
    splits. The fitted scaler parameters are uploaded to S3 together with a
    ``kubeflow/current_run_preprocessing.sav`` pointer to them, and all arrays
    are written to ``output_transformed_data`` with joblib.

    Args:
        input_dataset: Path (provided by KFP) of the input CSV artifact.
        output_transformed_data: Path (provided by KFP) for the joblib bundle.
    """
    import tempfile
    import uuid

    import boto3
    import joblib
    import numpy as np
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import MinMaxScaler, StandardScaler

    dataset = pd.read_csv(input_dataset)

    X = dataset.drop(["timestamp"], axis=1)
    # Every row is treated as normal and labelled 1; downstream accuracy is
    # measured against these labels. Sized from the data, not a hard-coded
    # record count, so the split never sees mismatched lengths.
    y = np.ones(len(dataset), dtype=int)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=True)

    min_max_scaler = MinMaxScaler()
    X_train_mm = min_max_scaler.fit_transform(X_train)
    X_test_mm = min_max_scaler.transform(X_test)

    std_scaler = StandardScaler()
    X_train_ss = std_scaler.fit_transform(X_train)
    X_test_ss = std_scaler.transform(X_test)

    s3 = boto3.resource('s3',
        aws_access_key_id='----',
        aws_secret_access_key='----',
        region_name='----',
    )
    bucket = s3.Bucket('----')

    def upload_joblib(obj, key):
        """Serialize `obj` with joblib and store it in the bucket under `key`."""
        with tempfile.TemporaryFile() as file:
            joblib.dump(obj, file)
            file.seek(0)
            bucket.put_object(Key=key, Body=file.read())

    # Per-column [min, max] exactly as fitted by the MinMaxScaler.
    # NOTE(review): a constant column yields min == max here; consumers must
    # guard against dividing by (max - min) == 0.
    min_max_conf = np.column_stack([min_max_scaler.data_min_, min_max_scaler.data_max_]).tolist()
    min_max_key = f"kubeflow/preprocessing/min_max_scaler/{uuid.uuid4()}.sav"
    upload_joblib(min_max_conf, min_max_key)

    # Per-column [mean, scale] exactly as applied by the StandardScaler
    # (population std; zero-variance columns get scale 1). pandas' default
    # std uses ddof=1 and is 0 for constant columns, such as the zero-filled
    # throttling fallback, so it did not reproduce X_train_ss.
    std_scaler_conf = np.column_stack([std_scaler.mean_, std_scaler.scale_]).tolist()
    std_scaler_key = f"kubeflow/preprocessing/standard_scaler/{uuid.uuid4()}.sav"
    upload_joblib(std_scaler_conf, std_scaler_key)

    current_run_preprocessing = {
        "min_max_scaler_path": min_max_key,
        "std_scaler_path": std_scaler_key,
    }
    upload_joblib(current_run_preprocessing, "kubeflow/current_run_preprocessing.sav")

    output_data = {
        "X": X,
        "y": y,
        "X_train": X_train,
        "y_train": y_train,
        "X_test": X_test,
        "y_test": y_test,
        "X_train_mm": X_train_mm,
        "X_test_mm": X_test_mm,
        "X_train_ss": X_train_ss,
        "X_test_ss": X_test_ss,
    }
    joblib.dump(output_data, output_transformed_data)

In [None]:
def train_ml_models(input_dataset: InputPath('sav')) -> dict:
    """Fit the classical anomaly detectors and publish them to S3.

    Loads the joblib bundle produced by ``preprocess_dataset``, fits a
    One-Class SVM on the min-max-scaled features and an Isolation Forest on
    the standardized features, and scores each against ``y_test``. It then
    uploads both models and a ``kubeflow/current_run_ml_models.sav`` pointer
    to them.

    Args:
        input_dataset: Path (provided by KFP) of the preprocessed joblib bundle.

    Returns:
        dict with float entries ``osvm_acc`` and ``isolation_forest_acc``.
    """
    import tempfile
    import uuid

    import boto3
    import joblib
    from sklearn.ensemble import IsolationForest
    from sklearn.metrics import accuracy_score
    from sklearn.svm import OneClassSVM

    transformed_data = joblib.load(input_dataset)
    y_test = transformed_data["y_test"]

    osvm = OneClassSVM(kernel="rbf", nu=0.05, gamma="auto").fit(transformed_data["X_train_mm"])
    osvm_pred = osvm.predict(transformed_data["X_test_mm"])
    osvm_acc = float(accuracy_score(y_true=y_test, y_pred=osvm_pred))

    # Seeded like the train/test split (random_state=42) so reruns on the same
    # data build the same forest and report the same accuracy.
    isolation_forest = IsolationForest(
        n_estimators=50, max_samples=256, contamination=0.01, random_state=42,
    ).fit(transformed_data["X_train_ss"])
    isolation_forest_pred = isolation_forest.predict(transformed_data["X_test_ss"])
    isolation_forest_acc = float(accuracy_score(y_true=y_test, y_pred=isolation_forest_pred))

    s3 = boto3.resource('s3',
        aws_access_key_id='----',
        aws_secret_access_key='----',
        region_name='----',
    )
    bucket = s3.Bucket('----')

    def upload_joblib(obj, key):
        """Serialize `obj` with joblib and store it in the bucket under `key`."""
        with tempfile.TemporaryFile() as file:
            joblib.dump(obj, file)
            file.seek(0)
            bucket.put_object(Key=key, Body=file.read())

    osvm_key = f"kubeflow/ai_models/osvm/{uuid.uuid4()}.sav"
    upload_joblib(osvm, osvm_key)

    isolation_forest_key = f"kubeflow/ai_models/isolation_forest/{uuid.uuid4()}.sav"
    upload_joblib(isolation_forest, isolation_forest_key)

    current_run_ml_models = {
        "osvm_path": osvm_key,
        "isolation_forest_path": isolation_forest_key,
    }
    upload_joblib(current_run_ml_models, "kubeflow/current_run_ml_models.sav")

    return {
        "osvm_acc": osvm_acc,
        "isolation_forest_acc": isolation_forest_acc,
    }

In [None]:
def train_dl_model(input_dataset: InputPath('sav')) -> dict:
    """Train the autoencoder anomaly detector and publish it to S3.

    Trains a fully connected autoencoder on the min-max-scaled train split
    with MSE reconstruction loss. The anomaly threshold is set to
    mean + std of the per-row train reconstruction error, and the mean test
    reconstruction error is also measured. The state_dict and a
    ``kubeflow/current_run_dl_model.sav`` record (model path plus threshold)
    are uploaded to S3.

    Args:
        input_dataset: Path (provided by KFP) of the preprocessed joblib bundle.

    Returns:
        dict with float entries ``current_loss`` (loss of the last training
        batch) and ``avg_mse`` (mean per-row reconstruction MSE on the test split).
    """
    import tempfile
    import uuid
    from io import BytesIO

    import boto3
    import joblib
    import numpy as np
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torch.utils.data import DataLoader, TensorDataset

    torch.manual_seed(42)

    transformed_data = joblib.load(input_dataset)
    X_train_mm = transformed_data["X_train_mm"]
    X_test_mm = transformed_data["X_test_mm"]

    input_dim = transformed_data["X_train"].shape[1]
    num_epochs = 10
    batch_size = 256

    class Autoencoder(nn.Module):
        # Layer names must stay stable: the saved state_dict is keyed by them.
        def __init__(self):
            super(Autoencoder, self).__init__()

            # Encoder
            self.enc_layer1 = nn.Linear(input_dim, 12)
            self.enc_layer2 = nn.Linear(12, 9)
            self.enc_layer3 = nn.Linear(9, 6)

            # Decoder
            self.dec_layer1 = nn.Linear(6, 9)
            self.dec_layer2 = nn.Linear(9, 12)
            self.dec_layer3 = nn.Linear(12, input_dim)

        def forward(self, x):
            x = F.relu(self.enc_layer1(x))
            x = F.relu(self.enc_layer2(x))
            x = F.relu(self.enc_layer3(x))
            x = F.relu(self.dec_layer1(x))
            x = F.relu(self.dec_layer2(x))
            # Final ReLU clamps reconstructions to >= 0; inputs are min-max
            # scaled on the train split.
            x = F.relu(self.dec_layer3(x))

            return x

    train_dataset = TensorDataset(torch.from_numpy(X_train_mm).type(torch.FloatTensor))
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=3)

    ae = Autoencoder()
    loss_func = nn.MSELoss()
    optimizer = torch.optim.Adam(ae.parameters(), lr=1e-3)

    current_loss = float("nan")
    ae.train()
    for epoch in range(num_epochs):
        for (data,) in train_loader:
            optimizer.zero_grad()
            loss = loss_func(ae(data), data)
            loss.backward()
            optimizer.step()
            current_loss = loss.item()

    def reconstruction_mse(X):
        """Per-row mean squared reconstruction error, in the same row order as `X`.

        Runs the whole array through the model at once. The previous version
        iterated a shuffle=True DataLoader, which paired each prediction with the
        wrong input row.
        """
        ae.eval()
        with torch.no_grad():
            reconstructed = ae(torch.from_numpy(X).type(torch.FloatTensor)).numpy()
        return np.mean(np.power(X - reconstructed, 2), axis=1)

    train_mse = reconstruction_mse(X_train_mm)
    # Plain floats keep the KFP JSON output and the pickled record simple.
    ae_threshold = float(np.mean(train_mse) + np.std(train_mse))
    avg_mse = float(np.mean(reconstruction_mse(X_test_mm)))

    s3 = boto3.resource('s3',
        aws_access_key_id='----',
        aws_secret_access_key='----',
        region_name='----',
    )
    bucket = s3.Bucket('----')

    ae_buffer = BytesIO()
    torch.save(ae.state_dict(), ae_buffer)

    ae_key = f"kubeflow/ai_models/autoencoder/{uuid.uuid4()}.pt"
    bucket.put_object(Key=ae_key, Body=ae_buffer.getvalue())

    current_run_dl_model = {
        "ae_path": ae_key,
        "ae_threshold": ae_threshold,
    }
    with tempfile.TemporaryFile() as file:
        joblib.dump(current_run_dl_model, file)
        file.seek(0)
        bucket.put_object(Key="kubeflow/current_run_dl_model.sav", Body=file.read())

    return {
        "current_loss": current_loss,
        "avg_mse": avg_mse,
    }

In [None]:
def report_training_results(ml_output: dict, dl_output: dict):
    """Send a best-effort Telegram summary of the training metrics.

    Formats the OSVM and Isolation Forest accuracies from ``ml_output`` and the
    autoencoder loss from ``dl_output`` into a Markdown message and posts it
    via the Telegram ``sendMessage`` endpoint. Delivery failures are printed,
    not raised, so a notification problem does not fail the pipeline.

    Args:
        ml_output: Output of ``train_ml_models`` (``osvm_acc``, ``isolation_forest_acc``).
        dl_output: Output of ``train_dl_model`` (``current_loss``).
    """
    import requests

    bot_token = "----"
    bot_chat_id = "----"

    message = "*Training pipeline finished*\n*Results*\n- OSVM accuracy: {:.3f}\n- Isolation Forest accuracy: {:.3f}\n- Autoencoder loss: {:.4f}".format(ml_output["osvm_acc"], ml_output["isolation_forest_acc"], dl_output["current_loss"])

    send_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    # Passing the fields as `params` lets requests URL-encode the message
    # (newlines, '*', spaces) instead of splicing raw text into the URL.
    params = {
        "parse_mode": "Markdown",
        "chat_id": bot_chat_id,
        "text": message,
    }

    try:
        response = requests.get(send_url, params=params, timeout=30)
        response.raise_for_status()
    except requests.RequestException as exc:
        print("Error when trying to send the Telegram message:", exc)

In [None]:
# Every component runs on the same digest-pinned PyTorch GPU image. The
# components that upload to S3 also install boto3 when the container starts.
_BASE_IMAGE = "gcr.io/deeplearning-platform-release/pytorch-gpu@sha256:b6f7894a4118a5bf51ebf7ae83a444b32e34828c40d6a780d6ff2d3c34818ffd"

create_dataset_op = create_component_from_func(
    func=create_dataset,
    base_image=_BASE_IMAGE,
    packages_to_install=["boto3"],
)

preprocess_dataset_op = create_component_from_func(
    func=preprocess_dataset,
    base_image=_BASE_IMAGE,
    packages_to_install=["boto3"],
)

train_ml_models_op = create_component_from_func(
    func=train_ml_models,
    base_image=_BASE_IMAGE,
    packages_to_install=["boto3"],
)

train_dl_model_op = create_component_from_func(
    func=train_dl_model,
    base_image=_BASE_IMAGE,
    packages_to_install=["boto3"],
)

# The reporting step only talks to Telegram, so it needs no extra packages.
report_training_results_op = create_component_from_func(
    func=report_training_results,
    base_image=_BASE_IMAGE,
)

In [None]:
# "Aligo's" was previously written as "Aligo\s", an invalid escape sequence
# (SyntaxWarning on recent Pythons) that also rendered a literal backslash.
@dsl.pipeline(
    name="Training pipeline for Aligo Smart Detective",
    description="AI Monitor for Aligo's DevSecOps Infrastructure",
)
def training_pipeline():
    # DAG: create_dataset -> preprocess_dataset
    #      -> (train_ml_models || train_dl_model) -> report_training_results

    # First step: pull metrics from Prometheus into a CSV artifact.
    dataset = create_dataset_op()

    # Second step: split and scale the data.
    transformed_dataset = preprocess_dataset_op(dataset.outputs["output_csv"])

    # Third step: both trainers consume the same preprocessed bundle, so they
    # can run in parallel.
    ml_res = train_ml_models_op(transformed_dataset.outputs["output_transformed_data"])
    dl_res = train_dl_model_op(transformed_dataset.outputs["output_transformed_data"])

    # Fourth step: report both trainers' metric dicts.
    report_training_results_op(ml_res.output, dl_res.output)

In [None]:
# Compile the pipeline definition into an uploadable KFP package.
kfp.compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.zip",
)