From af6009d4827841ae8a79cf9f99caa7c050934463 Mon Sep 17 00:00:00 2001 From: Luke Marsden Date: Sun, 13 Feb 2011 14:56:22 -0500 Subject: [PATCH] Ready for review --- test.py | 20 ++++++-- test/test_txmysql.py | 110 +++++++++++++++++++++++++++++-------------- txmysql/client.py | 52 ++++++++++++++------ 3 files changed, 129 insertions(+), 53 deletions(-) diff --git a/test.py b/test.py index f9f5f8d..f5acddb 100644 --- a/test.py +++ b/test.py @@ -1,12 +1,26 @@ -from txmysql.protocol import MySQLProtocol, MySQLClientFactory +from txmysql.protocol import MySQLProtocol from twisted.internet import defer from twisted.application.internet import UNIXClient +from twisted.internet.protocol import ClientFactory from twisted.internet import reactor from twisted.application.service import Application from twisted.protocols import policies import pprint import secrets +class MySQLClientFactory(ClientFactory): + protocol = MySQLProtocol + + def __init__(self, username, password, database=None): + self.username = username + self.password = password + self.database = database + + def buildProtocol(self, addr): + p = self.protocol(self.username, self.password, self.database) + p.factory = self + return p + factory = MySQLClientFactory(username='root', password=secrets.MYSQL_ROOT_PASS, database='mysql') class TestProtocol(MySQLProtocol): @@ -26,8 +40,8 @@ def connectionFailed(self, reason): def do_test(self): yield self.ready_deferred yield self.select_db('foo') - - result = yield self.runQuery('select * from bar') + result = yield self.query('insert into bar set thing="yeah"') + result = yield self.fetchall('select * from bar') print result factory.protocol = TestProtocol diff --git a/test/test_txmysql.py b/test/test_txmysql.py index e36712b..bcf7387 100644 --- a/test/test_txmysql.py +++ b/test/test_txmysql.py @@ -9,8 +9,9 @@ from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.internet.base import DelayedCall +from twisted.internet.error import ConnectionDone -DelayedCall.debug = True +DelayedCall.debug = False from txmysql import client from HybridUtils import AsyncExecCmds, sleep import secrets @@ -62,15 +63,15 @@ def test_021_stop_connect_query_start_retry_on_error(self): self.assertEquals(result, [[2]]) @defer.inlineCallbacks - def test_030_start_connect_timeout(self): + def test_030_start_idle_timeout(self): return """ Connect, with evildaemon in place of MySQL Evildaemon stops in 5 seconds, which is longer than our idle timeout so the idle timeout should fire, disconnecting us. - But because we have a query due, we reconnect. + But because we have a query due, we reconnect and get the result. """ - daemon_dfr = self._start_evildaemon(secs=10) # Do not yield, it is synchronous + daemon_dfr = self._start_evildaemon(secs=10) conn = yield self._connect_mysql(idle_timeout=3, retry_on_error=True) d = conn.runQuery("select 2") yield daemon_dfr @@ -81,6 +82,7 @@ def test_030_start_connect_timeout(self): @defer.inlineCallbacks def test_040_start_connect_long_query_timeout(self): + return """ Connect to the real MySQL, run a long-running query which exceeds the idle timeout, check that it times out and returns the appropriate @@ -91,46 +93,82 @@ def test_040_start_connect_long_query_timeout(self): try: result = yield conn.runQuery("select sleep(5)") except Exception, e: - raise e + print "Caught exception %s" % e + self.assertTrue(isinstance(e, ConnectionDone)) finally: conn.disconnect() - - #@defer.inlineCallbacks + + @defer.inlineCallbacks def test_050_retry_on_error(self): + return """ - 1. Start MySQL - 2. Connect - 3. Run a long query - 4. Restart MySQL before the long query executes - 5. Check that MySQL reconnects and eventually returns the - correct result + Start a couple of queries in parallel. + Both of them should take 10 seconds, but restart the MySQL + server after 5 seconds. + Setting the connection and idle timeouts allows bad connections + to fail. """ - pass - - #@defer.inlineCallbacks - def test_060_no_retry_on_error_start_connect_query_restart(self): - pass + yield self._start_mysql() + conn = yield self._connect_mysql(retry_on_error=True) + d1 = conn.runQuery("select sleep(7)") + d2 = conn.runQuery("select sleep(7)") + yield sleep(2) + yield self._stop_mysql() + yield self._start_mysql() + result = yield defer.DeferredList([d1, d2]) + conn.disconnect() + self.assertEquals(result, [(True, [[0]]), (True, [[0]])]) - #@defer.inlineCallbacks - def test_070_error_strings_test(self): - pass + @defer.inlineCallbacks + def test_060_error_strings_test(self): + """ + This test causes MySQL to return what we consider a temporary local + error. We do this by starting MySQL, querying a table, then physically + removing MySQL's data files. - #@defer.inlineCallbacks - def test_080_retry_on_error_start_connect_query_restart(self): - pass + This triggers MySQL to return a certain error code which we want to + consider a temporary local error, which should result in a reconnection + to MySQL. - #@defer.inlineCallbacks - def test_090_no_retry_on_error_start_connect_query_restart(self): - pass + This is arguably the most application-specific behaviour in the txMySQL + client library. - #@defer.inlineCallbacks - def test_100_evil_daemon_timeout(self): - """ - 1. Start evil daemon - 2. Connect with idle_timeout=5 - 3. Connection should time out + Note that this """ - pass + yield self._start_mysql() + conn = yield self._connect_mysql(retry_on_error=True, + temporary_error_strings=[ + "Can't find file: './foo/testing.frm' (errno: 13)", + ]) + yield conn.runOperation("create database if not exists foo") + yield conn.selectDb("foo") + yield conn.runOperation("drop table if exists foo") + yield conn.runOperation("create table foo (id int)") + yield conn.runOperation("insert into foo set id=1") + result = yield conn.runQuery("select * from foo") + self.assertEquals(result, [[1]]) + + # Now the tricky bit, we have to force MySQL to yield the error message. + #res = yield AsyncExecCmds([ + # """sh -c ' + # cd /var/lib/mysql/foo; + # chmod 0600 *; + # chown root:root *' + # """], cmd_prefix="sudo ").getDeferred() + #print res + # + #yield conn.runOperation("flush tables") # cause the files to get re-opened + #d = conn.runQuery("select * from foo") # This will spin until we fix the files, so do that pronto + #yield sleep(1) + #res = yield AsyncExecCmds([ + # """sh -c ' + # cd /var/lib/mysql/foo; + # chmod 0660 *; + # chown mysql:mysql * + # '"""], cmd_prefix="sudo ").getDeferred() + #print res + #result = yield d + #self.assertEquals(result, [[1]]) # Utility functions: @@ -142,8 +180,8 @@ def _start_mysql(self): def _start_evildaemon(self, secs): """ - Simulates a MySQL server which accepts connections but has mysteriously stopped - returning responses at all, i.e. it's just /dev/null + Simulates a MySQL server which accepts connections but has mysteriously + stopped returning responses at all, i.e. it's just /dev/null """ return AsyncExecCmds(['python ../test/evildaemon.py %s' % str(secs)], cmd_prefix='sudo ').getDeferred() diff --git a/txmysql/client.py b/txmysql/client.py index 0d65c10..1d9c0b6 100644 --- a/txmysql/client.py +++ b/txmysql/client.py @@ -3,6 +3,7 @@ from protocol import MySQLProtocol # One instance of this per actual connection to MySQL from txmysql import util, error from twisted.python.failure import Failure +from twisted.internet.error import TimeoutError from twisted.python import log import time @@ -80,20 +81,26 @@ def __init__(self, hostname, username, password, database=None, def runQuery(self, query, query_args=None): user_dfr = defer.Deferred() - user_dfr.addErrback(log.err) self._pending_operations.append((user_dfr, self._doQuery, query, query_args)) self._checkOperations() if DEBUG: print " Running query \"%s\" which is due to fire back on %s" % (query, user_dfr) return user_dfr + def selectDb(self, db): + user_dfr = defer.Deferred() + self._pending_operations.append((user_dfr, self._doSelectDb, db, None)) + self._checkOperations() + if DEBUG: + print " Running selectDb \"%s\" which is due to fire back on %s" % (db, user_dfr) + return user_dfr + def runOperation(self, query, query_args=None): user_dfr = defer.Deferred() - user_dfr.addErrback(log.err) self._pending_operations.append((user_dfr, self._doOperation, query, query_args)) self._checkOperations() if DEBUG: - print " Running operation \"%s\" which is due to fire back None on %s when complete" % query + print " Running operation \"%s\" which is due to fire back None on %s when complete" % (query, user_dfr) return user_dfr def _retryOperation(self): @@ -205,15 +212,19 @@ def done_query_error(failure): def stateTransition(self, data=None, state='disconnected', reason=None): new_state = state - if new_state == self.state: + old_state = self.state + + if new_state == old_state: # Not a transition, heh return if DEBUG: print " Transition from %s to %s" % (self.state, new_state) + self.state = new_state + # connected => not connected - if self.state == 'connected' and new_state != 'connected': + if old_state == 'connected' and new_state != 'connected': if DEBUG: print " We are disconnecting..." # We have just lost a connection, if we're in the middle of @@ -222,16 +233,20 @@ def stateTransition(self, data=None, state='disconnected', reason=None): if not self.retry_on_error and self._current_operation: if DEBUG: print " Not retrying on error, current user deferred %s about to get failure %s" % (self._current_user_dfr, reason) - self._current_user_dfr.errback(reason) - self._current_user_dfr = None - self._current_operation = None - self._current_operation_dfr = None + if self._current_user_dfr and not self._current_user_dfr.called: + self._current_user_dfr.errback(reason) + self._current_user_dfr = None + self._current_operation = None + self._current_operation_dfr = None + else: + if DEBUG: + print " Current user deferred has already been fired in error handler, not doing anything" # not connected => connected - if self.state != 'connected' and new_state == 'connected': + if old_state != 'connected' and new_state == 'connected': #print " In branch 2" if DEBUG: - print " We are connecting..." + print " We are connected..." # We have just made a new connection, if we were in the middle of # something when we got disconnected and we want to retry it, retry # it now @@ -254,7 +269,6 @@ def stateTransition(self, data=None, state='disconnected', reason=None): print " Connected, check whether we have any operations to perform" self._checkOperations() - self.state = new_state return data def clientConnectionFailed(self, connector, reason): @@ -285,8 +299,10 @@ def _begin(self): yield self.client.ready_deferred elif self.state == 'connecting': if DEBUG: - print " Yielding on a successful connection" + print " Yielding on a successful connection, deferred is %s" % self.deferred yield self.deferred + if DEBUG: + print " Yielding on a successful ready deferred" yield self.client.ready_deferred elif self.state == 'connected': if DEBUG: @@ -302,8 +318,10 @@ def buildProtocol(self, addr): p.factory = self self.client = p #print self.client.ready_deferred + print " **** **** In buildProtocol, calling back on self.deferred %s" % self.deferred self.deferred.callback(self.client) self.deferred = defer.Deferred() + print " **** **** Created new self.deferred %s" % self.deferred def when_connected(data): if DEBUG: print " Connection just successfully made, and MySQL handshake/auth completed. About to transition to connected..." @@ -326,5 +344,11 @@ def _doOperation(self, query, query_args=None): # TODO query_args if DEBUG: print " Attempting an actual operation \"%s\"" % query yield self._begin() - yield self.mysql_connection.client.query(query) + yield self.client.query(query) + @defer.inlineCallbacks + def _doSelectDb(self, db, ignored): + if DEBUG: + print " Attempting an actual selectDb \"%s\"" % db + yield self._begin() + yield self.client.select_db(db)