diff --git a/rl_coach/architectures/tensorflow_components/distributed_tf_utils.py b/rl_coach/architectures/tensorflow_components/distributed_tf_utils.py index ccf3e89ae..2ea216d3a 100644 --- a/rl_coach/architectures/tensorflow_components/distributed_tf_utils.py +++ b/rl_coach/architectures/tensorflow_components/distributed_tf_utils.py @@ -94,7 +94,7 @@ def create_monitored_session(target: tf.train.Server, task_index: int, is_chief=is_chief, hooks=[], checkpoint_dir=checkpoint_dir, - checkpoint_save_secs=checkpoint_save_secs, + save_checkpoint_secs=checkpoint_save_secs, config=config ) diff --git a/rl_coach/base_parameters.py b/rl_coach/base_parameters.py index e23226ebd..03dbf2504 100644 --- a/rl_coach/base_parameters.py +++ b/rl_coach/base_parameters.py @@ -432,14 +432,17 @@ def path(self): class TaskParameters(Parameters): def __init__(self, framework_type: Frameworks=Frameworks.tensorflow, evaluate_only: bool=False, use_cpu: bool=False, - experiment_path='/tmp', seed=None, checkpoint_save_secs=None): + experiment_path='/tmp', seed=None, checkpoint_save_secs=None, checkpoint_restore_dir=None, + checkpoint_save_dir=None): """ :param framework_type: deep learning framework type. currently only tensorflow is supported :param evaluate_only: the task will be used only for evaluating the model :param use_cpu: use the cpu for this task :param experiment_path: the path to the directory which will store all the experiment outputs - :param checkpoint_save_secs: the number of seconds between each checkpoint saving :param seed: a seed to use for the random numbers generator + :param checkpoint_save_secs: the number of seconds between each checkpoint saving + :param checkpoint_restore_dir: the directory to restore the checkpoints from + :param checkpoint_save_dir: the directory to store the checkpoints in """ self.framework_type = framework_type self.task_index = 0 # TODO: not really needed @@ -447,6 +450,8 @@ def __init__(self, framework_type: Frameworks=Frameworks.tensorflow, evaluate_on self.use_cpu = use_cpu self.experiment_path = experiment_path self.checkpoint_save_secs = checkpoint_save_secs + self.checkpoint_restore_dir = checkpoint_restore_dir + self.checkpoint_save_dir = checkpoint_save_dir self.seed = seed @@ -454,7 +459,8 @@ class DistributedTaskParameters(TaskParameters): def __init__(self, framework_type: Frameworks, parameters_server_hosts: str, worker_hosts: str, job_type: str, task_index: int, evaluate_only: bool=False, num_tasks: int=None, num_training_tasks: int=None, use_cpu: bool=False, experiment_path=None, dnd=None, - shared_memory_scratchpad=None, seed=None): + shared_memory_scratchpad=None, seed=None, checkpoint_save_secs=None, checkpoint_restore_dir=None, + checkpoint_save_dir=None): """ :param framework_type: deep learning framework type. currently only tensorflow is supported :param evaluate_only: the task will be used only for evaluating the model @@ -469,9 +475,13 @@ def __init__(self, framework_type: Frameworks, parameters_server_hosts: str, wor :param experiment_path: the path to the directory which will store all the experiment outputs :param dnd: an external DND to use for NEC. This is a workaround needed for a shared DND not using the scratchpad. :param seed: a seed to use for the random numbers generator + :param checkpoint_save_secs: the number of seconds between each checkpoint saving + :param checkpoint_restore_dir: the directory to restore the checkpoints from + :param checkpoint_save_dir: the directory to store the checkpoints in """ super().__init__(framework_type=framework_type, evaluate_only=evaluate_only, use_cpu=use_cpu, - experiment_path=experiment_path, seed=seed) + experiment_path=experiment_path, seed=seed, checkpoint_save_secs=checkpoint_save_secs, + checkpoint_restore_dir=checkpoint_restore_dir, checkpoint_save_dir=checkpoint_save_dir) self.parameters_server_hosts = parameters_server_hosts self.worker_hosts = worker_hosts self.job_type = job_type diff --git a/rl_coach/coach.py b/rl_coach/coach.py index c9cd3045a..19daf40f0 100644 --- a/rl_coach/coach.py +++ b/rl_coach/coach.py @@ -213,13 +213,10 @@ def parse_arguments(parser: argparse.ArgumentParser) -> argparse.Namespace: screen.error("The requested checkpoint folder to load from does not exist.") # no preset was given. check if the user requested to play some environment on its own - if args.preset is None and args.play: - if args.environment_type: - args.agent_type = 'Human' - else: - screen.error('When no preset is given for Coach to run, and the user requests human control over ' - 'the environment, the user is expected to input the desired environment_type and level.' - '\nAt least one of these parameters was not given.') + if args.preset is None and args.play and not args.environment_type: + screen.error('When no preset is given for Coach to run, and the user requests human control over ' + 'the environment, the user is expected to input the desired environment_type and level.' + '\nAt least one of these parameters was not given.') elif args.preset and args.play: screen.error("Both the --preset and the --play flags were set. These flags can not be used together. " "For human control, please use the --play flag together with the environment type flag (-et)") @@ -428,24 +425,8 @@ def main(): parser.add_argument('-dm', '--dump_mp4', help="(flag) Enable the mp4 saving functionality.", action='store_true') - parser.add_argument('-at', '--agent_type', - help="(string) Choose an agent type class to override on top of the selected preset. " - "If no preset is defined, a preset can be set from the command-line by combining settings " - "which are set by using --agent_type, --experiment_type, --environemnt_type", - default=None, - type=str) parser.add_argument('-et', '--environment_type', - help="(string) Choose an environment type class to override on top of the selected preset." - "If no preset is defined, a preset can be set from the command-line by combining settings " - "which are set by using --agent_type, --experiment_type, --environemnt_type", - default=None, - type=str) - parser.add_argument('-ept', '--exploration_policy_type', - help="(string) Choose an exploration policy type class to override on top of the selected " - "preset." - "If no preset is defined, a preset can be set from the command-line by combining settings " - "which are set by using --agent_type, --experiment_type, --environemnt_type" - , + help="(string) Choose an environment type class to override on top of the selected preset.", default=None, type=str) parser.add_argument('-lvl', '--level', @@ -546,13 +527,16 @@ def main(): # Single-threaded runs if args.num_workers == 1: # Start the training or evaluation - task_parameters = TaskParameters(framework_type=args.framework, - evaluate_only=args.evaluate, - experiment_path=args.experiment_path, - seed=args.seed, - use_cpu=args.use_cpu, - checkpoint_save_secs=args.checkpoint_save_secs) - task_parameters.__dict__ = add_items_to_dict(task_parameters.__dict__, args.__dict__) + task_parameters = TaskParameters( + framework_type=args.framework, + evaluate_only=args.evaluate, + experiment_path=args.experiment_path, + seed=args.seed, + use_cpu=args.use_cpu, + checkpoint_save_secs=args.checkpoint_save_secs, + checkpoint_restore_dir=args.checkpoint_restore_dir, + checkpoint_save_dir=args.checkpoint_save_dir + ) start_graph(graph_manager=graph_manager, task_parameters=task_parameters) @@ -575,19 +559,24 @@ class CommManager(BaseManager): def start_distributed_task(job_type, task_index, evaluation_worker=False, shared_memory_scratchpad=shared_memory_scratchpad): - task_parameters = DistributedTaskParameters(framework_type=args.framework, - parameters_server_hosts=ps_hosts, - worker_hosts=worker_hosts, - job_type=job_type, - task_index=task_index, - evaluate_only=evaluation_worker, - use_cpu=args.use_cpu, - num_tasks=total_tasks, # training tasks + 1 evaluation task - num_training_tasks=args.num_workers, - experiment_path=args.experiment_path, - shared_memory_scratchpad=shared_memory_scratchpad, - seed=args.seed+task_index if args.seed is not None else None) # each worker gets a different seed - task_parameters.__dict__ = add_items_to_dict(task_parameters.__dict__, args.__dict__) + task_parameters = DistributedTaskParameters( + framework_type=args.framework, + parameters_server_hosts=ps_hosts, + worker_hosts=worker_hosts, + job_type=job_type, + task_index=task_index, + evaluate_only=evaluation_worker, + use_cpu=args.use_cpu, + num_tasks=total_tasks, # training tasks + 1 evaluation task + num_training_tasks=args.num_workers, + experiment_path=args.experiment_path, + shared_memory_scratchpad=shared_memory_scratchpad, + seed=args.seed+task_index if args.seed is not None else None, # each worker gets a different seed + checkpoint_save_secs=args.checkpoint_save_secs, + checkpoint_restore_dir=args.checkpoint_restore_dir, + checkpoint_save_dir=args.checkpoint_save_dir + ) + # we assume that only the evaluation workers are rendering graph_manager.visualization_parameters.render = args.render and evaluation_worker p = Process(target=start_graph, args=(graph_manager, task_parameters)) @@ -607,7 +596,7 @@ def start_distributed_task(job_type, task_index, evaluation_worker=False, workers.append(start_distributed_task("worker", task_index)) # evaluation worker - if args.evaluation_worker: + if args.evaluation_worker or args.render: evaluation_worker = start_distributed_task("worker", args.num_workers, evaluation_worker=True) # wait for all workers diff --git a/rl_coach/graph_managers/graph_manager.py b/rl_coach/graph_managers/graph_manager.py index 77438853b..422f724fd 100644 --- a/rl_coach/graph_managers/graph_manager.py +++ b/rl_coach/graph_managers/graph_manager.py @@ -100,6 +100,8 @@ def __init__(self, self.preset_validation_params = PresetValidationParameters() self.reset_required = False + # timers + self.graph_creation_time = None self.last_checkpoint_saving_time = time.time() # counters @@ -520,6 +522,8 @@ def occasionally_save_checkpoint(self): self.save_checkpoint() def save_checkpoint(self): + if self.task_parameters.checkpoint_save_dir is None: + self.task_parameters.checkpoint_save_dir = os.path.join(self.task_parameters.experiment_path, 'checkpoint') checkpoint_path = os.path.join(self.task_parameters.checkpoint_save_dir, "{}_Step-{}.ckpt".format( self.checkpoint_id,