From d61030d55fe959311da621957aad028b3aed6fdb Mon Sep 17 00:00:00 2001 From: Brook Elgie Date: Wed, 30 Jan 2019 15:28:00 +0000 Subject: [PATCH 1/8] Fix pipeline status badge Some previous status manager refactoring broke the badge end point. I've also changed it from redirecting to the shields.io image, to proxying it. --- datapackage_pipelines/web/server.py | 36 ++++++++++++++++++----------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/datapackage_pipelines/web/server.py b/datapackage_pipelines/web/server.py index 85a78c43..837eb6dc 100644 --- a/datapackage_pipelines/web/server.py +++ b/datapackage_pipelines/web/server.py @@ -1,14 +1,15 @@ import datetime import os - +from io import BytesIO import logging +from copy import deepcopy + import slugify import yaml import mistune -from copy import deepcopy -from flask import Blueprint +import requests -from flask import Flask, render_template, abort, redirect +from flask import Blueprint, Flask, render_template, abort, send_file from flask_cors import CORS from flask_jsonpify import jsonify from flask_basicauth import BasicAuth @@ -114,7 +115,8 @@ def main(): }[pipeline_status.state()], 'ended': datestr(ex.finish_time) if ex else None, 'started': datestr(ex.start_time) if ex else None, - 'last_success': datestr(success_ex.finish_time) if success_ex else None, + 'last_success': + datestr(success_ex.finish_time) if success_ex else None, } statuses.append(pipeline_obj) @@ -181,7 +183,9 @@ def pipeline_raw_api(pipeline_id): "error_log": pipeline_status.errors(), "stats": last_execution.stats if last_execution else None, "success": last_execution.success if last_execution else None, - "last_success": last_successful_execution.finish_time if last_successful_execution else None, + "last_success": + last_successful_execution.finish_time + if last_successful_execution else None, "trigger": last_execution.trigger if last_execution else None, "pipeline": pipeline_status.pipeline_details, @@ -222,15 +226,18 @@ def pipeline_api(field, pipeline_id): @blueprint.route("badge/") def badge(pipeline_id): + '''An individual pipeline status''' if not pipeline_id.startswith('./'): pipeline_id = './' + pipeline_id - pipeline_status = status.get_status(pipeline_id) + pipeline_status = status.get(pipeline_id) if pipeline_status is None: abort(404) - status_text = pipeline_status.get('message') - success = pipeline_status.get('success') + status_text = pipeline_status.state().capitalize() + last_execution = pipeline_status.get_last_execution() + success = last_execution.success if last_execution else None if success is True: - record_count = pipeline_status.get('stats', {}).get('total_row_count') + stats = last_execution.stats if last_execution else None + record_count = stats.get('count_of_rows') if record_count is not None: status_text += ' (%d records)' % record_count status_color = 'brightgreen' @@ -238,9 +245,12 @@ def badge(pipeline_id): status_color = 'red' else: status_color = 'lightgray' - return redirect('https://img.shields.io/badge/{}-{}-{}.svg'.format( - 'pipeline', status_text, status_color - )) + image_url = 'https://img.shields.io/badge/{}-{}-{}.svg' \ + .format('pipeline', status_text, status_color) + r = requests.get(image_url) + buffer_image = BytesIO(r.content) + buffer_image.seek(0) + return send_file(buffer_image, mimetype='image/svg+xml') app = Flask(__name__) From c3fd5331f8a008ab96726963a72c0b398171e856 Mon Sep 17 00:00:00 2001 From: Brook Elgie Date: Thu, 31 Jan 2019 10:05:34 +0000 Subject: [PATCH 2/8] If no pipeline_id found, still return a badge If no pipeline_id is found, return a badge with 'not found' written in it, rather than returning a 404. Knowing that a pipeline is potentially missing is still useful status. --- datapackage_pipelines/web/server.py | 45 ++++++++++++++++------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/datapackage_pipelines/web/server.py b/datapackage_pipelines/web/server.py index 837eb6dc..a6065157 100644 --- a/datapackage_pipelines/web/server.py +++ b/datapackage_pipelines/web/server.py @@ -224,33 +224,38 @@ def pipeline_api(field, pipeline_id): return jsonify(ret) +def make_badge_response(subject, text, colour): + image_url = 'https://img.shields.io/badge/{}-{}-{}.svg'.format( + subject, text, colour) + r = requests.get(image_url) + buffer_image = BytesIO(r.content) + buffer_image.seek(0) + return send_file(buffer_image, mimetype='image/svg+xml') + + @blueprint.route("badge/") def badge(pipeline_id): '''An individual pipeline status''' if not pipeline_id.startswith('./'): pipeline_id = './' + pipeline_id pipeline_status = status.get(pipeline_id) - if pipeline_status is None: - abort(404) - status_text = pipeline_status.state().capitalize() - last_execution = pipeline_status.get_last_execution() - success = last_execution.success if last_execution else None - if success is True: - stats = last_execution.stats if last_execution else None - record_count = stats.get('count_of_rows') - if record_count is not None: - status_text += ' (%d records)' % record_count - status_color = 'brightgreen' - elif success is False: - status_color = 'red' + + status_color = 'lightgray' + if pipeline_status.pipeline_details: + status_text = pipeline_status.state().capitalize() + last_execution = pipeline_status.get_last_execution() + success = last_execution.success if last_execution else None + if success is True: + stats = last_execution.stats if last_execution else None + record_count = stats.get('count_of_rows') + if record_count is not None: + status_text += ' (%d records)' % record_count + status_color = 'brightgreen' + elif success is False: + status_color = 'red' else: - status_color = 'lightgray' - image_url = 'https://img.shields.io/badge/{}-{}-{}.svg' \ - .format('pipeline', status_text, status_color) - r = requests.get(image_url) - buffer_image = BytesIO(r.content) - buffer_image.seek(0) - return send_file(buffer_image, mimetype='image/svg+xml') + status_text = "not found" + return make_badge_response('pipeline', status_text, status_color) app = Flask(__name__) From dd669b3e11c197eb3296da4f3252a05d18120e2f Mon Sep 17 00:00:00 2001 From: Brook Elgie Date: Thu, 31 Jan 2019 10:07:44 +0000 Subject: [PATCH 3/8] Pipeline collection status badge A status badge at the /badge/collection/ endpoint. This returns a pipeline collection summary badge, e.g. pipelines | 22 succeeded, 14 failed, 2 invalid, 1 running Pipeline path will filter the pipeline ids it is reporting on. --- datapackage_pipelines/web/server.py | 43 +++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/datapackage_pipelines/web/server.py b/datapackage_pipelines/web/server.py index a6065157..75929c75 100644 --- a/datapackage_pipelines/web/server.py +++ b/datapackage_pipelines/web/server.py @@ -3,6 +3,7 @@ from io import BytesIO import logging from copy import deepcopy +from collections import Counter import slugify import yaml @@ -224,7 +225,7 @@ def pipeline_api(field, pipeline_id): return jsonify(ret) -def make_badge_response(subject, text, colour): +def _make_badge_response(subject, text, colour): image_url = 'https://img.shields.io/badge/{}-{}-{}.svg'.format( subject, text, colour) r = requests.get(image_url) @@ -255,7 +256,45 @@ def badge(pipeline_id): status_color = 'red' else: status_text = "not found" - return make_badge_response('pipeline', status_text, status_color) + return _make_badge_response('pipeline', status_text, status_color) + + +@blueprint.route("badge/collection/") +def badge_collection(pipeline_path): + '''Status badge for a collection of pipelines.''' + all_pipeline_ids = sorted(status.all_pipeline_ids()) + + if not pipeline_path.startswith('./'): + pipeline_path = './' + pipeline_path + + # Filter pipeline ids to only include those that start with pipeline_path. + path_pipeline_ids = \ + [p for p in all_pipeline_ids if p.startswith(pipeline_path)] + + statuses = [] + for pipeline_id in path_pipeline_ids: + pipeline_status = status.get(pipeline_id) + if pipeline_status is None: + abort(404) + status_text = pipeline_status.state().lower() + statuses.append(status_text) + + status_color = 'lightgray' + status_counter = Counter(statuses) + if status_counter: + if len(status_counter) == 1 and status_counter['succeeded'] > 0: + status_color = 'brightgreen' + elif status_counter['failed'] > 0: + status_color = 'red' + elif status_counter['failed'] == 0: + status_color = 'yellow' + status_text = \ + ', '.join(['{} {}'.format(v, k) + for k, v in status_counter.items()]) + else: + status_text = "not found" + + return _make_badge_response('pipelines', status_text, status_color) app = Flask(__name__) From 852162bbdf5d27f5f819c03e638ff2fee70ff824 Mon Sep 17 00:00:00 2001 From: Brook Elgie Date: Thu, 31 Jan 2019 10:19:42 +0000 Subject: [PATCH 4/8] Make pipeline status text lowercase to fit conventions --- datapackage_pipelines/web/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapackage_pipelines/web/server.py b/datapackage_pipelines/web/server.py index 75929c75..51bde1dc 100644 --- a/datapackage_pipelines/web/server.py +++ b/datapackage_pipelines/web/server.py @@ -243,7 +243,7 @@ def badge(pipeline_id): status_color = 'lightgray' if pipeline_status.pipeline_details: - status_text = pipeline_status.state().capitalize() + status_text = pipeline_status.state().lower() last_execution = pipeline_status.get_last_execution() success = last_execution.success if last_execution else None if success is True: From c9c451344067087c177f96d0db01fdabdbd82e06 Mon Sep 17 00:00:00 2001 From: Brook Elgie Date: Thu, 31 Jan 2019 10:44:48 +0000 Subject: [PATCH 5/8] Update readme with status badge details. --- README.md | 48 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index a5473838..5741ab0f 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ worldbank-co2-emissions: - run: dump_to_zip parameters: - out-file: co2-emissions-wb.zip + out-file: co2-emissions-wb.zip ``` In this example we see one pipeline called `worldbank-co2-emissions`. Its pipeline consists of 4 steps: @@ -530,7 +530,7 @@ _Parameters_: - `count` - count the number of occurrences of a specific key For this method, specifying `name` is not required. In case it is specified, `count` will count the number of non-null values for that source field. - + - `counters` - count the number of occurrences of distinct values Will return an array of 2-tuples of the form `[value, count-of-value]`. @@ -563,7 +563,7 @@ _Important: the "source" resource **must** appear before the "target" resource i key: ["CC"] fields: population: - name: "census_2015" + name: "census_2015" full: true ``` @@ -719,7 +719,7 @@ Filtering just American and European countries, leaving out countries whose main Duplicate a resource. -`duplicate` accepts the name of a single resource in the datapackage. +`duplicate` accepts the name of a single resource in the datapackage. It will then duplicate it in the output datapackage, with a different name and path. The duplicated resource will appear immediately after its original. @@ -1097,7 +1097,7 @@ _Parameters_: You should provide a `name` and `url` attributes, and other optional attributes as defined in the [spec]([http://specs.frictionlessdata.io/data-packages/#resource-information). -`url` indicates where the data for this resource resides. Later on, when `stream_remote_resources` runs, it will use the `url` (which is stored in the resource in the `dpp:streamedFrom` property) to read the data rows and push them into the pipeline. +`url` indicates where the data for this resource resides. Later on, when `stream_remote_resources` runs, it will use the `url` (which is stored in the resource in the `dpp:streamedFrom` property) to read the data rows and push them into the pipeline. Note that `url` also supports `env://`, which indicates that the resource url should be fetched from the indicated environment variable. This is useful in case you are supplying a string with sensitive information (such as an SQL connection string for streaming from a database table). @@ -1353,8 +1353,8 @@ if __name__ == '__main__': for resource in resource_iterator_: yield resource_processor(resource) - spew(ctx.datapackage, - new_resource_iterator(ctx.resource_iterator), + spew(ctx.datapackage, + new_resource_iterator(ctx.resource_iterator), ctx.stats) ``` @@ -1637,11 +1637,11 @@ $ docker run -v `pwd`:/pipelines:rw -p 5000:5000 \ And then browse to `http://:5000/` to see the current execution status dashboard. -## Pipeline Dashboard +## Pipeline Dashboard & Status Badges -When installed on a server or running using the task scheduler, it's often very hard to know exactly what's running and what's the status of each pipeline. +When installed on a server or running using the task scheduler, it's often very hard to know exactly what's running and what the status is of each pipeline. -To make things easier, you can spin up the web dashboard which provides an overview of each pipeline's status, its basic info and the result of it latest execution. +To make things easier, you can spin up the web dashboard to provide an overview of each pipeline's status, its basic info and the result of it latest execution. To start the web server run `dpp serve` from the command line and browse to http://localhost:5000 @@ -1649,6 +1649,34 @@ The environment variable `DPP_BASE_PATH` will determine whether dashboard will b The dashboard endpoints can be made to require authentication by adding a username and password with the environment variables `DPP_BASIC_AUTH_USERNAME` and `DPP_BASIC_AUTH_PASSWORD`. +Even simpler pipeline status is available with a status badge, both for individual pipelines, and for pipeline collections. For a single pipeline, add the full pipeline id to the badge endpoint: + +``` +http://localhost:5000/badge/path_to/pipelines/my-pipeline-id +``` + +![](https://img.shields.io/badge/pipeline-succeeded%20(30756%20records)-brightgreen.svg) + +![](https://img.shields.io/badge/pipeline-invalid-lightgrey.svg) + +![](https://img.shields.io/badge/pipeline-failed-red.svg) + +![](https://img.shields.io/badge/pipeline-not%20found-lightgray.svg) + +Or for a collection of pipelines: + +``` +http://localhost:5000/badge/collection/path_to/pipelines/ +``` + +![](https://img.shields.io/badge/pipelines-22%20succeeded-brightgreen.svg) + +![](https://img.shields.io/badge/pipelines-4%20running%2C%20%201%20succeeded%2C%205%20queued-yellow.svg) + +![](https://img.shields.io/badge/pipelines-11%20succeeded%2C%207%20failed%2C%201%20invalid-red.svg) + +![](https://img.shields.io/badge/pipelines-not%20found-lightgray.svg) + ## Integrating with other services Datapackage-pipelines can call a predefined webhook on any pipeline event. This might allow for potential integrations with other applications. From 1f2fb81b4b68958b77f12550a5c070ccb7a915dd Mon Sep 17 00:00:00 2001 From: Brook Elgie Date: Thu, 31 Jan 2019 13:57:11 +0000 Subject: [PATCH 6/8] Use basic auth for dashboard view, but not badges This commit makes application of basic auth (if active) more selective. Instead of forcing basic auth on all endpoints, it is now applied as a decorator for the main dashboard views. This allows us to serve the status badges without basic auth while still being able to protect the dashboard pages. --- datapackage_pipelines/web/server.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/datapackage_pipelines/web/server.py b/datapackage_pipelines/web/server.py index 51bde1dc..e89eb227 100644 --- a/datapackage_pipelines/web/server.py +++ b/datapackage_pipelines/web/server.py @@ -2,6 +2,7 @@ import os from io import BytesIO import logging +from functools import wraps from copy import deepcopy from collections import Counter @@ -84,10 +85,29 @@ def flatten(children_): return groups +def basic_auth_required(view_func): + """ + A decorator that can be used to protect specific views with HTTP basic + access authentication. Conditional on having BASIC_AUTH_USERNAME and + BASIC_AUTH_PASSWORD set as env vars. + """ + @wraps(view_func) + def wrapper(*args, **kwargs): + if app.config.get('BASIC_AUTH_ACTIVE', False): + if basic_auth.authenticate(): + return view_func(*args, **kwargs) + else: + return basic_auth.challenge() + else: + return view_func(*args, **kwargs) + return wrapper + + blueprint = Blueprint('dpp', 'dpp') @blueprint.route("") +@basic_auth_required def main(): all_pipeline_ids = sorted(status.all_pipeline_ids()) statuses = [] @@ -153,6 +173,7 @@ def refresh(): @blueprint.route("api/raw/status") +@basic_auth_required def pipeline_raw_api_status(): pipelines = sorted(status.all_statuses(), key=lambda x: x.get('id')) for pipeline in pipelines: @@ -164,6 +185,7 @@ def pipeline_raw_api_status(): @blueprint.route("api/raw/") +@basic_auth_required def pipeline_raw_api(pipeline_id): if not pipeline_id.startswith('./'): pipeline_id = './' + pipeline_id @@ -199,6 +221,7 @@ def pipeline_raw_api(pipeline_id): @blueprint.route("api//") +@basic_auth_required def pipeline_api(field, pipeline_id): if not pipeline_id.startswith('./'): @@ -304,10 +327,10 @@ def badge_collection(pipeline_path): and os.environ.get('DPP_BASIC_AUTH_PASSWORD', False): app.config['BASIC_AUTH_USERNAME'] = os.environ['DPP_BASIC_AUTH_USERNAME'] app.config['BASIC_AUTH_PASSWORD'] = os.environ['DPP_BASIC_AUTH_PASSWORD'] + app.config['BASIC_AUTH_ACTIVE'] = True - basic_auth = BasicAuth(app) +basic_auth = BasicAuth(app) - app.config['BASIC_AUTH_FORCE'] = True CORS(app) From 744509b7bf02170b47f91409f7a442bb59bf01ea Mon Sep 17 00:00:00 2001 From: Brook Elgie Date: Thu, 31 Jan 2019 14:00:49 +0000 Subject: [PATCH 7/8] Removing the refresh view as it's not used and looks like a DoS risk. --- datapackage_pipelines/web/server.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/datapackage_pipelines/web/server.py b/datapackage_pipelines/web/server.py index e89eb227..af94b730 100644 --- a/datapackage_pipelines/web/server.py +++ b/datapackage_pipelines/web/server.py @@ -16,8 +16,6 @@ from flask_jsonpify import jsonify from flask_basicauth import BasicAuth -from datapackage_pipelines.celery_tasks.celery_tasks import \ - execute_update_pipelines from datapackage_pipelines.status import status_mgr from datapackage_pipelines.utilities.stat_utils import user_facing_stats @@ -166,12 +164,6 @@ def state_or_dirty(state, p): markdown=markdown) -@blueprint.route("api/refresh") -def refresh(): - execute_update_pipelines() - return jsonify({'ok': True}) - - @blueprint.route("api/raw/status") @basic_auth_required def pipeline_raw_api_status(): From f727cae5761d5d04c111a7da21098904cb6fe8ea Mon Sep 17 00:00:00 2001 From: Brook Elgie Date: Thu, 31 Jan 2019 14:18:16 +0000 Subject: [PATCH 8/8] Add note to README about basic auth and the badges --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 5741ab0f..068e2e41 100644 --- a/README.md +++ b/README.md @@ -1677,6 +1677,8 @@ http://localhost:5000/badge/collection/path_to/pipelines/ ![](https://img.shields.io/badge/pipelines-not%20found-lightgray.svg) +Note that these badge endpoints will always be exposed regardless of `DPP_BASIC_AUTH_PASSWORD` and `DPP_BASIC_AUTH_USERNAME` settings. + ## Integrating with other services Datapackage-pipelines can call a predefined webhook on any pipeline event. This might allow for potential integrations with other applications.