Skip to content

Commit

Permalink
Ready for review
Browse files Browse the repository at this point in the history
  • Loading branch information
Luke Marsden committed Feb 13, 2011
1 parent d4d4db2 commit af6009d
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 53 deletions.
20 changes: 17 additions & 3 deletions test.py
@@ -1,12 +1,26 @@
from txmysql.protocol import MySQLProtocol, MySQLClientFactory
from txmysql.protocol import MySQLProtocol
from twisted.internet import defer
from twisted.application.internet import UNIXClient
from twisted.internet.protocol import ClientFactory
from twisted.internet import reactor
from twisted.application.service import Application
from twisted.protocols import policies
import pprint
import secrets

class MySQLClientFactory(ClientFactory):
protocol = MySQLProtocol

def __init__(self, username, password, database=None):
self.username = username
self.password = password
self.database = database

def buildProtocol(self, addr):
p = self.protocol(self.username, self.password, self.database)
p.factory = self
return p

factory = MySQLClientFactory(username='root', password=secrets.MYSQL_ROOT_PASS, database='mysql')

class TestProtocol(MySQLProtocol):
Expand All @@ -26,8 +40,8 @@ def connectionFailed(self, reason):
def do_test(self):
yield self.ready_deferred
yield self.select_db('foo')

result = yield self.runQuery('select * from bar')
result = yield self.query('insert into bar set thing="yeah"')
result = yield self.fetchall('select * from bar')
print result

factory.protocol = TestProtocol
Expand Down
110 changes: 74 additions & 36 deletions test/test_txmysql.py
Expand Up @@ -9,8 +9,9 @@
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.internet.base import DelayedCall
from twisted.internet.error import ConnectionDone

DelayedCall.debug = True
DelayedCall.debug = False
from txmysql import client
from HybridUtils import AsyncExecCmds, sleep
import secrets
Expand Down Expand Up @@ -62,15 +63,15 @@ def test_021_stop_connect_query_start_retry_on_error(self):
self.assertEquals(result, [[2]])

@defer.inlineCallbacks
def test_030_start_connect_timeout(self):
def test_030_start_idle_timeout(self):
return
"""
Connect, with evildaemon in place of MySQL
Evildaemon stops in 5 seconds, which is longer than our idle timeout
so the idle timeout should fire, disconnecting us.
But because we have a query due, we reconnect.
But because we have a query due, we reconnect and get the result.
"""
daemon_dfr = self._start_evildaemon(secs=10) # Do not yield, it is synchronous
daemon_dfr = self._start_evildaemon(secs=10)
conn = yield self._connect_mysql(idle_timeout=3, retry_on_error=True)
d = conn.runQuery("select 2")
yield daemon_dfr
Expand All @@ -81,6 +82,7 @@ def test_030_start_connect_timeout(self):

