Skip to content

Commit

Permalink
#197 Add transaction model aiopg
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Firsov committed Nov 29, 2017
1 parent fea8aa5 commit d825cb6
Show file tree
Hide file tree
Showing 9 changed files with 632 additions and 16 deletions.
10 changes: 5 additions & 5 deletions aiopg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
from .connection import connect, Connection, TIMEOUT as DEFAULT_TIMEOUT
from .cursor import Cursor
from .pool import create_pool, Pool

from .transaction import IsolationLevel, Transaction

__all__ = ('connect', 'create_pool', 'Connection', 'Cursor', 'Pool',
'version', 'version_info', 'DEFAULT_TIMEOUT')
'version', 'version_info', 'DEFAULT_TIMEOUT', 'IsolationLevel',
'Transaction')

__version__ = '0.13.1'

version = __version__ + ' , Python ' + sys.version


VersionInfo = namedtuple('VersionInfo',
'major minor micro releaselevel serial')

Expand All @@ -40,6 +40,6 @@ def _parse_version(ver):

version_info = _parse_version(__version__)


# make pyflakes happy
(connect, create_pool, Connection, Cursor, Pool, DEFAULT_TIMEOUT)
(connect, create_pool, Connection, Cursor, Pool, DEFAULT_TIMEOUT,
IsolationLevel, Transaction)
15 changes: 13 additions & 2 deletions aiopg/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import psycopg2

from .log import logger
from .utils import PY_35, PY_352
from .transaction import Transaction, IsolationLevel
from .utils import PY_35, PY_352, _TransactionBeginContextManager


class Cursor:

def __init__(self, conn, impl, timeout, echo):
self._conn = conn
self._impl = impl
self._timeout = timeout
self._echo = echo
self._transaction = Transaction(self, IsolationLevel.repeatable_read)

@property
def echo(self):
Expand Down Expand Up @@ -146,6 +147,16 @@ def callproc(self, procname, parameters=None, *, timeout=None):
else:
yield from self._conn._poll(waiter, timeout)

def begin(self):
return _TransactionBeginContextManager(self._transaction.begin())

def begin_nested(self):
if not self._transaction.is_begin:
return _TransactionBeginContextManager(
self._transaction.begin())
else:
return self._transaction.point()

