Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactions via an internal global session var #2569

Open
wants to merge 48 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
7001f1a
Sketch of transaction context manager
juannyG Aug 30, 2021
04e2f1a
Scaffold session management for transactions
juannyG Aug 31, 2021
5ccd144
Using a global session var within mongoengine for transactions
juannyG Sep 1, 2021
a77c88c
Add unit tests for transaction context manager
juannyG Sep 1, 2021
bc8f409
Add test cases for cross database transactions
juannyG Sep 5, 2021
09f09da
Initialize mongo replica set in github workflow
juannyG Sep 30, 2021
21ef092
Add checks for versions of Mongo
juannyG Oct 1, 2021
447648a
Explicitly check if mongo major version is >=4
juannyG Oct 1, 2021
9bc8e7b
Update tests to account for pymongo support
juannyG Oct 12, 2021
237d007
Update tests to use recommended requirement decorators
juannyG Nov 9, 2021
77f76db
Merge branch 'master' into transactions-global-session-ver
juannyG Nov 9, 2021
ccd8a0f
Revert item frequenices map reduce formatting structure to original
juannyG Nov 9, 2021
7fbd52c
Add test cases to depict nested transaction behavior
juannyG Nov 9, 2021
471a700
Provide thread safety for sessions
juannyG Nov 10, 2021
9765278
Add tests showing behavior of session management in nested txs
juannyG Nov 10, 2021
9861da6
Merge branch 'MongoEngine:master' into transactions-global-session-ver
juannyG Nov 30, 2021
8009c6a
Upon disconnect, clear any and all outstanding sessions
juannyG Nov 30, 2021
196c1af
Merge branch 'master' into transactions-global-session-ver
juannyG Jan 27, 2022
d7a6b02
Reset map reduce function definition
juannyG Jan 27, 2022
19ef553
Merge branch 'MongoEngine:master' into transactions-global-session-ver
juannyG Mar 25, 2022
16ba0b9
Add minimal placeholders for pymongo session and transaction kwargs t…
juannyG Mar 29, 2022
7032387
Add myself to AUTHORS
juannyG Mar 29, 2022
533220f
Merge branch 'MongoEngine:master' into transactions-global-session-ver
juannyG Jun 17, 2022
1707630
Update context_managers.py based on black
juannyG Jun 21, 2022
7d3db87
Experiment to deal with max lock request timeout
juannyG Jun 21, 2022
df87f82
Add missing pymongo import
juannyG Jun 21, 2022
aa6036a
Setup retry mechanism to account for lock request timeout exception
juannyG Jun 22, 2022
db279df
Strengthen the stability of thread test
juannyG Jun 23, 2022
3fd1867
Explicitly handle retry of TransientTransactionError, per docs
juannyG Jun 23, 2022
e02d294
Fix bug: errorLabels is a key, not a property
juannyG Jun 23, 2022
bb2efb0
Handle TransientTransactionError in nested tx test case
juannyG Jun 23, 2022
44c9c30
Account for pymongo>=4 host name management in replSet connections
juannyG Jun 23, 2022
1124532
Merge branch 'master' into transactions-global-session-ver
juannyG Jun 23, 2022
4a76e95
Merge branch 'master' into transactions-global-session-ver
juannyG Feb 24, 2023
6df06ab
Merge branch 'master' of github.com:juannyG/mongoengine
juannyG Feb 24, 2023
4ab761f
Merge branch 'master' into transactions-global-session-ver
juannyG Feb 24, 2023
de1496d
Merge branch 'master' of github.com:juannyG/mongoengine
juannyG Aug 26, 2023
60c0224
Merge branch 'master' into transactions-global-session-ver
juannyG Aug 26, 2023
3c0ae4f
Address PR feedback
juannyG Aug 26, 2023
4a7d24a
Add doc string for run_in_transaction context manager
juannyG Aug 26, 2023
a1233fd
Merge branch 'MongoEngine:master' into master
juannyG Nov 4, 2023
882f1e3
Abstract `with_transaction` as opposed to `start_transaction`
juannyG Nov 4, 2023
68e2d94
Wrap transaction test wtih gte_40 decorator
juannyG Nov 4, 2023
58cfa14
Merge branch 'master' into transactions-global-session-ver
juannyG Nov 4, 2023
de951dc
Remove tests related to nested transactions
juannyG Nov 4, 2023
13cc855
Move `run_in_transaction` and related tests
juannyG Nov 4, 2023
0729223
Simplify test_multiple_connection_settings assertion
juannyG Nov 8, 2023
9ca4cdd
Clean up run_in_transaction doc string
juannyG Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/start_mongo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ MONGODB=$1
mongodb_dir=$(find ${PWD}/ -type d -name "mongodb-linux-x86_64*")

