Python bindings for the AppOptics API
Switch branches/tags
Nothing to show
Clone or download
cheempz Merge pull request #5 from drbachler/alerts-missing-keys
Add some missing? keys to alerts
Latest commit 689db89 Oct 31, 2018
Type Name Latest commit message Commit time
Failed to load latest commit information.
sh initial Mar 29, 2018
tests improvements after self-review Jun 27, 2018
.gitignore Initial commit Mar 20, 2018 update README/CHANGELOG to reflect the changes to this library Jun 28, 2018
LICENSE Removed username from connection; updated github repo and LICENSE May 2, 2018
Makefile updating makefile Apr 4, 2018
README Removed username from connection; updated github repo and LICENSE May 2, 2018 Merge pull request #3 from appoptics/ao-8631 Jun 28, 2018 initial Mar 29, 2018 change version to 5.0.0 Jul 3, 2018
tox.ini renaming to appoptics_metrics Apr 4, 2018


A Python wrapper for the AppOptics Metrics API.

Documentation Notes

  • New accounts
    • Refer to master for the latest documentation.


In your shell:

$ easy_install appoptics-metrics


$ pip install appoptics-metrics

From your application or script:

import appoptics_metrics


Assuming you have an AppOptics account, go to your organization settings page and get your API token (long hexadecimal string).

  api = appoptics_metrics.connect('token')

Metric name sanitization

When creating your connection you may choose to provide a sanitization function. This will be applied to any metric name you pass in. For example we provide a sanitization function that will ensure your metrics are legal AppOptics names. This can be set as such:

  api = appoptics_metrics.connect('token', sanitizer=appoptics_metrics.sanitize_metric_name)

By default no sanitization is done.

Basic Usage

To iterate over your metrics:

  for m in api.list_metrics():

or use list_all_metrics() to iterate over all your metrics with transparent pagination.

Let's now create a measurement for a specific metric. The metric will be created if it does not exist:

  api.submit("temperature", 80, tags={"city": "sf"})

There is also an alias which does the same thing (but in a more explicit name):

  api.submit_measurement("temperature", 80, tags={"city": "sf"})

View your metric names:

  for m in api.list_metrics():

To retrieve a metric or measurement:

  # Retrieve metric metadata ONLY
  gauge = api.get("temperature")  # or use the alias: api.get_metric(...) # "temperature"

  # Retrieve measurements from last 15 minutes
  resp = api.get_measurements("temperature", duration=900, resolution=1)
  # {u'name': u'temperature',
  # u'links': [],
  # u'series': [{u'measurements': [
  #   {u'value': 80.0, u'time': 1502917147}
  # ],
  # u'tags': {u'city': u'sf'}}],
  # u'attributes': {u'created_by_ua': u'appoptics_metrics/2.0.0...'
  # , u'aggregate': False}, u'resolution': 1}

To retrieve a composite metric:

  # Get average temperature across all cities for last 8 hours
  compose = 'mean(s("temperature", "*", {function: "mean", period: "3600"}))'
  import time
  start_time = int(time.time()) - 8 * 3600

  # For tag-based (new) accounts.
  # Will be deprecated in favor of `get_composite` in a future tags-only release
  resp = api.get_composite_tagged(compose, start_time=start_time)
  # [
  #   {
  #     u'query': {u'metric': u'temperature', u'tags': {}},
  #     u'metric': {u'attributes': {u'created_by_ua': u'statsd-librato-backend/0.1.7'},
  #     u'type': u'gauge',
  #     u'name': u'temperature'},
  #     u'measurements': [{u'value': 42.0, u'time': 1504719992}],
  #     u'tags': {u'one': u'1'}}],
  #     u'compose': u's("foo", "*")',
  #     u'resolution': 1
  #   }
  # ]

  # For backward compatibility in legacy Librato (source-based)
  resp = api.get_composite(compose, start_time=start_time)

To create a saved composite metric:

  api.create_composite('composite.humidity', 'sum(s("humidity", "*"))',
      description='a test composite')

Delete a metric:


Aliases are provided for methods manipulating metrics and measurements:

    create(...)     -> create_metric(...)
    submit(...)     -> submit_measurement(...)
    update(...)     -> update_metric(...)
    delete(...)     -> delete_metric(...)
    get(...)        -> get_metric(...)
    get_tagged(...) -> get_measurements(...)

Sending measurements in batch mode

Sending a measurement in a single HTTP request is inefficient. The overhead both at protocol and backend level is very high. That's why we provide an alternative method to submit your measurements. The idea is to send measurements in batch mode. We push measurements that are stored and when we are ready, they will be submitted in an efficient manner. Here is an example:

api = appoptics_metrics.connect('token')
q = api.new_queue()
q.add('temperature', 22.1, tags={'location': 'downstairs'})
q.add('temperature', 23.1, tags={'location': 'upstairs'})

Queues can also be used as context managers. Once the context block is complete the queue is submitted automatically. This is true even if an exception interrupts flow. In the example below if potentially_dangerous_operation causes an exception the queue will submit the first measurement as it was the only one successfully added. If the operation succeeds both measurements will be submitted.

