In [6]:
import os

import kfp
import kfp.dsl as dsl
import kfp.compiler as compiler
import kubernetes.client.models as k8s
import namesgenerator

# Download

At this stage we will obtain all training data for our pipeline.

### Stage Image

Create working file

In [1]:
%%bash
# Quoted delimiter: bash writes the script verbatim (no parameter expansion,
# command substitution, or backslash-newline joining inside the heredoc).
cat > 01_download.py << 'EOL'
from PIL import Image
import struct, numpy
import os, gzip, tarfile, shutil, glob
import urllib, urllib.parse, urllib.request

# Raw gzip-compressed MNIST archives fetched by download_files, in the order
# train-images, train-labels, t10k-images, t10k-labels.
filenames = [
    "{}-{}-idx{}-ubyte.gz".format(split, kind, dims)
    for split in ("train", "t10k")
    for kind, dims in (("images", 3), ("labels", 1))
]

def download_files(base_url, base_dir, filenames=None):
    """ Fetch each archive from base_url into base_dir and decompress it.

    Each name is resolved against base_url with urllib.parse.urljoin.
    When filenames is None or empty, the module-level filenames list is
    used instead (looked up through globals() because the parameter
    shadows the module-level name).
    """
    targets = filenames if filenames else globals()["filenames"]

    os.makedirs(base_dir, exist_ok=True)
    for name in targets:
        print("Started downloading {}".format(name), flush=True)
        url = urllib.parse.urljoin(base_url, name)
        destination = os.path.join(base_dir, name)
        saved_path = urllib.request.urlretrieve(url, destination)[0]
        unpack_archive(saved_path, base_dir)

def unpack_archive(file, base_dir):
    """ Gunzip file next to itself, then delete the compressed original.

    The output path is file with its last three characters (the ".gz"
    suffix) stripped. base_dir is accepted for call compatibility but is
    not used.
    """
    print("Unpacking archive {}".format(file), flush=True)
    target = file[:-3]
    with gzip.open(file, 'rb') as compressed:
        with open(target, 'wb') as plain:
            shutil.copyfileobj(compressed, plain)
    os.remove(file)

def process_images(path, dataset):
    """ Convert one raw MNIST split into a compressed numpy archive.

    Reads the big-endian IDX files <dataset>-labels-idx1-ubyte and
    <dataset>-images-idx3-ubyte from path and writes <path>/<dataset>.npz
    with two arrays: "imgs", float32 of shape (num, rows, cols) scaled to
    [0, 1], and "labels", the integer class ids. The two raw files are then
    deleted.

    Raises:
        ValueError: a file has the wrong IDX magic number, or the label and
            image counts disagree with their headers or with each other.
            The raw files are left in place in that case.
    """
    labels_magic = 2049  # IDX header 0x00000801: unsigned bytes, 1 dimension
    images_magic = 2051  # IDX header 0x00000803: unsigned bytes, 3 dimensions

    print("Processing images {}".format(os.path.join(path, dataset)), flush=True)

    label_file = os.path.join(path, dataset + '-labels-idx1-ubyte')
    with open(label_file, 'rb') as file:
        magic, num_labels = struct.unpack(">II", file.read(8))
        if magic != labels_magic:
            raise ValueError("{}: bad IDX magic number {}".format(label_file, magic))
        # Kept as integer class ids (not one-hot): the trainer casts them with
        # astype(int) and feeds them to a 10-class classifier.
        labels = numpy.fromfile(file, dtype=numpy.int8)
    if labels.size != num_labels:
        raise ValueError("{}: header says {} labels, found {}".format(
            label_file, num_labels, labels.size))

    img_file = os.path.join(path, dataset + '-images-idx3-ubyte')
    with open(img_file, 'rb') as file:
        magic, num_imgs, rows, cols = struct.unpack(">IIII", file.read(16))
        if magic != images_magic:
            raise ValueError("{}: bad IDX magic number {}".format(img_file, magic))
        imgs = numpy.fromfile(file, dtype=numpy.uint8).reshape(num_imgs, rows, cols)
        imgs = imgs.astype(numpy.float32) / 255.0
    if num_imgs != num_labels:
        raise ValueError("{}: {} images but {} labels".format(
            os.path.join(path, dataset), num_imgs, num_labels))

    os.remove(label_file); os.remove(img_file)
    print("Saving files under {} path".format(os.path.join(path, dataset)), flush=True)
    numpy.savez_compressed(os.path.join(path, dataset), imgs=imgs, labels=labels)

