In [None]:
%%bash
# Packages imported by trainer/model.py. NOTE(review): the trainer targets
# TensorFlow 1.x (it uses tf.contrib; TFVERSION is set to 1.10 below) -- pin
# versions compatible with that release before relying on this cell.
pip install tensorflow-transform
# quoted so the shell never treats [gcp] as a glob pattern
pip install 'apache-beam[gcp]'
pip install tensorflow-hub tensorflow-model-analysis

In [None]:
import os

# Package directory that the %%writefile cells below populate.
TRAINER_DIR = 'trainer'
trainer_missing = not os.path.exists(TRAINER_DIR)
if trainer_missing:
    os.mkdir(TRAINER_DIR)

In [None]:
%%writefile trainer/model.py
#!/usr/bin/env python


from __future__ import print_function, division, absolute_import # python 2 compatibility
import sys
reload(sys)
sys.setdefaultencoding('utf8')

import pandas as pd
import tensorflow as tf
import tensorflow.contrib.learn as tflearn
import tensorflow.contrib.metrics as metrics
from tensorflow_transform.saved import input_fn_maker, saved_transform_io
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.beam.tft_beam_io import beam_metadata_io
import tensorflow_model_analysis as tfma
import tensorflow_hub as hub
import apache_beam as beam
import shutil
import os
from config import REGION, BUCKET, PROJECT, LABEL_COL, PASSTHROUGH_COLS, STRING_COLS, FLOAT_COLS, INT_COLS, DELIM, RENAMED_COLS, TOKENIZE_COL, MAX_TOKENS
print(tf.__version__)
tf.logging.set_verbosity(tf.logging.INFO)

# Cloud Setup
This section is required only when training on Google Cloud (ML Engine); skip it for local runs.

In [None]:
import os

# The `from config import ...` in the %%writefile cell above is only written to
# trainer/model.py; it is never executed in this kernel, so PROJECT, BUCKET and
# REGION must be imported here for a fresh "Restart & Run All" to work.
# NOTE(review): assumes config.py is importable from the notebook's working
# directory -- confirm.
from config import PROJECT, BUCKET, REGION

os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
os.environ['TFVERSION'] = '1.10'

In [None]:
%%bash
# `%%bash` (cell magic) is required here; IPython has no `%bash` line magic.
gcloud config set project "$PROJECT"
gcloud config set compute/region "$REGION"

# Set up Model as a Package

## Prerequisites
The data is assumed to have been prepared in the `TFRecords` format with GZIP compression, which gives better performance and scalability than CSV files. The conversion from CSV to `TFRecords` should be done in the previous notebook, `02-tf_transform.ipynb`.

## Package Setup
We need to set up our model as a package for training and serving.

- `model.py` provides the code for data inputs and the model itself
- `setup.py` provides metadata about the package
- `task.py` is the command-line entry point; its arguments specify the model's hyperparameters as well as the GCP resources to use

In [None]:
%%writefile trainer/model.py --append


# Class labels, taken from the first line of labels.txt and split on DELIM.
# NOTE(review): the path is relative to the current working directory, so this
# only resolves when the trainer is launched from the notebook's directory.
with open('data/misc/labels.txt', 'r') as f:
    # readline() keeps the line terminator; strip it so the last label does not
    # end in '\n' and fail to match the labels seen in the data.
    LABEL_VOCABULARY = f.readline().rstrip('\r\n').split(DELIM)
    N_CLASSES = len(LABEL_VOCABULARY)


# TF-Hub text-embedding modules selectable through `embedding_type`.
EMBEDDING_MODULE_URLS = {
    'nnlm': 'https://tfhub.dev/google/nnlm-en-dim128/1',
    'universal-sentence-encoder': 'https://tfhub.dev/google/universal-sentence-encoder/2',
    'elmo': 'https://tfhub.dev/google/elmo/2',
    'word2vec': 'https://tfhub.dev/google/Wiki-words-500-with-normalization/1',
}
MODEL_TYPES = ('linear', 'dnn', 'dnn-linear-combined')


