### Git + DVC clone

In [8]:
from kfp.dsl import Output, Dataset, Model, Artifact
from kfp import dsl

@dsl.component(base_image='python:3.9.16',packages_to_install=["subprocess.run","dvc[s3]"])
def git_dvc_clone(
    repo: Output[Artifact],
    dataset: Output[Dataset]
):
    """Clone the GitLab project, export the checkout and fetch the DVC-tracked data.

    The Git working tree is copied to ``repo`` *before* ``dvc pull`` runs, so
    the exported repository does not contain the pulled data files.

    Args:
        repo: Output artifact that receives a copy of the cloned repository.
        dataset: Output artifact that receives ``data.csv`` from the checkout
            after ``dvc pull``.

    Raises:
        KeyError: If ``GITLAB_ACCESS_TOKEN`` is not set in the environment.
        RuntimeError: If a git or dvc command exits with a non-zero status.
    """
    # NOTE(review): `subprocess` is part of the standard library; the
    # "subprocess.run" PyPI package in packages_to_install looks unnecessary
    # -- confirm before removing it.
    import subprocess, os, shutil

    gl_at = os.environ['GITLAB_ACCESS_TOKEN']
    repo_name = "ds-seminar-mlops-demo"
    # Explicit absolute target so the result does not depend on the
    # container's current working directory.
    checkout_dir = "/" + repo_name

    def run_step(description, cmd, cwd=None):
        # The error text deliberately omits the command line: the clone URL
        # embeds the access token and must not end up in a traceback.
        returncode = subprocess.call(cmd, cwd=cwd)
        if returncode != 0:
            raise RuntimeError(f"{description} failed with exit code {returncode}")

    run_step(
        "git clone",
        ["git", "clone", f"https://<Git User>:{gl_at}@gitlab.com/<Git User>/{repo_name}.git", checkout_dir],
    )
    # Point `origin` at a token-free URL so the access token is not persisted
    # in .git/config, which is copied into the exported `repo` artifact below.
    run_step(
        "git remote set-url",
        ["git", "remote", "set-url", "origin", f"https://gitlab.com/<Git User>/{repo_name}.git"],
        cwd=checkout_dir,
    )
    shutil.copytree(checkout_dir, repo.path)

    run_step("dvc pull", ["dvc", "pull"], cwd=checkout_dir)

    # shutil.move falls back to copy-and-delete when source and destination
    # are on different filesystems, where os.rename would raise OSError.
    shutil.move(os.path.join(checkout_dir, "data.csv"), dataset.path)

### Data Preprocessing

In [9]:
from kfp.dsl import Input, Output, Dataset
from kfp import dsl

@dsl.component(base_image='python:3.9.16',packages_to_install=["pandas", "scikit-learn"])
def data_preprocessing(
    dataset: Input[Dataset],
    processed_dataset: Output[Dataset]
):
    """Clean the raw CSV and store the result as a pickled DataFrame.

    Steps: drop the ``name`` column, coerce every remaining column to a
    numeric dtype (rows with non-convertible values are removed), then drop
    outlier rows column by column.

    Args:
        dataset: Input artifact whose path is read with ``pandas.read_csv``.
        processed_dataset: Output artifact; the cleaned frame (index reset)
            is written to its path with ``DataFrame.to_pickle``.

    Raises:
        KeyError: If the CSV has no ``name`` column.
    """
    import pandas as pd

    # Rows further than this many standard deviations from a column's mean
    # are treated as outliers.
    outlier_sigma = 2.5

    ## Data storage
    df = pd.read_csv(dataset.path)

    ## Data cleaning
    # Remove the nominal feature; it is not usable for the regression.
    df = df.drop(columns=['name'])

    # Convert to numeric dtypes and drop rows containing incompatible values.
    df = df.apply(pd.to_numeric, errors='coerce').dropna()

    # Remove outliers outside mean +/- outlier_sigma standard deviations.
    # The filter is applied sequentially, so each column's mean/std is
    # computed on the rows that survived the previous columns.
    for col in df.columns:
        center = df[col].mean()
        spread = outlier_sigma * df[col].std()
        is_outlier = (df[col] < (center - spread)) | (df[col] > (center + spread))
        df = df.drop(df[is_outlier].index)

    df.reset_index(drop=True).to_pickle(processed_dataset.path)


### Train

In [10]:
# Benötigte Bibliotheken des Kubeflow-Frameworks
from kfp.dsl import Input, Output, Dataset, Model
from kfp import dsl

