# MNIST classification

In [None]:
# Install the Kubeflow Pipelines SDK (imported below as `kfp`) into the notebook environment.
!pip3 install kfp

In [None]:
import tensorflow as tf

import numpy as np
import collections

# visualization tools
%matplotlib inline
import matplotlib.pyplot as plt

import kfp
from kfp.components import func_to_container_op, InputPath, OutputPath
from kfp import dsl
import numpy as np

In [None]:
import tensorflow as tf
import matplotlib
import kfp
from kfp.components import func_to_container_op, InputPath, OutputPath
from typing import NamedTuple
from kfp import dsl

@func_to_container_op
def read_data(output_text_path: OutputPath(str)):
    """Download MNIST, rescale the images and pack the arrays into a zip archive.

    Args:
        output_text_path: File path at which the zip archive is created. The
            archive holds xtrain.npy, ytrain.npy, xtest.npy and ytest.npy.
    """
    # Imports are local to the function body, as in every component here.
    import tensorflow as tf
    import numpy as np
    from zipfile import ZipFile

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

    # Append a channel axis and rescale the images from [0,255] to the
    # [0.0,1.0] range.
    x_train = x_train[..., np.newaxis] / 255.0
    x_test = x_test[..., np.newaxis] / 255.0

    # A tuple of pairs (not a dict) keeps the save/archive order fixed on
    # every Python version.
    arrays = (
        ('xtrain.npy', x_train),
        ('ytrain.npy', y_train),
        ('xtest.npy', x_test),
        ('ytest.npy', y_test),
    )
    for file_name, array in arrays:
        np.save(file_name, array)

    # The context manager closes (and finalises) the archive even if one of
    # the writes raises.
    with ZipFile(output_text_path, 'w') as archive:
        for file_name, _ in arrays:
            archive.write(file_name)

In [None]:
# Preprocessing data
@func_to_container_op
def preprocess_data(text_path: InputPath(), output_text_path: OutputPath()):
    """Keep only the digits 3 and 6 and convert the labels to booleans.

    Args:
        text_path: Zip archive containing xtrain.npy, ytrain.npy, xtest.npy
            and ytest.npy.
        output_text_path: File path at which the zip archive with the
            filtered arrays (the ``*_filtered.npy`` files) is created.
    """
    import numpy as np
    from zipfile import ZipFile

    with ZipFile(text_path, 'r') as zipObj:
        zipObj.extractall()

    # Load data
    x_train = np.load('xtrain.npy')
    y_train = np.load('ytrain.npy')

    x_test = np.load('xtest.npy')
    y_test = np.load('ytest.npy')

    def filter_36(x, y):
        """Drop every sample that is not a 3 or a 6; label is True for 3, False for 6."""
        keep = (y == 3) | (y == 6)
        x, y = x[keep], y[keep]
        y = y == 3
        return x, y

    print("Number of unfiltered training examples:", len(x_train))
    print("Number of unfiltered test examples:", len(x_test))

    x_train, y_train = filter_36(x_train, y_train)
    x_test, y_test = filter_36(x_test, y_test)

    print("Number of filtered training examples:", len(x_train))
    print("Number of filtered test examples:", len(x_test))

    # Save modified data. A tuple of pairs keeps the save/archive order fixed.
    filtered = (
        ('xtrain_filtered.npy', x_train),
        ('ytrain_filtered.npy', y_train),
        ('xtest_filtered.npy', x_test),
        ('ytest_filtered.npy', y_test),
    )
    for file_name, array in filtered:
        np.save(file_name, array)

    # The context manager closes (and finalises) the archive even if one of
    # the writes raises.
    with ZipFile(output_text_path, 'w') as archive:
        for file_name, _ in filtered:
            archive.write(file_name)

