In [1]:
!pip install tfx

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
!git clone https://github.com/Franck-Dernoncourt/pubmed-rct.git
!ls pubmed-rct

fatal: destination path 'pubmed-rct' already exists and is not an empty directory.
PubMed_200k_RCT
PubMed_200k_RCT_numbers_replaced_with_at_sign
PubMed_20k_RCT
PubMed_20k_RCT_numbers_replaced_with_at_sign
README.md


In [3]:
import os
import absl
import tempfile
import tensorflow as tf

from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext


In [4]:
# Report the library versions in use.
print(f'TensorFlow version: {tf.__version__}')
print(f'TFX version: {tfx.__version__}')

TensorFlow version: 2.11.0
TFX version: 1.12.0


In [5]:
# Root of the installed TFX package and an example path derived from it.
_tfx_root = tfx.__path__[0]
_skimlit_root = os.path.join(_tfx_root, 'examples/skimlit_pipeline')

# Intended serving-model destination, rooted in a newly created temporary directory.
_serving_model_dir = os.path.join(tempfile.mkdtemp(), 'serving_model/skimlit_simple')

# Emit INFO-level absl log messages.
absl.logging.set_verbosity(absl.logging.INFO)

In [6]:
# Newly created temporary directory whose name starts with 'tfx-data'.
_data_root = tempfile.mkdtemp(prefix='tfx-data')
# Absolute path of the cloned PubMed 20k RCT directory (numbers replaced with '@').
DATA_PATH = '/content/pubmed-rct/PubMed_20k_RCT_numbers_replaced_with_at_sign'
# Path with the same dataset name, located under the temporary data root.
_data_filepath = os.path.join(_data_root, "PubMed_20k_RCT_numbers_replaced_with_at_sign")


In [7]:
# Interactive TFX context; the cells below run and display every component through it.
context = InteractiveContext()



In [8]:
%%writefile preprocessing.py

import os
import tempfile
import pandas as pd

# Output directory for the generated CSV splits, located under the current
# working directory. exist_ok=True makes re-running the script harmless.
_data_root = os.path.join(os.getcwd(), 'tfx-data')
os.makedirs(_data_root, exist_ok=True)

# Directory that holds the raw train.txt / dev.txt / test.txt files.
data_dir = '/content/pubmed-rct/PubMed_20k_RCT_numbers_replaced_with_at_sign'

def read_lines(filename):
    """Read a text file and return its lines.

    Args:
        filename: Name or path of the file to read.

    Returns:
        A list of strings, one per line of the file. Each string keeps its
        trailing newline character when the file has one.
    """
    with open(filename, 'r') as handle:
        lines = list(handle)
    return lines

def preprocess_text_with_line_numbers(filename):
    """Turn a PubMed RCT text file into one record per abstract sentence.

    The file is read with `read_lines`. A line starting with '###' begins a new
    abstract, a whitespace-only line ends one, and every other line is expected
    to look like "<LABEL>\\t<sentence>".

    Args:
        filename: Name or path of the split file to parse.

    Returns:
        A list of dicts with the keys 'target' (label), 'text' (sentence),
        'line_number' (0-based position inside the abstract) and 'total_lines'
        (number of sentences in the abstract).

    Raises:
        IndexError: If a sentence line contains no tab separator.
    """
    samples = []
    pending = []  # sentence lines of the abstract currently being read

    def _flush():
        # Emit one record per buffered sentence, then empty the buffer so the
        # same abstract can never be emitted twice.
        total = len(pending)
        for number, raw in enumerate(pending):
            fields = raw.split('\t')
            samples.append({
                'target': fields[0],
                'text': fields[1],
                'line_number': number,
                'total_lines': total,
            })
        pending.clear()

    for line in read_lines(filename):
        if line.startswith('###') or line.isspace():
            # An abstract header or a blank separator closes the open abstract.
            _flush()
        else:
            pending.extend(line.splitlines())

    # The last abstract is still buffered when the file has no trailing blank line.
    _flush()
    return samples

# Parse each raw split into one record per abstract sentence.
train_samples = preprocess_text_with_line_numbers(os.path.join(data_dir, "train.txt"))
val_sample = preprocess_text_with_line_numbers(os.path.join(data_dir, "dev.txt"))
test_sample = preprocess_text_with_line_numbers(os.path.join(data_dir, "test.txt"))

