Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Qwlouse committed Feb 27, 2015
1 parent efb0877 commit df3f082
Showing 1 changed file with 79 additions and 39 deletions.
118 changes: 79 additions & 39 deletions sacred/observers/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from sacred.dependencies import get_digest
from sacred.observers.base import RunObserver
import sacred.optional as opt
from sacred.optional import pymongo, bson, gridfs
import pymongo
from pymongo.errors import AutoReconnect
import bson
import gridfs

SON_MANIPULATORS = []

Expand Down Expand Up @@ -47,54 +50,91 @@ def transform_outgoing(self, son, collection):

class MongoObserver(RunObserver):
@staticmethod
def create(url, db_name='sacred', prefix='my', **kwargs):
    """Connect to MongoDB and build a MongoObserver on top of it.

    :param url: connection URL handed to ``pymongo.MongoClient``.
    :param db_name: name of the database to use.
    :param prefix: prefix for the collection names; the observer uses
        ``<prefix>.experiments``, ``<prefix>.runs`` and
        ``<prefix>.hosts``, and the same prefix is passed to GridFS as
        its ``collection`` argument.
    :param kwargs: forwarded unchanged to ``pymongo.MongoClient``.
    :return: a new ``MongoObserver``.
    """
    client = pymongo.MongoClient(url, **kwargs)
    database = client[db_name]
    # Register every module-level SON manipulator on the database.
    for manipulator in SON_MANIPULATORS:
        database.add_son_manipulator(manipulator)
    experiments_collection = database[prefix + '.experiments']
    runs_collection = database[prefix + '.runs']
    hosts_collection = database[prefix + '.hosts']
    fs = gridfs.GridFS(database, collection=prefix)
    return MongoObserver(experiments_collection, runs_collection,
                         hosts_collection, fs)

def __init__(self, experiments_collection, runs_collection,
             hosts_collection, fs):
    """Store the target collections and reset all per-run state.

    :param experiments_collection: collection for experiment entries.
    :param runs_collection: collection for run entries.
    :param hosts_collection: collection for host entries.
    :param fs: GridFS-like file store used for sources, resources and
        artifacts.
    """
    self.experiments = experiments_collection
    self.runs = runs_collection
    self.hosts = hosts_collection
    self.fs = fs
    # The entries are filled in once a run starts.
    self.experiment_entry = None
    self.run_entry = None
    self.host_entry = None
    # True once the experiment/host references were put into run_entry.
    self.experiment_and_host_saved = False

@staticmethod
def find_or_save(collection, document):
    """Return a DBRef to ``document`` in ``collection``, saving it if needed.

    The collection is queried with ``document`` itself as the filter.
    When a match is found its ``_id`` is referenced; otherwise the
    document is saved and the id returned by ``collection.save`` is used.
    """
    existing = collection.find_one(document)
    if existing is None:
        doc_id = collection.save(document)
    else:
        doc_id = existing['_id']
    return bson.DBRef(collection=collection.name, id=doc_id)

def save(self):
    """Best-effort write of the current run entry to the runs collection.

    While ``experiment_and_host_saved`` is False, the experiment and
    host entries are first stored (or looked up) through
    ``find_or_save`` and the resulting references are placed in the run
    entry under ``'experiment'`` and ``'host'``.

    An ``AutoReconnect`` error is swallowed on purpose: nothing is
    retried here, the data simply goes out with the next save.
    """
    try:
        if not self.experiment_and_host_saved:
            ex_ref = self.find_or_save(self.experiments,
                                       self.experiment_entry)
            self.run_entry['experiment'] = ex_ref

            host_ref = self.find_or_save(self.hosts, self.host_entry)
            self.run_entry['host'] = host_ref

            # Only flagged after both lookups went through, so a failure
            # in either one is repeated on the next call.
            self.experiment_and_host_saved = True

        self.runs.save(self.run_entry)
    except AutoReconnect:  # just wait for the next save
        pass

def final_save(self, attempts=10):
    """Persist the run entry, retrying and falling back to a pickle file.

    Up to ``attempts`` tries are made, sleeping one second between
    failed tries. If the experiment and host references have not been
    attached to the run entry yet (every earlier ``save`` failed), they
    are resolved here as well, so the final document is never written
    without them.

    If every attempt fails with ``AutoReconnect``, the experiment, host
    and run entries are pickled into a temporary file that is kept on
    disk, and a warning naming the file is printed to stderr.

    :param attempts: maximum number of save attempts.
    """
    for i in range(attempts):
        try:
            if not self.experiment_and_host_saved:
                self.run_entry['experiment'] = self.find_or_save(
                    self.experiments, self.experiment_entry)
                self.run_entry['host'] = self.find_or_save(
                    self.hosts, self.host_entry)
                self.experiment_and_host_saved = True
            self.runs.save(self.run_entry)
            return
        except AutoReconnect:
            # No point in sleeping after the last attempt.
            if i < attempts - 1:
                time.sleep(1)

    from tempfile import NamedTemporaryFile
    # delete=False: the file must survive so the data can be recovered.
    with NamedTemporaryFile(suffix='.pickle', delete=False,
                            prefix='sacred_mongo_fail_') as f:
        pickle.dump([self.experiment_entry,
                     self.host_entry,
                     self.run_entry], f)

    print("Warning: saving to MongoDB failed! "
          "Stored experiment entry in '%s'" % f.name,
          file=sys.stderr)

