In [1]:
import os
import sys
from typing import Text
 
from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

In [2]:
PIPELINE_NAME = "customer-churn-pipeline"

# pipeline inputs
DATA_ROOT = "data"
TRANSFORM_MODULE_FILE = "modules/customer_churn_transform.py"
TRAINER_MODULE_FILE = "modules/customer_churn_trainer.py"
# requirement_file = os.path.join(root, "requirements.txt")

# pipeline outputs
OUTPUT_BASE = "output"
serving_model_dir = os.path.join(OUTPUT_BASE, 'serving_model')
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")

In [3]:
def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:
    
    logging.info(f"Pipeline root set to: {pipeline_root}")
    beam_args = [
        "--direct_running_mode=multi_processing"
        # 0 auto-detect based on on the number of CPUs available 
        # during execution time.
        "----direct_num_workers=0" 
    ]
    
    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        eam_pipeline_args=beam_args
    )

In [4]:
if __name__ == "__main__":
    logging.set_verbosity(logging.INFO)
    
    from modules.components import init_components
    
    components = init_components(
        DATA_ROOT,
        training_module=TRAINER_MODULE_FILE,
        transform_module=TRANSFORM_MODULE_FILE,
        training_steps=5000,
        eval_steps=1000,
        serving_model_dir=serving_model_dir,
    )
    
    pipeline = init_local_pipeline(components, pipeline_root)
    BeamDagRunner().run(pipeline=pipeline)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Pipeline root set to: output\customer-churn-pipeline
INFO:absl:Generating ephemeral wheel package for 'd:\\Coolyeah\\Dicoding\\Submission_Dicoding\\ml_ops\\Latihan 2\\modules\\customer_churn_transform.py' (including modules: ['components', 'customer_churn_trainer', 'customer_churn_transform']).
INFO:absl:User module package has hash fingerprint version ab366fa6a9beb811b09b347c778e2224f498a129d47e1a42aa6beb3e7d74a5d6.
INFO:absl:Executing: ['d:\\Coolyeah\\Dicoding\\Submission_Dicoding\\ml_ops\\envs\\mlops-churn\\python.exe', 'C:\\Users\\User\\AppData\\Local\\Temp\\tmpd2e7sf72\\_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', 'C:\\Users\\User\\AppData\\Local\\Temp\\tmpt4h31lu5', '--dist-dir', 'C:\\Users\\User\\AppData\\Local\\Temp\\tmp6ej1bcqx']
INFO:absl:Successfully built user c

INFO:absl:Node CsvExampleGen depends on [].
INFO:absl:Node CsvExampleGen is scheduled.
INFO:absl:Node Latest_blessed_model_resolver depends on [].
INFO:absl:Node Latest_blessed_model_resolver is scheduled.
INFO:absl:Node StatisticsGen depends on ['Run[CsvExampleGen]'].
INFO:absl:Node StatisticsGen is scheduled.
INFO:absl:Node SchemaGen depends on ['Run[StatisticsGen]'].
INFO:absl:Node SchemaGen is scheduled.
INFO:absl:Node ExampleValidator depends on ['Run[SchemaGen]', 'Run[StatisticsGen]'].
INFO:absl:Node ExampleValidator is scheduled.
INFO:absl:Node Transform depends on ['Run[CsvExampleGen]', 'Run[SchemaGen]'].
INFO:absl:Node Transform is scheduled.
INFO:absl:Node Trainer depends on ['Run[SchemaGen]', 'Run[Transform]'].
INFO:absl:Node Trainer is scheduled.
INFO:absl:Node Evaluator depends on ['Run[CsvExampleGen]', 'Run[Latest_blessed_model_resolver]', 'Run[Trainer]'].
INFO:absl:Node Evaluator is scheduled.
INFO:absl:Node Pusher depends on ['Run[Evaluator]', 'Run[Trainer]'].
INFO:absl



INFO:absl:Using output\customer-churn-pipeline\Trainer\model\7\Format-Serving as baseline model.




INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
INFO:absl:udf_utils.get_fn {'fairness_indicator_thresholds': 'null', 'eval_config': '{\n  "metrics_specs": [\n    {\n      "metrics": [\n        {\n          "class_name": "AUC"\n        },\n        {\n          "class_name": "Precision"\n        },\n        {\n          "class_name": "Recall"\n        },\n        {\n          "class_name": "ExampleCount"\n        },\n        {\n          "class_name": "BinaryAccuracy",\n          "threshold": {\n            "change_threshold": {\n              "absolute": 0.0001,\n              "direction": "HIGHER_IS_BETTER"\n            },\n            "value_threshold": {\n              "lower_bound": 0.5\n            }\n          }\n        }\n      ]\n    }\n  ],\n  "model_specs": [\n    {\n      "label_key": "Churn"\n    }\n  ],\n  "slicing_specs": [\n    {},\n    {\n      "feature_keys": [\n        "gender",\n        "Partner"\n      ]\n    }\n 































































INFO:absl:Evaluation complete. Results written to output\customer-churn-pipeline\Evaluator\evaluation\17.
INFO:absl:Checking validation results.


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
INFO:absl:Blessing result False written to output\customer-churn-pipeline\Evaluator\blessing\17.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 17 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'blessing': [Artifact(artifact: uri: "output\\customer-churn-pipeline\\Evaluator\\blessing\\17"
, artifact_type: name: "ModelBlessing"
)], 'evaluation': [Artifact(artifact: uri: "output\\customer-churn-pipeline\\Evaluator\\evaluation\\17"
, artifact_type: name: "ModelEvaluation"
)]}) for execution 17
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Evaluator is finished.
INFO:absl:node Pusher is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DEPLOY
  }
  id: "Pusher"
}
contexts {
  contexts {
    type {
      name: 