In [1]:
import os
from typing import Text
 
from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

In [2]:
PIPELINE_NAME = "rivalhaikalhafizh-pipeline"

# Pipeline inputs (relative paths resolve against the kernel's working directory).
DATA_ROOT = "data"
TRANSFORM_MODULE_FILE = "modules/bmi_transform.py"
TRAINER_MODULE_FILE = "modules/bmi_trainer.py"
TUNER_MODULE_FILE = "modules/bmi_tuner.py"

# Pipeline outputs.
OUTPUT_BASE = "output"
# Destination directory passed to the components as "serving_model_dir".
serving_model_dir = os.path.join(OUTPUT_BASE, 'serving_model')
# Root directory for this pipeline's artifacts (one sub-directory per pipeline name).
pipeline_roots = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
# SQLite file used for the ML Metadata store, kept under the pipeline root.
metadata_path = os.path.join(pipeline_roots, "metadata.sqlite")

In [3]:
def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:
    """Build a TFX pipeline configured to run locally on Beam.

    The pipeline name and the ML Metadata database path are not parameters:
    they are read from the notebook-level ``PIPELINE_NAME`` and
    ``metadata_path`` globals defined in the configuration cell.

    Args:
        components: TFX components to include in the pipeline, as returned
            by ``init_components`` (presumably a list of components — TODO
            confirm against modules/components.py).
        pipeline_root (Text): path to the pipeline root directory.

    Returns:
        pipeline.Pipeline: the assembled pipeline, with caching enabled, a
        SQLite-backed metadata store, and the Beam arguments below.
    """
    logging.info("Pipeline root set to: %s", pipeline_root)

    # Every Beam flag must be its own list element. Without a separating
    # comma, Python concatenates adjacent string literals into one bogus flag.
    beam_args = [
        "--direct_running_mode=multi_processing",
        # 0 = auto-detect based on the number of CPUs available
        # at execution time.
        "--direct_num_workers=0",
    ]

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        beam_pipeline_args=beam_args,
    )


In [4]:
if __name__ == "__main__":
    logging.set_verbosity(logging.INFO)

    from modules.components import init_components

    # Keyword arguments consumed by the project's init_components helper.
    components_args = {
        "data_dir": DATA_ROOT,
        "training_module": TRAINER_MODULE_FILE,
        "tuner_module": TUNER_MODULE_FILE,
        "transform_module": TRANSFORM_MODULE_FILE,
        "training_steps": 300,
        "eval_steps": 200,
        "serving_model_dir": serving_model_dir,
    }

    pipeline_components = init_components(components_args)

    # Use a distinct name: assigning to ``pipeline`` would shadow the
    # ``tfx.orchestration.pipeline`` module imported in the first cell, so
    # re-running this cell (or the one defining init_local_pipeline) would
    # fail with AttributeError on ``pipeline.Pipeline``.
    tfx_pipeline = init_local_pipeline(pipeline_components, pipeline_roots)
    BeamDagRunner().run(pipeline=tfx_pipeline)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Pipeline root set to: outputs\rivalhaikalhafizh-pipeline
INFO:absl:Generating ephemeral wheel package for 'C:\\Users\\ACER\\MachineLearning\\MLOps\\pipelineakhir\\modules\\bmi_transform.py' (including modules: ['bmi_trainer', 'bmi_transform', 'bmi_tuner', 'components']).
INFO:absl:User module package has hash fingerprint version d2501d74274dc09e60935ad38c6386c7b723d4fe727ab7b36bfe796966d298e4.
INFO:absl:Executing: ['C:\\Users\\ACER\\.conda\\envs\\a443-churn\\python.exe', 'C:\\Users\\ACER\\AppData\\Local\\Temp\\tmp7uor30lk\\_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', 'C:\\Users\\ACER\\AppData\\Local\\Temp\\tmplzcn1_l8', '--dist-dir', 'C:\\Users\\ACER\\AppData\\Local\\Temp\\tmpu7qzh3h9']
INFO:absl:Successfully built user code wheel distribution at 'outputs\\rivalhaikalhafizh

INFO:absl:Node CsvExampleGen depends on [].
INFO:absl:Node CsvExampleGen is scheduled.
INFO:absl:Node Latest_blessed_model_resolver depends on [].
INFO:absl:Node Latest_blessed_model_resolver is scheduled.
INFO:absl:Node StatisticsGen depends on ['Run[CsvExampleGen]'].
INFO:absl:Node StatisticsGen is scheduled.
INFO:absl:Node SchemaGen depends on ['Run[StatisticsGen]'].
INFO:absl:Node SchemaGen is scheduled.
INFO:absl:Node ExampleValidator depends on ['Run[SchemaGen]', 'Run[StatisticsGen]'].
INFO:absl:Node ExampleValidator is scheduled.
INFO:absl:Node Transform depends on ['Run[CsvExampleGen]', 'Run[SchemaGen]'].
INFO:absl:Node Transform is scheduled.
INFO:absl:Node Tuner depends on ['Run[SchemaGen]', 'Run[Transform]'].
INFO:absl:Node Tuner is scheduled.
INFO:absl:Node Trainer depends on ['Run[SchemaGen]', 'Run[Transform]', 'Run[Tuner]'].
INFO:absl:Node Trainer is scheduled.
INFO:absl:Node Evaluator depends on ['Run[CsvExampleGen]', 'Run[Latest_blessed_model_resolver]', 'Run[Trainer]']