def download_mnist(base_url, base_dir):
    """ Fetch the MNIST archives into base_dir and convert each split
    ("train", then "t10k") into a <split>.npz numpy archive. """
    download_files(base_url, base_dir)
    for split in ("train", "t10k"):
        process_images(base_dir, split)
    
if __name__ == "__main__":
    # MOUNT_PATH is set by the pipeline step (the shared storage volume);
    # it falls back to the current directory for local runs.
    mount_path = os.environ.get("MOUNT_PATH", "./")
    data_path = os.path.join(mount_path, "data", "mnist")

    # MNIST_BASE_URL lets a mirror be used without rebuilding the image.
    base_url = os.environ.get("MNIST_BASE_URL", "http://yann.lecun.com/exdb/mnist/")
    # urljoin replaces the last path segment unless the base ends with "/".
    if not base_url.endswith("/"):
        base_url += "/"

    download_mnist(
        base_url=base_url,
        base_dir=data_path)
EOL

Create Dockerfile

In [3]:
%%bash
cat > 01_Dockerfile << 'EOL'
FROM tidylobster/odsc-base:1.0
# COPY rather than ADD: a plain local file needs no URL or tar-extraction handling.
COPY ./01_download.py /src/download.py
WORKDIR /src/
ENTRYPOINT [ "python", "download.py" ]
EOL

Build & publish image

In [4]:
%%bash
# Abort on the first failure so a failed build never pushes a stale :latest image.
set -e
docker build -t tidylobster/mnist-pipeline-download:latest -f 01_Dockerfile --no-cache .
docker push tidylobster/mnist-pipeline-download:latest

Sending build context to Docker daemon  140.3MB
Step 1/4 : FROM tidylobster/odsc-base:1.0
 ---> a44d37e5b862
Step 2/4 : ADD ./01_download.py /src/download.py
 ---> 10d83625539c
Step 3/4 : WORKDIR /src/
 ---> Running in f0886124a87f
Removing intermediate container f0886124a87f
 ---> 845ced88d068
Step 4/4 : ENTRYPOINT [ "python", "download.py" ]
 ---> Running in 2c979ed6faca
Removing intermediate container 2c979ed6faca
 ---> a520dd169843
Successfully built a520dd169843
Successfully tagged tidylobster/mnist-pipeline-download:latest
The push refers to repository [docker.io/tidylobster/mnist-pipeline-download]
201688e90ce7: Preparing
66a75017de07: Preparing
83c720aa6f39: Preparing
56995c671038: Preparing
eee35c27cf87: Preparing
93ed1238773c: Preparing
eeb5ce6b3db4: Preparing
886601877ba4: Preparing
0fc100fdc7f9: Preparing
68dda0c9a8cd: Preparing
f67191ae09b8: Preparing
b2fd8b4c3da7: Preparing
0de2edf7bff4: Preparing
b2fd8b4c3da7: Waiting
93ed1238773c: Waiting
0fc100fdc7f9: Waiting
0de2edf

### Pipeline

Reference the existing `storage` PersistentVolumeClaim and define how it is mounted into each step

In [7]:
# Pod volume backed by the PersistentVolumeClaim named "storage" (the claim
# itself must already exist in the cluster; it is not created here).
storage_pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="storage")
storage_volume = k8s.V1Volume(
    name="storage",
    persistent_volume_claim=storage_pvc,
)

