Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Merge pull request #120 from haizaar/master
Browse files Browse the repository at this point in the history
Fixes to re-connect logic
  • Loading branch information
haizaar committed Aug 27, 2015
2 parents 42562db + 101f312 commit d209d00
Show file tree
Hide file tree
Showing 30 changed files with 3,265 additions and 56 deletions.
7 changes: 6 additions & 1 deletion .travis.yml
Expand Up @@ -9,15 +9,20 @@ python:
- pypy

addons:
postgresql: "9.2"
postgresql: "9.4"

before_script:
- psql -c 'CREATE DATABASE momoko_test;' -U postgres
- psql -U postgres momoko_test -c 'CREATE EXTENSION IF NOT EXISTS hstore'
- make -C tcproxy
- env

env:
global:
- MOMOKO_TEST_HSTORE=1
- MOMOKO_TEST_JSON=1
- MOMOKO_TEST_HOST=127.0.0.1
- PGHOST=127.0.0.1
matrix:
- MOMOKO_PSYCOPG2_IMPL=psycopg2
- MOMOKO_PSYCOPG2_IMPL=psycopg2cffi
Expand Down
1 change: 1 addition & 0 deletions README.rst
Expand Up @@ -40,6 +40,7 @@ Testing
Set the following environment variables with your own values before running the
unit tests::

make -C tcpproxy
export MOMOKO_TEST_DB='your_db'
export MOMOKO_TEST_USER='your_user'
export MOMOKO_TEST_PASSWORD='your_password'
Expand Down
10 changes: 8 additions & 2 deletions docs/installation.rst
Expand Up @@ -22,7 +22,7 @@ can also be used by setting the ``MOMOKO_PSYCOPG2_IMPL`` environment variable to
# 'psycopg2' or 'psycopg2cffi'
export MOMOKO_PSYCOPG2_IMPL='psycopg2cffi'

The unit tests als use this variable. It needs to be set if something else is used
The unit tests all use this variable. It needs to be set if something else is used
instead of Psycopg2 when running the unit tests. Besides ``MOMOKO_PSYCOPG2_IMPL``
there are also other variables that need to be set for the unit tests.

Expand All @@ -37,11 +37,17 @@ Here's an example for the environment variables::
# Set to '0' if hstore extension isn't enabled
export MOMOKO_TEST_HSTORE='1' # Default: 0

And running the tests is easy::
Momoko tests use tcproxy_ for simulating Postgres server unavailablity. The copy
of tcproxy is bundled with Momoko, but you need to build it first::

make -C tcproxy

Finally, running the tests is easy::

python setup.py test


