Skip to content

Commit

Permalink
refactor db.state connector to have a synchronous api
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Tardy <pierre.tardy@intel.com>
  • Loading branch information
Pierre Tardy committed Feb 19, 2015
1 parent 20d2dc0 commit a3c21f3
Showing 1 changed file with 96 additions and 88 deletions.
184 changes: 96 additions & 88 deletions master/buildbot/db/state.py
Expand Up @@ -33,123 +33,131 @@ class StateConnectorComponent(base.DBConnectorComponent):

def getObjectId(self, name, class_name):
# defer to a cached method that only takes one parameter (a tuple)
return self._getObjectId((name, class_name)
).addCallback(lambda objdict: objdict['id'])
d = self._getObjectId((name, class_name))
d.addCallback(lambda objdict: objdict['id'])
return d

@base.cached('objectids')
def _getObjectId(self, name_class_name_tuple):
name, class_name = name_class_name_tuple

def thd(conn):
objects_tbl = self.db.model.objects

self.checkLength(objects_tbl.c.name, name)
self.checkLength(objects_tbl.c.class_name, class_name)

def select():
q = sa.select([objects_tbl.c.id],
whereclause=((objects_tbl.c.name == name)
& (objects_tbl.c.class_name == class_name)))
res = conn.execute(q)
row = res.fetchone()
res.close()
if not row:
raise _IdNotFoundError
return row.id

def insert():
res = conn.execute(objects_tbl.insert(),
name=name,
class_name=class_name)
return res.inserted_primary_key[0]

# we want to try selecting, then inserting, but if the insert fails
# then try selecting again. We include an invocation of a hook
# method to allow tests to exercise this particular behavior
try:
return ObjDict(id=select())
except _IdNotFoundError:
pass

self._test_timing_hook(conn)

try:
return ObjDict(id=insert())
except (sqlalchemy.exc.IntegrityError,
sqlalchemy.exc.ProgrammingError):
pass
return self.thdGetObjectId(conn, name, class_name)
return self.db.pool.do(thd)

def thdGetObjectId(self, conn, name, class_name):
objects_tbl = self.db.model.objects

self.checkLength(objects_tbl.c.name, name)
self.checkLength(objects_tbl.c.class_name, class_name)

def select():
q = sa.select([objects_tbl.c.id],
whereclause=((objects_tbl.c.name == name)
& (objects_tbl.c.class_name == class_name)))
res = conn.execute(q)
row = res.fetchone()
res.close()
if not row:
raise _IdNotFoundError
return row.id

def insert():
res = conn.execute(objects_tbl.insert(),
name=name,
class_name=class_name)
return res.inserted_primary_key[0]

# we want to try selecting, then inserting, but if the insert fails
# then try selecting again. We include an invocation of a hook
# method to allow tests to exercise this particular behavior
try:
return ObjDict(id=select())
except _IdNotFoundError:
pass

return self.db.pool.do(thd)
self._test_timing_hook(conn)

try:
return ObjDict(id=insert())
except (sqlalchemy.exc.IntegrityError,
sqlalchemy.exc.ProgrammingError):
pass

return ObjDict(id=select())

class Thunk:
pass

def getState(self, objectid, name, default=Thunk):
def thd(conn):
object_state_tbl = self.db.model.object_state

q = sa.select([object_state_tbl.c.value_json],
whereclause=((object_state_tbl.c.objectid == objectid)
& (object_state_tbl.c.name == name)))
res = conn.execute(q)
row = res.fetchone()
res.close()

if not row:
if default is self.Thunk:
raise KeyError("no such state value '%s' for object %d" %
(name, objectid))
return default
try:
return json.loads(row.value_json)
except ValueError:
raise TypeError("JSON error loading state value '%s' for %d" %
(name, objectid))
return self.thdGetState(conn, objectid, name, default=default)
return self.db.pool.do(thd)

def thdGetState(self, conn, objectid, name, default=Thunk):
object_state_tbl = self.db.model.object_state

q = sa.select([object_state_tbl.c.value_json],
whereclause=((object_state_tbl.c.objectid == objectid)
& (object_state_tbl.c.name == name)))
res = conn.execute(q)
row = res.fetchone()
res.close()

if not row:
if default is self.Thunk:
raise KeyError("no such state value '%s' for object %d" %
(name, objectid))
return default
try:
return json.loads(row.value_json)
except ValueError:
raise TypeError("JSON error loading state value '%s' for %d" %
(name, objectid))

def setState(self, objectid, name, value):
def thd(conn):
object_state_tbl = self.db.model.object_state
return self.thdSetState(conn, objectid, name, value)
return self.db.pool.do(thd)

try:
value_json = json.dumps(value)
except (TypeError, ValueError):
raise TypeError("Error encoding JSON for %r" % (value,))
def thdSetState(self, conn, objectid, name, value):
object_state_tbl = self.db.model.object_state

self.checkLength(object_state_tbl.c.name, name)
try:
value_json = json.dumps(value)
except (TypeError, ValueError):
raise TypeError("Error encoding JSON for %r" % (value,))

def update():
q = object_state_tbl.update(
whereclause=((object_state_tbl.c.objectid == objectid)
& (object_state_tbl.c.name == name)))
res = conn.execute(q, value_json=value_json)
self.checkLength(object_state_tbl.c.name, name)

# check whether that worked
return res.rowcount > 0
def update():
q = object_state_tbl.update(
whereclause=((object_state_tbl.c.objectid == objectid)
& (object_state_tbl.c.name == name)))
res = conn.execute(q, value_json=value_json)

def insert():
conn.execute(object_state_tbl.insert(),
objectid=objectid,
name=name,
value_json=value_json)
# check whether that worked
return res.rowcount > 0

# try updating; if that fails, try inserting; if that fails, then
# we raced with another instance to insert, so let that instance
# win.
def insert():
conn.execute(object_state_tbl.insert(),
objectid=objectid,
name=name,
value_json=value_json)

if update():
return
# try updating; if that fails, try inserting; if that fails, then
# we raced with another instance to insert, so let that instance
# win.

self._test_timing_hook(conn)
if update():
return

try:
insert()
except (sqlalchemy.exc.IntegrityError, sqlalchemy.exc.ProgrammingError):
pass # someone beat us to it - oh well
self._test_timing_hook(conn)

return self.db.pool.do(thd)
try:
insert()
except (sqlalchemy.exc.IntegrityError, sqlalchemy.exc.ProgrammingError):
pass # someone beat us to it - oh well

def _test_timing_hook(self, conn):
# called so tests can simulate another process inserting a database row
Expand Down

0 comments on commit a3c21f3

Please sign in to comment.