
Export the S3 analysis as prometheus metrics

hmalphettes committed Nov 27, 2017
1 parent 12d27df commit 37530227157570fb69fa20dabd791fc09a426335
Showing with 133 additions and 15 deletions.
  1. +1 −0 .gitignore
  2. +7 −7 README.rst
  3. 0 integration/prometheus.yml
  4. +2 −1 requirements.txt
  5. +78 −3 s3_storage_analyser.py
  6. +18 −1 server.py
  7. +27 −3 test_s3_storage_analyzer.py
.gitignore
@@ -5,3 +5,4 @@ __pycache__
*.pyc
.env
.coverage*
*.prom
README.rst
@@ -8,10 +8,7 @@
S3 Storage Analyser
===================
A command line tool to display the objects stored in your AWS S3 account.
WIP: Prometheus client
======================
Expose the numbers as metrics for Prometheus
Exposes the metrics extracted for Prometheus under the `/metrics` endpoint.
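If the endpoint is up, a quick way to eyeball the exported metrics is to fetch `/metrics` directly. The snippet below is only an illustrative sketch: the host and port are placeholders, so use whatever address and S3ANALYSER_PORT the server actually listens on (per server.py, the `/metrics` path is served before the token check, so no TOKEN is needed):

    import http.client

    # Placeholder host/port for a locally running server.
    conn = http.client.HTTPConnection('localhost', 8080)
    conn.request('GET', '/metrics')
    resp = conn.getresponse()
    print(resp.status)
    print(resp.read().decode())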
Strategy: Use Cloudwatch metrics
================================
@@ -27,6 +24,8 @@ Strategy: Use Cloudwatch metrics
As a starting point this implementation uses the Cloudwatch metrics.
Prior art: https://www.opsdash.com/blog/aws-s3-cloudwatch-monitoring.html
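For context, the sketch below shows the kind of CloudWatch call this strategy boils down to. It is a hedged, standalone example (the bucket name, storage type and time window are placeholders), not the exact code path used by s3_storage_analyser.py:

    from datetime import datetime, timedelta
    import boto3

    # Query the daily S3 storage metric for one bucket and one storage class.
    cloudwatch = boto3.client('cloudwatch')
    resp = cloudwatch.get_metric_statistics(
        Namespace='AWS/S3',
        MetricName='BucketSizeBytes',
        Dimensions=[{'Name': 'BucketName', 'Value': 'hm.samples'},
                    {'Name': 'StorageType', 'Value': 'StandardStorage'}],
        StartTime=datetime.utcnow() - timedelta(days=2),
        EndTime=datetime.utcnow(),
        Period=86400,
        Statistics=['Average'])
    print(resp['Datapoints'])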
Development
-----------
Requirement: python3
@@ -120,6 +119,10 @@ Via docker:
docker run -e TOKEN=secret --name s3analyser_endpoint --net host -d hmalphettes/s3-storage-analyser server
Usage - Prometheus
------------------
Continuous Integration - Continuous Delivery
--------------------------------------------
The CI is graciously operated by Travis: https://travis-ci.org/hmalphettes/s3-storage-analyser
@@ -138,12 +141,9 @@ The run logs are sent as a notification to a slack channel:
The setup of such an infrastructure is currently not automated. Some documentation here: https://github.com/hmalphettes/s3-storage-analyser/tree/master/integration
TODO: commit the output into a GitHub repository to track the state of the build as well as the evolution of the buckets' content.
Next steps
----------
- Enrich the statistics displayed
- Prometheus exporter
License
-------
requirements.txt
@@ -1,3 +1,4 @@
boto3>=1.4.7
tabulate>=0.8.1
pytz
pytz
prometheus_client
s3_storage_analyser.py
@@ -3,6 +3,7 @@
"""
import argparse
import os
import re
import json
import multiprocessing as multi
@@ -12,14 +13,15 @@
import pytz
import boto3
import tabulate
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway, write_to_textfile
def parse_args(args=None):
"""cli parser"""
parser = argparse.ArgumentParser(description='Analyse the S3 Buckets of an Amazon AWS account.')
parser.add_argument('--unit', # type='string',
choices=['B', 'KB', 'MB', 'GB', 'TB'],
help='file size unit B|KB|MB|GB|TB', default='MB')
parser.add_argument('--prefix', help='Only select buckets that match a glob pattern. "s3://mybucke*"')
parser.add_argument('--prefix', help='Only select buckets that match a glob. "s3://mybucke*"')
parser.add_argument('--conc', type=int, help='Number of parallel workers')
parser.add_argument(
'--fmt', # type='string',
@@ -50,7 +52,37 @@ def _conc_map(fct, iterable):
__POOL[0] = pool
return pool.map(fct, iterable)
"""
Prometheus Gauges:
cloudwatch_s3_size_bytes
  *region (cardinality: 16)
  *storage (cardinality: 3)
  *bucket (cardinality: < 1000)
cloudwatch_s3_objects_total
  *region (cardinality: 16)
  *bucket (cardinality: < 1000 ?)
Hence the number of timeseries is < 16*3*1000 + 16*1000 = 64k.
This number is perfectly fine for Prometheus.
"""
_OBJECT_GAUGE_SIZE_LABELS = ['region', 'storage', 'bucket']
_OBJECT_GAUGE_NUMBER_LABELS = ['region', 'bucket']
OBJECT_GAUGES = {}
REGISTRY = [None]
def _set_object_gauge(name, value, **kwargs):
"""Set the value of a gauge; be careful to only do this from a single
thread and to push to gateway before the thread is over"""
if REGISTRY[0] is None:
REGISTRY[0] = CollectorRegistry()
if name not in OBJECT_GAUGES:
OBJECT_GAUGES[name] = Gauge(
name, 'S3 metric from CloudWatch',
_OBJECT_GAUGE_SIZE_LABELS if 'size' in name else _OBJECT_GAUGE_NUMBER_LABELS,
registry=REGISTRY[0])
OBJECT_GAUGES[name].labels(**kwargs).set(value)
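As a standalone illustration of the label scheme described in the docstring above (a hedged sketch, separate from the module; the values are made up), each distinct label combination set on these gauges becomes one Prometheus timeseries:

    from prometheus_client import CollectorRegistry, Gauge, generate_latest

    registry = CollectorRegistry()
    size_gauge = Gauge('cloudwatch_s3_size_bytes', 'Bucket size in bytes',
                       ['region', 'storage', 'bucket'], registry=registry)
    size_gauge.labels(region='us-east-1', storage='st', bucket='hm.samples').set(1048576)
    # Exposition format: one line per label combination, e.g.
    # cloudwatch_s3_size_bytes{bucket="hm.samples",region="us-east-1",storage="st"} 1048576.0
    print(generate_latest(registry).decode())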
def stop_pool():
"""Stop the pool of sub processes"""
if __POOL[0] is not None:
__POOL[0].close()
__POOL[0] = None
@@ -69,7 +101,7 @@ def _is_glob(prefix):
def list_buckets(prefix=None):
"""Return the list of buckets {'Name','CreationDate'} """
resp = boto3.client('s3').list_buckets(prefix=prefix)
resp = boto3.client('s3').list_buckets()
buckets = resp['Buckets']
if prefix is not None:
bucket_name = _extract_bucket_from_prefix(prefix)
@@ -191,6 +223,8 @@ def get_metric(req):
bucket_name = dimension['Value']
elif dimension['Name'] == 'StorageType':
storage_type = dimension['Value']
# Note: we can't update the gauge from here: this does not run in the main process,
# and it is much simpler when all gauge updates happen in the same process.
return {
'MetricName': req['MetricName'],
'BucketName': bucket_name,
@@ -225,6 +259,47 @@ def fetch_bucket_info(bucket):
msg = err.__str__()
raise ValueError(f'{name} {msg}')
def update_gauges(metrics_data):
"""
Update the gauges from the metrics data:
cloudwatch_s3_objects_total  labels: region, bucket
cloudwatch_s3_size_bytes     labels: region, bucket, storage
"""
for data in metrics_data:
bucket = data['BucketName']
region = data['Region']
value = data['Value']
if data['MetricName'] == 'NumberOfObjects':
_set_object_gauge('cloudwatch_s3_objects_total', value, region=region, bucket=bucket)
# name = '_size_bytes'
storage_type = data['StorageType']
st_abr = None
if storage_type == 'StandardStorage':
st_abr = 'st'
elif storage_type == 'StandardIAStorage':
st_abr = 'ia'
elif storage_type == 'ReducedRedundancyStorage':
st_abr = 'rr'
else: # AllStorageTypes
# we could store it as a separate timeseries;
# but we can compute it easily on the prom server by doing a sum
continue
_set_object_gauge('cloudwatch_s3_size_bytes', value,
region=region, bucket=bucket, storage=st_abr)
commit_gauges()
def get_metrics_prom():
"""Return the path to the metrics.prom file"""
return os.getenv('PROM_TEXT', default='metrics.prom')
def commit_gauges():
"""Either push the gauges to a gatway if PROM_GATEWAY is set
or write them into a file if PROM_TEXT is set"""
if 'PROM_GATEWAY' in os.environ:
push_to_gateway(os.environ['PROM_GATEWAY'], job='s3analyser', registry=REGISTRY[0])
return
write_to_textfile(get_metrics_prom(), REGISTRY[0])
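Putting the two helpers together, here is a hedged end-to-end sketch run from outside the module: the dict keys mirror what update_gauges reads, the values are invented, and PROM_TEXT selects the textfile mode so the result is the file later served by /metrics:

    import os
    from s3_storage_analyser import update_gauges, get_metrics_prom

    os.environ['PROM_TEXT'] = 'metrics.prom'  # textfile mode; set PROM_GATEWAY instead to push
    sample_data = [
        {'MetricName': 'NumberOfObjects', 'BucketName': 'hm.samples',
         'Region': 'us-east-1', 'StorageType': 'AllStorageTypes', 'Value': 42},
        {'MetricName': 'BucketSizeBytes', 'BucketName': 'hm.samples',
         'Region': 'us-east-1', 'StorageType': 'StandardStorage', 'Value': 1048576},
    ]
    update_gauges(sample_data)  # sets the gauges and calls commit_gauges()
    with open(get_metrics_prom()) as prom_file:
        print(prom_file.read())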
FOLDED_KEYS = {
# MetricName-StorageType -> Folded column name
'NumberOfObjects:AllStorageTypes': 'Files',
@@ -344,7 +419,7 @@ def analyse(prefix=None, unit='MB', conc=None, fmt='plain'):
buckets = list_buckets(prefix=prefix)
metrics = list_metrics(buckets, prefix=prefix)
metrics_data = get_metrics_data(metrics, buckets)
# pprint(metrics_data)
update_gauges(metrics_data)
folded = fold_metrics_data(metrics_data)
if fmt == 'json' or fmt == 'json_pretty':
return _json_dumps(folded['bybucket'], pretty=True if fmt == 'json_pretty' else False)
server.py
@@ -6,7 +6,7 @@
import threading
import os
from s3_storage_analyser import analyse, parse_args, stop_pool
from s3_storage_analyser import analyse, parse_args, stop_pool, get_metrics_prom
# Run a single analysis at a time
LOCK_ANALYSIS = threading.Lock()
@@ -24,6 +24,17 @@ def do_GET(self):
self.end_headers()
return
if self.path.startswith('/metrics'):
metrics_prom = get_metrics_prom()
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
if os.path.exists(metrics_prom):
with open(metrics_prom, 'rb') as prom_file:
self.wfile.write(prom_file.read())
return
token = os.environ['TOKEN']
query_components = {
@@ -76,6 +87,12 @@ def do_GET(self):
self.wfile.write(err.__str__().encode())
return
def log_request(self, code='-', size='-'):
pass
def log_error(self, format, *args):
self.log_message(format, *args)
def _run_analysis(unit=None, prefix=None, conc='6', fmt=None, echo=False):
if not LOCK_ANALYSIS.acquire(False):
raise ValueError('There is already an analysis running')
test_s3_storage_analyzer.py
@@ -11,8 +11,8 @@
from contextlib import redirect_stdout
from s3_storage_analyser import (
list_buckets, fold_metrics_data, convert_bytes,
main, list_metrics, get_metrics_data, _today)
list_buckets, fold_metrics_data, convert_bytes, update_gauges,
main, list_metrics, get_metrics_data, _today, get_metrics_prom)
import s3_storage_analyser
import server
@@ -159,6 +159,17 @@ def test_fold_metrics_data(monkeypatch):
assert value['Bytes-RR'] == 0
assert value['Bytes-IA'] == 0
@mock_cloudwatch
@mock_s3
# @pytest.mark.skip(reason="moto does not support get_metric_statistics")
def test_set_gauges(monkeypatch):
"""Test setting the prometheus gaugaes from the datapoints"""
_setup(monkeypatch)
buckets = list_buckets()
metrics = list_metrics(buckets)
datapoints = get_metrics_data(metrics, buckets)
update_gauges(datapoints)
def _call_main(args_str):
sio = StringIO()
with redirect_stdout(sio):
@@ -249,8 +260,11 @@ def test_main_wrong_prefix(monkeypatch):
return
raise Exception('No ValueError was raised although the prefix was wrong')
def _test_server(monkeypatch, accept=None, method='GET', full_path=None, query_string=None, port=None, status_code=200):
def _test_server(monkeypatch, accept=None, method='GET', full_path=None,
query_string=None, port=None, status_code=200):
_setup(monkeypatch)
if os.path.exists(get_metrics_prom()):
os.remove(get_metrics_prom())
if port is not None:
os.environ['S3ANALYSER_PORT'] = port.__str__()
else:
@@ -278,6 +292,14 @@ def _test_server(monkeypatch, accept=None, method='GET', full_path=None, query_s
res = conn.getresponse()
assert res.status == status_code
body = res.read().decode()
# Make sure we can get some metrics
conn.request('GET', '/metrics')
metrics = conn.getresponse()
if status_code == 200:
assert metrics.status == 200
if method != 'HEAD':
assert metrics.read().decode()
return body
finally:
if 'S3ANALYSER_PORT' in os.environ:
@@ -307,9 +329,11 @@ def test_server_favicon(monkeypatch):
@mock_s3
def test_server_json(monkeypatch):
"""Test whole server"""
os.environ['PROM_TEXT'] = 'test.prom'
data = _test_server(monkeypatch, query_string='fmt=json&unit=TB&conc=4&prefix=hm.samples', port=9003)
print(data)
assert data.startswith('{"Buckets":[{"Bucket":"hm.samples"')
del os.environ['PROM_TEXT']
@mock_cloudwatch
@mock_s3
