Skip to content

Commit

Permalink
Switch PJDataManager to use connection pool instead of connection dir…
Browse files Browse the repository at this point in the history
…ectly

This allows for more flexible connection management. The pool can control
when to reuse or when to close the database connection.
  • Loading branch information
kedder committed Feb 22, 2021
1 parent 40b144f commit 8c0d17d
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 80 deletions.
11 changes: 10 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@ CHANGES
=======


2.0.2 (unreleased)
3.0.0 (unreleased)
------------------

- Backwards incompatible change: PJDataManager now accepts a connection pool
instead of a connection object. PJDataManager gets a connection from the pool
when joining the transaction and returns it when the transaction
completes (aborts or commits). This allows for more flexible connection
management. The connection pool must implement the IPJConnectionPool interface
(it is compatible with psycopg2.pool).

- `IPJDataManager.begin()` is renamed to `setTransactionOptions()`

- Errors executing SQL statements now doom the entire transaction,
causing `transaction.interfaces.DoomedTransaction` exception on
any attempts to commit it. A failed transaction must be aborted.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def alltests():

setup(
name='pjpersist',
version='2.0.2.dev0',
version='2.0.2.dev5',
author="Shoobx Team",
author_email="dev@shoobx.com",
url='https://github.com/Shoobx/pjpersist',
Expand Down
117 changes: 79 additions & 38 deletions src/pjpersist/datamanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@
import hashlib
import logging
import os
import psycopg2
import psycopg2.extensions
import psycopg2.extras
import psycopg2.errorcodes
import pjpersist.sqlbuilder as sb
import random
import re
import socket
import struct
import threading
import time
import traceback
import transaction
import uuid
import zope.interface
from collections.abc import MutableMapping
from typing import Optional

import psycopg2
import psycopg2.errorcodes
import psycopg2.extensions
import psycopg2.extras
import transaction

import pjpersist.sqlbuilder as sb
import zope.interface
from pjpersist import interfaces, serialize
from pjpersist.querystats import QueryReport

Expand Down Expand Up @@ -427,11 +429,16 @@ class PJDataManager(object):
_pristine = True

# Flag showing whether there is any write in the current transaction
_dirty = False
_dirty: bool

def __init__(self, conn, root_table=None):
self._conn = conn
self.database = get_database_name_from_dsn(conn.dsn)
_isolation_level: Optional[str]
_readonly: Optional[bool]
_deferrable: Optional[bool]

def __init__(self, pool, root_table=None):
self._conn = None
self.database = None
self._pool = pool
self._reader = serialize.ObjectReader(self)
self._writer = serialize.ObjectWriter(self)
# All of the following object lists are keys by object id. This is
Expand All @@ -442,6 +449,12 @@ def __init__(self, conn, root_table=None):
self._loaded_objects = {}
self._inserted_objects = {}
self._removed_objects = {}
self._dirty = False

self._readonly = None
self._deferrable = None
self._isolation_level = None

# The latest states written to the database.
self._latest_states = {}
self._needs_to_join = True
Expand Down Expand Up @@ -662,6 +675,8 @@ def _get_doc_object(self, obj):

def _join_txn(self):
if self._needs_to_join:
self._acquire_conn()
# once we have a working connection, we can join the transaction
transaction = self.transaction_manager.get()
transaction.join(self)
self._needs_to_join = False
Expand All @@ -686,12 +701,14 @@ def dump(self, obj):
return res

def load(self, dbref, klass=None):
self._join_txn()
dm = self
if dbref.database != self.database:
# This is a reference to an object from a different database! We need
# to locate the suitable data manager for this.
dmp = zope.component.getUtility(interfaces.IPJDataManagerProvider)
dm = dmp.get(dbref.database)
dm._join_txn()
assert dm.database == dbref.database, (dm.database, dbref.database)
return dm.load(dbref, klass)

Expand All @@ -702,7 +719,13 @@ def reset(self):
# DB updates, not just reset PJDataManager state
self.abort(None)

def _release(self, conn):
def _acquire_conn(self):
    """Get connection from connection pool

    The connection returned by ``self._pool.getconn()`` is stored in
    ``self._conn`` and ``self.database`` is derived from the connection's
    DSN.
    """
    self._conn = self._pool.getconn()
    # The pool is expected to hand out an idle connection (no transaction
    # in progress). NOTE(review): this is only an ``assert``, so the check
    # is skipped when Python runs with -O.
    assert self._conn.status == psycopg2.extensions.STATUS_READY
    self.database = get_database_name_from_dsn(self._conn.dsn)

def _release_conn(self, conn):
"""Release the connection after transaction is complete
"""
if not conn.closed:
Expand All @@ -712,7 +735,8 @@ def _release(self, conn):
isolation_level="DEFAULT",
readonly="DEFAULT",
deferrable="DEFAULT")
self.__init__(conn)
self._pool.putconn(conn)
self.__init__(self._pool)

