diff --git a/backdrop/transformers/dispatch.py b/backdrop/transformers/dispatch.py index f92a86d0..54a2d467 100644 --- a/backdrop/transformers/dispatch.py +++ b/backdrop/transformers/dispatch.py @@ -59,7 +59,7 @@ def get_transform_function(transform): return getattr(transform_module, function_name) -def get_output_dataset(transform, input_dataset): +def get_or_get_and_create_output_dataset(transform, input_dataset): output_group = transform['output'].get( 'data-group', input_dataset['data_group']) output_type = transform['output']['data-type'] @@ -69,6 +69,13 @@ def get_output_dataset(transform, input_dataset): config.STAGECRAFT_OAUTH_TOKEN, ) output_data_set_config = admin_api.get_data_set(output_group, output_type) + if not output_data_set_config: + data_set_config = dict(input_dataset.items() + { + 'data_type': output_type, + 'data_group': output_group, + }.items()) + del(data_set_config['name']) + output_data_set_config = admin_api.create_data_set(data_set_config) return DataSet.from_group_and_type( config.BACKDROP_WRITE_URL, @@ -93,5 +100,7 @@ def run_transform(data_set_config, transform, earliest, latest): transform_function = get_transform_function(transform) transformed_data = transform_function(data['data'], transform['options']) - output_data_set = get_output_dataset(transform, data_set_config) + output_data_set = get_or_get_and_create_output_dataset( + transform, + data_set_config) output_data_set.post(transformed_data) diff --git a/tests/transformers/test_dispatch.py b/tests/transformers/test_dispatch.py index a2ee60a8..3f1936a3 100644 --- a/tests/transformers/test_dispatch.py +++ b/tests/transformers/test_dispatch.py @@ -2,11 +2,14 @@ import unittest from datetime import datetime -from hamcrest import assert_that, is_, has_entries +from hamcrest import assert_that, is_, has_entries, equal_to from mock import patch, MagicMock from backdrop.transformers.dispatch import ( - entrypoint, run_transform, get_query_parameters + entrypoint, + run_transform, + get_query_parameters, + get_or_get_and_create_output_dataset ) @@ -31,15 +34,27 @@ def test_entrypoint(self, mock_app, mock_adminAPI): assert_that(mock_app.send_task.call_count, is_(2)) mock_app.send_task.assert_any_call( 'backdrop.transformers.dispatch.run_transform', - args=({"group": "foo", "type": "bar"}, {'type': 1}, earliest, latest)) + args=( + {"group": "foo", "type": "bar"}, + {'type': 1}, + earliest, + latest)) mock_app.send_task.assert_any_call( 'backdrop.transformers.dispatch.run_transform', - args=({"group": "foo", "type": "bar"}, {'type': 2}, earliest, latest)) + args=( + {"group": "foo", "type": "bar"}, + {'type': 2}, + earliest, + latest)) @patch('backdrop.transformers.dispatch.AdminAPI') @patch('backdrop.transformers.dispatch.DataSet') @patch('backdrop.transformers.tasks.debug.logging') - def test_run_transform(self, mock_logging_task, mock_data_set, mock_adminAPI): + def test_run_transform( + self, + mock_logging_task, + mock_data_set, + mock_adminAPI): mock_logging_task.return_value = [{'new-data': 'point'}] adminAPI_instance = mock_adminAPI.return_value adminAPI_instance.get_data_set.return_value = { @@ -97,7 +112,11 @@ def test_run_transform(self, mock_logging_task, mock_data_set, mock_adminAPI): @patch('backdrop.transformers.dispatch.AdminAPI') @patch('backdrop.transformers.dispatch.DataSet') @patch('backdrop.transformers.tasks.debug.logging') - def test_run_transform_no_output_group(self, mock_logging_task, mock_data_set, mock_adminAPI): + def test_run_transform_no_output_group( + self, + mock_logging_task, + mock_data_set, + mock_adminAPI): mock_logging_task.return_value = [{'new-data': 'point'}] adminAPI_instance = mock_adminAPI.return_value adminAPI_instance.get_data_set.return_value = { @@ -135,6 +154,110 @@ def test_run_transform_no_output_group(self, mock_logging_task, mock_data_set, m 'http://backdrop/data', 'group', 'other-type', token='foo2', ) + @patch('backdrop.transformers.dispatch.AdminAPI') + @patch('backdrop.transformers.dispatch.DataSet') + def test_get_or_get_and_create_dataset_when_data_set_exists( + self, + mock_data_set, + mock_adminAPI): + transform_config = { + 'output': { + 'data-group': 'floop', + 'data-type': 'wibble' + } + } + input_dataset_config = { + "bearer_token": "foo2", + 'data_group': 'loop', + 'data_type': 'flibble' + } + adminAPI_instance = mock_adminAPI.return_value + adminAPI_instance.get_data_set.return_value = { + "bearer_token": "foo2", + } + data_set_instance = MagicMock() + data_set_instance.get.return_value = { + 'data': [ + {'data': 'point'}, + ], + } + mock_data_set.from_group_and_type.return_value = data_set_instance + + output_data_set = get_or_get_and_create_output_dataset( + transform_config, + input_dataset_config) + assert_that(output_data_set, equal_to(data_set_instance)) + adminAPI_instance.get_data_set.assert_called_once_with( + 'floop', + 'wibble' + ) + mock_data_set.from_group_and_type.assert_called_once_with( + 'http://backdrop/data', 'floop', 'wibble', token='foo2', + ) + + @patch('backdrop.transformers.dispatch.AdminAPI') + @patch('backdrop.transformers.dispatch.DataSet') + def test_get_and_get_or_create_dataset_when_get_finds_nothing( + self, + mock_data_set, + mock_adminAPI): + transform_config = { + 'output': { + 'data-group': 'floop', + 'data-type': 'wibble' + } + } + input_dataset_config = { + 'name': 'loop_flibble', + 'bearer_token': 'foo2', + 'data_group': 'loop', + 'data_type': 'flibble', + 'realtime': False, + 'auto_ids': 'aa,bb', + 'max_age_expected': 86400, + 'upload_filters': 'backdrop.filter.1', + 'queryable': True, + 'upload_format': '', + 'raw_queries_allowed': True, + 'published': False + } + adminAPI_instance = mock_adminAPI.return_value + adminAPI_instance.get_data_set.return_value = None + adminAPI_instance = mock_adminAPI.return_value + adminAPI_instance.create_data_set.return_value = { + 'bearer_token': 'foo2', + 'data_group': 'floop', + 'data_type': 'wibble' + } + data_set_instance = MagicMock() + data_set_instance.get.return_value = { + 'data': [ + {'data': 'point'}, + ], + } + mock_data_set.from_group_and_type.return_value = data_set_instance + + output_data_set = get_or_get_and_create_output_dataset( + transform_config, + input_dataset_config) + assert_that(output_data_set, equal_to(data_set_instance)) + adminAPI_instance.create_data_set.assert_called_once_with({ + 'data_group': 'floop', + 'data_type': 'wibble', + 'bearer_token': 'foo2', + 'realtime': False, + 'auto_ids': 'aa,bb', + 'max_age_expected': 86400, + 'upload_filters': 'backdrop.filter.1', + 'queryable': True, + 'upload_format': '', + 'raw_queries_allowed': True, + 'published': False + }) + mock_data_set.from_group_and_type.assert_called_once_with( + 'http://backdrop/data', 'floop', 'wibble', token='foo2', + ) + class GetQueryParametersTestCase(unittest.TestCase):