Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileObserver setup concurrent-safe #473

Merged
merged 19 commits into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 42 additions & 29 deletions sacred/observers/file_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import json
import os
import os.path
import tempfile

from shutil import copyfile

from sacred.commandline_options import CommandLineOption
from sacred.dependencies import get_digest
from sacred.observers.base import RunObserver
from sacred.utils import FileNotFoundError, FileExistsError # py2 compat.
from sacred import optional as opt
from sacred.serializer import flatten
# pylint: disable=redefined-builtin
from sacred.utils import FileNotFoundError, FileExistsError # py2 compat.
# pylint: enable=redefined-builtin


DEFAULT_FILE_STORAGE_PRIORITY = 20
Expand Down Expand Up @@ -51,16 +52,48 @@ def __init__(self, basedir, resource_dir, source_dir, template,
self.cout = ""
self.cout_write_cursor = 0

def queued_event(self, ex_info, command, host_info, queue_time, config,
meta_info, _id):
if not os.path.exists(self.basedir):
os.makedirs(self.basedir)
@staticmethod
def _makedirs(name, mode=0o777, exist_ok=False):
""" Wrapper of os.makedirs with fallback
for exist_ok on python 2.
"""
try:
os.makedirs(name, mode, exist_ok)
except TypeError:
if not os.path.exists(name):
os.makedirs(name, mode)

def _try_set_next_dir(self, _id, raise_error):
JarnoRFB marked this conversation as resolved.
Show resolved Hide resolved
dir_nrs = [int(d) for d in os.listdir(self.basedir)
if os.path.isdir(os.path.join(self.basedir, d)) and
d.isdigit()]
_id = max(dir_nrs + [0]) + 1
new_dir = os.path.join(self.basedir, str(_id))
try:
os.mkdir(new_dir)
self.dir = new_dir
except FileExistsError: # Catch race conditions
if raise_error:
# After some tries,
# expect that something other went wrong
raise

def _make_run_dir(self, _id):
self._makedirs(self.basedir, exist_ok=True)
self.dir = None
if _id is None:
self.dir = tempfile.mkdtemp(prefix='run_', dir=self.basedir)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole block could be placed in its own method to increase readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I separated to a new method. Please review whether it still works like you expect.

fail_count = 0
while self.dir is None:
self._try_set_next_dir(_id, raise_error=(fail_count > 100))
fail_count += 1
else:
self.dir = os.path.join(self.basedir, str(_id))
os.mkdir(self.dir)

def queued_event(self, ex_info, command, host_info, queue_time, config,
meta_info, _id):
self._make_run_dir(_id)

self.run_entry = {
'experiment': dict(ex_info),
'command': command,
Expand Down Expand Up @@ -91,26 +124,7 @@ def save_sources(self, ex_info):

def started_event(self, ex_info, command, host_info, start_time, config,
meta_info, _id):
if not os.path.exists(self.basedir):
os.makedirs(self.basedir)
if _id is None:
for i in range(200):
dir_nrs = [int(d) for d in os.listdir(self.basedir)
if os.path.isdir(os.path.join(self.basedir, d)) and
d.isdigit()]
_id = max(dir_nrs + [0]) + 1
self.dir = os.path.join(self.basedir, str(_id))
try:
os.mkdir(self.dir)
break
except FileExistsError: # Catch race conditions
if i > 100:
# After some tries,
# expect that something other went wrong
raise
else:
self.dir = os.path.join(self.basedir, str(_id))
os.mkdir(self.dir)
self._make_run_dir(_id)

ex_info['sources'] = self.save_sources(ex_info)

Expand All @@ -137,8 +151,7 @@ def started_event(self, ex_info, command, host_info, start_time, config,
return os.path.relpath(self.dir, self.basedir) if _id is None else _id

def find_or_save(self, filename, store_dir):
if not os.path.exists(store_dir):
os.makedirs(store_dir)
self._makedirs(store_dir, exist_ok=True)
source_name, ext = os.path.splitext(os.path.basename(filename))
md5sum = get_digest(filename)
store_name = source_name + '_' + md5sum + ext
Expand Down
15 changes: 13 additions & 2 deletions tests/test_observers/test_file_storage_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import json

from sacred.observers.file_storage import FileStorageObserver
from sacred.serializer import restore
from sacred.metrics_logger import ScalarMetricLogEntry, linearize_metrics
# pylint: disable=redefined-builtin
from sacred.utils import FileExistsError # py2 compat.
# pylint: enable=redefined-builtin


T1 = datetime.datetime(1999, 5, 4, 3, 2, 1, 0)
Expand Down Expand Up @@ -91,7 +93,7 @@ def test_fs_observer_queued_event_creates_rundir(dir_obs, sample_run):
}


def test_fs_observer_started_event_creates_rundir(dir_obs, sample_run):
def test_fs_observer_started_event_creates_rundir(dir_obs, sample_run, monkeypatch):
basedir, obs = dir_obs
sample_run['_id'] = None
_id = obs.started_event(**sample_run)
Expand All @@ -115,6 +117,15 @@ def test_fs_observer_started_event_creates_rundir(dir_obs, sample_run):
"status": "RUNNING"
}

def mkdir_raises_file_exists(name):
raise FileExistsError("File already exists: " + name)

with monkeypatch.context() as m:
m.setattr('os.mkdir', mkdir_raises_file_exists)
with pytest.raises(FileExistsError):
sample_run['_id'] = None
_id = obs.started_event(**sample_run)


def test_fs_observer_started_event_stores_source(dir_obs, sample_run, tmpfile):
basedir, obs = dir_obs
Expand Down