From 801aed5e1000e9a741034163e3fdea2b0d245f57 Mon Sep 17 00:00:00 2001 From: gouravr Date: Sat, 15 Dec 2018 12:26:31 -0800 Subject: [PATCH 01/10] Changes to avoid memory leak in rollout worker Currently in rollout worker, we call restore_checkpoint repeatedly to load the latest model in memory. The restore checkpoint functions calls checkpoint_saver. Checkpoint saver uses GlobalVariablesSaver which does not release the references of the previous model variables. This leads to the situation where the memory keeps on growing before crashing the rollout worker. This change avoid using the checkpoint saver in the rollout worker as I believe it is not needed in this code path. Also added a test to easily reproduce the issue using CartPole example. We were also seeing this issue with the AWS DeepRacer implementation and the current implementation avoid the memory leak there as well. --- rl_coach/graph_managers/graph_manager.py | 3 ++- .../test_basic_rl_graph_manager.py | 26 ++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/rl_coach/graph_managers/graph_manager.py b/rl_coach/graph_managers/graph_manager.py index d13a59b28..b9013fd95 100644 --- a/rl_coach/graph_managers/graph_manager.py +++ b/rl_coach/graph_managers/graph_manager.py @@ -562,7 +562,8 @@ def restore_checkpoint(self): screen.warning("No checkpoint to restore in: {}".format(self.task_parameters.checkpoint_restore_dir)) else: screen.log_title("Loading checkpoint: {}".format(checkpoint.model_checkpoint_path)) - self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) + if not hasattr(self.agent_params.memory, 'memory_backend_params') or self.agent_params.memory.memory_backend_params.run_type != str(RunType.ROLLOUT_WORKER): + self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) [manager.restore_checkpoint(self.task_parameters.checkpoint_restore_dir) for manager in self.level_managers] diff --git a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py index 214ef31e3..a572fd9c6 100644 --- a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py +++ b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py @@ -1,8 +1,10 @@ import os import sys +import gc sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) import tensorflow as tf -from rl_coach.base_parameters import TaskParameters, DistributedTaskParameters, Frameworks +from rl_coach.base_parameters import TaskParameters, DistributedTaskParameters, Frameworks, RunType +from rl_coach.memories.backend.memory import MemoryBackendParameters from rl_coach.utils import get_open_port from multiprocessing import Process from tensorflow import logging @@ -41,12 +43,34 @@ def test_basic_rl_graph_manager_with_cartpole_dqn(): experiment_path="./experiments/test")) # graph_manager.improve() +# Test for identifying memory leak in restore_checkpoint +@pytest.mark.unit_test +def test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restore(): + tf.reset_default_graph() + from rl_coach.presets.CartPole_DQN import graph_manager + assert graph_manager + graph_manager.create_graph(task_parameters=TaskParameters(framework_type=Frameworks.tensorflow, + experiment_path="./experiments/test", + apply_stop_condition=True)) + graph_manager.improve() + graph_manager.save_checkpoint() + + graph_manager.task_parameters.checkpoint_restore_dir = "./experiments/test/checkpoint" + graph_manager.agent_params.memory.register_var('memory_backend_params', + MemoryBackendParameters(store_type=None, + orchestrator_type=None, + run_type=str(RunType.ROLLOUT_WORKER))) + while True: + graph_manager.restore_checkpoint() + gc.collect() + if __name__ == '__main__': pass # test_basic_rl_graph_manager_with_pong_a3c() # test_basic_rl_graph_manager_with_ant_a3c() # test_basic_rl_graph_manager_with_pong_nec() + # test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restore() # test_basic_rl_graph_manager_with_cartpole_dqn() #test_basic_rl_graph_manager_multithreaded_with_pong_a3c() #test_basic_rl_graph_manager_with_doom_basic_dqn() \ No newline at end of file From b8d21c73bf939ee160538082709b281a790618ef Mon Sep 17 00:00:00 2001 From: gouravr Date: Sun, 16 Dec 2018 10:56:00 -0800 Subject: [PATCH 02/10] comment out the part of test in 'test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restore' that run in infinite loop --- .../test_basic_rl_graph_manager.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py index a572fd9c6..fb5ba9df7 100644 --- a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py +++ b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py @@ -52,17 +52,17 @@ def test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restor graph_manager.create_graph(task_parameters=TaskParameters(framework_type=Frameworks.tensorflow, experiment_path="./experiments/test", apply_stop_condition=True)) - graph_manager.improve() - graph_manager.save_checkpoint() - - graph_manager.task_parameters.checkpoint_restore_dir = "./experiments/test/checkpoint" - graph_manager.agent_params.memory.register_var('memory_backend_params', - MemoryBackendParameters(store_type=None, - orchestrator_type=None, - run_type=str(RunType.ROLLOUT_WORKER))) - while True: - graph_manager.restore_checkpoint() - gc.collect() + # graph_manager.improve() + # graph_manager.save_checkpoint() + # + # graph_manager.task_parameters.checkpoint_restore_dir = "./experiments/test/checkpoint" + # graph_manager.agent_params.memory.register_var('memory_backend_params', + # MemoryBackendParameters(store_type=None, + # orchestrator_type=None, + # run_type=str(RunType.ROLLOUT_WORKER))) + # while True: + # graph_manager.restore_checkpoint() + # gc.collect() if __name__ == '__main__': From c694766faddd72b2b3966430f9f56ace27693606 Mon Sep 17 00:00:00 2001 From: Gourav Roy Date: Tue, 25 Dec 2018 20:50:34 -0800 Subject: [PATCH 03/10] Avoid Memory Leak in Rollout worker ISSUE: When we restore checkpoints, we create new nodes in the Tensorflow graph. This happens when we assign new value (op node) to RefVariable in GlobalVariableSaver. With every restore the size of TF graph increases as new nodes are created and old unused nodes are not removed from the graph. This causes the memory leak in restore_checkpoint codepath. FIX: We reset the Tensorflow graph and recreate the Global, Online and Target networks on every restore. This ensures that the old unused nodes in TF graph is dropped. --- rl_coach/agents/agent.py | 6 ++++++ rl_coach/graph_managers/graph_manager.py | 18 +++++++++++------- .../test_basic_rl_graph_manager.py | 4 ---- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/rl_coach/agents/agent.py b/rl_coach/agents/agent.py index a6b529764..059cae8cb 100644 --- a/rl_coach/agents/agent.py +++ b/rl_coach/agents/agent.py @@ -953,6 +953,12 @@ def restore_checkpoint(self, checkpoint_dir: str) -> None: self.input_filter.restore_state_from_checkpoint(checkpoint_dir, checkpoint_prefix) self.pre_network_filter.restore_state_from_checkpoint(checkpoint_dir, checkpoint_prefix) + if self.ap.task_parameters.framework_type == Frameworks.tensorflow: + import tensorflow as tf + tf.reset_default_graph() + # Recreate all the networks of the agent + self.networks = self.create_networks() + # no output filters currently have an internal state to restore # self.output_filter.restore_state_from_checkpoint(checkpoint_dir) diff --git a/rl_coach/graph_managers/graph_manager.py b/rl_coach/graph_managers/graph_manager.py index b9013fd95..10f314a54 100644 --- a/rl_coach/graph_managers/graph_manager.py +++ b/rl_coach/graph_managers/graph_manager.py @@ -150,7 +150,7 @@ def create_graph(self, task_parameters: TaskParameters=TaskParameters()): # create a session (it needs to be created after all the graph ops were created) self.sess = None - self.create_session(task_parameters=task_parameters) + self.restore_checkpoint() self._phase = self.phase = RunPhase.UNDEFINED @@ -261,8 +261,6 @@ def create_session(self, task_parameters: TaskParameters): self.checkpoint_saver = SaverCollection() for level in self.level_managers: self.checkpoint_saver.update(level.collect_savers()) - # restore from checkpoint if given - self.restore_checkpoint() def save_graph(self) -> None: """ @@ -558,14 +556,20 @@ def restore_checkpoint(self): else: checkpoint = get_checkpoint_state(self.task_parameters.checkpoint_restore_dir) + # As part of this restore, Agent recreates the global, target and online networks + [manager.restore_checkpoint(self.task_parameters.checkpoint_restore_dir) for manager in self.level_managers] + + # Recreate the session to use the new TF Graphs + self.create_session(self.task_parameters) + if checkpoint is None: screen.warning("No checkpoint to restore in: {}".format(self.task_parameters.checkpoint_restore_dir)) else: screen.log_title("Loading checkpoint: {}".format(checkpoint.model_checkpoint_path)) - if not hasattr(self.agent_params.memory, 'memory_backend_params') or self.agent_params.memory.memory_backend_params.run_type != str(RunType.ROLLOUT_WORKER): - self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) - - [manager.restore_checkpoint(self.task_parameters.checkpoint_restore_dir) for manager in self.level_managers] + self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) + else: + # Create the session to use the new TF Graphs + self.create_session(self.task_parameters) def _get_checkpoint_state_tf(self): import tensorflow as tf diff --git a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py index fb5ba9df7..e0d958518 100644 --- a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py +++ b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py @@ -56,10 +56,6 @@ def test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restor # graph_manager.save_checkpoint() # # graph_manager.task_parameters.checkpoint_restore_dir = "./experiments/test/checkpoint" - # graph_manager.agent_params.memory.register_var('memory_backend_params', - # MemoryBackendParameters(store_type=None, - # orchestrator_type=None, - # run_type=str(RunType.ROLLOUT_WORKER))) # while True: # graph_manager.restore_checkpoint() # gc.collect() From 740f7937cd08b61a7f794cf747374e9adad90e54 Mon Sep 17 00:00:00 2001 From: Gourav Roy Date: Tue, 25 Dec 2018 21:52:07 -0800 Subject: [PATCH 04/10] Updated comments --- rl_coach/graph_managers/graph_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rl_coach/graph_managers/graph_manager.py b/rl_coach/graph_managers/graph_manager.py index 10f314a54..83d54d331 100644 --- a/rl_coach/graph_managers/graph_manager.py +++ b/rl_coach/graph_managers/graph_manager.py @@ -559,7 +559,7 @@ def restore_checkpoint(self): # As part of this restore, Agent recreates the global, target and online networks [manager.restore_checkpoint(self.task_parameters.checkpoint_restore_dir) for manager in self.level_managers] - # Recreate the session to use the new TF Graphs + # Recreate session. This will help use the new Tensorflow Graph created in above step. self.create_session(self.task_parameters) if checkpoint is None: @@ -568,7 +568,7 @@ def restore_checkpoint(self): screen.log_title("Loading checkpoint: {}".format(checkpoint.model_checkpoint_path)) self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) else: - # Create the session to use the new TF Graphs + # Create new session self.create_session(self.task_parameters) def _get_checkpoint_state_tf(self): From 2461892c9e374deca34b1e7b71edf7eb1c3acc1b Mon Sep 17 00:00:00 2001 From: Gourav Roy Date: Wed, 2 Jan 2019 22:33:37 -0800 Subject: [PATCH 05/10] Revert "Updated comments" This reverts commit 740f7937cd08b61a7f794cf747374e9adad90e54. --- rl_coach/graph_managers/graph_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rl_coach/graph_managers/graph_manager.py b/rl_coach/graph_managers/graph_manager.py index 83d54d331..10f314a54 100644 --- a/rl_coach/graph_managers/graph_manager.py +++ b/rl_coach/graph_managers/graph_manager.py @@ -559,7 +559,7 @@ def restore_checkpoint(self): # As part of this restore, Agent recreates the global, target and online networks [manager.restore_checkpoint(self.task_parameters.checkpoint_restore_dir) for manager in self.level_managers] - # Recreate session. This will help use the new Tensorflow Graph created in above step. + # Recreate the session to use the new TF Graphs self.create_session(self.task_parameters) if checkpoint is None: @@ -568,7 +568,7 @@ def restore_checkpoint(self): screen.log_title("Loading checkpoint: {}".format(checkpoint.model_checkpoint_path)) self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) else: - # Create new session + # Create the session to use the new TF Graphs self.create_session(self.task_parameters) def _get_checkpoint_state_tf(self): From 6dd7ae2343df2f5def237a778210493d8ef6ee22 Mon Sep 17 00:00:00 2001 From: Gourav Roy Date: Wed, 2 Jan 2019 22:35:06 -0800 Subject: [PATCH 06/10] Revert "Avoid Memory Leak in Rollout worker" This reverts commit c694766faddd72b2b3966430f9f56ace27693606. --- rl_coach/agents/agent.py | 6 ------ rl_coach/graph_managers/graph_manager.py | 18 +++++++----------- .../test_basic_rl_graph_manager.py | 4 ++++ 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/rl_coach/agents/agent.py b/rl_coach/agents/agent.py index 059cae8cb..a6b529764 100644 --- a/rl_coach/agents/agent.py +++ b/rl_coach/agents/agent.py @@ -953,12 +953,6 @@ def restore_checkpoint(self, checkpoint_dir: str) -> None: self.input_filter.restore_state_from_checkpoint(checkpoint_dir, checkpoint_prefix) self.pre_network_filter.restore_state_from_checkpoint(checkpoint_dir, checkpoint_prefix) - if self.ap.task_parameters.framework_type == Frameworks.tensorflow: - import tensorflow as tf - tf.reset_default_graph() - # Recreate all the networks of the agent - self.networks = self.create_networks() - # no output filters currently have an internal state to restore # self.output_filter.restore_state_from_checkpoint(checkpoint_dir) diff --git a/rl_coach/graph_managers/graph_manager.py b/rl_coach/graph_managers/graph_manager.py index 10f314a54..b9013fd95 100644 --- a/rl_coach/graph_managers/graph_manager.py +++ b/rl_coach/graph_managers/graph_manager.py @@ -150,7 +150,7 @@ def create_graph(self, task_parameters: TaskParameters=TaskParameters()): # create a session (it needs to be created after all the graph ops were created) self.sess = None - self.restore_checkpoint() + self.create_session(task_parameters=task_parameters) self._phase = self.phase = RunPhase.UNDEFINED @@ -261,6 +261,8 @@ def create_session(self, task_parameters: TaskParameters): self.checkpoint_saver = SaverCollection() for level in self.level_managers: self.checkpoint_saver.update(level.collect_savers()) + # restore from checkpoint if given + self.restore_checkpoint() def save_graph(self) -> None: """ @@ -556,20 +558,14 @@ def restore_checkpoint(self): else: checkpoint = get_checkpoint_state(self.task_parameters.checkpoint_restore_dir) - # As part of this restore, Agent recreates the global, target and online networks - [manager.restore_checkpoint(self.task_parameters.checkpoint_restore_dir) for manager in self.level_managers] - - # Recreate the session to use the new TF Graphs - self.create_session(self.task_parameters) - if checkpoint is None: screen.warning("No checkpoint to restore in: {}".format(self.task_parameters.checkpoint_restore_dir)) else: screen.log_title("Loading checkpoint: {}".format(checkpoint.model_checkpoint_path)) - self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) - else: - # Create the session to use the new TF Graphs - self.create_session(self.task_parameters) + if not hasattr(self.agent_params.memory, 'memory_backend_params') or self.agent_params.memory.memory_backend_params.run_type != str(RunType.ROLLOUT_WORKER): + self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) + + [manager.restore_checkpoint(self.task_parameters.checkpoint_restore_dir) for manager in self.level_managers] def _get_checkpoint_state_tf(self): import tensorflow as tf diff --git a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py index e0d958518..fb5ba9df7 100644 --- a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py +++ b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py @@ -56,6 +56,10 @@ def test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restor # graph_manager.save_checkpoint() # # graph_manager.task_parameters.checkpoint_restore_dir = "./experiments/test/checkpoint" + # graph_manager.agent_params.memory.register_var('memory_backend_params', + # MemoryBackendParameters(store_type=None, + # orchestrator_type=None, + # run_type=str(RunType.ROLLOUT_WORKER))) # while True: # graph_manager.restore_checkpoint() # gc.collect() From 779d3694b4ebf12ec1896feff7e4fc231c89dc42 Mon Sep 17 00:00:00 2001 From: Gourav Roy Date: Wed, 2 Jan 2019 22:36:05 -0800 Subject: [PATCH 07/10] Revert "comment out the part of test in 'test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restore' that run in infinite loop" This reverts commit b8d21c73bf939ee160538082709b281a790618ef. --- .../test_basic_rl_graph_manager.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py index fb5ba9df7..a572fd9c6 100644 --- a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py +++ b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py @@ -52,17 +52,17 @@ def test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restor graph_manager.create_graph(task_parameters=TaskParameters(framework_type=Frameworks.tensorflow, experiment_path="./experiments/test", apply_stop_condition=True)) - # graph_manager.improve() - # graph_manager.save_checkpoint() - # - # graph_manager.task_parameters.checkpoint_restore_dir = "./experiments/test/checkpoint" - # graph_manager.agent_params.memory.register_var('memory_backend_params', - # MemoryBackendParameters(store_type=None, - # orchestrator_type=None, - # run_type=str(RunType.ROLLOUT_WORKER))) - # while True: - # graph_manager.restore_checkpoint() - # gc.collect() + graph_manager.improve() + graph_manager.save_checkpoint() + + graph_manager.task_parameters.checkpoint_restore_dir = "./experiments/test/checkpoint" + graph_manager.agent_params.memory.register_var('memory_backend_params', + MemoryBackendParameters(store_type=None, + orchestrator_type=None, + run_type=str(RunType.ROLLOUT_WORKER))) + while True: + graph_manager.restore_checkpoint() + gc.collect() if __name__ == '__main__': From c377363e505d3223df0136f14e415ecd5e2bceaa Mon Sep 17 00:00:00 2001 From: Gourav Roy Date: Wed, 2 Jan 2019 22:37:12 -0800 Subject: [PATCH 08/10] Revert "Changes to avoid memory leak in rollout worker" This reverts commit 801aed5e1000e9a741034163e3fdea2b0d245f57. --- rl_coach/graph_managers/graph_manager.py | 3 +-- .../test_basic_rl_graph_manager.py | 26 +------------------ 2 files changed, 2 insertions(+), 27 deletions(-) diff --git a/rl_coach/graph_managers/graph_manager.py b/rl_coach/graph_managers/graph_manager.py index b9013fd95..d13a59b28 100644 --- a/rl_coach/graph_managers/graph_manager.py +++ b/rl_coach/graph_managers/graph_manager.py @@ -562,8 +562,7 @@ def restore_checkpoint(self): screen.warning("No checkpoint to restore in: {}".format(self.task_parameters.checkpoint_restore_dir)) else: screen.log_title("Loading checkpoint: {}".format(checkpoint.model_checkpoint_path)) - if not hasattr(self.agent_params.memory, 'memory_backend_params') or self.agent_params.memory.memory_backend_params.run_type != str(RunType.ROLLOUT_WORKER): - self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) + self.checkpoint_saver.restore(self.sess, checkpoint.model_checkpoint_path) [manager.restore_checkpoint(self.task_parameters.checkpoint_restore_dir) for manager in self.level_managers] diff --git a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py index a572fd9c6..214ef31e3 100644 --- a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py +++ b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py @@ -1,10 +1,8 @@ import os import sys -import gc sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) import tensorflow as tf -from rl_coach.base_parameters import TaskParameters, DistributedTaskParameters, Frameworks, RunType -from rl_coach.memories.backend.memory import MemoryBackendParameters +from rl_coach.base_parameters import TaskParameters, DistributedTaskParameters, Frameworks from rl_coach.utils import get_open_port from multiprocessing import Process from tensorflow import logging @@ -43,34 +41,12 @@ def test_basic_rl_graph_manager_with_cartpole_dqn(): experiment_path="./experiments/test")) # graph_manager.improve() -# Test for identifying memory leak in restore_checkpoint -@pytest.mark.unit_test -def test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restore(): - tf.reset_default_graph() - from rl_coach.presets.CartPole_DQN import graph_manager - assert graph_manager - graph_manager.create_graph(task_parameters=TaskParameters(framework_type=Frameworks.tensorflow, - experiment_path="./experiments/test", - apply_stop_condition=True)) - graph_manager.improve() - graph_manager.save_checkpoint() - - graph_manager.task_parameters.checkpoint_restore_dir = "./experiments/test/checkpoint" - graph_manager.agent_params.memory.register_var('memory_backend_params', - MemoryBackendParameters(store_type=None, - orchestrator_type=None, - run_type=str(RunType.ROLLOUT_WORKER))) - while True: - graph_manager.restore_checkpoint() - gc.collect() - if __name__ == '__main__': pass # test_basic_rl_graph_manager_with_pong_a3c() # test_basic_rl_graph_manager_with_ant_a3c() # test_basic_rl_graph_manager_with_pong_nec() - # test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restore() # test_basic_rl_graph_manager_with_cartpole_dqn() #test_basic_rl_graph_manager_multithreaded_with_pong_a3c() #test_basic_rl_graph_manager_with_doom_basic_dqn() \ No newline at end of file From 619ea0944e1f652a426045038ee2977996733758 Mon Sep 17 00:00:00 2001 From: Gourav Roy Date: Wed, 2 Jan 2019 23:06:44 -0800 Subject: [PATCH 09/10] Avoid Memory Leak in Rollout worker ISSUE: When we restore checkpoints, we create new nodes in the Tensorflow graph. This happens when we assign new value (op node) to RefVariable in GlobalVariableSaver. With every restore the size of TF graph increases as new nodes are created and old unused nodes are not removed from the graph. This causes the memory leak in restore_checkpoint codepath. FIX: We use TF placeholder to update the variables which avoids the memory leak. --- .../tensorflow_components/savers.py | 15 +++++++++++-- .../test_basic_rl_graph_manager.py | 21 +++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/rl_coach/architectures/tensorflow_components/savers.py b/rl_coach/architectures/tensorflow_components/savers.py index 38a36ee94..0f7ddbf8e 100644 --- a/rl_coach/architectures/tensorflow_components/savers.py +++ b/rl_coach/architectures/tensorflow_components/savers.py @@ -32,6 +32,14 @@ def __init__(self, name): # target network is never saved or restored directly from checkpoint, so we are removing all its variables from the list # the target network would be synched back from the online network in graph_manager.improve(...), at the beginning of the run flow. self._variables = [v for v in self._variables if '/target' not in v.name] + + # Using a placeholder to update the variable during restore to avoid memory leak. + # Ref: https://github.com/tensorflow/tensorflow/issues/4151 + self._variable_placeholders = [tf.placeholder(v.dtype, shape=v.get_shape()) for v in self._variables] + self._variable_update_ops = [] + for i in range(len(self._variables)): + self._variable_update_ops.append(self._variables[i].assign(self._variable_placeholders[i])) + self._saver = tf.train.Saver(self._variables) @property @@ -66,8 +74,11 @@ def restore(self, sess: Any, restore_path: str): # TODO: Can this be more generic so that `global/` and `online/` are not hardcoded here? new_name = var_name.replace('global/', 'online/') variables[new_name] = reader.get_tensor(var_name) - # Assign all variables - sess.run([v.assign(variables[v.name.split(':')[0]]) for v in self._variables]) + + # Assign all variables using placeholder + for i in range(len(self._variables)): + variable_name = self._variables[i].name.split(':')[0] + sess.run(self._variable_update_ops[i], {self._variable_placeholders[i]: variables[variable_name]}) def merge(self, other: 'Saver'): """ diff --git a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py index 214ef31e3..fbdc09803 100644 --- a/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py +++ b/rl_coach/tests/graph_managers/test_basic_rl_graph_manager.py @@ -1,8 +1,10 @@ +import gc import os import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) import tensorflow as tf from rl_coach.base_parameters import TaskParameters, DistributedTaskParameters, Frameworks +from rl_coach.core_types import EnvironmentSteps from rl_coach.utils import get_open_port from multiprocessing import Process from tensorflow import logging @@ -41,6 +43,24 @@ def test_basic_rl_graph_manager_with_cartpole_dqn(): experiment_path="./experiments/test")) # graph_manager.improve() +# Test for identifying memory leak in restore_checkpoint +@pytest.mark.unit_test +def test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restore(): + tf.reset_default_graph() + from rl_coach.presets.CartPole_DQN import graph_manager + assert graph_manager + graph_manager.create_graph(task_parameters=TaskParameters(framework_type=Frameworks.tensorflow, + experiment_path="./experiments/test", + apply_stop_condition=True)) + # graph_manager.improve() + # graph_manager.evaluate(EnvironmentSteps(1000)) + # graph_manager.save_checkpoint() + # + # graph_manager.task_parameters.checkpoint_restore_dir = "./experiments/test/checkpoint" + # while True: + # graph_manager.restore_checkpoint() + # graph_manager.evaluate(EnvironmentSteps(1000)) + # gc.collect() if __name__ == '__main__': pass @@ -48,5 +68,6 @@ def test_basic_rl_graph_manager_with_cartpole_dqn(): # test_basic_rl_graph_manager_with_ant_a3c() # test_basic_rl_graph_manager_with_pong_nec() # test_basic_rl_graph_manager_with_cartpole_dqn() + # test_basic_rl_graph_manager_with_cartpole_dqn_and_repeated_checkpoint_restore() #test_basic_rl_graph_manager_multithreaded_with_pong_a3c() #test_basic_rl_graph_manager_with_doom_basic_dqn() \ No newline at end of file From b1e9ea48d86807382c5feca0d18a6bf71f5caa03 Mon Sep 17 00:00:00 2001 From: Gourav Roy Date: Thu, 3 Jan 2019 15:08:34 -0800 Subject: [PATCH 10/10] Refactored GlobalVariableSaver --- .../architectures/tensorflow_components/savers.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/rl_coach/architectures/tensorflow_components/savers.py b/rl_coach/architectures/tensorflow_components/savers.py index 0f7ddbf8e..67c0c8b67 100644 --- a/rl_coach/architectures/tensorflow_components/savers.py +++ b/rl_coach/architectures/tensorflow_components/savers.py @@ -35,10 +35,12 @@ def __init__(self, name): # Using a placeholder to update the variable during restore to avoid memory leak. # Ref: https://github.com/tensorflow/tensorflow/issues/4151 - self._variable_placeholders = [tf.placeholder(v.dtype, shape=v.get_shape()) for v in self._variables] + self._variable_placeholders = [] self._variable_update_ops = [] - for i in range(len(self._variables)): - self._variable_update_ops.append(self._variables[i].assign(self._variable_placeholders[i])) + for v in self._variables: + variable_placeholder = tf.placeholder(v.dtype, shape=v.get_shape()) + self._variable_placeholders.append(variable_placeholder) + self._variable_update_ops.append(v.assign(variable_placeholder)) self._saver = tf.train.Saver(self._variables) @@ -76,9 +78,8 @@ def restore(self, sess: Any, restore_path: str): variables[new_name] = reader.get_tensor(var_name) # Assign all variables using placeholder - for i in range(len(self._variables)): - variable_name = self._variables[i].name.split(':')[0] - sess.run(self._variable_update_ops[i], {self._variable_placeholders[i]: variables[variable_name]}) + placeholder_dict = {ph: variables[v.name.split(':')[0]] for ph, v in zip(self._variable_placeholders, self._variables)} + sess.run(self._variable_update_ops, placeholder_dict) def merge(self, other: 'Saver'): """