@dsl.component(
    base_image='python:3.9.16',
    packages_to_install=["pandas", "scikit-learn"])
def train(
    processed_dataset: Input[Dataset],
    trained_model: Output[Model]
):
    """Fit a linear regression on degree-2 polynomial features and pickle it.

    Only the fitted ``LinearRegression`` is stored; the polynomial feature
    expansion is not part of the pickled object, so consumers have to apply
    ``PolynomialFeatures(degree=2)`` to ``wght`` and ``year`` themselves.

    Args:
        processed_dataset: Input artifact holding a pickled DataFrame with
            the columns ``wght``, ``year`` and ``mpg``.
        trained_model: Output artifact; the fitted regressor is pickled to
            its path.
    """
    # Libraries needed inside the component
    import pickle

    import pandas as pd
    from sklearn.linear_model import LinearRegression
    from sklearn.preprocessing import PolynomialFeatures

    # Output of the previous component
    frame = pd.read_pickle(processed_dataset.path)

    predictors = frame[["wght", "year"]]  # independent variables
    target = frame["mpg"]  # dependent variable

    expander = PolynomialFeatures(degree=2)
    expanded = expander.fit_transform(predictors)

    regressor = LinearRegression()
    regressor.fit(expanded, target)

    # Persist for the downstream components
    with open(trained_model.path, 'wb') as sink:
        pickle.dump(regressor, sink)

### Metrics

In [11]:
from kfp.dsl import Input, Output, Dataset, Model, Artifact
from kfp import dsl

@dsl.component(base_image='python:3.9.16',packages_to_install=["pandas", "scikit-learn", "matplotlib"])
def metrics(
    trained_model: Input[Model],
    processed_dataset: Input[Dataset],
    metrics: Output[Artifact]
):
    """Score the model on the processed dataset and save a diagnostic plot.

    The figure has two panels (actual vs. predicted, residual vs. predicted)
    and carries the R^2 score in its title.

    Args:
        trained_model: Input artifact containing the pickled regressor.
        processed_dataset: Input artifact holding a pickled DataFrame with
            the columns ``wght``, ``year`` and ``mpg``.
        metrics: Output artifact; the figure is written to its path as PNG.
    """
    import pickle

    import matplotlib.pyplot as plt
    import pandas as pd
    from sklearn.metrics import PredictionErrorDisplay
    from sklearn.preprocessing import PolynomialFeatures

    with open(trained_model.path, 'rb') as source:
        regressor = pickle.load(source)

    frame = pd.read_pickle(processed_dataset.path)
    predictors = frame[["wght", "year"]]  # independent variables
    target = frame["mpg"]  # dependent variable
    # Same degree-2 expansion that is applied before fitting the regressor.
    expanded = PolynomialFeatures(degree=2).fit_transform(predictors)

    predicted = regressor.predict(expanded)

    # Coefficient of determination R^2
    r_squared = regressor.score(expanded, target)

    # Visual comparison of true and predicted values
    figure, (left_ax, right_ax) = plt.subplots(ncols=2, figsize=(6, 4))
    panels = (
        ("actual_vs_predicted", left_ax),
        ("residual_vs_predicted", right_ax),
    )
    for panel_kind, panel_ax in panels:
        PredictionErrorDisplay.from_predictions(
            y_true=target,
            y_pred=predicted,
            kind=panel_kind,
            subsample=100,
            ax=panel_ax,
            random_state=0,
        )

    left_ax.yaxis.set_ticklabels([])
    left_ax.xaxis.set_ticklabels([])
    right_ax.xaxis.set_ticklabels([])
    right_ax.set_ylabel("Actual - Predicted ", labelpad=-3)

    figure.suptitle("R^2 score: " + str(r_squared), y=0.75)
    plt.tight_layout(pad=4)
    plt.savefig(metrics.path, format='png')

### Git + DVC push model and metrics

In [12]:
from kfp.dsl import Input, Output, Dataset, Model, Artifact
from kfp import dsl, kubernetes

