diff --git a/diesel/protocols/mongodb.py b/diesel/protocols/mongodb.py index 12d6dd1..4be7e86 100644 --- a/diesel/protocols/mongodb.py +++ b/diesel/protocols/mongodb.py @@ -10,6 +10,8 @@ _ZERO = "\x00\x00\x00\x00" HEADER_SIZE = 16 +class MongoOperationalError(Exception): pass + def _full_name(parent, child): return "%s.%s" % (parent, child) @@ -33,14 +35,14 @@ class Collection(TraversesCollections): def find(self, spec=None, fields=None, skip=0, limit=0): yield up(MongoCursor(self.name, self.client, spec, fields, skip, limit)) - def update(self, spec, doc, upsert=False, multi=False): - yield self.client.update(self.name, spec, doc, upsert, multi) + def update(self, spec, doc, upsert=False, multi=False, safe=True): + yield self.client.update(self.name, spec, doc, upsert, multi, safe) - def insert(self, doc_or_docs): - yield self.client.insert(self.name, doc_or_docs) + def insert(self, doc_or_docs, safe=True): + yield self.client.insert(self.name, doc_or_docs, safe) - def delete(self, spec): - yield self.client.delete(self.name, spec) + def delete(self, spec, safe=True): + yield self.client.delete(self.name, spec, safe) class MongoClient(Client): collection_class = None @@ -99,22 +101,36 @@ def get_more(self, cursor): else: yield response([]) + def _put_gle_command(self): + msg = Ops.query('admin.$cmd', {'getlasterror' : 1}, 0, 0, -1) + res = yield self._put_request_get_response(Ops.OP_QUERY, msg) + _, _, _, r = res + doc = r[0] + if doc.get('err'): + raise MongoOperationalError(doc['error']) + @call - def update(self, col, spec, doc, upsert=False, multi=False): + def update(self, col, spec, doc, upsert=False, multi=False, safe=True): data = Ops.update(col, spec, doc, upsert, multi) yield self._put_request(Ops.OP_UPDATE, data) + if safe: + yield self._put_gle_command() yield response(None) @call - def insert(self, col, doc_or_docs): + def insert(self, col, doc_or_docs, safe=True): data = Ops.insert(col, doc_or_docs) yield self._put_request(Ops.OP_INSERT, data) + if safe: + yield self._put_gle_command() yield response(None) @call - def delete(self, col, spec): + def delete(self, col, spec, safe=True): data = Ops.delete(col, spec) yield self._put_request(Ops.OP_DELETE, data) + if safe: + yield self._put_gle_command() yield response(None) @call