In [1]:
import warnings
# Install a blanket "ignore" filter so no Python warning is shown in the cell
# outputs below.
# NOTE(review): this hides every warning category, deprecation notices included.
warnings.filterwarnings("ignore")

In [2]:
import functools
import io
import re
import glob
import tempfile

import numpy as np
import pandas as pd

import tensorflow as tf

import tensorflow_transform as tft
from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema

import apache_beam as beam
from apache_beam.io import tfrecordio

In [3]:
# One seed value shared by NumPy and TensorFlow.
SEED = 1
np.random.seed(SEED)
tf.set_random_seed(SEED)

# Only TensorFlow log messages at ERROR level are emitted.
tf.logging.set_verbosity(tf.logging.ERROR)

### Load data into TFRecords

In [4]:
def load_data(g, out):
    """Pack the text files matched by a glob into a single TFRecord file.

    Each matched file becomes one ``tf.train.Example`` with two features:
    ``review`` (the raw file contents, as bytes) and ``label`` (1 when the
    file's parent directory is named ``pos``, otherwise 0).

    Args:
        g: glob pattern selecting the review files, e.g.
            ``'aclImdb/train/[posneg]*/*.txt'``.
        out: path of the TFRecord file to write. Its parent directory is
            created when it does not exist yet.
    """
    import os  # function-scope import keeps this cell self-contained

    # glob returns files in filesystem order; sorting first makes the seeded
    # shuffle below produce the same order on every machine.
    inputs = sorted(glob.glob(g))
    np.random.shuffle(inputs)

    out_dir = os.path.dirname(out)
    if out_dir:
        tf.gfile.MakeDirs(out_dir)  # succeeds when the directory already exists

    with tf.python_io.TFRecordWriter(out) as writer:
        for path in inputs:
            # The sentiment is the name of the directory holding the file, so
            # the label does not depend on how deep the glob root is.
            sentiment = os.path.basename(os.path.dirname(path))
            label = 1 if sentiment == 'pos' else 0

            # Read raw bytes: bytes_list rejects text (unicode) strings.
            with open(path, 'rb') as f:
                review = f.read()

            example = tf.train.Example()
            example.features.feature['review'].bytes_list.value.append(review)
            example.features.feature['label'].int64_list.value.append(label)

            writer.write(example.SerializeToString())
    
# Serialize each split of the raw review files into its own TFRecord file.
load_data(g='aclImdb/train/[posneg]*/*.txt', out='data/train.tfrecord')
load_data(g='aclImdb/test/[posneg]*/*.txt', out='data/test.tfrecord')

### Use TFT to preprocess data

In [5]:
# Feature spec of one raw example: a single review string and a single
# integer label.
RAW_DATA_FEATURE = dict(
    review=tf.FixedLenFeature(shape=[1], dtype=tf.string),
    label=tf.FixedLenFeature(shape=[1], dtype=tf.int64),
)

# Metadata wrapper around the schema built from the feature spec above.
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE)
)

In [6]:
!rm -Rf tft_output/transform_fn 
!rm -Rf tft_output/transformed_metadata