# Mount point comes from the pipeline's mount-path run parameter, filled in
# by the workflow template at run time.
storage_volume_mount = k8s.V1VolumeMount(
    name="storage",
    mount_path="{{workflow.parameters.mount-path}}",
)

Create required environment variables

In [8]:
# Tells the step scripts where the storage volume is mounted (MOUNT_PATH).
mount_path_env = k8s.V1EnvVar(
    name="MOUNT_PATH",
    value="{{workflow.parameters.mount-path}}",
)

Define container operation

In [10]:
def download_op(image="tidylobster/mnist-pipeline-download:latest"):
    """Create the "download" step, which fetches MNIST onto the storage volume.

    Args:
        image: container image built from 01_Dockerfile. Pass your own
            image here instead of editing the default.

    Returns:
        The configured dsl.ContainerOp, with the storage volume mounted and
        MOUNT_PATH set.
    """
    download = dsl.ContainerOp(name="download", image=image)
    download.add_volume(storage_volume)
    download.add_volume_mount(storage_volume_mount)
    download.add_env_variable(mount_path_env)
    return download

In [11]:
@dsl.pipeline(name="mnist", description="MNIST classifier")
def pipeline_definition(mount_path="/storage"):
    """Download-only version of the MNIST pipeline.

    mount_path is not read in the body; it is exposed as the workflow
    parameter mount-path, which the volume mount and MOUNT_PATH env var
    templates reference.
    """
    download = download_op()

In [12]:
# Compile the pipeline function into a package that can be submitted as a run.
pipeline_package = "pipeline.tar.gz"
compiler.Compiler().compile(pipeline_definition, pipeline_package)

### Test

In [13]:
# Create Pipelines client. Set KFP_HOST to target a different Kubeflow
# Pipelines endpoint without editing the notebook.
client = kfp.Client(
    os.environ.get("KFP_HOST", "http://d0958ac4.kubeflow.odsc.k8s.hydrosphere.io"))

In [14]:
# Runs are grouped under this experiment in the Kubeflow Pipelines UI.
experiment_name = "MNIST Showreal"

In [15]:
# Random, human-readable run name; shown as the cell output below.
run_name = namesgenerator.get_random_name()
run_name

'dazzling_nobel'

In [16]:
# Get or create an experiment_id: reuse the experiment if it already exists.
try:
    experiment_id = client.get_experiment(experiment_name=experiment_name).id
except Exception:
    # Lookup failed (presumably no experiment with this name yet), so create it.
    # Catching Exception instead of a bare except lets KeyboardInterrupt and
    # SystemExit propagate.
    experiment_id = client.create_experiment(experiment_name).id

In [17]:
# Submit the compiled package as a new run in the experiment.
result = client.run_pipeline(
    experiment_id,
    run_name,
    "pipeline.tar.gz",
)

# Train

At this stage we will create a model & train it on the downloaded data.

### Stage Image

Create working file

In [21]:
%%bash
# Quoted delimiter: bash writes the script verbatim. Unquoted, bash would also
# process backslash-newline sequences and any parameter/command expansions.
cat > 02_train.py << 'EOL'
import os, json
import tensorflow as tf
import numpy as np

# Storage layout: everything lives under MOUNT_PATH (current dir by default).
mount_path = os.environ.get("MOUNT_PATH", "./")
models_path = os.path.join(mount_path, "models")
data_path = os.path.join(mount_path, "data", "mnist")

# Integer flags read from the environment; any nonzero value enables them.
dev_env = int(os.environ.get("DEV_ENV", "0"))
recurring_run = int(os.environ.get("RECURRING_RUN", "0"))

# Recurring runs use the subsample archives instead of the full MNIST splits.
train_file, test_file = (
    ("subsample-train.npz", "subsample-test.npz") if recurring_run
    else ("train.npz", "t10k.npz")
)

# Training hyper-parameters.
learning_rate = float(os.environ.get("LEARNING_RATE", 0.01))
epochs = int(os.environ.get("EPOCHS", 10))
batch_size = int(os.environ.get("BATCH_SIZE", 256))