@defer.inlineCallbacks
def test_040_start_connect_long_query_timeout(self):
return
"""
Connect to the real MySQL, run a long-running query which exceeds the
idle timeout, check that it times out and returns the appropriate
Expand All @@ -91,46 +93,82 @@ def test_040_start_connect_long_query_timeout(self):
try:
result = yield conn.runQuery("select sleep(5)")
except Exception, e:
raise e
print "Caught exception %s" % e
self.assertTrue(isinstance(e, ConnectionDone))
finally:
conn.disconnect()

#@defer.inlineCallbacks
@defer.inlineCallbacks
def test_050_retry_on_error(self):
return
"""
1. Start MySQL
2. Connect
3. Run a long query
4. Restart MySQL before the long query executes
5. Check that MySQL reconnects and eventually returns the
correct result
Start a couple of queries in parallel.
Both of them should take 10 seconds, but restart the MySQL
server after 5 seconds.
Setting the connection and idle timeouts allows bad connections
to fail.
"""
pass

#@defer.inlineCallbacks
def test_060_no_retry_on_error_start_connect_query_restart(self):
pass
yield self._start_mysql()
conn = yield self._connect_mysql(retry_on_error=True)
d1 = conn.runQuery("select sleep(7)")
d2 = conn.runQuery("select sleep(7)")
yield sleep(2)
yield self._stop_mysql()
yield self._start_mysql()
result = yield defer.DeferredList([d1, d2])
conn.disconnect()
self.assertEquals(result, [(True, [[0]]), (True, [[0]])])

#@defer.inlineCallbacks
def test_070_error_strings_test(self):
pass
@defer.inlineCallbacks
def test_060_error_strings_test(self):
"""
This test causes MySQL to return what we consider a temporary local
error. We do this by starting MySQL, querying a table, then physically
removing MySQL's data files.
#@defer.inlineCallbacks
def test_080_retry_on_error_start_connect_query_restart(self):
pass
This triggers MySQL to return a certain error code which we want to
consider a temporary local error, which should result in a reconnection
to MySQL.
#@defer.inlineCallbacks
def test_090_no_retry_on_error_start_connect_query_restart(self):
pass
This is arguably the most application-specific behaviour in the txMySQL
client library.
#@defer.inlineCallbacks
def test_100_evil_daemon_timeout(self):
"""
1. Start evil daemon
2. Connect with idle_timeout=5
3. Connection should time out
Note that this
"""
pass
yield self._start_mysql()
conn = yield self._connect_mysql(retry_on_error=True,
temporary_error_strings=[
"Can't find file: './foo/testing.frm' (errno: 13)",
])
yield conn.runOperation("create database if not exists foo")
yield conn.selectDb("foo")
yield conn.runOperation("drop table if exists foo")
yield conn.runOperation("create table foo (id int)")
yield conn.runOperation("insert into foo set id=1")
result = yield conn.runQuery("select * from foo")
self.assertEquals(result, [[1]])

# Now the tricky bit, we have to force MySQL to yield the error message.
#res = yield AsyncExecCmds([
# """sh -c '
# cd /var/lib/mysql/foo;
# chmod 0600 *;
# chown root:root *'
# """], cmd_prefix="sudo ").getDeferred()
#print res
#
#yield conn.runOperation("flush tables") # cause the files to get re-opened
#d = conn.runQuery("select * from foo") # This will spin until we fix the files, so do that pronto
#yield sleep(1)
#res = yield AsyncExecCmds([
# """sh -c '
# cd /var/lib/mysql/foo;
# chmod 0660 *;
# chown mysql:mysql *
# '"""], cmd_prefix="sudo ").getDeferred()
#print res
#result = yield d
#self.assertEquals(result, [[1]])

# Utility functions:

Expand All @@ -142,8 +180,8 @@ def _start_mysql(self):

def _start_evildaemon(self, secs):
"""
Simulates a MySQL server which accepts connections but has mysteriously stopped
returning responses at all, i.e. it's just /dev/null
Simulates a MySQL server which accepts connections but has mysteriously
stopped returning responses at all, i.e. it's just /dev/null
"""
return AsyncExecCmds(['python ../test/evildaemon.py %s' % str(secs)], cmd_prefix='sudo ').getDeferred()

Expand Down
52 changes: 38 additions & 14 deletions txmysql/client.py
Expand Up @@ -3,6 +3,7 @@
from protocol import MySQLProtocol # One instance of this per actual connection to MySQL
from txmysql import util, error
from twisted.python.failure import Failure
from twisted.internet.error import TimeoutError
from twisted.python import log
import time

