Skip to content

Commit

Permalink
fix thread leak, fix sqlite errors on shutdown, add tests
Browse files Browse the repository at this point in the history
This fixes a major thread leak, by ensuring that the ConnectionPool's
close() method always gets called (fixes buildbot#722).  That method was
triggering spurious errors from sqlite, but passing
check_same_thread=False fixes that (and fixes buildbot#714).  This also
represents further work on buildbot#725 by adding new, well-isolated tests
  • Loading branch information
Dustin J. Mitchell committed Feb 21, 2010
1 parent 0f1da80 commit 5d41eef
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 9 deletions.
28 changes: 22 additions & 6 deletions buildbot/db.py
Expand Up @@ -390,6 +390,7 @@ def create_or_upgrade_db(spec):
pass
# so here we've got a pre-existing database, of unknown version
db = DBConnector(spec)
db.start()
ver = db.get_version()
# this will eventually have a structure like follows:
#if ver == 1:
Expand All @@ -407,6 +408,7 @@ def create_or_upgrade_db(spec):
def open_db(spec):
# this will only open a pre-existing database of the current version
db = DBConnector(spec)
db.start()
ver = db.get_version()
if ver is None:
raise DatabaseNotReadyError("cannot use empty database")
Expand Down Expand Up @@ -467,6 +469,12 @@ def __init__(self, spec):
connkw = spec.connkw.copy()
connkw["cp_reconnect"] = True
connkw["cp_noisy"] = True
if 'sqlite' in spec.dbapiName:
# This disables sqlite's obsessive checks that a given connection is
# only used in one thread; this is justified by the Twisted ticket
# regarding the errors you get on connection shutdown if you do *not*
# add this parameter: http://twistedmatrix.com/trac/ticket/3629
connkw['check_same_thread'] = False
#connkw["cp_min"] = connkw["cp_max"] = 1
log.msg("creating database connector: %s %s %s" % \
(spec.dbapiName, spec.connargs, connkw))
Expand All @@ -486,19 +494,23 @@ def __init__(self, spec):

self._pending_operation_count = 0

self._started = False

def getCurrentTime(self):
return time.time()

def start(self):
# this only needs to be called from non-reactor contexts, like
# runner.upgradeMaster and other CLI commands
# this only *needs* to be called in reactorless environments (which
# should be eliminated anyway). but it doesn't hurt anyway
self._pool.start()
self._started = True

def stop(self):
"""Call this when you're done with me"""
# this only needs to be called from non-reactor contexts, like
# runner.upgradeMaster and other CLI commands
if not self._started:
return
self._pool.close()
self._started = False
del self._pool

def quoteq(self, query):
Expand Down Expand Up @@ -530,6 +542,7 @@ def get_version(self):

def runQueryNow(self, *args, **kwargs):
# synchronous+blocking version of runQuery()
assert self._started
return self.runInteractionNow(self._runQuery, *args, **kwargs)

def _runQuery(self, c, *args, **kwargs):
Expand All @@ -556,6 +569,7 @@ def _end_operation(self, t):

def runInteractionNow(self, interaction, *args, **kwargs):
# synchronous+blocking version of runInteraction()
assert self._started
start = self.getCurrentTime()
t = self._start_operation()
try:
Expand Down Expand Up @@ -605,11 +619,12 @@ def subscribe_to(self, category, observer):
self._subscribers[category].add(observer)

def runQuery(self, *args, **kwargs):
assert self._started
self._pending_operation_count += 1
start = self.getCurrentTime()
t = self._start_operation()
#t = self._start_operation()
d = self._pool.runQuery(*args, **kwargs)
d.addBoth(self._runQuery_done, start, t)
#d.addBoth(self._runQuery_done, start, t)
return d
def _runQuery_done(self, res, start, t):
self._end_operation(t)
Expand All @@ -624,6 +639,7 @@ def _add_query_time(self, start):
self._query_times.popleft()

def runInteraction(self, *args, **kwargs):
assert self._started
self._pending_operation_count += 1
start = self.getCurrentTime()
t = self._start_operation()
Expand Down
6 changes: 3 additions & 3 deletions buildbot/master.py
Expand Up @@ -394,7 +394,7 @@ class BuildMaster(service.MultiService):
change_svc = None
properties = Properties()

def __init__(self, basedir, configFileName="master.cfg", db=None):
def __init__(self, basedir, configFileName="master.cfg", db_spec=None):
service.MultiService.__init__(self)
self.setName("buildmaster")
self.basedir = basedir
Expand Down Expand Up @@ -439,8 +439,8 @@ def __init__(self, basedir, configFileName="master.cfg", db=None):
self.db = None
self.db_url = None
self.db_poll_interval = _Unset
if db:
self.loadDatabase(db)
if db_spec:
self.loadDatabase(db_spec)

self.readConfig = False

Expand Down
58 changes: 58 additions & 0 deletions buildbot/test/unit/test_db.py
@@ -1,4 +1,5 @@
import os
import threading

from zope.interface import implements
from twisted.trial import unittest
Expand Down Expand Up @@ -124,3 +125,60 @@ def test_fromURL_mysqlAuthNoPassPortArgs(self):
self.failUnlessConnection(d, 'MySQLdb',
connkw=dict(host='somehost.com', db='dbname', user="user",
port=8000, foo="moo"))

class DBConnector_Basic(unittest.TestCase):
"""
Basic tests of the DBConnector class - all start with an empty DB
"""

def setUp(self):
# use an in-memory sqlite database to test
self.dbc = db.DBConnector(db.DBSpec.from_url("sqlite://"))
self.dbc.start()

def tearDown(self):
self.dbc.stop()

# TODO: why is this method here??
def test_getCurrentTime(self):
self.assertTrue(self.dbc.getCurrentTime() > 0)

def test_quoteq_identity(self):
# FYI, sqlite uses qmark, so this test is somewhat tautalogical
assert self.dbc.paramstyle == "qmark"
self.assertEqual(
self.dbc.quoteq("SELECT * from developers where name='?'"),
"SELECT * from developers where name='?'")

def test_runQueryNow_simple(self):
self.assertEqual(self.dbc.runQueryNow("SELECT 1"),
[(1,)])

def test_runQueryNow_exception(self):
self.assertRaises(Exception, lambda :
self.dbc.runQueryNow("EAT * FROM cookies"))

def test_runInterationNow_simple(self):
def inter(cursor, *args, **kwargs):
self.assertEqual(cursor.execute("SELECT 1").fetchall(),
[(1,)])
self.dbc.runInteractionNow(inter)

def test_runInterationNow_args(self):
def inter(cursor, *args, **kwargs):
self.assertEqual((args, kwargs), ((1, 2), dict(three=4)))
cursor.execute("SELECT 1")
self.dbc.runInteractionNow(inter, 1, 2, three=4)

def test_runInterationNow_exception(self):
def inter(cursor):
cursor.execute("GET * WHERE golden")
self.assertRaises(Exception, lambda :
self.dbc.runInteractionNow(inter))

def test_runQuery_simple(self):
d = self.dbc.runQuery("SELECT 1")
def cb(res):
self.assertEqual(res, [(1,)])
d.addCallback(cb)
return d

0 comments on commit 5d41eef

Please sign in to comment.