# Pipeline interactivo de ingestión, generación de estadísticas, esquema, y validación de anomalías

In [None]:
import os
import sys  
import pprint
import tempfile
import urllib
import tfx
import tensorflow as tf
from tfx.components import SchemaGen
from tfx.proto import example_gen_pb2
from tfx.components import StatisticsGen
import tensorflow_data_validation as tfdv
from tfx.components import ExampleValidator
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

## Componente de ingestión

The ExampleGen TFX Pipeline component ingests data into TFX pipelines. It consumes external files/services to generate Examples which will be read by other TFX components.

In [None]:
# Interactive context used to run TFX components one at a time from the
# notebook; it is configured with the pipeline name and the pipeline root.
context = InteractiveContext(
    pipeline_name='notebook_pipeline_test',
    pipeline_root="./output_notebook_pipeline",
)

# Base directory of the CSV data; the split patterns below are given
# relative to it.
DATA_RAW_JOIN_PATH = "../data/processed/split/"

# Map each file pattern to a named split. The variable is called
# `input_config` (not `input`) so the builtin `input()` is not shadowed for
# the rest of the notebook session.
input_config = example_gen_pb2.Input(
    splits=[
        example_gen_pb2.Input.Split(name="train", pattern="train/*"),
        example_gen_pb2.Input.Split(name="validation", pattern="val/*"),
        example_gen_pb2.Input.Split(name="test", pattern="test/*"),
    ]
)

# Use the CsvExampleGen class imported explicitly at the top of the notebook
# instead of reaching through the bare `tfx` package attribute.
data_ingestion = CsvExampleGen(
    input_base=DATA_RAW_JOIN_PATH,
    input_config=input_config,
)
context.run(data_ingestion)

### Visualización de registros del dataset creado

In [None]:
# Pretty-printer instance; also used by the record-preview cell below.
pp = pprint.PrettyPrinter()

# Take the first 'examples' artifact emitted by the ingestion component and
# print its split names together with its storage location.
examples_artifacts = data_ingestion.outputs['examples'].get()
artifact = examples_artifacts[0]
print(artifact.split_names, artifact.uri)

### Visualizar datos de mi example

In [None]:
# Directory of the training split inside the ExampleGen output artifact.
# The directory is spelled 'Split-train' (capital S), consistent with the
# 'Split-test' / 'Split-validation' paths used later in this notebook; the
# lowercase spelling is not found on case-sensitive filesystems.
examples_uri = data_ingestion.outputs['examples'].get()[0].uri
train_uri = os.path.join(examples_uri, 'Split-train')

# Every file in the split directory is treated as a TFRecord shard.
tfrecord_filenames = [os.path.join(train_uri, name) for name in os.listdir(train_uri)]

# The shards are read as GZIP-compressed TFRecord files.
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Decode and pretty-print only the first record as a quick sanity check.
for tfrecord in dataset.take(1):
    serialized_example = tfrecord.numpy()
    example = tf.train.Example()
    example.ParseFromString(serialized_example)
    pp.pprint(example)

## Componente de generación de estadísticas

The StatisticsGen TFX pipeline component generates feature statistics over both training and serving data, which can be used by other pipeline components.

In [None]:
# Build the statistics component on top of the ingested examples, execute it
# in the interactive context, and render the resulting statistics inline.
statistics_gen = StatisticsGen(examples=data_ingestion.outputs['examples'])
context.run(statistics_gen)
context.show(statistics_gen.outputs['statistics'])

## Componente de generación del esquema de los datos

Some TFX components use a description of your input data called a schema. The schema is an instance of schema.proto. It can specify data types for feature values, whether a feature has to be present in all examples, allowed value ranges, and other properties. A SchemaGen pipeline component will automatically generate a schema by inferring types, categories, and ranges from the training data.

In [None]:
# Build the schema component from the computed statistics (with feature-shape
# inference enabled), execute it, and render the resulting schema inline.
schema_gen = SchemaGen(
    infer_feature_shape=True,
    statistics=statistics_gen.outputs['statistics'],
)
context.run(schema_gen)
context.show(schema_gen.outputs['schema'])

## Componente de validación

The ExampleValidator component uses TensorFlow Data Validation to validate the statistics of some splits on input examples against a schema.

The ExampleValidator component identifies anomalies in training and serving data. The component can be configured to detect different classes of anomalies in the data. It can:

- Perform validity checks by comparing data statistics against a schema that codifies expectations of the user.

### Schema Based Example Validation

The ExampleValidator component identifies any anomalies in the example data by comparing data statistics computed by the StatisticsGen component against a schema. The schema codifies properties which the input data is expected to satisfy, and is provided and maintained by the user.


In [None]:
# Wire the validator to the statistics and the schema produced by the two
# previous components, then execute it in the interactive context.
example_validator = ExampleValidator(
    schema=schema_gen.outputs['schema'],
    statistics=statistics_gen.outputs['statistics'],
)
context.run(example_validator)

In [None]:
# Stand-alone TFDV check, separate from the ExampleValidator component above:
# infer a schema from the statistics of the test split and validate the
# statistics of the validation split against it.
#
# The split locations are derived from the ExampleGen output artifact instead
# of being hard-coded: the artifact path contains a numbered run directory
# (e.g. '.../examples/1') that presumably changes between executions, and a
# file pattern picks up every shard rather than a single fixed file name.
examples_uri = data_ingestion.outputs['examples'].get()[0].uri
stats = tfdv.generate_statistics_from_tfrecord(
    data_location=os.path.join(examples_uri, 'Split-test', '*'))
other_stats = tfdv.generate_statistics_from_tfrecord(
    data_location=os.path.join(examples_uri, 'Split-validation', '*'))

schema = tfdv.infer_schema(stats)
anomalies = tfdv.validate_statistics(statistics=other_stats, schema=schema)


In [None]:
# Render the anomalies computed in the previous cell.
tfdv.display_anomalies(anomalies)