Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transaction model aiopg #219

Merged
merged 2 commits into from
Dec 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions aiopg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
from .connection import connect, Connection, TIMEOUT as DEFAULT_TIMEOUT
from .cursor import Cursor
from .pool import create_pool, Pool

from .transaction import IsolationLevel, Transaction

__all__ = ('connect', 'create_pool', 'Connection', 'Cursor', 'Pool',
'version', 'version_info', 'DEFAULT_TIMEOUT')
'version', 'version_info', 'DEFAULT_TIMEOUT', 'IsolationLevel',
'Transaction')

__version__ = '0.13.1'

version = __version__ + ' , Python ' + sys.version


VersionInfo = namedtuple('VersionInfo',
'major minor micro releaselevel serial')

Expand All @@ -40,6 +40,6 @@ def _parse_version(ver):

version_info = _parse_version(__version__)


# make pyflakes happy
(connect, create_pool, Connection, Cursor, Pool, DEFAULT_TIMEOUT)
(connect, create_pool, Connection, Cursor, Pool, DEFAULT_TIMEOUT,
IsolationLevel, Transaction)
15 changes: 13 additions & 2 deletions aiopg/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import psycopg2

from .log import logger
from .utils import PY_35, PY_352
from .transaction import Transaction, IsolationLevel
from .utils import PY_35, PY_352, _TransactionBeginContextManager


class Cursor:

def __init__(self, conn, impl, timeout, echo):
self._conn = conn
self._impl = impl
self._timeout = timeout
self._echo = echo
self._transaction = Transaction(self, IsolationLevel.repeatable_read)

@property
def echo(self):
Expand Down Expand Up @@ -146,6 +147,16 @@ def callproc(self, procname, parameters=None, *, timeout=None):
else:
yield from self._conn._poll(waiter, timeout)

def begin(self):
return _TransactionBeginContextManager(self._transaction.begin())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not covered

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def begin_nested(self):
if not self._transaction.is_begin:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not called by tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return _TransactionBeginContextManager(
self._transaction.begin())
else:
return self._transaction.point()