def build_estimator(model_dir, model_type, embedding_type, learning_rate,
                    hidden_units, dropout, embedding_trainable,
                    l1_regularization_strength, l2_regularization_strength):
    """Create the canned tf.estimator classifier selected by `model_type`.

    Args:
        model_dir: directory for checkpoints and summaries.
        model_type: one of "linear", "dnn", "dnn-linear-combined".
        embedding_type: key of EMBEDDING_MODULE_URLS, or None. Required by the
            "dnn" and "dnn-linear-combined" models, which embed the
            'full_description' and 'vendor_name' text columns.
        learning_rate: learning rate for both the Ftrl and the Adam optimizer.
        hidden_units: DNN layer sizes.
        dropout: dropout rate of the DNN part.
        embedding_trainable: whether the TF-Hub embedding weights are fine-tuned.
        l1_regularization_strength: Ftrl L1 strength for the linear part.
        l2_regularization_strength: Ftrl L2 strength for the linear part.

    Returns:
        The estimator. When PASSTHROUGH_COLS is non-empty it is wrapped with
        `forward_features` so those columns are echoed in the predictions.

    Raises:
        ValueError: for an unknown `embedding_type` or `model_type`, or when a
            DNN-based model is requested without an embedding.
    """
    # Validate everything up front, before any TF / TF-Hub objects are built.
    if embedding_type is not None and embedding_type not in EMBEDDING_MODULE_URLS:
        raise ValueError('Embedding type must be one of "nnlm", "universal-sentence-encoder", "elmo", "word2vec", None')
    if model_type not in MODEL_TYPES:
        raise ValueError('Model type must be one of "linear", "dnn" or "dnn-linear-combined"')
    uses_dnn = model_type != 'linear'
    if uses_dnn and embedding_type is None:
        raise ValueError('Model type "%s" requires an embedding_type, got None' % model_type)

    def _ftrl_optimizer():
        # fresh optimizer per call, as each canned estimator expects its own
        return tf.train.FtrlOptimizer(
            learning_rate=learning_rate,
            l1_regularization_strength=l1_regularization_strength,
            l2_regularization_strength=l2_regularization_strength
        )

    def _adam_optimizer():
        return tf.train.AdamOptimizer(learning_rate=learning_rate)

    # Weighted bag-of-words: token ids in 'bow_indices' weighted by 'bow_weight'
    # (presumably both produced by the TFT preprocessing -- not visible here).
    bow_indices = tf.feature_column.categorical_column_with_identity('bow_indices', num_buckets=MAX_TOKENS+1)
    weighted_bow = tf.feature_column.weighted_categorical_column(bow_indices, 'bow_weight')

    if uses_dnn:
        # Only built when a DNN consumes them, so the linear model never
        # creates TF-Hub embedding columns it does not use.
        module_url = EMBEDDING_MODULE_URLS[embedding_type]
        embedding = hub.text_embedding_column('full_description', module_url, trainable=embedding_trainable)
        vendor_embedding = hub.text_embedding_column('vendor_name', module_url, trainable=embedding_trainable)
        dnn_features = [embedding, vendor_embedding]

    if model_type == 'linear':
        estimator = tf.estimator.LinearClassifier(
            feature_columns=[weighted_bow],
            n_classes=N_CLASSES,
            label_vocabulary=LABEL_VOCABULARY,
            model_dir=model_dir,
            optimizer=_ftrl_optimizer()
        )
    elif model_type == 'dnn':
        estimator = tf.estimator.DNNClassifier(
            feature_columns=dnn_features,
            hidden_units=hidden_units,
            n_classes=N_CLASSES,
            label_vocabulary=LABEL_VOCABULARY,
            model_dir=model_dir,
            optimizer=_adam_optimizer(),
            dropout=dropout,
            batch_norm=True
        )
    else:  # 'dnn-linear-combined' (validated above)
        estimator = tf.estimator.DNNLinearCombinedClassifier(
            linear_feature_columns=[weighted_bow],
            linear_optimizer=_ftrl_optimizer(),
            dnn_feature_columns=dnn_features,
            dnn_optimizer=_adam_optimizer(),
            dnn_dropout=dropout,
            dnn_hidden_units=hidden_units,
            n_classes=N_CLASSES,
            label_vocabulary=LABEL_VOCABULARY,
            model_dir=model_dir,
            batch_norm=True
        )

    # enable feature passthrough for matching results to input
    if PASSTHROUGH_COLS:
        estimator = tf.contrib.estimator.forward_features(estimator, PASSTHROUGH_COLS)

    return estimator
        