train_df = pd.DataFrame(train_samples)
val_df = pd.DataFrame(val_sample)
test_df = pd.DataFrame(test_sample)

# Build the label -> integer mapping once, from the training split, and reuse it
# for every split so that a given label always receives the same code.
label_categories = sorted(train_df['target'].unique())
for split_df in (train_df, val_df, test_df):
    split_df['target_int'] = pd.Categorical(
        split_df['target'], categories=label_categories).codes

# index=False keeps the DataFrame index out of the CSV files, so it does not
# appear as an additional unnamed column.
train_df.to_csv(os.path.join(_data_root, "train.csv"), index=False)
val_df.to_csv(os.path.join(_data_root, "val.csv"), index=False)
test_df.to_csv(os.path.join(_data_root, "test.csv"), index=False)

Overwriting preprocessing.py


In [9]:
# Run the script written by the previous cell; it writes train.csv, val.csv and
# test.csv into the tfx-data directory under the current working directory.
!python preprocessing.py

In [None]:
# ExampleGen: ingest the CSV files under /content/tfx-data/ into the pipeline.
example_gen = tfx.components.CsvExampleGen(input_base='/content/tfx-data/')
# Run the component through the interactive context with caching enabled.
context.run(example_gen, enable_cache=True)

INFO:absl:Running driver for CsvExampleGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:Running executor for CsvExampleGen
INFO:absl:Generating examples.


