## Import Libraries

In [1]:
import os
os.chdir('C:\\Users\\M S I\\Downloads\\Submission 2 MLOPS Dicoding')
from typing import Text, List
from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

## Directory Configuration

In [2]:
PIPELINE_NAME = 'warlord194-pipeline'
DATA_ROOT = 'data'
TRANSFORM_MODULE_FILE = 'modules/diabetes_churn_transform.py'
TRAINER_MODULE_FILE = 'modules/diabetes_churn_trainer.py'
OUTPUT_BASE = 'output'

SERVING_MODEL_DIR = os.path.join(OUTPUT_BASE, 'serving_model')
PIPELINE_ROOT = os.path.normpath('C:/')
METADATA_PATH = os.path.join(PIPELINE_ROOT, 'metadata.sqlite')

## Run Pipeline

In [5]:
def create_beam_pipeline_args() -> List[Text]:
    """
    Generate Beam pipeline arguments for local execution.

    Returns:
        List[Text]: A list of Beam pipeline arguments.
    """
    return [
        '--direct_running_mode=multi_processing',
        '--direct_num_workers=0'
    ]


def initialize_pipeline(components, pipeline_root: Text) -> pipeline.Pipeline:
    """
    Initialize a local TFX pipeline.

    Args:
        components: A list of TFX components to be included in the pipeline.
        pipeline_root (Text): Root directory for pipeline output artifacts.

    Returns:
        pipeline.Pipeline: A TFX pipeline object.
    """
    logging.info(f"Initializing pipeline with root directory: {pipeline_root}")

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            METADATA_PATH
        ),
        beam_pipeline_args=create_beam_pipeline_args(),
    )


def main():
    """
    Main function to set up and run the pipeline.
    """
    logging.set_verbosity(logging.INFO)

    from modules.components import create_pipeline_components
    components = create_pipeline_components(
        data_dir=DATA_ROOT,
        trainer_module=TRAINER_MODULE_FILE,
        transform_module=TRANSFORM_MODULE_FILE,
        train_steps=5000,
        eval_steps=1000,
        serving_model_dir=SERVING_MODEL_DIR,
    )

    pipeline = initialize_pipeline(components, PIPELINE_ROOT)
    BeamDagRunner().run(pipeline=pipeline)


if __name__ == '__main__':
    main()

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Initializing pipeline with root directory: C:\
INFO:absl:Generating ephemeral wheel package for 'C:\\Users\\M S I\\Downloads\\Submission 2 MLOPS Dicoding\\modules\\diabetes_churn_transform.py' (including modules: ['components', 'diabetes_churn_trainer', 'diabetes_churn_transform']).
INFO:absl:User module package has hash fingerprint version b88156b9114becfc43c0343b712657addf66b28c7fc18ced049202aae4fd3187.
INFO:absl:Executing: ['c:\\ProgramData\\miniconda3\\envs\\diabetes-churn\\python.exe', 'C:\\Users\\MSI~1\\AppData\\Local\\Temp\\tmpujskk17y\\_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', 'C:\\Users\\MSI~1\\AppData\\Local\\Temp\\tmpw51q6529', '--dist-dir', 'C:\\Users\\MSI~1\\AppData\\Local\\Temp\\tmpk_awsggt']
INFO:absl:Successfully built user code wheel distribution at 'C:\



INFO:absl:Using C:\Trainer\model\7\Format-Serving as baseline model.




INFO:absl:The 'example_splits' parameter is not set, using 'eval' split.
INFO:absl:Evaluating model.
INFO:absl:udf_utils.get_fn {'fairness_indicator_thresholds': 'null', 'example_splits': 'null', 'eval_config': '{\n  "metrics_specs": [\n    {\n      "metrics": [\n        {\n          "class_name": "AUC"\n        },\n        {\n          "class_name": "Precision"\n        },\n        {\n          "class_name": "Recall"\n        },\n        {\n          "class_name": "ExampleCount"\n        },\n        {\n          "class_name": "BinaryAccuracy",\n          "threshold": {\n            "change_threshold": {\n              "absolute": 0.0001,\n              "direction": "HIGHER_IS_BETTER"\n            },\n            "value_threshold": {\n              "lower_bound": 0.5\n            }\n          }\n        }\n      ]\n    }\n  ],\n  "model_specs": [\n    {\n      "label_key": "diabetes"\n    }\n  ],\n  "slicing_specs": [\n    {},\n    {\n      "feature_keys": [\n        "gender"\n      ]\







INFO:absl:Evaluation complete. Results written to C:\Evaluator\evaluation\96.
INFO:absl:Checking validation results.


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
INFO:absl:Blessing result False written to C:\Evaluator\blessing\96.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 96 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'blessing': [Artifact(artifact: uri: "C:\\Evaluator\\blessing\\96"
, artifact_type: name: "ModelBlessing"
)], 'evaluation': [Artifact(artifact: uri: "C:\\Evaluator\\evaluation\\96"
, artifact_type: name: "ModelEvaluation"
)]}) for execution 96
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:node Evaluator is finished.
INFO:absl:node Pusher is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.pusher.component.Pusher"
    base_type: DEPLOY
  }
  id: "Pusher"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "warlord194-pipe

In [2]:
pip freeze > requirements.txt

Note: you may need to restart the kernel to use updated packages.