@asyncio.coroutine
def mogrify(self, operation, parameters=None):
"""Return a query string after arguments binding.
Expand Down
191 changes: 191 additions & 0 deletions aiopg/transaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import asyncio
import enum
import uuid
import warnings
from abc import ABC, abstractmethod

import psycopg2
from aiopg.utils import PY_35, _TransactionPointContextManager

__all__ = ('IsolationLevel', 'Transaction')


class IsolationCompiler(ABC):
name = ''

__slots__ = ('_readonly', '_deferrable')

def __init__(self, readonly, deferrable):
self._readonly = readonly
self._deferrable = deferrable
self._check_readonly_deferrable()

def _check_readonly_deferrable(self):
available = self._readonly or self._deferrable
if not isinstance(self, SerializableCompiler) and available:
raise ValueError('Is only available for serializable transactions')

def savepoint(self, unique_id):
return 'SAVEPOINT {}'.format(unique_id)

def release_savepoint(self, unique_id):
return 'RELEASE SAVEPOINT {}'.format(unique_id)

def rollback_savepoint(self, unique_id):
return 'ROLLBACK TO SAVEPOINT {}'.format(unique_id)

def commit(self):
return 'COMMIT'

def rollback(self):
return 'ROLLBACK'

@abstractmethod
def begin(self):
raise NotImplementedError("Please Implement this method")

def __repr__(self):
return self.name


class ReadCommittedCompiler(IsolationCompiler):
name = 'Read committed'

def begin(self):
return 'BEGIN'


class RepeatableReadCompiler(IsolationCompiler):
name = 'Repeatable read'

def begin(self):
return 'BEGIN ISOLATION LEVEL REPEATABLE READ'


class SerializableCompiler(IsolationCompiler):
name = 'Serializable'

def begin(self):
query = 'BEGIN ISOLATION LEVEL SERIALIZABLE'

if self._readonly:
query += ' READ ONLY'

if self._deferrable:
query += ' DEFERRABLE'

return query


class IsolationLevel(enum.Enum):
serializable = SerializableCompiler
repeatable_read = RepeatableReadCompiler
read_committed = ReadCommittedCompiler

def __call__(self, readonly, deferrable):
return self.value(readonly, deferrable)


class Transaction:
__slots__ = ('_cur', '_is_begin', '_isolation', '_unique_id')

def __init__(self, cur, isolation_level,
readonly=False, deferrable=False):
self._cur = cur
self._is_begin = False
self._unique_id = None
self._isolation = isolation_level(readonly, deferrable)

@property
def is_begin(self):
return self._is_begin

@asyncio.coroutine
def begin(self):
if self._is_begin:
raise psycopg2.ProgrammingError(
'You are trying to open a new transaction, use the save point')
self._is_begin = True
yield from self._cur.execute(self._isolation.begin())
return self

@asyncio.coroutine
def commit(self):
self._check_commit_rollback()
yield from self._cur.execute(self._isolation.commit())
self._is_begin = False

@asyncio.coroutine
def rollback(self):
self._check_commit_rollback()
yield from self._cur.execute(self._isolation.rollback())
self._is_begin = False

@asyncio.coroutine
def rollback_savepoint(self):
self._check_release_rollback()
yield from self._cur.execute(
self._isolation.rollback_savepoint(self._unique_id))
self._unique_id = None

@asyncio.coroutine
def release_savepoint(self):
self._check_release_rollback()
yield from self._cur.execute(
self._isolation.release_savepoint(self._unique_id))
self._unique_id = None

@asyncio.coroutine
def savepoint(self):
self._check_commit_rollback()
if self._unique_id is not None:
raise psycopg2.ProgrammingError('You do not shut down savepoint')

self._unique_id = 's{}'.format(uuid.uuid1().hex)
yield from self._cur.execute(
self._isolation.savepoint(self._unique_id))

return self

def point(self):
return _TransactionPointContextManager(self.savepoint())

def _check_commit_rollback(self):
if not self._is_begin:
raise psycopg2.ProgrammingError('You are trying to commit '
'the transaction does not open')

def _check_release_rollback(self):
self._check_commit_rollback()
if self._unique_id is None:
raise psycopg2.ProgrammingError('You do not start savepoint')

def __repr__(self):
return "<{} transaction={} id={:#x}>".format(
self.__class__.__name__,
self._isolation,
id(self)
)

def __del__(self):
if self._is_begin:
warnings.warn(
"You have not closed transaction {!r}".format(self),
ResourceWarning)

if self._unique_id is not None:
warnings.warn(
"You have not closed savepoint {!r}".format(self),
ResourceWarning)

if PY_35:
@asyncio.coroutine
def __aenter__(self):
return (yield from self.begin())

@asyncio.coroutine
def __aexit__(self, exc_type, exc, tb):
if exc_type is not None:
yield from self.rollback()
else:
yield from self.commit()
35 changes: 28 additions & 7 deletions aiopg/utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import asyncio
import sys


PY_35 = sys.version_info >= (3, 5)
PY_352 = sys.version_info >= (3, 5, 2)

if PY_35:
from collections.abc import Coroutine

base = Coroutine
else:
base = object


try:
ensure_future = asyncio.ensure_future
except AttributeError:
Expand All @@ -26,7 +25,6 @@ def create_future(loop):


class _ContextManager(base):

__slots__ = ('_coro', '_obj')

def __init__(self, coro):
Expand Down Expand Up @@ -84,7 +82,6 @@ def __aexit__(self, exc_type, exc, tb):


class _SAConnectionContextManager(_ContextManager):

if PY_35: # pragma: no branch
if PY_352:
def __aiter__(self):
Expand All @@ -97,7 +94,6 @@ def __aiter__(self):


class _PoolContextManager(_ContextManager):

if PY_35:
@asyncio.coroutine
def __aexit__(self, exc_type, exc, tb):
Expand All @@ -106,8 +102,33 @@ def __aexit__(self, exc_type, exc, tb):
self._obj = None


class _TransactionContextManager(_ContextManager):
class _TransactionPointContextManager(_ContextManager):
if PY_35:

@asyncio.coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
yield from self._obj.rollback_savepoint()
else:
yield from self._obj.release_savepoint()

self._obj = None


class _TransactionBeginContextManager(_ContextManager):
if PY_35:

@asyncio.coroutine
def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
yield from self._obj.rollback()
else:
yield from self._obj.commit()

self._obj = None


class _TransactionContextManager(_ContextManager):
if PY_35:

@asyncio.coroutine
Expand All @@ -121,7 +142,6 @@ def __aexit__(self, exc_type, exc, tb):


class _PoolAcquireContextManager(_ContextManager):

__slots__ = ('_coro', '_conn', '_pool')

def __init__(self, coro, pool):
Expand Down Expand Up @@ -228,6 +248,7 @@ def __exit__(self, *args):
if not PY_35:
try:
from asyncio import coroutines

coroutines._COROUTINE_TYPES += (_ContextManager,)
except:
pass
34 changes: 34 additions & 0 deletions examples/transaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import asyncio

import aiopg
from aiopg.transaction import Transaction, IsolationLevel

dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'


async def transaction(cur, isolation_level,
readonly=False, deferrable=False):
async with Transaction(cur, isolation_level,
readonly, deferrable) as transaction:
await cur.execute('insert into tbl values (1)')

async with transaction.point():
await cur.execute('insert into tbl values (3)')

await cur.execute('insert into tbl values (4)')


async def main():
async with aiopg.create_pool(dsn) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute('CREATE TABLE tbl (id int)')
await transaction(cur, IsolationLevel.repeatable_read)
await transaction(cur, IsolationLevel.read_committed)
await transaction(cur, IsolationLevel.serializable)

await cur.execute('select * from tbl')


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

0 comments on commit d825cb6

Please sign in to comment.