Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make failed mongo observer dump configurable #462

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 0 additions & 71 deletions examples/my_runs/template.html

This file was deleted.

36 changes: 31 additions & 5 deletions sacred/observers/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
# coding=utf-8
from __future__ import division, print_function, unicode_literals

import mimetypes
import os.path
import pickle
import re
import os.path
import sys
import time
import mimetypes
from tempfile import NamedTemporaryFile

import sacred.optional as opt
from sacred.commandline_options import CommandLineOption
Expand Down Expand Up @@ -59,7 +60,23 @@ class MongoObserver(RunObserver):
@staticmethod
def create(url=None, db_name='sacred', collection='runs',
overwrite=None, priority=DEFAULT_MONGO_PRIORITY,
client=None, **kwargs):
client=None, failure_dir=None, **kwargs):
"""Factory method for MongoObserver.

Parameters
----------
url: Mongo URI to connect to.
db_name: Database to connect to.
collection: Collection to write the runs to. (default: "runs").
overwrite: _id of a run that should be overwritten.
priority: (default 30)
client: Client to connect to. Do not use client and URL together.
failure_dir: Directory to save the run of a failed observer to.

Returns
-------
An instantiated MongoObserver.
"""
import pymongo
import gridfs

Expand All @@ -81,10 +98,12 @@ def create(url=None, db_name='sacred', collection='runs',
return MongoObserver(runs_collection,
fs, overwrite=overwrite,
metrics_collection=metrics_collection,
failure_dir=failure_dir,
priority=priority)

def __init__(self, runs_collection,
fs, overwrite=None, metrics_collection=None,
failure_dir=None,
priority=DEFAULT_MONGO_PRIORITY):
self.runs = runs_collection
self.metrics = metrics_collection
Expand All @@ -100,6 +119,7 @@ def __init__(self, runs_collection,
self.overwrite = overwrite
self.run_entry = None
self.priority = priority
self.failure_dir = failure_dir

def queued_event(self, ex_info, command, host_info, queue_time, config,
meta_info, _id):
Expand Down Expand Up @@ -288,6 +308,8 @@ def final_save(self, attempts):
except pymongo.errors.AutoReconnect:
if i < attempts - 1:
time.sleep(1)
except pymongo.errors.ConnectionFailure:
pass
except pymongo.errors.InvalidDocument:
self.run_entry = force_bson_encodeable(self.run_entry)
print("Warning: Some of the entries of the run were not "
Expand All @@ -296,9 +318,13 @@ def final_save(self, attempts):
"Most likely it is either the 'info' or the 'result'.",
file=sys.stderr)

from tempfile import NamedTemporaryFile
if not os.path.exists(self.failure_dir):
os.makedirs(self.failure_dir)
with NamedTemporaryFile(suffix='.pickle', delete=False,
prefix='sacred_mongo_fail_') as f:
prefix='sacred_mongo_fail_{}_'.format(
self.run_entry["_id"]
),
dir=self.failure_dir) as f:
pickle.dump(self.run_entry, f)
print("Warning: saving to MongoDB failed! "
"Stored experiment entry in '{}'".format(f.name),
Expand Down
2 changes: 1 addition & 1 deletion sacred/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def _stop_heartbeat(self):
# only stop if heartbeat was started
if self._heartbeat is not None:
self._stop_heartbeat_event.set()
self._heartbeat.join(2)
self._heartbeat.join(timeout=2)

def _emit_queued(self):
self.status = 'QUEUED'
Expand Down
1 change: 1 addition & 0 deletions sacred/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ def create(cls, func, interval=10):
return stop_event, timer_thread

def __init__(self, event, func, interval=10.):
# TODO use super here.
threading.Thread.__init__(self)
self.stopped = event
self.func = func
Expand Down
64 changes: 64 additions & 0 deletions tests/test_observers/failing_mongo_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@

import mongomock
import pymongo
import pymongo.errors



class FailingMongoClient(mongomock.MongoClient):
def __init__(self, max_calls_before_failure=2,
exception_to_raise=pymongo.errors.AutoReconnect, **kwargs):
super(FailingMongoClient, self).__init__(**kwargs)
self._max_calls_before_failure = max_calls_before_failure
self.exception_to_raise = exception_to_raise

def get_database(self, name, codec_options=None, read_preference=None,
write_concern=None):
db = self._databases.get(name)
if db is None:
db = self._databases[name] = FailingDatabase(
max_calls_before_failure=self._max_calls_before_failure,
exception_to_raise=self.exception_to_raise, client=self,
name=name, )
return db


class FailingDatabase(mongomock.Database):
def __init__(self, max_calls_before_failure, exception_to_raise=None,
**kwargs):
super(FailingDatabase, self).__init__(**kwargs)
self._max_calls_before_failure = max_calls_before_failure
self.exception_to_raise = exception_to_raise

def get_collection(self, name, codec_options=None, read_preference=None,
write_concern=None):
collection = self._collections.get(name)
if collection is None:
collection = self._collections[name] = FailingCollection(
max_calls_before_failure=self._max_calls_before_failure,
exception_to_raise=self.exception_to_raise, db=self,
name=name, )
return collection


class FailingCollection(mongomock.Collection):
def __init__(self, max_calls_before_failure, exception_to_raise, **kwargs):
super(FailingCollection, self).__init__(**kwargs)
self._max_calls_before_failure = max_calls_before_failure
self.exception_to_raise = exception_to_raise
self._calls = 0

def insert_one(self, document):
self._calls += 1
if self._calls > self._max_calls_before_failure:
raise pymongo.errors.ConnectionFailure
else:
return super(FailingCollection, self).insert_one(document)

def update_one(self, filter, update, upsert=False):
self._calls += 1
if self._calls > self._max_calls_before_failure:
raise pymongo.errors.ConnectionFailure
else:
return super(FailingCollection, self).update_one(filter, update,
upsert)
Loading