In [None]:

from typing import NamedTuple
def trainOp(epochs: int = 50, networks: int = 3) -> NamedTuple('outputs', [('modelfile', str), ('validAcc', float), ('validLoss', float)]):
    """Train a ResNet v2 on CIFAR-10 and export it as a TF-TRT graphdef.

    This function is wrapped with ``func_to_container_op`` further down in the
    notebook, so it is written to be self-contained: every import happens
    inside the function body and no notebook-level name is referenced.

    Steps performed:
      1. Download CIFAR-10 and cache it as ``.npy`` files under
         ``/home/jovyan/dataprep``.
      2. Build a ResNet v2 of depth ``9 * networks + 2`` and train it with
         Adam and real-time data augmentation.
      3. Evaluate the trained model on the CIFAR-10 test split.
      4. Save the model as HDF5, convert it to a TensorFlow SavedModel, run
         TF-TRT (fp16) on it and write ``model.graphdef`` to
         ``/home/jovyan/model/<model_name>/<model_version>/`` next to a
         generated ``config.pbtxt`` and ``labels.txt``.

    The model name and version are read from the environment variables
    ``TINTIN_SESSION_TEMPLATE_MODEL_NAME`` (default ``resnet_graphdef``) and
    ``TINTIN_SESSION_TEMPLATE_MODEL_VERSION`` (default ``1``).

    Args:
        epochs: Number of training epochs (coerced with ``int()``).
        networks: Number of residual blocks per stage, i.e. the ``n`` in the
            ``9n + 2`` depth formula (coerced with ``int()``).

    Returns:
        A named tuple ``outputs(modelfile, validAcc, validLoss)`` where
        ``modelfile`` is the model repository directory and the two metrics
        are the accuracy and loss measured on the test split.
    """
    import os
    import pathlib

    home = '/home/jovyan'

    output_file_dir = os.path.join(home, "dataprep")
    output_model_dir = os.path.join(home, "model") # model dir can't contain anything other than model itself.
    output_temp_dir = os.path.join(home, ".temp")
    model_name = os.environ.get('TINTIN_SESSION_TEMPLATE_MODEL_NAME', 'resnet_graphdef')
    model_version = os.environ.get('TINTIN_SESSION_TEMPLATE_MODEL_VERSION', '1')

    print("outputdir:", output_file_dir)
    for directory in (output_file_dir, output_model_dir, output_temp_dir):
        pathlib.Path(directory).mkdir(parents=True, exist_ok=True)

    def load_data():
        """Download CIFAR-10 and store the four arrays as .npy files in output_file_dir."""
        import numpy as np
        from keras.datasets import cifar10
        (x_train, y_train), (x_test, y_test) = cifar10.load_data()
        np.save(os.path.join(output_file_dir, 'x_train.npy'), x_train)
        np.save(os.path.join(output_file_dir, 'y_train.npy'), y_train)
        np.save(os.path.join(output_file_dir, 'x_test.npy'), x_test)
        np.save(os.path.join(output_file_dir, 'y_test.npy'), y_test)

    def train(epochs=50, networks=3):
        """Train, evaluate and export the model.

        # Arguments
            epochs (int): number of training epochs
            networks (int): number of residual blocks per stage

        # Returns
            (validAcc, validLoss): accuracy and loss on the test split
        """
        import shutil
        import numpy as np

        import tensorflow as tf
        from tensorflow.python.saved_model import builder as saved_model_builder
        from tensorflow.python.saved_model.signature_def_utils import predict_signature_def
        from tensorflow.python.saved_model import tag_constants
        from tensorflow.python.saved_model import signature_constants

        import keras
        from keras.regularizers import l2
        from keras import backend as K
        from keras.models import Model
        from keras.optimizers import Adam
        from keras.models import load_model
        from keras.layers import Dense, Conv2D
        from keras.layers import BatchNormalization, Activation
        from keras.layers import AveragePooling2D, Input, Flatten
        from keras.callbacks import ModelCheckpoint, LearningRateScheduler
        from keras.callbacks import ReduceLROnPlateau
        from tensorflow.keras.preprocessing.image import ImageDataGenerator

        import tensorflow.contrib.tensorrt as trt

        validAcc = None
        validLoss = None

        # Write the TRTIS resources (config.pbtxt, labels.txt) into the model directory
        # on the mounted volume.
        model_dir = os.path.join(output_model_dir, model_name)
        if model_version == '1': # if it is default version, we always clear it to keep the space clean
            model_version_dir = os.path.join(output_model_dir, model_name, model_version)
            if os.path.isdir(model_version_dir):
                shutil.rmtree(model_version_dir)

        # The configured name follows model_name so that it always agrees with the
        # directory the model is written to (model_dir above); with the default
        # model name the generated file is unchanged.
        # NOTE(review): the tensor names below presumably carry the extra "_1" suffix
        # because export_h5_to_pb reloads the model into the same graph -- confirm
        # before changing the export path.
        configdotpbtxt = ('name: "%s"\n' % model_name) + """platform: "tensorflow_graphdef"
max_batch_size: 128


input [
  {
    name: "input_1_1"
    data_type: TYPE_FP32
    format: FORMAT_NHWC
    dims: [ 32, 32, 3 ]
  }
]

output [
  {
    name: "dense_1_1/Softmax"
    data_type: TYPE_FP32
    dims: [ 10 ]
    label_filename: "labels.txt"
  }
]

instance_group [
  {
    count: 2,
    kind: KIND_GPU
  }
]

version_policy: { all { }}
"""
        labeldottxt = """airplane
automobile
bird
cat
deer
dog
frog
horse
ship
truck"""

        pathlib.Path(model_dir).mkdir(parents=True, exist_ok=True)
        with open(os.path.join(model_dir, 'config.pbtxt'), 'w') as f:
            f.write(configdotpbtxt)
        with open(os.path.join(model_dir, 'labels.txt'), 'w') as f:
            f.write(labeldottxt)

        # Training parameters
        batch_size = 128  # orig paper trained all networks with batch_size=128
        epochs = int(epochs)
        data_augmentation = True
        num_classes = 10

        # Subtracting pixel mean improves accuracy
        subtract_pixel_mean = True

        n = int(networks)
        # Model version
        # Orig paper: version = 1 (ResNet v1), Improved ResNet: version = 2 (ResNet v2)
        version = 2

        # Computed depth from supplied model parameter n
        if version == 1:
            depth = n * 6 + 2
        elif version == 2:
            depth = n * 9 + 2

        # Model name, depth and version
        model_type = 'ResNet%dv%d' % (depth, version)

        # Load the CIFAR10 data cached by load_data().
        x_train = np.load(os.path.join(output_file_dir, "x_train.npy"))
        y_train = np.load(os.path.join(output_file_dir, "y_train.npy"))
        x_test = np.load(os.path.join(output_file_dir, "x_test.npy"))
        y_test = np.load(os.path.join(output_file_dir, "y_test.npy"))

        # Input image dimensions.
        input_shape = x_train.shape[1:]

        # Normalize data.
        x_train = x_train.astype('float32') / 255
        x_test = x_test.astype('float32') / 255

        # If subtract pixel mean is enabled
        if subtract_pixel_mean:
            x_train_mean = np.mean(x_train, axis=0)
            x_train -= x_train_mean
            x_test -= x_train_mean

        print('x_train shape:', x_train.shape)
        print(x_train.shape[0], 'train samples')
        print(x_test.shape[0], 'test samples')
        print('y_train shape:', y_train.shape)

        # Convert class vectors to binary class matrices.
        y_train = keras.utils.to_categorical(y_train, num_classes)
        y_test = keras.utils.to_categorical(y_test, num_classes)

        def lr_schedule(epoch):
            """Learning Rate Schedule

            Learning rate is scheduled to be reduced after 80, 120, 160, 180 epochs.
            Called automatically every epoch as part of callbacks during training.

            # Arguments
                epoch (int): The number of epochs

            # Returns
                lr (float32): learning rate
            """
            lr = 1e-3
            if epoch > 180:
                lr *= 0.5e-3
            elif epoch > 160:
                lr *= 1e-3
            elif epoch > 120:
                lr *= 1e-2
            elif epoch > 80:
                lr *= 1e-1
            print('Learning rate: ', lr)
            return lr

        def resnet_layer(inputs,
                         num_filters=16,
                         kernel_size=3,
                         strides=1,
                         activation='relu',
                         batch_normalization=True,
                         conv_first=True):
            """2D Convolution-Batch Normalization-Activation stack builder

            # Arguments
                inputs (tensor): input tensor from input image or previous layer
                num_filters (int): Conv2D number of filters
                kernel_size (int): Conv2D square kernel dimensions
                strides (int): Conv2D square stride dimensions
                activation (string): activation name
                batch_normalization (bool): whether to include batch normalization
                conv_first (bool): conv-bn-activation (True) or
                    bn-activation-conv (False)

            # Returns
                x (tensor): tensor as input to the next layer
            """
            conv = Conv2D(num_filters,
                          kernel_size=kernel_size,
                          strides=strides,
                          padding='same',
                          kernel_initializer='he_normal',
                          kernel_regularizer=l2(1e-4))

            x = inputs
            if conv_first:
                x = conv(x)
                if batch_normalization:
                    x = BatchNormalization()(x)
                if activation is not None:
                    x = Activation(activation)(x)
            else:
                if batch_normalization:
                    x = BatchNormalization()(x)
                if activation is not None:
                    x = Activation(activation)(x)
                x = conv(x)
            return x

        def resnet_v1(input_shape, depth, num_classes=10):
            """ResNet Version 1 Model builder [a]

            Stacks of 2 x (3 x 3) Conv2D-BN-ReLU
            Last ReLU is after the shortcut connection.
            At the beginning of each stage, the feature map size is halved (downsampled)
            by a convolutional layer with strides=2, while the number of filters is
            doubled. Within each stage, the layers have the same number of filters and
            the same feature map sizes.
            Features maps sizes:
            stage 0: 32x32, 16
            stage 1: 16x16, 32
            stage 2:  8x8,  64
            The Number of parameters is approx the same as Table 6 of [a]:
            ResNet20 0.27M
            ResNet32 0.46M
            ResNet44 0.66M
            ResNet56 0.85M
            ResNet110 1.7M

            # Arguments
                input_shape (tensor): shape of input image tensor
                depth (int): number of core convolutional layers
                num_classes (int): number of classes (CIFAR10 has 10)

            # Returns
                model (Model): Keras model instance
            """
            if (depth - 2) % 6 != 0:
                raise ValueError('depth should be 6n+2 (eg 20, 32, 44 in [a])')
            # Start model definition.
            num_filters = 16
            num_res_blocks = int((depth - 2) / 6)

            inputs = Input(shape=input_shape)
            x = resnet_layer(inputs=inputs)
            # Instantiate the stack of residual units
            for stack in range(3):
                for res_block in range(num_res_blocks):
                    strides = 1
                    if stack > 0 and res_block == 0:  # first layer but not first stack
                        strides = 2  # downsample
                    y = resnet_layer(inputs=x,
                                     num_filters=num_filters,
                                     strides=strides)
                    y = resnet_layer(inputs=y,
                                     num_filters=num_filters,
                                     activation=None)
                    if stack > 0 and res_block == 0:  # first layer but not first stack
                        # linear projection residual shortcut connection to match
                        # changed dims
                        x = resnet_layer(inputs=x,
                                         num_filters=num_filters,
                                         kernel_size=1,
                                         strides=strides,
                                         activation=None,
                                         batch_normalization=False)
                    x = keras.layers.add([x, y])
                    x = Activation('relu')(x)
                num_filters *= 2

            # Add classifier on top.
            # v1 does not use BN after last shortcut connection-ReLU
            x = AveragePooling2D(pool_size=8)(x)
            y = Flatten()(x)
            outputs = Dense(num_classes,
                            activation='softmax',
                            kernel_initializer='he_normal')(y)

            # Instantiate model.
            model = Model(inputs=inputs, outputs=outputs)
            return model

        class LossAndErrorPrintingCallback(keras.callbacks.Callback):
            """Print the training and validation accuracy/loss at the end of every epoch."""

            def on_epoch_end(self, epoch, logs=None):
                print("epoch={}".format(epoch))
                print("Training-Accuracy={:7.6f}".format(logs["accuracy"]))
                print("Training-Loss={:7.6f}".format(logs["loss"]))
                print("Validation-Accuracy={:7.6f}".format(logs["val_accuracy"]))
                print("Validation-Loss={:7.6f}".format(logs["val_loss"]))

        def resnet_v2(input_shape, depth, num_classes=10):
            """ResNet Version 2 Model builder [b]

            Stacks of (1 x 1)-(3 x 3)-(1 x 1) BN-ReLU-Conv2D or also known as
            bottleneck layer
            First shortcut connection per layer is 1 x 1 Conv2D.
            Second and onwards shortcut connection is identity.
            At the beginning of each stage, the feature map size is halved (downsampled)
            by a convolutional layer with strides=2, while the number of filter maps is
            doubled. Within each stage, the layers have the same number filters and the
            same filter map sizes.
            Features maps sizes:
            conv1  : 32x32,  16
            stage 0: 32x32,  64
            stage 1: 16x16, 128
            stage 2:  8x8,  256

            # Arguments
                input_shape (tensor): shape of input image tensor
                depth (int): number of core convolutional layers
                num_classes (int): number of classes (CIFAR10 has 10)

            # Returns
                model (Model): Keras model instance
            """
            if (depth - 2) % 9 != 0:
                raise ValueError('depth should be 9n+2 (eg 56 or 110 in [b])')
            # Start model definition.
            num_filters_in = 16
            num_res_blocks = int((depth - 2) / 9)

            inputs = Input(shape=input_shape)
            # v2 performs Conv2D with BN-ReLU on input before splitting into 2 paths
            x = resnet_layer(inputs=inputs,
                             num_filters=num_filters_in,
                             conv_first=True)

            # Instantiate the stack of residual units
            for stage in range(3):
                for res_block in range(num_res_blocks):
                    activation = 'relu'
                    batch_normalization = True
                    strides = 1
                    if stage == 0:
                        num_filters_out = num_filters_in * 4
                        if res_block == 0:  # first layer and first stage
                            activation = None
                            batch_normalization = False
                    else:
                        num_filters_out = num_filters_in * 2
                        if res_block == 0:  # first layer but not first stage
                            strides = 2    # downsample

                    # bottleneck residual unit
                    y = resnet_layer(inputs=x,
                                     num_filters=num_filters_in,
                                     kernel_size=1,
                                     strides=strides,
                                     activation=activation,
                                     batch_normalization=batch_normalization,
                                     conv_first=False)
                    y = resnet_layer(inputs=y,
                                     num_filters=num_filters_in,
                                     conv_first=False)
                    y = resnet_layer(inputs=y,
                                     num_filters=num_filters_out,
                                     kernel_size=1,
                                     conv_first=False)
                    if res_block == 0:
                        # linear projection residual shortcut connection to match
                        # changed dims
                        x = resnet_layer(inputs=x,
                                         num_filters=num_filters_out,
                                         kernel_size=1,
                                         strides=strides,
                                         activation=None,
                                         batch_normalization=False)
                    x = keras.layers.add([x, y])

                num_filters_in = num_filters_out

            # Add classifier on top.
            # v2 has BN-ReLU before Pooling
            x = BatchNormalization()(x)
            x = Activation('relu')(x)
            x = AveragePooling2D(pool_size=8)(x)
            y = Flatten()(x)
            outputs = Dense(num_classes,
                            activation='softmax',
                            kernel_initializer='he_normal')(y)

            # Instantiate model.
            model = Model(inputs=inputs, outputs=outputs)
            return model

        if version == 2:
            model = resnet_v2(input_shape=input_shape, depth=depth)
        else:
            model = resnet_v1(input_shape=input_shape, depth=depth)

        model.compile(loss='categorical_crossentropy',
                      optimizer=Adam(lr=lr_schedule(0)),
                      metrics=['accuracy'])
        model.summary()
        print(model_type)

        # Prepare the directory that receives the best-so-far checkpoint.
        save_dir = os.path.join(home, 'saved_models')
        save_model_name = 'cifar10_%s_model.%03d.h5' % (model_type, epochs)
        pathlib.Path(save_dir).mkdir(parents=True, exist_ok=True)
        filepath = os.path.join(save_dir, save_model_name)

        # Prepare callbacks for model saving and for learning rate adjustment.
        checkpoint = ModelCheckpoint(filepath=filepath,
                                     monitor='val_accuracy',
                                     verbose=1,
                                     save_best_only=True)

        lr_scheduler = LearningRateScheduler(lr_schedule)

        lr_reducer = ReduceLROnPlateau(factor=np.sqrt(0.1),
                                       cooldown=0,
                                       patience=5,
                                       min_lr=0.5e-6)

        callbacks = [checkpoint, LossAndErrorPrintingCallback(), lr_reducer, lr_scheduler]

        # Run training, with or without data augmentation.
        if not data_augmentation:
            print('Not using data augmentation.')
            model.fit(x_train, y_train,
                      batch_size=batch_size,
                      epochs=epochs,
                      validation_data=(x_test, y_test),
                      shuffle=True,
                      callbacks=callbacks)
        else:
            print('Using real-time data augmentation.')
            # This will do preprocessing and realtime data augmentation:
            datagen = ImageDataGenerator(
                # set input mean to 0 over the dataset
                featurewise_center=False,
                # set each sample mean to 0
                samplewise_center=False,
                # divide inputs by std of dataset
                featurewise_std_normalization=False,
                # divide each input by its std
                samplewise_std_normalization=False,
                # apply ZCA whitening
                zca_whitening=False,
                # epsilon for ZCA whitening
                zca_epsilon=1e-06,
                # randomly rotate images in the range (deg 0 to 180)
                rotation_range=0,
                # randomly shift images horizontally
                width_shift_range=0.1,
                # randomly shift images vertically
                height_shift_range=0.1,
                # set range for random shear
                shear_range=0.,
                # set range for random zoom
                zoom_range=0.,
                # set range for random channel shifts
                channel_shift_range=0.,
                # set mode for filling points outside the input boundaries
                fill_mode='nearest',
                # value used for fill_mode = "constant"
                cval=0.,
                # randomly flip images
                horizontal_flip=True,
                # randomly flip images
                vertical_flip=False,
                # set rescaling factor (applied before any other transformation)
                rescale=None,
                # set function that will be applied on each input
                preprocessing_function=None,
                # image data format, either "channels_first" or "channels_last"
                data_format=None,
                # fraction of images reserved for validation (strictly between 0 and 1)
                validation_split=0.0)

            # Compute quantities required for featurewise normalization
            # (std, mean, and principal components if ZCA whitening is applied).
            datagen.fit(x_train)

            # Fit the model on the batches generated by datagen.flow().
            # steps_per_epoch is rounded up to a whole number of batches so that the
            # last, partial batch is still counted and the value is an integer.
            model.fit_generator(datagen.flow(x_train, y_train, batch_size=batch_size),
                                steps_per_epoch=int(np.ceil(len(x_train) / batch_size)),
                                validation_data=(x_test, y_test),
                                epochs=epochs, verbose=1, workers=4,
                                callbacks=callbacks)

        # Score trained model; scores is [loss, accuracy] as configured in compile().
        scores = model.evaluate(x_test, y_test, verbose=1)

        # Plain Python floats match the float outputs declared by the component.
        validAcc = float(scores[1])
        validLoss = float(scores[0])

        # Save Keras model into a fresh scratch directory.
        tmp_model_path = os.path.join(output_temp_dir, "tmp")
        if os.path.isdir(tmp_model_path):
            shutil.rmtree(tmp_model_path)
        os.mkdir(tmp_model_path)

        keras_model_path = os.path.join(tmp_model_path, 'keras_model.h5')
        model.save(keras_model_path)

        # Convert Keras model to Tensorflow SavedModel
        def export_h5_to_pb(path_to_h5, export_path):
            """Load the HDF5 Keras model and export it as a SavedModel at export_path."""
            # Set the learning phase to Test since the model is already trained.
            K.set_learning_phase(0)
            # Load the Keras model
            keras_model = load_model(path_to_h5)
            # Build the Protocol Buffer SavedModel at 'export_path'
            builder = saved_model_builder.SavedModelBuilder(export_path)
            # Create prediction signature to be used by TensorFlow Serving Predict API
            signature = predict_signature_def(inputs={"input_1": keras_model.input},
                                              outputs={"dense_1": keras_model.output})
            with K.get_session() as sess:
                # Save the meta graph and the variables
                builder.add_meta_graph_and_variables(sess=sess, tags=[tag_constants.SERVING],
                                                     signature_def_map={signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature})
            builder.save()

        tf_model_path = os.path.join(output_temp_dir, "tf_saved_model")
        if os.path.isdir(tf_model_path):
            shutil.rmtree(tf_model_path)

        export_h5_to_pb(keras_model_path, tf_model_path)

        # Apply TF_TRT on the Tensorflow SavedModel
        graph = tf.Graph()
        with graph.as_default():
            with tf.Session():
                # Create a TensorRT inference graph from a SavedModel:
                trt_graph = trt.create_inference_graph(
                    input_graph_def=None,
                    outputs=None,
                    input_saved_model_dir=tf_model_path,
                    input_saved_model_tags=[tag_constants.SERVING],
                    max_batch_size=batch_size,
                    max_workspace_size_bytes=2 << 30,
                    precision_mode='fp16')

                print([node.name + '=>' + node.op for node in trt_graph.node])

                # The version directory is created only now, right before the graph is
                # written: the inference server loads every model when it starts, and an
                # empty version directory would crash it.
                pathlib.Path(os.path.join(model_dir, model_version)).mkdir(parents=True, exist_ok=True)
                tf.io.write_graph(
                    trt_graph,
                    os.path.join(model_dir, model_version),
                    'model.graphdef',
                    as_text=False
                )

        # Remove tmp dirs
        shutil.rmtree(tmp_model_path)
        shutil.rmtree(tf_model_path)
        return (validAcc, validLoss)

    load_data()
    validAcc, validLoss = train(epochs, networks)
    print("done")

    from collections import namedtuple
    outputs = namedtuple('outputs', ['modelfile', 'validAcc', 'validLoss'])
    return outputs(output_model_dir, validAcc, validLoss)