def input_fn(file, shuffle=True, num_epochs=None):
    """ Build an Estimator input function over one .npz archive in data_path.

    Args:
        file: archive name inside data_path; must hold "imgs" and "labels".
        shuffle: whether numpy_input_fn shuffles the examples.
        num_epochs: number of passes over the data. None (the default) means
            the EPOCHS setting; pass 1 for evaluation.

    Returns:
        A numpy_input_fn yielding {"imgs": ...} features and integer labels
        in batches of BATCH_SIZE.
    """
    if num_epochs is None:
        num_epochs = epochs
    with np.load(os.path.join(data_path, file)) as data:
        imgs = data["imgs"]
        labels = data["labels"].astype(int)
    return tf.estimator.inputs.numpy_input_fn(x={"imgs": imgs}, y=labels,
        shuffle=shuffle, batch_size=batch_size, num_epochs=num_epochs)

if __name__ == "__main__":
    tf.logging.set_verbosity(tf.logging.INFO)

    # Prepare data inputs. Training shuffles and repeats for EPOCHS passes.
    # Evaluation reads the test set exactly once, in file order; repeating it
    # EPOCHS times would only multiply the cost without changing accuracy.
    img_feature_column = tf.feature_column.numeric_column("imgs", shape=(28,28))
    train_fn = input_fn(train_file)
    test_fn = input_fn(test_file, shuffle=False, num_epochs=1)

    # Create the model
    estimator = tf.estimator.DNNClassifier(
        n_classes=10,
        hidden_units=[256, 64],
        feature_columns=[img_feature_column],
        optimizer=tf.train.AdamOptimizer(learning_rate=learning_rate))

    # Train and evaluate the model
    estimator.train(train_fn)
    evaluation = estimator.evaluate(test_fn)
    accuracy = float(evaluation["accuracy"])

    # Export a SavedModel whose serving input is a raw float32 batch of
    # 28x28 images under the "imgs" key. Parentheses are used instead of a
    # backslash continuation, so the line survives being written via a
    # shell heredoc.
    serving_input_receiver_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
        {"imgs": tf.placeholder(tf.float32, shape=(None, 28, 28))})
    estimator.export_savedmodel(models_path, serving_input_receiver_fn)

    # Output locations: the pipeline step reads /accuracy.txt as its
    # "accuracy" output; DEV_ENV writes both files to the working directory.
    accuracy_file = "./accuracy.txt" if dev_env else "/accuracy.txt"
    metrics_file = "./mlpipeline-metrics.json" if dev_env else "/mlpipeline-metrics.json"

    metrics = {
        'metrics': [
            {
                'name': 'accuracy-score',   # Column name in the runs table.
                'numberValue': accuracy,    # Must be numeric.
                'format': "PERCENTAGE",     # "RAW" or "PERCENTAGE" display format.
            },
        ],
    }

    # Dump metrics
    with open(accuracy_file, "w") as file:
        file.write(str(accuracy))

    with open(metrics_file, "w") as file:
        json.dump(metrics, file)
EOL

Create Dockerfile

In [22]:
%%bash
cat > 02_Dockerfile << 'EOL'
FROM tidylobster/odsc-base:1.0
# COPY rather than ADD: a plain local file needs no URL or tar-extraction handling.
COPY ./02_train.py /src/train.py
WORKDIR /src/
ENTRYPOINT [ "python", "train.py" ]
EOL

Build and publish image

In [23]:
%%bash
# Abort on the first failure so a failed build never pushes a stale :latest image.
set -e
docker build -t tidylobster/mnist-pipeline-train:latest -f 02_Dockerfile --no-cache .
docker push tidylobster/mnist-pipeline-train:latest

Sending build context to Docker daemon  140.3MB
Step 1/4 : FROM tidylobster/odsc-base:1.0
 ---> a44d37e5b862
Step 2/4 : ADD ./02_train.py /src/train.py
 ---> 027c25d137aa
Step 3/4 : WORKDIR /src/
 ---> Running in 931a8c43b3f0
