Skip to content

Commit

Permalink
Added API function to run object directly.
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmed-shariff committed Apr 5, 2020
1 parent 3f83800 commit d6aedcd
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.org
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
** master
- Renamed dataset_file_path to data_asset in utils.Dataset
- Log dataloader parameters if DataloaderWrapper class is used for the dataloader
- Added API function to run experiment with instantiated experiment object
** 2.0.a.4 [2019-07-24 Wed]
- Added function to load previously executed experiment.
- The experiments are executed with the cwd set to the `experiments_dir`.
Expand Down
23 changes: 15 additions & 8 deletions examples/sample-project/sample_pipeline_execution.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import subprocess
from mlpipeline.api import (mlpipeline_execute_exeperiment,
mlpipeline_execute_exeperiment_from_script,
mlpipeline_execute_pipeline,
get_experiment,
ExperimentWrapper)
Expand All @@ -9,19 +10,19 @@
def train_experiment_with_whitelist():
    """Run the sample experiment script with a version whitelist.

    Removes ``experiments/outputs`` and then executes
    ``experiments/sample_experiment.py`` in ``ExperimentModeKeys.RUN`` mode,
    passing ``whitelist_versions=["version5"]``.
    """
    import shutil  # local import: only needed for the cleanup step below

    print("*"*20, "EXPERIMENT WITH WHITELIST", "*"*20)
    # Portable stand-in for `rm -rf`: a missing directory is not an error and
    # no external `rm` binary is required.
    shutil.rmtree("experiments/outputs", ignore_errors=True)
    # Script paths must go through the `_from_script` entry point;
    # `mlpipeline_execute_exeperiment` takes an experiment object instead.
    mlpipeline_execute_exeperiment_from_script("experiments/sample_experiment.py",
                                               "experiments",
                                               ExperimentModeKeys.RUN,
                                               whitelist_versions=["version5"])


def train_experiment_with_blacklist():
    """Run the sample experiment script with a version blacklist.

    Removes ``experiments/outputs`` and then executes
    ``experiments/sample_experiment.py`` in ``ExperimentModeKeys.RUN`` mode,
    passing ``blacklist_versions=["version2", "version5"]``.
    """
    import shutil  # local import: only needed for the cleanup step below

    print("*"*20, "EXPERIMENT WITH BLACKLIST", "*"*20)
    # Portable stand-in for `rm -rf`: a missing directory is not an error and
    # no external `rm` binary is required.
    shutil.rmtree("experiments/outputs", ignore_errors=True)
    # Script paths must go through the `_from_script` entry point;
    # `mlpipeline_execute_exeperiment` takes an experiment object instead.
    mlpipeline_execute_exeperiment_from_script("experiments/sample_experiment.py",
                                               "experiments",
                                               ExperimentModeKeys.RUN,
                                               blacklist_versions=["version2", "version5"])


def train_pipeline_with_whitelist():
Expand All @@ -48,9 +49,15 @@ def load_experiment():
"version5"))


def train_experiment_with_object():
    """Run the sample experiment by handing its instantiated object to the API.

    Imports ``EXPERIMENT`` from ``experiments.sample_experiment`` and executes
    it with ``mlpipeline_execute_exeperiment`` in ``ExperimentModeKeys.RUN``
    mode.
    """
    # Imported lazily so the experiment module is only loaded when this
    # scenario actually runs.
    from experiments.sample_experiment import EXPERIMENT

    # Banner names what is executed: a single experiment, not a pipeline,
    # matching the other "EXPERIMENT WITH ..." scenarios in this script.
    print("*"*20, "EXPERIMENT WITH OBJECT", "*"*20)
    mlpipeline_execute_exeperiment(EXPERIMENT, ExperimentModeKeys.RUN)

if __name__ == "__main__":
    # Run every sample scenario once, in a fixed order; an exception in one
    # scenario aborts the remaining ones.
    _scenarios = (
        train_pipeline_with_blacklist,
        train_pipeline_with_whitelist,
        train_experiment_with_blacklist,
        train_experiment_with_whitelist,
        load_experiment,
        train_experiment_with_object,
    )
    for _scenario in _scenarios:
        _scenario()
