# Example AI Platform Training Job Submission
---

The purpose of this notebook is to test job submission to AI Platform outside the context of an end-to-end TFX pipeline.

In [1]:
# Cloud settings shared by the cells below; the next cell exports them to the
# process environment so the %%bash cells can read them.
BUCKET = "ml-sandbox-tagging-tfx-experiments"  # bucket name only, no gs:// prefix
PROJECT = "ml-sandbox-101"
# NOTE(review): the job-submission cell hard-codes --region=europe-west2 and
# does not read this value -- confirm which region is intended.
REGION = "europe-west1"

In [2]:
import os 
import pandas as pd

# Publish the notebook settings as environment variables so that the %%bash
# cells can expand them (e.g. ${BUCKET}).
os.environ.update(
    {
        "PROJECT": PROJECT,
        "BUCKET": BUCKET,
        "REGION": REGION,
        "TFVERSION": "2.3",
        "PYTHONVERSION": "3.7",
    }
)

In [3]:
%%bash
# Create the local `trainer` package directory that the %%writefile cells
# below populate; the empty __init__.py makes it an importable package.
PACKAGE_DIR=trainer

mkdir -p "${PACKAGE_DIR}"
touch "${PACKAGE_DIR}/__init__.py"

## Create Model.py

In [4]:
%%writefile  trainer/model.py

import tensorflow as tf
import tensorflow_transform as tft
import pandas as pd
import numpy as np

from tensorflow.keras import callbacks, layers
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Precision, Recall

from google.cloud import storage
import pickle

EMBEDDING_LOCATION = 'gs://ml-sandbox-101-tagging/data/processed/training_data/glove_data/glove_embedding_index.pkl'
MAX_STRING_LENGTH = 277


def create_tag_lookup_table(tag_file):
    """Build a tag -> id lookup table from a text vocabulary file.

    Args:
        tag_file: Path of a text file with one tag per line. The whole line
            is used as the key and its line number as the int64 value.

    Returns:
        A `tf.lookup.StaticVocabularyTable` with a single out-of-vocabulary
        bucket for tags that are not in the file.
    """
    line_index = tf.lookup.TextFileIndex
    initializer = tf.lookup.TextFileInitializer(
        tag_file,
        key_dtype=tf.string,
        key_index=line_index.WHOLE_LINE,
        value_dtype=tf.int64,
        value_index=line_index.LINE_NUMBER,
        delimiter=None,
    )
    return tf.lookup.StaticVocabularyTable(initializer, num_oov_buckets=1)


def label_transform(x, y, num_tags, table):
    """Turn the sparse tags of a batch into a dense indicator label tensor.

    Args:
        x: Features, passed through untouched.
        y: Tags to look up in `table`; must be sparse, since the result is fed
            to `tf.sparse.to_indicator`.
        num_tags: Number of tags in the vocabulary (without the OOV bucket).
        table: Lookup table mapping each tag to its integer id.

    Returns:
        A `(x, labels)` tuple where `labels` is an int32 indicator tensor with
        `num_tags + 1` columns.
    """
    tag_ids = table.lookup(y)
    # One extra column holds out-of-vocabulary tags seen in the eval dataset.
    indicator = tf.sparse.to_indicator(tag_ids, vocab_size=num_tags + 1)
    return x, tf.cast(indicator, tf.int32)

#def _gzip_reader_fn(filenames):
#    """Small utility returning a record reader that can read gzip'ed files"""
#    return tf.data.TFRecordDataset(filenames) #, compression_type="GZIP")


def _input_fn(file_pattern, tf_transform_output, num_tags, table, batch_size=64, shuffle=True, epochs=None):
    """Build the batched (features, labels) dataset used for training/eval.

    Args:
        file_pattern: Input TFRecord file pattern.
        tf_transform_output: A TFTransformOutput; supplies the transformed
            feature spec used to parse the records.
        num_tags: Size of the tag vocabulary, forwarded to `label_transform`.
        table: Tag lookup table, forwarded to `label_transform`.
        batch_size: Number of consecutive elements combined into one batch.
        shuffle: Forwarded to `make_batched_features_dataset`.
        epochs: Forwarded as `num_epochs` to `make_batched_features_dataset`.

    Returns:
        A dataset of `(features, labels)` tuples, where `features` is a
        dictionary of Tensors and `labels` is the output of `label_transform`
        for the 'series_ep_tags' feature.
    """
    feature_spec = tf_transform_output.transformed_feature_spec().copy()

    def _to_dense_labels(features, tags):
        # Replace the raw tags with the indicator labels.
        return label_transform(features, tags, num_tags, table)

    batched = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=feature_spec,
        reader=tf.data.TFRecordDataset,
        label_key='series_ep_tags',
        shuffle=shuffle,
        num_epochs=epochs,
    )
    return batched.map(_to_dense_labels)

class AutoTaggingModel:
    """Multi-channel n-gram CNN whose embedding layer is seeded from a pickle on GCS.

    On construction the embedding index is downloaded from `embedding_file`
    and used to fill `self.embedding_matrix`; `get_model()` then returns a
    compiled Keras model.

    Args:
        embedding_file: `gs://<bucket>/<blob>` URI of a pickled mapping from
            word to embedding vector.
        embedding_dim: Length of each embedding vector.
        train_embedding: Whether the embedding layer is trainable.
        output_size: Number of units in the final sigmoid layer.
        vocab_size: Number of rows of the embedding matrix.
        vocab_df: DataFrame whose first column holds the vocabulary words;
            row `i` fills row `i` of the embedding matrix.
        max_string_length: Length of each input sequence.

    Raises:
        ValueError: If `embedding_file` is not a `gs://<bucket>/<blob>` URI.
    """

    def __init__(
        self,
        embedding_file,
        embedding_dim,
        train_embedding,
        output_size,
        vocab_size,
        vocab_df,
        max_string_length,
    ):
        self.__embedding_file = embedding_file
        self.__embedding_dim = embedding_dim
        self.__vocab_size = vocab_size
        self.__vocab_df = vocab_df
        self.__train_embedding = train_embedding
        self.__output_size = output_size
        self.__max_string_length = max_string_length

        self.__initialize_embedding_matrix()

    @staticmethod
    def _split_gcs_uri(gcs_uri):
        """Split `gs://<bucket>/<blob>` into `(bucket_name, blob_name)`.

        Raises:
            ValueError: If the URI lacks the `gs://` prefix, the bucket or
                the blob name.
        """
        prefix = "gs://"
        if not isinstance(gcs_uri, str) or not gcs_uri.startswith(prefix):
            raise ValueError(
                "Expected a 'gs://<bucket>/<blob>' URI, got: {!r}".format(gcs_uri)
            )
        bucket_name, _, blob_name = gcs_uri[len(prefix):].partition("/")
        if not bucket_name or not blob_name:
            raise ValueError(
                "Expected a 'gs://<bucket>/<blob>' URI, got: {!r}".format(gcs_uri)
            )
        return bucket_name, blob_name

    def _build_embedding_matrix(self, embedding_index):
        """Return a (vocab_size, embedding_dim) matrix filled from `embedding_index`.

        Row `i` receives the vector of the word in row `i` of the vocabulary
        DataFrame; words missing from `embedding_index` keep an all-zero row.
        """
        matrix = np.zeros((self.__vocab_size, self.__embedding_dim))
        for row, word in enumerate(self.__vocab_df.values):
            vector = embedding_index.get(word[0])
            if vector is not None:
                matrix[row] = vector
        return matrix

    def __initialize_embedding_matrix(self):
        """Download the pickled embedding index and populate `self.embedding_matrix`."""
        # Validate the URI before creating a client so a bad path fails fast.
        bucket_name, blob_name = self._split_gcs_uri(self.__embedding_file)

        storage_client = storage.Client()
        blob = storage_client.bucket(bucket_name).blob(blob_name)

        # NOTE(review): newer google-cloud-storage releases document
        # download_as_string as a deprecated alias of download_as_bytes; kept
        # as-is for compatibility with the installed client -- TODO confirm.
        pickled_index = blob.download_as_string()
        # SECURITY: pickle.loads executes arbitrary code embedded in the
        # payload. Only point `embedding_file` at objects you fully trust.
        embedding_index = pickle.loads(pickled_index)

        self.embedding_matrix = self._build_embedding_matrix(embedding_index)

    def embedding_layer(self):
        """Return the Embedding layer initialised with `self.embedding_matrix`."""
        return layers.Embedding(
            input_dim=self.__vocab_size,
            output_dim=self.__embedding_dim,
            weights=[self.embedding_matrix],
            input_length=self.__max_string_length,
            trainable=self.__train_embedding,
        )

    def n_grams_channel(self, inputs, n_words_filter: int):
        """Build one convolutional channel covering `n_words_filter` consecutive words.

        Applies a 256-filter Conv2D whose kernel spans `n_words_filter` rows
        and the full embedding width, max-pools over the whole remaining
        first spatial axis and flattens the result.
        """
        channel = layers.Conv2D(256, kernel_size=(n_words_filter, self.__embedding_dim), activation="relu")(inputs)
        # Pool size equals the conv output length, i.e. max-over-time pooling.
        channel_mp = layers.MaxPool2D(pool_size=(channel.shape[1], 1))(channel)
        channel_final = layers.Flatten()(channel_mp)
        return channel_final

    def define_model(self):
        """Assemble the (uncompiled) Keras model.

        The input named 'features' is embedded, reshaped to add a trailing
        channel axis, run through three n-gram channels (3, 4 and 5 words),
        concatenated, and passed through dropout, a 2000-unit ReLU layer and a
        sigmoid layer with `output_size` units.
        """
        inputs = layers.Input(shape=(self.__max_string_length,), name='features')
        embedding = self.embedding_layer()(inputs)
        channel_inputs = layers.Reshape(target_shape=(self.__max_string_length, self.__embedding_dim, 1))(embedding)
        channel1_final = self.n_grams_channel(channel_inputs, 3)
        channel2_final = self.n_grams_channel(channel_inputs, 4)
        channel3_final = self.n_grams_channel(channel_inputs, 5)
        channels_final = layers.Concatenate()(
            [channel1_final, channel2_final, channel3_final]
        )
        channels_final = layers.Dropout(rate=0.4)(channels_final)
        channels_final = layers.Dense(2000, "relu")(channels_final)
        predictions = layers.Dense(self.__output_size, "sigmoid")(channels_final)
        model = Model(inputs=inputs, outputs=predictions)

        return model

    def get_model(self):
        """Build and compile the model inside a `tf.distribute.MirroredStrategy` scope.

        Returns:
            The Keras model compiled with Adam (learning rate 0.0005), binary
            cross-entropy loss and precision/recall metrics.
        """
        strategy = tf.distribute.MirroredStrategy()
        with strategy.scope():
            model = self.define_model()

            metrics = [tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
            model.compile(
                optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
                loss=BinaryCrossentropy(),
                metrics=metrics,
            )
        return model

    
def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Create the serving function that scores serialized tf.Example protos.

    Args:
        model: The trained Keras model; it gains a `tft_layer` attribute.
        tf_transform_output: A TFTransformOutput providing the raw feature
            spec and the transform-features layer.

    Returns:
        A `tf.function` taking a batch of serialized examples and returning
        `{"outputs": <model predictions>}`.
    """
    # Stored on the model itself -- presumably so the transform layer is
    # tracked when the model is saved; TODO confirm.
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Parse, transform and score a batch of serialized examples."""
        raw_spec = tf_transform_output.raw_feature_spec()
        # The label is not supplied at serving time, so drop it from the spec.
        raw_spec.pop('series_ep_tags')

        raw_features = tf.io.parse_example(serialized_tf_examples, raw_spec)
        predictions = model(model.tft_layer(raw_features))
        return {"outputs": predictions}

    return serve_tf_examples_fn


def run_fn(fn_args):
    """Train the tagging model and export it with a serving signature.

    Args:
        fn_args: Object holding the training arguments as attributes:
            `transform_output` (transform output directory), `train_files`
            and `eval_files` (TFRecord file patterns), `train_steps` (total
            training steps, spread evenly over the epochs), `eval_steps`
            (validation steps per epoch) and `serving_model_dir` (export
            destination).
    """
    epochs = 10
    # Keras documents steps_per_epoch as an integer, so use floor division
    # (the previous `/` produced a float) and never go below one step.
    steps_per_epoch = max(1, fn_args.train_steps // epochs)

    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    num_tags = tf_transform_output.vocabulary_size_by_name('tags')
    tag_file = tf_transform_output.vocabulary_file_by_name('tags')
    vocab_size = tf_transform_output.vocabulary_size_by_name('vocab')
    vocab_file = tf_transform_output.vocabulary_file_by_name('vocab')
    # NOTE(review): row i of this frame fills row i of the embedding matrix.
    # read_csv's default parsing (separators, quoting, NA values, blank-line
    # skipping) could shift rows for unusual tokens -- TODO confirm that the
    # vocabulary file is safe to parse this way.
    vocab_df = pd.read_csv(vocab_file, header=None)

    table = create_tag_lookup_table(tag_file)

    train_dataset = _input_fn(
        file_pattern=fn_args.train_files,
        tf_transform_output=tf_transform_output,
        num_tags=num_tags,
        table=table,
        batch_size=64)

    eval_dataset = _input_fn(
        file_pattern=fn_args.eval_files,
        tf_transform_output=tf_transform_output,
        batch_size=64,
        num_tags=num_tags,
        table=table)

    # output_size gets one extra unit to match the extra out-of-vocabulary
    # label column produced by label_transform. vocab_size is also padded by
    # one -- presumably for an out-of-vocabulary token id; TODO confirm.
    model = AutoTaggingModel(
        embedding_dim=300,
        train_embedding=True,
        embedding_file=EMBEDDING_LOCATION,
        output_size=num_tags + 1,
        vocab_size=vocab_size + 1,
        vocab_df=vocab_df,
        max_string_length=MAX_STRING_LENGTH).get_model()

    early_stopping_callback = callbacks.EarlyStopping(monitor='val_loss',
        min_delta=0.0001,
        patience=4,
        verbose=0,
        mode='auto',
        restore_best_weights=True)

    reduce_lr = callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.1,
        patience=2,
        verbose=0,
        mode='auto',
        min_delta=0.0001)

    model.fit(
        train_dataset,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
        callbacks=[early_stopping_callback, reduce_lr]
    )

    # Export with a signature that accepts a batch of serialized tf.Examples.
    signatures = {
        "serving_default": _get_serve_tf_examples_fn(model, tf_transform_output).get_concrete_function(
            tf.TensorSpec(shape=[None], dtype=tf.string, name="examples")
        ),
    }

    model.save(
        fn_args.serving_model_dir, save_format="tf", signatures=signatures
    )
    

Writing trainer/model.py


## Create Task.py

In [5]:
%%writefile trainer/task.py

import argparse 
import os 

from trainer import model

import tensorflow as tf


if __name__ == "__main__":
    # Command-line entry point: parse the job arguments and hand them to
    # model.run_fn, which reads them as attributes of the parsed namespace.
    parser = argparse.ArgumentParser()

    # Every argument consumed by model.run_fn is required so that an omission
    # fails at parse time with a clear message instead of surfacing later as
    # a None value inside training.
    parser.add_argument(
        "--transform_output",
        dest="transform_output",
        required=True,
        help="Location of transform_fn directory"
    )
    parser.add_argument(
        "--train_files",
        dest="train_files",
        required=True,
        help="Path to training files"
    )
    parser.add_argument(
        "--eval_files",
        dest="eval_files",
        required=True,
        help="Path to evaluation files"
    )
    # dest must be a valid identifier: the previous dest="job-dir" stored the
    # value under an attribute name that cannot be read as `args.job_dir`.
    parser.add_argument(
        "--job-dir",
        dest="job_dir",
        help="This model ignores this field, but it is required by gcloud",
        default="Junk"
    )
    parser.add_argument(
        "--serving_model_dir",
        dest="serving_model_dir",
        required=True,
        help="GCS location to write checkpoitns and export models"
    )
    parser.add_argument(
        "--train_steps",
        dest="train_steps",
        type=int,
        required=True,
        help="Number of steps to train"
    )
    parser.add_argument(
        "--eval_steps",
        dest="eval_steps",
        type=int,
        required=True,
        help="Number of steps to eval"
    )

    args = parser.parse_args()

    # Echo the parsed arguments so they show up in the job logs.
    print(args)
    model.run_fn(args)

Writing trainer/task.py


In [6]:
# Print the current timestamp so this run can be matched against the
# timestamped output directories created by the cells below.
import datetime
print(datetime.datetime.now())

2020-10-26 06:26:35.380766


In [7]:
%%bash
# Show the timestamped GCS output location pattern used by the training cells.
RUN_STAMP=$(date -u +%y%m%d_%H%M%S)
OUTDIR="gs://${BUCKET}/new_model/${RUN_STAMP}"

echo "${OUTDIR}"

gs://ml-sandbox-tagging-tfx-experiments/new_model/201026_062637


In [8]:
%%bash
# Smoke-test the trainer package with `gcloud ai-platform local train` using a
# handful of steps before submitting the full job.
# Arguments after the bare `--` are forwarded to trainer.task.
# The GCS patterns are quoted so the shell passes `train*` / `test*` through
# verbatim instead of attempting local filename expansion.
DATE_SUFFIX=$(date -u +%y%m%d_%H%M%S)
OUTDIR="gs://${BUCKET}/new_model/${DATE_SUFFIX}/"

gcloud ai-platform local train \
    --module-name=trainer.task \
    --package-path=trainer \
    --job-dir="${OUTDIR}" \
    -- \
    --train_files="gs://${BUCKET}/transform_dir/train*" \
    --eval_files="gs://${BUCKET}/transform_dir/test*" \
    --transform_output="gs://${BUCKET}/transform_dir" \
    --serving_model_dir="${OUTDIR}" \
    --train_steps=10 \
    --eval_steps=1

Namespace(eval_files='gs://ml-sandbox-tagging-tfx-experiments/transform_dir/test*', eval_steps=1, serving_model_dir='gs://ml-sandbox-tagging-tfx-experiments/new_model/201026_062653/', train_files='gs://ml-sandbox-tagging-tfx-experiments/transform_dir/train*', train_steps=10, transform_output='gs://ml-sandbox-tagging-tfx-experiments/transform_dir', **{'job-dir': 'gs://ml-sandbox-tagging-tfx-experiments/new_model/201026_062653/'})
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


2020-10-26 06:27:03.884238: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2300000000 Hz
2020-10-26 06:27:03.884696: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55c648b54880 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-10-26 06:27:03.884740: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2020-10-26 06:27:03.887180: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
2020-10-26 06:28:07.673277: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
Instructions for updating:
This property should not be used in 

# Submit Job to AI Platform for Full Training

In [9]:
%%writefile trainer/config.yaml
# Machine configuration passed to `gcloud ai-platform jobs submit training`
# through --config: an n1-standard-16 master with one NVIDIA Tesla T4.
trainingInput:
  scaleTier: CUSTOM
  masterType: n1-standard-16
  masterConfig:
    acceleratorConfig:
      count: 1
      type: NVIDIA_TESLA_T4

Writing trainer/config.yaml


In [11]:
%%bash
# Submit the full training run to AI Platform.
# Arguments after the bare `--` are forwarded to trainer.task.
# The GCS patterns are quoted so the shell passes `train*` / `test*` through
# verbatim instead of attempting local filename expansion.
#
# NOTE(review): --region (europe-west2) and --runtime-version (2.2) are
# hard-coded and differ from the REGION (europe-west1) and TFVERSION (2.3)
# values exported at the top of the notebook -- confirm which are intended.
DATE_SUFFIX=$(date -u +%y%m%d_%H%M%S)
OUTDIR="gs://${BUCKET}/new_model/${DATE_SUFFIX}/"
JOBID="autotagging_${DATE_SUFFIX}"

gcloud ai-platform jobs submit training "${JOBID}" \
    --region=europe-west2 \
    --package-path=trainer \
    --module-name=trainer.task \
    --job-dir="gs://${BUCKET}/new_model_job_dir" \
    --staging-bucket="gs://${BUCKET}" \
    --config=trainer/config.yaml \
    --runtime-version=2.2 \
    --python-version=3.7 \
    -- \
    --train_files="gs://${BUCKET}/transform_dir/train*" \
    --eval_files="gs://${BUCKET}/transform_dir/test*" \
    --transform_output="gs://${BUCKET}/transform_dir" \
    --serving_model_dir="${OUTDIR}" \
    --train_steps=14000 \
    --eval_steps=100

jobId: autotagging_201026_062943
state: QUEUED


Job [autotagging_201026_062943] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe autotagging_201026_062943

or continue streaming the logs with the command

  $ gcloud ai-platform jobs stream-logs autotagging_201026_062943


In [None]:
# Clean up: delete the locally generated trainer package directory
# (model.py, task.py and config.yaml written by the cells above).
!rm -rf trainer