# Building Kubeflow Pipeline
## Use case : Customer Churn Prediction

## Step 1 : Build Docker Artifacts

In [1]:
cd /home/jupyter/kubeflow-pipeline-demo/customer_churn_analysis

/home/jupyter/kubeflow-pipeline-demo/customer_churn_analysis


In [2]:
#Login to docker
!docker login -u datasciencechampion -p pongapandit@6171

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded


In [59]:
%%writefile Dockerfile
FROM tensorflow/tensorflow:2.2.0-gpu
ARG DEBIAN_FRONTEND=noninteractive
# Install apt dependencies
RUN apt-get update && apt-get install -y \
    git \
    gpg-agent \
    python3-cairocffi \
    protobuf-compiler \
    python3-pil \
    python3-lxml \
    python3-tk \
    wget
# Install gcloud and gsutil commands
# https://cloud.google.com/sdk/docs/quickstart-debian-ubuntu
RUN export CLOUD_SDK_REPO="cloud-sdk-$(lsb_release -c -s)" && \
    echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && \
    curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \
    apt-get update -y && apt-get install google-cloud-sdk -y
WORKDIR /pipeline
COPY ./ ./
RUN pip install -r requirements.txt
RUN pip install "dask[dataframe]" --upgrade
RUN pip install decorator --upgrade
ENV TF_CPP_MIN_LOG_LEVEL 3

Overwriting Dockerfile


## Step 2 : Building Docker Image

In [63]:
!docker build -t datasciencechampion/kubeflow-pipeline-churn_prediction .

Sending build context to Docker daemon  1.329MB
Step 1/10 : FROM tensorflow/tensorflow:2.2.0-gpu
 ---> f5ba7a196d56
Step 2/10 : ARG DEBIAN_FRONTEND=noninteractive
 ---> Using cache
 ---> b2e05ab73d8b
Step 3/10 : RUN apt-get update && apt-get install -y     git     gpg-agent     python3-cairocffi     protobuf-compiler     python3-pil     python3-lxml     python3-tk     wget
 ---> Using cache
 ---> 0d616db986bf
Step 4/10 : RUN export CLOUD_SDK_REPO="cloud-sdk-$(lsb_release -c -s)" &&     echo "deb http://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list &&     curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - &&     apt-get update -y && apt-get install google-cloud-sdk -y
 ---> Using cache
 ---> 92da9db79b84
Step 5/10 : WORKDIR /pipeline
 ---> Using cache
 ---> e6d134cf60f2
Step 6/10 : COPY ./ ./
 ---> Using cache
 ---> 976f2d694cec
Step 7/10 : RUN pip install -r requirements.txt
 ---> Using cache
 ---> d00

## Push Docker Image to Docker hub

In [64]:
!docker push datasciencechampion/kubeflow-pipeline-churn_prediction

Using default tag: latest
The push refers to repository [docker.io/datasciencechampion/kubeflow-pipeline-churn_prediction]