In [None]:
# = trainOp(1,3)

In [None]:
with open("requirements.txt", "w") as f:
    f.write("kfp==0.5.1\n")
    f.write("h5py<3.0.0\n")
    f.write("keras==2.3.1\n")

!pip install -r requirements.txt --user --upgrade

In [None]:
import kfp
import kfp.dsl as dsl
import kfp.components as comp
import kfp.compiler as compiler

In [None]:
import os
# Session settings read from environment variables (presumably provided by the
# Tintin session template -- confirm). os.environ.get returns None for a variable
# that is unset and has no default given here.

# Name of the PVC that is mounted into the training step.
pvcname = os.environ.get('TINTIN_SESSION_TEMPLATE_PVC_NAME')
# File name the compiled pipeline package is written to.
generated_pipeline_zip_filename = os.environ.get('TINTIN_SESSION_TEMPLATE_GENERATED_PIPELINE_ZIP_FILENAME')
# Comma-separated list of GPU resource types; the pipeline uses the first entry.
gpu_type_list_text = os.environ.get('TINTIN_SESSION_TEMPLATE_GPU_TYPE_LIST')
# Container image the training component runs in.
default_image = os.environ.get('TINTIN_SESSION_TEMPLATE_DEFAULT_IMAGE', 'footprintai/nvidia-tensorflow:19.12-tf1-py3')
# Path inside the container where the PVC is mounted.
mountPath = os.environ.get('TINTIN_SESSION_TEMPLATE_MOUNT_PATH', '/home/jovyan')