55 changes: 27 additions & 28 deletions mlpipeline/_pipeline_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import mlflow
from multiprocessing import Process
from datetime import datetime
from pathlib import Path
from mlpipeline import (log,
MetricContainer)
from mlpipeline.utils import (log_special_tokens,
Expand Down Expand Up @@ -72,35 +73,31 @@ def __call__(self, message=None, reset_result_string=False, indent=True):
return self.result_string


def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions=None):
def _experiment_main_loop(current_experiment, version_name_s, clean_experiment_dir, config):
'''
Returns False if there are no more versions to execute or a version resulted in an exception
Returns True otherwise.
'''
current_experiment, version_name_s, \
clean_experiment_dir = _get_experiment(file_path,
whitelist_versions=whitelist_versions,
blacklist_versions=blacklist_versions)
_add_to_and_return_result_string = _AddToAndReturnResultString()
if current_experiment is None:
if CONFIG.cmd_mode:
if config.cmd_mode:
sys.exit(3)
else:
return False
log_special_tokens.log_experiment_started()
log("Experiment loaded: {0}".format(current_experiment.name))
if CONFIG.experiment_mode == ExperimentModeKeys.TEST:
if config.experiment_mode == ExperimentModeKeys.TEST:
log_special_tokens.log_mode_test()
elif CONFIG.experiment_mode == ExperimentModeKeys.EXPORT:
elif config.experiment_mode == ExperimentModeKeys.EXPORT:
log_special_tokens.log_mode_exporting()
else:
log_special_tokens.log_mode_train()

if CONFIG.experiment_mode == ExperimentModeKeys.EXPORT:
if config.experiment_mode == ExperimentModeKeys.EXPORT:
for version_name, version_spec in version_name_s:
experiment_dir, _ = _get_experiment_dir(current_experiment.name.split(".")[-2],
experiment_dir, _ = _get_experiment_dir(Path(current_experiment.name).stem,
version_spec,
CONFIG.experiment_mode)
config.experiment_mode)
current_experiment._current_version = version_spec
current_experiment._experiment_dir = experiment_dir
dataloader = version_spec[version_parameters.DATALOADER]
Expand All @@ -114,7 +111,7 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
current_experiment.export_model()
log("Exported model {}".format(version_name))
log_special_tokens.log_experiment_ended()
if CONFIG.cmd_mode:
if config.cmd_mode:
sys.exit(3)
else:
return False
Expand All @@ -130,7 +127,7 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
else:
log("version loaded: {0} [{1}/{2}]".format(
version_name,
len(CONFIG.executed_experiments[current_experiment.name].version.executed_versions) + 1,
len(config.executed_experiments[current_experiment.name].version.executed_versions) + 1,
len(current_experiment.versions.get_version_names())),
modifier_1=console_colors.GREEN_FG,
modifier_2=console_colors.BOLD)
Expand All @@ -144,10 +141,10 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions

log("Version_spec: {}".format(version_spec))

experiment_dir, tracking_uri = _get_experiment_dir(current_experiment.name.split(".")[-2],
experiment_dir, tracking_uri = _get_experiment_dir(Path(current_experiment.name).stem,
version_spec,
CONFIG.experiment_mode)
record_training = True if CONFIG.experiment_mode != ExperimentModeKeys.TEST else False
config.experiment_mode)
record_training = True if config.experiment_mode != ExperimentModeKeys.TEST else False
if clean_experiment_dir and current_experiment.allow_delete_experiment_dir:
try:
current_experiment.clean_experiment_dir(experiment_dir)
Expand Down Expand Up @@ -188,7 +185,7 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
except NotImplementedError:
log("`setup_model` not implemented. Ignoring.")
try:
current_experiment.pre_execution_hook(mode=CONFIG.experiment_mode)
current_experiment.pre_execution_hook(mode=config.experiment_mode)
except NotImplementedError:
log("`pre_execution_hook` not implemented. Ignoring.")
os.makedirs(experiment_dir, exist_ok=True)
Expand All @@ -201,7 +198,7 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
train_eval_steps = dataloader.get_train_sample_count()
except NotImplementedError:
train_eval_steps = None
if CONFIG.experiment_mode == ExperimentModeKeys.TEST:
if config.experiment_mode == ExperimentModeKeys.TEST:
test__eval_steps = 1 if test__eval_steps is not None else None
train_eval_steps = 1 if train_eval_steps is not None else None