Removing intermediate container 931a8c43b3f0
 ---> 88a3890b7600
Step 4/4 : ENTRYPOINT [ "python", "train.py" ]
 ---> Running in 7a385dd80dc3
Removing intermediate container 7a385dd80dc3
 ---> 1aff4627778c
Successfully built 1aff4627778c
Successfully tagged tidylobster/mnist-pipeline-train:latest
The push refers to repository [docker.io/tidylobster/mnist-pipeline-train]
3cfa4e88bb4d: Preparing
66a75017de07: Preparing
83c720aa6f39: Preparing
56995c671038: Preparing
eee35c27cf87: Preparing
93ed1238773c: Preparing
eeb5ce6b3db4: Preparing
886601877ba4: Preparing
0fc100fdc7f9: Preparing
68dda0c9a8cd: Preparing
f67191ae09b8: Preparing
b2fd8b4c3da7: Preparing
0de2edf7bff4: Preparing
0fc100fdc7f9: Waiting
68dda0c9a8cd: Waiting
f67191ae09b8: Waiting
93ed1238773c: Waiting


### Pipeline

Create required environment variables

In [24]:
# Hyper-parameters reach the training container as env vars whose values are
# filled from the pipeline's run parameters.
learning_rate_env = k8s.V1EnvVar(
    name="LEARNING_RATE", value="{{workflow.parameters.learning-rate}}")
epochs_env = k8s.V1EnvVar(
    name="EPOCHS", value="{{workflow.parameters.epochs}}")
batch_size_env = k8s.V1EnvVar(
    name="BATCH_SIZE", value="{{workflow.parameters.batch-size}}")
recurring_run_env = k8s.V1EnvVar(
    name="RECURRING_RUN", value="{{workflow.parameters.recurring-run}}")

Define container operation

In [25]:
def train_op(image="tidylobster/mnist-pipeline-train:latest"):
    """Create the "train" step, which fits the classifier on the downloaded data.

    Args:
        image: container image built from 02_Dockerfile. Pass your own
            image here instead of editing the default.

    Returns:
        The configured dsl.ContainerOp. Its "accuracy" output is read from
        /accuracy.txt, the path 02_train.py writes when DEV_ENV is unset.
    """
    train = dsl.ContainerOp(
        name="train",
        image=image,
        file_outputs={"accuracy": "/accuracy.txt"})

    train.add_volume(storage_volume)
    train.add_volume_mount(storage_volume_mount)
    for env_var in (mount_path_env, learning_rate_env, epochs_env,
                    batch_size_env, recurring_run_env):
        train.add_env_variable(env_var)

    return train

In [26]:
@dsl.pipeline(name="mnist", description="MNIST classifier")
def pipeline_definition(
    mount_path="/storage",
    learning_rate="0.01",
    epochs="10",
    batch_size="256",
    recurring_run="0",
):
    """Download MNIST, then train a classifier on it.

    The arguments are not read in the body; they are exposed as workflow
    parameters (mount-path, learning-rate, epochs, batch-size,
    recurring-run) that the steps' volume mount and env var templates
    reference. This definition replaces the download-only one above.
    """
    download = download_op()

    # Train only once the data is on the shared volume; reserve 1 CPU / 2G.
    train = train_op()
    train.after(download)
    train.set_cpu_request('1')
    train.set_memory_request('2G')

In [27]:
# Recompile so the package includes the new train step.
pipeline_package = "pipeline.tar.gz"
compiler.Compiler().compile(pipeline_definition, pipeline_package)

### Test

In [28]:
# Fresh random name for this run; shown as the cell output below.
run_name = namesgenerator.get_random_name()
run_name

'wizardly_dubinsky'

In [29]:
# Submit the recompiled package as a new run in the same experiment.
result = client.run_pipeline(
    experiment_id,
    run_name,
    "pipeline.tar.gz",
)

# Upload 

At this stage we will upload the model to Hydrosphere.

### Stage Image

Create working file