Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Create output dataset if missing using the api.
Browse files Browse the repository at this point in the history
Default params are those of the parent (input) data set. If the parent is
realtime, for instance, the transform target probably also needs to be
realtime. The only differences are the data_type (and the data_group, when
the transform overrides it) and consequently the name, which is auto-generated.
  • Loading branch information
jcbashdown committed Jan 14, 2015
1 parent bf840b0 commit 948c340
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 8 deletions.
13 changes: 11 additions & 2 deletions backdrop/transformers/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def get_transform_function(transform):
return getattr(transform_module, function_name)


def get_output_dataset(transform, input_dataset):
def get_or_get_and_create_output_dataset(transform, input_dataset):
output_group = transform['output'].get(
'data-group', input_dataset['data_group'])
output_type = transform['output']['data-type']
Expand All @@ -69,6 +69,13 @@ def get_output_dataset(transform, input_dataset):
config.STAGECRAFT_OAUTH_TOKEN,
)
output_data_set_config = admin_api.get_data_set(output_group, output_type)
if not output_data_set_config:
data_set_config = dict(input_dataset.items() + {
'data_type': output_type,
'data_group': output_group,
}.items())
del(data_set_config['name'])
output_data_set_config = admin_api.create_data_set(data_set_config)

return DataSet.from_group_and_type(
config.BACKDROP_WRITE_URL,
Expand All @@ -93,5 +100,7 @@ def run_transform(data_set_config, transform, earliest, latest):
transform_function = get_transform_function(transform)
transformed_data = transform_function(data['data'], transform['options'])

output_data_set = get_output_dataset(transform, data_set_config)
output_data_set = get_or_get_and_create_output_dataset(
transform,
data_set_config)
output_data_set.post(transformed_data)
135 changes: 129 additions & 6 deletions tests/transformers/test_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
import unittest

from datetime import datetime
from hamcrest import assert_that, is_, has_entries
from hamcrest import assert_that, is_, has_entries, equal_to
from mock import patch, MagicMock

from backdrop.transformers.dispatch import (
entrypoint, run_transform, get_query_parameters
entrypoint,
run_transform,
get_query_parameters,
get_or_get_and_create_output_dataset
)


Expand All @@ -31,15 +34,27 @@ def test_entrypoint(self, mock_app, mock_adminAPI):
assert_that(mock_app.send_task.call_count, is_(2))
mock_app.send_task.assert_any_call(
'backdrop.transformers.dispatch.run_transform',
args=({"group": "foo", "type": "bar"}, {'type': 1}, earliest, latest))
args=(
{"group": "foo", "type": "bar"},
{'type': 1},
earliest,
latest))
mock_app.send_task.assert_any_call(
'backdrop.transformers.dispatch.run_transform',
args=({"group": "foo", "type": "bar"}, {'type': 2}, earliest, latest))
args=(
{"group": "foo", "type": "bar"},
{'type': 2},
earliest,
latest))

