
Commit

more tuned sa event sourcing & complete tests
lxyu committed Nov 25, 2014
1 parent 1e5c4d7 commit 9be3df1
Showing 6 changed files with 242 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -10,6 +10,7 @@ tests/conf.json

# intellij files
.idea
out
*.xml
*.iml

61 changes: 40 additions & 21 deletions meepo/pub.py
@@ -144,24 +144,36 @@ def _pub(obj, action):
sg.send(pk)
sg_raw.send(obj)

def _init_session(session):
def session_init(session, *args, **kwargs):
if hasattr(session, "meepo_unique_id"):
logger.debug("skipped - session_init")
return

for action in ("write", "update", "delete"):
attr = "pending_{}".format(action)
if not hasattr(session, attr):
setattr(session, attr, set())
session.meepo_unique_id = uuid.uuid4().hex
logger.debug("%s - session_init" % session.meepo_unique_id)

def session_del(session):
del session.meepo_unique_id
del session.pending_write
del session.pending_update
del session.pending_delete

def after_flush_hook(session, flush_ctx):
def before_flush_hook(session, flush_ctx, nonsense):
"""Record session changes on flush
"""
_init_session(session)
session_init(session)
logger.debug("%s - before_flush" % session.meepo_unique_id)
session.pending_write |= set(session.new)
session.pending_update |= set(session.dirty)
session.pending_delete |= set(session.deleted)
event.listen(dbsession, "after_flush", after_flush_hook)
event.listen(dbsession, "before_flush", before_flush_hook)

def _pub_session(session):
_init_session(session)

logger.debug("pub_session")
for obj in session.pending_write:
_pub(obj, action="write")
for obj in session.pending_update:
@@ -178,17 +190,20 @@ def after_commit_hook(session):
"""Publish signals
"""
_pub_session(session)
session_del(session)
event.listen(dbsession, "after_commit", after_commit_hook)

else:
logger.debug("strict_tables: {}".format(strict_tables))

def session_prepare(session):
def session_prepare(session, flush_ctx):
"""Record session prepare state in before_commit
"""
assert not hasattr(session, 'meepo_unique_id')
_init_session(session)
session.meepo_unique_id = uuid.uuid4().hex
if not hasattr(session, 'meepo_unique_id'):
session_init(session)

logger.debug("%s - after_flush" % session.meepo_unique_id)

for action in ("write", "update", "delete"):
objs = [o for o in getattr(session, "pending_%s" % action)
if o.__table__.fullname in strict_tables]
@@ -198,33 +213,37 @@ def session_prepare(session):
prepare_event = collections.defaultdict(set)
for obj in objs:
prepare_event[obj.__table__.fullname].add(_pk(obj))
logger.debug("session_prepare {}: {} -> {}".format(
action, session.meepo_unique_id, prepare_event))
logger.debug("{} - session_prepare_{} -> {}".format(
session.meepo_unique_id, action, prepare_event))
signal("session_prepare").send(
prepare_event, sid=session.meepo_unique_id, action=action)
event.listen(dbsession, "before_commit", session_prepare)
event.listen(dbsession, "after_flush", session_prepare)

def session_commit(session):
"""Commit session in after_commit
"""
assert hasattr(session, 'meepo_unique_id')
# this may happen when there's nothing to commit
if not hasattr(session, 'meepo_unique_id'):
logger.debug("skipped - after_commit")
return

# normal session pub
logger.debug("%s - after_commit" % session.meepo_unique_id)
_pub_session(session)

logger.debug("session_commit: {}".format(session.meepo_unique_id))
signal("session_commit").send(session.meepo_unique_id)
del session.meepo_unique_id
session_del(session)
event.listen(dbsession, "after_commit", session_commit)

def session_rollback(session):
"""Unprepare session in after_rollback.
"""Clean session in after_rollback.
"""
# this may happen when there's nothing to rollback
if not hasattr(session, 'meepo_unique_id'):
logger.debug("skipped - after_rollback")
return

logger.debug("session_rollback: {}".format(
session.meepo_unique_id))
# del session meepo id after rollback
logger.debug("%s - after_rollback" % session.meepo_unique_id)
signal("session_rollback").send(session.meepo_unique_id)
del session.meepo_unique_id
session_del(session)
event.listen(dbsession, "after_rollback", session_rollback)
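For reference, everything the hooks above publish goes through blinker signals; a minimal subscription sketch, modeled on the usage in tests/test_sqlalchemy_pub.py below (the DSN and the print handlers are placeholders):

import sqlalchemy as sa
from sqlalchemy.orm import scoped_session, sessionmaker
from blinker import signal

from meepo.pub import sqlalchemy_pub

# placeholder DSN
engine = sa.create_engine("mysql+pymysql://root@localhost/meepo_test")
Session = scoped_session(sessionmaker(bind=engine))

# install the session event hooks; tables in strict_tables also emit session_prepare
sqlalchemy_pub(Session, strict_tables=["test"])

# table-level signals send one primary key per changed row
signal("test_write").connect(lambda pk: print("write", pk), weak=False)

# session-level signals carry the session's meepo_unique_id as sid
def on_session_prepare(event, sid, action):
    print("prepare", sid, action, event)

signal("session_prepare").connect(on_session_prepare, weak=False)
signal("session_commit").connect(lambda sid: print("commit", sid), weak=False)
signal("session_rollback").connect(lambda sid: print("rollback", sid), weak=False)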
2 changes: 1 addition & 1 deletion meepo/replicator.py
@@ -236,7 +236,7 @@ def event(self, *topics, **kwargs):
the replication job.
"""
workers = kwargs.pop("workers", 1)
multi = kwargs.pop("multi", False)
multi = kwargs.pop("multi", False)
queue_limit = kwargs.pop("queue_limit", 10000)

def wrapper(func):
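A hypothetical usage sketch of this decorator follows; only the event() keyword arguments come from the diff above, while the ZmqReplicator name, its constructor argument, run(), and the meaning of the keywords are assumptions:

from meepo.replicator import ZmqReplicator  # assumed class name

repl = ZmqReplicator("tcp://127.0.0.1:6000")  # assumed bind/listen argument

# workers, multi and queue_limit are the keywords popped in event() above;
# the semantics suggested here (worker count, batched pks, queue backpressure)
# are assumptions, not taken from this diff
@repl.event("test_write", workers=2, multi=True, queue_limit=10000)
def on_test_write(pk):
    print("replicating", pk)

repl.run()  # assumed blocking entry point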
7 changes: 6 additions & 1 deletion tests/conftest.py
@@ -2,6 +2,9 @@

from __future__ import absolute_import

import logging
logging.basicConfig(level=logging.DEBUG)

import json
import os

@@ -27,6 +30,8 @@ def mysql_dsn(conf):
This fixture will init a clean meepo_test database with a 'test' table
"""
logger = logging.getLogger("fixture_mysql_dsn")

dsn = conf["mysql_dsn"] if conf else "mysql+pymysql://root@localhost/"

# init database
@@ -53,7 +58,7 @@ def mysql_dsn(conf):
RESET MASTER;
"""
cursor.execute(sql)
print("executed")
logging.debug("executed")

# release conn
cursor.close()
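The conf fixture that feeds mysql_dsn reads tests/conf.json (already git-ignored in the first file above); a minimal example, assuming mysql_dsn is the only key the tests need:

{
    "mysql_dsn": "mysql+pymysql://root@127.0.0.1:3306/"
}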
6 changes: 6 additions & 0 deletions tests/test_mysql_pub.py
@@ -2,13 +2,18 @@

from __future__ import absolute_import

import logging
logging.basicConfig(level=logging.DEBUG)

import pymysql
from blinker import signal

from meepo._compat import urlparse


def test_mysql_pub(mysql_dsn):
logger = logging.getLogger("test_mysql_pub")

def test_sg(sg_list):
return lambda pk: sg_list.append(pk)

@@ -54,6 +59,7 @@ def test_sg(sg_list):
cursor.close()
conn.commit()
conn.close()
logger.debug("mysql binlog generated")

from meepo.pub import mysql_pub
mysql_pub(mysql_dsn, tables=["test"])
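mysql_pub consumes the binlog rows generated above and fires the per-table signals; a minimal sketch based on the call in this test (the DSN is a placeholder, and whether the call returns after draining the current binlog or keeps streaming depends on options not shown in this diff):

from blinker import signal
from meepo.pub import mysql_pub

signal("test_write").connect(lambda pk: print("write", pk), weak=False)

# requires a MySQL server with binlog enabled; the fixture issues RESET MASTER beforehand
mysql_pub("mysql+pymysql://root@localhost/", tables=["test"])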
188 changes: 188 additions & 0 deletions tests/test_sqlalchemy_pub.py
@@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-

from __future__ import absolute_import

import logging
logging.basicConfig(level=logging.DEBUG)

from blinker import signal


def test_sqlalchemy_pub(mysql_dsn):
(t_writes, t_updates, t_deletes,
s_events, s_commits, s_rollbacks) = ([] for _ in range(6))

def _clear():
t_writes.clear()
t_updates.clear()
t_deletes.clear()
s_events.clear()
s_commits.clear()
s_rollbacks.clear()

def test_sg(sg_list):
return lambda pk: sg_list.append(pk)

# connect table action signal
signal("test_write").connect(test_sg(t_writes), weak=False)
signal("test_update").connect(test_sg(t_updates), weak=False)
signal("test_delete").connect(test_sg(t_deletes), weak=False)

# connect session action signal
def session_prepare(event, sid, action):
s_events.append({"event": event, "sid": sid, "action": action})
signal("session_prepare").connect(session_prepare, weak=False)
signal("session_commit").connect(test_sg(s_commits), weak=False)
signal("session_rollback").connect(test_sg(s_rollbacks), weak=False)

import sqlalchemy as sa
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.ext.automap import automap_base

# sqlalchemy prepare
engine = sa.create_engine(mysql_dsn)
base = automap_base()
base.prepare(engine=engine, reflect=True)
Session = scoped_session(sessionmaker(bind=engine, expire_on_commit=False))
Test = base.classes["test"]

# install sqlalchemy_pub hook
from meepo.pub import sqlalchemy_pub
sqlalchemy_pub(Session, strict_tables=["test"])

# test empty commit
_clear()
Session().commit()
assert [t_writes, t_updates, t_deletes,
s_events, s_commits, s_rollbacks] == [[]] * 6

# test single write
_clear()
session = Session()
t_a = Test(data='a')
session.add(t_a)
session.commit()
session.close()

event, sid = s_events.pop(), s_commits.pop()
assert t_writes == [1]
assert event == {"sid": sid, "action": "write", "event": {"test": {1}}}
assert [t_updates, t_deletes, s_rollbacks] == [[]] * 3

# test single flush - write
_clear()
session = Session()
t_b = Test(data='b')
session.add(t_b)
session.flush()
session.commit()
session.close()

event, sid = s_events.pop(), s_commits.pop()
assert t_writes == [2]
assert event == {"sid": sid, "action": "write", "event": {"test": {2}}}
assert [t_updates, t_deletes, s_rollbacks] == [[]] * 3

# test multiple writes
_clear()
session = Session()
t_c = Test(data='c')
t_d = Test(data='d')
session.add(t_c)
session.add(t_d)
session.commit()
session.close()

event, sid = s_events.pop(), s_commits.pop()
assert set(t_writes) == {3, 4}
assert event == {"sid": sid, "action": "write", "event": {"test": {3, 4}}}
assert [t_updates, t_deletes, s_rollbacks] == [[]] * 3

# test single update
_clear()
session = Session()
t_a = session.query(Test).filter(Test.data == 'a').one()
t_a.data = "aa"
session.commit()
session.close()

event, sid = s_events.pop(), s_commits.pop()
assert set(t_updates) == {1}
assert event == {"sid": sid, "action": "update", "event": {"test": {1}}}
assert [t_writes, t_deletes, s_rollbacks] == [[]] * 3

# test single flush - update
_clear()
session = Session()
t_a = session.query(Test).filter(Test.data == 'aa').one()
t_a.data = "a"
session.flush()
session.commit()
session.close()

event, sid = s_events.pop(), s_commits.pop()
assert set(t_updates) == {1}
assert event == {"sid": sid, "action": "update", "event": {"test": {1}}}
assert [t_writes, t_deletes, s_rollbacks] == [[]] * 3

# test mixed write, update, delete & multi flush
_clear()
session = Session()
t_b, t_c = session.query(Test).filter(Test.data.in_(('b', 'c'))).all()
t_e = Test(data='e')
session.add(t_e)
t_b.data = "x"
session.flush()
session.delete(t_c)
session.commit()
session.close()

# one successful commit yields one commit sid
assert len(s_commits) == 1

# since commit itself includes a flush, an explicit flush in the middle of
# the transaction causes the same "session_prepare" events to be signaled
# multiple times.
assert s_events[:2] == s_events[2:4]

# test session events
sid = s_commits.pop()
assert s_events[2:] == [
{"sid": sid, "action": "write", "event": {"test": {5}}},
{"sid": sid, "action": "update", "event": {"test": {2}}},
{"sid": sid, "action": "delete", "event": {"test": {3}}}
]
assert (t_writes, t_updates, t_deletes) == ([5], [2], [3])

# test empty rollback
_clear()
Session().rollback()
assert [t_writes, t_updates, t_deletes,
s_events, s_commits, s_rollbacks] == [[]] * 6

# test rollback before flush
_clear()
session = Session()
t_e = Test(data='e')
session.add(t_e)
session.rollback()
session.close()

# rollback happened before flush, nothing recorded.
assert [t_writes, t_updates, t_deletes,
s_events, s_commits, s_rollbacks] == [[]] * 6

# test flush & rollback
_clear()
session = Session()
t_e = Test(data='e')
session.add(t_e)
session.flush()
session.rollback()
session.close()

# test session events; since the rollback happened after a flush, the write
# has a pk value.
event, sid = s_events.pop(), s_rollbacks.pop()
assert event == {"sid": sid, "action": "write", "event": {"test": {6}}}
assert [t_writes, t_updates, t_deletes, s_commits] == [[]] * 4
