Alexander S. Lundervold, 28.03.22

# Data ingestion with TensorFlow Extended

To get a pipeline up and running, we first need to ingest datasets. This notebook introduces some concepts and tools for data ingestion, in particular the `ExampleGen` component of TensorFlow Extended. We'll also connect two components, `ExampleGen` and `StatisticsGen`, taking the first step of our ML pipeline journey.

## Data set

We'll use the simplified PetFinder.my data set downloaded in `0.0-prepare_data.ipynb`. The `0.0` notebook must be run before this one. 

<img src="https://github.com/alu042/DAT255-2022/raw/master/Module-9-2-TensorFlow_Extended/nbs/assets/petfinder.png">

# Setup

In [None]:
%matplotlib inline
import os
from pathlib import Path

In [None]:
# Check whether we're running on Colab
try:
    import google.colab  # only importable inside Colab
    colab = True
except ImportError:
    colab = False

In [None]:
if colab:
    !pip install -U tfx

> If on Colab, restart the runtime after running the cell above.

In [None]:
import tensorflow as tf
import tfx

In [None]:
print(tf.__version__)

In [None]:
print(tfx.__version__)

Locally, the data is stored in the repository; on Colab, we read it from Google Drive:

In [None]:
if colab:
    from google.colab import drive
    drive.mount('./gdrive')
    DATA = Path('./gdrive/MyDrive/ColabData/petfinder-mini/csv')
else:
    NB_DIR = Path.cwd()
    DATA = NB_DIR/'..'/'data'/'petfinder-mini'/'csv'

## Run in an interactive context

In production, one would use an orchestration engine: all the components are specified in a `Pipeline` upfront, which is then passed to the orchestrator. The orchestrator determines the component execution order by constructing a directed acyclic graph (DAG) of the artifact dependencies between components.
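The ordering logic can be illustrated with a small sketch using Python's standard-library `graphlib` (Python 3.9+). The component descriptions below are illustrative and do not use the TFX API: each component declares which artifacts it consumes and produces, edges are derived from those dependencies, and a topological sort yields a valid execution order.

```python
from graphlib import TopologicalSorter

# Hypothetical component descriptions: which artifacts each one consumes and produces.
# (Illustrative only; real TFX components declare this through their specs and channels.)
COMPONENTS = {
    "ExampleGen": {"inputs": [], "outputs": ["examples"]},
    "StatisticsGen": {"inputs": ["examples"], "outputs": ["statistics"]},
    "SchemaGen": {"inputs": ["statistics"], "outputs": ["schema"]},
}


def execution_order(components):
    """Return one valid run order by topologically sorting artifact dependencies."""
    # Map each artifact to the component that produces it.
    producer = {
        artifact: name
        for name, spec in components.items()
        for artifact in spec["outputs"]
    }
    # A component depends on the producers of all of its inputs.
    graph = {
        name: {producer[artifact] for artifact in spec["inputs"]}
        for name, spec in components.items()
    }
    # static_order() raises graphlib.CycleError if the graph is not acyclic.
    return list(TopologicalSorter(graph).static_order())


print(execution_order(COMPONENTS))  # ['ExampleGen', 'StatisticsGen', 'SchemaGen']
```

Because the order is derived from the data dependencies rather than written down by hand, adding a new component only requires declaring its inputs and outputs.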

During development in a Jupyter notebook, it's convenient to play the role of the orchestrator ourselves, running the notebook cells as usual. This can be achieved using `InteractiveContext`, which is made for iterative development in notebooks.

In [None]:
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

In [None]:
context = InteractiveContext()

> In an interactive context, the MetadataStore will use an in-memory (i.e., ephemeral) database instance based on SQLite.
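To make the idea concrete, here is a toy stand-in built with Python's `sqlite3` module and an in-memory (`":memory:"`) database. The table layout, the helper `register_artifact`, and the URIs are hypothetical and far simpler than the real ML Metadata schema; the point is only that the store records *metadata about* artifacts, such as their type and URI, while the artifact files themselves live elsewhere on disk.

```python
import sqlite3

# An in-memory database: everything disappears when the connection is closed.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE artifacts ("
    " id INTEGER PRIMARY KEY,"
    " type TEXT NOT NULL,"
    " producer TEXT NOT NULL,"
    " uri TEXT NOT NULL)"
)


def register_artifact(conn, artifact_type, producer, uri):
    """Hypothetical helper: record an artifact's metadata and return its id."""
    cur = conn.execute(
        "INSERT INTO artifacts (type, producer, uri) VALUES (?, ?, ?)",
        (artifact_type, producer, uri),
    )
    conn.commit()
    return cur.lastrowid


# Made-up URIs: only the reference to the data is stored, not the data itself.
register_artifact(conn, "Examples", "CsvExampleGen", "/tmp/tfx/CsvExampleGen/examples/1")
register_artifact(conn, "ExampleStatistics", "StatisticsGen", "/tmp/tfx/StatisticsGen/statistics/2")

for row in conn.execute("SELECT id, type, uri FROM artifacts ORDER BY id"):
    print(row)
```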

# Ingesting the PetFinder data

There are many ways to ingest data into our pipelines: from local or remote disks (for example, storage buckets such as S3 or GCS) or directly from databases. We can ingest CSV, TFRecord, Parquet, and Avro files, and read from BigQuery, Hadoop, Google Cloud Datastore, MongoDB, and much more (whatever is directly supported by TFX or [supported by Apache Beam](https://beam.apache.org/documentation/io/built-in/)).

We'll limit the below examples to ingesting from CSVs and TFRecords. 

## From CSV

TFX provides a dedicated `ExampleGen` component for CSV files, `CsvExampleGen`. It is built on `FileBasedExampleGen`, which in turn uses Apache Beam.

In [None]:
from tfx.components import CsvExampleGen

In [None]:
#?CsvExampleGen

In [None]:
example_gen = CsvExampleGen(input_base=str(DATA)+'/', 
                            input_config=None, output_config=None, 
                            range_config=None)

Here's how `CsvExampleGen` is built:

In [None]:
#??example_gen

### Apache Beam and TFX

From https://www.tensorflow.org/tfx/guide/beam:

> _Apache Beam provides a framework for running batch and streaming data processing jobs that run on a variety of execution engines. Several of the TFX libraries use Beam for running tasks, which enables a high degree of scalability across compute clusters. Beam includes support for a variety of execution engines or "runners", including a direct runner which runs on a single compute node and is very useful for development, testing, or small deployments. Beam provides an abstraction layer which enables TFX to run on any supported runner without code modifications. TFX uses the Beam Python API, so it is limited to the runners that are supported by the Python API._

## `ExampleGen`

Let's have a look at what we got:

In [None]:
example_gen

There are currently no output artifacts as the component hasn't been run yet. In production, an orchestrator would execute the component as needed. During interactive development we can execute it in an interactive context. 

As we've seen, each TFX component has a **component specification**, an **executor class**, and some **inputs** and **outputs** wiring it to other components in a pipeline. We'll have a look at each of these below.

### Component specification

The component specification is what's printed above. Here it is again:

In [None]:
example_gen.spec.to_json_dict()

## Data spans and versioning data

> In our case, all the data is stored in a single _[Span](https://www.tensorflow.org/tfx/guide/examplegen#span_version_and_split)_, in the directory `DATA`. In practice, data would typically be split into multiple spans, for example stored in separate directories. A span is a snapshot of data; it can, for example, correspond to the data generated per day, per week, or any other grouping that makes sense for a specific use case. Spans also enable data versioning, a key concept for ML pipelines: without it, it is difficult to track changes in a machine learning-based program, whose behavior depends on the exact training data used to build it.
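As an illustration of how span-based data might be resolved, here is a stdlib-only sketch, assuming a hypothetical layout with one `span-NN` directory per snapshot. In TFX itself, spans are configured through a `{SPAN}` placeholder in the `input_config` pattern, and by default `ExampleGen` processes the latest span; the helper `latest_span` below only mimics that selection step and is not TFX code.

```python
import re

# Hypothetical directory layout: one sub-directory per span, e.g. one per day.
SPAN_DIRS = ["span-01", "span-02", "span-10", "notes", "span-03"]


def latest_span(dir_names, prefix="span-"):
    """Return (span_number, dir_name) for the highest-numbered span directory."""
    pattern = re.compile(rf"^{re.escape(prefix)}(\d+)$")
    spans = []
    for name in dir_names:
        match = pattern.match(name)
        if match:  # ignore anything that does not follow the span naming scheme
            spans.append((int(match.group(1)), name))
    if not spans:
        raise ValueError("no span directories found")
    # Compare span numbers numerically, so span-10 comes after span-03.
    return max(spans)


print(latest_span(SPAN_DIRS))  # (10, 'span-10')
```

Comparing the parsed numbers rather than the directory names matters: as plain strings, `"span-3"` would sort after `"span-12"`.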


## Splitting data

When we constructed our `ExampleGen` above, it used the default split: a 2:1 ratio between train and eval. We can customize the split, for example to a 7:2:1 ratio (70% training, 20% validation, 10% testing), by defining a `split_config` in the `output_config`, which is passed to the `ExampleGen` component as a [protocol buffer message](https://developers.google.com/protocol-buffers/docs/overview). It is also possible to use predefined splits, for example when the data already comes in separate train and eval files; these are specified through the `input_config`.
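The mechanism behind `hash_buckets` can be sketched in plain Python. This is a simplified illustration, not TFX's implementation: each record is hashed (MD5 stands in here for the fingerprint function TFX actually uses), the hash is reduced modulo the total number of buckets, and contiguous bucket ranges map to splits. Because the hash depends only on the record's content, the same record always lands in the same split, however often the pipeline is rerun. The records are made up.

```python
import bisect
import hashlib
from itertools import accumulate

# Mirrors the 7:2:1 split configuration: (split name, number of hash buckets).
SPLITS = [("train", 7), ("valid", 2), ("test", 1)]


def assign_split(record: bytes, splits=SPLITS) -> str:
    """Deterministically map a serialized record to a split via hash buckets."""
    # Cumulative bucket boundaries, e.g. [7, 9, 10] for a 7:2:1 configuration.
    boundaries = list(accumulate(n for _, n in splits))
    # Stand-in hash: TFX uses its own fingerprint function, not MD5.
    fingerprint = int.from_bytes(hashlib.md5(record).digest()[:8], "little")
    bucket = fingerprint % boundaries[-1]
    # bisect_right finds the first boundary strictly greater than the bucket.
    return splits[bisect.bisect_right(boundaries, bucket)][0]


records = [f"pet-{i},Dog,3,Male".encode() for i in range(10_000)]
counts = {name: 0 for name, _ in SPLITS}
for record in records:
    counts[assign_split(record)] += 1
print(counts)  # the counts should come out close to a 7:2:1 ratio
```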

In [None]:
from tfx.proto import example_gen_pb2

In [None]:
output = example_gen_pb2.Output(
                split_config=example_gen_pb2.SplitConfig(splits=[
                    example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=7),
                    example_gen_pb2.SplitConfig.Split(name='valid', hash_buckets=2),
                    example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=1)
                        ]))

In [None]:
output

In [None]:
example_gen = CsvExampleGen(input_base=str(DATA)+'/', 
                            input_config=None, output_config=output, 
                            range_config=None)

Now we can see that there are splits in the component specification:

In [None]:
example_gen

## Execute the `ExampleGen`

We run the component to obtain some artifacts. The executor converts the CSV files into TensorFlow Examples (`tf.train.Example` protocol buffers) using a Beam pipeline.
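Conceptually, each CSV row becomes one `tf.train.Example`: a map from feature names to lists of values, where each list is a `bytes_list`, `float_list`, or `int64_list`. The pure-Python sketch below mimics that structure with plain dictionaries instead of protocol buffers, so it runs without TensorFlow. It is a simplification with made-up rows: types are inferred value by value (a real converter should assign one consistent type per column), and missing values are not handled.

```python
import csv
import io

# A few made-up rows with PetFinder-style columns (not the real data).
CSV_TEXT = """Type,Age,Breed1,Fee,PhotoAmt
Cat,3,Tabby,100,1.0
Dog,1,Mixed Breed,0,2.5
"""


def to_feature(value: str) -> dict:
    """Wrap a CSV value in one of the three list types a tf.train.Feature can hold."""
    try:
        return {"int64_list": [int(value)]}
    except ValueError:
        pass
    try:
        return {"float_list": [float(value)]}
    except ValueError:
        return {"bytes_list": [value.encode("utf-8")]}


def row_to_example(row: dict) -> dict:
    """Build a dict shaped like tf.train.Example: {'features': {'feature': {...}}}."""
    return {"features": {"feature": {name: to_feature(v) for name, v in row.items()}}}


examples = [row_to_example(row) for row in csv.DictReader(io.StringIO(CSV_TEXT))]
print(examples[0])
```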

In [None]:
context.run(example_gen)

Here are the artifacts. The data itself is written to disk at the URIs shown below, while the `MetadataStore` records metadata about each artifact, including where it is stored:

In [None]:
for artifact in example_gen.outputs['examples'].get():
    print(artifact)

## Taking a look at some training examples

We can look at the first couple of training examples stored as artifacts. 

In [None]:
train_uri = Path(example_gen.outputs['examples'].get()[0].uri)/"Split-train"
train_uri

We see that they are TFRecord files:

In [None]:
list(train_uri.iterdir())

In [None]:
tfrecord_fn = list(train_uri.iterdir())[0]
tfrecord_fn
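To see what a TFRecord file contains, here is a stdlib-only sketch of its framing. Each record is stored as a little-endian `uint64` length, a `uint32` masked CRC-32C of the length, the data bytes, and a `uint32` masked CRC-32C of the data. The reader skips the checksum verification that TensorFlow performs (CRC-32C is not in Python's standard library), so it is for inspection only. The helper `write_unchecked_tfrecords` is a hypothetical test utility that zeroes the checksum fields, so its files are **not** valid input for TensorFlow.

```python
import gzip
import os
import struct
import tempfile


def read_tfrecords(path, compressed=True):
    """Yield raw record bytes from a TFRecord file, without verifying checksums."""
    opener = gzip.open if compressed else open
    with opener(path, "rb") as f:
        while True:
            header = f.read(12)  # 8-byte length + 4-byte CRC of the length (ignored)
            if not header:
                return  # clean end of file
            if len(header) < 12:
                raise ValueError("truncated record header")
            (length,) = struct.unpack("<Q", header[:8])
            data = f.read(length)
            if len(data) < length:
                raise ValueError("truncated record data")
            footer = f.read(4)  # CRC of the data; verification omitted in this sketch
            if len(footer) < 4:
                raise ValueError("truncated record footer")
            yield data


def write_unchecked_tfrecords(path, records):
    """Hypothetical test helper: write GZIP TFRecord framing with zeroed CRC fields."""
    with gzip.open(path, "wb") as f:
        for data in records:
            f.write(struct.pack("<Q", len(data)) + b"\0\0\0\0" + data + b"\0\0\0\0")


demo_path = os.path.join(tempfile.mkdtemp(), "demo.tfrecord.gz")
write_unchecked_tfrecords(demo_path, [b"first", b"second record"])
print(list(read_tfrecords(demo_path)))  # [b'first', b'second record']
```

Applied to `tfrecord_fn`, each yielded value should be the same serialized `tf.train.Example` bytes that `TFRecordDataset` produces.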

From our previous explorations of TensorFlow, we know that we can load them as a `TFRecordDataset`. The files are GZIP-compressed, so we specify the compression type:

In [None]:
train_dataset = tf.data.TFRecordDataset(str(tfrecord_fn), compression_type="GZIP")

In [None]:
train_dataset

Here are the first two records:

In [None]:
for tfrecord in train_dataset.take(2):
    serialized_example = tfrecord.numpy()
    example = tf.train.Example()
    example.ParseFromString(serialized_example)
    print(example)
    print("#"*40)
    print("#"*40)
    print()

## A closer look at the executor specification

In [None]:
example_gen.executor_spec.to_json_dict()

In [None]:
#??tfx.components.example_gen.csv_example_gen.executor.Executor

In [None]:
#??tfx.components.example_gen.base_example_gen_executor

# Connecting to a `StatisticsGen`

To illustrate how to connect TFX components, let's compute some statistics on the data using `StatisticsGen`. 

> In the next notebook we'll continue using this to look into data validation strategies. Here the main point is just to show how components can be assembled into pipelines.
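For intuition about what such statistics contain, here is a stdlib-only toy version computing a handful of per-feature summaries on made-up rows: counts and missing values for every feature, range and mean for numeric features, and distinct values for categorical ones. It is only a sketch: `StatisticsGen` relies on TensorFlow Data Validation, computes statistics separately for each split, and produces a much richer set of statistics.

```python
from collections import Counter
from statistics import mean

# Made-up rows standing in for one split of the data; None marks a missing value.
ROWS = [
    {"Type": "Cat", "Age": 3, "Fee": 100},
    {"Type": "Dog", "Age": 1, "Fee": 0},
    {"Type": "Dog", "Age": None, "Fee": 50},
]


def feature_stats(rows):
    """Compute a few per-feature summary statistics, loosely like StatisticsGen's output."""
    if not rows:
        return {}
    stats = {}
    for name in rows[0]:
        values = [row.get(name) for row in rows]
        present = [v for v in values if v is not None]
        summary = {"count": len(present), "missing": len(values) - len(present)}
        if present and all(isinstance(v, (int, float)) for v in present):
            # Numeric feature: range and mean.
            summary.update(min=min(present), max=max(present), mean=mean(present))
        else:
            # Categorical feature: number of distinct values and the most common one.
            counter = Counter(present)
            top = counter.most_common(1)[0][0] if counter else None
            summary.update(unique=len(counter), top=top)
        stats[name] = summary
    return stats


for name, summary in feature_stats(ROWS).items():
    print(name, summary)
```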

In [None]:
from tfx.components import StatisticsGen

We define the component inputs by connecting it to our `ExampleGen`:

In [None]:
statistics_gen = StatisticsGen(
        examples=example_gen.outputs['examples'],
        schema=None,
        stats_options=None,
        exclude_splits=None
      )

In [None]:
statistics_gen

In [None]:
#?statistics_gen

Let's run the `StatisticsGen` component to generate some artifacts:

In [None]:
context.run(statistics_gen)

In [None]:
for artifact in statistics_gen.outputs['statistics'].get():
    print(artifact)

We can show the computed statistics in our interactive context:

In [None]:
context.show(statistics_gen.outputs['statistics'])

We now have two components connected in a pipeline: 

<img width=60% src="https://github.com/alu042/DAT255-2022/raw/master/Module-9-2-TensorFlow_Extended/nbs/assets/pipeline_1.png">

# What's next?

We'll explore how to use the generated statistics to infer a **data schema** and perform various **data validation** steps.