# Serving input function
def make_serving_input_fn_for_base64_json(args):
    """Serving input_fn that parses raw tf.Examples and applies the TFT transform.

    Args:
        args: dict whose 'metadata_path' holds the 'rawdata_metadata' schema and
            the 'transform_fn' SavedModel written by tf.Transform.

    Returns:
        The receiver fn from input_fn_maker, with LABEL_COL excluded from the
        expected raw inputs.
    """
    metadata_root = args['metadata_path']
    raw_metadata = metadata_io.read_metadata(os.path.join(metadata_root, 'rawdata_metadata'))
    return input_fn_maker.build_parsing_transforming_serving_input_receiver_fn(
        raw_metadata,
        os.path.join(metadata_root, 'transform_fn'),
        exclude_raw_keys=[LABEL_COL],
    )

def make_serving_input_fn(args):
    """Build a serving input_fn that takes raw feature values and applies the TFT transform.

    One placeholder of shape [None] is created per raw column (tf.string for
    STRING_COLS, tf.float32 for FLOAT_COLS, tf.int64 for INT_COLS), except the
    label, which is not supplied at serving time.

    Args:
        args: dict whose 'metadata_path' contains the 'transform_fn' SavedModel.

    Returns:
        A zero-argument fn returning a tf.estimator.export.ServingInputReceiver.
    """
    transform_savedmodel_dir = os.path.join(args['metadata_path'], 'transform_fn')

    def _input_fn():
        # Same creation order (and same overwrite order for duplicate names)
        # as string -> float -> int.
        feature_placeholders = {}
        for dtype, columns in ((tf.string, STRING_COLS), (tf.float32, FLOAT_COLS), (tf.int64, INT_COLS)):
            for column_name in columns:
                feature_placeholders[column_name] = tf.placeholder(dtype, [None])
        # The label is not an input at serving time. Tolerate it being absent
        # from the declared raw columns instead of raising KeyError.
        feature_placeholders.pop(LABEL_COL, None)

        _, features = saved_transform_io.partially_apply_saved_transform(
            transform_savedmodel_dir,
            feature_placeholders
        )

        # so that outputs are consistently in lists
        for col in PASSTHROUGH_COLS:
            features[col] = tf.expand_dims(tf.identity(feature_placeholders[col]), axis=1)

        return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)

    return _input_fn


def make_eval_input_fn(args):
    """Build the TFMA eval_input_receiver_fn used by export_eval_savedmodel.

    The receiver accepts serialized tf.Examples of raw features, parses them
    with the raw-data schema, applies the saved TFT transform and exposes the
    transformed features plus the transformed LABEL_COL as labels.

    Args:
        args: dict whose 'metadata_path' contains both 'rawdata_metadata' and
            the 'transform_fn' SavedModel.

    Returns:
        A zero-argument fn returning a tfma.export.EvalInputReceiver.
    """
    # Both the raw schema and the transform come from --metadata_path, so the
    # eval model matches the serving model wherever the metadata lives.
    transform_savedmodel_dir = os.path.join(args['metadata_path'], 'transform_fn')
    raw_metadata_dir = os.path.join(args['metadata_path'], 'rawdata_metadata')

    def _input_fn():
        metadata = metadata_io.read_metadata(raw_metadata_dir)
        raw_feature_spec = metadata.schema.as_feature_spec()

        serialized_tf_example = tf.placeholder(dtype=tf.string, shape=[None], name='input_example_tensor')
        features = tf.parse_example(serialized_tf_example, raw_feature_spec)

        _, transformed_features = saved_transform_io.partially_apply_saved_transform(
            transform_savedmodel_dir,
            features
        )

        return tfma.export.EvalInputReceiver(
            features=transformed_features,
            receiver_tensors={'examples': serialized_tf_example},
            labels=transformed_features[LABEL_COL]
        )

    return _input_fn


# training, eval and test input function
def read_dataset(args, mode):
    """Return an input_fn over the TFT-transformed, GZIP-compressed TFRecords for `mode`.

    Args:
        args: dict with 'metadata_path', 'train_batch_size', optionally
            'eval_batch_size', and the data paths for the requested mode
            ('train_data_paths', 'eval_data_paths' or 'test_data_paths').
            A path value may be a glob string or a list of patterns.
        mode: a tf.estimator.ModeKeys value. TRAIN shuffles and repeats
            indefinitely; any other mode reads the data once without shuffling.

    Returns:
        The input_fn built by input_fn_maker.build_training_input_fn, with
        LABEL_COL returned as the label.
    """
    is_training = mode == tf.estimator.ModeKeys.TRAIN
    if is_training:
        input_paths = args['train_data_paths']
    elif mode == tf.estimator.ModeKeys.EVAL:
        input_paths = args['eval_data_paths']
    else:
        # NOTE(review): task.py defines no --test_data_paths flag; callers using
        # this branch must add 'test_data_paths' to args themselves.
        input_paths = args['test_data_paths']

    # --eval_batch_size is exposed by task.py, so honour it outside training.
    if is_training:
        batch_size = args['train_batch_size']
    else:
        batch_size = args.get('eval_batch_size') or args['train_batch_size']

    # Unwrap a single-element list to its pattern; strings pass through as-is.
    if isinstance(input_paths, (list, tuple)) and len(input_paths) == 1:
        input_paths = input_paths[0]

    transformed_metadata = metadata_io.read_metadata(
        os.path.join(args['metadata_path'], 'transformed_metadata'))

    return input_fn_maker.build_training_input_fn(
        metadata=transformed_metadata,
        file_pattern=input_paths,
        training_batch_size=batch_size,
        label_keys=[LABEL_COL],
        reader=gzip_reader_fn,
        randomize_input=is_training,
        num_epochs=(None if is_training else 1)
    )


# create tf.estimator train and evaluate function
def train_and_evaluate(args):
    """Train and evaluate the estimator, export it, and write eval-set predictions.

    Exports a serving SavedModel (via LatestExporter 'exporter'), a TFMA eval
    SavedModel under <model_dir>/eval/tfma, and an Excel sheet of eval rows
    with their predictions (see _export_eval_predictions).

    Args:
        args: dict of command-line arguments as produced by trainer/task.py.
            'hidden_units' is a space-separated string such as "256 128" and
            may be None/empty for the linear model.

    Raises:
        ValueError: if no training records match 'train_data_paths'.
    """
    # figure out train steps based on no. of epochs, no. of rows in dataset and batch size
    tfrecord_options = tf.python_io.TFRecordOptions(compression_type=tf.python_io.TFRecordCompressionType.GZIP)
    nrows = sum(
        sum(1 for _ in tf.python_io.tf_record_iterator(f, options=tfrecord_options))
        for f in tf.gfile.Glob(args['train_data_paths'])
    )
    if nrows == 0:
        raise ValueError('No training records found matching %r' % (args['train_data_paths'],))
    num_epochs = args['num_epochs']
    batch_size = min(args['train_batch_size'], nrows)
    # integer step counts; round up so a final partial batch still counts
    steps_per_epoch = (nrows + batch_size - 1) // batch_size
    max_steps = num_epochs * steps_per_epoch

    # "256 128" -> [256, 128]; None/"" -> [] (unused by the linear model)
    hidden_units = [int(units) for units in (args.get('hidden_units') or '').split()]

    # modify according to build_estimator function
    estimator = build_estimator(
        args['model_dir'],
        args['model_type'],
        args['embedding_type'],
        args['learning_rate'],
        hidden_units,
        args['dropout'],
        args['embedding_trainable'],
        args['l1_regularization_strength'],
        args['l2_regularization_strength']
    )

    # The early-stopping hook reads eval event files from the estimator's eval
    # directory; create it up front (idempotent, and follows --model_dir).
    eval_dir = estimator.eval_dir()
    if not tf.gfile.Exists(eval_dir):
        tf.gfile.MakeDirs(eval_dir)
    early_stopping = tf.contrib.estimator.stop_if_no_decrease_hook(
        estimator,
        metric_name='loss',
        max_steps_without_decrease=3 * steps_per_epoch  # ~3 epochs
    )
    train_spec = tf.estimator.TrainSpec(
        input_fn=read_dataset(args, tf.estimator.ModeKeys.TRAIN),
        max_steps=max_steps,
        hooks=[early_stopping]
    )

    exporter = tf.estimator.LatestExporter('exporter', make_serving_input_fn(args))

    eval_spec = tf.estimator.EvalSpec(
        input_fn=read_dataset(args, tf.estimator.ModeKeys.EVAL),
        steps=None,
        exporters=[exporter]
    )

    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

    tfma.export.export_eval_savedmodel(
        estimator=estimator,
        export_dir_base=os.path.join(args['model_dir'], 'eval', 'tfma'),
        eval_input_receiver_fn=make_eval_input_fn(args)
    )

    _export_eval_predictions(estimator, args)


