
Commit

Added current_version, experiment_dir and dataloader to be experiments properties
ahmed-shariff committed Jul 23, 2019
1 parent a4c2d12 commit ea1e81f
Showing 3 changed files with 83 additions and 33 deletions.
17 changes: 10 additions & 7 deletions examples/sample-project/experiments/sample_experiment.py
@@ -35,18 +35,21 @@ class TestingExperiment(ExperimentABC):
def __init__(self, versions, **args):
super().__init__(versions, **args)

def setup_model(self, version, experiment_dir):
def setup_model(self, ):
self.model = An_ML_Model()
self.model.hyperparameter = version["hyperparameter"]
self.model.hyperparameter = self.current_version["hyperparameter"]

def pre_execution_hook(self, version, experiment_dir, exec_mode=ExecutionModeKeys.TEST):
def pre_execution_hook(self, mode=ExecutionModeKeys.TEST):
self.log("Pre execution")
self.log("Version spec: {}".format(version))
self.log("Version spec: {}".format(self.current_version))
self.log(f"Experiment dir: {self.experiment_dir}")
self.log(f"Dataloader: {self.dataloader}")
self.current_version = self.current_version

def get_trained_step_count(self):
return 10

def train_loop(self, input_fn, steps, version):
def train_loop(self, input_fn, steps):
metric_container = MetricContainer(metrics=['1', 'b', 'c'], track_average_epoc_count=5)
metric_container = MetricContainer(metrics=[{'metrics': ['a', 'b', 'c']},
{'metrics': ['2', 'd', 'e'],
@@ -71,7 +74,7 @@ def train_loop(self, input_fn, steps, version):
self.log("trained: {}".format(self.model.train()))
self.copy_related_files("experiments/exports")

def evaluate_loop(self, input_fn, steps, version):
def evaluate_loop(self, input_fn, steps):
self.log("steps: {}".format(steps))
self.log("calling input fn")
input_fn()
@@ -80,7 +83,7 @@ def evaluate_loop(self, input_fn, steps, version):
metrics.b.update(2, 1)
return metrics

def export_model(self, version):
def export_model(self):
self.log("YAY! Exported!")


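With this change an experiment subclass no longer receives `version` and `experiment_dir` as arguments; the pipeline populates the `current_version`, `experiment_dir` and `dataloader` properties before the hooks run. A minimal sketch of a subclass written against the updated interface follows — the import paths, the placeholder model and the metric names are assumptions for illustration, not part of this repository:

from mlpipeline.base import ExperimentABC                          # assumed import path
from mlpipeline.utils import ExecutionModeKeys, MetricContainer    # assumed import path


class PlaceholderModel:
    '''Stand-in for a real model; not part of mlpipeline.'''
    def __init__(self, hyperparameter):
        self.hyperparameter = hyperparameter

    def train(self):
        return "trained"

    def evaluate(self):
        return 1.0


class MinimalExperiment(ExperimentABC):
    def setup_model(self):
        # Hyperparameters now come from the `current_version` property
        # instead of a `version` argument.
        self.model = PlaceholderModel(self.current_version["hyperparameter"])

    def pre_execution_hook(self, mode=ExecutionModeKeys.TEST):
        # `experiment_dir` and `dataloader` are likewise set by the pipeline.
        self.log(f"Version: {self.current_version}, dir: {self.experiment_dir}")

    def train_loop(self, input_fn, steps):
        metrics = MetricContainer(metrics=['loss'])
        self.log("trained: {}".format(self.model.train()))
        metrics.loss.update(0.5, 1)
        return metrics

    def evaluate_loop(self, input_fn, steps):
        metrics = MetricContainer(metrics=['accuracy'])
        metrics.accuracy.update(self.model.evaluate(), 1)
        return metrics

    def export_model(self):
        self.log(f"Exporting model from {self.experiment_dir}")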
26 changes: 16 additions & 10 deletions mlpipeline/_pipeline_subprocess.py
@@ -108,11 +108,11 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
current_experiment._experiment_dir = experiment_dir

try:
current_experiment.setup_model(version_spec, experiment_dir)
current_experiment.setup_model()
except NotImplementedError:
log("`setup_model` not implemented. Ignoring.")
log("Exporting model for version: {}".format(version_name))
current_experiment.export_model(version_spec)
current_experiment.export_model()
log("Exported model {}".format(version_name))
log_special_tokens.log_experiment_ended()
if CONFIG.cmd_mode:
@@ -155,6 +155,11 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
"contents in the experiment_dir will not be changed", level=logging.WARNING)

run_id = _get_mlflow_run_id(tracking_uri, current_experiment, clean_experiment_dir, version_name)

current_experiment._current_version = version_spec
current_experiment._experiment_dir = experiment_dir
current_experiment._dataloader = dataloader

mlflow.start_run(run_name=version_name, run_id=run_id)
# Logging the versions params
for k, v in version_spec.items():
@@ -167,11 +172,11 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions

try:
try:
current_experiment.setup_model(version_spec, experiment_dir)
current_experiment.setup_model()
except NotImplementedError:
log("`setup_model` not implemented. Ignoring.")
try:
current_experiment.pre_execution_hook(version_spec, experiment_dir)
current_experiment.pre_execution_hook(mode=CONFIG.experiment_mode)
except NotImplementedError:
log("`pre_execution_hook` not implemented. Ignoring.")
os.makedirs(experiment_dir, exist_ok=True)
@@ -205,8 +210,7 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
try:
train_output = current_experiment.train_loop(
input_fn=input_fn,
steps=classification_steps,
version=version_spec)
steps=classification_steps)
except Exception as e:
train_results = "Training loop failed: {0}".format(str(e))
log(train_results, logging.ERROR)
@@ -230,8 +234,7 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
.format(train_eval_steps if train_eval_steps is not None else 'unspecified'))
train_results = current_experiment.evaluate_loop(
input_fn=input_fn,
steps=train_eval_steps,
version=version_spec)
steps=train_eval_steps)
log("Eval on train set: ")
if isinstance(train_results, MetricContainer):
train_results = train_results.log_metrics(complete_epoc=True, name_prefix="TRAIN_")
@@ -258,8 +261,7 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
log("Testing evaluation started: {0} steps".
format(test__eval_steps if test__eval_steps is not None else 'unspecified'))
eval_results = current_experiment.evaluate_loop(input_fn=input_fn,
steps=test__eval_steps,
version=version_spec)
steps=test__eval_steps)
log("Eval on train set:")
if isinstance(eval_results, MetricContainer):
eval_results = eval_results.log_metrics(complete_epoc=True, name_prefix="TEST_")
@@ -276,6 +278,10 @@ def _experiment_main_loop(file_path, whitelist_versions=None, blacklist_versions
log(traceback.format_exc(), logging.ERROR)
if CONFIG.experiment_mode == ExperimentModeKeys.TEST:
raise
try:
current_experiment.post_execution_hook(mode=CONFIG.experiment_mode)
except NotImplementedError:
log("`post_execution_hook` not implemented. Ignoring.")

log("Experiment evaluation complete")
_add_to_and_return_result_string("Eval on train set: {0}".format(train_results))
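On the pipeline side, the changed `_experiment_main_loop` now assigns the private attributes before any hook runs and wraps each optional hook so a `NotImplementedError` is simply skipped. A rough, simplified sketch of that call order — not the actual pipeline code; the function name `run_one_version` and its parameters stand in for the values the real loop computes:

def run_one_version(experiment, version_spec, experiment_dir, dataloader, mode, steps):
    # The properties are populated through the private attributes, so the
    # warn-on-set property setters are bypassed.
    experiment._current_version = version_spec
    experiment._experiment_dir = experiment_dir
    experiment._dataloader = dataloader

    def call_optional(hook, *args, **kwargs):
        # Hooks a subclass chose not to implement are ignored.
        try:
            return hook(*args, **kwargs)
        except NotImplementedError:
            return None

    call_optional(experiment.setup_model)
    call_optional(experiment.pre_execution_hook, mode=mode)

    train_results = experiment.train_loop(
        input_fn=dataloader.get_train_input(mode=mode), steps=steps)
    eval_results = experiment.evaluate_loop(
        input_fn=dataloader.get_test_input(), steps=steps)

    call_optional(experiment.post_execution_hook, mode=mode)
    return train_results, eval_results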
73 changes: 57 additions & 16 deletions mlpipeline/base/_base.py
@@ -3,6 +3,7 @@
from mlpipeline.utils import log
from mlpipeline.utils import copy_related_files
from mlpipeline.utils import _collect_related_files
import logging


class ExperimentABC():
@@ -24,7 +25,7 @@ def __init__(self, versions, allow_delete_experiment_dir=False, reset_steps=Fals
passed to the `pre_execution_hook` will be cleared, essentially removing any
saved information of the experiment. This can be used when the experiment
training needs to be reset.
reset_steps -- if true, the number of steps that have elapsed will be ignored and the number of steps will be
reset_steps -- (DEPRECATING) if true, the number of steps that have elapsed will be ignored and the number of steps will be
calculated as if no training has occurred. If false, the steps will be calculated by deducting
the value returned by `get_trained_step_count`.
'''
@@ -34,33 +35,73 @@ def __init__(self, versions, allow_delete_experiment_dir=False, reset_steps=Fals
raise ValueError("versions should be an instance of `Versions` class, but recived: {0}".format(type(versions)))
self.allow_delete_experiment_dir = allow_delete_experiment_dir
self.reset_steps = reset_steps
self._current_version = None
self._experiment_dir = None
self._dataloader = None

def _set_dataloader(self, value):
self.log("`dataloader` is being set, which is not recommended.", level=logging.WARN)
self._dataloader = value

def _set_experiment_dir(self, value):
self.log("`experiment_dir` is being set, which is not recommended.", level=logging.WARN)
self._experiment_dir = value

def _set_current_version(self, value):
self.log("`current_version` is being set, which is not recommended.", level=logging.WARN)
self._current_version = value

def _get_dataloader(self):
return self._dataloader

def _get_experiment_dir(self):
return self._experiment_dir

def _get_current_version(self):
return self._current_version

current_version = property(
fget=_get_current_version,
fset=_set_current_version,
doc="The current version being executed. Will set by the pipeline")
experiment_dir = property(
fget=_get_experiment_dir,
fset=_set_experiment_dir,
doc="The experiment directory for the current run. Will be set by the mlpipeline")
dataloader = property(
fget=_get_dataloader,
fset=_set_dataloader,
doc="The dataloader set for a given execution of a experiment. Will be set by the mlpipeline")

# TODO: Does the exec_mode have to be here?
def pre_execution_hook(self, version, experiment_dir, exec_mode=ExecutionModeKeys.TEST):
def pre_execution_hook(self, mode=ExecutionModeKeys.TEST):
'''
Before execution, this method will be called once for each version, before `train_loop`. The version obtained from `self.versions`, the destination directory in which to save the experiment (as specified in the config file) and the dataloader are available through the `current_version`, `experiment_dir` and `dataloader` properties. The mode will be passed as one of the keys specified in `ExecutionModeKeys`. This function can be used to define the experiment's hyperparameters based on the information of the version being executed during an iteration.
'''
raise NotImplementedError
raise NotImplementedError()

def post_execution_hook(self, mode=ExecutionModeKeys.TEST):
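'''
Called after the training and evaluation of a version have completed, mirroring `pre_execution_hook`. The mode will be passed as one of the keys specified in `ExecutionModeKeys`.
'''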
raise NotImplementedError()

def setup_model(self, version, experiment_dir):
def setup_model(self):
'''
This function will be called before 'pre_execution_hook', 'train_loop' and 'export_model'. It is expected to set 'self.model' of the Experiment class here. The current version spec is available through the `current_version` property.
'''
raise NotImplementedError()

def train_loop(self, input_fn, steps, version):
def train_loop(self, input_fn, steps):
'''
This will be called when the experiment is entering the training phase. Ideally, what needs to happen in this function is to use the `input_fn` and execute the training loop for a given number of steps, which will be passed through `steps`. The `input_fn` passed here will be the object returned by the `get_train_input` method of the dataloader. In addition, other functionality can be included here as well, such as saving the experiment parameters during training. The return value of the method will be logged. The current version spec is available through the `current_version` property.
'''
raise NotImplementedError
raise NotImplementedError()

def evaluate_loop(self, input_fn, steps, version):
def evaluate_loop(self, input_fn, steps):
'''
This will be called when the experiment is entering the testing phase following the training phase. Ideally, what needs to happen in this function is to use the `input_fn` to obtain the inputs and execute the evaluation loop for a given number of steps. The input function passed here will be the object returned by the `get_train_input` or `get_test_input` method of the dataloader. In addition, other functionality can be included here as well, such as saving the experiment parameters or producing additional statistics. The return value of the method will be logged. The current version spec is available through the `current_version` property.
'''
raise NotImplementedError
raise NotImplementedError()

def export_model(self, version):
def export_model(self):
'''
This method is called when a model is run with the export settings, either by setting the respective command line argument or by passing the export parameter in the versions.
'''
@@ -70,13 +111,13 @@ def get_trained_step_count(self):
'''
This function must return either `None` or a positive integer. This is used to determine how many steps have been completed and to assess the number of steps the training should take. This is delegated to the `Experiment` as the process of determining the number is library specific.
'''
raise NotImplementedError
raise NotImplementedError()

def clean_experiment_dir(self, experiment_dir):
'''
This function will be called when an experiment needs to be reset and the directory `experiment_dir` needs to be cleared as well.
'''
raise NotImplementedError
raise NotImplementedError()

def add_to_summery(self, content):
'''
@@ -115,31 +156,31 @@ def get_train_input(self, mode=ExecutionModeKeys.TRAIN, **kargs):
'''
This function returns an object which will be passed to `Experiment.train_loop` when executing the training function of the experiment; the same function will also be used for the evaluation following training via `Experiment.evaluate_loop`. The object returned by this function depends on how it will be used in the experiment (e.g. for TensorFlow models the return value can be a function object, for PyTorch it can be a Dataset object; in both cases the output of this function provides the data used for training).
'''
raise NotImplementedError
raise NotImplementedError()

def get_test_input(self, **kargs):
'''
This function returns an object which will be used to execute the evaluation following training via `Experiment.evaluate_loop`. The object returned by this function depends on how it will be used in the experiment (e.g. for TensorFlow models the return value can be a function object, for PyTorch it can be a Dataset object; in both cases the output of this function provides the data used for evaluation).
'''
raise NotImplementedError
raise NotImplementedError()

def get_dataloader_summery(self, **kargs):
'''
This function will be called to log a summary of the dataloader when logging the results of an experiment.
'''
raise NotImplementedError
raise NotImplementedError()

def get_train_sample_count(self):
'''
returns the number of datapoints being used as the training dataset. This will be used to assess the number of epochs during training and evaluation.
'''
raise NotImplementedError
raise NotImplementedError()

def get_test_sample_count(self):
'''
returns the number of datapoints being used as the testing dataset. This will be used to assess the number of epochs during training and evaluation.
'''
raise NotImplementedError
raise NotImplementedError()

def add_to_summery(self, content):
'''
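The three new attributes are exposed through read-mostly properties: reading returns whatever the pipeline stored, while assigning through the property still works but logs a warning first. The pattern in isolation, with a stripped-down class and the standard `logging` module standing in for `ExperimentABC` and its `log` helper (the class and names below are illustrative only):

import logging

logger = logging.getLogger(__name__)


class PropertyDemo:
    '''Stripped-down illustration of the warn-on-set property pattern; not mlpipeline code.'''

    def __init__(self):
        self._experiment_dir = None  # the pipeline assigns this attribute directly

    def _get_experiment_dir(self):
        return self._experiment_dir

    def _set_experiment_dir(self, value):
        # User code can still assign, but is warned that the pipeline normally owns this value.
        logger.warning("`experiment_dir` is being set, which is not recommended.")
        self._experiment_dir = value

    experiment_dir = property(
        fget=_get_experiment_dir,
        fset=_set_experiment_dir,
        doc="The experiment directory for the current run; set by the pipeline.")


demo = PropertyDemo()
demo._experiment_dir = "results/run-1"   # pipeline-style assignment: no warning
print(demo.experiment_dir)               # -> results/run-1
demo.experiment_dir = "somewhere/else"   # user-side assignment: logs the warning first

Because the pipeline writes to the underscore-prefixed attribute directly, only assignments made from experiment code trigger the warning.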
