In [None]:
"""Initiate tfx pipeline components
"""

import os

import tensorflow as tf
import tensorflow_model_analysis as tfma
from tfx.components import (
    CsvExampleGen,
    StatisticsGen,
    SchemaGen,
    ExampleValidator,
    Transform,
    Trainer,
    Evaluator,
    Pusher
)
from tfx.proto import example_gen_pb2, trainer_pb2, pusher_pb2
from tfx.types import Channel
from tfx.dsl.components.common.resolver import Resolver
from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy import (
    LatestBlessedModelStrategy)

def init_components(
    data_dir,
    transform_module,
    training_module,
    training_steps,
    eval_steps,
    serving_model_dir,
):
    """Initiate tfx pipeline components

    Args:
        data_dir (str): a path to the data
        transform_module (str): a path to the transform_module
        training_module (str): a path to the transform_module
        training_steps (int): number of training steps
        eval_steps (int): number of eval steps
        serving_model_dir (str): a path to the serving model directory

    Returns:
        TFX components
    """
    output = example_gen_pb2.Output(
        split_config = example_gen_pb2.SplitConfig(splits=[
            example_gen_pb2.SplitConfig.Split(name="train", hash_buckets=8),
            example_gen_pb2.SplitConfig.Split(name="eval", hash_buckets=2)
        ])
    )

    example_gen = CsvExampleGen(
        input_base=data_dir,
        output_config=output
    )

    statistics_gen = StatisticsGen(
        examples=example_gen.outputs["examples"]
    )

    schema_gen = SchemaGen(
        statistics=statistics_gen.outputs["statistics"]
    )

    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_gen.outputs['schema']
    )

    transform  = Transform(
        examples=example_gen.outputs['examples'],
        schema= schema_gen.outputs['schema'],
        module_file=os.path.abspath(transform_module)
    )

    trainer  = Trainer(
        module_file=os.path.abspath(training_module),
        examples = transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        train_args=trainer_pb2.TrainArgs(
            splits=['train'],
            num_steps=training_steps),
        eval_args=trainer_pb2.EvalArgs(
            splits=['eval'],
            num_steps=eval_steps)
    )

    model_resolver = Resolver(
        strategy_class= LatestBlessedModelStrategy,
        model = Channel(type=Model),
        model_blessing = Channel(type=ModelBlessing)
    ).with_id('Latest_blessed_model_resolver')

    slicing_specs=[
        tfma.SlicingSpec(),
        tfma.SlicingSpec(feature_keys=[
            "gender",
            "Partner"
        ])
    ]

    metrics_specs = [
        tfma.MetricsSpec(metrics=[
                tfma.MetricConfig(class_name='AUC'),
                tfma.MetricConfig(class_name="Precision"),
                tfma.MetricConfig(class_name="Recall"),
                tfma.MetricConfig(class_name="ExampleCount"),
                tfma.MetricConfig(class_name='BinaryAccuracy',
                    threshold=tfma.MetricThreshold(
                        value_threshold=tfma.GenericValueThreshold(
                            lower_bound={'value':0.5}),
                        change_threshold=tfma.GenericChangeThreshold(
                            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                            absolute={'value':0.0001})
                        )
                )
            ])
    ]

    eval_config = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key='Churn')],
        slicing_specs=slicing_specs,
        metrics_specs=metrics_specs
    )

    evaluator = Evaluator(
        examples=example_gen.outputs['examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config)

    pusher = Pusher(
        model=trainer.outputs["model"],
        model_blessing=evaluator.outputs["blessing"],
        push_destination=pusher_pb2.PushDestination(
            filesystem=pusher_pb2.PushDestination.Filesystem(
                base_directory=serving_model_dir
            )
        ),
    )

    components = (
        example_gen,
        statistics_gen,
        schema_gen,
        example_validator,
        transform,
        trainer,
        model_resolver,
        evaluator,
        pusher
    )

    return components

In [None]:
import os
import sys
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

PIPELINE_NAME = "customer-churn-pipeline"

# pipeline inputs
DATA_ROOT = "/content/a443-cc-pipeline/data"
TRANSFORM_MODULE_FILE = "/content/a443-cc-pipeline/modules/customer_churn_transform.py"
TRAINER_MODULE_FILE = "/content/a443-cc-pipeline/modules/customer_churn_trainer.py"
# requirement_file = os.path.join(root, "requirements.txt")

# pipeline outputs
OUTPUT_BASE = "/content/a443-cc-pipeline/output"
serving_model_dir = os.path.join(OUTPUT_BASE, 'serving_model')
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")

def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:

    logging.info(f"Pipeline root set to: {pipeline_root}")
    beam_args = [
        "--direct_running_mode=multi_processing"
        # 0 auto-detect based on on the number of CPUs available
        # during execution time.
        "----direct_num_workers=0"
    ]

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        eam_pipeline_args=beam_args
    )

if __name__ == "__main__":
    logging.set_verbosity(logging.INFO)

    components = init_components(
        DATA_ROOT,
        training_module=TRAINER_MODULE_FILE,
        transform_module=TRANSFORM_MODULE_FILE,
        training_steps=5000,
        eval_steps=1000,
        serving_model_dir=serving_model_dir,
    )

    pipeline = init_local_pipeline(components, pipeline_root)
    BeamDagRunner().run(pipeline=pipeline)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Pipeline root set to: /content/a443-cc-pipeline/output/customer-churn-pipeline
INFO:absl:Generating ephemeral wheel package for '/content/a443-cc-pipeline/modules/customer_churn_transform.py' (including modules: ['customer_churn_transform', 'components', 'customer_churn_trainer']).
INFO:absl:User module package has hash fingerprint version a6093693d7d4855a85bc3e5ba85a43d84689d55391e69ed75232711a696fb5ba.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmplteazxly/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmp6akwauz8', '--dist-dir', '/tmp/tmpbdrntmq6']
INFO:absl:Successfully built user code wheel distribution at '/content/a443-cc-pipeline/output/customer-churn-pipeline/_wheels/tfx_user_code_Transform-0.0+a6093693d7d4855a85bc3e5ba85a43d84689d55391e69ed75232711a696fb5

INFO:absl:Node CsvExampleGen depends on [].
INFO:absl:Node CsvExampleGen is scheduled.
INFO:absl:Node Latest_blessed_model_resolver depends on [].
INFO:absl:Node Latest_blessed_model_resolver is scheduled.
INFO:absl:Node StatisticsGen depends on ['Run[CsvExampleGen]'].
INFO:absl:Node StatisticsGen is scheduled.
INFO:absl:Node SchemaGen depends on ['Run[StatisticsGen]'].
INFO:absl:Node SchemaGen is scheduled.
INFO:absl:Node ExampleValidator depends on ['Run[SchemaGen]', 'Run[StatisticsGen]'].
INFO:absl:Node ExampleValidator is scheduled.
INFO:absl:Node Transform depends on ['Run[CsvExampleGen]', 'Run[SchemaGen]'].
INFO:absl:Node Transform is scheduled.
INFO:absl:Node Trainer depends on ['Run[SchemaGen]', 'Run[Transform]'].
INFO:absl:Node Trainer is scheduled.
INFO:absl:Node Evaluator depends on ['Run[CsvExampleGen]', 'Run[Latest_blessed_model_resolver]', 'Run[Trainer]'].
INFO:absl:Node Evaluator is scheduled.
INFO:absl:Node Pusher depends on ['Run[Evaluator]', 'Run[Trainer]'].
INFO:absl

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 InternetService_xf (InputL  [(None, 4)]                  0         []                            
 ayer)                                                                                            
                                                                                                  
 SeniorCitizen_xf (InputLay  [(None, 3)]                  0         []                            
 er)                                                                                              
                                                                                                  
 PaperlessBilling_xf (Input  [(None, 3)]                  0         []                            
 Layer)                                                                                       

INFO:absl:Feature Churn has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature InternetService has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature MonthlyCharges has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature PaperlessBilling has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature Partner has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature PhoneService has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature SeniorCitizen has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature StreamingTV has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature TotalCharges has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature customerID has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature gender has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature tenure has a shape dim {
  size: 1
}