# Santander Customer Transaction Prediction


![image](imgs/sant_comp.png)

# Problem and data
- Identify which customers will make a specific transaction in the future, irrespective of the amount of money transacted.

- An anonymized dataset containing numeric feature variables and a binary `target` column.

- The task is to predict the value of the binary `target` column for the test set.
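Because the target is binary and the pipeline below includes an ROC step, predictions can be scored with the area under the ROC curve. Here is a minimal stdlib sketch, assuming labels are 0/1 and scores are predicted probabilities. `roc_auc` is a hypothetical helper, not part of the pipeline's code:

```python
from typing import Sequence


def roc_auc(labels: Sequence[int], scores: Sequence[float]) -> float:
    """ROC AUC: probability that a random positive outscores a random negative.

    Ties between a positive and a negative score count as half a win.
    This is O(n_pos * n_neg), fine for a sketch but too slow for a full dataset.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

An AUC of 0.5 corresponds to random ranking and 1.0 to a perfect ranking of positives above negatives.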

# End-to-end real-world ML pipeline
![image](imgs/demo_pipeline.png)

# Kubeflow Pipelines

- Kubeflow Pipelines is a platform for building and deploying portable, scalable machine learning (ML) workflows based on Docker containers.

- Kubeflow Pipelines makes it easier to implement production-grade ML workflows without worrying about the low-level details of managing a Kubernetes cluster.

# Kubeflow Pipelines Goals

![image](imgs/kf_goals1.png)

- Visual depiction of the pipeline topology

- Each step can use the libraries and tools of your choice

- A single step could run on a single-node GPU or CPU instance, or in a distributed fashion

- Easily convert a Dockerized task into a pipeline step by following a specific format for how the container accepts inputs and produces outputs

- A Python SDK to specify the sequence of steps in your workflow

# ML Workflow Orchestration

![image](imgs/example_pipeline1.png)

# Kubeflow Pipelines Goals
![image](imgs/kf_goals2.png)

- Package the defined pipeline as a zip file

- Even an individual step can be packaged up as a reusable component.

# Kubeflow Pipelines Goals
![image](imgs/kf_goals3.png)

- Rapidly iterate on your ideas in a reliable and manageable way

- Historical view of any prior runs

- Filters to find past runs

- Clone a past run

- Compare runs side by side

# What Constitutes a Kubeflow Pipeline?

### 1. Containerized implementations of ML tasks
- Docker containers provide portability, repeatability and encapsulation
- Any tools or libraries can be used inside your container image

# What Constitutes a Kubeflow Pipeline?
### 2. Specification of the sequence of steps
- Specify which steps should run, how data flows between them, and how the outputs of one step connect to the inputs of downstream steps
- Model multi-step workflows as a simple sequence of steps, or capture the dependencies between tasks as a directed acyclic graph (DAG)
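To illustrate how a DAG determines execution order, here is a stdlib sketch (not Kubeflow code) that groups the steps of this notebook's pipeline into "waves" whose members could run in parallel. The step names are taken from the pipeline defined later; Python's `graphlib` stands in for Argo, the workflow engine Kubeflow Pipelines uses to execute the graph:

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each key maps a step to the set of steps it depends on (its upstream steps).
dag = {
    "preprocess": set(),
    "training": {"preprocess"},
    "prediction": {"training"},
    "confusion_matrix": {"prediction"},
    "roc": {"prediction"},
}

ts = TopologicalSorter(dag)
ts.prepare()
waves = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # steps whose dependencies have all finished
    waves.append(ready)
    ts.done(*ready)

print(waves)
# [['preprocess'], ['training'], ['prediction'], ['confusion_matrix', 'roc']]
```

The two metric steps share only `prediction` as a dependency, so an orchestrator is free to run them concurrently.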

# What Constitutes a Kubeflow Pipeline?
### 3. Input Parameters
- Define the input parameters that are exposed to the end user as a form in the UI.
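In the SDK, the input parameters are the arguments of the decorated pipeline function. The helper below is a hypothetical sketch in plain Python (not the kfp compiler) of how a function signature could map onto form fields, treating parameters without a default as required; `example_pipeline` is a stand-in, not the real pipeline:

```python
import inspect


def pipeline_form_fields(func):
    """List (name, default, required) for each parameter of a pipeline function.

    Hypothetical helper illustrating that the signature defines the run form.
    """
    fields = []
    for name, param in inspect.signature(func).parameters.items():
        required = param.default is inspect.Parameter.empty
        fields.append((name, None if required else param.default, required))
    return fields


def example_pipeline(output, project, learning_rate=0.1, steps=3000):
    """Stand-in with the same parameter style as the Santander pipeline."""


for field in pipeline_form_fields(example_pipeline):
    print(field)
# ('output', None, True)
# ('project', None, True)
# ('learning_rate', 0.1, False)
# ('steps', 3000, False)
```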

# Santander Customer Transaction Prediction: Model

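```python
import tensorflow as tf  # Estimator API

# make_inputs_from_np_arrays is a helper (not shown here) that returns an
# input_fn plus the feature names and feature columns for the estimator.
# Column 0 of train_data is the label; the remaining columns are features.
(train_input_fn, feature_names, feature_columns) = \
    make_inputs_from_np_arrays(features_np=train_data[:, 1:],
                               label_np=train_data[:, 0:1])

# Gradient boosted trees; hyperparameters come from the parsed command-line args.
classifier = tf.estimator.BoostedTreesClassifier(
    feature_columns,
    n_batches_per_layer=args.num_batch,
    model_dir=args.job_dir,
    n_trees=args.n_trees,
    max_depth=args.max_depth,
    learning_rate=args.learning_rate)
classifier.train(train_input_fn)
```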
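The model code above slices the label from the first column of `train_data` and uses the rest as features. A small NumPy sketch with made-up values shows what the two slices produce; note that `0:1` keeps the label as a 2-D column of shape `(n, 1)` rather than a flat vector:

```python
import numpy as np

# Toy stand-in for train_data (values are made up):
# column 0 is the binary target, the remaining columns are features.
train_data = np.array([
    [0, 1.2, 3.4, 5.6],
    [1, 0.7, 2.1, 9.0],
    [0, 4.4, 0.3, 1.1],
])

features_np = train_data[:, 1:]   # shape (3, 3): every column after the label
label_np = train_data[:, 0:1]     # shape (3, 1): a 2-D column
flat_label = train_data[:, 0]     # shape (3,): what plain indexing would give

print(features_np.shape, label_np.shape, flat_label.shape)
# (3, 3) (3, 1) (3,)
```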

# Setup

```python
EXP_NAME = 'Santander Training Pipeline'
OUTPUT_DIR = 'gs://kubeflow-pipelines-demo/tfx'

PROJECT_NAME = 'kf-pipelines'
```

```python
!pip install kfp --upgrade
```

The notebook was run with `kfp` 0.1.31.2.


# Create an Experiment in the Kubeflow pipeline

Kubeflow Pipelines requires an _experiment_ before you can start a run. An experiment is a group of comparable runs.

```python
import kfp
from kfp import compiler
import kfp.dsl as dsl

import kfp.notebook
import kfp.gcp as gcp

client = kfp.Client()
exp = client.create_experiment(name=EXP_NAME)
```

# Define a Pipeline

- Authoring a pipeline is just like authoring a normal Python function, with a special decorator (`@dsl.pipeline`) that lets the SDK recognize it.

- The pipeline function describes the topology of the pipeline and how the output of one component is passed downstream as the input of another component.

- All Docker container images referenced in the pipeline below are already built.
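The hypothetical classes below sketch the idea behind data passing: when one step's output placeholder is handed to another step as an input, a dependency edge can be recorded without declaring it explicitly. This is an illustration only; the SDK's real `ContainerOp` and `PipelineParam` machinery is more involved:

```python
from dataclasses import dataclass, field


@dataclass
class Output:
    """Placeholder for a value that a step will produce at run time."""
    producer: "Step"
    name: str


@dataclass
class Step:
    """Hypothetical, minimal stand-in for a pipeline step (not the kfp API)."""
    name: str
    inputs: dict = field(default_factory=dict)
    upstream: set = field(default_factory=set)

    def __post_init__(self):
        # Any input that is another step's Output creates a DAG edge.
        for value in self.inputs.values():
            if isinstance(value, Output):
                self.upstream.add(value.producer.name)

    def output(self, name="output"):
        return Output(self, name)


preprocess = Step("preprocess", {"train": "gs://kubeflow-pipelines-demo/dataset/train.csv"})
training = Step("training", {"transformed_data_dir": preprocess.output()})
prediction = Step("prediction", {"model": training.output("training_output_dir")})

for step in (preprocess, training, prediction):
    print(step.name, sorted(step.upstream))
# preprocess []
# training ['preprocess']
# prediction ['training']
```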

# Imports

```python
import warnings; warnings.simplefilter('ignore')

import kfp
from kfp import components
from kfp import dsl
from kfp import gcp
from kfp import onprem

platform = 'GCP'
```

# Convert a Dockerized task into a pipeline step

## Define a Python function that wraps the container by calling the SDK's `dsl.ContainerOp`

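```python
import kfp.dsl as dsl


def dataproc_train_op(
    project,
    train_data,
    eval_data,
    output
):
    # Wrap the prebuilt training image as a pipeline step.
    return dsl.ContainerOp(
        name='Dataproc - Train XGBoost model',
        image='gcr.io/ml-pipeline/xgboost-training:v1.1',
        # Command-line flags passed to the container's entrypoint.
        arguments=[
            '--project', project,
            '--train', train_data,
            '--eval', eval_data,
            '--output', output,
        ],
        # The container writes its result to /output.txt; the pipeline
        # exposes the file contents as this step's 'output' value.
        file_outputs={
            'output': '/output.txt',
        }
    )
```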
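On the container side, the image only has to honor this contract: accept the flags listed in `arguments` and write its result to the path named in `file_outputs`. A minimal sketch of such an entrypoint, assuming a hypothetical `main` function (the real training code in the image is not shown in this notebook); `output_file` is a parameter only so the sketch can be tested outside a container:

```python
import argparse
from pathlib import Path


def main(argv=None, output_file="/output.txt"):
    """Hypothetical entrypoint for the training image.

    Parses the same flags the ContainerOp passes, then writes the step's
    'output' value to the file listed in file_outputs.
    """
    parser = argparse.ArgumentParser(description="Train an XGBoost model (sketch)")
    parser.add_argument("--project", required=True)
    parser.add_argument("--train", required=True)
    parser.add_argument("--eval", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args(argv)

    # ... training on args.train / args.eval would happen here ...

    # After the container exits, the pipeline reads this file and passes its
    # contents to downstream steps as this step's 'output'.
    Path(output_file).write_text(args.output)
    return args.output
```

In the image, `main()` would be invoked as the container's command, so `argv` defaults to the real command-line arguments.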

# Define a component specification file
```yaml
name: Train Boosted Trees
description: Trains Boosted Trees using TensorFlow
inputs:
  - {name: Transformed data dir, type: GCSPath, description: 'Path to transformed data'}
  - {name: Learning rate, type: Float, default: '0.1', description: 'Learning rate for training.'}
  - {name: Steps, type: Integer, description: 'Number of training steps.'}
outputs:
  - {name: Training output dir, type: GCSPath, description: 'GCS or local directory.'}
implementation:
  container:
    image: us.gcr.io/kf-pipelines/kubeflow-train_boosted:v0.3
    command: [python, -m, trainer.boosted]
    args: [
      --transformed-data-dir, {inputValue: Transformed data dir},
      --learning-rate, {inputValue: Learning rate},
      --steps, {inputValue: Steps},
    ]
    fileOutputs:
      Training output dir: /output.txt
```
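Each `{inputValue: ...}` placeholder is replaced by the argument supplied for that input when the component is used in a pipeline, so every placeholder must name an input declared under `inputs`. A plain-Python sketch of that substitution, using a hypothetical `resolve_args` helper (not the kfp implementation) and an example data path made up for illustration:

```python
def resolve_args(args, input_values):
    """Expand {inputValue: <name>} placeholders into concrete strings.

    Hypothetical sketch of component instantiation; raises KeyError if a
    placeholder names an input that has no value.
    """
    resolved = []
    for item in args:
        if isinstance(item, dict) and "inputValue" in item:
            resolved.append(str(input_values[item["inputValue"]]))
        else:
            resolved.append(str(item))
    return resolved


spec_args = [
    "--transformed-data-dir", {"inputValue": "Transformed data dir"},
    "--learning-rate", {"inputValue": "Learning rate"},
    "--steps", {"inputValue": "Steps"},
]
values = {
    "Transformed data dir": "gs://kubeflow-pipelines-demo/tfx/transformed",  # made-up path
    "Learning rate": 0.1,
    "Steps": 3000,
}
print(" ".join(resolve_args(spec_args, values)))
# --transformed-data-dir gs://kubeflow-pipelines-demo/tfx/transformed --learning-rate 0.1 --steps 3000
```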

# Load components (steps) from specification YAML files

```python
dataflow_tf_transform_op = \
    components.load_component_from_file('../pipeline_steps/preprocessing/tft/component.yaml')
tf_train_op = \
    components.load_component_from_file('../pipeline_steps/training/dnntrainer/component.yaml')
dataflow_tf_predict_op = \
    components.load_component_from_file('../pipeline_steps/training/predict/component.yaml')

confusion_matrix_op = \
    components.load_component_from_file('../pipeline_steps/metrics/confusion_matrix/component.yaml')
roc_op = \
    components.load_component_from_file('../pipeline_steps/metrics/roc/component.yaml')

kubeflow_deploy_op = \
    components.load_component_from_file('../pipeline_steps/serving/deployer/component.yaml')
```
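Each loaded component becomes a Python factory function whose keyword arguments are derived from the input names in its YAML, which is why an input declared as `Transformed data dir` is passed as `transformed_data_dir=` in the pipeline function. The sketch below approximates that name conversion with a hypothetical `to_python_name` helper; the SDK's exact sanitization rules may differ:

```python
import re


def to_python_name(input_name: str) -> str:
    """Turn a component input name such as 'Transformed data dir' into
    'transformed_data_dir'. Hypothetical approximation of the SDK's rule."""
    name = re.sub(r"[^a-z0-9]+", "_", input_name.strip().lower())
    return name.strip("_")


for n in ["Transformed data dir", "Learning rate", "Training output dir"]:
    print(n, "->", to_python_name(n))
# Transformed data dir -> transformed_data_dir
# Learning rate -> learning_rate
# Training output dir -> training_output_dir
```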

# Define pipeline function

```python
@dsl.pipeline(
    name='Santander Customer Transaction Prediction',
    description='Example pipeline that does classification with model analysis based on Santander customer transaction dataset.'
)
def santander_transaction_classification(
        output,
        project,
        train='gs://kubeflow-pipelines-demo/dataset/train.csv',
        evaluation='gs://kubeflow-pipelines-demo/dataset/test.csv',
        mode='local',
        preprocess_module='gs://kubeflow-pipelines-demo/dataset/preprocessing.py',
        learning_rate=0.1,
        hidden_layer_size='1500',
        steps=3000
):
    # Argo fills in the workflow UID and pod name at run time, so each step
    # of each run writes to its own directory.
    output_template = str(output) + '/{{workflow.uid}}/{{pod.name}}/data'
    target_class_lambda = """lambda x: x['target']"""

    tf_server_name = 'kfdemo-service'

    if platform != 'GCP':
        # On-prem: create a shared volume and clone the pipelines repo into it.
        vop = dsl.VolumeOp(
            name="create_pvc",
            resource_name="pipeline-pvc",
            modes=dsl.VOLUME_MODE_RWM,
            size="1Gi"
        )

        checkout = dsl.ContainerOp(
            name="checkout",
            image="alpine/git:latest",
            command=["git", "clone", "https://github.com/kubeflow/pipelines.git", str(output) + "/pipelines"],
        ).apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
        checkout.after(vop)

    preprocess = dataflow_tf_transform_op(
        training_data_file_pattern=train,
        evaluation_data_file_pattern=evaluation,
        schema="not.txt",
        gcp_project=project,
        run_mode=mode,
        preprocessing_module=preprocess_module,
        transformed_data_dir=output_template
    )
    if platform != 'GCP':
        # The repo must be cloned into the volume before preprocessing starts.
        preprocess.after(checkout)

    training = tf_train_op(
        transformed_data_dir=preprocess.output,
        schema='not.txt',
        learning_rate=learning_rate,
        hidden_layer_size=hidden_layer_size,
        steps=steps,
        target='target',  # Santander label column
        preprocessing_module=preprocess_module,
        training_output_dir=output_template
    )

    prediction = dataflow_tf_predict_op(
        data_file_pattern=evaluation,
        schema='not.txt',
        target_column='target',
        model=training.outputs['training_output_dir'],
        run_mode=mode,
        gcp_project=project,
        predictions_dir=output_template
    )

    cm = confusion_matrix_op(
        predictions=prediction.outputs['predictions_dir'],
        output_dir=output_template
    )

    roc = roc_op(
        predictions_dir=prediction.outputs['predictions_dir'],
        target_lambda=target_class_lambda,
        output_dir=output_template
    )

    # Give every step storage access: the GCP service-account secret on GCP,
    # or the shared volume on-prem. A separate name keeps the `steps`
    # parameter from being shadowed.
    ops = [preprocess, training, prediction, cm, roc]
    for op in ops:
        if platform == 'GCP':
            op.apply(gcp.use_gcp_secret('user-gcp-sa'))
        else:
            op.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
```
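`output_template` contains `{{workflow.uid}}` and `{{pod.name}}`, which are Argo workflow variables resolved at run time, so every step of every run writes to its own directory. A small sketch of that substitution, with made-up runtime values standing in for what Argo would supply:

```python
output = "gs://kubeflow-pipelines-demo/tfx"
output_template = str(output) + "/{{workflow.uid}}/{{pod.name}}/data"

# Argo fills these in when each pod starts; the values below are made up.
runtime_values = {"workflow.uid": "1a2b3c", "pod.name": "santander-xyz-123"}

resolved = output_template
for key, value in runtime_values.items():
    resolved = resolved.replace("{{" + key + "}}", value)

print(output_template)  # gs://kubeflow-pipelines-demo/tfx/{{workflow.uid}}/{{pod.name}}/data
print(resolved)         # gs://kubeflow-pipelines-demo/tfx/1a2b3c/santander-xyz-123/data
```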


# Compile the pipeline

Compile the pipeline into a zip package:

```python
compiler.Compiler().compile(santander_transaction_classification, 'santander_training_pipeline.zip')
```
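To check what the compiler produced before uploading it, the package can be opened with Python's `zipfile` module. A minimal sketch with a hypothetical `list_package` helper; the file name(s) inside the archive depend on the SDK version, so it simply lists whatever is there:

```python
import zipfile


def list_package(path):
    """Return the names of the files inside a compiled pipeline package."""
    with zipfile.ZipFile(path) as zf:
        return zf.namelist()


# Usage, after compiling:
#   print(list_package("santander_training_pipeline.zip"))
```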

# Submit the run with parameters

```python
import time

# Timestamped run name, e.g. santander_training_pipeline-YYYYmmdd-HHMMSS
run = client.run_pipeline(
    exp.id,
    'santander_training_pipeline-' + time.strftime("%Y%m%d-%H%M%S"),
    'santander_training_pipeline.zip',
    params={
        'output': OUTPUT_DIR,
        'project': PROJECT_NAME,
        'train': 'gs://kubeflow-pipelines-demo/dataset/train.csv',
        'evaluation': 'gs://kubeflow-pipelines-demo/dataset/test.csv',
        'mode': 'local',
        'preprocess_module': 'gs://kubeflow-pipelines-demo/dataset/preprocessing.py',
        'learning_rate': 0.1,
        'hidden_layer_size': '1500',
        'steps': 3000,
    })
```