In [None]:
# One-time environment setup: uncomment on a fresh image. `%pip` (rather than
# `!pip`) installs into the environment of the running kernel.
# !apt-get update -y
# !apt-get install libxslt-dev libxml2-dev libpam-dev libedit-dev -y
# !apt-get install postgresql postgresql-client -y
# !apt install postgresql-server-dev-12 -y
# %pip install psycopg2 tensorflow kfp --no-cache-dir
import os

# Value of the `authservice_session` cookie used to authenticate kfp.Client
# calls, and the user's Kubeflow namespace. Read from the environment when set
# so the token does not have to be typed into (and committed with) the
# notebook; otherwise replace the placeholders in place.
authservice_session = os.environ.get('KF_AUTHSERVICE_SESSION', '<oidc_authservice_session_token>')
namespace = os.environ.get('KF_NAMESPACE', '<user_id>')


In [None]:
import os
# Display the kernel's working directory; relative paths used later (e.g. the
# compiled '<experiment_name>.zip' pipeline package) are resolved against it.
os.getcwd()

In [None]:
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras

# Data exploration libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
# Load Fashion-MNIST into the notebook for local exploration only; the
# pipeline's train component reloads the dataset itself inside its container.
# load_data() returns ((train_images, train_labels), (test_images, test_labels)).
fashion_mnist = keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

In [None]:
# Human-readable name for each label id (position in the list == label id).
class_names = [
    'T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
    'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot',
]

In [None]:
# Sanity-check the split sizes and the per-image shape (one line per fact).
print('\n'.join([
    f'Number of training images: {train_images.shape[0]}',
    f'Number of test images: {test_images.shape[0]}',
    f'Image size: {train_images.shape[1:]}',
    f'Number of labels: {len(train_labels)}',
    f'Number of test labels: {len(test_labels)}',
]))

In [None]:
# Distinct label ids present in the training split.
unique_train_labels = np.unique(train_labels)

# Scale integer pixel values (0-255) into [0, 1]. The dtype guard makes this
# cell idempotent: once the arrays are float, re-running the cell leaves them
# unchanged instead of dividing by 255 a second time.
if np.issubdtype(train_images.dtype, np.integer):
    train_images = train_images / 255.0
if np.issubdtype(test_images.dtype, np.integer):
    test_images = test_images / 255.0

In [None]:
import kfp
import kfp.dsl as dsl
import kfp.components as comp

In [None]:
def train(data_path, model_file):
    """Train a small dense Fashion-MNIST classifier and persist its artifacts.

    Built into a pipeline component with ``func_to_container_op``, which ships
    only this function's source -- hence every import lives inside the body.

    Args:
        data_path: Directory (the volume mount shared between pipeline steps)
            that receives the outputs.
        model_file: File name for the saved Keras model within ``data_path``.

    Side effects:
        Saves the trained model to ``{data_path}/{model_file}`` and pickles the
        normalized ``(test_images, test_labels)`` tuple to
        ``{data_path}/test_data`` for the predict component.
    """
    import pickle

    import tensorflow as tf
    from tensorflow import keras

    # load_data() yields ((train images, train labels), (test images, test labels)).
    dataset = keras.datasets.fashion_mnist
    (x_train, y_train), (x_test, y_test) = dataset.load_data()

    # Rescale pixel intensities so every value falls between 0 and 1.
    x_train, x_test = x_train / 255.0, x_test / 255.0

    # 28x28 image -> flat vector -> 128-unit ReLU layer -> 10 class logits.
    model = keras.Sequential([
        keras.layers.Flatten(input_shape=(28, 28)),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dense(10),
    ])
    # from_logits=True because the last layer has no softmax; the predict
    # component appends one when it needs probabilities.
    model.compile(
        optimizer='adam',
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )

    model.fit(x_train, y_train, epochs=10)

    # Only the accuracy is reported; the test loss is discarded.
    _, accuracy = model.evaluate(x_test, y_test, verbose=2)
    print('Test accuracy:', accuracy)

    # Save the model onto the shared volume under the requested file name.
    model.save(f'{data_path}/{model_file}')

    # Hand the normalized test split to the predict component.
    with open(f'{data_path}/test_data', 'wb') as handle:
        pickle.dump((x_test, y_test), handle)