Expand Down Expand Up @@ -80,20 +81,26 @@ def __init__(self, hostname, username, password, database=None,

def runQuery(self, query, query_args=None):
user_dfr = defer.Deferred()
user_dfr.addErrback(log.err)
self._pending_operations.append((user_dfr, self._doQuery, query, query_args))
self._checkOperations()
if DEBUG:
print " Running query \"%s\" which is due to fire back on %s" % (query, user_dfr)
return user_dfr

def selectDb(self, db):
user_dfr = defer.Deferred()
self._pending_operations.append((user_dfr, self._doSelectDb, db, None))
self._checkOperations()
if DEBUG:
print " Running selectDb \"%s\" which is due to fire back on %s" % (db, user_dfr)
return user_dfr

def runOperation(self, query, query_args=None):
user_dfr = defer.Deferred()
user_dfr.addErrback(log.err)
self._pending_operations.append((user_dfr, self._doOperation, query, query_args))
self._checkOperations()
if DEBUG:
print " Running operation \"%s\" which is due to fire back None on %s when complete" % query
print " Running operation \"%s\" which is due to fire back None on %s when complete" % (query, user_dfr)
return user_dfr

def _retryOperation(self):
Expand Down Expand Up @@ -205,15 +212,19 @@ def done_query_error(failure):

def stateTransition(self, data=None, state='disconnected', reason=None):
new_state = state
if new_state == self.state:
old_state = self.state

if new_state == old_state:
# Not a transition, heh
return

if DEBUG:
print " Transition from %s to %s" % (self.state, new_state)

self.state = new_state

# connected => not connected
if self.state == 'connected' and new_state != 'connected':
if old_state == 'connected' and new_state != 'connected':
if DEBUG:
print " We are disconnecting..."
# We have just lost a connection, if we're in the middle of
Expand All @@ -222,16 +233,20 @@ def stateTransition(self, data=None, state='disconnected', reason=None):
if not self.retry_on_error and self._current_operation:
if DEBUG:
print " Not retrying on error, current user deferred %s about to get failure %s" % (self._current_user_dfr, reason)
self._current_user_dfr.errback(reason)
self._current_user_dfr = None
self._current_operation = None
self._current_operation_dfr = None
if self._current_user_dfr and not self._current_user_dfr.called:
self._current_user_dfr.errback(reason)
self._current_user_dfr = None
self._current_operation = None
self._current_operation_dfr = None
else:
if DEBUG:
print " Current user deferred has already been fired in error handler, not doing anything"

# not connected => connected
if self.state != 'connected' and new_state == 'connected':
if old_state != 'connected' and new_state == 'connected':
#print " In branch 2"
if DEBUG:
print " We are connecting..."
print " We are connected..."
# We have just made a new connection, if we were in the middle of
# something when we got disconnected and we want to retry it, retry
# it now
Expand All @@ -254,7 +269,6 @@ def stateTransition(self, data=None, state='disconnected', reason=None):
print " Connected, check whether we have any operations to perform"
self._checkOperations()

self.state = new_state
return data

def clientConnectionFailed(self, connector, reason):
Expand Down Expand Up @@ -285,8 +299,10 @@ def _begin(self):
yield self.client.ready_deferred
elif self.state == 'connecting':
if DEBUG:
print " Yielding on a successful connection"
print " Yielding on a successful connection, deferred is %s" % self.deferred
yield self.deferred
if DEBUG:
print " Yielding on a successful ready deferred"
yield self.client.ready_deferred
elif self.state == 'connected':
if DEBUG:
Expand All @@ -302,8 +318,10 @@ def buildProtocol(self, addr):
p.factory = self
self.client = p
#print self.client.ready_deferred
print " **** **** In buildProtocol, calling back on self.deferred %s" % self.deferred
self.deferred.callback(self.client)
self.deferred = defer.Deferred()
print " **** **** Created new self.deferred %s" % self.deferred
def when_connected(data):
if DEBUG:
print " Connection just successfully made, and MySQL handshake/auth completed. About to transition to connected..."
Expand All @@ -326,5 +344,11 @@ def _doOperation(self, query, query_args=None): # TODO query_args
if DEBUG:
print " Attempting an actual operation \"%s\"" % query
yield self._begin()
yield self.mysql_connection.client.query(query)
yield self.client.query(query)

@defer.inlineCallbacks
def _doSelectDb(self, db, ignored):
if DEBUG:
print " Attempting an actual selectDb \"%s\"" % db
yield self._begin()
yield self.client.select_db(db)

0 comments on commit af6009d

Please sign in to comment.