def insert(self, obj, oid=None):
self._join_txn()
Expand Down Expand Up @@ -780,6 +804,9 @@ def oldstate(self, obj, tid):
def register(self, obj):
self._join_txn()

if self._readonly:
raise interfaces.ReadOnlyDataManagerError()

# Do not bring back removed objects. But only the main document
# objects can be removed, so check for that.
if id(self._get_doc_object(obj)) in self._removed_objects:
Expand All @@ -794,6 +821,9 @@ def register(self, obj):
obj_registered(self)

def abort(self, transaction):
if self._conn is None:
# Connection was never acquired - nothing to abort
return
self._report_stats()
try:
if self._tpc_activated:
Expand All @@ -804,7 +834,7 @@ def abort(self, transaction):
# this happens usually when PG is restarted and the connection dies
# our only chance to exit the spiral is to abort the transaction
pass
self._release(self._conn)
self._release_conn(self._conn)
self._dirty = False

def _might_execute_with_error(self, op):
Expand All @@ -816,26 +846,23 @@ def _might_execute_with_error(self, op):
check_for_disconnect(e, "DataManager.commit")
raise

def begin(self, readonly=None, deferrable=None, isolation_level=None):
"""Join transaction and begin it with given options
This only works for new database transactions and fail if a
transaction was already started on current connection.
"""

assert self._conn.status == psycopg2.extensions.STATUS_READY

try:
self._conn.set_session(isolation_level=isolation_level,
deferrable=deferrable,
readonly=readonly)
except psycopg2.Error as e:
check_for_disconnect(e, 'PJDataManager.begin')
raise

self._join_txn()
def setTransactionOptions(
        self,
        readonly: Optional[bool] = None,
        deferrable: Optional[bool] = None,
        isolation_level: Optional[int] = None):
    """Store session options for the data manager's transaction.

    Each argument that is not None overwrites the corresponding stored
    value (``_readonly``, ``_deferrable``, ``_isolation_level``); passing
    None leaves the previously stored value untouched. Nothing is sent to
    the database here: ``_begin()`` passes the stored values to
    ``set_session()`` on the connection.
    """
    # NOTE(review): the class-level annotation declares ``_isolation_level``
    # as Optional[str] while this parameter is annotated as int - confirm
    # which type callers are expected to pass.
    if isolation_level is not None:
        self._isolation_level = isolation_level
    if readonly is not None:
        self._readonly = readonly
    if deferrable is not None:
        self._deferrable = deferrable

def _begin(self, transaction):
self._conn.set_session(isolation_level=self._isolation_level,
deferrable=self._deferrable,
readonly=self._readonly)

# This function is called when PJDataManager joins transaction. When
# two phase commit is requested, we will assign transaction id to
# underlying connection.
Expand Down Expand Up @@ -880,9 +907,11 @@ def commit(self, transaction):
self._log_rw_stats()

if not self._tpc_activated:
self._might_execute_with_error(self._conn.commit)
self._release(self._conn)
self._dirty = False
try:
self._might_execute_with_error(self._conn.commit)
self._dirty = False
finally:
self._release_conn(self._conn)

def tpc_begin(self, transaction):
pass
Expand All @@ -907,9 +936,11 @@ def tpc_vote(self, transaction):
def tpc_finish(self, transaction):
if self._tpc_activated:
self._report_stats()
self._might_execute_with_error(self._conn.tpc_commit)
self._release(self._conn)
self._dirty = False
try:
self._might_execute_with_error(self._conn.tpc_commit)
self._dirty = False
finally:
self._release_conn(self._conn)

def tpc_abort(self, transaction):
self.abort(transaction)
Expand All @@ -935,8 +966,18 @@ def isDirty(self):
return self._dirty or bool(self._registered_objects)

def setDirty(self):
    """Set the flag recording a write in the current transaction.

    Raises ``interfaces.ReadOnlyDataManagerError`` when the stored
    read-only option (``self._readonly``) is truthy.
    """
    if self._readonly:
        raise interfaces.ReadOnlyDataManagerError()
    self._dirty = True

def __del__(self):
    """Hand a still-held connection to ``_release_conn`` on destruction."""
    if self._conn is None:
        # No connection is currently held - nothing to give back.
        return

    LOG.warning("Releasing connection after destroying PJDataManager. "
                "Active transaction will be aborted.")
    self._release_conn(self._conn)


def get_database_name_from_dsn(dsn):
import re
Expand Down
27 changes: 27 additions & 0 deletions src/pjpersist/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class DatabaseDisconnected(transaction.interfaces.TransientError):
pass


class ReadOnlyDataManagerError(transaction.interfaces.TransactionError):
    """Attempt to modify objects governed by a read-only data manager.

    PJDataManager raises this from ``register()`` and ``setDirty()`` when
    its read-only transaction option is set.
    """


class IObjectSerializer(zope.interface.Interface):
"""An object serializer allows for custom serialization output for
objects."""
Expand Down Expand Up @@ -114,6 +118,20 @@ def get_ghost(coll_name, oid):
"""


class IPJConnectionPool(zope.interface.Interface):
    """Connection pool that a PJDataManager takes its connection from.

    PJDataManager calls ``getconn()`` to obtain a connection and
    ``putconn()`` to give it back.
    """

    def getconn():
        """Get connection from the pool"""

    def putconn(conn):
        """Put connection back to the pool"""

    def closeall():
        """Close all connections in the pool"""

    def reopen():
        """Reopen the pool after it was closed"""


class IPJDataManager(persistent.interfaces.IPersistentDataManager):
"""A persistent data manager that stores data in PostGreSQL/JSONB."""

Expand Down Expand Up @@ -169,6 +187,15 @@ def isDirty():
def setDirty():
"""Set dirty flag"""

def setTransactionOptions(
        readonly: bool = None,
        deferrable: bool = None,
        isolation_level: int = None) -> None:
    """Set the options for the future transaction.

    Options can only be set before the postgres transaction has started.

    All three options default to None; PJDataManager's implementation
    leaves an option unchanged when None is passed for it.
    """


class IPJDataManagerProvider(zope.interface.Interface):
"""Utility to get a PJ data manager.
Expand Down
32 changes: 28 additions & 4 deletions src/pjpersist/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,30 @@
DBNAME_OTHER = 'pjpersist_test_other'


class DummyConnectionPool:
    """Pool for tests that hands out one pre-made connection at a time."""

    def __init__(self, conn):
        # The single connection is held in exactly one of these two slots:
        # ``_available`` while it is in the pool, ``_taken`` while handed out.
        self._available = conn
        self._taken = None

    def getconn(self):
        """Reset the connection, set it to SERIALIZABLE and hand it out.

        Raises PoolError when the connection is already handed out.
        """
        if self._available is None:
            # NOTE(review): PoolError is not defined in the visible part of
            # this module - presumably psycopg2.pool.PoolError; confirm it
            # is imported.
            raise PoolError("Connection is already taken")
        self._available.reset()
        self._available.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
        self._taken = self._available
        self._available = None
        return self._taken

    def putconn(self, conn, key=None, close=False):
        """Take the handed-out connection back into the pool.

        ``key`` and ``close`` are accepted but not used.
        """
        # Only the connection that this pool handed out may be returned.
        assert conn is self._taken
        self._available = self._taken
        self._taken = None

    def isTaken(self):
        """Return True while the connection is handed out."""
        return self._taken is not None


@zope.interface.implementer(interfaces.IPJDataManagerProvider)
class SimpleDataManagerProvider(object):
def __init__(self, dms, default=None):
Expand Down Expand Up @@ -134,14 +158,14 @@ def setUp(test):
cleanDB(g['conn'])
cleanDB(g['conn_other'])
g['commit'] = transaction.commit
g['dm'] = datamanager.PJDataManager(g['conn'])
g['dm_other'] = datamanager.PJDataManager(g['conn_other'])
g['dm'] = datamanager.PJDataManager(DummyConnectionPool(g['conn']))
g['dm_other'] = datamanager.PJDataManager(DummyConnectionPool(g['conn_other']))

def dumpTable(table, flush=True, isolate=False):
if isolate:
conn = getConnection(database=DBNAME)
else:
conn = g['dm']._conn
conn = g['conn']
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
try:
cur.execute('SELECT * FROM ' + table)
Expand Down Expand Up @@ -212,7 +236,7 @@ def setUp(self):
setUpSerializers(self)
self.conn = getConnection(DBNAME)
cleanDB(self.conn)
self.dm = datamanager.PJDataManager(self.conn)
self.dm = datamanager.PJDataManager(DummyConnectionPool(self.conn))

def tearDown(self):
datamanager.CONFLICT_TRACEBACK_INFO.traceback = None
Expand Down
Loading

0 comments on commit 8c0d17d

Please sign in to comment.