In [None]:
# Turn trainOp into a pipeline component that runs in `default_image`, with the
# listed packages passed as packages_to_install.
trainComp = comp.func_to_container_op(trainOp, 
                                      base_image=default_image,
                                      packages_to_install=["keras==2.3.1", "h5py<3.0.0"])

import kfp.dsl as dsl
@dsl.pipeline(
   name='Projectname pipeline',
   description='simple pipeline.'
)
def templated_pipeline_func(
    epochs=50,
    networks=3,
):
    """Single-step pipeline that runs the training component on the session PVC.

    The training step mounts the PVC named by ``pvcname`` at ``mountPath``,
    requests 1 CPU and 4Gi of memory and, when a GPU type list is configured,
    one GPU of the first listed type.

    Args:
        epochs: Number of training epochs forwarded to the training component.
        networks: Number of residual blocks per stage forwarded to the
            training component.
    """
    # Mount the session volume at the path the training code writes to.
    pvolume_dict = {mountPath: dsl.PipelineVolume(pvc=pvcname)}

    train_task = trainComp(epochs, networks).add_pvolumes(pvolume_dict)
    train_task = train_task.add_resource_request('cpu', '1')
    train_task = train_task.add_resource_limit('cpu', '1')
    train_task = train_task.add_resource_request('memory', '4Gi')
    train_task = train_task.add_resource_limit('memory', '4Gi')

    # The GPU list comes from an environment variable, so it may be missing (None),
    # empty, or contain blank entries; all of those mean "no GPU requested".
    gpu_type_list = [
        gpu_type.strip()
        for gpu_type in (gpu_type_list_text or '').split(',')
        if gpu_type.strip()
    ]
    if gpu_type_list:
        # Set gpu type here. Default the first gpu type in user resource quota.
        # User could print out the gpu type list and pick up one to replace the following line.
        gpu_type = gpu_type_list[0]
        train_task = train_task.add_resource_request(gpu_type, '1')
        train_task = train_task.add_resource_limit(gpu_type, '1')
        # GPU company is from the GPU type. Used by the gpu driver
        gpu_company = gpu_type.split('/')[0] + '/gpu'
        train_task = train_task.add_resource_request(gpu_company, '1')
        train_task = train_task.add_resource_limit(gpu_company, '1')

    # Annotate the pod with the model name the training step exports under.
    model_name = os.environ.get('TINTIN_SESSION_TEMPLATE_MODEL_NAME', 'resnet_graphdef')
    train_task = train_task.add_pod_annotation('tintin.footprint-ai.com/session-model-name', model_name)
# Compile the pipeline into the package file named by generated_pipeline_zip_filename.
# NOTE(review): that name is read from an environment variable without a default,
# so it is None when the variable is unset -- confirm it is always provided.
compiler.Compiler().compile(templated_pipeline_func, generated_pipeline_zip_filename)