For this introductory exercise, you will walk through the "Hello World" of using TensorFlow Transform to preprocess input data. As you've seen in class, the main steps are to:

1. Collect raw data
2. Define metadata
3. Create a preprocessing function
4. Generate a constant graph with the required transformations

Let's begin!

In [1]:
!pip install tfx

Collecting tfx
  Downloading tfx-1.2.0-py3-none-any.whl (2.4 MB)
Collecting tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,<2.6,>=1.15.2
  Downloading tensorflow-2.5.1-cp37-cp37m-manylinux2010_x86_64.whl (454.4 MB)
Collecting apache-beam[gcp]<3,>=2.31
  Downloading apache_beam-2.32.0-cp37-cp37m-manylinux2010_x86_64.whl (9.8 MB)
Collecting google-cloud-bigquery<2.21,>=1.28.0
  Downloading google_cloud_bigquery-2.20.0-py2.py3-none-any.whl (189 kB)
Collecting google-cloud-aiplatform<0.8,>=0.5.0
  Downloading google_cloud_aiplatform-0.7.1-py2.py3-none-any.whl (1.3 MB)
Collecting tfx-bsl<1.3.0,>=1.2.0
  Downloading tfx_bsl-1.2.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux

# Imports

In [1]:
import tensorflow as tf 
import tensorflow_transform as tft 
import tensorflow_transform.beam as tft_beam 

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils 

import pprint 
import tempfile 

print(f'TensorFlow version: {tf.__version__}')
print(f'TensorFlow Transform version: {tft.__version__}')

TensorFlow version: 2.5.1
TensorFlow Transform version: 1.2.0


# Collect raw data 



In [2]:
raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'},
]

# Define the metadata

Next, you will define the metadata. This contains the schema that specifies the type of each feature column (or key) in `raw_data`. Keep the following in mind:

* The transform function later expects the metadata to be packed in a [DatasetMetadata](https://github.com/tensorflow/transform/blob/master/tensorflow_transform/tf_metadata/dataset_metadata.py#L23) object. 
* The constructor for the `DatasetMetadata` class expects a [Schema protocol buffer](https://github.com/tensorflow/metadata/blob/master/tensorflow_metadata/proto/v0/schema.proto#L46). You can use the [schema_from_feature_spec()](https://github.com/tensorflow/transform/blob/master/tensorflow_transform/tf_metadata/schema_utils.py#L36) function to generate one from a dictionary.
* To build that dictionary, use the keys (column names) of `raw_data` and assign a [FeatureSpecType](https://github.com/tensorflow/transform/blob/master/tensorflow_transform/common_types.py#L29) as each value. This lets you specify whether the input is fixed or variable length (using [tf.io](https://www.tensorflow.org/api_docs/python/tf/io) classes), and define its shape and data type.
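
As a rough illustration of the fixed versus variable length distinction mentioned above, the sketch below builds a small feature spec dictionary with `tf.io` classes. The keys `embedding` and `tags` are hypothetical and do not appear in `raw_data`; only `x` mirrors this exercise.

```python
import tensorflow as tf

# Hypothetical feature spec: only 'x' comes from this exercise.
example_feature_spec = {
    # a scalar float that is present in every example
    'x': tf.io.FixedLenFeature([], tf.float32),
    # a fixed-length vector of exactly 3 floats (hypothetical key)
    'embedding': tf.io.FixedLenFeature([3], tf.float32),
    # a variable number of strings per example (hypothetical key)
    'tags': tf.io.VarLenFeature(tf.string),
}

# Inspect the kind and data type of each entry.
for name, spec in example_feature_spec.items():
    print(name, type(spec).__name__, spec.dtype)
```

A `FixedLenFeature` carries both a shape and a data type, while a `VarLenFeature` only carries a data type because the number of values can differ between examples.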

In [3]:
# define the schema as a DatasetMetadata object
raw_data_metadata = dataset_metadata.DatasetMetadata(
    
    # use convenience function to build a Schema protobuf
    schema_utils.schema_from_feature_spec({
        
        # define a dictionary mapping each key to its feature spec type
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string)
    })
)

# use the public `schema` property instead of the private `_schema` attribute
print(raw_data_metadata.schema)

feature {
  name: "s"
  type: BYTES
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "y"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}



# Create a preprocessing function

The _preprocessing function_ is the most important concept of `tf.Transform`. A preprocessing function is where the transformation of the dataset really happens. It accepts and returns a dictionary of tensors, where a tensor means a <a target='_blank' href='https://www.tensorflow.org/versions/r1.15/api_docs/python/tf/Tensor'><code>Tensor</code></a> or <a target='_blank' href='https://www.tensorflow.org/versions/r1.15/api_docs/python/tf/SparseTensor'><code>SparseTensor</code></a>. There are two main groups of API calls that typically form the heart of a preprocessing function:

1. **TensorFlow Ops:** Any function that accepts and returns tensors. These add TensorFlow operations to the graph that transform raw data into transformed data one feature vector at a time. They run for every example, during both training and serving.
2. **TensorFlow Transform Analyzers:** Any of the analyzers provided by `tf.Transform`. Analyzers also accept and return tensors, but unlike TensorFlow ops they only run once during training, and typically make a full pass over the entire training dataset. They create <a target='_blank' href='https://www.tensorflow.org/versions/r1.15/api_docs/python/tf/constant'>tensor constants</a>, which are added to your graph. For example, `tft.min` computes the minimum of a tensor over the training dataset.

*Caution: When you apply your preprocessing function at serving time, the constants that analyzers created during training do not change. If your data has trend or seasonality components, plan accordingly.*
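
To make the split between analyzers and per-example ops more concrete, here is a minimal plain-Python sketch. It does not call `tf.Transform`; the function names `analyze` and `transform` and the serving value `5.0` are hypothetical and only illustrate the idea of constants computed once and then frozen.

```python
# Plain-Python illustration of the analyze-then-transform split.
# This is not tf.Transform code; all names here are hypothetical.

def analyze(training_values):
    """Full pass over the training data: compute the constants once."""
    return {"min": min(training_values), "max": max(training_values)}


def transform(value, constants):
    """Per-example step: reuse the frozen constants, never recompute them."""
    return (value - constants["min"]) / (constants["max"] - constants["min"])


training_values = [1.0, 2.0, 3.0]
constants = analyze(training_values)

scaled_training = [transform(v, constants) for v in training_values]
print(scaled_training)  # [0.0, 0.5, 1.0]

# A serving-time value outside the training range is scaled with the same
# constants, so the result falls outside the [0, 1] range.
scaled_serving = transform(5.0, constants)
print(scaled_serving)  # 2.0
```

In this sketch the serving value lands at `2.0` because the minimum and maximum were fixed during the training pass, which is the kind of drift the caution above asks you to plan for.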

The functions available for transforming your data are listed in the [API reference](https://www.tensorflow.org/tfx/transform/api_docs/python/tft).

In [4]:
def processing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    # extract the columns and assign them to local variables
    x = inputs['x']
    y = inputs['y']
    s = inputs['s']

    # data transformations using tft functions
    x_centered = x - tft.mean(x)
    y_normalized = tft.scale_to_0_1(y)
    s_integerized = tft.compute_and_apply_vocabulary(s)
    x_centered_times_y_normalized = (x_centered * y_normalized)

    # return the transformed data
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        's_integerized': s_integerized,
        'x_centered_times_y_normalized': x_centered_times_y_normalized
    }

# Generate a constant graph with the required transformations

In [7]:
# ignore the warnings
tf.get_logger().setLevel('ERROR')

# a temporary directory is needed when analyzing the data
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    # define the pipeline using Apache Beam syntax
    transformed_dataset, transform_fn = (
        # analyze and transform the dataset using the preprocessing function
        (raw_data, raw_data_metadata)
        | tft_beam.AnalyzeAndTransformDataset(processing_fn)
    )

# unpack the transformed dataset
transformed_data, transformed_metadata = transformed_dataset

# print the results
print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))






Raw data:
[{'s': 'hello', 'x': 1, 'y': 1},
 {'s': 'world', 'x': 2, 'y': 2},
 {'s': 'hello', 'x': 3, 'y': 3}]

Transformed data:
[{'s_integerized': 0,
  'x_centered': -1.0,
  'x_centered_times_y_normalized': -0.0,
  'y_normalized': 0.0},
 {'s_integerized': 1,
  'x_centered': 0.0,
  'x_centered_times_y_normalized': 0.0,
  'y_normalized': 0.5},
 {'s_integerized': 0,
  'x_centered': 1.0,
  'x_centered_times_y_normalized': 1.0,
  'y_normalized': 1.0}]
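
As a sanity check, the transformed values above can be reproduced by hand in plain Python. The sketch below is not part of the notebook's pipeline; it assumes that centering subtracts the mean, that scaling to the 0 to 1 range uses `(y - min) / (max - min)`, and that the vocabulary assigns index 0 to the most frequent string. These assumptions are consistent with the output shown above.

```python
from collections import Counter

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'},
]

xs = [float(row['x']) for row in raw_data]
ys = [float(row['y']) for row in raw_data]
ss = [row['s'] for row in raw_data]

# "Analyzer" style constants: each needs a full pass over the data.
x_mean = sum(xs) / len(xs)
y_min, y_max = min(ys), max(ys)

# Assumption: the most frequent string receives index 0.
vocab = {
    token: index
    for index, (token, _) in enumerate(Counter(ss).most_common())
}

# Per-example transformations using the constants above.
expected = []
for x, y, s in zip(xs, ys, ss):
    x_centered = x - x_mean
    y_normalized = (y - y_min) / (y_max - y_min)
    expected.append({
        's_integerized': vocab[s],
        'x_centered': x_centered,
        # for the first row this is -1.0 * 0.0, which is -0.0 in floating point
        'x_centered_times_y_normalized': x_centered * y_normalized,
        'y_normalized': y_normalized,
    })

for row in expected:
    print(row)
```

The first row also explains the `-0.0` in the output: multiplying a negative number by zero keeps the negative sign in floating-point arithmetic.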
