# Digit Recognizer Kubeflow Pipeline

This notebook builds a Kubeflow pipeline for the [Kaggle Digit Recognizer competition](https://www.kaggle.com/competitions/digit-recognizer/overview). From the competition overview:

>MNIST ("Modified National Institute of Standards and Technology") is the de facto “hello world” dataset of computer vision. Since its release in 1999, this classic dataset of handwritten images has served as the basis for benchmarking classification algorithms. As new machine learning techniques emerge, MNIST remains a reliable resource for researchers and learners alike.

>In this competition, your goal is to correctly identify digits from a dataset of tens of thousands of handwritten images.

# Install relevant libraries


>Update pip `pip install --user --upgrade pip`

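>Install the Kubeflow Pipelines (KFP) SDK. This notebook pins `kfp==1.8.11` (see the install cell below); a plain `pip install kfp --upgrade --user --quiet` would pull the latest release, which may not support the v1-style API used here.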

You may need to restart the notebook kernel after installing the KFP SDK.

In [1]:
!pip install --user --upgrade pip

Collecting pip
  Downloading pip-23.2.1-py3-none-any.whl (2.1 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.1/2.1 MB 23.1 MB/s eta 0:00:00
Installing collected packages: pip
Successfully installed pip-23.2.1


In [None]:
# Pin the KFP SDK to 1.8.11: the cells below (comp.create_component_from_func,
# op.set_memory_limit / set_memory_request) are written against this version.
!pip install --user  kfp==1.8.11

In [18]:
# confirm the kfp sdk
! pip show kfp

Name: kfp
Version: 1.8.11
Summary: KubeFlow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: 
License: 
Location: /home/chase/.local/lib/python3.8/site-packages
Requires: absl-py, click, cloudpickle, Deprecated, docstring-parser, fire, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, pydantic, PyYAML, requests-toolbelt, strip-hints, tabulate, typer, typing-extensions, uritemplate
Required-by: kubeflow-kale


## Import kubeflow pipeline libraries

In [29]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath
from typing import NamedTuple


## Kubeflow pipeline component creation

Component 1: download, load and preprocess the data (`mnist_process` image)

In [4]:
# download data step
def download_load_preprocess_data(download_link: str, output_data_path: OutputPath(str)):
    """Download the digit-recognizer CSVs, preprocess them and pickle train/test splits.

    All imports live inside the function so it can be turned into a
    self-contained lightweight component with ``create_component_from_func``.

    Args:
        download_link: URL template containing a ``{file}`` placeholder. It is
            formatted with ``file='train'`` and ``file='test'`` to fetch the two
            zipped CSV archives.
        output_data_path: Path used as an output *directory*. The zip archives,
            the extracted ``train.csv``/``test.csv`` and two pickles are written
            into it:

            * ``train`` -- ``(X_train, y_train)``, 90% of the labelled rows
            * ``test``  -- ``(X_test, y_test)``, the remaining 10%

            ``X_*`` are arrays of shape ``(n, 28, 28, 1)`` divided by 255
            (presumably 0-255 pixel intensities, so values land in ``[0, 1]``).
            ``y_*`` are integer label Series.
    """
    import os
    import zipfile
    import pickle
    import wget
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split

    os.makedirs(output_data_path, exist_ok=True)

    # Step 1: Download and extract the train and test archives.
    for split in ('train', 'test'):
        archive_path = f'{output_data_path}/{split}_csv.zip'
        wget.download(download_link.format(file=split), archive_path)
        with zipfile.ZipFile(archive_path, "r") as zip_ref:
            zip_ref.extractall(output_data_path)

    # Step 2: Load Data
    train_df = pd.read_csv(os.path.join(output_data_path, 'train.csv'))
    test_df = pd.read_csv(os.path.join(output_data_path, 'test.csv'))

    ntrain = train_df.shape[0]
    all_data = pd.concat((train_df, test_df)).reset_index(drop=True)
    print("all_data size is : {}".format(all_data.shape))

    # Step 3: Preprocess Data
    all_data_X = all_data.drop('label', axis=1)
    all_data_y = all_data.label

    # Reshape each flat pixel row into a 28x28 single-channel image, then scale.
    all_data_X = all_data_X.values.reshape(-1, 28, 28, 1) / 255.0

    # Only the first ntrain rows (those that came from train.csv) are used.
    X = all_data_X[:ntrain].copy()
    # If test.csv has no 'label' column (presumably the case), concat fills it
    # with NaN and upcasts the whole column to float. Cast the labelled rows
    # back to int so downstream consumers get integer class ids.
    y = all_data_y[:ntrain].astype(int)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

    with open(f'{output_data_path}/train', 'wb') as f:
        pickle.dump((X_train, y_train), f)

    with open(f'{output_data_path}/test', 'wb') as f:
        pickle.dump((X_test, y_test), f)

    print('Done!')


Component 2: train the model, evaluate it and predict on the held-out split (`mnist_predict` image)

In [5]:
# model and predict
def modeling_and_prediction(preprocess_data_path: str,
                            model_path: str,
                            mlpipeline_ui_metadata_path: str = '/mlpipeline-ui-metadata.json'):
    """Train a small CNN on the pickled digit data, evaluate it and emit a confusion matrix.

    All imports live inside the function so it can be turned into a
    self-contained lightweight component with ``create_component_from_func``.

    Args:
        preprocess_data_path: Directory holding the ``train`` and ``test``
            pickles, each an ``(X, y)`` tuple.
        model_path: Directory the trained model is saved to as ``model.h5``.
            It is created if missing.
        mlpipeline_ui_metadata_path: File that receives the Kubeflow Pipelines
            UI metadata JSON (an inline-CSV confusion matrix). Its parent
            directory is created if needed. NOTE(review): for lightweight
            components, KFP v1 documents declaring this as
            ``mlpipeline_ui_metadata_path: OutputPath()`` so the UI picks the
            file up. Confirm for your KFP version. The default path is at the
            filesystem root, so it is only writable inside a container running
            as root.

    Returns:
        A ``conf_m_result`` namedtuple whose single field
        ``mlpipeline_ui_metadata`` is the metadata serialized as a JSON string.
    """
    import os
    import pickle
    import numpy as np
    import pandas as pd
    import json
    from collections import namedtuple
    from sklearn.metrics import confusion_matrix
    from tensorflow import keras, optimizers
    from tensorflow.keras.metrics import SparseCategoricalAccuracy
    from tensorflow.keras.losses import SparseCategoricalCrossentropy
    from tensorflow.keras.models import load_model

    # Digit classes. This also fixes the row/column order of the confusion matrix.
    class_labels = list(range(10))

    # Step 1: Modeling
    with open(f'{preprocess_data_path}/train', 'rb') as f:
        X_train, y_train = pickle.load(f)

    hidden_dim1 = 56
    hidden_dim2 = 100
    DROPOUT = 0.5
    model = keras.Sequential([
        keras.layers.Conv2D(filters=hidden_dim1, kernel_size=(5, 5), padding='same', activation='relu'),
        keras.layers.Dropout(DROPOUT),
        keras.layers.Conv2D(filters=hidden_dim2, kernel_size=(3, 3), padding='same', activation='relu'),
        keras.layers.Dropout(DROPOUT),
        keras.layers.Conv2D(filters=hidden_dim2, kernel_size=(3, 3), padding='same', activation='relu'),
        keras.layers.Dropout(DROPOUT),
        keras.layers.Flatten(),
        keras.layers.Dense(10, activation="softmax"),
    ])
    model.build(input_shape=(None, 28, 28, 1))

    # Keras documents `metrics` as a list (Keras 3 rejects a bare Metric object).
    model.compile(optimizers.Adam(learning_rate=0.001),
                  loss=SparseCategoricalCrossentropy(),
                  metrics=[SparseCategoricalAccuracy(name='accuracy')])

    model.fit(np.array(X_train), np.array(y_train), validation_split=0.1, epochs=1, batch_size=64)

    with open(f'{preprocess_data_path}/test', 'rb') as f:
        X_test, y_test = pickle.load(f)

    test_loss, test_acc = model.evaluate(np.array(X_test), np.array(y_test), verbose=0)
    print("Test_loss: {}, Test_accuracy: {}".format(test_loss, test_acc))

    os.makedirs(model_path, exist_ok=True)
    model.save(f'{model_path}/model.h5')

    # Step 2: Prediction -- reload the model from the saved file and predict on the held-out split.
    model = load_model(f'{model_path}/model.h5')
    y_pred = np.argmax(model.predict(X_test), axis=-1)

    # Pin the label set. Without `labels=`, confusion_matrix sizes the matrix by
    # the union of y_true and y_pred, which need not match np.unique(y_test).
    # That mislabels cells, or raises IndexError when a class is predicted but
    # absent from y_test.
    y_true = np.asarray(y_test).astype(int)
    cm = confusion_matrix(y_true, y_pred, labels=class_labels)

    data = [
        (target, predicted, int(count))
        for target, row in zip(class_labels, cm)
        for predicted, count in zip(class_labels, row)
    ]
    df = pd.DataFrame(data, columns=['target', 'predicted', 'count'])
    df[['target', 'predicted']] = df[['target', 'predicted']].astype(str)

    metadata = {
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "schema": [
                    {"name": "target", "type": "CATEGORY"},
                    {"name": "predicted", "type": "CATEGORY"},
                    {"name": "count", "type": "NUMBER"}
                ],
                "source": df.to_csv(header=False, index=False),
                "storage": "inline",
                "labels": [str(label) for label in class_labels]
            }
        ]
    }

    # Save metadata (this name was previously referenced but never defined).
    metadata_dir = os.path.dirname(mlpipeline_ui_metadata_path)
    if metadata_dir:
        os.makedirs(metadata_dir, exist_ok=True)
    with open(mlpipeline_ui_metadata_path, 'w') as metadata_file:
        json.dump(metadata, metadata_file)

    conf_m_result = namedtuple('conf_m_result', ['mlpipeline_ui_metadata'])
    return conf_m_result(json.dumps(metadata))

In [30]:
# Build lightweight components from the two Python functions; each call is also
# given an output_component_file to write the component spec to.
# The cells defining these functions must be executed first -- the recorded
# output of this cell is a NameError from running it without them.
download_load_preprocess_data_op = comp.create_component_from_func(
    download_load_preprocess_data,
    base_image="chasechristensen/mnist_process:v1",
    output_component_file='data_component.yaml',
)
modeling_and_prediction_op = comp.create_component_from_func(
    modeling_and_prediction,
    base_image="chasechristensen/mnist_predict:v1",
    output_component_file='predict_component.yaml',
)


NameError: name 'download_load_preprocess_data' is not defined

In [24]:
# Create real components

In [108]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath
from typing import NamedTuple
# Load the prebuilt component specs; both live under the same directory.
_consolidated_steps_dir = "./components/consolidated_steps"
data_comp = comp.load_component_from_file(
    f"{_consolidated_steps_dir}/download_load_preprocess_data/data_component.yaml"
)
prediction_comp = comp.load_component_from_file(
    f"{_consolidated_steps_dir}/modeling_and_prediction/predict_component.yaml"
)



Create Kubeflow pipeline components from the prebuilt container images

## Kubeflow pipeline creation

In [113]:
# Create a client for communicating with the Kubeflow Pipelines API server.
# No host is passed, so the endpoint comes from the SDK's defaults
# (presumably the in-cluster configuration when run from a Kubeflow notebook -- confirm).
client = kfp.Client()

In [120]:
# Define the pipeline. Parameters: `download_link` is forwarded to the data
# component and `model_path` to the prediction component.
@dsl.pipeline(
    name="digit-recognizer-pipeline",
    description="Performs Preprocessing, training and prediction of digits",
)
def digit_recognize_pipeline(download_link: str, model_path: str):
    def _set_memory(op, size):
        # Use the same value for the memory limit and the memory request.
        op.set_memory_limit(size)
        op.set_memory_request(size)

    # Download / load / preprocess step.
    data_container = data_comp(download_link)
    # NOTE(review): 40G for this step vs 4G for training looks oversized for MNIST -- confirm.
    _set_memory(data_container, '40G')

    # Training + prediction step, consuming the data step's two outputs.
    predict_container = prediction_comp(
        data_container.outputs['output_train_data'],
        data_container.outputs['output_test_data'],
        model_path,
    )
    _set_memory(predict_container, '4G')
    

In [121]:
# Pipeline inputs. `{file}` in the link is filled with "train" / "test" by the
# data step. output_data_path is not among the pipeline arguments submitted below.
download_link, output_data_path, model_path = (
    'https://github.com/kubeflow/examples/blob/master/digit-recognition-kaggle-competition/data/{file}.csv.zip?raw=true',
    "/mnt",
    "/model",
)

In [122]:
# Compile the pipeline to a local package, then submit a run of it.
pipeline_func = digit_recognize_pipeline

experiment_name = 'digit_recognizer_hardened04'
run_name = f"{pipeline_func.__name__} run"
arguments = dict(download_link=download_link, model_path=model_path)

# Compile pipeline to generate compressed YAML definition of the pipeline.
package_path = f"{experiment_name}.zip"
kfp.compiler.Compiler().compile(pipeline_func, package_path)

# Submit the pipeline directly from the pipeline function (this call does not
# reference the .zip written above).
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)


ValueError: Invalid memory string. Should be an integer, or integer followed by one of "E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki"