Skip to content

Commit

Permalink
DataSourceConfig and DataPointConfig classes moved to SQLClient
Browse files Browse the repository at this point in the history
  • Loading branch information
epuzanov committed Mar 30, 2012
1 parent dcdccae commit a5bcd4e
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 255 deletions.
283 changes: 145 additions & 138 deletions ZenPacks/community/SQLDataSource/SQLClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
Gets performance data over python DB-API.
$Id: SQLClient.py,v 3.1 2012/03/28 23:05:12 egor Exp $"""
$Id: SQLClient.py,v 3.2 2012/03/20 19:51:03 egor Exp $"""

__version__ = "$Revision: 3.1 $"[11:-2]
__version__ = "$Revision: 3.2 $"[11:-2]

import logging
log = logging.getLogger("zen.SQLClient")
Expand All @@ -26,6 +26,7 @@
from twisted.python.failure import Failure
from twisted.enterprise import adbapi
from twisted.internet import reactor
from twisted.spread import pb

from threading import Timer
import sys
Expand Down Expand Up @@ -61,7 +62,11 @@ def parseConnectionString(cs='', options={}):
kwargs.update(options)
return args, kwargs

def runQuery(txn, sql, columns, dbapi, timeout, _timeout):
def runQuery(txn, sql, columns, dbapi, timeout):
def _timeout(txn):
if hasattr(txn._connection, 'cancel'):
txn._connection.cancel()
txn._cursor.close()
def _convert(val, type):
if val is None:
if type == dbapi.STRING:
Expand Down Expand Up @@ -90,7 +95,7 @@ def _convert(val, type):
t.cancel()
header, ct = zip(*[(h[0].lower(), h[1]) for h in txn.description or []])
if not header: return res
if columns.intersection(set(header)):
if set(columns).intersection(set(header)):
varVal = False
else:
res.append({})
Expand All @@ -117,6 +122,7 @@ def __init__(self, maxParrallel=1):
self._taskQueue = []
self._connection = None
self._cs = None
self._currentTask = None

def setMax(self, max):
self._max = max
Expand All @@ -133,7 +139,7 @@ def running(self):
def queued(self):
return len(self._taskQueue)

def submit(self, cs, name, sql, columns=[], keybindings={}, timeout=20):
def submit(self, task):
"""
submit a query to be executed. A deferred will be returned with the
the result of the query.
Expand All @@ -152,102 +158,141 @@ def submit(self, cs, name, sql, columns=[], keybindings={}, timeout=20):
@type timeout: integer
"""
deferred = Deferred()
deferred.addBoth(self._taskFinished)
for task in self._taskQueue:
if task == name: break
else:
task = adbapiTask(cs, name, sql, timeout)
self._taskQueue.append(task)
task.addTask(deferred, columns, keybindings)
deferred.addBoth(self._taskFinished, task)
task.result = deferred
self._taskQueue.append(task)
reactor.callLater(2, self._runTask)
return deferred

def _runTask(self):
if self._taskQueue and self._running < self._max:
if not self._connection:
task = self._taskQueue.pop(0)
args, kwargs = parseConnectionString(task._cs)
self._running += 1
task = self._taskQueue[0]
if task.connectionString != self._cs or not self._connection:
if self._connection:
self._connection.close()
self._connection = None
args, kwargs = parseConnectionString(task.connectionString)
connection = adbapi.ConnectionPool(*args, **kwargs)
self._cs = str(task._cs)
self._cs = task.connectionString
self._connection = connection
else:
for taskId, task in enumerate(self._taskQueue):
if task._cs == self._cs: break
else:
taskId = 0
self._taskQueue[taskId]
args, kwargs = parseConnectionString(task._cs)
connection = adbapi.ConnectionPool(*args, **kwargs)
self._cs = str(task._cs)
self._connection = connection
task = self._taskQueue.pop(taskId)
self._running += len(task._deferreds)
task(self._connection)
self._currentTask = hash((task.sqlp, str(task.columns)))
deferred = self._connection.runInteraction( runQuery, task.sqlp,
task.columns, self._connection.dbapi, task.timeout)
deferred.addBoth(self._finished)
reactor.callLater(0, self._runTask)