Expand Down Expand Up @@ -231,7 +228,7 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
train_results = "Training loop failed: {0}".format(str(e))
log(train_results, logging.ERROR)
log(traceback.format_exc(), logging.ERROR)
if CONFIG.experiment_mode == ExperimentModeKeys.TEST:
if config.experiment_mode == ExperimentModeKeys.TEST:
raise

try:
Expand Down Expand Up @@ -263,7 +260,7 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
train_results = "Training evaluation failed: {0}".format(str(e))
log(train_results, logging.ERROR)
log(traceback.format_exc(), logging.ERROR)
if CONFIG.experiment_mode == ExperimentModeKeys.TEST:
if config.experiment_mode == ExperimentModeKeys.TEST:
raise

try:
Expand All @@ -290,13 +287,13 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
eval_results = "Test evaluation failed: {0}".format(str(e))
log(eval_results, logging.ERROR)
log(traceback.format_exc(), logging.ERROR)
if CONFIG.experiment_mode == ExperimentModeKeys.TEST:
if config.experiment_mode == ExperimentModeKeys.TEST:
raise
else:
log('Not executing `evaluate_loop` as testing input data is `None`')

try:
current_experiment.post_execution_hook(mode=CONFIG.experiment_mode)
current_experiment.post_execution_hook(mode=config.experiment_mode)
except NotImplementedError:
log("`post_execution_hook` not implemented. Ignoring.")

Expand All @@ -314,17 +311,17 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
_add_to_and_return_result_string("-------------------------------------------")
_add_to_and_return_result_string("DATALOADER SUMMERY:")
_add_to_and_return_result_string(dataloader.summery)
if record_training and not CONFIG.no_log:
if record_training and not config.no_log:
_save_results_to_file(_add_to_and_return_result_string(), current_experiment)

except Exception as e:
mlflow.end_run(mlflow.entities.RunStatus.to_string(mlflow.entities.RunStatus.FAILED))
if CONFIG.experiment_mode == ExperimentModeKeys.TEST:
if config.experiment_mode == ExperimentModeKeys.TEST:
raise
else:
log("Exception: {0}".format(str(e)), logging.ERROR)
log(traceback.format_exc(), logging.ERROR)
if CONFIG.cmd_mode:
if config.cmd_mode:
sys.exit(1)
else:
return False
Expand Down Expand Up @@ -583,9 +580,11 @@ def _execute_exeperiment(file_path,
output = _get_experiment(file_path, whitelist_versions, blacklist_versions, True)
multiprocessing_version_quque.put(output[0].versions.get_version_names())
else:
output = _experiment_main_loop(file_path,
whitelist_versions=whitelist_versions,
blacklist_versions=blacklist_versions)
current_experiment, version_name_s, \
clean_experiment_dir = _get_experiment(file_path,
whitelist_versions=whitelist_versions,
blacklist_versions=blacklist_versions)
output = _experiment_main_loop(current_experiment, version_name_s, clean_experiment_dir, CONFIG)
os.chdir(cwd)
return output

Expand Down
68 changes: 58 additions & 10 deletions mlpipeline/api.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import os
from mlpipeline._pipeline import (_mlpipeline_main_loop, _init_pipeline)
from mlpipeline._pipeline_subprocess import (_execute_exeperiment,
_get_experiment_dir)
from mlpipeline.utils import _load_file_as_module
from mlpipeline.base import ExperimentWrapper
from mlpipeline.entities import ExperimentModeKeys
_get_experiment_dir,
_ExecutedExperiment,
_experiment_main_loop)
from mlpipeline.utils import (_load_file_as_module, PipelineConfig, _VersionLog, set_logger)
from mlpipeline.base import ExperimentWrapper, ExperimentABC
from mlpipeline.entities import (ExperimentModeKeys, console_colors)
from mlpipeline import log

__all__ = ['mlpipeline_execute_exeperiment', 'mlpipeline_execute_pipeline', 'get_experiment', 'ExperimentWrapper']
__all__ = ['mlpipeline_execute_exeperiment', 'mlpipeline_execute_exeperiment_from_script', 'mlpipeline_execute_pipeline', 'get_experiment', 'ExperimentWrapper']