def _export_eval_predictions(estimator, args, output_path='data/output/eval_with_preds.xlsx'):
    """Write the raw eval rows plus predicted label, its probability and a 'wrong' flag to Excel.

    NOTE(review): predictions are attached to the rows of data/split/eval*.tsv
    by position. Nothing here guarantees that the EVAL input_fn yields records
    in the same order; confirm against the preprocessing step.
    """
    output_dir = os.path.dirname(output_path)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)

    pred_col = 'pred_' + LABEL_COL
    eval_preds = pd.DataFrame(list(estimator.predict(input_fn=read_dataset(args, tf.estimator.ModeKeys.EVAL))))
    # 'probabilities' holds one score per class; keep the highest
    eval_preds['probability'] = eval_preds['probabilities'].map(max)
    eval_preds[pred_col] = eval_preds['classes'].map(lambda x: x[0])  # predictions come in a list per row
    eval_preds = eval_preds[[pred_col, 'probability']]

    raw_eval_df = pd.concat([
        pd.read_csv(f, sep=DELIM, names=RENAMED_COLS)
        # sorted so the concatenation order is deterministic
        for f in sorted(tf.gfile.Glob('data/split/eval*.tsv'))],
        axis=0, ignore_index=True)
    if len(raw_eval_df) != len(eval_preds):
        tf.logging.warning('Eval rows (%d) and predictions (%d) differ in length; '
                           'unmatched rows will have empty predictions.',
                           len(raw_eval_df), len(eval_preds))

    # move the label column to the end
    cols = [col for col in raw_eval_df.columns if col != LABEL_COL]
    raw_eval_df = raw_eval_df[cols + [LABEL_COL]]
    for col in [pred_col, 'probability']:
        raw_eval_df[col] = eval_preds[col]
    raw_eval_df['wrong'] = (raw_eval_df[pred_col] != raw_eval_df[LABEL_COL]).astype(int)
    raw_eval_df.to_excel(output_path, index=False)


def gzip_reader_fn():
    """Return a TFRecordReader configured for GZIP-compressed TFRecord files."""
    gzip_options = tf.python_io.TFRecordOptions(
        compression_type=tf.python_io.TFRecordCompressionType.GZIP)
    return tf.TFRecordReader(options=gzip_options)


def get_eval_metrics():
    """Return streaming-accuracy MetricSpecs under 'accuracy' and the hp-tuning metric key."""
    metric_names = ('accuracy', 'training/hptuning/metric')
    return {
        name: tflearn.MetricSpec(metric_fn=metrics.streaming_accuracy)
        for name in metric_names
    }

In [None]:
%%writefile trainer/setup.py

from setuptools import find_packages, setup

# Placeholder metadata: fill in the {...} fields before building the package.
# NOTE(review): trainer/model.py imports pandas, tensorflow_transform,
# tensorflow_hub, tensorflow_model_analysis and apache_beam; the training
# environment must provide them, since nothing is listed here.
REQUIRED_PACKAGES = []

setup(
    name='{name_of_model}',
    version='0.1',
    description='{Some description}',
    author='{name of author}',
    author_email='{email@example.com}',
    packages=find_packages(),
    include_package_data=True,
    install_requires=REQUIRED_PACKAGES,
    requires=[],
)

