Merge f727cae into 4be9ecb
brew committed Jan 31, 2019
2 parents 4be9ecb + f727cae commit 2fc0156
Showing 2 changed files with 141 additions and 42 deletions.
50 changes: 40 additions & 10 deletions README.md
@@ -56,7 +56,7 @@ worldbank-co2-emissions:
    -
      run: dump_to_zip
      parameters:
          out-file: co2-emissions-wb.zip
```

In this example we see one pipeline called `worldbank-co2-emissions`, which consists of 4 steps:
@@ -530,7 +530,7 @@ _Parameters_:

- `count` - count the number of occurrences of a specific key
  For this method, specifying `name` is not required. If it is specified, `count` will count the number of non-null values for that source field.

- `counters` - count the number of occurrences of distinct values
  Will return an array of 2-tuples of the form `[value, count-of-value]`.
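
For instance, a `join` step using both of these aggregation modes could look like the following (a minimal sketch; the resource, key and field names are hypothetical):

```yaml
- run: join
  parameters:
    source:
      name: cities            # hypothetical source resource
      key: ["country_code"]
      delete: true
    target:
      name: countries         # hypothetical target resource
      key: ["CC"]
    fields:
      num_cities:
        name: city
        aggregate: count      # how many source rows matched each target row
      city_counters:
        name: city
        aggregate: counters   # 2-tuples of [value, count-of-value]
```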

@@ -563,7 +563,7 @@ _Important: the "source" resource **must** appear before the "target" resource i
key: ["CC"]
fields:
population:
name: "census_2015"
name: "census_2015"
full: true
```

@@ -719,7 +719,7 @@ Filtering just American and European countries, leaving out countries whose main

Duplicate a resource.

`duplicate` accepts the name of a single resource in the datapackage.
It will then duplicate it in the output datapackage, with a different name and path.
The duplicated resource will appear immediately after its original.
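
A minimal step definition could look like this (a sketch; the resource names and path are hypothetical, with `source`, `target-name` and `target-path` as the assumed parameter names):

```yaml
- run: duplicate
  parameters:
    source: original-resource          # resource to copy
    target-name: copy-of-original      # name of the new resource
    target-path: data/copy-of-original.csv
```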

@@ -1097,7 +1097,7 @@ _Parameters_:

You should provide `name` and `url` attributes, and other optional attributes as defined in the [spec](http://specs.frictionlessdata.io/data-packages/#resource-information).

`url` indicates where the data for this resource resides. Later on, when `stream_remote_resources` runs, it will use the `url` (which is stored in the resource in the `dpp:streamedFrom` property) to read the data rows and push them into the pipeline.

Note that `url` also supports `env://<environment-variable>`, which indicates that the resource url should be fetched from the indicated environment variable. This is useful in case you are supplying a string with sensitive information (such as an SQL connection string for streaming from a database table).
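
For example, a resource whose url is resolved from an environment variable might be declared like this (a sketch, assuming this is the `add_resource` processor described here; the resource and variable names are hypothetical):

```yaml
- run: add_resource
  parameters:
    name: sales-data            # hypothetical resource name
    url: env://SALES_DATA_URL   # resolved from the environment at runtime
```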

@@ -1353,8 +1353,8 @@ if __name__ == '__main__':
        for resource in resource_iterator_:
            yield resource_processor(resource)

    spew(ctx.datapackage,
         new_resource_iterator(ctx.resource_iterator),
         ctx.stats)
```

@@ -1637,18 +1637,48 @@ $ docker run -v `pwd`:/pipelines:rw -p 5000:5000 \

And then browse to `http://<docker machine's IP address>:5000/` to see the current execution status dashboard.

## Pipeline Dashboard & Status Badges

When installed on a server or running using the task scheduler, it's often very hard to know exactly what's running and what the status of each pipeline is.

To make things easier, you can spin up the web dashboard to provide an overview of each pipeline's status, its basic info and the result of its latest execution.

To start the web server, run `dpp serve` from the command line and browse to `http://localhost:5000`.

The environment variable `DPP_BASE_PATH` determines whether the dashboard is served from the root path or from another base path (example value: `/pipelines/`).

The dashboard endpoints can be made to require authentication by adding a username and password with the environment variables `DPP_BASIC_AUTH_USERNAME` and `DPP_BASIC_AUTH_PASSWORD`.

An even simpler view of pipeline status is available via status badges, for individual pipelines as well as pipeline collections. For a single pipeline, add the full pipeline id to the badge endpoint:

```
http://localhost:5000/badge/path_to/pipelines/my-pipeline-id
```

![](https://img.shields.io/badge/pipeline-succeeded%20(30756%20records)-brightgreen.svg)

![](https://img.shields.io/badge/pipeline-invalid-lightgrey.svg)

![](https://img.shields.io/badge/pipeline-failed-red.svg)

![](https://img.shields.io/badge/pipeline-not%20found-lightgray.svg)

Or for a collection of pipelines:

```
http://localhost:5000/badge/collection/path_to/pipelines/
```

![](https://img.shields.io/badge/pipelines-22%20succeeded-brightgreen.svg)

![](https://img.shields.io/badge/pipelines-4%20running%2C%20%201%20succeeded%2C%205%20queued-yellow.svg)

![](https://img.shields.io/badge/pipelines-11%20succeeded%2C%207%20failed%2C%201%20invalid-red.svg)

![](https://img.shields.io/badge/pipelines-not%20found-lightgray.svg)

Note that these badge endpoints will always be exposed regardless of `DPP_BASIC_AUTH_PASSWORD` and `DPP_BASIC_AUTH_USERNAME` settings.

## Integrating with other services

Datapackage-pipelines can call a predefined webhook on any pipeline event. This might allow for potential integrations with other applications.
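
To illustrate, registering a webhook for the pipeline from the first example might look like this (a sketch assuming a `hooks` property listing webhook urls - this key name is an assumption, so check the full section for the exact spelling; the url is hypothetical):

```yaml
worldbank-co2-emissions:
  hooks:
    - http://my-service.example.com/hook   # notified on pipeline events
  pipeline:
    - run: dump_to_zip
      parameters:
          out-file: co2-emissions-wb.zip
```
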
133 changes: 101 additions & 32 deletions datapackage_pipelines/web/server.py
@@ -1,20 +1,21 @@
import datetime
import os

from io import BytesIO
import logging
from functools import wraps
from copy import deepcopy
from collections import Counter

import slugify
import yaml
import mistune
import requests

from flask import Blueprint, Flask, render_template, abort, send_file
from flask_cors import CORS
from flask_jsonpify import jsonify
from flask_basicauth import BasicAuth

from datapackage_pipelines.status import status_mgr
from datapackage_pipelines.utilities.stat_utils import user_facing_stats

@@ -82,10 +83,29 @@ def flatten(children_):
    return groups


def basic_auth_required(view_func):
    """
    A decorator that can be used to protect specific views with HTTP basic
    access authentication. Conditional on having BASIC_AUTH_USERNAME and
    BASIC_AUTH_PASSWORD set as env vars.
    """
    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if app.config.get('BASIC_AUTH_ACTIVE', False):
            if basic_auth.authenticate():
                return view_func(*args, **kwargs)
            else:
                return basic_auth.challenge()
        else:
            return view_func(*args, **kwargs)
    return wrapper


blueprint = Blueprint('dpp', 'dpp')


@blueprint.route("")
@basic_auth_required
def main():
    all_pipeline_ids = sorted(status.all_pipeline_ids())
    statuses = []
@@ -114,7 +134,8 @@ def main():
            }[pipeline_status.state()],
            'ended': datestr(ex.finish_time) if ex else None,
            'started': datestr(ex.start_time) if ex else None,
            'last_success':
                datestr(success_ex.finish_time) if success_ex else None,
        }
        statuses.append(pipeline_obj)
@@ -143,13 +164,8 @@ def state_or_dirty(state, p):
                           markdown=markdown)


@blueprint.route("api/raw/status")
@basic_auth_required
def pipeline_raw_api_status():
    pipelines = sorted(status.all_statuses(), key=lambda x: x.get('id'))
    for pipeline in pipelines:
@@ -161,6 +177,7 @@ def pipeline_raw_api_status():


@blueprint.route("api/raw/<path:pipeline_id>")
@basic_auth_required
def pipeline_raw_api(pipeline_id):
if not pipeline_id.startswith('./'):
pipeline_id = './' + pipeline_id
@@ -181,7 +198,9 @@ def pipeline_raw_api(pipeline_id):
"error_log": pipeline_status.errors(),
"stats": last_execution.stats if last_execution else None,
"success": last_execution.success if last_execution else None,
"last_success": last_successful_execution.finish_time if last_successful_execution else None,
"last_success":
last_successful_execution.finish_time
if last_successful_execution else None,
"trigger": last_execution.trigger if last_execution else None,

"pipeline": pipeline_status.pipeline_details,
@@ -194,6 +213,7 @@ def pipeline_raw_api(pipeline_id):


@blueprint.route("api/<field>/<path:pipeline_id>")
@basic_auth_required
def pipeline_api(field, pipeline_id):

if not pipeline_id.startswith('./'):
@@ -220,27 +240,76 @@ def pipeline_api(field, pipeline_id):
    return jsonify(ret)


def _make_badge_response(subject, text, colour):
    image_url = 'https://img.shields.io/badge/{}-{}-{}.svg'.format(
        subject, text, colour)
    r = requests.get(image_url)
    buffer_image = BytesIO(r.content)
    buffer_image.seek(0)
    return send_file(buffer_image, mimetype='image/svg+xml')

@blueprint.route("badge/<path:pipeline_id>")
def badge(pipeline_id):
'''An individual pipeline status'''
if not pipeline_id.startswith('./'):
pipeline_id = './' + pipeline_id
pipeline_status = status.get_status(pipeline_id)
if pipeline_status is None:
abort(404)
status_text = pipeline_status.get('message')
success = pipeline_status.get('success')
if success is True:
record_count = pipeline_status.get('stats', {}).get('total_row_count')
if record_count is not None:
status_text += ' (%d records)' % record_count
status_color = 'brightgreen'
elif success is False:
status_color = 'red'
pipeline_status = status.get(pipeline_id)

status_color = 'lightgray'
if pipeline_status.pipeline_details:
status_text = pipeline_status.state().lower()
last_execution = pipeline_status.get_last_execution()
success = last_execution.success if last_execution else None
if success is True:
stats = last_execution.stats if last_execution else None
record_count = stats.get('count_of_rows')
if record_count is not None:
status_text += ' (%d records)' % record_count
status_color = 'brightgreen'
elif success is False:
status_color = 'red'
else:
status_color = 'lightgray'
return redirect('https://img.shields.io/badge/{}-{}-{}.svg'.format(
'pipeline', status_text, status_color
))
status_text = "not found"
return _make_badge_response('pipeline', status_text, status_color)


@blueprint.route("badge/collection/<path:pipeline_path>")
def badge_collection(pipeline_path):
'''Status badge for a collection of pipelines.'''
all_pipeline_ids = sorted(status.all_pipeline_ids())

if not pipeline_path.startswith('./'):
pipeline_path = './' + pipeline_path

# Filter pipeline ids to only include those that start with pipeline_path.
path_pipeline_ids = \
[p for p in all_pipeline_ids if p.startswith(pipeline_path)]

statuses = []
for pipeline_id in path_pipeline_ids:
pipeline_status = status.get(pipeline_id)
if pipeline_status is None:
abort(404)
status_text = pipeline_status.state().lower()
statuses.append(status_text)

status_color = 'lightgray'
status_counter = Counter(statuses)
if status_counter:
if len(status_counter) == 1 and status_counter['succeeded'] > 0:
status_color = 'brightgreen'
elif status_counter['failed'] > 0:
status_color = 'red'
elif status_counter['failed'] == 0:
status_color = 'yellow'
status_text = \
', '.join(['{} {}'.format(v, k)
for k, v in status_counter.items()])
else:
status_text = "not found"

return _make_badge_response('pipelines', status_text, status_color)


app = Flask(__name__)
@@ -250,10 +319,10 @@ def badge(pipeline_id):
        and os.environ.get('DPP_BASIC_AUTH_PASSWORD', False):
    app.config['BASIC_AUTH_USERNAME'] = os.environ['DPP_BASIC_AUTH_USERNAME']
    app.config['BASIC_AUTH_PASSWORD'] = os.environ['DPP_BASIC_AUTH_PASSWORD']
    app.config['BASIC_AUTH_ACTIVE'] = True

basic_auth = BasicAuth(app)

CORS(app)

