In [2]:
# Memanggil berbagai library dan variabel yang dibutuhkan
import os
import sys
from typing import Text
from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

In [3]:
# Nama pipeline
PIPELINE_NAME = "patient-stress-pipeline"

# Pipeline inputs
DATA_ROOT = "data"  # Folder tempat data input berada
TRANSFORM_MODULE_FILE = "modules/patient_stress_transform.py"  # Transform module
TRAINER_MODULE_FILE = "modules/patient_stress_trainer.py"  # Trainer module

# Pipeline outputs
OUTPUT_BASE = "output"  # Root folder untuk output pipeline
SERVING_MODEL_DIR = os.path.join(OUTPUT_BASE, "serving_model")
PIPELINE_ROOT = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
METADATA_PATH = os.path.join(PIPELINE_ROOT, "metadata.sqlite")


In [4]:
# Fungsi untuk membuat TFX pipeline
def init_local_pipeline(components, pipeline_root: Text) -> pipeline.Pipeline:
    """
    Fungsi untuk inisialisasi TFX pipeline.
    Args:
        components: List komponen TFX yang akan digunakan dalam pipeline.
        pipeline_root: Lokasi output dari pipeline.
    Returns:
        pipeline: Objek pipeline yang akan dijalankan.
    """
    logging.info(f"Pipeline root set to: {pipeline_root}")
    
    # Apache Beam arguments
    beam_args = [
        "--direct_running_mode=multi_processing",  # Mode eksekusi menggunakan multiprocessing
        "--direct_num_workers=0"  # 0 berarti auto-detect jumlah CPU yang tersedia
    ]
    
    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,  # Menggunakan caching untuk menghemat waktu
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            METADATA_PATH
        ),
        beam_pipeline_args=beam_args,  # Argumen untuk Apache Beam
    )

In [7]:
# menjalankan pipeline
# membuat sebuah TFX component menggunakan fungsi init_components(). Selanjutnya, seluruh komponen tersebut disatukan menjadi sebuah machine learning pipeline dengan bantuan fungsi init_local_pipeline(). Pada bagian terakhir, pipeline tersebut dijalankan menggunakan Apache Beam sebagai pipeline orchestrator.
if __name__ == "__main__":
    logging.set_verbosity(logging.INFO)
    
    from modules.components import init_components
    
    components = init_components(
        DATA_ROOT,
        training_module=TRAINER_MODULE_FILE,
        transform_module=TRANSFORM_MODULE_FILE,
        training_steps=5000,
        eval_steps=1000,
        serving_model_dir=serving_model_dir,
    )
    
    pipeline = init_local_pipeline(components, pipeline_root)
    BeamDagRunner().run(pipeline=pipeline)


ImportError: cannot import name 'init_components' from 'modules.components' (e:\G-Drive\Catatan Kerja\Pelatihan\DICODING\MLOPS\dicoding-mlops-sub-2\modules\components.py)