In [1]:
# import lib
import os
import sys
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from modules import components


In [2]:
# --- Pipeline identity and input data -------------------------------------
PIPELINE_NAME = "annisams11-pipeline"
DATA_ROOT = "data"

# --- User-code module files handed to the component factory ---------------
TRAINER_MODULE_FILE = "modules/stroke_pred_trainer.py"
TUNER_MODULE_FILE = "modules/stroke_pred_tuner.py"
TRANSFORM_MODULE_FILE = "modules/stroke_pred_transform.py"

# --- Output locations, all derived from OUTPUT_BASE -----------------------
OUTPUT_BASE = "outputs"
# Root directory for this pipeline: <OUTPUT_BASE>/<PIPELINE_NAME>.
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
# SQLite file used for the metadata connection config; lives inside the
# pipeline root.
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")
# Directory passed to the components as 'serving_model_dir'.
serving_model_dir = os.path.join(OUTPUT_BASE, "serving_model")

In [3]:
def init_local_pipeline(
    components, pipeline_root: Text
) -> pipeline.Pipeline:
    """Build the TFX pipeline object for a local Apache Beam run.

    Besides its arguments, this function reads two module-level globals:
    ``PIPELINE_NAME`` (the pipeline name) and ``metadata_path`` (the SQLite
    file used for the metadata connection config).

    Args:
        components: TFX components to run; forwarded unchanged to
            ``pipeline.Pipeline(components=...)``.
        pipeline_root (Text): path to the pipeline directory.

    Returns:
        pipeline.Pipeline: pipeline with caching enabled, SQLite-backed
            metadata, and the Beam direct-runner arguments below.
    """
    logging.info("Pipeline root set to: %s", pipeline_root)

    # Each flag must be its own list element: without the separating comma
    # Python concatenates adjacent string literals into a single bogus flag.
    beam_args = [
        '--direct_running_mode=multi_processing',
        # 0 = auto-detect based on the number of CPUs available
        # during execution time.
        '--direct_num_workers=0',
    ]

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        # The keyword is `beam_pipeline_args`; the misspelled
        # `eam_pipeline_args` never delivered these options to Beam.
        beam_pipeline_args=beam_args,
    )

logging.set_verbosity(logging.INFO)

# Bind the results to new names rather than `components` / `pipeline`.
# Those two names are the imported modules: overwriting them makes this cell
# fail when re-run, because `components.init_components` and
# `pipeline.Pipeline` would no longer resolve to the modules.
pipeline_components = components.init_components({
    'data_dir': DATA_ROOT,
    'transform_module': TRANSFORM_MODULE_FILE,
    'tuner_module': TUNER_MODULE_FILE,
    'training_module': TRAINER_MODULE_FILE,
    'training_steps': 5000,
    'eval_steps': 1000,
    'serving_model_dir': serving_model_dir
})

local_pipeline = init_local_pipeline(pipeline_components, pipeline_root)
BeamDagRunner().run(pipeline=local_pipeline)

Trial 2 Complete [00h 00m 16s]
val_binary_accuracy: 0.9593999981880188

Best val_binary_accuracy So Far: 0.9611999988555908
Total elapsed time: 00h 00m 31s
INFO:tensorflow:Oracle triggered exit


INFO:tensorflow:Oracle triggered exit
INFO:absl:Finished tuning... Tuner ID: tuner0
INFO:absl:Best HyperParameters: {'space': [{'class_name': 'Choice', 'config': {'name': 'units_1', 'default': 128, 'conditions': [], 'values': [128, 256, 512], 'ordered': True}}, {'class_name': 'Choice', 'config': {'name': 'units_2', 'default': 32, 'conditions': [], 'values': [32, 64, 128], 'ordered': True}}, {'class_name': 'Choice', 'config': {'name': 'units_3', 'default': 8, 'conditions': [], 'values': [8, 16, 32], 'ordered': True}}, {'class_name': 'Choice', 'config': {'name': 'learning_rate', 'default': 0.01, 'conditions': [], 'values': [0.01, 0.001, 0.0001], 'ordered': True}}], 'values': {'units_1': 256, 'units_2': 128, 'units_3': 16, 'learning_rate': 0.01}}
INFO:absl:Best Hyperparameters are written to output\annisams11-pipeline\Tuner\best_hyperparameters\15\best_hyperparameters.txt.
INFO:absl:Tuner results are written to output\annisams11-pipeline\Tuner\tuner_results\15\tuner_results.json.
INFO:abs

Results summary
Results in output\annisams11-pipeline\Tuner\.system\executor_execution\15\.temp\15\stroke_pred_tuning
Showing 10 best trials
<keras_tuner.engine.objective.Objective object at 0x0000016BAC9CC340>
Trial summary
Hyperparameters:
units_1: 256
units_2: 128
units_3: 16
learning_rate: 0.01
Score: 0.9611999988555908
Trial summary
Hyperparameters:
units_1: 512
units_2: 32
units_3: 8
learning_rate: 0.0001
Score: 0.9593999981880188


INFO:absl:[Trainer] Resolved inputs: ({'schema': [Artifact(artifact: id: 3
type_id: 20
uri: "output\\annisams11-pipeline\\SchemaGen\\schema\\4"
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
create_time_since_epoch: 1721464533068
last_update_time_since_epoch: 1721464533068
, artifact_type: id: 20
name: "Schema"
)], 'examples': [Artifact(artifact: id: 17
type_id: 16
uri: "output\\annisams11-pipeline\\Transform\\transformed_examples\\13"
properties {
  key: "split_names"
  value {
    string_value: "[\"eval\", \"train\"]"
  }
}
custom_properties {
  key: "is_external"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "state"
  value {
    string_value: "published"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.11.0"
  }
}
state: LIVE
create_time_s

ImportError: Could not import requested module path 'stroke_pred_trainer@output\\annisams11-pipeline\\_wheels\\tfx_user_code_Trainer-0.0+c9842627d14a7260429e87a5ee0bbb6a28ca35a48a05d0344e6aa6eeb4a3cc2f-py3-none-any.whl'. [while running 'Run[Trainer]']

In [None]:
# def init_local_pipeline(
#     components, pipeline_root: Text
# ) -> pipeline.Pipeline:
#     logging.info(f"Pipeline root set to: {pipeline_root}")
#     beam_args = [
#         "--direct_running_mode=multi_processing"
#         # 0 auto-detect based on on the number of CPUs available
#         # during execution time.
#         "----direct_num_workers=0" 
#     ]

#     return pipeline.Pipeline(
#         pipeline_name=PIPELINE_NAME,
#         pipeline_root=pipeline_root,
#         components=components,
#         enable_cache=True,
#         metadata_connection_config=metadata.sqlite_metadata_connection_config(
#             metadata_path
#         ),
#         eam_pipeline_args=beam_args
#     )

# if __name__ == "__main__":
#     logging.set_verbosity(logging.INFO)

#     from modules.components import init_components

#     components = init_components(
#         DATA_ROOT,
#         transform_module=TRANSFORM_MODULE_FILE,
#         tuner_module=TUNER_MODULE_FILE,
#         training_module=TRAINER_MODULE_FILE,
#         training_steps=5000,
#         eval_steps=1000,
#         serving_model_dir=serving_model_dir,
#     )

#     pipeline = init_local_pipeline(components, pipeline_root)
#     BeamDagRunner().run(pipeline=pipeline)