def _taskFinished(self, result):
def _finished(self, results):
nextTask = 0
if isinstance(results, Failure):
results.cleanFailure()
for i in reversed(range(len(self._taskQueue))):
if self._cs != self._taskQueue[i].connectionString: continue
if self._currentTask != hash((self._taskQueue[i].sqlp,
str(self._taskQueue[i].columns))):
nextTask = i
continue
if nextTask > 0: nextTask -= 1
task = self._taskQueue.pop(i)
if isinstance(results, Failure):
task.result.errback(results.getErrorMessage())
continue
if task.keybindings:
kc, kv = zip(*[map(lambda v: str(v).strip().lower(), k) \
for k in task.keybindings.iteritems()])
kv = ''.join(kv)
else: kc, kv = (), ''
result = []
for row in results:
if kv==''.join([(row.get(k) or '').strip() for k in kc]).lower():
result.append(row)
task.result.callback(result)
if self._taskQueue:
if nextTask > 0:
self._taskQueue.insert(0, self._taskQueue.pop(nextTask))

def _taskFinished(self, result, task):
if self._running > 0:
self._running -= 1
if not self._taskQueue and self._running < 1:
self._connection.close()
self._connection = None
self._cs = None
self._currentTask = None
reactor.callLater(0, self._runTask)
return result
task.result = result
return task

class DataPointConfig(pb.Copyable, pb.RemoteCopy):
id = ''
component = ''
alias = ''
expr = ''
rrdPath = ''
rrdType = None
rrdCreateCommand = ''
rrdMin = None
rrdMax = None

def __init__(self, id='', alias=''):
self.id = id
self.alias = alias

def __repr__(self):
return ':'.join((self.id, self.alias))

pb.setUnjellyableForClass(DataPointConfig, DataPointConfig)

class adbapiTask(object):

class DataSourceConfig(pb.Copyable, pb.RemoteCopy):
"""
Used by adbapiExecutor to execute queued query
Holds the config of every query to be run
"""
def __init__(self, cs, name, sql, timeout):
self._cs = cs
self._name = name
self._sql = sql
self._columns = set()
self._timeout = timeout
self._deferreds = []

def __cmp__(self, other):
return cmp(self._name, other)
device = ''
sql = ''
sqlp = ''
connectionString = ''
keybindings = None
ds = ''
cycleTime = None
eventClass = None
eventKey = None
severity = 3
lastStart = 0
lastStop = 0
timeout = 20
result = None

def __init__(self, sqlp='', kbs={}, cs='', columns={}, sql=''):
self.sqlp = sqlp
self.keybindings = kbs
self.connectionString = cs
self.points=[DataPointConfig(v,k.lower()) for k,v in columns.iteritems()]
if not sql:
self.sql = sqlp

def __repr__(self):
return self._name

def __call__(self, pool):
def _timeout(txn):
if hasattr(txn._connection, 'cancel'):
txn._connection.cancel()
txn._cursor.close()
if len(self._deferreds) > 1:
self._sql = self._name
deferred = pool.runInteraction( runQuery, self._sql, self._columns,
pool.dbapi, self._timeout, _timeout)
deferred.addCallbacks(self._finished, self._error)
return self.sqlp

def _finished(self, results):
for deferred, kc, kv in self._deferreds:
if not kc:
deferred.callback(results)
continue
result = []
for row in results:
if kv==''.join([(row.get(k) or '').strip() for k in kc]).lower():
result.append(row)
deferred.callback(result)
@property
def columns(self):
return [dp.alias for dp in self.points]

def _error(self, results):
results.cleanFailure()
for deferred, kc, kv in self._deferreds:
deferred.errback(results.getErrorMessage())
def getEventKey(self, point):
# fetch datapoint name from filename path and add it to the event key
return self.eventKey + '|' + point.rrdPath.split('/')[-1]

def addTask(self, deferred, columns=[], keybindings={}):
if columns:
self._columns.update(map(lambda v: v.lower(), columns))
if keybindings:
kc, kv = zip(*[map(lambda v: str(v).strip().lower(), k) \
for k in keybindings.iteritems()])
else: kc, kv = (), ()
self._deferreds.append((deferred, kc, ''.join(kv)))
def queryKey(self):
"Provide a value that establishes the uniqueness of this query"
return '%'.join(map(str,[self.cycleTime, self.severity,
self.connectionString, self.sql]))
def __str__(self):
return ' '.join(map(str, [
self.ds,
self.cycleTime,
]))

pb.setUnjellyableForClass(DataSourceConfig, DataSourceConfig)