In [None]:
def predict(data_path, model_file, image_number):
    """Classify one Fashion-MNIST test image with the trained model.

    Built into a pipeline component with ``func_to_container_op``, so all
    imports live inside the function body.

    Args:
        data_path: Directory on the shared volume holding the artifacts written
            by the train component (the model file and the pickled ``test_data``).
        model_file: File name of the saved Keras model inside ``data_path``.
        image_number: Index into the test set. Coerced with ``int()`` because
            pipeline components may receive it as a string.

    Raises:
        ValueError: If ``image_number`` is not an integer in
            ``[0, len(test_images))``.

    Side effects:
        Writes a one-line summary to ``{data_path}/result.txt``.
    """
    # func_to_container_op requires packages to be imported inside of the function.
    import pickle

    import tensorflow as tf
    from tensorflow import keras

    import numpy as np

    # Class names indexed by label id.
    class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
                   'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

    # Load the (test_images, test_labels) tuple pickled by the train component.
    # NOTE: pickle.load can execute arbitrary code; this is acceptable only
    # because the file is produced by this pipeline's own train step.
    with open(f'{data_path}/test_data', 'rb') as f:
        test_images, test_labels = pickle.load(f)

    # See https://github.com/kubeflow/pipelines/issues/2320 for explanation on this line.
    image_number = int(image_number)
    # Reject out-of-range indices explicitly: a negative value would otherwise
    # silently wrap around and classify an image from the end of the test set.
    if not 0 <= image_number < len(test_images):
        raise ValueError(
            f'image_number must be in [0, {len(test_images)}), got {image_number}')

    # The trained model emits raw logits; append a Softmax to get probabilities.
    model = keras.models.load_model(f'{data_path}/{model_file}')
    probability_model = tf.keras.Sequential([model, tf.keras.layers.Softmax()])

    # Predict on a batch whose only member is the selected image.
    img = np.expand_dims(test_images[image_number], 0)
    predictions = probability_model.predict(img)
    scores = predictions[0]

    # Take the prediction with the highest probability.
    class_prediction = class_names[np.argmax(scores)]
    confidence = 100 * np.max(scores)
    actual = class_names[test_labels[image_number]]

    with open(f'{data_path}/result.txt', 'w') as result:
        result.write(" Prediction: {} | Confidence: {:2.0f}% | Actual: {}".format(
            class_prediction, confidence, actual))

    print('Prediction has been saved successfully!')

In [None]:
def red(data_path, host, db, port, user, pw):
    """Export the ``who_icd9_vdm_td.icd9_proc`` table from Redshift to CSV.

    Built into a pipeline component with ``func_to_container_op``, so imports
    live inside the function body.

    Args:
        data_path: Directory on the shared volume where ``reddata.csv`` is written.
        host: Database host name.
        db: Database name.
        port: Database port.
        user: Database user.
        pw: Database password.

    Side effects:
        Writes every row of the query result (no header row) to
        ``{data_path}/reddata.csv``. The connection is always closed, even when
        the query or the file write raises.
    """
    import csv

    import psycopg2

    con = psycopg2.connect(dbname=db, host=host,
                           port=port, user=user, password=pw)
    print("Connection Successful")
    try:
        # The cursor's context manager closes it when the block exits.
        with con.cursor() as cursor:
            cursor.execute("""SELECT * FROM who_icd9_vdm_td.icd9_proc""")
            rows = cursor.fetchall()
        # newline='' lets the csv module control line endings itself.
        with open(f'{data_path}/reddata.csv', 'w', newline='') as f:
            csv.writer(f, delimiter=',').writerows(rows)
    finally:
        # psycopg2's connection context manager does not close the
        # connection, so close it explicitly.
        con.close()

In [None]:
def store(data_path, model_file, bucket):
    """Upload the trained model file from the shared volume to S3.

    Built into a pipeline component with ``func_to_container_op``, so imports
    live inside the function body.

    Args:
        data_path: Directory on the shared volume containing the model.
        model_file: File name of the model inside ``data_path``; also used as
            the object name under the ``project1/`` prefix.
        bucket: Destination S3 bucket name.

    NOTE(review): assumes ``{data_path}/{model_file}`` is a single file (e.g. an
    ``.h5`` model). A SavedModel directory would need a per-file upload.
    """
    import boto3

    local_path = f'{data_path}/{model_file}'
    object_key = "project1/" + model_file

    s3client = boto3.client('s3')
    # upload_file streams the file's contents. Passing the path as
    # put_object(Body=...) would store the path text instead of the model bytes.
    s3client.upload_file(local_path, bucket, object_key)

In [None]:
# The Redshift export runs on the custom image rather than the stock
# TensorFlow one -- presumably because it bundles psycopg2; confirm.
red_op = comp.func_to_container_op(
    red,
    base_image='<image_registry>/kubeflow:ds2',
)

In [None]:
# Both TensorFlow steps use the stock CPU image. For GPU nodes swap in a GPU
# variant, e.g. tensorflow/tensorflow:latest-gpu-py3 (NOTE(review): confirm
# that tag is still published).
# NOTE(review): the untagged image resolves to :latest, so the TensorFlow
# version can change between runs; consider pinning a tag.
train_op, predict_op = (
    comp.func_to_container_op(component_func, base_image='tensorflow/tensorflow')
    for component_func in (train, predict)
)

In [None]:
# The S3 upload step imports boto3, so it uses the same custom image as the
# Redshift export (presumably that image provides boto3; confirm).
store_op = comp.func_to_container_op(
    store,
    base_image='<image_registry>/kubeflow:ds2',
)

In [None]:
# Connect to the Kubeflow Pipelines API, authenticating with the session
# cookie, and list the experiments visible in the user's namespace.
client = kfp.Client(
    host='https://<kubeflow_platform>/pipeline',
    cookies=f"authservice_session={authservice_session}",
)
client.list_experiments(namespace=namespace)

