From fa6bec3b4260dcff0d491cf0e10283632524d5ae Mon Sep 17 00:00:00 2001 From: Tom Booth Date: Mon, 15 Dec 2014 17:19:43 +0000 Subject: [PATCH] Don't dispatch task if there is no data I've pulled out the triggering of tasks into a separate function, which skips the dispatch if there is no data. I have also removed dispatching of tasks from PUT, as this is only used for cleaning out a data set and the transformed data set should be emptied separately. This also brings the transform triggering endpoint into line by using trigger_transforms. That endpoint previously passed its timestamps through as strings; it now parses them and passes through datetime instances. --- backdrop/write/api.py | 23 ++++++++++++++--------- tests/write/test_api.py | 41 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/backdrop/write/api.py b/backdrop/write/api.py index 3cb82d30..d294920d 100644 --- a/backdrop/write/api.py +++ b/backdrop/write/api.py @@ -3,6 +3,7 @@ from os import getenv from celery import Celery +from dateutil.parser import parse as datetime_parse from flask import abort, Flask, g, jsonify, request from flask_featureflags import FeatureFlag from backdrop import statsd @@ -130,9 +131,7 @@ def write_by_group(data_group, data_type): if errors: return (jsonify(messages=errors), 400) else: - earliest, latest = bounding_dates(data) - celery_app.send_task('backdrop.transformers.dispatch.entrypoint', - args=(data_set_config['name'], earliest, latest)) + trigger_transforms(data_set_config, data) return jsonify(status='ok') @@ -157,8 +156,6 @@ def put_by_group_and_type(data_group, data_type): if len(data) > 0: abort(400, 'Not implemented: you can only pass an empty JSON list') - celery_app.send_task('backdrop.transformers.dispatch.entrypoint', - args=(data_set_config['name'], None, None)) return _empty_data_set(data_set_config) except (ParseError, ValidationError) as e: @@ -230,16 +227,15 @@ def transform_data_set(data_group, data_type):
return (jsonify(messages=[repr(e)]), 400) if '_start_at' in data: - start_at = data['_start_at'] + start_at = datetime_parse(data['_start_at']) if '_end_at' in data: - end_at = data['_end_at'] + end_at = datetime_parse(data['_end_at']) else: end_at = datetime.datetime.now() else: abort(400, 'You must specify a _start_at timestamp') - celery_app.send_task('backdrop.transformers.dispatch.entrypoint', - args=(data_set_config['name'], start_at, end_at)) + trigger_transforms(data_set_config, earliest=start_at, latest=end_at) return jsonify(status='ok') @@ -324,6 +320,15 @@ def bounding_dates(data): return sorted_data[0]['_timestamp'], sorted_data[-1]['_timestamp'] +def trigger_transforms(data_set_config, data=[], earliest=None, latest=None): + if len(data) > 0: + earliest, latest = bounding_dates(data) + + if earliest is not None and latest is not None: + celery_app.send_task('backdrop.transformers.dispatch.entrypoint', + args=(data_set_config['name'], earliest, latest)) + + def start(port): # this method only gets run on dev # app.debug = True diff --git a/tests/write/test_api.py b/tests/write/test_api.py index 0b107acc..e5004f72 100644 --- a/tests/write/test_api.py +++ b/tests/write/test_api.py @@ -2,8 +2,9 @@ import unittest from hamcrest import assert_that, is_ +from mock import patch -from backdrop.write.api import bounding_dates +from backdrop.write.api import bounding_dates, trigger_transforms class BoundingDatesTestCase(unittest.TestCase): @@ -20,3 +21,41 @@ def test_bounding_dates(self): assert_that(earliest.day, is_(1)) assert_that(latest.day, is_(9)) + + +class TriggerTransformsTestCase(unittest.TestCase): + + @patch('backdrop.write.api.celery_app') + def test_trigger_transforms(self, mock_celery_app): + earliest = datetime.datetime(2014, 9, 3) + latest = datetime.datetime(2014, 9, 10) + data = [ + {'_timestamp': earliest}, + {'_timestamp': latest}, + ] + + trigger_transforms({'name': 'dataset'}, data) + + mock_celery_app.send_task.assert_called_with( + 
'backdrop.transformers.dispatch.entrypoint', + args=('dataset', earliest, latest)) + + @patch('backdrop.write.api.celery_app') + def test_trigger_transforms_no_data(self, mock_celery_app): + trigger_transforms({'name': 'dataset'}, []) + assert_that(mock_celery_app.send_task.called, is_(False)) + + @patch('backdrop.write.api.celery_app') + def test_trigger_transforms_with_dates(self, mock_celery_app): + earliest = datetime.datetime(2014, 9, 3) + latest = datetime.datetime(2014, 9, 10) + + trigger_transforms( + {'name': 'dataset'}, + earliest=earliest, + latest=latest + ) + + mock_celery_app.send_task.assert_called_with( + 'backdrop.transformers.dispatch.entrypoint', + args=('dataset', earliest, latest))