with api.new_queue() as q:
    q.add('temperature', 22.1, tags={'location': 'downstairs'})
    q.add('num_requests', 100, tags={'host': 'server1')

Queues by default will collect metrics until they are told to submit. You may create a queue that autosubmits based on metric volume.

# Submit when the 400th metric is queued
q = api.new_queue(auto_submit_count=400)

Tag Inheritance

Tags can be inherited from the queue or connection object if inherit_tags=True is passed as an attribute. If inherit_tags is not passed, but tags are added to the measurement, the measurement tags will be the only tags added to that measurement.

When there are tag collisions, the measurement, then the batch, then the connection is the order of priority.

api = appoptics_metrics.connect('token', tags={'company': 'librato', 'service': 'app'})

# tags will be {'city': 'sf'}
api.submit('temperature', 80, tags={'city': 'sf'})

# tags will be {'city': 'sf', 'company': 'librato', 'service': 'app'}
api.submit('temperature', 80, tags={'city': 'sf'}, inherit_tags=True)

q = api.new_queue(tags={'service':'api'})

# tags will be {'location': 'downstairs'}
q.add('temperature', 22.1, tags={'location': 'downstairs'})

# tags will be {'company': 'librato', 'service':'api'}
q.add('temperature', 23.1)

# tags will be {'location': 'downstairs', 'company': 'librato', 'service': 'api'}
q.add('temperature', 22.1, tags={'location': 'downstairs'}, inherit_tags=True)

Updating Metric Attributes

You can update the information for a metric by using the update method, for example:

for metric in api.list_metrics(name="abc*"):
  attrs = metric.attributes
  attrs['display_units_short'] = 'ms'
  api.update(, attributes=attrs)


List Annotation all annotation streams:

for stream in api.list_annotation_streams():
print("%s: %s" % (, stream.display_name))

View the metadata on a named annotation stream:

stream = api.get_annotation_stream("api.pushes")
print stream

Retrieve all of the events inside a named annotation stream, by adding a start_time parameter to the get_annotation_stream() call:

for source in
	print source[source]
	for event in events:
		print event['id']
		print event['title']
		print event['description']

Submit a new annotation to a named annotation stream (creates the stream if it doesn't exist). Title is a required parameter, and all other parameters are optional


api.post_annotation("TravisCI",title="build %s"%travisBuildID,
                     description="Application %s, Travis build %s"%(appName,travisBuildID),
                     links=[{'rel': 'travis', 'href': ''}])

Delete a named annotation stream:


Spaces API

List Spaces

# List spaces
spaces = api.list_spaces()

Create a Space

# Create a new Space directly via API
space = api.create_space("space_name")
print("Created '%s'" %

# Create a new Space via the model, passing the connection
space = Space(api, 'Production')

Find a Space

space = api.find_space('Production')

Delete a Space

space = api.create_space('Test')
# or

Create a Chart

# Create a line chart with various metric streams including their tags(s) and group/summary functions:
space = api.get_space(123)
linechart = api.create_chart(
  'cities MD line chart',
      "metric": "appoptics.cpu.percent.idle",
      "tags": [{"name": "environment", "values": ["*"]]
      "metric": "appoptics.cpu.percent.user",
      "tags": [{"name": "environment", 'dynamic': True}]

Find a Chart

# Takes either space_id or a space object
chart = api.get_chart(chart_id, space_id)
chart = api.get_chart(chart_id, space)

Update a Chart

space = api.get_space(123)
charts = space.chart_ids
chart = api.get_chart(charts[0], = 'Your chart name'

Rename a Chart

chart = api.get_chart(chart_id, space_id)
# save() gets called automatically here
chart.rename('new chart name')

Delete a Chart

chart = api.get_chart(chart_id, space_id)


List all alerts:

for alert in api.list_alerts():

Create an alert with an above condition:

alert = api.create_alert('my.alert')
alert.add_condition_for('metric_name').above(1) # trigger immediately
alert.add_condition_for('metric_name').above(1).duration(60) # trigger after a set duration
alert.add_condition_for('metric_name').above(1, 'sum') # custom summary function

Create an alert with a below condition:

alert = api.create_alert('my.alert', description='An alert description')
alert.add_condition_for('metric_name').below(1) # the same syntax as above conditions

Create an alert with an absent condition:

alert = api.create_alert('my.alert')
alert.add_condition_for('metric_name').stops_reporting_for(5) # duration in minutes of the threshold to trigger the alert

View all outbound services for the current user

for service in api.list_services():
    print(service._id, service.title, service.settings)

Create an alert with Service IDs

alert = api.create_alert('my.alert', services=[1234, 5678])

Create an alert with Service objects

s = api.list_services()
alert = api.create_alert('my.alert', services=[s[0], s[1]])

Add an outbound service to an alert:

alert = api.create_alert('my.alert')

Put it all together:

cond = {'metric_name': 'cpu', 'type': 'above', 'threshold': 42}
s = api.list_services()
api.create_alert('my.alert', conditions=[cond], services=[s[0], s[1]])
# We have an issue at the API where conditions and services are not returned
# when creating. So, retrieve back from API
alert = api.get_alert('my.alert')



Timeouts are provided by the underlying http client. By default we timeout at 10 seconds. You can change that by using api.set_timeout(timeout).

Thread Safety

The appoptics-metrics module currently does not do internal locking for thread safety. When used in multi-threaded applications, please add your own thread synchronization for sensitive operations.


Want to contribute? Need a new feature? Please open an issue.


The original version of python-librato was conceived/authored/released by Chris Moyer (AKA @kopertop). He's graciously handed over maintainership of the project to us and we're super-appreciative of his efforts.

Build / publish

python3 bdist_wheel
twine upload dist/*


Copyright (c) 2018 Solarwinds, Inc. See LICENSE for details.