In [None]:
# Define the pipeline
@dsl.pipeline(
    name='MNIST Pipeline',
    description='To test mnist model training and prediction.'
)
def mnist_container_pipeline(
    data_path: str,
    model_file: str,
    image_number: int,
    host: str,
    db: str,
    port: int,
    user: str,
    pw: str,
    bucket: str,
):
    """Redshift export, then train, store to S3, predict, and print the result.

    Every step mounts the same volume at ``data_path`` and takes the previous
    step's ``pvolume``, so files are shared and each step runs after the one
    whose volume it mounts. All steps have caching disabled.

    NOTE(review): the ``reddata.csv`` written by the Redshift step is not read
    by any later step; the train step reloads Fashion-MNIST itself.
    NOTE(review): ``pw`` is an ordinary pipeline parameter, so the database
    password is recorded with the run's parameters. Consider sourcing it from
    a Kubernetes secret instead.
    """

    def _disable_cache(op):
        # max_cache_staleness="P0D" (zero duration) makes KFP re-execute the
        # step on every run instead of reusing a cached result.
        op.execution_options.caching_strategy.max_cache_staleness = "P0D"
        return op

    # Define volume to share data between components.
    # NOTE(review): for access from multiple nodes consider
    # storage_class="efs-sc" and modes=dsl.VOLUME_MODE_RWM.
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
    )

    # Export the Redshift table to {data_path}/reddata.csv.
    red_container = _disable_cache(
        red_op(data_path, host, db, port, user, pw)
        .add_pvolumes({data_path: vop.volume})
        .set_display_name("Redshift")
    )

    # Create MNIST training component.
    mnist_training_container = _disable_cache(
        train_op(data_path, model_file)
        .add_pvolumes({data_path: red_container.pvolume})
    )

    # Upload the trained model to S3.
    store_container = _disable_cache(
        store_op(data_path, model_file, bucket)
        .add_pvolumes({data_path: mnist_training_container.pvolume})
    )

    # Create MNIST prediction component.
    mnist_predict_container = _disable_cache(
        predict_op(data_path, model_file, image_number)
        .add_pvolumes({data_path: store_container.pvolume})
    )

    # Print the result of the prediction. Caching is disabled here as well, for
    # consistency with the other steps.
    _disable_cache(dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: mnist_predict_container.pvolume},
        arguments=['cat', f'{data_path}/result.txt'],
    ))

In [None]:
import os

import yaml

# Mount point of the shared pipeline volume, and the file name (not a full
# path) of the saved model inside it.
DATA_PATH = '/mnt'
MODEL_PATH = 'mnist_model.h5'
# An integer representing an image from the test set that the model will attempt to predict the label for.
IMAGE_NUMBER = 0

# Redshift connection settings.
HOST = '<REDSHIFT_HOSTNAME>'
DB = 'who_icd'
PORT = 5439

BUCKET = 'kubeflow-usw2-s3bucket-user-datasets-dev'

# Redshift credentials are read from a YAML file instead of being hard-coded.
# The file location can be overridden with the CREDENTIALS_FILE env variable.
CREDENTIALS_FILE = os.environ.get('CREDENTIALS_FILE', '/home/jovyan/credentials.yaml')
with open(CREDENTIALS_FILE, 'r') as ymlfile:
    cfg = yaml.safe_load(ymlfile)
RS_USER = cfg['redpass']['username']
RS_PASS = cfg['redpass']['password']
# os.environ only accepts str values, so quote numeric-looking credentials in
# the YAML file. NOTE(review): these variables only affect this kernel; the
# pipeline steps receive the credentials through the run arguments.
os.environ["RS_USER"] = RS_USER
os.environ["RS_PASS"] = RS_PASS

In [None]:
# Generic alias for the decorated pipeline, used by the compile cell that follows.
pipeline_func = mnist_container_pipeline

In [None]:
experiment_name = 'fashion_mnist_kubeflow'
run_name = f'{pipeline_func.__name__} run'

# One value per parameter declared by mnist_container_pipeline.
arguments = dict(
    data_path=DATA_PATH,
    model_file=MODEL_PATH,
    image_number=IMAGE_NUMBER,
    host=HOST,
    db=DB,
    port=PORT,
    user=RS_USER,
    pw=RS_PASS,
    bucket=BUCKET,
)

# Compile the pipeline into a compressed YAML definition written to
# '<experiment_name>.zip' in the working directory.
kfp.compiler.Compiler().compile(pipeline_func, f'{experiment_name}.zip')

# The run itself is submitted with client.create_run_from_pipeline_func in a
# later cell.

In [None]:
import kfp

# Client bound to the user's namespace; the namespace is also stored as the
# client's default for subsequent calls.
client = kfp.Client(
    host='https://<kubeflow_platform>/pipeline',
    cookies=f"authservice_session={authservice_session}",
    namespace=namespace,
)
client.list_experiments(namespace=namespace)
client.set_user_namespace(namespace=namespace)

# Submit under the experiment/run names defined alongside the pipeline
# arguments; without them those names go unused and KFP picks its default
# experiment for the run.
client.create_run_from_pipeline_func(
    mnist_container_pipeline,
    arguments=arguments,
    run_name=run_name,
    experiment_name=experiment_name,
)