class SQLClient(BaseClient):
Expand Down Expand Up @@ -299,17 +344,11 @@ def _finished(r):
return results
deferreds = []
for table, task in queries.iteritems():
if len(task) == 4:
sqlp, kbs, cs, columns = task
sql = sqlp
else:
sqlp, kbs, cs, columns, sql = task
dbapiName = cs.split(',', 1)[0].strip('\'"')
dsc = DataSourceConfig(*task)
dbapiName = dsc.connectionString.split(',', 1)[0].strip('\'"')
executor = self._pools.setdefault(dbapiName, adbapiExecutor())
deferred = executor.submit(cs, sqlp, sql, columns.values(), kbs)
deferred.addCallback(self.parseResult, plugin, sql, columns)
deferred.addErrback(self.parseError, plugin, sql, cs)
deferred.addBoth(self.addResult, table, dbapiName)
deferred = executor.submit(dsc)
deferred.addBoth(self.parseResult, plugin, table, dbapiName)
deferreds.append(deferred)
dl = DeferredList(deferreds)
dl.addCallback(_finished)
Expand All @@ -328,17 +367,11 @@ def run(self):
log.debug("Running collection for plugin %s", plugin.name())
tasks = []
for table, task in plugin.prepareQueries().iteritems():
if len(task) == 4:
sqlp, kbs, cs, columns = task
sql = sqlp
else:
sqlp, kbs, cs, columns, sql = task
dbapiName = cs.split(',', 1)[0].strip('\'"')
dsc = DataSourceConfig(*task)
dbapiName = dsc.connectionString.split(',', 1)[0].strip('\'"')
executor = self._pools.setdefault(dbapiName, adbapiExecutor())
deferred = executor.submit(cs, sqlp, sql, columns.values(), kbs)
deferred.addCallback(self.parseResult, plugin.name(), sql, columns)
deferred.addErrback(self.parseError, plugin.name(), sql, cs)
deferred.addBoth(self.addResult, table, dbapiName)
deferred = executor.submit(dsc)
deferred.addBoth(self.parseResult,plugin.name(),table,dbapiName)
tasks.append(deferred)
tdl = DeferredList(tasks)
deferreds.append(tdl)
Expand All @@ -347,47 +380,7 @@ def run(self):
dl.addBoth(self.collectComplete, None)


def parseResult(self, r, pName, sql, columns):
"""
Twisted deferred callback used to store the
results of the collection run
@param r: result from the collection run
@type r: result
@param pName: Name of performance data collector plugin
@type pName: plugin string
@param sql: SQL query
@type sql: string
@param columns: columns-aliases dictionary
@type columns: dict
"""
log.debug('Results for %s query "%s": %s', pName, sql, str(r))
if columns:
columns = dict(zip(columns.values(), columns.keys()))
r = [dict(zip(columns.values(), tuple([row.get(cn.lower(), '') \
for cn in columns.keys()]))) for row in r]
return r


def parseError(self, r, pName, sql, cs):
"""
Twisted deferred error callback display errors
@param r: result from the collection run
@type r: result
@param pName: Name of performance data collector plugin
@type pName: plugin string
@param sql: SQL query
@type sql: string
@param cs: connection string
@type cs: string
"""
log.debug('ConnectionString: %s', cs)
log.warn('Error in %s query "%s": %s',pName,sql,r.getErrorMessage())
return r


def addResult(self, r, table, dbapiName):
def parseResult(self, datasource, pName, table, dbapiName):
"""
Twisted deferred callback used to store the
results of the collection run
Expand All @@ -402,6 +395,17 @@ def addResult(self, r, table, dbapiName):
if dbapiName in self._pools and self._pools[dbapiName]._running < 1:
self._pools[dbapiName] = None
del self._pools[dbapiName]
if isinstance(datasource.result, Failure):
log.warn('Error in %s query "%s": %s', pName, datasource.sql,
datasource.result.getErrorMessage())
return (table, datasource.result)
log.debug('Results for %s query "%s": %s', pName, datasource.sql,
str(datasource.result))
if datasource.points:
r = [dict([(p.id,row.get(p.alias,'')) for p in datasource.points]) \
for row in datasource.result]
else:
r = datasource.result
return (table, r)


Expand Down Expand Up @@ -551,6 +555,9 @@ def sqlCollect(collector, device, ip, timeout):
results = cl.query({'t':(query, {}, cs, columns)})
except Exception, e:
results = {'t':Failure(e)}
if isinstance(results, Failure):
print results.getErrorMessage()
sys.exit(1)
results = results.get('t', Failure('ERROR:zen.SQLClient:No data received.'))
if isinstance(results, Failure):
print results.getErrorMessage()
Expand Down
Loading

0 comments on commit a5bcd4e

Please sign in to comment.