INFO:absl:Processing input csv data /content/tfx-data/* to TFExample.


In [None]:
import pprint 
pp = pprint.PrettyPrinter()

# 'Split-train' sub-directory of the examples artifact produced by ExampleGen.
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri,'Split-train')
# Full paths of every file found in that directory.
tfrecord_filename = [os.path.join(train_uri,name)
                       for name in os.listdir(train_uri) ]
# The files are read as GZIP-compressed TFRecords.
dataset = tf.data.TFRecordDataset(tfrecord_filename,compression_type='GZIP')


# Parse the first three records as tf.train.Example protos and pretty-print them.
for record in dataset.take(3):
    serialized_example = record.numpy()
    example = tf.train.Example()
    example.ParseFromString(serialized_example)
    pp.pprint(example)

In [None]:
# StatisticsGen: compute statistics over the examples emitted by ExampleGen.
statistic_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])
context.run(statistic_gen)

In [None]:
# Display the statistics artifact produced by StatisticsGen.
context.show(statistic_gen.outputs['statistics'])

In [None]:
# SchemaGen: derive a schema from the statistics (infer_feature_shape is turned off).
schema_gen = tfx.components.SchemaGen(statistics=statistic_gen.outputs['statistics'],infer_feature_shape=False)
context.run(schema_gen)

In [None]:
# Display the schema artifact produced by SchemaGen.
context.show(schema_gen.outputs['schema'])

In [None]:
# ExampleValidator: takes the statistics and the schema as inputs; its
# 'anomalies' output is displayed in the next cell.
example_validator = tfx.components.ExampleValidator(
    statistics=statistic_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema']
)
context.run(example_validator)

In [None]:
# Display the anomalies artifact produced by ExampleValidator.
context.show(example_validator.outputs['anomalies'])

In [None]:
%%writefile constant.py
# Names of the raw feature columns that are one-hot encoded.
ONE_HOT_FEATURES = ['line_number','total_lines']
# Name of the raw text feature column.
CAT_FEATURES = 'text'
# Name of the integer label column.
TARGET_FEATURE = 'target_int'
# Vocabulary size and out-of-vocabulary size (both are read by transform.py).
VOCAB_SIZE = 60000
OOV_SIZE = 10



In [None]:
%%writefile transform.py
import sys
import absl
import tensorflow as tf
import tensorflow_transform as tft 
import constant
import numpy as np
from sklearn.preprocessing import OneHotEncoder



if 'google.colab' in sys.modules:  # Testing to see if we're doing development
  import importlib
  # Reload so that edits made to constant.py from the notebook are picked up.
  importlib.reload(constant)


# Module-level aliases for the values defined in constant.py.
_ONE_HOT_FEATURES = constant.ONE_HOT_FEATURES
# The text feature name comes from CAT_FEATURES, not from the one-hot feature list.
_CAT_FEATURES = constant.CAT_FEATURES
_TARGET_FEATURE = constant.TARGET_FEATURE
_OOV_SIZE = constant.OOV_SIZE
_VOCAB_SIZE = constant.VOCAB_SIZE


def _make_one_hot(x,key):
  """One-hot encode `x` with a depth of 20.

  Args:
    x: Tensor of indices to encode.
    key: Feature name supplied by the caller; not used by this function.

  Returns:
    The one-hot encoding (1.0 / 0.0 values) reshaped to [-1, 20].
  """
  depth = 20
  encoded = tf.one_hot(x, depth=depth, on_value=1.0, off_value=0.0)
  return tf.reshape(encoded, [-1, depth])


def _make_one_hot_line_number(x,key):
  """One-hot encode `x` with a depth of 15.

  Args:
    x: Tensor of indices to encode.
    key: Feature name supplied by the caller; not used by this function.

  Returns:
    The one-hot encoding (1.0 / 0.0 values) reshaped to [-1, 15].
  """
  depth = 15
  encoded = tf.one_hot(x, depth=depth, on_value=1.0, off_value=0.0)
  return tf.reshape(encoded, [-1, depth])


def _fill_in_missing(x):
  """Densify a SparseTensor, filling the gaps with a default value.

  Missing entries become '' for string tensors and 0 otherwise. Anything that
  is not a `SparseTensor` is returned unchanged.

  Args:
    x: A `SparseTensor` of rank 2 whose dense shape should have size at most 1
      in the second dimension, or any other value.

  Returns:
    A rank 1 dense tensor when `x` is sparse; otherwise `x` itself.
  """
  if isinstance(x, tf.sparse.SparseTensor):
    fill_value = '' if x.dtype == tf.string else 0
    # Force the dense shape to [batch, 1] before densifying.
    single_column = tf.sparse.SparseTensor(
        x.indices, x.values, [x.dense_shape[0], 1])
    dense = tf.sparse.to_dense(single_column, fill_value)
    return tf.squeeze(dense, axis=1)

  return x


def _make_train_char(x):
    """Return the input text together with a character-separated variant.

    The variant is built by deleting every space, replacing every match of the
    empty pattern with a single space, and stripping the result.

    Args:
        x: String tensor holding the text.

    Returns:
        A tuple `(x, chars)`: the unchanged input and the derived string tensor.
    """
    without_spaces = tf.strings.regex_replace(x, ' ', '')
    spaced_chars = tf.strings.regex_replace(without_spaces, '', ' ')
    return x, tf.strings.strip(spaced_chars)


def _make_one_hot_target(x):
  """One-hot encode the label tensor `x` with a depth of 5.

  Args:
    x: Tensor of label indices.

  Returns:
    The one-hot encoding (1.0 / 0.0 values) reshaped to [-1, 5].
  """
  depth = 5
  encoded = tf.one_hot(x, depth=depth, on_value=1.0, off_value=0.0)
  return tf.reshape(encoded, [-1, depth])


def preprocessing_fn(inputs):
  """tf.transform callback that turns the raw features into model inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features. The keys
      'total_lines', 'line_number', 'text' and 'target_int' are read.

  Returns:
    Map from string feature key to transformed feature operations, with the
    keys 'line_number_input', 'total_lines_input', 'token_inputs',
    'char_inputs' and 'target_int'.
  """
  # Positional features, one-hot encoded after missing values are filled in.
  total_lines_one_hot = _make_one_hot(
      _fill_in_missing(inputs['total_lines']), 'total_lines')
  line_number_one_hot = _make_one_hot_line_number(
      _fill_in_missing(inputs['line_number']), 'line_number')

  # The sentence itself plus its character-separated variant.
  sentences, characters = _make_train_char(_fill_in_missing(inputs['text']))

  # One-hot encoded label.
  label_one_hot = _make_one_hot_target(_fill_in_missing(inputs['target_int']))

  return {
      'line_number_input': line_number_one_hot,
      'total_lines_input': total_lines_one_hot,
      'token_inputs': sentences,
      'char_inputs': characters,
      'target_int': label_one_hot,
  }


In [None]:
# Transform: apply the module written to transform.py (passed by absolute path)
# to the ExampleGen output, using the schema produced by SchemaGen.
transform = tfx.components.Transform(
    examples = example_gen.outputs['examples'],
    schema = schema_gen.outputs['schema'],
    module_file = os.path.abspath('transform.py'))
context.run(transform, enable_cache=True)

In [None]:
# List the contents of the transform_graph artifact directory.
# Note that this rebinds the name train_uri used by an earlier cell.
train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)

In [None]:
import pprint 
pp = pprint.PrettyPrinter()

# 'Split-train' sub-directory of the transformed examples artifact.
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 3 records, parse each one as a tf.train.Example and print it.
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)

In [None]:
# File name that the next cell writes the trainer module to; the same name is
# later passed to the Trainer component as module_file.
skimlit_trainer_module_file = 'skimlit_trainer.py'

In [None]:
%%writefile {skimlit_trainer_module_file}

from typing import Dict, List, Text

import os
import glob
from absl import logging

import datetime
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_hub as hub
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_transform import TFTransformOutput
import string
from tensorflow.keras import layers

# Lowercase ASCII letters, digits and punctuation characters.
alphabet = string.ascii_lowercase + string.digits + string.punctuation


# Name of the label feature handed to the dataset factory in _input_fn.
_LABEL_KEY = 'target_int'
# Used as max_tokens of char_vectorizer and input_dim of char_embed.
# NOTE(review): the +2 presumably reserves slots for padding and OOV tokens - confirm.
NUM_CHAR_TOKENS = len(alphabet) + 2
# Used as output_sequence_length of char_vectorizer.
output_seq_char_len= 290



def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 64) -> tf.data.Dataset:
    """Build a batched dataset of (features, label) from transformed examples.

    Args:
        file_pattern: File paths or patterns of the examples to read.
        data_accessor: DataAccessor whose `tf_dataset_factory` creates the dataset.
        tf_transform_output: Wrapper around the tf.Transform output; its
            transformed metadata schema describes the examples.
        batch_size: Number of examples per batch.

    Returns:
        The dataset created by the factory, with `_LABEL_KEY` used as label key.
    """
    dataset_options = tfxio.TensorFlowDatasetOptions(
        batch_size=batch_size, label_key=_LABEL_KEY)
    transformed_schema = tf_transform_output.transformed_metadata.schema
    return data_accessor.tf_dataset_factory(
        file_pattern, dataset_options, transformed_schema)
                                 
    

def _get_tf_examples_serving_signature(model, tf_transform_output):
  """Builds the serving function that accepts serialized `tensorflow.Example`s.

  Args:
    model: Keras model to call; the transform layer is attached to it as
      `tft_layer_inference`.
    tf_transform_output: Wrapper around the output of tf.Transform.

  Returns:
    A `tf.function` taking a 1-D string tensor named 'examples' and returning
    a dict with the model predictions under the key 'outputs'.
  """
  # We need to track the layers in the model in order to save it.
  # TODO(b/162357359): Revise once the bug is resolved.
  model.tft_layer_inference = tf_transform_output.transform_features_layer()

  examples_spec = tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')

  @tf.function(input_signature=[examples_spec])
  def serve_tf_examples_fn(serialized_tf_example):
    """Parses, transforms and scores a batch of serialized examples."""
    feature_spec = tf_transform_output.raw_feature_spec()

    # Remove the label features since these will not be present at serving time.
    feature_spec.pop(_LABEL_KEY)
    feature_spec.pop('target')

    parsed_features = tf.io.parse_example(serialized_tf_example, feature_spec)
    transformed_features = model.tft_layer_inference(parsed_features)
    logging.info('serve_transformed_features = %s', transformed_features)

    # TODO(b/154085620): Convert the predicted labels from the model using a
    # reverse-lookup (opposite of transform.py).
    return {'outputs': model(transformed_features)}

  return serve_tf_examples_fn


def _get_transform_features_signature(model, tf_transform_output):
  """Builds the function that applies tf.Transform to serialized examples.

  Args:
    model: Keras model; the transform layer is attached to it as
      `tft_layer_eval`.
    tf_transform_output: Wrapper around the output of tf.Transform.

  Returns:
    A `tf.function` taking a 1-D string tensor named 'examples' and returning
    the transformed features.
  """
  # We need to track the layers in the model in order to save it.
  # TODO(b/162357359): Revise once the bug is resolved.
  model.tft_layer_eval = tf_transform_output.transform_features_layer()

  examples_spec = tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')

  @tf.function(input_signature=[examples_spec])
  def transform_features_fn(serialized_tf_example):
    """Returns the transformed_features to be fed as input to evaluator."""
    # The full raw feature spec is used here; no feature is removed.
    feature_spec = tf_transform_output.raw_feature_spec()
    parsed_features = tf.io.parse_example(serialized_tf_example, feature_spec)

    transformed_features = model.tft_layer_eval(parsed_features)
    logging.info('eval_transformed_features = %s', transformed_features)
    return transformed_features

  return transform_features_fn


def export_serving_model(tf_transform_output, model, output_dir):
  """Saves a Keras model together with its serving and transform signatures.

  Args:
    tf_transform_output: Wrapper around output of tf.Transform.
    model: A keras model to export for serving.
    output_dir: A directory where the model will be exported to.
  """
  # The layer has to be saved to the model for keras tracking purposes.
  model.tft_layer = tf_transform_output.transform_features_layer()

  serving_fn = _get_tf_examples_serving_signature(model, tf_transform_output)
  transform_fn = _get_transform_features_signature(model, tf_transform_output)

  model.save(
      output_dir,
      save_format='tf',
      signatures={
          'serving_default': serving_fn,
          'transform_features': transform_fn,
      })


# The three layers below are created when the module is imported and are reused
# by model_builder(); char_vectorizer is additionally adapted inside run_fn().

# Universal Sentence Encoder loaded from TF Hub, with training disabled.
tf_hub_embedding_layer = hub.KerasLayer("https://tfhub.dev/google/universal-sentence-encoder/4",
                                            trainable=False,
                                            name="universal_sentence_encoder")

# Vectorizes the character strings: at most NUM_CHAR_TOKENS tokens and a fixed
# output length of output_seq_char_len.
char_vectorizer = layers.TextVectorization(max_tokens=NUM_CHAR_TOKENS,  
                                    output_sequence_length=output_seq_char_len,
                                    standardize="lower_and_strip_punctuation",
                                    name="char_vectorizer")


# Embeds each character token into a 25-dimensional vector (no masking of zeros).
char_embed = layers.Embedding(input_dim=NUM_CHAR_TOKENS,
                              output_dim=25, 
                              mask_zero=False, 
                              name="char_embed")

def model_builder():
    """Build and compile the four-input Keras model.

    The model combines a token branch, a character branch and two positional
    branches (line number and total lines), and ends in a 5-way softmax.

    Returns:
        The compiled `tf.keras.Model`, using categorical cross-entropy loss,
        the Adam optimizer and the "accuracy" metric.
    """
    # 1. Token branch: sentence embedding followed by a dense layer.
    token_inputs = layers.Input(shape=[], dtype="string", name="token_inputs")
    token_embeddings = tf_hub_embedding_layer(token_inputs)
    token_dense = layers.Dense(128, activation="relu")(token_embeddings)
    token_model = tf.keras.Model(inputs=token_inputs, outputs=token_dense)

    # 2. Character branch: vectorize, embed, then a bidirectional LSTM.
    char_inputs = layers.Input(shape=(1,), dtype="string", name="char_inputs")
    char_tokens = char_vectorizer(char_inputs)
    char_embeddings = char_embed(char_tokens)
    char_encoded = layers.Bidirectional(layers.LSTM(32))(char_embeddings)
    char_model = tf.keras.Model(inputs=char_inputs, outputs=char_encoded)

    # 3. Line-number branch (input of width 15).
    line_number_inputs = layers.Input(shape=(15,), dtype=tf.int32, name="line_number_input")
    line_number_dense = layers.Dense(32, activation="relu")(line_number_inputs)
    line_number_model = tf.keras.Model(inputs=line_number_inputs,
                                       outputs=line_number_dense)

    # 4. Total-lines branch (input of width 20).
    total_lines_inputs = layers.Input(shape=(20,), dtype=tf.int32, name="total_lines_input")
    total_lines_dense = layers.Dense(32, activation="relu")(total_lines_inputs)
    total_line_model = tf.keras.Model(inputs=total_lines_inputs,
                                      outputs=total_lines_dense)

    # 5. Merge the token and character representations into a hybrid embedding.
    hybrid = layers.Concatenate(name="token_char_hybrid_embedding")(
        [token_model.output, char_model.output])
    hybrid = layers.Dense(256, activation="relu")(hybrid)
    hybrid = layers.Dropout(0.5)(hybrid)

    # 6. Append both positional representations to the hybrid embedding.
    tribrid = layers.Concatenate(name="token_char_positional_embedding")(
        [line_number_model.output, total_line_model.output, hybrid])

    # 7. Output layer with one unit per class.
    output_layer = layers.Dense(5, activation="softmax", name="output_layer")(tribrid)

    # 8. Assemble the full model; the order of the inputs is kept as listed.
    model = tf.keras.Model(
        inputs=[line_number_model.input,
                total_line_model.input,
                token_model.input,
                char_model.input],
        outputs=output_layer)

    model.compile(loss='categorical_crossentropy',
                  optimizer=tf.keras.optimizers.Adam(),
                  metrics=["accuracy"])

    return model


    
def run_fn(fn_args: tfx.components.FnArgs) -> None:
    """Train the model and export it together with its serving signatures.

    Args:
        fn_args: Trainer arguments. This function reads `model_run_dir`,
            `serving_model_dir`, `transform_graph_path`, `train_files`,
            `eval_files` and `data_accessor` from it.
    """
    # model_builder() compiles with metrics=["accuracy"], which Keras logs as
    # 'accuracy' / 'val_accuracy'. The callbacks must monitor that exact key;
    # a key that is absent from the logs leaves them inert.
    monitored_metric = 'val_accuracy'

    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=fn_args.model_run_dir, update_freq='batch')
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor=monitored_metric, mode='max', verbose=1, patience=10)
    # The best checkpoint is written to the serving directory; the final export
    # at the end of this function saves to the same directory afterwards.
    best_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        fn_args.serving_model_dir, monitor=monitored_metric, mode='max',
        verbose=1, save_best_only=True)

    # Load the transform output
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)

    # Create batches of data
    train_set = _input_fn(fn_args.train_files, fn_args.data_accessor, tf_transform_output)
    val_set = _input_fn(fn_args.eval_files, fn_args.data_accessor, tf_transform_output)

    # Adapt the character vectorizer on the first 1000 unbatched 'char_inputs'
    # values of the training set; this must happen before the model is built.
    char_only = train_set.map(lambda f, l: f['char_inputs']).unbatch()
    char_vectorizer.adapt(char_only.take(1000))

    # Build the model
    model = model_builder()

    # Train the model
    model.fit(train_set,
              validation_data=val_set,
              callbacks=[tensorboard_callback, early_stopping, best_checkpoint],
              steps_per_epoch=1000,
              validation_steps=1000,
              epochs=1)
    export_serving_model(tf_transform_output, model, fn_args.serving_model_dir)
    

In [None]:
from tfx.components import Trainer 
from tfx.proto import trainer_pb2 

# Trainer: run the module in skimlit_trainer_module_file on the transformed
# examples, training on the 'train' split and evaluating on the 'eval' split.
trainer = Trainer(
    module_file=skimlit_trainer_module_file,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(splits=['train']),
    eval_args=trainer_pb2.EvalArgs(splits=['eval']),
)

context.run(trainer)

In [None]:
# Directory of the Trainer's model_run artifact; run_fn passes this location to
# its TensorBoard callback as log_dir.
model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri

# Load the TensorBoard extension and point it at that directory.
%load_ext tensorboard
%tensorboard --logdir {model_run_artifact_dir}

In [None]:
from tfx.dsl.components.common.resolver import Resolver 
from tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy import LatestBlessedModelStrategy 
from tfx.types import Channel 
from tfx.types.standard_artifacts import Model, ModelBlessing 

# Resolver configured with LatestBlessedModelStrategy; its 'model' output is
# later passed to the Evaluator as the baseline model.
model_resolver = Resolver(
    strategy_class=LatestBlessedModelStrategy,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing),
).with_id('Latest_blessed_model_resolver')

context.run(model_resolver)


In [None]:
import tensorflow_model_analysis as tfma 

# Threshold with a lower bound of 0.5 and a required absolute change of 0.0001
# (higher is better). It is referenced only by the commented-out metric below,
# so it currently has no effect.
accuracy_threshold = tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value':0.5}),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value':0.0001}))

# Evaluation configuration. 'serving_default' and 'transform_features' are the
# signature names written by export_serving_model in the trainer module.
# A single SlicingSpec() with no arguments is used, and ExampleCount is the
# only active metric.
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(signature_name='serving_default',
                                label_key='target_int',
                                preprocessing_function_names=['transform_features']
    )],
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            
            tfma.MetricConfig(class_name='ExampleCount'),
            # tfma.MetricConfig(class_name='SparseCategoricalAccuracy',
            #                  threshold=accuracy_threshold  ),
               
        ])
    ]

)

from tfx.components import Evaluator

# Evaluator: evaluates the trained model on the raw ExampleGen output (not the
# transformed examples), with the resolver's model as the baseline.
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)

context.run(evaluator,enable_cache=True)

In [None]:
# Visualize the evaluation results
eval_result = evaluator.outputs['evaluation'].get()[0].uri
tfma_result = tfma.load_eval_result(eval_result)
tfma.view.render_slicing_metrics(tfma_result)

In [None]:
# Render the fairness indicator widget for the evaluation result loaded above.
tfma.addons.fairness.view.widget_view.render_fairness_indicator(tfma_result)

In [None]:
from tfx.components import Pusher 
from tfx.proto import pusher_pb2 

# Pusher: push the trained model, gated by the Evaluator's blessing, to the
# relative filesystem directory 'serving_model_dir'.
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory='serving_model_dir')),
)

context.run(pusher)

In [None]:
# Print the location of the Pusher's 'pushed_model' artifact.
print(pusher.outputs['pushed_model'].get()[0].uri)

In [None]:
# Keep the pushed model location; it is exported to the environment further
# down and used as the model base path of the model server.
MODEL_DIR = pusher.outputs['pushed_model'].get()[0].uri

In [None]:
!ls -l MODEL_DIR

In [None]:
import sys
# A sudo prefix is needed everywhere except on Google Colab.
SUDO_IF_NEEDED = '' if 'google.colab' in sys.modules else 'sudo'

In [None]:
# List the directory that was given to the Pusher as its filesystem base_directory.
!ls -l './serving_model_dir'

In [None]:
# Register the TensorFlow Serving apt source and its signing key, then refresh
# the package index. SUDO_IF_NEEDED supplies the sudo prefix where required.
!echo "deb http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" | {SUDO_IF_NEEDED} tee /etc/apt/sources.list.d/tensorflow-serving.list && \
curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | {SUDO_IF_NEEDED} apt-key add -
!{SUDO_IF_NEEDED} apt update

In [None]:
# Install the TensorFlow Serving binary from the apt source registered above.
!{SUDO_IF_NEEDED} apt-get install tensorflow-model-server

In [None]:
# Export the model location so the model server command can read it as ${MODEL_DIR}.
os.environ["MODEL_DIR"] = MODEL_DIR

In [None]:
%%bash --bg
# Start TensorFlow Serving as a background process so that this cell returns
# immediately; a foreground server would block the notebook until interrupted.
# Output of the server is written to server.log.
# NOTE(review): the served model is still named 'fashion_model', which looks
# like a leftover from another tutorial - rename it together with any client URL.
# NOTE(review): tensorflow_model_server presumably expects version
# sub-directories under the base path - confirm that MODEL_DIR has that layout.
nohup tensorflow_model_server \
  --rest_api_port=8501 \
  --model_name=fashion_model \
  --model_base_path="${MODEL_DIR}" >server.log 2>&1