Skip to content

Commit

Permalink
Merge pull request #141 from kuba--/master
Browse files Browse the repository at this point in the history
Support for InfluxDB v0.9
  • Loading branch information
armon committed Jul 6, 2015
2 parents 8df91df + c87d850 commit 1be9a7f
Showing 1 changed file with 90 additions and 28 deletions.
118 changes: 90 additions & 28 deletions sinks/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,29 @@
# stream_command = python sinks/influxdb.py influxdb.ini INFO
#
# The InfluxDB sink takes an INI format configuration file as a first
# argument and log level as a second argument.
# argument and log level as a second argument.
# The following is an example configuration:
#
# Configuration example:
# ---------------------
#
# [influxdb]
# host = 127.0.0.1
# port = 8086
# database = dbname
# username = root
# password = root
# host = 127.0.0.1
# port = 8086
# database = dbname
# username = root
# password = root
#
#
# version = 0.9
# prefix = statsite
# timeout = 10
#
# Options:
# --------
# - version: InfluxDB version (by default 0.9)
# - prefix: A prefix to add to the keys
# - timeout: The timeout blocking operations (like connection attempts)
# - timeout: The timeout blocking operations (like connection attempts)
# will timeout after that many seconds (if it is not given, the global default timeout setting is used)
###

Expand All @@ -57,11 +60,11 @@ def __init__(self, cfg="/etc/statsite/influxdb.ini", lvl="INFO"):

self.load(cfg)


def load(self, cfg):
"""
Loads configuration from an INI format file.
"""
"""
import ConfigParser
ini = ConfigParser.RawConfigParser()
ini.read(cfg)
Expand Down Expand Up @@ -98,12 +101,63 @@ def load(self, cfg):
self.prefix = None
if ini.has_option(sect, 'prefix'):
self.prefix = ini.get(sect, 'prefix')
self.timeout = None

self.timeout = None
if ini.has_option(sect, 'timeout'):
self.timeout = ini.get(sect, 'timeout')


self.version = '0.9'
if ini.has_option(sect, 'version'):
self.version = ini.get(sect, 'version')


def flush09(self, metrics):
"""
Flushes the metrics provided to InfluxDB v0.9+.
Parameters:
- `metrics` : A list of (key,value,timestamp) tuples.
"""
if not metrics:
return

if self.timeout:
conn = httplib.HTTPConnection(self.host, int(self.port), timeout = int(self.timeout))
else:
conn = httplib.HTTPConnection(self.host, int(self.port))

params = urllib.urlencode({'db': self.database, 'u': self.username, 'p': self.password, 'precision':'s'})
headers = {
'Content-Type': 'application/stream',
'User-Agent': 'statsite',
}

# Construct the output
metrics = [m.split("|") for m in metrics if m]
self.logger.info("Outputting %d metrics" % len(metrics))

# Serialize and send via HTTP API
# InfluxDB uses following regexp "^[a-zA-Z][a-zA-Z0-9._-]*$" to validate table/series names,
# so disallowed characters replace by '.'
body = ''
for k, v, ts in metrics:
if self.prefix:
body += "%s" % re.sub(r'[^a-zA-Z0-9._-]+','.', "%s.%s" % (self.prefix, k))
else:
body += "%s" % re.sub(r'[^a-zA-Z0-9._-]+','.', k)

body += " value=" + v + " " + ts + "\n"

self.logger.debug(body)
conn.request("POST", "/write?%s" % params, body, headers)
try:
res = conn.getresponse()
self.logger.info("%s, %s" %(res.status, res.reason))
except:
self.logger.exception('Failed to send metrics to InfluxDB:', res.status, res.reason)

conn.close()

def flush(self, metrics):
"""
Flushes the metrics provided to InfluxDB.
Expand All @@ -128,47 +182,55 @@ def flush(self, metrics):
# Construct the output
metrics = [m.split("|") for m in metrics if m]
self.logger.info("Outputting %d metrics" % len(metrics))

# Serialize to JSON and send via HTTP API
# InfluxDB uses following regexp "^[a-zA-Z][a-zA-Z0-9._-]*$" to validate table/series names,
# so disallowed characters replace by '.'
if self.prefix:
body = json.dumps([{
"name":"%s" % re.sub(r'[^a-zA-Z0-9._-]+','.', "%s.%s" % (self.prefix, k)),
"columns":["value", "time"],
if self.prefix:
body = json.dumps([{
"name":"%s" % re.sub(r'[^a-zA-Z0-9._-]+','.', "%s.%s" % (self.prefix, k)),
"columns":["value", "time"],
"points":[[float(v), int(ts)]]
} for k, v, ts in metrics])
else:
body = json.dumps([{
"name":"%s" % re.sub(r'[^a-zA-Z0-9._-]+','.', k),
"columns":["value", "time"],
"name":"%s" % re.sub(r'[^a-zA-Z0-9._-]+','.', k),
"columns":["value", "time"],
"points":[[float(v), int(ts)]]
} for k, v, ts in metrics])

self.logger.debug(body)
conn.request("POST", "/db/%s/series?%s" % (self.database, params), body, headers)
try:
conn.request("POST", "/db/%s/series?%s" % (self.database, params), body, headers)
try:
res = conn.getresponse()
self.logger.info("%s, %s" %(res.status, res.reason))
except:
except:
self.logger.exception('Failed to send metrics to InfluxDB:', res.status, res.reason)

conn.close()



def main(metrics, *argv):
def version(v):
parts = [int(x) for x in v.split(".")]
while parts[-1] == 0:
parts.pop()
return parts


def main(metrics, *argv):
# Initialize the logger
logging.basicConfig()
logging.basicConfig()

# Intialize from our arguments
influxdb = InfluxDBStore(*argv[0:])

# Flush
influxdb.flush(metrics.splitlines())
# Flush format depends on InfluxDB version
if cmp(version('0.9'), version(influxdb.version)) < 0:
influxdb.flush(metrics.splitlines())
else:
influxdb.flush09(metrics.splitlines())


if __name__ == "__main__":
# Get all the inputs
main(sys.stdin.read(), *sys.argv[1:])

0 comments on commit 1be9a7f

Please sign in to comment.