def started_event(self, name, ex_info, host_info, start_time, config):
self.experiment_and_host_saved = False
self.experiment_entry = dict()
self.experiment_entry['name'] = name
self.experiment_entry['experiment_info'] = ex_info
self.experiment_entry['host_info'] = host_info
self.experiment_entry['start_time'] = \
datetime.fromtimestamp(start_time)
self.experiment_entry['config'] = config
self.experiment_entry['status'] = 'RUNNING'
self.experiment_entry['resources'] = []
self.experiment_entry['artifacts'] = []
self.experiment_entry.update(ex_info)

self.host_entry = dict()
self.host_entry = host_info

self.run_entry = dict()
self.run_entry['start_time'] = datetime.fromtimestamp(start_time)
self.run_entry['config'] = config
self.run_entry['status'] = 'RUNNING'
self.run_entry['resources'] = []
self.run_entry['artifacts'] = []

self.save()

for source_name, md5 in ex_info['sources']:
Expand All @@ -103,57 +143,57 @@ def started_event(self, name, ex_info, host_info, start_time, config):
self.fs.put(f, filename=source_name)

def heartbeat_event(self, info, captured_out):
    """Record the latest info and captured output, then save the run.

    :param info: stored in the run entry under ``'info'``.
    :param captured_out: stored in the run entry under ``'captured_out'``.
    """
    self.run_entry['info'] = info
    self.run_entry['captured_out'] = captured_out
    # Timestamp of this heartbeat (local time, as given by datetime.now()).
    self.run_entry['heartbeat'] = datetime.now()
    self.save()

def completed_event(self, stop_time, result):
    """Mark the run as COMPLETED and persist it with up to 10 attempts.

    :param stop_time: POSIX timestamp, converted with
        ``datetime.fromtimestamp`` before being stored.
    :param result: stored in the run entry under ``'result'``.
    """
    self.run_entry['stop_time'] = \
        datetime.fromtimestamp(stop_time)
    self.run_entry['result'] = result
    self.run_entry['status'] = 'COMPLETED'
    self.final_save(attempts=10)

def interrupted_event(self, interrupt_time):
    """Mark the run as INTERRUPTED and persist it with up to 3 attempts.

    :param interrupt_time: POSIX timestamp, converted with
        ``datetime.fromtimestamp`` and stored as ``'stop_time'``.
    """
    self.run_entry['stop_time'] = \
        datetime.fromtimestamp(interrupt_time)
    self.run_entry['status'] = 'INTERRUPTED'
    self.final_save(attempts=3)

def failed_event(self, fail_time, fail_trace):
    """Mark the run as FAILED and persist it with a single attempt.

    :param fail_time: POSIX timestamp, converted with
        ``datetime.fromtimestamp`` and stored as ``'stop_time'``.
    :param fail_trace: stored in the run entry under ``'fail_trace'``.
    """
    self.run_entry['stop_time'] = \
        datetime.fromtimestamp(fail_time)
    self.run_entry['status'] = 'FAILED'
    self.run_entry['fail_trace'] = fail_trace
    self.final_save(attempts=1)

def resource_event(self, filename):
    """Register ``filename`` as a resource of the current run.

    The file's digest is computed with ``get_digest``. If the file
    store already holds a file with this name and digest, only the
    ``(filename, digest)`` pair is added to the run entry's
    ``'resources'`` list (once). Otherwise the file is uploaded first
    and the md5 reported by the file store is recorded.

    :param filename: path of the resource file to register.
    """
    md5hash = get_digest(filename)
    if self.fs.exists(filename=filename, md5=md5hash):
        resource = (filename, md5hash)
        # Avoid listing the same resource twice.
        if resource not in self.run_entry['resources']:
            self.run_entry['resources'].append(resource)
        return
    with open(filename, 'rb') as f:
        file_id = self.fs.put(f, filename=filename)
    md5hash = self.fs.get(file_id).md5
    self.run_entry['resources'].append((filename, md5hash))

def artifact_event(self, filename):
    """Upload ``filename`` to the file store as an artifact of this run.

    The file is stored under the name
    ``artifact://<experiment name>/<run id>/<basename>``; the id returned
    by the file store is appended to the run entry's ``'artifacts'``
    list and the run is saved.

    :param filename: path of the artifact file to upload.
    :raises KeyError: if the run entry has no ``'_id'`` yet.
    """
    with open(filename, 'rb') as f:
        tail = os.path.basename(filename)
        run_id = self.run_entry['_id']
        # The name is kept on the experiment entry; the visible code
        # never puts a 'name' key into the run entry.
        db_filename = 'artifact://{}/{}/{}'.format(
            self.experiment_entry['name'], run_id, tail)
        file_id = self.fs.put(f, filename=db_filename)
    self.run_entry['artifacts'].append(file_id)
    self.save()

def __eq__(self, other):
    """Two observers are equal when they use the same experiments collection.

    Anything that is not a ``MongoObserver`` compares unequal.
    """
    if isinstance(other, MongoObserver):
        return self.experiments == other.experiments
    return False

def __ne__(self, other):
Expand Down

0 comments on commit df3f082

Please sign in to comment.