def mlpipeline_execute_pipeline(experiments,
Expand All @@ -26,14 +29,59 @@ def mlpipeline_execute_pipeline(experiments,
_mlpipeline_main_loop(experiments)


def mlpipeline_execute_exeperiment(file_path,
experiments_dir,
# TODO: this entry point does not yet integrate the pipeline's tracking of training processes.
# Until it does, mlpipeline_execute_exeperiment_from_script is the recommended way to run an experiment.
def mlpipeline_execute_exeperiment(experiment,
                                   experiment_mode=ExperimentModeKeys.TEST,
                                   no_log=False,
                                   whitelist_versions=None,
                                   blacklist_versions=None,
                                   pipeline_config=None):
    """Execute an already instantiated experiment object in this process.

    Args:
        experiment: The experiment to run. Expected to be an
            ``mlpipeline.base.ExperimentABC`` instance; any other object only
            produces a log message and is still attempted.
        experiment_mode: An ``ExperimentModeKeys`` value. It is written to
            ``pipeline_config.experiment_mode`` so the main loop runs in the
            requested mode.
        no_log: When true, logging of results is disabled. It is combined
            (logical or) with any ``no_log`` already set on
            ``pipeline_config``.
        whitelist_versions: Forwarded to ``experiment.versions.filter_versions``
            when this or ``blacklist_versions`` is given.
        blacklist_versions: See ``whitelist_versions``.
        pipeline_config: Optional ``PipelineConfig``. A default instance is
            created when omitted. The object is modified in place.

    Returns:
        None.
    """
    if pipeline_config is None:
        pipeline_config = PipelineConfig()

    # `_experiment_main_loop` takes the mode and the logging switch from the
    # config, not from this function's arguments, so both must be propagated.
    pipeline_config.experiment_mode = experiment_mode
    no_log = bool(no_log or getattr(pipeline_config, "no_log", False))
    pipeline_config.no_log = no_log

    # All bookkeeping files live directly under the experiments directory.
    pipeline_config.output_file = os.path.join(pipeline_config.experiments_dir, "output")
    pipeline_config.history_file = os.path.join(pipeline_config.experiments_dir, "history")
    pipeline_config.training_history_log_file = os.path.join(pipeline_config.experiments_dir, "t_history")
    pipeline_config.log_file = os.path.join(pipeline_config.experiments_dir, "log")
    pipeline_config.logger = set_logger(experiment_mode=experiment_mode,
                                        no_log=no_log,
                                        log_file=pipeline_config.log_file)

    if not isinstance(experiment, ExperimentABC):
        # Best effort: report the mismatch (20 == logging.INFO) and carry on.
        log("`experiment` is not of type `mlpipeline.base.ExperimentABC`", 20)

    # With no script path to derive a name from, the class name is used.
    experiment.name = experiment.__class__.__name__
    experiment._collect_related_files(pipeline_config.experiments_dir)
    versions = experiment.versions

    log("{0}{1}Processing experiment: {2}{3}".format(console_colors.BOLD,
                                                     console_colors.BLUE_FG,
                                                     experiment.name,
                                                     console_colors.RESET))
    if whitelist_versions is not None or blacklist_versions is not None:
        versions.filter_versions(whitelist_versions=whitelist_versions,
                                 blacklist_versions=blacklist_versions)

    pipeline_config.executed_experiments[experiment.name] = _ExecutedExperiment(_VersionLog(), 0)

    if experiment_mode == ExperimentModeKeys.EXPORT:
        # In export mode the main loop iterates over all versions itself.
        _experiment_main_loop(experiment, versions.get_versions(), True, pipeline_config)
        return

    for version_name, _ in versions.get_versions():
        if not _experiment_main_loop(experiment, version_name, True, pipeline_config):
            # A False result means the version failed or nothing is left to
            # run; stop here, as the script-based driver does
            # (30 == logging.WARNING).
            log("Pipeline Stoped", 30)
            break
        pipeline_config.executed_experiments[experiment.name].version.addExecutingVersion(version_name, 0)






def mlpipeline_execute_exeperiment_from_script(file_path,
experiments_dir,
experiment_mode=ExperimentModeKeys.TEST,
no_log=False,
whitelist_versions=None,
blacklist_versions=None,
mlflow_tracking_uri=None,
experiments_output_dir=None):
experiments_dir = os.path.abspath(experiments_dir)
file_path = os.path.relpath(os.path.abspath(file_path), experiments_dir)
while _execute_exeperiment(file_path,
Expand Down

0 comments on commit d6aedcd

Please sign in to comment.