<a id='top'></a><a name='top'></a>
# Get Started with TensorFlow Transform

[Source](https://www.tensorflow.org/tfx/transform/get_started)


<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/gbih/ml-notes/blob/main/tf_transform/tft-03-getting-started.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
  </td>
</table>

1. [Setup](#setup)
2. [Introduction](#2.0)
3. [Define a preprocessing function](#3.0)
    * [3.1 Preprocessing function example](#3.1)
    * [3.2 Batching](#3.2)
4. [Apache Beam Implementation](#4.0)
    * [4.1 "instance dict" TFT Beam tf.Transform implementation](#4.1)
5. [Data Formats and Schema](#5.0)
    * [5.1 The "instance dict" format](#5.1)
    * [5.2 The TFXIO format](#5.2)
        - [5.2.1 RecordBatch with pyarrow.record_batch()](#5.2.1)
        - [5.2.2 RecordBatch with from_pandas()](#5.2.2)
        - [5.2.3 RecordBatch with from_arrays()](#5.2.3)
        - [5.2.4 RecordBatch with from_struct_array (struct of lists)](#5.2.4)
        - [5.2.5 RecordBatch with from_struct_array (struct of scalars)](#5.2.5)
        - [5.2.6 tfxio.TensorRepresentations type alias](#5.2.6)
6. [Compatibility with TensorFlow](#6.0)
7. [Input and output with Apache Beam](#7.0)
    * [7.1 Pre-canned PCollection Sources (TFXIO)](#7.1)
8. [Example: "Census Income dataset"](#8.0)

---
<a id='setup'></a><a name='setup'></a>
# 1. Imports / Setup
<a href="#top">[back to top]</a>

In [1]:
import sys
import pathlib
import pprint
pp = pprint.PrettyPrinter(indent=2)
import tempfile

import tensorflow as tf
import tensorflow_transform as tft

# Module level imports for tensorflow_transform.beam.
# https://github.com/tensorflow/transform/blob/master/tensorflow_transform/beam/__init__.py
# https://www.tensorflow.org/tfx/transform/api_docs/python/tft_beam
import tensorflow_transform.beam as tft_beam

# In-memory representation of all metadata associated with a dataset.
# https://www.tensorflow.org/tfx/transform/api_docs/python/tft/DatasetMetadata
# https://github.com/tensorflow/transform/blob/master/tensorflow_transform/tf_metadata/dataset_metadata.py
from tensorflow_transform.tf_metadata import dataset_metadata

# Utilities for using the tf.Metadata Schema within TensorFlow
# https://github.com/tensorflow/transform/blob/master/tensorflow_transform/tf_metadata/schema_utils.py
from tensorflow_transform.tf_metadata import schema_utils

# Module level imports for tfx_bsl.public.tfxio.
# TFXIO defines a common in-memory data representation shared by all TFX libraries
# and components, as well as an I/O abstraction layer to produce such representations.
# https://www.tensorflow.org/tfx/tfx_bsl/api_docs/python/tfx_bsl/public/tfxio
# https://github.com/tensorflow/community/blob/master/rfcs/20191017-tfx-standardized-inputs.md
from tfx_bsl.public import tfxio


import os
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

np.set_printoptions(precision=3, suppress=True)

# global seed
tf.random.set_seed(42)


tf.get_logger().propagate = False
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel('ERROR') # DEBUG, INFO, WARN, ERROR, or FATAL


def HR():
    print("-"*40)
    
def dir_ex(obj):
    result = [x for x in dir(obj) if not x.startswith('_')]
    print(type(obj))
    print()
    for x in result:
        print(f'{x:<40}', end="")

print("Loaded libraries.")

Loaded libraries.


---
<a id='2.0'></a><a name='2.0'></a>
# 2. Introduction
<a href="#top">[back to top]</a>

This guide introduces the basic concepts of `tf.Transform` and how to use them. It will:

* Define a *preprocessing function*, a logical description of the pipeline that transforms the raw data into the data used to train a machine learning model.

* Show the Apache Beam implementation used to transform data by converting the *preprocessing function* into a *Beam pipeline*.

* Show an additional usage example.

## Transform library for TFX and non-TFX users 

The `tft` module is the only module that is relevant to TFX users. The `tft_beam` module is relevant only when using Transform as a standalone library. Typically, a TFX user constructs a `preprocessing_fn`, and the rest of the Transform library calls are made by the [TFX Transform component](https://www.tensorflow.org/tfx/guide/transform).

## Notes on tfxio

https://www.tensorflow.org/tfx/tfx_bsl/api_docs/python/tfx_bsl/public/tfxio

<sub>
    
**Classes**

* BeamRecordCsvTFXIO:  TFXIO implementation for CSV records in pcoll[bytes].
* CsvTFXIO:  TFXIO implementation for CSV.
* RecordBatchToExamplesEncoder:  Encodes pa.RecordBatch as a list of serialized tf.Examples.
* RecordBatchesOptions:  Options for TFXIO's RecordBatches.
* TFExampleBeamRecord:  TFXIO implementation for serialized tf.Examples in pcoll[bytes].
* TFExampleRecord:  TFXIO implementation for tf.Example on TFRecord.
* TFGraphRecordDecoder:  Base class for decoders that turns a list of bytes to (composite) tensors.
* TFSequenceExampleBeamRecord:  TFXIO implementation for serialized tf.SequenceExamples in pcoll[bytes].
* TFSequenceExampleRecord:  TFXIO implementation for tf.SequenceExample on TFRecord.
* TFXIO:  Abstract basic class of all TFXIO API implementations.
* TensorAdapter:  A TensorAdapter converts a RecordBatch to a collection of TF Tensors.
* TensorAdapterConfig:  Config to a TensorAdapter.
* TensorFlowDatasetOptions:  Options for TFXIO's TensorFlowDataset.
    
</sub>

---
<a id='3.0'></a><a name='3.0'></a>
# 3. Define a preprocessing function
<a href="#top">[back to top]</a>

The *preprocessing function* is the most important concept of `tf.Transform`. The preprocessing function is a logical description of a transformation of the dataset. The preprocessing function accepts and returns a dictionary of tensors, where a *tensor* means `Tensor` or `SparseTensor`. There are two kinds of functions used to define the preprocessing function:

1. Any function that accepts and returns tensors. These add TensorFlow operations to the graph that transform raw data into transformed data.

2. Any of the *analyzers* provided by `tf.Transform`. Analyzers also accept and return tensors, but unlike TensorFlow functions, they do not add operations to the graph. Instead, analyzers cause `tf.Transform` to compute a full-pass operation outside of TensorFlow. They use the input tensor values over the entire dataset to generate a constant tensor that is returned as the output. For example, `tft.min` computes the minimum of a tensor over the dataset. `tf.Transform` provides a fixed set of analyzers, but this will be extended in future versions.

<a id='3.1'></a><a name='3.1'></a>
## 3.1 Preprocessing function example
<a href="#top">[back to top]</a>

By combining analyzers and regular TensorFlow functions, users can create flexible pipelines for transforming data. The following preprocessing function transforms each of the three features in different ways, and combines two of the features.

[skipped: specific explanation about this function]

The preprocessing function defines a pipeline of operations on a dataset. To apply the pipeline, we rely on a concrete implementation of the `tf.Transform` API. The Apache Beam implementation provides a `PTransform` that applies a user's preprocessing function to data. In a typical workflow, a `tf.Transform` user constructs a preprocessing function and then incorporates it into a larger Beam pipeline that creates the data for training.

In [2]:
def preprocessing_fn(inputs):
    x = inputs['x']
    y = inputs['y']
    s = inputs['s']
    
    # Cannot use this, get this error:
    # ValueError: tf.function only supports singleton tf.Variables created on 
    # the first call. Make sure the tf.Variable is only created once or created
    # outside tf.function.
    # See https://www.tensorflow.org/guide/function#creating_tfvariables for more information.
    # @tf.function

    # input_data = x
    # layer = tf.keras.layers.Normalization(axis=None)
    # test = layer(input_data)

    x_centered = x - tft.mean(x)
    y_normalized = tft.scale_to_0_1(y)
    s_integerized = tft.compute_and_apply_vocabulary(s)
    x_centered_times_y_normalized = x_centered * y_normalized
    
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        'x_centered_times_y_normalized': x_centered_times_y_normalized,
        's_integerized': s_integerized
    }

<a id='3.2'></a><a name='3.2'></a>
## 3.2 Batching
<a href="#top">[back to top]</a>

Batching is an important part of TensorFlow. Since one of the goals of `tf.Transform` is to provide a TensorFlow graph for preprocessing that can be incorporated into the serving graph (and, optionally, the training graph), batching is also an important concept in `tf.Transform`.

While not obvious in the example above, the user-defined preprocessing function is passed tensors representing *batches* and not individual instances, as happens during training and serving with TensorFlow. 

On the other hand, analyzers perform a computation over the entire dataset that returns a single value and not a batch of values. 

`x` is a `Tensor` with a shape of `(batch_size,)` while `tft.mean(x)` is a `Tensor` with a shape of `()`. 

The subtraction `x - tft.mean(x)` broadcasts where the value of `tft.mean(x)` is subtracted from every element of the batch represented by `x`.
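
The difference between the full-pass analyzer and the per-batch transform can be sketched in plain Python. This is only an illustration under stated assumptions: `center_batch` and the batch split are hypothetical, and `statistics.mean` stands in for `tft.mean`; none of this is how `tf.Transform` is implemented.

```python
from statistics import mean

# Full dataset of the 'x' feature (same values as the toy example).
dataset_x = [1.0, 2.0, 3.0]

# Analysis phase: one full pass over the dataset yields a single scalar,
# analogous to the shape-() tensor returned by tft.mean(x).
x_mean = mean(dataset_x)


def center_batch(batch, constant):
    """Transform phase (hypothetical helper): subtract the scalar from every element."""
    return [value - constant for value in batch]


# The transform only ever sees batches, never the whole dataset at once.
batches = [dataset_x[:2], dataset_x[2:]]
centered = [center_batch(batch, x_mean) for batch in batches]
print(centered)  # [[-1.0, 0.0], [1.0]]
```

The scalar is computed once and reused for every batch, which is why the result does not depend on how the dataset is split into batches.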

---
<a id='4.0'></a><a name='4.0'></a>
# 4. Apache Beam Implementation
<a href="#top">[back to top]</a>

While the *preprocessing function* is intended as a logical description of a *preprocessing pipeline* implemented on multiple data processing frameworks, `tf.Transform` provides a canonical implementation used on Apache Beam. This implementation demonstrates the functionality required from an implementation. There is no formal API for this functionality, so each implementation can use an API that is idiomatic for its particular data processing framework.

Note: **canonical** in this context means "standard approved format", "normalized", "standardized".

The Apache Beam implementation accepts input data in two formats:

1. The **"instance dict"** format, an intuitive format that is suitable for small datasets.

2. The **TFXIO** (Apache Arrow) format, which provides improved performance and is suitable for large datasets.


<a id='4.1'></a><a name='4.1'></a>
## 4.1 "instance dict" TFT Beam tf.Transform implementation
<a href="#top">[back to top]</a>  

This is an intuitive format and is suitable for small datasets.

The following shows the usage for the composite function `tft_beam.AnalyzeAndTransformDataset`.

Caution: The "instance dict" format used with `DatasetMetadata` 
is much less efficient than TFXIO. For any serious workloads you should 
use TFXIO with a `tfxio.TensorAdapterConfig` instance as the metadata.

In [3]:
raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 2, 's': 'hello'},
]

# tft.tf_metadata.dataset_metadata.DatasetMetadata
# Metadata about a dataset used for the "instance dict" format.
raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string),
    })
)

with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (
        (raw_data, raw_data_metadata) |
        tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)
    )



In [4]:
transformed_data, transformed_metadata = transformed_dataset

The `transformed_data` content is shown below and contains the transformed columns in the same format as the raw data:

1. In particular, the values of `s_integerized` are `[0, 1, 0]`. These values depend on how the words `hello` and `world` were mapped to integers, which is deterministic. 

2. For the column `x_centered`, we subtracted the mean, so the values of the column `x`, which were `[1.0, 2.0, 3.0]`, became `[-1.0, 0.0, 1.0]`. Similarly, the rest of the columns match their expected values.

In [5]:
transformed_data

[{'s_integerized': 0,
  'x_centered': -1.0,
  'x_centered_times_y_normalized': -0.0,
  'y_normalized': 0.0},
 {'s_integerized': 1,
  'x_centered': 0.0,
  'x_centered_times_y_normalized': 0.0,
  'y_normalized': 1.0},
 {'s_integerized': 0,
  'x_centered': 1.0,
  'x_centered_times_y_normalized': 1.0,
  'y_normalized': 1.0}]

In [6]:
# Original dataset
raw_data

[{'x': 1, 'y': 1, 's': 'hello'},
 {'x': 2, 'y': 2, 's': 'world'},
 {'x': 3, 'y': 2, 's': 'hello'}]

Both `raw_data` and `transformed_data` are datasets. 

**The next two sections show how the Beam implementation represents datasets, and how to read and write data to disk.** The other return value, `transform_fn`, represents the transformation applied to the data, covered in detail below.

The `tft_beam.AnalyzeAndTransformDataset` class is the composition of the two fundamental transforms provided by the implementation: `tft_beam.AnalyzeDataset` and `tft_beam.TransformDataset`. The following two code snippets are therefore equivalent:

In [7]:
my_data = (raw_data, raw_data_metadata)

In [8]:
# Version 1
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_data, transform_fn = (
        my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)
    )



In [9]:
# Version 2
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transform_fn_2 = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
    transformed_data_2 = (my_data, transform_fn_2) | tft_beam.TransformDataset()



In [10]:
assert (transformed_data == transformed_data_2)

`transform_fn` is a pure function that represents an operation that is applied to each row of the dataset. In particular, the analyzer values are already computed and treated as constants. In the example, the `transform_fn` contains these constants:
* mean of column `x`
* min and max of column `y`
* vocabulary used to map the strings to integers

An important feature of `tf.Transform` is that `transform_fn` represents a map over *rows*. It is a pure function applied to each row separately. All of the computation for aggregating rows is done in `AnalyzeDataset`. Furthermore, the `transform_fn` is represented as a TensorFlow `Graph` which can be embedded into the serving graph.

`AnalyzeAndTransformDataset` is provided as an optimization for the common case in which the same dataset is both analyzed and transformed. This is the same pattern used in scikit-learn, which provides the `fit`, `transform`, and `fit_transform` methods.
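
To make the analyze/transform split concrete, here is a minimal plain-Python sketch that mirrors the toy `preprocessing_fn` without TensorFlow or Beam. The helpers `analyze` and `transform_row` are hypothetical names used only for illustration, and assigning vocabulary ids by descending frequency is an assumption of this sketch rather than a statement about the library's internals.

```python
from collections import Counter

raw_rows = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 2, 's': 'hello'},
]


def analyze(rows):
    """Full pass over the dataset: compute the constants (the "fit" step)."""
    xs = [row['x'] for row in rows]
    ys = [row['y'] for row in rows]
    # Assumption: vocabulary ids are assigned by descending frequency.
    ranked = Counter(row['s'] for row in rows).most_common()
    return {
        'x_mean': sum(xs) / len(xs),
        'y_min': min(ys),
        'y_max': max(ys),
        'vocab': {token: index for index, (token, _) in enumerate(ranked)},
    }


def transform_row(row, constants):
    """Pure per-row function: uses only the row and the precomputed constants."""
    x_centered = row['x'] - constants['x_mean']
    y_range = constants['y_max'] - constants['y_min']
    y_normalized = (row['y'] - constants['y_min']) / y_range
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        'x_centered_times_y_normalized': x_centered * y_normalized,
        's_integerized': constants['vocab'][row['s']],
    }


constants = analyze(raw_rows)
transformed_rows = [transform_row(row, constants) for row in raw_rows]
```

Because `transform_row` depends only on one row and the constants, it can be applied to new rows at serving time without revisiting the training data.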

In [11]:
transform_fn[1]

BeamDatasetMetadata(dataset_metadata={'_schema': feature {
  name: "s_integerized"
  type: INT
  int_domain {
    is_categorical: true
  }
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x_centered"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x_centered_times_y_normalized"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "y_normalized"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
}, deferred_metadata=[{'_schema': feature {
  name: "s_integerized"
  type: INT
  int_domain {
    min: -1
    max: 1
    is_categorical: true
  }
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x_centered"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x_centered_times_y_normalized"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "y_normalized"
  type: FLOAT
  presence {
    

---
<a id='5.0'></a><a name='5.0'></a>
# 5. Data Formats and Schema
<a href="#top">[back to top]</a>

The TFT Beam implementation accepts two different input data formats:
1. The **"instance dict"** format (as seen in the example above) is an intuitive format and suitable for small datasets. 
2. The **TFXIO** (Apache Arrow) format provides improved performance and is suitable for large datasets.

The "metadata" accompanying the `PCollection` tells the Beam implementation the format of the `PCollection`.

1. If `raw_data_metadata` is a `dataset_metadata.DatasetMetadata`, then `raw_data` is expected to be in the "instance dict" format.

2. If `raw_data_metadata` is a `tfxio.TensorAdapterConfig`, then `raw_data` is expected to be in the TFXIO format.

In [12]:
# Check type of raw_data_metadata
if isinstance(raw_data_metadata, dataset_metadata.DatasetMetadata):
    print("raw_data_metadata is a dataset_metadata.DatasetMetadata")
    HR()


# We can confirm this is of type dataset_metadata.DatasetMetadata,
# hence raw_data needs to be in the "instance_dict" format.
print("raw_data:\n")
pp.pprint(raw_data)
HR()

print("A raw_data row:\n")
print(raw_data[0])
HR()

print('Check that a raw_data row is in the "instance dict" format:\n')
print(isinstance(raw_data[0], dict))

raw_data_metadata is a dataset_metadata.DatasetMetadata
----------------------------------------
raw_data:

[ {'s': 'hello', 'x': 1, 'y': 1},
  {'s': 'world', 'x': 2, 'y': 2},
  {'s': 'hello', 'x': 3, 'y': 2}]
----------------------------------------
A raw_data row:

{'x': 1, 'y': 1, 's': 'hello'}
----------------------------------------
Check that a raw_data row is in the "instance dict" format:

True


<a id='5.1'></a><a name='5.1'></a>
## 5.1 The "instance dict" format
<a href="#top">[back to top]</a>

The previous code examples used this format. 

The metadata contains the schema that defines the layout of the data, and how it is read from and written to various formats. 

However, this in-memory format is not self-describing and requires the schema in order to be interpreted as tensors.

Again, here is the definition of the schema for the example data.

In [13]:
raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string),
    })
)

raw_data_metadata.schema

feature {
  name: "s"
  type: BYTES
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "x"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "y"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}

The `Schema` proto contains the information needed to parse the data from its on-disk or in-memory format, into tensors.

It is typically constructed by calling `schema_utils.schema_from_feature_spec` with a dict mapping feature keys to these parsing configurations:
1. `tf.io.FixedLenFeature`: Fixed-length input feature
2. `tf.io.VarLenFeature`: Variable-length input feature
3. `tf.io.SparseFeature`: Sparse input feature from an Example

`tf.io.parse_example` parses Example protos into a dict of tensors. [See the documentation for more details](https://www.tensorflow.org/api_docs/python/tf/io/parse_example).

Above, we use `tf.io.FixedLenFeature` to indicate that each feature contains a fixed number of values, in this case a single scalar value. Because `tf.Transform` batches instances, the actual `Tensor` representing the feature will have shape `(None,)` where the unknown dimension is the batch dimension.

<a id='5.2'></a><a name='5.2'></a>
## 5.2 The TFXIO format
<a href="#top">[back to top]</a>

With this format, the data is expected to be contained in a **[pyarrow.RecordBatch](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html)**.

For tabular data, our Apache Beam implementation accepts Arrow `RecordBatch`es that consist of columns of the following types:

1.  `pa.list_(<primitive>)` where `<primitive>` is: 
    * `pa.int64()`
    * `pa.float32()`
    * `pa.binary()`
    * `pa.large_binary()`
2. `pa.large_list(<primitive>)`


The toy input dataset we used looks like these listings (5.2.1 - 5.2.5) when represented as a `RecordBatch`.

---

Notes:

Similar to the `dataset_metadata.DatasetMetadata` instance that accompanies the "instance dict" format, a `tfxio.TensorAdapterConfig` must accompany the `RecordBatch`. It consists of the Arrow schema of the `RecordBatch`, and `tfxio.TensorRepresentations` to uniquely determine how columns in `RecordBatch`es can be interpreted as TensorFlow Tensors (including but not limited to `tf.Tensor`, `tf.SparseTensor`).


<a id='5.2.1'></a><a name='5.2.1'></a>
### 5.2.1 RecordBatch with pyarrow.record_batch()
<a href="#top">[back to top]</a>

[Create a pyarrow.RecordBatch from another Python data structure or sequence of arrays](https://arrow.apache.org/docs/python/generated/pyarrow.record_batch.html#pyarrow.record_batch)

    pyarrow.record_batch(
        data, 
        names=None, 
        schema=None, 
        metadata=None
    )

In [14]:
import pyarrow as pa

# Note: If we use TFX, we are limited to pyarrow v5
print(f"pyarrow version: {pa.__version__}")
HR()

# Constructing a RecordBatch from arrays:
raw_data_record_batch = [
    
    # Create a pyarrow.RecordBatch from another Python data structure or sequence of arrays.
    # https://arrow.apache.org/docs/python/generated/pyarrow.record_batch.html#pyarrow.record_batch
    pa.record_batch(
        # A DataFrame or list of arrays or chunked arrays.
        data = [
            # pa.array: Create pyarrow.Array instance from a Python object.
            # pa.list_: Create ListType instance from child data type or field.
            # pa.float32: Create single-precision floating point type.
            pa.array([[1], [2], [3]], pa.list_(pa.float32())),
            pa.array([[1], [2], [3]], pa.list_(pa.float32())),
            pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary()))
        ],
        names=['x', 'y', 's']
    )
]

print(raw_data_record_batch[0])
HR()
raw_data_record_batch[0].to_pandas()

pyarrow version: 5.0.0
----------------------------------------
pyarrow.RecordBatch
x: list<item: float>
  child 0, item: float
y: list<item: float>
  child 0, item: float
s: list<item: binary>
  child 0, item: binary
----------------------------------------


Unnamed: 0,x,y,s
0,[1.0],[1.0],[b'hello']
1,[2.0],[2.0],[b'world']
2,[3.0],[3.0],[b'hello']


<a id='5.2.2'></a><a name='5.2.2'></a>
### 5.2.2 RecordBatch with from_pandas()
<a href="#top">[back to top]</a>

[Convert pandas.DataFrame to an Arrow RecordBatch](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.from_pandas)

    from_pandas(
        type cls, 
        df, 
        Schema schema=None, 
        preserve_index=None, 
        nthreads=None, 
        columns=None
    )

In [15]:
import pandas as pd

raw_data = [
    {'x': [1], 'y': [1], 's': ['hello']},
    {'x': [2], 'y': [2], 's': ['world']},
    {'x': [3], 'y': [2], 's': ['hello']},
]

df = pd.DataFrame(raw_data)

my_schema = pa.schema([
    pa.field('x', pa.list_(pa.float32())),
    pa.field('y', pa.list_(pa.float32())),
    pa.field('s', pa.list_(pa.binary())),
])

test2 = pa.RecordBatch.from_pandas(df, schema=my_schema)
print(test2)
HR()

test2.to_pandas()

pyarrow.RecordBatch
x: list<item: float>
  child 0, item: float
y: list<item: float>
  child 0, item: float
s: list<item: binary>
  child 0, item: binary
----------------------------------------


Unnamed: 0,x,y,s
0,[1.0],[1.0],[b'hello']
1,[2.0],[2.0],[b'world']
2,[3.0],[2.0],[b'hello']


<a id='5.2.3'></a><a name='5.2.3'></a>
### 5.2.3 RecordBatch with from_arrays()
<a href="#top">[back to top]</a>

[Construct a RecordBatch from multiple pyarrow.Arrays](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.from_arrays)

    static from_arrays(
        list arrays, 
        names=None, 
        schema=None, 
        metadata=None
    )

In [16]:
x = pa.array([[1], [2], [3]])
y = pa.array([[1], [2], [2]])
s = pa.array([['hello'], ['world'], ['hello']])

my_schema = pa.schema([
    pa.field('x', pa.list_(pa.float32())),
    pa.field('y', pa.list_(pa.float32())),
    pa.field('s', pa.list_(pa.binary())),
])

test4 = pa.RecordBatch.from_arrays([x, y, s], schema=my_schema)

print(test4)
HR()

test4.to_pandas()

pyarrow.RecordBatch
x: list<item: float>
  child 0, item: float
y: list<item: float>
  child 0, item: float
s: list<item: binary>
  child 0, item: binary
----------------------------------------


Unnamed: 0,x,y,s
0,[1.0],[1.0],[b'hello']
1,[2.0],[2.0],[b'world']
2,[3.0],[2.0],[b'hello']


<a id='5.2.4'></a><a name='5.2.4'></a>
### 5.2.4 RecordBatch with from_struct_array (struct of lists)
<a href="#top">[back to top]</a>

[Construct a RecordBatch from a StructArray](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.from_struct_array)

    static from_struct_array(
        StructArray struct_array
    )

In [17]:
fields = [
    ('x', pa.list_(pa.float32())),
    ('y', pa.list_(pa.float32())),
    ('s', pa.list_(pa.binary())),
]

struct = pa.array(
    [
        {'x': [1], 'y': [1], 's': ['hello']},
        {'x': [2], 'y': [2], 's': ['world']},
        {'x': [3], 'y': [2], 's': ['hello']},
    ],
    type=pa.struct(fields)
)

test5 = pa.RecordBatch.from_struct_array(struct)
print(test5)
HR()

test5.to_pandas()

pyarrow.RecordBatch
x: list<item: float>
  child 0, item: float
y: list<item: float>
  child 0, item: float
s: list<item: binary>
  child 0, item: binary
----------------------------------------


Unnamed: 0,x,y,s
0,[1.0],[1.0],[b'hello']
1,[2.0],[2.0],[b'world']
2,[3.0],[2.0],[b'hello']


<a id='5.2.5'></a><a name='5.2.5'></a>
### 5.2.5 RecordBatch with from_struct_array (struct of scalars)
<a href="#top">[back to top]</a>

[Construct a RecordBatch from a StructArray](https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.from_struct_array)

    static from_struct_array(
        StructArray struct_array
    )

In [18]:
fields = [
    ('x', pa.float32()),
    ('y', pa.float32()),
    ('s', pa.binary()),
]

struct = pa.array(
    [
        {'x': 1, 'y': 1, 's': 'hello'},
        {'x': 2, 'y': 2, 's': 'world'},
        {'x': 3, 'y': 2, 's': 'hello'},
    ],
    type=pa.struct(fields)
)

print(type(struct))
HR()

test5 = pa.RecordBatch.from_struct_array(struct).to_pandas()
test5

<class 'pyarrow.lib.StructArray'>
----------------------------------------


Unnamed: 0,x,y,s
0,1.0,1.0,b'hello'
1,2.0,2.0,b'world'
2,3.0,2.0,b'hello'


<a id='5.2.6'></a><a name='5.2.6'></a>
### 5.2.6 tfxio.TensorRepresentations type alias
<a href="#top">[back to top]</a>

---

`tfxio.TensorRepresentations` is a type alias for `Dict[str, tensorflow_metadata.proto.v0.schema_pb2.TensorRepresentation]`. It establishes the relationship between a tensor that a `preprocessing_fn` accepts and the columns in the `RecordBatch`. See the code listing below.


From this example, we can see this means `inputs['x']` in `preprocessing_fn` should be a dense `tf.Tensor`, whose values come from a column of name `col1` in the input `RecordBatch`es, and its (batched) shape should be `[batch_size, 2]`.

A `schema_pb2.TensorRepresentation` is a Protobuf defined in TensorFlow Metadata.

In [19]:
# TensorRepresentations example
from typing import Dict

from google.protobuf import text_format
from tensorflow_metadata.proto.v0 import schema_pb2

# Maps each tensor name seen by preprocessing_fn to its TensorRepresentation.
tensor_representation: Dict[str, schema_pb2.TensorRepresentation] = {
    # text_format.Parse fills the message passed as the second argument
    # from the text-format proto given as the first argument.
    'x': text_format.Parse(
        """dense_tensor {column_name: "col1" shape {dim {size: 2}}}""",
        schema_pb2.TensorRepresentation()
    )
}

tensor_representation

{'x': dense_tensor {
   column_name: "col1"
   shape {
     dim {
       size: 2
     }
   }
 }}
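
As a rough illustration of what the dense representation above implies, the following plain-Python sketch checks a column against a fixed per-row size and reports the batched shape. The helper `dense_batched_shape` and the sample `col1` values are hypothetical; this is not the `tfx_bsl` `TensorAdapter`, only a sketch of the shape rule it enforces.

```python
def dense_batched_shape(column, row_shape):
    """Hypothetical helper: validate rows and return (batch_size, *row_shape).

    This sketch handles a single fixed dimension per row only.
    """
    (size,) = row_shape
    for row in column:
        if len(row) != size:
            raise ValueError(f"expected {size} values per row, got {len(row)}")
    return (len(column), size)


# A hypothetical 'col1' column holding two values per row.
col1 = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]

# The representation maps the tensor name 'x' to the column 'col1'.
inputs = {'x': col1}

batched_shape = dense_batched_shape(inputs['x'], row_shape=(2,))
print(batched_shape)  # (3, 2)
```

A row with a different number of values does not fit a dense representation with `size: 2`, which the sketch reports as a `ValueError`.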

---
<a id='6.0'></a><a name='6.0'></a>
# 6. Compatibility with TensorFlow
<a href="#top">[back to top]</a>

`tf.Transform` provides support for exporting the `transform_fn` as a SavedModel. The default behavior before the 0.30 release exported a TF 1.x SavedModel. Starting with the 0.30 release, the default behavior is to export a TF 2.x SavedModel, unless TF 2.x behaviors are explicitly disabled.

When exporting the `transform_fn` as a TF 2.x SavedModel, the `preprocessing_fn` is expected to be traceable using `tf.function`. Additionally, if running your pipeline remotely (for example with `DataflowRunner`), ensure that the `preprocessing_fn` and any dependencies are packaged properly.

---
<a id='7.0'></a><a name='7.0'></a>
# 7. Input and output with Apache Beam
<a href="#top">[back to top]</a>

Up to now, we have seen input and output data in the form of Python lists (of `RecordBatch`es or *"instance dictionaries"*). **This is a simplification that relies on Apache Beam's ability to work with lists as well as its main representation of data, the `PCollection`.**

A `PCollection` is a data representation that forms a part of a Beam pipeline:

* A Beam pipeline is formed by applying various `PTransform`s, including `AnalyzeDataset` and `TransformDataset`, and running the pipeline. 

* A `PCollection` is not created in the memory of the main binary, but instead is distributed among the workers (although this section uses the in-memory execution mode).

<a id='7.1'></a><a name='7.1'></a>
## 7.1 Pre-canned PCollection Sources (TFXIO)
<a href="#top">[back to top]</a>

**The `RecordBatch` format that our implementation accepts is a common format that other TFX libraries also accept.** 

Therefore, TFX offers convenient "sources" (the `TFXIO` classes) that read files of various formats on disk and produce `RecordBatch`es. They can also return a `tfxio.TensorAdapterConfig`, including inferred `tfxio.TensorRepresentations`.

These `TFXIO`s can be found in the `tfx_bsl` package ([`tfx_bsl.public.tfxio`](https://www.tensorflow.org/tfx/tfx_bsl/api_docs/python/tfx_bsl/public/tfxio)).


**TFXIO Classes:**

* BeamRecordCsvTFXIO: TFXIO implementation for CSV records in pcoll[bytes].

* CsvTFXIO: TFXIO implementation for CSV.

* RecordBatchToExamplesEncoder: Encodes pa.RecordBatch as a list of serialized tf.Examples.

* RecordBatchesOptions: Options for TFXIO's RecordBatches.

* TFExampleBeamRecord: TFXIO implementation for serialized tf.Examples in pcoll[bytes].

* TFExampleRecord: TFXIO implementation for tf.Example on TFRecord.

* TFGraphRecordDecoder: Base class for decoders that turns a list of bytes to (composite) tensors.

* TFSequenceExampleBeamRecord: TFXIO implementation for serialized tf.SequenceExamples in pcoll[bytes].

* TFSequenceExampleRecord: TFXIO implementation for tf.SequenceExample on TFRecord.

* TFXIO: Abstract basic class of all TFXIO API implementations.

* TensorAdapter: A TensorAdapter converts a RecordBatch to a collection of TF Tensors.

* TensorAdapterConfig: Config to a TensorAdapter.

* TensorFlowDatasetOptions: Options for TFXIO's TensorFlowDataset.


---
<a id='8.0'></a><a name='8.0'></a>
# 8. Example: "Census Income dataset"
<a href="#top">[back to top]</a>

The following example requires both reading and writing data on disk, as well as representing data as a `PCollection`.

The "Census Income" dataset contains both categorical and numeric data.

In [20]:
from pathlib import Path

data_dir = Path('./chp13_tfx_04').absolute()

train_data_file = tf.keras.utils.get_file(
    'adult.data',
    'https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data',
    cache_subdir=data_dir
)
train_data_file 

Downloading data from https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/census/adult.data


'/Users/gb/Desktop/izumi-handson/1-misc-study/chp13_tfx_04/adult.data'

In [21]:
ORDERED_CSV_COLUMNS = [
    'age', 'workclass', 'fnlwgt', 'education', 'education-num',
    'marital-status', 'occupation', 'relationship', 'race', 'sex',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label'
]

CATEGORICAL_FEATURE_KEYS = [
    'workclass',
    'education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native-country',
]

NUMERIC_FEATURE_KEYS = [
    'age',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
    'education-num',
]

LABEL_KEY = 'label'

RAW_DATA_FEATURE_SPEC = dict(
    [(name, tf.io.FixedLenFeature([], tf.string))
     for name in CATEGORICAL_FEATURE_KEYS] +
    [(name, tf.io.FixedLenFeature([], tf.float32))
     for name in NUMERIC_FEATURE_KEYS] +
    [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))]
)

SCHEMA = tft.tf_metadata.dataset_metadata.DatasetMetadata(
    tft.tf_metadata.schema_utils.schema_from_feature_spec(
        RAW_DATA_FEATURE_SPEC
    )
).schema

type(SCHEMA)

tensorflow_metadata.proto.v0.schema_pb2.Schema

In [22]:
pd.read_csv(
    train_data_file,
    names = ORDERED_CSV_COLUMNS
).head()

| | age | workclass | fnlwgt | education | education-num | marital-status | occupation | relationship | race | sex | capital-gain | capital-loss | hours-per-week | native-country | label |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
| 1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
| 2 | 38 | Private | 215646 | HS-grad | 9 | Divorced | Handlers-cleaners | Not-in-family | White | Male | 0 | 0 | 40 | United-States | <=50K |
| 3 | 53 | Private | 234721 | 11th | 7 | Married-civ-spouse | Handlers-cleaners | Husband | Black | Male | 0 | 0 | 40 | United-States | <=50K |
| 4 | 28 | Private | 338409 | Bachelors | 13 | Married-civ-spouse | Prof-specialty | Wife | Black | Female | 0 | 0 | 40 | Cuba | <=50K |


The columns of the dataset are either categorical or numeric. This dataset describes a classification problem: predicting the last column, which records whether the individual earns more or less than 50K per year. However, from the perspective of `tf.Transform`, this label is just another categorical column.

We use the pre-built `tfxio.BeamRecordCsvTFXIO` to translate the CSV lines into `RecordBatch`es. `TFXIO` requires two important pieces of information:

1. A TensorFlow Metadata Schema ([`tfmd.proto.schema_pb2`](https://www.tensorflow.org/tfx/tf_metadata/api_docs/python/tfmd/proto/schema_pb2)), which contains type and shape information about each CSV column. [`schema_pb2.TensorRepresentation`](https://www.tensorflow.org/tfx/tf_metadata/api_docs/python/tfmd/proto/schema_pb2/TensorRepresentation) is an optional part of the Schema. If it is not provided (which is the case in this example), it will be inferred from the type and shape information. We can get the Schema either by using helper functions we provide to translate from TF parsing specs (as shown in this example), or by running [`TensorFlow Data Validation`](https://www.tensorflow.org/tfx/tutorials/data_validation/tfdv_basic).

2. List of column names, in the order they appear in the CSV file. These names must match the feature names in the Schema.
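
As a quick sanity check on the second requirement, the column list can be compared against the feature names used in the parsing spec. The sketch below is plain Python and only uses the name lists defined earlier in this notebook; it does not call `tfxio`, and the helper name `names_not_in` is hypothetical.

```python
# Column names in file order and the feature names used in the parsing spec,
# copied from the cell that defines them above.
ORDERED_CSV_COLUMNS = [
    'age', 'workclass', 'fnlwgt', 'education', 'education-num',
    'marital-status', 'occupation', 'relationship', 'race', 'sex',
    'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'label'
]
CATEGORICAL_FEATURE_KEYS = [
    'workclass', 'education', 'marital-status', 'occupation',
    'relationship', 'race', 'sex', 'native-country',
]
NUMERIC_FEATURE_KEYS = [
    'age', 'capital-gain', 'capital-loss', 'hours-per-week', 'education-num',
]
LABEL_KEY = 'label'


def names_not_in(names, reference):
    """Return the entries of `names` that do not appear in `reference`."""
    reference = set(reference)
    return [name for name in names if name not in reference]


spec_names = CATEGORICAL_FEATURE_KEYS + NUMERIC_FEATURE_KEYS + [LABEL_KEY]

# CSV columns that have no entry in the feature spec.
unmatched_columns = names_not_in(ORDERED_CSV_COLUMNS, spec_names)
# Feature spec entries that have no CSV column.
unmatched_features = names_not_in(spec_names, ORDERED_CSV_COLUMNS)

print(unmatched_columns)   # ['fnlwgt']
print(unmatched_features)  # []
```

In this notebook the only column without a spec entry is `fnlwgt`, which the preprocessing function never uses.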

In [23]:
from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamples

import apache_beam as beam

In [24]:
# A pipeline object that manages a DAG of PValues
# (apache_beam.pvalue.PValue) and their PTransforms
# (apache_beam.transforms.ptransform.PTransform).
pipeline = beam.Pipeline()

# TFXIO implementation for CSV records in pcoll[bytes].
# Unlike tfxio.CsvTFXIO, this is a special TFXIO that does not actually do I/O 
# -- it relies on the caller to prepare a PCollection of bytes.
# In other words, we only translate to RecordBatch here.
# Reminder: A record batch is a collection of equal-length arrays 
# matching a particular Schema. It is a table-like data structure that is 
# semantically a sequence of fields, each a contiguous Arrow Array.
csv_tfxio_translate = tfxio.BeamRecordCsvTFXIO(
    physical_format = 'text',
    column_names=ORDERED_CSV_COLUMNS,
    schema = SCHEMA
)

# A PCollection is a (potentially huge) container of multiple values.
# tfx_bsl.public.tfxio.BeamRecordCsvTFXIO.BeamSource: Returns a beam PTransform
# that turns the PCollection of CSV lines into PCollection[pa.RecordBatch]
raw_data = (
    pipeline
    | 'ReadTrainData' >> beam.io.ReadFromText(
        train_data_file, 
        coder=beam.coders.BytesCoder())
    | 'FixCommasTrainData' >> beam.Map(
        lambda line: line.replace(b', ', b','))
    | 'DecodeTrainData' >> csv_tfxio_translate.BeamSource())


print(raw_data)

PCollection[[24]: DecodeTrainData/RawRecordToRecordBatch/CollectRecordBatchTelemetry/ProfileRecordBatches.None]
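
The `FixCommasTrainData` step is there because, as the `replace` call implies, the raw `adult.data` lines separate fields with a comma followed by a space. A minimal plain-Python sketch of what that `beam.Map` does to a single line (the sample line is abbreviated from the first row of the file, and the variable names are illustrative):

```python
# One raw line as ReadFromText would deliver it when using a BytesCoder
# (abbreviated to the first few fields for readability).
raw_line = b'39, State-gov, 77516, Bachelors, 13'

# Same expression as the lambda in the 'FixCommasTrainData' step.
fixed_line = raw_line.replace(b', ', b',')

print(fixed_line)              # b'39,State-gov,77516,Bachelors,13'
print(fixed_line.split(b','))  # [b'39', b'State-gov', b'77516', b'Bachelors', b'13']
```

Without this step, every field after the first would start with a space.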


---
After reading the CSV lines we had to apply an extra fix-up step (removing the space that follows each comma). When no such fix-up is needed, `tfxio.CsvTFXIO` can **handle both reading the file and translating** it to `RecordBatch`es, as in the following example:

---

Note: `tfxio.CsvTFXIO` automatically implements this IO functionality:

https://github.com/tensorflow/tfx-bsl/blob/master/tfx_bsl/tfxio/csv_tfxio.py#L238

<font size=2>
    
    def _CSVSource(self) -> beam.PTransform:
        """Returns a PTtransform that producese PCollection[bytes]."""
        return beam.io.ReadFromText(
            self._file_pattern,
            coder=beam.coders.BytesCoder(),
            validate=self._validate,
            skip_header_lines=self._skip_header_lines
    )
</font>

In [25]:
# TFXIO implementation for CSV.
# https://www.tensorflow.org/tfx/tfx_bsl/api_docs/python/tfx_bsl/public/tfxio/CsvTFXIO
csv_tfxio_read_translate = tfxio.CsvTFXIO(
    train_data_file,  # pattern to read csv files from. 
    telemetry_descriptors=[],  # identify the component that is instantiating this TFXIO.
    column_names=ORDERED_CSV_COLUMNS,  # Order must match the order in the CSV file
    schema=SCHEMA  # if provided, determines the data type of csv columns
)

# tfx_bsl.public.tfxio.CsvTFXIO.BeamSource: Returns a beam `PTransform` 
# that produces `PCollection[pa.RecordBatch]`.
raw_data_2 = (
    pipeline
    | 'TFXIORead' >> csv_tfxio_read_translate.BeamSource()
)

print(raw_data_2)

PCollection[[25]: TFXIORead/RawRecordToRecordBatch/CollectRecordBatchTelemetry/ProfileRecordBatches.None]


---
Preprocessing for this dataset is similar to the previous example, except that the preprocessing function is generated programmatically by looping over the column names instead of specifying each column by hand.

In the **preprocessing function** below, `NUMERIC_FEATURE_KEYS` and `CATEGORICAL_FEATURE_KEYS` are the lists that contain the names of the numeric and categorical columns (the function from the previous example is repeated first for comparison):

In [26]:
original_preprocessing_fn = """

def preprocessing_fn(inputs):
    x = inputs['x']
    y = inputs['y']
    s = inputs['s']
    
    x_centered = x - tft.mean(x)
    y_normalized = tft.scale_to_0_1(y)
    s_integerized = tft.compute_and_apply_vocabulary(s)
    x_centered_times_y_normalized = x_centered * y_normalized
    
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        'x_centered_times_y_normalized': x_centered_times_y_normalized,
        's_integerized': s_integerized
    }
"""
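
Each analyzer in this earlier function needs a statistic computed over the whole dataset before any single value can be transformed. As a rough plain-Python illustration (not the `tf.Transform` implementation; the helper names and sample values are made up for this sketch), centering and 0-1 scaling amount to:

```python
def center(values):
    """Subtract the dataset-wide mean, mirroring `x - tft.mean(x)`."""
    mean = sum(values) / len(values)
    return [v - mean for v in values]


def scale_to_0_1(values):
    """Min-max scale so the smallest value maps to 0 and the largest to 1."""
    lo, hi = min(values), max(values)
    # Assumption for this sketch: the column is not constant (hi > lo).
    return [(v - lo) / (hi - lo) for v in values]


x = [1.0, 2.0, 3.0]
y = [10.0, 20.0, 30.0]

print(center(x))        # [-1.0, 0.0, 1.0]
print(scale_to_0_1(y))  # [0.0, 0.5, 1.0]
```

The mean, minimum and maximum are the parts that require a full pass over the data; applying them afterwards is a per-value operation.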

In [27]:
NUM_OOV_BUCKETS = 1

def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    # Since we are modifying some features and leaving others unchanged, 
    # we start by setting 'outputs' to a copy of 'inputs'
    outputs = inputs.copy()
    
    # Scale numeric columns to have range [0, 1]
    for key in NUMERIC_FEATURE_KEYS:
        outputs[key] = tft.scale_to_0_1(outputs[key])
        
    # For all categorical columns except the label column, we compute a
    # vocabulary and replace each (whitespace-stripped) string with its
    # integer id. The vocabulary is also written to a file named after the
    # column, so it can be inspected or reused later.
    for key in CATEGORICAL_FEATURE_KEYS:
        outputs[key] = tft.compute_and_apply_vocabulary(
            tf.strings.strip(inputs[key]),
            num_oov_buckets=NUM_OOV_BUCKETS,
            vocab_filename=key
        )
        
    # For the label column, we provide the mapping from string to index
    with tf.init_scope():
        # `init_scope` - Only initialize the table once
        initializer = tf.lookup.KeyValueTensorInitializer(
            keys=['>50K', '<=50K'],
            values=tf.cast(tf.range(2), tf.int64),
            key_dtype=tf.string,
            value_dtype=tf.int64
        )
        table = tf.lookup.StaticHashTable(initializer, default_value=-1)
        
    outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])
        
    return outputs    

One difference from the previous example is that the label column manually specifies the mapping from string to index: `'>50K'` is mapped to `0` and `'<=50K'` is mapped to `1`. This is useful because it lets us know which index in the trained model corresponds to which label.
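
The two kinds of string-to-integer mapping used in `preprocessing_fn` can be illustrated in plain Python. This is only a sketch of the idea, not how `tf.Transform` or `tf.lookup` implement it: the helper names and the sample values are hypothetical, and the out-of-vocabulary handling assumes a single OOV bucket, as set by `NUM_OOV_BUCKETS = 1`.

```python
from collections import Counter


def build_vocabulary(values):
    """Map each distinct (stripped) string to an id, most frequent first."""
    counts = Counter(v.strip() for v in values)
    return {token: idx for idx, (token, _) in enumerate(counts.most_common())}


def apply_vocabulary(value, vocab):
    """Look up an id; with one OOV bucket, unseen strings get id len(vocab)."""
    return vocab.get(value.strip(), len(vocab))


# Learned mapping: ids depend on the data seen during analysis.
workclass = [' Private', 'Private', 'State-gov', 'Private',
             'State-gov', 'Self-emp-not-inc']
vocab = build_vocabulary(workclass)
print(vocab)  # {'Private': 0, 'State-gov': 1, 'Self-emp-not-inc': 2}
print(apply_vocabulary('Never-worked', vocab))  # 3

# Fixed mapping for the label: ids are chosen by hand, unknown strings map
# to -1, mirroring the keys and default_value of the StaticHashTable above.
LABEL_TO_INDEX = {'>50K': 0, '<=50K': 1}


def lookup_label(label):
    return LABEL_TO_INDEX.get(label, -1)


print([lookup_label(s) for s in ['>50K', '<=50K', 'unknown']])  # [0, 1, -1]
```

With a learned vocabulary the id of a given string can change when the data changes, whereas the hand-written label table always yields the same indices.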

The `raw_data` variable represents a `PCollection` of `pyarrow.RecordBatch`es. The `TensorAdapterConfig` paired with it below is provided by `csv_tfxio_read_translate`, which infers it from `SCHEMA` (and ultimately, in this example, from the TF parsing specs).

The final stage is to write the transformed data to disk, which follows a procedure similar to reading the raw data. The schema used for this is part of the output of `tft_beam.AnalyzeAndTransformDataset`, which infers a schema for the output data. The code to write to disk is shown below. The schema is part of the metadata, but the `tf.Transform` API uses the two interchangeably (eg you pass the metadata to the `tft.coders.ExampleProtoCoder`). Be aware that this writes to a different format: instead of `textio.WriteToText`, use Beam's built-in support for the `TFRecord` format, and use a coder to encode the data as `Example` protos. This is a better format to use for training.

The path passed to `beam.io.WriteToTFRecord` (here `transformed.tfrecord` inside `output_dir`) provides the base filename for the individual shards that are written.
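
To make the relation between the base filename and the shard files concrete, here is a small plain-Python sketch. It assumes the `-SSSSS-of-NNNNN` naming pattern, which matches the file listed at the end of this section; the helper name `shard_file_names` is hypothetical, and in a real run the number of shards is decided by the runner.

```python
def shard_file_names(base_name, num_shards):
    """Expand a base name into '<base>-SSSSS-of-NNNNN' shard names."""
    return [
        f'{base_name}-{index:05d}-of-{num_shards:05d}'
        for index in range(num_shards)
    ]


print(shard_file_names('transformed.tfrecord', 1))
# ['transformed.tfrecord-00000-of-00001']
print(shard_file_names('transformed.tfrecord', 3))
# ['transformed.tfrecord-00000-of-00003', 'transformed.tfrecord-00001-of-00003',
#  'transformed.tfrecord-00002-of-00003']
```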








In [28]:
raw_dataset = (raw_data, csv_tfxio_read_translate.TensorAdapterConfig())
raw_dataset

(<PCollection[[24]: DecodeTrainData/RawRecordToRecordBatch/CollectRecordBatchTelemetry/ProfileRecordBatches.None] at 0x153862670>,
 <tfx_bsl.tfxio.tensor_adapter.TensorAdapterConfig at 0x15300f970>)

In [29]:
working_dir = tempfile.mkdtemp()

with tft_beam.Context(temp_dir=working_dir):
    transformed_dataset, transform_fn = (
        raw_dataset 
        # Combination of AnalyzeDataset and TransformDataset
        # Infers a schema for the output data
        | tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn, output_record_batches=True
        )
    )

In [30]:
print(type(transformed_dataset[0]))
print(type(transformed_dataset[1]))
HR()
print(type(transform_fn[0]))
print(type(transform_fn[1]))

<class 'apache_beam.pvalue.PCollection'>
<class 'tensorflow_transform.beam.tft_beam_io.beam_metadata_io.BeamDatasetMetadata'>
----------------------------------------
<class 'apache_beam.pvalue.PCollection'>
<class 'tensorflow_transform.beam.tft_beam_io.beam_metadata_io.BeamDatasetMetadata'>


In [31]:
output_dir = 'chp13_tfx_04'

from pathlib import Path

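# NOTE: output_dir is the same directory as data_dir above, so this reset
# also deletes the downloaded adult.data. Re-download the file (or use a
# separate output directory) before running the pipeline.
!rm -fr {output_dir}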
Path(output_dir).mkdir(parents=True, exist_ok=True)

!du -h {output_dir}
# Make sure to clean this up
# !rm -fr {output_dir}
print(f"Reset {output_dir}")

  0B	chp13_tfx_04
Reset chp13_tfx_04


In [32]:
transformed_data, _ = transformed_dataset

# Run this just as a side-effect
_ = (
    transformed_data
    | 'EncodeTrainData' >>
    beam.FlatMapTuple(lambda batch, _ : RecordBatchToExamples(batch))
    | 'WriteTrainData' >>
    beam.io.WriteToTFRecord(
        # Path is already imported above; this avoids needing `import os`.
        str(Path(output_dir) / 'transformed.tfrecord')
    )
)

In addition to the training data, `transform_fn` is also written out with the metadata.

In [33]:
_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(output_dir)
)

In [34]:
!du -h {output_dir}

  0B	chp13_tfx_04


---
Run the entire Beam pipeline with `pipeline.run().wait_until_finish()`. Up until this point, the Beam pipeline represents a deferred, distributed computation. It provides instructions for what will be done, but the instructions have not been executed. This final call executes the specified pipeline:

In [35]:
# Runs the pipeline. Returns whatever our runner returns after running.
# Waits until the pipeline finishes and returns the final status.
result = pipeline.run().wait_until_finish()
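
The difference between building a pipeline and running it can be shown with a toy stand-in written in plain Python. The `ToyPipeline` class below is hypothetical and is not Beam's API; it only mimics the idea that adding a step records an instruction, and that no data is processed until `run()` is called.

```python
class ToyPipeline:
    """Records steps when they are added and applies them only in run()."""

    def __init__(self, source):
        self._source = list(source)
        self._steps = []  # (label, function) pairs, in the order they were added

    def apply(self, label, fn):
        # Building the pipeline: remember the step, do no work yet.
        self._steps.append((label, fn))
        return self

    def run(self):
        # Executing the pipeline: push every element through every step.
        data = self._source
        for _, fn in self._steps:
            data = [fn(item) for item in data]
        return data


calls = []


def strip_spaces(line):
    calls.append(line)  # side effect so we can see when work really happens
    return line.replace(', ', ',')


toy = ToyPipeline(['39, State-gov', '50, Private']).apply('FixCommas', strip_spaces)
print(len(calls))  # 0: nothing has executed yet
print(toy.run())   # ['39,State-gov', '50,Private']
print(len(calls))  # 2: the step ran once per element
```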



After running the pipeline, the output directory contains two artifacts:

* The transformed data, and the metadata describing it.
* The `tf.saved_model` containing the resulting `preprocessing_fn`

To see how to use these artifacts, refer to the [Advanced preprocessing tutorial](https://www.tensorflow.org/tfx/tutorials/transform/census).

In [36]:
!ls -lh {output_dir}

total 0
drwxr-xr-x  5 gb  staff   160B Jul 28 00:20 transform_fn
-rw-r--r--  1 gb  staff     0B Jul 28 00:20 transformed.tfrecord-00000-of-00001
drwxr-xr-x  4 gb  staff   128B Jul 28 00:20 transformed_metadata


In [37]:
!du -h {output_dir}

8.0K	chp13_tfx_04/transformed_metadata
8.0K	chp13_tfx_04/transform_fn/variables
 32K	chp13_tfx_04/transform_fn/assets
196K	chp13_tfx_04/transform_fn
204K	chp13_tfx_04
