Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Use send_task so celery tasks need to be imported into backdrop.write
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrco committed Dec 8, 2014
1 parent ba2c57e commit 844827e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 32 deletions.
12 changes: 12 additions & 0 deletions backdrop/transformers/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from worker import app


@app.task(ignore_result=True)
def dispatch(dataset_id):
"""
For the given parameters, query stagecraft for transformations
to run, and dispatch tasks to the appropriate workers.
"""

# TODO: query stagecraft with dataset_id and run transforms
pass
4 changes: 1 addition & 3 deletions backdrop/transformers/worker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from celery import Celery

import backdrop.write.tasks

# Load the appropriate config file as a python module
import importlib
from os import getenv
Expand All @@ -13,7 +11,7 @@
app = Celery(
'transformations',
broker=config.TRANSFORMER_AMQP_URL,
include=['backdrop.write.tasks'])
include=['tasks'])


if __name__ == '__main__':
Expand Down
11 changes: 7 additions & 4 deletions backdrop/write/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from os import getenv
from celery import Celery

from flask import abort, Flask, g, jsonify, request
from flask_featureflags import FeatureFlag
Expand All @@ -7,8 +8,6 @@
from backdrop.core.flaskutils import DataSetConverter
from backdrop.write.decompressing_request import DecompressingRequest

from backdrop.write.tasks import dispatch

from ..core.errors import ParseError, ValidationError
from ..core import log_handler, cache_control
from ..core.flaskutils import generate_request_id
Expand Down Expand Up @@ -48,6 +47,8 @@

app.url_map.converters["data_set"] = DataSetConverter

celery_app = Celery(broker=app.config['TRANSFORMER_AMQP_URL'])


def _record_write_error(e):
app.logger.exception(e)
Expand Down Expand Up @@ -127,7 +128,8 @@ def write_by_group(data_group, data_type):
if errors:
return (jsonify(messages=errors), 400)
else:
dispatch.delay(data_set_config['name'])
celery_app.send_task('transformations.tasks.dispatch',
args=(data_set_config['name'],))
return jsonify(status='ok')


Expand All @@ -152,7 +154,8 @@ def put_by_group_and_type(data_group, data_type):
if len(data) > 0:
abort(400, 'Not implemented: you can only pass an empty JSON list')

dispatch.delay(data_set_config['name'])
celery_app.send_task('transformations.tasks.dispatch',
args=(data_set_config['name'],))
return _empty_data_set(data_set_config)

except (ParseError, ValidationError) as e:
Expand Down
25 changes: 0 additions & 25 deletions backdrop/write/tasks.py

This file was deleted.

0 comments on commit 844827e

Please sign in to comment.