Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Don't dispatch task if there is no data
Browse files Browse the repository at this point in the history
I've pulled out the triggering of tasks to a separate function, this
then handles avoiding dispatch if there is no data.

I have also removed dispatching of tasks from PUT as this is only used
for cleaning out a data set and the transformed data set should be
emptied separately.

This also brings the transform triggering endpoint into line with using
trigger_transforms. This was passing through strings before but it is
now expected to be passing through datetime instances.
  • Loading branch information
tombooth committed Dec 15, 2014
1 parent c5d7b42 commit fa6bec3
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
23 changes: 14 additions & 9 deletions backdrop/write/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from os import getenv
from celery import Celery

from dateutil.parser import parse as datetime_parse
from flask import abort, Flask, g, jsonify, request
from flask_featureflags import FeatureFlag
from backdrop import statsd
Expand Down Expand Up @@ -130,9 +131,7 @@ def write_by_group(data_group, data_type):
if errors:
return (jsonify(messages=errors), 400)
else:
earliest, latest = bounding_dates(data)
celery_app.send_task('backdrop.transformers.dispatch.entrypoint',
args=(data_set_config['name'], earliest, latest))
trigger_transforms(data_set_config, data)
return jsonify(status='ok')


Expand All @@ -157,8 +156,6 @@ def put_by_group_and_type(data_group, data_type):
if len(data) > 0:
abort(400, 'Not implemented: you can only pass an empty JSON list')

celery_app.send_task('backdrop.transformers.dispatch.entrypoint',
args=(data_set_config['name'], None, None))
return _empty_data_set(data_set_config)

except (ParseError, ValidationError) as e:
Expand Down Expand Up @@ -230,16 +227,15 @@ def transform_data_set(data_group, data_type):
return (jsonify(messages=[repr(e)]), 400)

if '_start_at' in data:
start_at = data['_start_at']
start_at = datetime_parse(data['_start_at'])
if '_end_at' in data:
end_at = data['_end_at']
end_at = datetime_parse(data['_end_at'])
else:
end_at = datetime.datetime.now()
else:
abort(400, 'You must specify a _start_at timestamp')

celery_app.send_task('backdrop.transformers.dispatch.entrypoint',
args=(data_set_config['name'], start_at, end_at))
trigger_transforms(data_set_config, earliest=start_at, latest=end_at)

return jsonify(status='ok')

Expand Down Expand Up @@ -324,6 +320,15 @@ def bounding_dates(data):
return sorted_data[0]['_timestamp'], sorted_data[-1]['_timestamp']


def trigger_transforms(data_set_config, data=[], earliest=None, latest=None):
if len(data) > 0:
earliest, latest = bounding_dates(data)

if earliest is not None and latest is not None:
celery_app.send_task('backdrop.transformers.dispatch.entrypoint',
args=(data_set_config['name'], earliest, latest))


def start(port):
# this method only gets run on dev
# app.debug = True
Expand Down
41 changes: 40 additions & 1 deletion tests/write/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import unittest

from hamcrest import assert_that, is_
from mock import patch

from backdrop.write.api import bounding_dates
from backdrop.write.api import bounding_dates, trigger_transforms


class BoundingDatesTestCase(unittest.TestCase):
Expand All @@ -20,3 +21,41 @@ def test_bounding_dates(self):

assert_that(earliest.day, is_(1))
assert_that(latest.day, is_(9))


class TriggerTransformsTestCase(unittest.TestCase):

@patch('backdrop.write.api.celery_app')
def test_trigger_transforms(self, mock_celery_app):
earliest = datetime.datetime(2014, 9, 3)
latest = datetime.datetime(2014, 9, 10)
data = [
{'_timestamp': earliest},
{'_timestamp': latest},
]

trigger_transforms({'name': 'dataset'}, data)

mock_celery_app.send_task.assert_called_with(
'backdrop.transformers.dispatch.entrypoint',
args=('dataset', earliest, latest))

@patch('backdrop.write.api.celery_app')
def test_trigger_transforms_no_data(self, mock_celery_app):
trigger_transforms({'name': 'dataset'}, [])
assert_that(mock_celery_app.send_task.called, is_(False))

@patch('backdrop.write.api.celery_app')
def test_trigger_transforms_with_dates(self, mock_celery_app):
earliest = datetime.datetime(2014, 9, 3)
latest = datetime.datetime(2014, 9, 10)

trigger_transforms(
{'name': 'dataset'},
earliest=earliest,
latest=latest
)

mock_celery_app.send_task.assert_called_with(
'backdrop.transformers.dispatch.entrypoint',
args=('dataset', earliest, latest))

0 comments on commit fa6bec3

Please sign in to comment.