mkdir $mongodb_dir/data
$mongodb_dir/bin/mongod --dbpath $mongodb_dir/data --logpath $mongodb_dir/mongodb.log --fork
$mongodb_dir/bin/mongod --dbpath $mongodb_dir/data --logpath $mongodb_dir/mongodb.log --fork --replSet mongoengine
mongo --eval 'db.version();' # Make sure mongo is awake
mongo --eval "rs.initiate()"
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,4 @@ that much better:
* oleksandr-l5 (https://github.com/oleksandr-l5)
* Ido Shraga (https://github.com/idoshr)
* Terence Honles (https://github.com/terencehonles)
* Juan Gutierrez (https://github.com/juannyg)
90 changes: 90 additions & 0 deletions mongoengine/connection.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import collections
import threading
import warnings

from pymongo import MongoClient, ReadPreference, uri_parser
Expand Down Expand Up @@ -26,6 +28,7 @@
"get_connection",
"get_db",
"register_connection",
"run_in_transaction",
]


Expand All @@ -38,6 +41,7 @@
_connections = {}
_dbs = {}


READ_PREFERENCE = ReadPreference.PRIMARY


Expand Down Expand Up @@ -463,3 +467,89 @@ def connect(db=None, alias=DEFAULT_CONNECTION_NAME, **kwargs):
# Support old naming convention
_get_connection = get_connection
_get_db = get_db


class _LocalSessions(threading.local):
def __init__(self):
self.sessions = collections.deque()

def append(self, session):
self.sessions.append(session)

def get_current(self):
if len(self.sessions):
return self.sessions[len(self.sessions) - 1]

def clear_current(self):
if len(self.sessions):
self.sessions.pop()

def clear_all(self):
self.sessions.clear()


_local_sessions = _LocalSessions()


def _set_session(session):
_local_sessions.append(session)


def _get_session():
return _local_sessions.get_current()


def _clear_session():
return _local_sessions.clear_current()


def run_in_transaction(
callback,
alias=DEFAULT_CONNECTION_NAME,
session_kwargs=None,
transaction_kwargs=None,
):
"""Execute queries within a MongoDB transaction.

Usage:

.. code-block:: python

class A(Document):
name = StringField()

def callback(session):
a_doc = A.objects.create(name="a")
a_doc.update(name="b")
run_in_transaction(callback)

# With custom args/kwargs
def callback(session, custom_arg, customer_kwarg=None):
a_doc.update(name=f'{custom_arg}-{custom_kwarg}')
run_in_transaction(
lambda s: callback(s, 'arg', custom_kwarg='kwarg')
)

Be aware that:
- Mongo transactions run inside a session which is bound to a connection. If you attempt to
execute a transaction across a different connection alias, pymongo will raise an exception. In
other words: you cannot create a transaction that crosses different database connections.

For more information regarding pymongo transactions: https://pymongo.readthedocs.io/en/stable/api/pymongo/client_session.html#transactions
"""

if PYMONGO_VERSION < (3, 9):
raise mongoengine.errors.OperationError(
"pymongo>=3.9 is required to use transactions"
)

conn = get_connection(alias)
session_kwargs = session_kwargs or {}
with conn.start_session(**session_kwargs) as session:
transaction_kwargs = transaction_kwargs or {}
transaction_kwargs["callback"] = callback
_set_session(session)
try:
session.with_transaction(**transaction_kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PyMongo's with_transaction returns the callback's value so I believe this should be return session.with_transaction(...). Otherwise it becomes unwieldy to return a value.

finally:
_clear_session()
10 changes: 7 additions & 3 deletions mongoengine/context_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from pymongo.write_concern import WriteConcern

from mongoengine.common import _import_class
from mongoengine.connection import DEFAULT_CONNECTION_NAME, get_db
from mongoengine.connection import (
DEFAULT_CONNECTION_NAME,
_get_session,
get_db,
)
from mongoengine.pymongo_support import count_documents

__all__ = (
Expand Down Expand Up @@ -210,11 +214,11 @@ def __init__(self, alias=DEFAULT_CONNECTION_NAME):
}

def _turn_on_profiling(self):
profile_update_res = self.db.command({"profile": 0})
profile_update_res = self.db.command({"profile": 0}, session=_get_session())
self.initial_profiling_level = profile_update_res["was"]

self.db.system.profile.drop()
self.db.command({"profile": 2})
self.db.command({"profile": 2}, session=_get_session())

def _resets_profiling(self):
self.db.command({"profile": self.initial_profiling_level})
Expand Down
8 changes: 5 additions & 3 deletions mongoengine/dereference.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
get_document,
)
from mongoengine.base.datastructures import LazyReference
from mongoengine.connection import get_db
from mongoengine.connection import _get_session, get_db
from mongoengine.document import Document, EmbeddedDocument
from mongoengine.fields import (
DictField,
Expand Down Expand Up @@ -187,13 +187,15 @@ def _fetch_objects(self, doc_type=None):

if doc_type:
references = doc_type._get_db()[collection].find(
{"_id": {"$in": refs}}
{"_id": {"$in": refs}}, session=_get_session()
)
for ref in references:
doc = doc_type._from_son(ref)
object_map[(collection, doc.id)] = doc
else:
references = get_db()[collection].find({"_id": {"$in": refs}})
references = get_db()[collection].find(
{"_id": {"$in": refs}}, session=_get_session()
)
for ref in references:
if "_cls" in ref:
doc = get_document(ref["_cls"])._from_son(ref)
Expand Down
36 changes: 25 additions & 11 deletions mongoengine/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
get_document,
)
from mongoengine.common import _import_class
from mongoengine.connection import DEFAULT_CONNECTION_NAME, get_db
from mongoengine.connection import (
DEFAULT_CONNECTION_NAME,
_get_session,
get_db,
)
from mongoengine.context_managers import (
set_write_concern,
switch_collection,
Expand Down Expand Up @@ -269,7 +273,7 @@ def _get_capped_collection(cls):
if max_documents:
opts["max"] = max_documents

return db.create_collection(collection_name, **opts)
return db.create_collection(collection_name, session=_get_session(), **opts)

def to_mongo(self, *args, **kwargs):
data = super().to_mongo(*args, **kwargs)
Expand Down Expand Up @@ -479,17 +483,21 @@ def _save_create(self, doc, force_insert, write_concern):
collection = self._get_collection()
with set_write_concern(collection, write_concern) as wc_collection:
if force_insert:
return wc_collection.insert_one(doc).inserted_id
return wc_collection.insert_one(doc, session=_get_session()).inserted_id
# insert_one will provoke UniqueError alongside save does not
# therefore, it need to catch and call replace_one.
if "_id" in doc:
select_dict = {"_id": doc["_id"]}
select_dict = self._integrate_shard_key(doc, select_dict)
raw_object = wc_collection.find_one_and_replace(select_dict, doc)
raw_object = wc_collection.find_one_and_replace(
select_dict, doc, session=_get_session()
)
if raw_object:
return doc["_id"]

object_id = wc_collection.insert_one(doc).inserted_id
object_id = wc_collection.insert_one(
doc, session=_get_session()
).inserted_id

return object_id

Expand Down Expand Up @@ -547,7 +555,7 @@ def _save_update(self, doc, save_condition, write_concern):
upsert = save_condition is None
with set_write_concern(collection, write_concern) as wc_collection:
last_error = wc_collection.update_one(
select_dict, update_doc, upsert=upsert
select_dict, update_doc, upsert=upsert, session=_get_session()
).raw_result
if not upsert and last_error["n"] == 0:
raise SaveConditionError(
Expand Down Expand Up @@ -850,7 +858,7 @@ def drop_collection(cls):
)
cls._collection = None
db = cls._get_db()
db.drop_collection(coll_name)
db.drop_collection(coll_name, session=_get_session())

@classmethod
def create_index(cls, keys, background=False, **kwargs):
Expand All @@ -867,7 +875,9 @@ def create_index(cls, keys, background=False, **kwargs):
index_spec["background"] = background
index_spec.update(kwargs)

return cls._get_collection().create_index(fields, **index_spec)
return cls._get_collection().create_index(
fields, session=_get_session(), **index_spec
)

@classmethod
def ensure_indexes(cls):
Expand Down Expand Up @@ -913,7 +923,9 @@ def ensure_indexes(cls):
if "cls" in opts:
del opts["cls"]

collection.create_index(fields, background=background, **opts)
collection.create_index(
fields, background=background, session=_get_session(), **opts
)

# If _cls is being used (for polymorphism), it needs an index,
# only if another index doesn't begin with _cls
Expand All @@ -923,7 +935,9 @@ def ensure_indexes(cls):
if "cls" in index_opts:
del index_opts["cls"]

collection.create_index("_cls", background=background, **index_opts)
collection.create_index(
"_cls", background=background, session=_get_session(), **index_opts
)

@classmethod
def list_indexes(cls):
Expand Down Expand Up @@ -1001,7 +1015,7 @@ def compare_indexes(cls):

existing = []
collection = cls._get_collection()
for info in collection.index_information().values():
for info in collection.index_information(session=_get_session()).values():
if "_fts" in info["key"][0]:
# Useful for text indexes (but not only)
index_type = info["key"][0][1]
Expand Down