@patch('backdrop.transformers.dispatch.AdminAPI')
@patch('backdrop.transformers.dispatch.DataSet')
@patch('backdrop.transformers.tasks.debug.logging')
def test_run_transform(self, mock_logging_task, mock_data_set, mock_adminAPI):
def test_run_transform(
self,
mock_logging_task,
mock_data_set,
mock_adminAPI):
mock_logging_task.return_value = [{'new-data': 'point'}]
adminAPI_instance = mock_adminAPI.return_value
adminAPI_instance.get_data_set.return_value = {
Expand Down Expand Up @@ -97,7 +112,11 @@ def test_run_transform(self, mock_logging_task, mock_data_set, mock_adminAPI):
@patch('backdrop.transformers.dispatch.AdminAPI')
@patch('backdrop.transformers.dispatch.DataSet')
@patch('backdrop.transformers.tasks.debug.logging')
def test_run_transform_no_output_group(self, mock_logging_task, mock_data_set, mock_adminAPI):
def test_run_transform_no_output_group(
self,
mock_logging_task,
mock_data_set,
mock_adminAPI):
mock_logging_task.return_value = [{'new-data': 'point'}]
adminAPI_instance = mock_adminAPI.return_value
adminAPI_instance.get_data_set.return_value = {
Expand Down Expand Up @@ -135,6 +154,110 @@ def test_run_transform_no_output_group(self, mock_logging_task, mock_data_set, m
'http://backdrop/data', 'group', 'other-type', token='foo2',
)

@patch('backdrop.transformers.dispatch.AdminAPI')
@patch('backdrop.transformers.dispatch.DataSet')
def test_get_or_get_and_create_dataset_when_data_set_exists(
        self,
        mock_data_set,
        mock_adminAPI):
    """An output data set that the admin API already knows is returned."""
    # The admin API lookup succeeds, returning a config with a token.
    admin_api = mock_adminAPI.return_value
    admin_api.get_data_set.return_value = {
        "bearer_token": "foo2",
    }

    # Whatever DataSet.from_group_and_type builds is what we expect back.
    expected_data_set = MagicMock()
    expected_data_set.get.return_value = {
        'data': [
            {'data': 'point'},
        ],
    }
    mock_data_set.from_group_and_type.return_value = expected_data_set

    transform = {
        'output': {
            'data-group': 'floop',
            'data-type': 'wibble'
        }
    }
    parent_config = {
        "bearer_token": "foo2",
        'data_group': 'loop',
        'data_type': 'flibble'
    }

    result = get_or_get_and_create_output_dataset(transform, parent_config)

    assert_that(result, equal_to(expected_data_set))
    # The lookup uses the transform's output group/type, not the parent's.
    admin_api.get_data_set.assert_called_once_with('floop', 'wibble')
    mock_data_set.from_group_and_type.assert_called_once_with(
        'http://backdrop/data', 'floop', 'wibble', token='foo2',
    )

@patch('backdrop.transformers.dispatch.AdminAPI')
@patch('backdrop.transformers.dispatch.DataSet')
def test_get_and_get_or_create_dataset_when_get_finds_nothing(
        self,
        mock_data_set,
        mock_adminAPI):
    """A missing output data set is created from the parent's config.

    The create payload is expected to be the parent (input) config with
    ``data_group``/``data_type`` replaced by the transform's output values
    and ``name`` removed.
    """
    transform_config = {
        'output': {
            'data-group': 'floop',
            'data-type': 'wibble'
        }
    }
    input_dataset_config = {
        'name': 'loop_flibble',
        'bearer_token': 'foo2',
        'data_group': 'loop',
        'data_type': 'flibble',
        'realtime': False,
        'auto_ids': 'aa,bb',
        'max_age_expected': 86400,
        'upload_filters': 'backdrop.filter.1',
        'queryable': True,
        'upload_format': '',
        'raw_queries_allowed': True,
        'published': False
    }

    # The lookup finds nothing, so the code under test must fall back to
    # creating the data set; the create call returns the new config.
    adminAPI_instance = mock_adminAPI.return_value
    adminAPI_instance.get_data_set.return_value = None
    adminAPI_instance.create_data_set.return_value = {
        'bearer_token': 'foo2',
        'data_group': 'floop',
        'data_type': 'wibble'
    }

    data_set_instance = MagicMock()
    data_set_instance.get.return_value = {
        'data': [
            {'data': 'point'},
        ],
    }
    mock_data_set.from_group_and_type.return_value = data_set_instance

    output_data_set = get_or_get_and_create_output_dataset(
        transform_config,
        input_dataset_config)

    assert_that(output_data_set, equal_to(data_set_instance))
    # The lookup must have been attempted with the output group/type
    # before creation (mirrors the check in the "data set exists" test).
    adminAPI_instance.get_data_set.assert_called_once_with(
        'floop',
        'wibble'
    )
    # Parent settings are inherited; 'name' is absent from the payload.
    adminAPI_instance.create_data_set.assert_called_once_with({
        'data_group': 'floop',
        'data_type': 'wibble',
        'bearer_token': 'foo2',
        'realtime': False,
        'auto_ids': 'aa,bb',
        'max_age_expected': 86400,
        'upload_filters': 'backdrop.filter.1',
        'queryable': True,
        'upload_format': '',
        'raw_queries_allowed': True,
        'published': False
    })
    mock_data_set.from_group_and_type.assert_called_once_with(
        'http://backdrop/data', 'floop', 'wibble', token='foo2',
    )


class GetQueryParametersTestCase(unittest.TestCase):

Expand Down

0 comments on commit 948c340

Please sign in to comment.