.. _tcproxy: https://github.com/dccmx/tcproxy
.. _psycopg2cffi: http://pypi.python.org/pypi/psycopg2cffi
.. _Tornado: http://www.tornadoweb.org/
.. _Psycopg2: http://initd.org/psycopg/
Expand Down
82 changes: 55 additions & 27 deletions momoko/connection.py
Expand Up @@ -173,7 +173,7 @@ class Pool(object):
:param list setsession:
List of intial sql commands to be executed once connection is established.
If any of the commands failes, the connection will be closed.
If any of the commands fails, the connection will be closed.
**NOTE:** The commands will be executed as one transaction block.
:param bool auto_shrink:
Expand All @@ -195,6 +195,10 @@ class Pool(object):
.. _psycopg2.extensions.connection: http://initd.org/psycopg/docs/connection.html#connection
.. _Connection and cursor factories: http://initd.org/psycopg/docs/advanced.html#subclassing-cursor
"""

class DatabaseNotAvailable(psycopg2.DatabaseError):
"""Raised when Pool can not connect to database server"""

def __init__(self,
dsn,
connection_factory=None,
Expand Down Expand Up @@ -232,7 +236,7 @@ def __init__(self,
self.conns = ConnectionContainer()

self._last_connect_time = 0
self._no_conn_availble_error = psycopg2.DatabaseError("No database connection available")
self._no_conn_availble_error = self.DatabaseNotAvailable("No database connection available")
self.shrink_period = shrink_period
self.shrink_delay = shrink_delay
self.auto_shrink = auto_shrink
Expand Down Expand Up @@ -440,36 +444,35 @@ def when_available(fut):
conn = fut.result()
except psycopg2.Error as error:
future.set_exc_info(sys.exc_info())
if retry:
if retry and not keep:
self.putconn(retry[0])
return

log.debug("Obtained connection: %s", conn.fileno)
try:
future_or_result = method(conn, *args, **kwargs)
except psycopg2.Error as error:
if conn.closed:
if not retry:
retry.append(conn)
self.ioloop.add_future(conn.connect(), when_available)
return
else:
future.set_exception(self._no_conn_availble_error)
else:
future.set_exc_info(sys.exc_info())
log.debug(2)
self.putconn(conn)
return
log.debug("Method failed synchronously")
return self._retry(retry, when_available, conn, keep, future)

if not async:
if not keep:
self.putconn(conn)
future.set_result(future_or_result)
log.debug(3)
self.putconn(conn)
return

chain_future(future_or_result, future)
if not keep:
future.add_done_callback(lambda f: self.putconn(conn))
def when_done(rfut):
try:
result = rfut.result()
except psycopg2.Error as error:
log.debug("Method failed Asynchronously")
return self._retry(retry, when_available, conn, keep, future)

if not keep:
self.putconn(conn)
future.set_result(result)

self.ioloop.add_future(future_or_result, when_done)

if not connection:
self.ioloop.add_future(self.getconn(ping=False), when_available)
Expand All @@ -479,6 +482,20 @@ def when_available(fut):
when_available(f)
return future

def _retry(self, retry, what, conn, keep, future):
if conn.closed:
if not retry:
retry.append(conn)
self.ioloop.add_future(conn.connect(), what)
return
else:
future.set_exception(self._no_conn_availble_error)
else:
future.set_exc_info(sys.exc_info())
if not keep:
self.putconn(conn)
return

def _reanimate(self):
assert self.conns.dead, "BUG: don't call reanimate when there is no one to reanimate"

Expand Down Expand Up @@ -556,7 +573,10 @@ def on_ping_done(ping_fut):
try:
ping_fut.result()
except psycopg2.Error as error:
ping_future.set_exc_info(error)
if conn.closed:
ping_future.set_exception(self._no_conn_availble_error)
else:
ping_future.set_exc_info(sys.exc_info())
self.putconn(conn)
else:
ping_future.set_result(conn)
Expand Down Expand Up @@ -598,7 +618,7 @@ class Connection(object):
:param list setsession:
List of intial sql commands to be executed once connection is established.
If any of the commands failes, the connection will be closed.
If any of the commands fails, the connection will be closed.
**NOTE:** The commands will be executed as one transaction block.
.. _Data Source Name: http://en.wikipedia.org/wiki/Data_Source_Name
Expand Down Expand Up @@ -636,6 +656,7 @@ def connect(self):
try:
self.connection = psycopg2.connect(self.dsn, **kwargs)
except psycopg2.Error as error:
self.connection = None
future.set_exc_info(sys.exc_info())
return future

Expand All @@ -654,6 +675,7 @@ def on_connect(on_connect_future):

self.ioloop.add_handler(self.fileno, callback, IOLoop.WRITE)
self.ioloop.add_future(future, self._set_server_version)
self.ioloop.add_future(future, self._close_on_fail)

return future

Expand All @@ -662,6 +684,12 @@ def _set_server_version(self, future):
return
self.server_version = self.connection.server_version

def _close_on_fail(self, future):
# If connection attempt evetually fails - marks connection as closed by ourselves
# since psycopg2 does not do that for us (on connection attempts)
if future.exception():
self.connection = None

def _io_callback(self, future, result, fd=None, events=None):
try:
state = self.connection.poll()
Expand All @@ -677,7 +705,7 @@ def _io_callback(self, future, result, fd=None, events=None):
elif state == POLL_WRITE:
self.ioloop.update_handler(self.fileno, IOLoop.WRITE)
else:
future.set_exception(psycopg2.OperationalError('poll() returned {0}'.format(state)))
future.set_exception(psycopg2.OperationalError("poll() returned %s" % state))

def ping(self):
"""
Expand Down Expand Up @@ -797,7 +825,7 @@ def transaction(self,
The class returned must be a subclass of `psycopg2.extensions.cursor`_.
See `Connection and cursor factories`_ for details. Defaults to ``None``.
:param bool auto_rollback:
If one of the transaction statements failes, try to automatically
If one of the transaction statements fails, try to automatically
execute ROLLBACK to abort the transaction. If ROLLBACK fails, it would
not be raised, but only logged.
Expand All @@ -818,10 +846,10 @@ def exec_statements(future):
cursor = future.result()
cursors.append(cursor)
except Exception as error:
if not auto_rollback:
transaction_future.set_exc_info(sys.exc_info())
else:
if auto_rollback and not self.closed:
self._rollback(transaction_future, error)
else:
transaction_future.set_exc_info(sys.exc_info())
return

try:
Expand Down
1 change: 0 additions & 1 deletion setup.py
Expand Up @@ -44,7 +44,6 @@
license='MIT',
test_suite='tests',
install_requires=dependencies,
test_require=dependencies + ["unittest2"],
classifiers = [
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
Expand Down
9 changes: 9 additions & 0 deletions tcproxy/.gitignore
@@ -0,0 +1,9 @@
*.o
*.DS_Store
*.log
cachegrind.out.*
callgrind.out.*
massif.out.*
memcheck.out
tcproxy
core
18 changes: 18 additions & 0 deletions tcproxy/ChangeLog
@@ -0,0 +1,18 @@
2011-06-2 Release 0.2.2
* fix a bug that causes infinit loop when process write

2011-05-21 Release 0.2.1
* fix ip address parse bug

2011-05-20 Release 0.2
* nonblock connect implemented
* semicolon bind to address not port now
* some performance optimization
* some bugfix

2011-05-18 Release 0.1.1
* -l option added to write log file
* minor code refinement

2011-05-16 Release 0.1
* first public beta release
19 changes: 19 additions & 0 deletions tcproxy/LICENSE
@@ -0,0 +1,19 @@
Copyright dccmx. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to
deal in the Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
sell copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
10 changes: 10 additions & 0 deletions tcproxy/Makefile
@@ -0,0 +1,10 @@
#
# tcproxy - Makefile
#
# Author: dccmx <dccmx@dccmx.com>
#

default: all

.DEFAULT:
cd src && $(MAKE) $@
15 changes: 15 additions & 0 deletions tcproxy/README.md
@@ -0,0 +1,15 @@
**Copied from https://github.com/dccmx/tcproxy.git. Used for connection issues simulation.**

tcproxy
=======
tcproxy is a small efficient tcp proxy that can be used for port forwarding or load balancing.

sample usage
------------
tcproxy "11212 -> 11211"
tcproxy "192.168.0.1:11212 -> 192.168.0.2:11211"

not implemented yet
---------------
tcproxy "any:11212 -> rr{192.168.0.100:11211 192.168.0.101:11211 192.168.0.102:11211}"
tcproxy "any:11212 -> hash{192.168.0.100:11211 192.168.0.101:11211 192.168.0.102:11211}"
10 changes: 10 additions & 0 deletions tcproxy/TODO
@@ -0,0 +1,10 @@
failover
tcp pool
thread

tcproxy "any:11212\
<-> rr{\
192.168.0.100:11211 -> 192.168.0.100:11212\
192.168.0.101:11211\
192.168.0.102:11211}\
-> 192.168.0.103:11211"
37 changes: 37 additions & 0 deletions tcproxy/src/Makefile
@@ -0,0 +1,37 @@
PROGNAME = tcproxy

OBJS = tcproxy.o ae.o util.o policy.o zmalloc.o anet.o

CFLAGS_GEN = -Wall -Werror -g $(CFLAGS)
CFLAGS_DBG = -ggdb $(CFLAGS_GEN)
CFLAGS_OPT = -O3 -Wno-format $(CFLAGS_GEN)
DEBUG ?=

CCCOLOR="\033[34m"
LINKCOLOR="\033[34;1m"
SRCCOLOR="\033[33m"
BINCOLOR="\033[37;1m"
MAKECOLOR="\033[32;1m"
ENDCOLOR="\033[0m"

QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR);
QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);

LDFLAGS +=
LIBS +=

all: $(PROGNAME)

%.o: %.c
$(QUIET_CC)$(CC) -c $(CFLAGS) $(CFLAGS_OPT) $(DEBUG) $(COMPILE_TIME) $<

$(PROGNAME): $(OBJS)
$(QUIET_LINK)$(CC) -o $(PROGNAME) $(CFLAGS_OPT) $(DEBUG) $(OBJS) $(CCLINK)
@echo
@echo "Make Complete. Read README for how to use."
@echo
@echo "Having problems with it? Send complains and bugs to dccmx@dccmx.com"
@echo

clean:
rm -f $(PROGNAME) core core.[1-9][0-9]* *.o memcheck.out callgrind.out.[1-9][0-9]* massif.out.[1-9][0-9]*

0 comments on commit d209d00

Please sign in to comment.