[1B833cd69d: Preparing 
[1Bccb3b8e5: Preparing 
[1B7973907f: Preparing 
[1B75602735: Preparing 
[1B3c4a94f0: Preparing 
[1Ba48741b8: Preparing 
[1B17f323df: Preparing 
[1Be55f84c6: Preparing 
[1Bb0f92c14: Preparing 
[1Bcf4cd527: Preparing 
[1Bc1f74e01: Preparing 
[1B9e4b0fc9: Preparing 
[1Be3b79e0a: Preparing 
[1Be43735a0: Preparing 
[1B3918ca41: Preparing 
[1B768f66a4: Preparing 
[1Bd332a58a: Preparing 
[1Bf11cbf29: Preparing 
[1Ba4b22186: Preparing 
[1Bafb09dc3: Preparing 
[1Bb5a53aac: Preparing 
[1Bc8e5063e: Preparing 
[2Bc8e5063e: Layer already exists [18A[2K[16A[2K[13A[2K[11A[2K[8A[2K[4A[2K[3A[2K[2A[2Klatest: digest: sha256:6490dba35a9d9e80cf9a18e724c3567b691445587ff0735c51bfc6967fcad516 size: 5147


# Step 3: Install Kubeflow SDk

In [29]:
!pip install kfp



# Step 4 : Building Kubeflow Pipeline

In [65]:
import kfp
import kfp.components as comp
from kubernetes.client.models import V1EnvVar

## Define kubeflow pipeline Component

In [66]:
@kfp.dsl.component
def get_sample_data():
    # Defining component configuration
    get_sample_data_component = kfp.dsl.ContainerOp(
        name='Data Collection',
        image='docker.io/datasciencechampion/kubeflow-pipeline-churn_prediction',
        command=['python', 'get_sample_data.py'],
        )
    return get_sample_data_component

In [67]:
@kfp.dsl.component
def data_preprocessing():    
    # Defining component configuration
    data_preprocessing = kfp.dsl.ContainerOp(
        name='data-preprocessing',
        image='docker.io/datasciencechampion/kubeflow-pipeline-churn_prediction',
        command=['python', 'preprocess_data.py'],
        )
    return data_preprocessing

In [68]:
@kfp.dsl.component
def training_and_evaluation():
    # Defining component configuration
    training_and_evaluation_component = kfp.dsl.ContainerOp(
        name='training-and-evaluation',
        image='docker.io/datasciencechampion/kubeflow-pipeline-churn_prediction',
        command=['python', 'train_evaluate_model.py'],
        file_outputs={'mlpipeline-ui-metadata':'/mlpipeline-ui-metadata.json', "mlpipeline-metrics":'/mlpipeline-metrics.json'}
        )
    
    return training_and_evaluation_component

In [69]:
# Let see output of component configuration
debug = True
if debug :
    training_component_vis = training_and_evaluation()
    print(training_component_vis)

{'ContainerOp': {'is_exit_handler': False, 'human_name': 'training-and-evaluation', 'display_name': None, 'name': 'training-and-evaluation 800007f7da18e325', 'node_selector': {}, 'volumes': [], 'tolerations': [], 'affinity': {}, 'pod_annotations': {}, 'pod_labels': {}, 'num_retries': 0, 'retry_policy': None, 'backoff_factor': None, 'backoff_duration': None, 'backoff_max_duration': None, 'timeout': 0, 'init_containers': [], 'sidecars': [], 'loop_args': None, '_inputs': [], 'dependent_names': [], 'enable_caching': True, 'attrs_with_pipelineparams': ['node_selector', 'volumes', 'pod_annotations', 'pod_labels', 'num_retries', 'init_containers', 'sidecars', 'tolerations', '_container', 'artifact_arguments', '_parameter_arguments'], '_is_v2': False, '_container': {'args': None,
 'command': ['python', 'train_evaluate_model.py'],
 'env': None,
 'env_from': None,
 'image': 'docker.io/datasciencechampion/kubeflow-pipeline-churn_prediction',
 'image_pull_policy': None,
 'lifecycle': None,
 'liven

## Final Kubeflow pipeline Definition

In [70]:
@kfp.dsl.pipeline(
  name="Telecom Customer Churn Prediction ",
  description="Modeling Churn Prediction for telecom"
)
def churn():
    download_data = get_sample_data()
    download_data.execution_options.caching_strategy.max_cache_staleness = "P0D"
    data_processing = data_preprocessing().after(download_data)
    data_processing.execution_options.caching_strategy.max_cache_staleness = "P0D"
    train = training_and_evaluation().after(data_processing)
    train.execution_options.caching_strategy.max_cache_staleness = "P0D"

In [71]:
# Let see output of pipeline configuration
debug = True
if debug :
    training_pipeline_output = churn()
    print(training_pipeline_output)

None


## Compile Kubeflow Pipeline
### It will Generate .zip file inside this contain YAMl file which contain the configuration of kubeflow pipeline

In [72]:
kfp.compiler.Compiler().compile(churn, 'kubeflow-pipeline-churn_prediction.zip')

# Step 5 : Connect to deployed kubeflow pipeline Endpoint (GCP)

In [73]:
# Create kfp client
# Note: Add the KubeFlow Pipeline endpoint below if the client is not running on the same cluster.
#client = kfp.Client("336edb688959bb59-dot-us-central1.pipelines.googleusercontent.com")
client = kfp.Client("41ab66e06164f219-dot-us-central1.pipelines.googleusercontent.com")

# Step 6 : Create Experiment

In [74]:
EXPERIMENT_NAME = 'churn-prediction-1'
experiment = client.create_experiment(name=EXPERIMENT_NAME)

## Deploy pipeline to kubeflow pipeline Endpoint

In [75]:
run = client.run_pipeline(experiment.id, 'churn-prediction-run-4', 'kubeflow-pipeline-churn_prediction.zip')