@dsl.component(base_image='python:3.9.16',packages_to_install=["subprocess.run","dvc[s3]"])
def git_dvc_push(
    trigger: str,
    timestamp: str,
    trained_model: Model,
    metrics: Artifact,
    repo: Artifact
) -> str:
    """Version the model with DVC and push model pointer, metrics and a tag to GitLab.

    The model is copied into the repository as ``model.pkl``, added and pushed
    with DVC; ``model.pkl.dvc`` and ``metrics.png`` are committed, the commit
    is tagged with the run timestamp and everything is force-pushed.

    Args:
        trigger: Free-text description of what started the run; used in the
            commit message and as the tag message.
        timestamp: Run time in the format ``%Y-%m-%dT%H:%M:%S%z``; used in the
            commit message and, reformatted as ``%Y%m%d%H%M%S``, as tag name.
        trained_model: Artifact whose file is copied to ``model.pkl``.
        metrics: Artifact whose file is copied to ``metrics.png``.
        repo: Artifact containing the Git working tree to commit into.

    Returns:
        The string ``"True"`` once every step has succeeded.

    Raises:
        ValueError: If ``timestamp`` does not match the expected format.
        KeyError: If ``GITLAB_ACCESS_TOKEN``, ``GITHUB_USER_EMAIL`` or
            ``GITHUB_USER_NAME`` is missing from the environment.
        RuntimeError: If a dvc or git command exits with a non-zero status.
    """
    import subprocess, os, shutil
    import datetime

    def run_step(description, cmd):
        # The error text deliberately omits the command line: the push URL
        # embeds the access token and must not end up in a traceback.
        returncode = subprocess.call(cmd, cwd=repo.path)
        if returncode != 0:
            raise RuntimeError(f"{description} failed with exit code {returncode}")

    # Validate all inputs first so a malformed timestamp or a missing secret
    # fails the task before anything has been pushed.
    timestamp_as_tag = datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S%z").strftime('%Y%m%d%H%M%S')
    gl_at = os.environ['GITLAB_ACCESS_TOKEN']
    user_email = os.environ['GITHUB_USER_EMAIL']
    user_name = os.environ['GITHUB_USER_NAME']

    shutil.copy2(trained_model.path, os.path.join(repo.path, "model.pkl"))
    shutil.copy2(metrics.path, os.path.join(repo.path, "metrics.png"))

    run_step("dvc add", ["dvc", "add", "model.pkl"])
    run_step("dvc push", ["dvc", "push", "model.pkl"])

    # Arguments are passed as a list (no shell), so the values must not be
    # wrapped in quote characters.
    run_step("git config user.email", ["git", "config", "--global", "user.email", user_email])
    run_step("git config user.name", ["git", "config", "--global", "user.name", user_name])

    run_step("git add model.pkl.dvc", ["git", "add", "model.pkl.dvc"])
    run_step("git add metrics.png", ["git", "add", "metrics.png"])

    # `git diff --cached --quiet` exits with 0 when nothing is staged; in that
    # case `git commit` would fail, so the commit is skipped and the tag is
    # placed on the current HEAD.
    has_staged_changes = subprocess.call(["git", "diff", "--cached", "--quiet"], cwd=repo.path) != 0
    if has_staged_changes:
        run_step(
            "git commit",
            ["git", "commit", "-m", f"[AUTO] Kubeflow Pipeline Execution triggered by [{trigger}] at [{timestamp}] "],
        )

    run_step("git tag", ["git", "tag", "-a", timestamp_as_tag, "-m", trigger])

    # NOTE(review): --force overwrites the remote branch history.
    run_step(
        "git push",
        ["git", "push", f"https://<Git User>:{gl_at}@gitlab.com/<Git User>/ds-seminar-mlops-demo.git", "--follow-tags", "--force"],
    )  # TODO Change to create Tag
    return "True"


### Deployment

