Skip to content

Commit

Permalink
Make failed mongo observer dump configurable (#462)
Browse files Browse the repository at this point in the history
* Fix some typos

In comments and forbidden collections

* Make mongo observer dump dir configurable

* Fix py27 and flake8 warnings

* Fix some py27 errors and move mock to own file

* Make Python 2 compatible
  • Loading branch information
JarnoRFB committed Jun 2, 2019
1 parent 24d84a2 commit 7f7e1e0
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 156 deletions.
71 changes: 0 additions & 71 deletions examples/my_runs/template.html

This file was deleted.

36 changes: 31 additions & 5 deletions sacred/observers/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
# coding=utf-8
from __future__ import division, print_function, unicode_literals

import mimetypes
import os.path
import pickle
import re
import os.path
import sys
import time
import mimetypes
from tempfile import NamedTemporaryFile

import sacred.optional as opt
from sacred.commandline_options import CommandLineOption
Expand Down Expand Up @@ -59,7 +60,23 @@ class MongoObserver(RunObserver):
@staticmethod
def create(url=None, db_name='sacred', collection='runs',
overwrite=None, priority=DEFAULT_MONGO_PRIORITY,
client=None, **kwargs):
client=None, failure_dir=None, **kwargs):
"""Factory method for MongoObserver.
Parameters
----------
url: Mongo URI to connect to.
db_name: Database to connect to.
collection: Collection to write the runs to. (default: "runs").
overwrite: _id of a run that should be overwritten.
priority: (default 30)
client: Client to connect to. Do not use client and URL together.
failure_dir: Directory to save the run of a failed observer to.
Returns
-------
An instantiated MongoObserver.
"""
import pymongo
import gridfs

Expand All @@ -81,10 +98,12 @@ def create(url=None, db_name='sacred', collection='runs',
return MongoObserver(runs_collection,
fs, overwrite=overwrite,
metrics_collection=metrics_collection,
failure_dir=failure_dir,
priority=priority)

def __init__(self, runs_collection,
fs, overwrite=None, metrics_collection=None,
failure_dir=None,
priority=DEFAULT_MONGO_PRIORITY):
self.runs = runs_collection
self.metrics = metrics_collection
Expand All @@ -100,6 +119,7 @@ def __init__(self, runs_collection,
self.overwrite = overwrite
self.run_entry = None
self.priority = priority
self.failure_dir = failure_dir

def queued_event(self, ex_info, command, host_info, queue_time, config,
meta_info, _id):
Expand Down Expand Up @@ -288,6 +308,8 @@ def final_save(self, attempts):
except pymongo.errors.AutoReconnect:
if i < attempts - 1:
time.sleep(1)
except pymongo.errors.ConnectionFailure:
pass
except pymongo.errors.InvalidDocument:
self.run_entry = force_bson_encodeable(self.run_entry)
print("Warning: Some of the entries of the run were not "
Expand All @@ -296,9 +318,13 @@ def final_save(self, attempts):
"Most likely it is either the 'info' or the 'result'.",
file=sys.stderr)

from tempfile import NamedTemporaryFile
if not os.path.exists(self.failure_dir):
os.makedirs(self.failure_dir)
with NamedTemporaryFile(suffix='.pickle', delete=False,
prefix='sacred_mongo_fail_') as f:
prefix='sacred_mongo_fail_{}_'.format(
self.run_entry["_id"]
),
dir=self.failure_dir) as f:
pickle.dump(self.run_entry, f)
print("Warning: saving to MongoDB failed! "
"Stored experiment entry in '{}'".format(f.name),
Expand Down
2 changes: 1 addition & 1 deletion sacred/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def _stop_heartbeat(self):
# only stop if heartbeat was started
if self._heartbeat is not None:
self._stop_heartbeat_event.set()
self._heartbeat.join(2)
self._heartbeat.join(timeout=2)

def _emit_queued(self):
self.status = 'QUEUED'
Expand Down
1 change: 1 addition & 0 deletions sacred/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ def create(cls, func, interval=10):
return stop_event, timer_thread

def __init__(self, event, func, interval=10.):
# TODO use super here.
threading.Thread.__init__(self)
self.stopped = event
self.func = func
Expand Down
64 changes: 64 additions & 0 deletions tests/test_observers/failing_mongo_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@

import mongomock
import pymongo
import pymongo.errors



class FailingMongoClient(mongomock.MongoClient):
def __init__(self, max_calls_before_failure=2,
exception_to_raise=pymongo.errors.AutoReconnect, **kwargs):
super(FailingMongoClient, self).__init__(**kwargs)
self._max_calls_before_failure = max_calls_before_failure
self.exception_to_raise = exception_to_raise

def get_database(self, name, codec_options=None, read_preference=None,
write_concern=None):
db = self._databases.get(name)
if db is None:
db = self._databases[name] = FailingDatabase(
max_calls_before_failure=self._max_calls_before_failure,
exception_to_raise=self.exception_to_raise, client=self,
name=name, )
return db


class FailingDatabase(mongomock.Database):
def __init__(self, max_calls_before_failure, exception_to_raise=None,
**kwargs):
super(FailingDatabase, self).__init__(**kwargs)
self._max_calls_before_failure = max_calls_before_failure
self.exception_to_raise = exception_to_raise

def get_collection(self, name, codec_options=None, read_preference=None,
write_concern=None):
collection = self._collections.get(name)
if collection is None:
collection = self._collections[name] = FailingCollection(
max_calls_before_failure=self._max_calls_before_failure,
exception_to_raise=self.exception_to_raise, db=self,
name=name, )
return collection


class FailingCollection(mongomock.Collection):
def __init__(self, max_calls_before_failure, exception_to_raise, **kwargs):
super(FailingCollection, self).__init__(**kwargs)
self._max_calls_before_failure = max_calls_before_failure
self.exception_to_raise = exception_to_raise
self._calls = 0

def insert_one(self, document):
self._calls += 1
if self._calls > self._max_calls_before_failure:
raise pymongo.errors.ConnectionFailure
else:
return super(FailingCollection, self).insert_one(document)

def update_one(self, filter, update, upsert=False):
self._calls += 1
if self._calls > self._max_calls_before_failure:
raise pymongo.errors.ConnectionFailure
else:
return super(FailingCollection, self).update_one(filter, update,
upsert)
Loading

0 comments on commit 7f7e1e0

Please sign in to comment.