Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Refactor run_transform function
Browse files Browse the repository at this point in the history
There was a lot of code in this function, so I tried to break it up into
more meaningful blocks of functionality.
  • Loading branch information
tombooth committed Dec 15, 2014
1 parent 26d623f commit d684e4d
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions backdrop/transformers/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,27 @@ def entrypoint(dataset_id, earliest, latest):
)


@app.task(ignore_result=True)
def run_transform(data_set_config, transform, earliest, latest):
data_set = DataSet.from_group_and_type(
config.BACKDROP_READ_URL,
data_set_config['data_group'],
data_set_config['data_type'],
)

def get_query_parameters(transform, earliest, latest):
    """Build the query parameters used to read the input data set.

    Starts from the transform's own ``query-parameters`` (if any) and then
    sets ``flatten``, ``start_at`` and ``end_at``, overriding any values the
    transform supplied for those three keys.

    Args:
        transform: Transform configuration mapping; its optional
            ``'query-parameters'`` entry supplies the base parameters.
        earliest: Value sent as ``start_at``.
        latest: Value sent as ``end_at``.

    Returns:
        A new dict of query parameters. ``transform`` is left unmodified.
    """
    # Work on a copy: writing into the dict stored on ``transform`` would
    # leak flatten/start_at/end_at back into the caller's configuration.
    # ``or {}`` also tolerates an explicit ``None`` for the entry.
    query_parameters = dict(transform.get('query-parameters') or {})
    query_parameters.update({
        'flatten': 'true',
        'start_at': earliest,
        'end_at': latest,
    })
    return query_parameters


def get_transform_function(transform):
    """Resolve the callable named by ``transform['type']['function']``.

    The value is a dotted path whose last component is the attribute name
    and whose leading components form the module to import, e.g.
    ``'package.module.func'`` imports ``package.module`` and returns its
    ``func`` attribute.

    Args:
        transform: Transform configuration mapping containing
            ``['type']['function']``.

    Returns:
        The attribute looked up on the imported module.

    Raises:
        KeyError: If ``'type'`` or ``'function'`` is missing.
        ValueError: If the path contains no module part (no ``'.'``).
        ImportError: If the module cannot be imported.
        AttributeError: If the module has no attribute of that name.
    """
    function_namespace = transform['type']['function']
    # Split once on the last dot: everything before it is the module path,
    # everything after it is the attribute to fetch.
    module_namespace, _, function_name = function_namespace.rpartition('.')

    transform_module = importlib.import_module(module_namespace)
    return getattr(transform_module, function_name)

transformed_data = transform_function(data['data'], transform['options'])

def get_output_dataset(transform, input_dataset):
output_group = transform['output'].get(
'data-group', data_set_config['data_group'])
'data-group', input_dataset['data_group'])
output_type = transform['output']['data-type']

admin_api = AdminAPI(
Expand All @@ -65,10 +60,28 @@ def run_transform(data_set_config, transform, earliest, latest):
)
output_data_set_config = admin_api.get_data_set(output_group, output_type)

output_data_set = DataSet.from_group_and_type(
return DataSet.from_group_and_type(
config.BACKDROP_WRITE_URL,
output_group,
output_type,
token=output_data_set_config['bearer_token'],
)


@app.task(ignore_result=True)
def run_transform(data_set_config, transform, earliest, latest):
    """Read a data set, apply the configured transform and post the result.

    Args:
        data_set_config: Mapping describing the input data set; its
            ``'data_group'`` and ``'data_type'`` entries select what to read.
        transform: Transform configuration mapping; supplies the query
            parameters, the function to run, its ``'options'`` and the
            output data set.
        earliest: Passed to ``get_query_parameters`` as the lower bound.
        latest: Passed to ``get_query_parameters`` as the upper bound.
    """
    source = DataSet.from_group_and_type(
        config.BACKDROP_READ_URL,
        data_set_config['data_group'],
        data_set_config['data_type'],
    )
    query = get_query_parameters(transform, earliest, latest)
    response = source.get(query_parameters=query)

    apply_transform = get_transform_function(transform)
    transformed = apply_transform(response['data'], transform['options'])

    # The input config is handed over so the output can fall back to the
    # input's data group when the transform does not name one.
    get_output_dataset(transform, data_set_config).post(transformed)

0 comments on commit d684e4d

Please sign in to comment.