In [13]:
@dsl.component(base_image='python:3.9.16',packages_to_install=["kubernetes","kserve==0.11.1"])
def deploy(
    # trained_model: Input[Model],
):
    """Replace the ``sklearn-mpg`` KServe InferenceService with a fresh one.

    An existing service of that name in namespace ``kserve-deploy-test`` is
    deleted, the function polls (up to 10 times, 10 seconds apart) until the
    service can no longer be fetched, and then creates a new InferenceService
    that serves ``s3://ds-seminar-model/model.pkl`` with the sklearn predictor.

    Raises:
        RuntimeError: If the service is still retrievable after all polls.
        KeyError: If ``AWS_ACCESS_KEY_ID`` or ``AWS_SECRET_ACCESS_KEY`` is
            missing from the environment.
    """
    import os
    from time import sleep

    from kserve import (
        KServeClient,
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1PredictorSpec,
        V1beta1SKLearnSpec,
        constants,
    )
    from kubernetes import client
    from kubernetes.client import V1ResourceRequirements
    from kubernetes.client.models import V1EnvVar

    namespace = "kserve-deploy-test"
    name = 'sklearn-mpg'
    kserve_version = 'v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version
    max_polls = 10
    poll_interval_seconds = 10

    kserve_client = KServeClient()

    # Remove a previous deployment, if any. A RuntimeError here is treated as
    # "the service does not exist yet" and ignored.
    try:
        kserve_client.get(name, namespace=namespace, watch=True, timeout_seconds=10)
        kserve_client.delete(name, namespace)
    except RuntimeError:
        pass

    # Wait until the service can no longer be fetched; a RuntimeError from
    # `get` is taken as confirmation that the deletion has completed.
    for _ in range(max_polls):
        try:
            kserve_client.get(name, namespace=namespace, timeout_seconds=20)
        except RuntimeError:
            break
        sleep(poll_interval_seconds)
    else:
        raise RuntimeError("Could not delete model")

    # TODO: remove??
    # NOTE(review): the credentials are embedded as plain values in the
    # InferenceService spec although a service account is also configured
    # -- confirm whether these variables are still required.
    aws_env_vars = [
        V1EnvVar(name="AWS_ACCESS_KEY_ID", value=os.environ['AWS_ACCESS_KEY_ID']),
        V1EnvVar(name="AWS_SECRET_ACCESS_KEY", value=os.environ['AWS_SECRET_ACCESS_KEY']),
    ]

    predictor = V1beta1PredictorSpec(
        service_account_name="sa-aws-kserve",
        sklearn=V1beta1SKLearnSpec(
            storage_uri="s3://ds-seminar-model/model.pkl",
            resources=V1ResourceRequirements(
                requests={'cpu': '200m', 'memory': '500Mi'},
                limits={'cpu': '200m', 'memory': '500Mi'},
            ),
            env=aws_env_vars,
        ),
    )
    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            annotations={
                'sidecar.istio.io/inject': 'false',
                'serving.kserve.io/enable-prometheus-scraping': "true",
            },
        ),
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )
    kserve_client.create(isvc)

### Compile Pipeline

In [14]:
from kfp import dsl, kubernetes, compiler
from kfp.dsl import PipelineTask

@dsl.pipeline
def pipeline(trigger: str = "", timestamp: str = ""):
    """Clone, preprocess, train, evaluate, publish and conditionally deploy.

    Args:
        trigger: Passed through to ``git_dvc_push``.
        timestamp: Passed through to ``git_dvc_push``.
    """
    # Every task that needs credentials gets the same keys of the
    # "pipeline-secrets" secret, exposed under identical variable names.
    secret_name = "pipeline-secrets"
    secret_env = {
        key: key
        for key in (
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "GITLAB_ACCESS_TOKEN",
            "GITHUB_USER_EMAIL",
            "GITHUB_USER_NAME",
        )
    }

    clone_step = git_dvc_clone()
    clone_step.set_caching_options(enable_caching=False)
    kubernetes.use_secret_as_env(
        task=clone_step,
        secret_name=secret_name,
        secret_key_to_env=secret_env,
    )

    preprocess_step = data_preprocessing(dataset=clone_step.outputs['dataset'])
    cleaned_data = preprocess_step.outputs['processed_dataset']

    train_step = train(processed_dataset=cleaned_data)
    fitted_model = train_step.outputs['trained_model']

    metrics_step = metrics(
        trained_model=fitted_model,
        processed_dataset=cleaned_data,
    )

    push_step = git_dvc_push(
        trigger=trigger,
        timestamp=timestamp,
        repo=clone_step.outputs['repo'],
        trained_model=fitted_model,
        metrics=metrics_step.outputs['metrics'],
    )
    kubernetes.use_secret_as_env(
        task=push_step,
        secret_name=secret_name,
        secret_key_to_env=secret_env,
    )

    # Deploy only when the push step reports success.
    with dsl.If(push_step.output == "True"):
        deploy_step = deploy()
        deploy_step.set_caching_options(enable_caching=False)
        kubernetes.use_secret_as_env(
            task=deploy_step,
            secret_name=secret_name,
            secret_key_to_env=secret_env,
        )

# Compile the pipeline definition into a YAML package file.
compiler.Compiler().compile(pipeline_func=pipeline, package_path='pipeline.yaml')