In [None]:
%%writefile trainer/task.py
import traceback
import argparse
import json
import os

import model

import tensorflow as tf


def _str2bool(value):
    """Convert a command-line string such as 'True', 'false', '1' or 'no' to a bool.

    argparse's ``type=bool`` cannot be used for this: it calls ``bool(str)``,
    which is True for every non-empty string, including 'False'.

    Raises:
        argparse.ArgumentTypeError: if the string is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in ('true', 't', 'yes', 'y', '1'):
        return True
    if normalized in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Expected a boolean (True/False), got %r' % value)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # Input Arguments
    parser.add_argument(
        '--train_data_paths',
        help = 'GCS or local path to training data',
        required = True
    )
    parser.add_argument(
        '--train_batch_size',
        help = 'Batch size for training steps',
        type = int,
        default = 512
    )
    parser.add_argument(
        '--eval_batch_size',
        help = 'Batch size for evaluation steps',
        type = int,
        default = 512
    )
    parser.add_argument(
        '--num_epochs',
        help = 'Epochs to run the training job for',
        type = int,
        default = 50
    )
    parser.add_argument(
        '--eval_steps',
        help = 'Number of steps to run evaluation for at each checkpoint',
        default = 10,
        type = int
    )
    parser.add_argument(
        '--eval_data_paths',
        help = 'GCS or local path to evaluation data',
        required = True
    )
    # TensorFlow Transform args
    parser.add_argument(
        '--metadata_path',
        help = 'GCS or local path to transformed metadata if using TFT',
        default = '../../data/tft/metadata'
    )
    # Training arguments
    parser.add_argument(
        '--model_dir',
        help = 'GCS location to write checkpoints and export models',
        required = True
    )
    parser.add_argument(
        '--job-dir',
        help = 'this model ignores this field, but it is required by gcloud',
        default = 'junk'
    )
    # Eval arguments
    parser.add_argument(
        '--eval_delay_secs',
        help = 'How long to wait before running first evaluation',
        default = 10,
        type = int
    )
    parser.add_argument(
        '--min_eval_frequency',
        help = 'Minimum number of training steps between evaluations',
        default = 1,
        type = int
    )
    # Model Specific arguments
    parser.add_argument(
        '--model_type',
        help='Type of ML model, one of "linear", "dnn" or "dnn-linear-combined"',
        default='linear',
        type=str
    )
    parser.add_argument(
        '--embedding_type',
        help='Embedding to use, one of "nnlm", "universal-sentence-encoder", "elmo", "word2vec"',
        default='universal-sentence-encoder',
        type=str
    )
    parser.add_argument(
        '--learning_rate',
        help='Learning rate',
        default=0.01,
        type=float
    )
    parser.add_argument(
        '--hidden_units',
        help='Hidden units of the DNN model, separated by space e.g. "128 64"',
        type=str
    )
    parser.add_argument(
        '--dropout',
        help='Dropout rate (between 0 and 1)',
        default=0.0,
        type=float
    )
    parser.add_argument(
        '--embedding_trainable',
        help='Is embedding column from TFHub trainable? e.g. True or False',
        default=False,
        type=_str2bool
    )
    parser.add_argument(
        '--l1_regularization_strength',
        help='L1 regularisation strength; controls how sparse the linear model will be',
        default=0.01,
        type=float
    )
    parser.add_argument(
        '--l2_regularization_strength',
        help='L2 regularisation strength; controls the magnitude of the weights in the linear model',
        default=0.01,
        type=float
    )

    args = parser.parse_args()
    arguments = args.__dict__

    # Unused args provided by service
    arguments.pop('job_dir', None)
    arguments.pop('job-dir', None)

    # Append trial_id to model_dir if we are doing hptuning, so that parallel
    # trials do not write checkpoints and exports into the same directory.
    # This code can be removed if you are not using hyperparameter tuning
    trial_id = json.loads(
        os.environ.get('TF_CONFIG', '{}')
    ).get('task', {}).get('trial', '')
    if trial_id:
        arguments['model_dir'] = os.path.join(arguments['model_dir'], str(trial_id))

    # Run the training job. Print the traceback, then re-raise so the process
    # exits non-zero and the job is reported as failed.
    try:
        model.train_and_evaluate(arguments)
    except Exception:
        traceback.print_exc()
        raise

In [None]:
%%writefile trainer/__init__.py
# 

# Train Model

In [None]:
%%bash
# Train locally. The notebook directory is appended to PYTHONPATH so that
# `python -m trainer.task` can import the trainer package written above, and any
# output from a previous run in model_trained is deleted first.
# The l1/l2 regularization flags only feed the Ftrl optimizer of the linear part,
# so they have no effect when --model_type='dnn'.
export PYTHONPATH=${PYTHONPATH}:$PWD
rm -rf model_trained
python -m trainer.task \
    --train_data_paths='./data/tft/train*' \
    --eval_data_paths='./data/tft/eval*' \
    --model_dir='./model_trained' \
    --num_epochs=50 \
    --train_batch_size=512 \
    --eval_batch_size=512 \
    --metadata_path='./data/tft/metadata' \
    \
    --model_type='dnn' \
    --embedding_type='nnlm' \
    --learning_rate=0.01 \
    --hidden_units='256 256 256' \
    --dropout=0.4 \
    --embedding_trainable=False \
    --l1_regularization_strength=0.01 \
    --l2_regularization_strength=0.01

# Serve Model
It is better to run this in a separate terminal than in the notebook. The server runs in the foreground, so this cell would block and you could not run any other cells while it is up.

1. Replace 'model_trained' with the `--model_dir` you trained with
1. Replace 'exporter' with whatever you specified in `tf.estimator.LatestExporter`

In [None]:
%%bash
# Serves the SavedModels exported under model_trained/export/exporter over REST
# on port 9000; the cell blocks for as long as the server runs. The path is
# quoted so a working directory containing spaces is not split into words.
tensorflow_model_server \
    --rest_api_port=9000 \
    --model_base_path="${PWD}/model_trained/export/exporter/"

# Predictions
The REST API can be called using the following URL pattern: `http://{HOST}:{PORT}/v1/models/{MODEL_NAME}[/versions/{VERSION}]:{VERB}`

where

- MODEL_NAME is "default" unless a different name is passed to `tensorflow_model_server` with `--model_name`
- Specifying the version is optional
- VERB is one of 'classify', 'regress', 'predict'. For serving, you should be using 'predict'
- signature_name should be 'predict' when serving

In [None]:
%%writefile debug.json
{
    "signature_name": "predict",
    "instances": [
        {
            "vendor_name": "some vendor name",
            "invoice_number": "Invoice1",
            "invoice_date": "DD/MM/YYYY",
            "business_unit": "BU01",
            "header_description": "some header description",
            "line_number": 1,
            "line_description": "some line description",
            "amount": 123.0
        },
        {
            "vendor_name": "some vendor name",
            "invoice_number": "Invoice1",
            "invoice_date": "DD/MM/YYYY",
            "business_unit": "BU01",
            "header_description": "some header description",
            "line_number": 1,
            "line_description": "some line description",
            "amount": 10.0
        }
    ]
}

In [None]:
%%bash
# POST the two example instances in debug.json to the model server's predict endpoint.
curl -X POST \
    -H "Content-Type: application/json" \
    -d @debug.json \
    http://localhost:9000/v1/models/default:predict

# Test Model

In [None]:
import os
import json

import pandas as pd
import requests

pd.options.display.max_rows = 6

SERVING_URL = 'http://localhost:9000/v1/models/default:predict'
# Column names of data/split/test.tsv, in file order.
TEST_COLUMNS = [
    'invoice_date', 'business_unit', 'invoice_number', 'line_number',
    'header_description', 'line_description', 'vendor_name', 'acc_code',
    'acc_description', 'amount'
]
# Raw features sent to the model server (all columns except acc_code / acc_description).
FEATURE_COLUMNS = [
    'invoice_date', 'business_unit', 'invoice_number', 'line_number',
    'header_description', 'line_description', 'vendor_name', 'amount'
]
# Columns echoed back in each prediction, used to join predictions onto the test rows.
JOIN_KEYS = ['invoice_date', 'business_unit', 'vendor_name', 'invoice_number', 'line_number']
N_WRONG_SAMPLE = 100

# set up test dataset. With names= given, pandas keeps the first line as data
# rather than consuming it as a header that is then overwritten.
# NOTE(review): assumes test.tsv has no header row, like the data/split/eval*.tsv
# files that trainer/model.py reads with names=RENAMED_COLS -- confirm.
df = pd.read_csv('data/split/test.tsv', sep='\t', header=None, names=TEST_COLUMNS)

# Optional (disabled): restrict the test set to April 2018.
# df['invoice_date'] = pd.to_datetime(df['invoice_date'])
# df = df[df.invoice_date.between('01/04/2018', '30/04/2018')]
# df['invoice_date'] = df['invoice_date'].dt.date.astype('str')

test_features = df[FEATURE_COLUMNS].assign(
    # workaround for precision problem
    amount=lambda d: d['amount'].astype('float32'),
    # the model takes line_number as an integer
    # (presumably one of config.INT_COLS -- confirm)
    line_number=lambda d: d['line_number'].astype('int64'),
)
# round-trip through to_json so numpy scalars become plain JSON values
payload = {
    "signature_name": "predict",
    "instances": json.loads(test_features.to_json(orient='records')),
}

# send request to model server to get results; fail loudly on HTTP errors
response = requests.post(SERVING_URL, json=payload)
response.raise_for_status()
predictions = response.json()['predictions']

# merge results back to df; every returned field is a single-element list
results = [
    {
        'business_unit': p['business_unit'][0],
        'pred_acc_code': p['classes'][0],
        'invoice_number': p['invoice_number'][0],
        'line_number': int(p['line_number'][0]),
        'invoice_date': p['invoice_date'][0],
        'vendor_name': p['vendor_name'][0],
    }
    for p in predictions
]
results_df = pd.DataFrame(results).merge(df, how='left', on=JOIN_KEYS)
results_df = results_df.assign(wrong=(results_df.acc_code != results_df.pred_acc_code).astype(int))

if not os.path.isdir('eval'):
    os.makedirs('eval')
results_df.to_excel('eval/ccy_mnd.xlsx', index=False)
wrong_df = results_df[(results_df.wrong == 1) & results_df.acc_code.notnull()]
# sample() raises when asked for more rows than exist; fixed seed for reproducibility
wrong_df \
    .sample(n=min(N_WRONG_SAMPLE, len(wrong_df)), random_state=0) \
    .to_excel('eval/ccy_mnd_wrong_sample.xlsx', index=False)
results_df

In [None]:
# Per-business-unit accuracy on "clean" rows: rows whose join key occurs exactly
# once in results_df and that have a known acc_code.
sizes = (
    results_df
    .groupby(['invoice_date', 'business_unit', 'vendor_name', 'invoice_number', 'line_number'])
    .size()
    .rename('size')
    .reset_index()
)
clean_results = results_df.merge(sizes)
clean_results = clean_results[clean_results['size'].eq(1) & clean_results['acc_code'].notnull()]
size_per_bu = (
    clean_results
    .groupby('business_unit')
    .size()
    .rename('bu_freq')
    .reset_index()
)
clean_results = (
    clean_results
    .groupby(['business_unit', 'wrong'])
    .size()
    .rename('freq')
    .reset_index()
    .merge(size_per_bu)
)
clean_results.assign(pct=lambda d: d['freq'] / d['bu_freq'] * 100)

In [None]:
# Same per-business-unit breakdown without any cleaning, for CCY and MND only.
# bu_freq counts every row of the business unit, including rows with no acc_code.
size_per_bu = (
    results_df
    .groupby('business_unit')
    .size()
    .rename('bu_freq')
    .reset_index()
)
unclean_results = (
    results_df[results_df['business_unit'].isin(['CCY', 'MND'])]
    .groupby(['business_unit', 'wrong'])
    .size()
    .rename('freq')
    .reset_index()
    .merge(size_per_bu)
)
unclean_results.assign(pct=lambda d: d['freq'] / d['bu_freq'] * 100)