@asyncio.coroutine
def mogrify(self, operation, parameters=None):
"""Return a query string after arguments binding.
Expand Down
191 changes: 191 additions & 0 deletions aiopg/transaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import asyncio
import enum
import uuid
import warnings
from abc import ABC, abstractmethod

import psycopg2
from aiopg.utils import PY_35, _TransactionPointContextManager

__all__ = ('IsolationLevel', 'Transaction')


class IsolationCompiler(ABC):
name = ''

__slots__ = ('_readonly', '_deferrable')

def __init__(self, readonly, deferrable):
self._readonly = readonly
self._deferrable = deferrable
self._check_readonly_deferrable()

def _check_readonly_deferrable(self):
available = self._readonly or self._deferrable
if not isinstance(self, SerializableCompiler) and available:
raise ValueError('Is only available for serializable transactions')

def savepoint(self, unique_id):
return 'SAVEPOINT {}'.format(unique_id)

def release_savepoint(self, unique_id):
return 'RELEASE SAVEPOINT {}'.format(unique_id)

def rollback_savepoint(self, unique_id):
return 'ROLLBACK TO SAVEPOINT {}'.format(unique_id)

def commit(self):
return 'COMMIT'

def rollback(self):
return 'ROLLBACK'

@abstractmethod
def begin(self):
raise NotImplementedError("Please Implement this method")

def __repr__(self):
return self.name


class ReadCommittedCompiler(IsolationCompiler):
name = 'Read committed'

def begin(self):
return 'BEGIN'


class RepeatableReadCompiler(IsolationCompiler):
name = 'Repeatable read'

def begin(self):
return 'BEGIN ISOLATION LEVEL REPEATABLE READ'


class SerializableCompiler(IsolationCompiler):
name = 'Serializable'

def begin(self):
query = 'BEGIN ISOLATION LEVEL SERIALIZABLE'

if self._readonly:
query += ' READ ONLY'

if self._deferrable:
query += ' DEFERRABLE'

return query


class IsolationLevel(enum.Enum):
serializable = SerializableCompiler
repeatable_read = RepeatableReadCompiler
read_committed = ReadCommittedCompiler

def __call__(self, readonly, deferrable):
return self.value(readonly, deferrable)


class Transaction:
__slots__ = ('_cur', '_is_begin', '_isolation', '_unique_id')

def __init__(self, cur, isolation_level,
readonly=False, deferrable=False):
self._cur = cur
self._is_begin = False
self._unique_id = None
self._isolation = isolation_level(readonly, deferrable)

@property
def is_begin(self):
return self._is_begin

@asyncio.coroutine
def begin(self):
if self._is_begin:
raise psycopg2.ProgrammingError(
'You are trying to open a new transaction, use the save point')
self._is_begin = True
yield from self._cur.execute(self._isolation.begin())
return self

@asyncio.coroutine
def commit(self):
self._check_commit_rollback()
yield from self._cur.execute(self._isolation.commit())
self._is_begin = False

@asyncio.coroutine
def rollback(self):
self._check_commit_rollback()
yield from self._cur.execute(self._isolation.rollback())
self._is_begin = False

@asyncio.coroutine
def rollback_savepoint(self):
self._check_release_rollback()
yield from self._cur.execute(
self._isolation.rollback_savepoint(self._unique_id))
self._unique_id = None

@asyncio.coroutine
def release_savepoint(self):
self._check_release_rollback()
yield from self._cur.execute(
self._isolation.release_savepoint(self._unique_id))
self._unique_id = None

@asyncio.coroutine
def savepoint(self):
self._check_commit_rollback()
if self._unique_id is not None:
raise psycopg2.ProgrammingError('You do not shut down savepoint')

self._unique_id = 's{}'.format(uuid.uuid1().hex)
yield from self._cur.execute(
self._isolation.savepoint(self._unique_id))

return self

def point(self):
return _TransactionPointContextManager(self.savepoint())

def _check_commit_rollback(self):
if not self._is_begin:
raise psycopg2.ProgrammingError('You are trying to commit '
'the transaction does not open')

def _check_release_rollback(self):
self._check_commit_rollback()
if self._unique_id is None:
raise psycopg2.ProgrammingError('You do not start savepoint')

def __repr__(self):
return "<{} transaction={} id={:#x}>".format(
self.__class__.__name__,
self._isolation,
id(self)
)

def __del__(self):
if self._is_begin:
warnings.warn(
"You have not closed transaction {!r}".format(self),
ResourceWarning)

if self._unique_id is not None:
warnings.warn(
"You have not closed savepoint {!r}".format(self),
ResourceWarning)

if PY_35:
@asyncio.coroutine
def __aenter__(self):
return (yield from self.begin())

@asyncio.coroutine
def __aexit__(self, exc_type, exc, tb):
if exc_type is not None:
yield from self.rollback()
else:
yield from self.commit()
35 changes: 28 additions & 7 deletions aiopg/utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import asyncio
import sys


PY_35 = sys.version_info >= (3, 5)
PY_352 = sys.version_info >= (3, 5, 2)

if PY_35:
from collections.abc import Coroutine

base = Coroutine
else:
base = object


try:
ensure_future = asyncio.ensure_future
except AttributeError:
Expand All @@ -26,7 +25,6 @@ def create_future(loop):


class _ContextManager(base):

__slots__ = ('_coro', '_obj')

def __init__(self, coro):
Expand Down Expand Up @@ -84,7 +82,6 @@ def __aexit__(self, exc_type, exc, tb):


class _SAConnectionContextManager(_ContextManager):

if PY_35: # pragma: no branch
if PY_352:
def __aiter__(self):
Expand All @@ -97,7 +94,6 @@ def __aiter__(self):


class _PoolContextManager(_ContextManager):

if PY_35:
@asyncio.coroutine
def __aexit__(self, exc_type, exc, tb):
Expand All @@ -106,8 +102,33 @@ def __aexit__(self, exc_type, exc, tb):
self._obj = None


class _TransactionContextManager(_ContextManager):
class _TransactionPointContextManager(_ContextManager):
if PY_35:

@asyncio.coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
yield from self._obj.rollback_savepoint()
else:
yield from self._obj.release_savepoint()

self._obj = None


class _TransactionBeginContextManager(_ContextManager):
if PY_35:

@asyncio.coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not covered by tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

yield from self._obj.rollback()
else:
yield from self._obj.commit()

self._obj = None


class _TransactionContextManager(_ContextManager):
if PY_35:

@asyncio.coroutine
Expand All @@ -121,7 +142,6 @@ def __aexit__(self, exc_type, exc, tb):


class _PoolAcquireContextManager(_ContextManager):

__slots__ = ('_coro', '_conn', '_pool')

def __init__(self, coro, pool):
Expand Down Expand Up @@ -228,6 +248,7 @@ def __exit__(self, *args):
if not PY_35:
try:
from asyncio import coroutines

coroutines._COROUTINE_TYPES += (_ContextManager,)
except:
pass