# What are Kubeflow Pipelines? 

A <b>pipeline</b> is a description of a machine learning (ML) workflow, including all of the components in the workflow and how the components relate to each other in the form of a graph.


# What is a Pipeline Component?

A pipeline component is one step of a workflow/pipeline that does a specific task.

<img src="./img/iris_classification_pipeline.png" alt="iris classification pipeline" align="left" style="padding-left: 10px; padding-right: 40px;"/> 


The image at the upper left shows the pipeline this workshop creates. It is a simple pipeline with just a few steps:
1. The first two steps train models using the decision tree and K-nearest neighbors algorithms, respectively. These two steps run in parallel.
2. The next steps are conditional: they compare the results of the training steps to check which model achieves the higher accuracy.
3. Depending on the outcome of the conditional steps, only the model with the higher accuracy is saved for later use/serving.

When a pipeline is run, the system launches one or more Kubernetes Pods corresponding to the steps in the workflow (pipeline). The Pods start Docker containers, and the containers in turn start your programs.
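
To make the graph structure concrete, here is a minimal sketch in plain Python (it does not use the Kubeflow API). The step names and the `execution_stages` helper are hypothetical and chosen only for illustration. Each step lists the steps it depends on, and the helper groups steps into stages whose members could run in parallel.

```python
# Hypothetical step names; each step maps to the steps it depends on.
PIPELINE = {
    "train-tree": [],
    "train-knn": [],
    "save-tree-if-better": ["train-tree", "train-knn"],
    "save-knn-if-better": ["train-tree", "train-knn"],
}


def execution_stages(graph):
    """Group steps into stages; steps within one stage do not depend on each other."""
    remaining = {step: set(deps) for step, deps in graph.items()}
    stages = []
    while remaining:
        # A step is ready once all of its dependencies have completed.
        ready = sorted(step for step, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("cycle detected: a pipeline graph must be acyclic")
        stages.append(ready)
        for step in ready:
            del remaining[step]
        for deps in remaining.values():
            deps.difference_update(ready)
    return stages


print(execution_stages(PIPELINE))
```

The two training steps end up in the first stage because neither depends on the other, which matches the parallel execution described above.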

# Steps to create a Pipeline
1. Write the ML Python code
2. Containerize the component’s code
    - Create a Dockerfile
    - Build container image
    - Push image into a container registry
3. Define a Pipeline
    - Define component's specifications (interfaces)
    - Leverage [kfp](https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.html) library 

## 1. ML Python Code

In [8]:
import joblib, IPython
from sklearn import datasets
from sklearn import neighbors
from sklearn import tree
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

In [9]:
def download_data():
    iris_dataset = datasets.load_iris()
    X = iris_dataset.data
    y = iris_dataset.target
    return X, y


def split_data(X, y, test_size=0.25):
    return train_test_split(X, y, test_size=test_size)

In [15]:
def build_model(model="", parameter=""):
    X, y = download_data()
    X_train, X_test, y_train, y_test = split_data(X, y)

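    if model == "knn":
        # For KNN, `parameter` is the number of neighbors.
        classifier = neighbors.KNeighborsClassifier(n_neighbors=int(parameter))
    else:
        # Any other value falls back to a decision tree;
        # `parameter` is the splitter strategy ("best" or "random").
        model = "tree"
        classifier = tree.DecisionTreeClassifier(splitter=parameter)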

    classifier.fit(X_train, y_train)
 
    predictions = classifier.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    print("Accuracy for {}: {}".format(model, accuracy))

    # Write the accuracy to a file so the pipeline can read it as the step's output.
    with open("/tmp/accuracy_{}.txt".format(model), "w") as text_file:
        text_file.write("{}".format(accuracy))
    
    return classifier


In [16]:
def save_final_model(model="", parameter=""):
    classifier = build_model(model, parameter)
    joblib.dump(classifier, "/tmp/{}.pkl".format(model))

## 2. Containerize the pipeline component’s code

### Create a Dockerfile

```
FROM python:3.8-slim-buster
RUN pip3 install scikit-learn
ADD iris_classification.py iris_classification.py
```
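
The Dockerfile copies `iris_classification.py` into the image, and the pipeline in section 3 invokes it as `python iris_classification.py build_model tree <splitter>`. The notebook cells do not show how those command-line arguments reach the functions, so the following is only a sketch of one possible entry point, assuming `argparse`. The `COMMANDS` table and the `main` function are hypothetical names, and the two functions here are stand-ins that merely echo their arguments. In the real script they would be the `build_model` and `save_final_model` functions from section 1.

```python
import argparse


def build_model(model="", parameter=""):
    """Stand-in for the notebook's build_model; it only echoes its arguments."""
    return ("build_model", model, parameter)


def save_final_model(model="", parameter=""):
    """Stand-in for the notebook's save_final_model; it only echoes its arguments."""
    return ("save_final_model", model, parameter)


# Hypothetical dispatch table: command name on the CLI -> function to call.
COMMANDS = {
    "build_model": build_model,
    "save_final_model": save_final_model,
}


def main(argv=None):
    """Parse `<command> <model> <parameter>` and call the matching function."""
    parser = argparse.ArgumentParser(description="Iris classification steps")
    parser.add_argument("command", choices=sorted(COMMANDS))
    parser.add_argument("model", help='"tree" or "knn"')
    parser.add_argument("parameter", help="splitter (tree) or n_neighbors (knn)")
    args = parser.parse_args(argv)
    return COMMANDS[args.command](args.model, args.parameter)


# Example: equivalent to `python iris_classification.py build_model tree random`.
print(main(["build_model", "tree", "random"]))
```

In an actual script, calling `main()` under an `if __name__ == "__main__":` guard would let the container command dispatch to the right function.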

### Build container image

The image containing our ML code has already been built and uploaded to Docker Hub as **annajung/iris:latest**. To build the image yourself, run:

`docker build --tag "iris:latest" -f Dockerfile . `

### Push image into a container registry

`docker login <hub-server>`

`docker push <hub-user>/<repo-name>:<tag>`

## 3. Create Iris Classification Pipeline

In [12]:
import kfp
from kfp import dsl

In [13]:
@dsl.pipeline(
    name='iris-classification',
    description='A basic pipeline example for iris classification'
)
def iris_classification_pipeline(n_neighbors=2, splitter="random"):
    tree = dsl.ContainerOp(
        name="Train using Decision Tree",
        image="annajung/iris:latest",
        command=["sh", "-c"],
        arguments=["python iris_classification.py build_model tree " + str(splitter)],
        file_outputs={'output': '/tmp/accuracy_tree.txt'}
    )

    knn = dsl.ContainerOp(
        name="Train using K Nearest Neighbors",
        image="annajung/iris:latest",
        command=["sh", "-c"],
        arguments=["python iris_classification.py build_model knn " + str(n_neighbors)],
        file_outputs={'output': '/tmp/accuracy_knn.txt'}
    )

    with dsl.Condition(tree.output >= knn.output):
        dsl.ContainerOp(
            name='Train Tree',
            image="annajung/iris:latest",
            command=['sh', '-c'],
            arguments=["python3 iris_classification.py save_final_model tree " + str(splitter)],
            file_outputs={'output': '/tmp/tree.pkl'},
        )

    with dsl.Condition(knn.output > tree.output):
        dsl.ContainerOp(
            name='Train KNN',
            image="annajung/iris:latest",
            command=['sh', '-c'],
            arguments=["python3 iris_classification.py save_final_model knn " + str(n_neighbors)],
            file_outputs={'output': '/tmp/knn.pkl'},
        )

    dsl.get_pipeline_conf().set_ttl_seconds_after_finished(500)
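
The two `dsl.Condition` blocks above are mutually exclusive: one fires when the tree accuracy is greater than or equal to the KNN accuracy, the other when the KNN accuracy is strictly greater. The following plain-Python sketch mirrors that branching logic outside of Kubeflow. The `choose_model` function is a hypothetical helper, and it assumes the accuracies arrive as text, as they do in the `/tmp/accuracy_*.txt` files. How the pipeline backend itself evaluates the comparison is not shown here.

```python
def choose_model(tree_accuracy: str, knn_accuracy: str) -> str:
    """Return which model would be saved, given accuracies read as text."""
    tree_acc = float(tree_accuracy)
    knn_acc = float(knn_accuracy)
    # Mirrors `tree.output >= knn.output`; a tie goes to the decision tree.
    if tree_acc >= knn_acc:
        return "tree"
    # Mirrors `knn.output > tree.output`.
    return "knn"


print(choose_model("0.95", "0.92"))
print(choose_model("0.90", "0.97"))
```

Because exactly one of the two conditions holds for any pair of accuracies, exactly one of the save steps runs in each pipeline run.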

The DSL compiler compiles the given pipeline function into a workflow YAML file.

In [14]:
kfp.compiler.Compiler().compile(iris_classification_pipeline, 'iris_classification_pipeline.yaml')