In [7]:
# Fit the tf.Transform preprocessing on the training split, apply it to both
# splits, and write out the transformed records plus the transform function.
with beam.Pipeline() as pipeline:
    # NOTE(review): the directory created by mkdtemp() is never removed.
    with beam_impl.Context(temp_dir=tempfile.mkdtemp()):
        coder = tft.coders.ExampleProtoCoder(RAW_DATA_METADATA.schema)

        train_data = (
            pipeline
            | 'ReadTrain' >> tfrecordio.ReadFromTFRecord('data/train.tfrecord')
            | 'DecodeTrain' >> beam.Map(coder.decode))

        test_data = (
            pipeline
            | 'ReadTest' >> tfrecordio.ReadFromTFRecord('data/test.tfrecord')
            | 'DecodeTest' >> beam.Map(coder.decode))

        def preprocessing_fn(inputs):
            """Turn raw reviews into vocabulary indices; pass labels through.

            Steps: strip links, tags, quotes and apostrophes; drop the commas
            inside numbers; lowercase A-Z; split on punctuation and spaces;
            map each term to an integer id.

            Args:
                inputs: dict of raw feature tensors with keys 'review' and
                    'label'.

            Returns:
                dict with 'terms' (the vocabulary ids) and 'label' (unchanged).
            """
            # Raw strings keep the regex backslashes as written instead of
            # relying on Python leaving unknown escape sequences untouched.
            remove = [r"https?:\/\/(www\.)?([^\s]*)", r"<([^>]+)>", "'", '"']
            remove = '|'.join(remove)

            reviews = tf.reshape(inputs['review'], [-1])
            reviews = tf.regex_replace(reviews, remove, '')
            # "1,000" -> "1000": drop a comma that sits between two digits.
            reviews = tf.regex_replace(reviews, r"([0-9]),([0-9])", r'\1\2')

            # Lowercase one letter at a time with regex replacements.
            for letter in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
                reviews = tf.regex_replace(reviews, letter, letter.lower())

            terms = tf.string_split(reviews, '.,!?() ')
            # Terms below the frequency threshold are left out of the
            # vocabulary; one out-of-vocabulary bucket is requested for them.
            terms_indices = tft.compute_and_apply_vocabulary(
                terms, frequency_threshold=5, num_oov_buckets=1, vocab_filename='vocab')

            return {
                'terms': terms_indices,
                'label': inputs['label']
            }

        # Analyze the training data only, then reuse the resulting transform
        # function on the test data.
        (transformed_train_data, transformed_metadata), transform_fn = (
            (train_data, RAW_DATA_METADATA)
            | 'AnalyzeAndTransform' >> beam_impl.AnalyzeAndTransformDataset(preprocessing_fn))

        transformed_test_data, _ = (
            ((test_data, RAW_DATA_METADATA), transform_fn)
            | 'Transform' >> beam_impl.TransformDataset())

        transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)

        _ = (
            transformed_train_data
            | 'EncodeTrain' >> beam.Map(transformed_data_coder.encode)
            | 'WriteTrain' >> tfrecordio.WriteToTFRecord('data/train_transformed.tfrecord'))

        _ = (
            transformed_test_data
            | 'EncodeTest' >> beam.Map(transformed_data_coder.encode)
            | 'WriteTest' >> tfrecordio.WriteToTFRecord('data/test_transformed.tfrecord'))

        # Persist the transform function so the same preprocessing can be
        # loaded again from 'tft_output'.
        _ = (
            transform_fn
            | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn('tft_output'))



### Build model with classifier

In [8]:
# input function
# Feature spec derived from the schema of the transformed data; input_fn uses
# it to parse the serialized examples.
feature_spec = transformed_metadata.schema.as_feature_spec()

def input_fn(input_file_pattern, num_epochs=None, batch_size=25, shuffle=True):
    """Build the (features, labels) tensors read from transformed TFRecords.

    Args:
        input_file_pattern: glob pattern of the TFRecord files to read.
        num_epochs: number of passes over the data; passed to
            ``Dataset.repeat`` (``None`` repeats indefinitely).
        batch_size: number of examples per batch.
        shuffle: whether to shuffle the examples before batching.

    Returns:
        A ``(features, labels)`` pair: ``features`` is the dict of parsed
        tensors with the 'label' entry removed, ``labels`` is that entry.

    Raises:
        ValueError: if no file matches ``input_file_pattern``.
    """
    # Sorted so the read order does not depend on the filesystem.
    input_file_names = sorted(glob.glob(input_file_pattern))
    if not input_file_names:
        # Fail here with a clear message instead of deep inside TensorFlow.
        raise ValueError('No TFRecord files match pattern: %r' % (input_file_pattern,))

    ds = tf.data.TFRecordDataset(input_file_names)
    ds = ds.map(lambda x: tf.parse_single_example(x, feature_spec))

    if shuffle:
        ds = ds.shuffle(100000)

    ds = ds.batch(batch_size).repeat(num_epochs)

    features = ds.make_one_shot_iterator().get_next()
    labels = features.pop('label')
    return features, labels

In [9]:
# Look up the size of the vocabulary stored under the name 'vocab' in the
# transform output directory.
tf_transform_output = tft.TFTransformOutput('tft_output')
vocab_size = tf_transform_output.vocabulary_size_by_name('vocab')

# The 'terms' feature already holds integer ids, so it is used as an identity
# categorical column with one bucket more than the vocabulary size
# (presumably for the out-of-vocabulary id), then embedded.
terms_col = tf.feature_column.categorical_column_with_identity(
    key='terms',
    num_buckets=vocab_size + 1,
)
terms_embed_col = tf.feature_column.embedding_column(
    terms_col,
    dimension=50,
    combiner='sqrtn',
)
feature_columns = [terms_embed_col]