In [None]:
# Model: convolutional network (LeNet-style)
@func_to_container_op
def model_full(text_path: InputPath(), output_text_path: OutputPath()):
    """Train a small convolutional network for one epoch and record its test metrics.

    The architecture is based off LeNet from https://keras.io/examples/mnist_cnn/
    and ends in a single logit.

    Args:
        text_path: Zip archive containing xtrain_filtered.npy,
            ytrain_filtered.npy, xtest_filtered.npy and ytest_filtered.npy.
        output_text_path: Text file that receives the string form of the
            ``model.evaluate`` result followed by a newline.
    """
    import numpy as np
    import tensorflow as tf
    from zipfile import ZipFile

    layers = tf.keras.layers
    cnn = tf.keras.Sequential([
        layers.Conv2D(32, [3, 3], activation='relu', input_shape=(28, 28, 1)),
        layers.Conv2D(64, [3, 3], activation='relu'),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Dropout(0.25),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(1),
    ])

    # The last layer has no activation, so the loss works on raw logits.
    cnn.compile(
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy'],
    )
    cnn.summary()

    # Unpack the filtered arrays into the working directory.
    with ZipFile(text_path, 'r') as archive:
        archive.extractall()

    train_images = np.load('xtrain_filtered.npy')
    train_labels = np.load('ytrain_filtered.npy')
    test_images = np.load('xtest_filtered.npy')
    test_labels = np.load('ytest_filtered.npy')

    cnn.fit(
        train_images,
        train_labels,
        batch_size=128,
        epochs=1,
        verbose=1,
        validation_data=(test_images, test_labels),
    )

    scores = cnn.evaluate(test_images, test_labels)

    # print() writes str(scores) followed by a newline.
    with open(output_text_path, 'w') as writer:
        print(scores, file=writer)

In [None]:
# Model: small fully connected network (Flatten -> Dense(2) -> Dense(1))
@func_to_container_op
def model_fair(text_path: InputPath(), output_text_path: OutputPath()):
    """Train a tiny fully connected network for one epoch and record its test metrics.

    The network flattens the image, applies a 2-unit ReLU layer and ends in a
    single logit.

    Args:
        text_path: Zip archive containing xtrain_filtered.npy,
            ytrain_filtered.npy, xtest_filtered.npy and ytest_filtered.npy.
        output_text_path: Text file that receives the string form of the
            ``model.evaluate`` result followed by a newline.
    """
    import numpy as np
    import tensorflow as tf
    from zipfile import ZipFile

    layers = tf.keras.layers
    dense_net = tf.keras.Sequential([
        layers.Flatten(input_shape=(28, 28, 1)),
        layers.Dense(2, activation='relu'),
        layers.Dense(1),
    ])

    # The last layer has no activation, so the loss works on raw logits.
    dense_net.compile(
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy'],
    )
    dense_net.summary()

    # Unpack the filtered arrays into the working directory.
    with ZipFile(text_path, 'r') as archive:
        archive.extractall()

    train_images = np.load('xtrain_filtered.npy')
    train_labels = np.load('ytrain_filtered.npy')
    test_images = np.load('xtest_filtered.npy')
    test_labels = np.load('ytest_filtered.npy')

    dense_net.fit(
        train_images,
        train_labels,
        batch_size=128,
        epochs=1,
        verbose=1,
        validation_data=(test_images, test_labels),
    )

    scores = dense_net.evaluate(test_images, test_labels)

    # print() writes str(scores) followed by a newline.
    with open(output_text_path, 'w') as writer:
        print(scores, file=writer)

In [None]:
@func_to_container_op
def models_evaluate(text_path_0: InputPath(), text_path_1: InputPath()):
    """Print the contents of both result files, each under its own heading.

    Args:
        text_path_0: Text file printed under the heading ``model 0:``.
        text_path_1: Text file printed under the heading ``model 1:``.
    """
    reports = (
        ('model 0:', text_path_0),
        ('model 1:', text_path_1),
    )
    for heading, report_path in reports:
        print(heading)
        with open(report_path, 'r') as reader:
            for row in reader:
                # Rows keep their own line ending, so print must not add one.
                print(row, end='')

In [None]:
@dsl.pipeline(
    name='ML first',
    description='ML first).'
)
def ml_pipeline_first():
    """Wire the components: read -> preprocess -> train both models -> print both results."""
    raw_task = read_data()
    filtered_task = preprocess_data(raw_task.output)

    # Both models consume the same filtered archive.
    full_task = model_full(filtered_task.output)
    fair_task = model_fair(filtered_task.output)

    models_evaluate(full_task.output, fair_task.output)

In [None]:
# Compile the pipeline function into a package file named 'ml_pipeline_demo.zip'.
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(ml_pipeline_first, 'ml_pipeline_demo.zip')