FileObserver setup concurrent-safe (#473)
* Create file storage directories concurrent-safe

* File storage uses the same method to make the run directory for start and queue events

* Remove newline

* Remove trailing whitespace

* Remove trailing whitespace and unused import

* Use fallback of os.makedirs for python 2

* Fix typo

* Test coverage also for python2

* Test that making the run directory can raise FileExistsError

* Skip test using unittest.mock on unsupported Python versions

* Improve readability of max call

Co-Authored-By: Rüdiger Busche <ruedigerbusche@web.de>

* Replace unittest.mock with pytest.monkeypatch

* Separate creation of the next run dir into a new method

* Revert "Test coverage also for python2"

This reverts commit 025c66a.

* Import the Python 2 fallback of FileExistsError

* FileExistsError fallback requires an argument

* Avoid pylint errors when redefining builtins

* Shorten lines

* Move race-condition catch to the outer function when creating the run dir
dekuenstle authored and JarnoRFB committed May 16, 2019
1 parent 4cad418 commit 718b798
Showing 2 changed files with 54 additions and 31 deletions.
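The fix boils down to one pattern: compute the next free integer id under `basedir`, try to `mkdir` it, and retry when a concurrent run wins the race and raises `FileExistsError`. A minimal standalone sketch of that pattern, assuming integer-named run directories (the helper name `claim_next_run_dir` is illustrative, not part of Sacred's API):

```python
import os


def claim_next_run_dir(basedir, max_retries=100):
    """Claim the next free integer-named run directory under basedir,
    retrying when a concurrent process creates the same directory first."""
    os.makedirs(basedir, exist_ok=True)
    for _ in range(max_retries):
        dir_nrs = [int(d) for d in os.listdir(basedir)
                   if d.isdigit() and os.path.isdir(os.path.join(basedir, d))]
        run_dir = os.path.join(basedir, str(max(dir_nrs + [0]) + 1))
        try:
            os.mkdir(run_dir)  # atomic: exactly one process succeeds
            return run_dir
        except FileExistsError:  # lost the race; recompute the id and retry
            continue
    raise FileExistsError('could not claim a run directory under ' + basedir)
```

The diff below adds the same logic as `_make_next_dir`/`_make_run_dir`, plus an `os.makedirs` wrapper for Python 2, where `exist_ok` is unavailable.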
70 changes: 41 additions & 29 deletions sacred/observers/file_storage.py
@@ -4,16 +4,17 @@
import json
import os
import os.path
import tempfile

from shutil import copyfile

from sacred.commandline_options import CommandLineOption
from sacred.dependencies import get_digest
from sacred.observers.base import RunObserver
from sacred.utils import FileNotFoundError, FileExistsError # py2 compat.
from sacred import optional as opt
from sacred.serializer import flatten
# pylint: disable=redefined-builtin
from sacred.utils import FileNotFoundError, FileExistsError # py2 compat.
# pylint: enable=redefined-builtin


DEFAULT_FILE_STORAGE_PRIORITY = 20
@@ -51,16 +52,47 @@ def __init__(self, basedir, resource_dir, source_dir, template,
self.cout = ""
self.cout_write_cursor = 0

def queued_event(self, ex_info, command, host_info, queue_time, config,
meta_info, _id):
if not os.path.exists(self.basedir):
os.makedirs(self.basedir)
@staticmethod
def _makedirs(name, mode=0o777, exist_ok=False):
""" Wrapper of os.makedirs with fallback
for exist_ok on python 2.
"""
try:
os.makedirs(name, mode, exist_ok)
except TypeError:
if not os.path.exists(name):
os.makedirs(name, mode)

def _make_next_dir(self):
dir_nrs = [int(d) for d in os.listdir(self.basedir)
if os.path.isdir(os.path.join(self.basedir, d)) and
d.isdigit()]
_id = max(dir_nrs + [0]) + 1
new_dir = os.path.join(self.basedir, str(_id))
os.mkdir(new_dir)
self.dir = new_dir # set only if mkdir is successful

def _make_run_dir(self, _id):
self._makedirs(self.basedir, exist_ok=True)
self.dir = None
if _id is None:
self.dir = tempfile.mkdtemp(prefix='run_', dir=self.basedir)
fail_count = 0
while self.dir is None:
try:
self._make_next_dir()
except FileExistsError: # Catch race conditions
if fail_count < 100:
fail_count += 1
else: # expect that something else went wrong
raise
else:
self.dir = os.path.join(self.basedir, str(_id))
os.mkdir(self.dir)

def queued_event(self, ex_info, command, host_info, queue_time, config,
meta_info, _id):
self._make_run_dir(_id)

self.run_entry = {
'experiment': dict(ex_info),
'command': command,
@@ -91,26 +123,7 @@ def save_sources(self, ex_info):

def started_event(self, ex_info, command, host_info, start_time, config,
meta_info, _id):
if not os.path.exists(self.basedir):
os.makedirs(self.basedir)
if _id is None:
for i in range(200):
dir_nrs = [int(d) for d in os.listdir(self.basedir)
if os.path.isdir(os.path.join(self.basedir, d)) and
d.isdigit()]
_id = max(dir_nrs + [0]) + 1
self.dir = os.path.join(self.basedir, str(_id))
try:
os.mkdir(self.dir)
break
except FileExistsError: # Catch race conditions
if i > 100:
# After some tries,
# expect that something other went wrong
raise
else:
self.dir = os.path.join(self.basedir, str(_id))
os.mkdir(self.dir)
self._make_run_dir(_id)

ex_info['sources'] = self.save_sources(ex_info)

@@ -137,8 +150,7 @@ def started_event(self, ex_info, command, host_info, start_time, config,
return os.path.relpath(self.dir, self.basedir) if _id is None else _id

def find_or_save(self, filename, store_dir):
if not os.path.exists(store_dir):
os.makedirs(store_dir)
self._makedirs(store_dir, exist_ok=True)
source_name, ext = os.path.splitext(os.path.basename(filename))
md5sum = get_digest(filename)
store_name = source_name + '_' + md5sum + ext
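The `_makedirs` wrapper above relies on `os.makedirs(name, mode, exist_ok)` raising `TypeError` on Python 2, whose `makedirs` has no `exist_ok` parameter. A standalone sketch of the same idea (the name `makedirs_compat` is illustrative; it uses an `errno` check instead of the `os.path.exists` pre-check, which narrows the remaining race window):

```python
import errno
import os


def makedirs_compat(name, mode=0o777, exist_ok=False):
    """os.makedirs with an exist_ok fallback for Python 2."""
    try:
        os.makedirs(name, mode, exist_ok)
    except TypeError:  # Python 2: makedirs() accepts no exist_ok argument
        try:
            os.makedirs(name, mode)
        except OSError as err:
            # Ignore "already exists" only when the caller asked for exist_ok.
            if not (exist_ok and err.errno == errno.EEXIST):
                raise
```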
15 changes: 13 additions & 2 deletions tests/test_observers/test_file_storage_observer.py
@@ -10,8 +10,10 @@
import json

from sacred.observers.file_storage import FileStorageObserver
from sacred.serializer import restore
from sacred.metrics_logger import ScalarMetricLogEntry, linearize_metrics
# pylint: disable=redefined-builtin
from sacred.utils import FileExistsError # py2 compat.
# pylint: enable=redefined-builtin


T1 = datetime.datetime(1999, 5, 4, 3, 2, 1, 0)
@@ -91,7 +93,7 @@ def test_fs_observer_queued_event_creates_rundir(dir_obs, sample_run):
}


def test_fs_observer_started_event_creates_rundir(dir_obs, sample_run):
def test_fs_observer_started_event_creates_rundir(dir_obs, sample_run, monkeypatch):
basedir, obs = dir_obs
sample_run['_id'] = None
_id = obs.started_event(**sample_run)
@@ -115,6 +117,15 @@ def test_fs_observer_started_event_creates_rundir(dir_obs, sample_run):
"status": "RUNNING"
}

def mkdir_raises_file_exists(name):
raise FileExistsError("File already exists: " + name)

with monkeypatch.context() as m:
m.setattr('os.mkdir', mkdir_raises_file_exists)
with pytest.raises(FileExistsError):
sample_run['_id'] = None
_id = obs.started_event(**sample_run)


def test_fs_observer_started_event_stores_source(dir_obs, sample_run, tmpfile):
basedir, obs = dir_obs
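The added test exercises the failure path by patching `os.mkdir` so that every attempt raises `FileExistsError`, then asserts that the observer eventually re-raises. The same `pytest.monkeypatch` pattern in isolation (function and fixture names below are illustrative, not from the Sacred test suite):

```python
import os

import pytest


def create_run_dir(path):
    # Stand-in for the code under test; os.mkdir is what gets patched.
    os.mkdir(path)


def test_create_run_dir_propagates_file_exists(monkeypatch, tmp_path):
    def mkdir_raises_file_exists(name, *args, **kwargs):
        raise FileExistsError("File already exists: " + str(name))

    with monkeypatch.context() as m:
        m.setattr(os, 'mkdir', mkdir_raises_file_exists)
        with pytest.raises(FileExistsError):
            create_run_dir(tmp_path / 'run_1')
```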