INFO:absl:node Latest_blessed_model_resolver is finished.
INFO:absl:node StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "rivalhaikalhafizh-pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "20230426-175403.011724"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "rivalhaikalhafizh-pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
          

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[Transform] Resolved inputs: ({'examples': [Artifact(artifact: id: 125
type_id: 16
uri: "outputs\\rivalhaikalhafizh-pipeline\\CsvExampleGen\\examples\\81"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:8318,xor_checksum:1640319368,sum_checksum:1640319368"
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
c

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[Tuner] Resolved inputs: ({'transform_graph': [Artifact(artifact: id: 134
type_id: 22
uri: "outputs\\rivalhaikalhafizh-pipeline\\Transform\\transform_graph\\86"
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
create_time_since_epoch: 1682506352678
last_update_time_since_epoch: 1682506352678
, artifact_type: id: 22
name: "TransformGraph"
)], 'examples': [Artifact(artifact: id: 130
type_id: 16
uri: "outputs\\rivalhaikalhafizh-pipeline\\Transform\\transformed_examples\\86"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_pro

INFO:absl:[Trainer] Resolved inputs: ({'transform_graph': [Artifact(artifact: id: 134
type_id: 22
uri: "outputs\\rivalhaikalhafizh-pipeline\\Transform\\transform_graph\\86"
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
create_time_since_epoch: 1682506352678
last_update_time_since_epoch: 1682506352678
, artifact_type: id: 22
name: "TransformGraph"
)], 'schema': [Artifact(artifact: id: 127
type_id: 20
uri: "outputs\\rivalhaikalhafizh-pipeline\\SchemaGen\\schema\\84"
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
create_time_since_epoch: 1682506331203
last_update_time_since_epoch: 168250

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:[Evaluator] Resolved inputs: ({'examples': [Artifact(artifact: id: 125
type_id: 16
uri: "outputs\\rivalhaikalhafizh-pipeline\\CsvExampleGen\\examples\\81"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:8318,xor_checksum:1640319368,sum_checksum:1640319368"
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
c

INFO:absl:udf_utils.get_fn {'eval_config': '{\n  "metrics_specs": [\n    {\n      "metrics": [\n        {\n          "class_name": "AUC"\n        },\n        {\n          "class_name": "Precision"\n        },\n        {\n          "class_name": "Recall"\n        },\n        {\n          "class_name": "ExampleCount"\n        },\n        {\n          "class_name": "TruePositives"\n        },\n        {\n          "class_name": "FalsePositives"\n        },\n        {\n          "class_name": "TrueNegatives"\n        },\n        {\n          "class_name": "FalseNegatives"\n        },\n        {\n          "class_name": "SparseCategoricalAccuracy",\n          "threshold": {\n            "change_threshold": {\n              "absolute": 0.0001,\n              "direction": "HIGHER_IS_BETTER"\n            },\n            "value_threshold": {\n              "lower_bound": 0.0\n            }\n          }\n        }\n      ]\n    }\n  ],\n  "model_specs": [\n    {\n      "label_key": "Index"\n    



INFO:absl:Using output\rivalhaikalhafizh-pipeline\Trainer\model\8\Format-Serving as baseline model.
INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
INFO:absl:udf_utils.get_fn {'eval_config': '{\n  "metrics_specs": [\n    {\n      "metrics": [\n        {\n          "class_name": "AUC"\n        },\n        {\n          "class_name": "Precision"\n        },\n        {\n          "class_name": "Recall"\n        },\n        {\n          "class_name": "ExampleCount"\n        },\n        {\n          "class_name": "TruePositives"\n        },\n        {\n          "class_name": "FalsePositives"\n        },\n        {\n          "class_name": "TrueNegatives"\n        },\n        {\n          "class_name": "FalseNegatives"\n        },\n        {\n          "class_name": "SparseCategoricalAccuracy",\n          "threshold": {\n            "change_threshold": {\n              "absolute": 0.0001,\n              "direction": "HIGHER_IS_BETTER"\n   



INFO:absl:Cleaning up stateless execution info.


RuntimeError: KeyboardInterrupt [while running 'Run[Evaluator]']