# model function for the custom estimator
def make_model(features, labels, mode):
    """Estimator model_fn: embedded terms -> small dense network -> 2 logits.

    Args:
        features: dict of feature tensors consumed through ``feature_columns``.
        labels: label tensor (not used in PREDICT mode).
        mode: one of the ``tf.estimator.ModeKeys`` values.

    Returns:
        A ``tf.estimator.EstimatorSpec`` for the requested mode.
    """
    is_training = mode == tf.estimator.ModeKeys.TRAIN

    # network body; dropout is only active while training
    embedded = tf.feature_column.input_layer(features, feature_columns)
    hidden = tf.layers.dense(embedded, units=10, activation=tf.nn.leaky_relu)
    hidden = tf.layers.dropout(hidden, rate=0.3, training=is_training)
    hidden = tf.layers.dense(hidden, units=10)
    logits = tf.layers.dense(hidden, 2)

    # class index and class probabilities derived from the logits
    predicted_classes = tf.argmax(logits, 1)
    predicted_probs = tf.nn.softmax(logits)

    # PREDICT: return the predictions, also exposed under the 'predict' signature
    if mode == tf.estimator.ModeKeys.PREDICT:
        outputs = {
            'class': predicted_classes,
            'prob': predicted_probs
        }
        return tf.estimator.EstimatorSpec(
            mode,
            predictions=outputs,
            export_outputs={
                'predict': tf.estimator.export.PredictOutput(outputs=outputs)
            })

    # the loss is shared by the TRAIN and EVAL specs
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    # EVAL (any mode that is neither PREDICT nor TRAIN): report accuracy and AUC
    if not is_training:
        metrics = {
            'accuracy': tf.metrics.accuracy(labels=labels, predictions=predicted_classes),
            'auc': tf.metrics.auc(labels=labels, predictions=predicted_probs[:, 1])
        }
        return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)

    # TRAIN: Adagrad with a cosine-decay-with-restarts learning rate and
    # gradient clipping by norm
    step = tf.train.get_global_step()
    lr = tf.train.cosine_decay_restarts(
        learning_rate=0.1,
        global_step=step,
        alpha=0.05,
        first_decay_steps=1000,
        t_mul=2.0,
        m_mul=0.8)

    clipped_optimizer = tf.contrib.estimator.clip_gradients_by_norm(
        tf.train.AdagradOptimizer(learning_rate=lr), 5.0)

    return tf.estimator.EstimatorSpec(
        mode,
        loss=loss,
        train_op=clipped_optimizer.minimize(loss, global_step=step))

In [10]:
# Fit the estimator on the transformed training records (num_epochs=5).
classifier = tf.estimator.Estimator(model_fn=make_model)
classifier.train(
    input_fn=functools.partial(input_fn, 'data/train_transformed.tfrecord*', num_epochs=5))

<tensorflow.python.estimator.estimator.Estimator at 0x7f6f483327d0>

### Evaluate classifier

In [11]:
# Score the trained model with one pass over each split. Shuffling is turned
# off: it does not contribute to the metrics and only costs a large shuffle
# buffer.
train_metrics = classifier.evaluate(
    input_fn=lambda: input_fn('data/train_transformed.tfrecord*', num_epochs=1, shuffle=False))
test_metrics = classifier.evaluate(
    input_fn=lambda: input_fn('data/test_transformed.tfrecord*', num_epochs=1, shuffle=False))

# One column per split, one row per metric. The raw metric dicts keep their
# own names so a name never changes type within the cell.
train_stats = pd.DataFrame.from_dict(train_metrics, orient='index', columns=['train'])
test_stats = pd.DataFrame.from_dict(test_metrics, orient='index', columns=['test'])
stats = train_stats.join(test_stats)
stats

Unnamed: 0,train,test
loss,0.188809,0.301417
auc,0.979024,0.947173
global_step,5000.0,5000.0
accuracy,0.92316,0.87656


### Export classifier

In [12]:
def serving_input_fn():
    """Serving input receiver: raw review strings in, transformed features out.

    The raw 'review' placeholder is run through the saved transform so that
    serving applies the same preprocessing as training.

    Returns:
        A ``tf.estimator.export.ServingInputReceiver`` whose receiver tensor
        is the 'review' string placeholder.
    """
    review = tf.placeholder(dtype=tf.string)
    # The transform is fed a 'label' entry as well; supply a constant dummy.
    label = tf.zeros(dtype=tf.int64, shape=[1, 1])

    transformed_features = tf_transform_output.transform_raw_features({'review': review, 'label': label})

    # The dummy label is not a model input, so it is not handed to the model
    # as a serving feature.
    model_features = {
        name: tensor
        for name, tensor in transformed_features.items()
        if name != 'label'
    }

    return tf.estimator.export.ServingInputReceiver(model_features, {'review': review})


# Export a SavedModel under 'exports' and keep the returned path, decoded
# from UTF-8, for the shell command in the next cell.
export_path = classifier.export_savedmodel(
    export_dir_base='exports',
    serving_input_receiver_fn=serving_input_fn,
).decode('utf-8')

In [13]:
# Run the exported model's 'predict' signature from the shell on two example
# reviews; `!!` captures the command output as a list of lines.
!!saved_model_cli run --input_exprs 'review=["this is a terrible movie", "this is a great movie"]'  \
--dir $export_path --tag_set serve --signature_def predict

['2018-09-15 10:25:57.726209: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA',
 'Result for output key class:',
 '[0 1]',
 'Result for output key prob:',
 '[[0.9989225  0.00107747]',
 ' [0.01596025 0.9840397 ]]']