Make it easier to port Celery projects / queue background tasks #603

Closed
michi88 opened this Issue Jan 17, 2017 · 35 comments

@michi88
Contributor

michi88 commented Jan 17, 2017

Context

I have quite a few Celery / django-q projects that I would like to move to Zappa.
Now, one can argue that you don't need task libraries like Celery / django-q at all, since Lambdas can easily be invoked on a scheduled interval or in response to S3 / SNS events. True, but that doesn't mean the APIs (Celery's, for example) for creating tasks aren't handy. Think: the @task() decorator and some_func.delay() to queue a task.

I mostly use Celery to invoke background tasks to be handled later (sending email, updating search indexes, etc.). What I would like is to keep using the Celery task-creation API so that my app stays portable if I ever want to move away from Lambda (something Zappa promises).

Possible Fix

Since Celery can already use SQS as its queue, maybe we can add a Lambda handler for SQS events and execute the task there.
I've been digging through the Celery codebase for the point where the task data (coming off the queue) is transformed and finally executed by the worker, because that is what we would need to replicate in a Lambda handler.
It's not easy to find; it's here:
https://github.com/celery/celery/blob/master/celery/worker/components.py#L229
Definitely not straightforward to get that working.

Another option would be to have a Lambda 'celery worker' run on a schedule every second (or whatever interval you want), connect to the SQS queue, pull a task (if available), execute it, and exit. Far from ideal, but workers can apparently be told to execute only one task and then exit (I'm told; I haven't confirmed this yet). If that works, we could also get around the one-second schedule by starting the Lambda celery worker on an SQS event but having it pull the task in a Celery-native fashion, instead of using the actual SQS message data it was invoked with.

Having said that, I don't think we should try to make Zappa 'compatible' with Celery / django-q and every other framework out there. Celery, though, is a contender in my view, as it's widely used.

The last option I see (other than not helping developers on this topic at all) is to create native Zappa helpers that replace the Celery / django-q framework APIs. Think:

task_id = zappa.queue.delay('module.my_func', *args, **kwargs)
# uploads the task definition to, for example, S3;
# a Zappa-native handler then executes the task based on the S3 event
# and uploads the task result back to S3 when finished
task_result = zappa.queue.get_result(task_id, remove=True)
# if finished, returns the task result and deletes the S3 task entry
# (if you want); otherwise returns None

If that existed, I wouldn't have brought this up; I would happily write logic to use Zappa vs. Celery depending on the platform I'm running on.

One thing that does bother me about the 'Zappa native' approach is that we would be reinventing the wheel regarding task serialization, queueing, de-serialization, and execution. We might also want to include some retry handling in that case.

The django-q codebase, by the way, is much smaller and easier to dig into. If we decide Zappa should have helpers for this, it's a good starting point to look at.

Goal of this issue

I would like to start a discussion around this and collect everyone's opinions. If we conclude that Zappa should/could support something like this, I'll work on a POC. If not, we can just close this issue :)

@Miserlou

Owner

Miserlou commented Jan 18, 2017

So, I love this idea, and this was one of the original goals of the Zappa project.

A very annoying thing that I frequently forget is that SQS isn't a Lambda event source. I don't know why. This might sink this plan.

@richiverse

Contributor

richiverse commented Jan 31, 2017

Honestly, take a look at DynamoDB Streams! It's a poor man's Kinesis and can trigger Lambdas.

http://blogs.atlassian.com/2014/08/replayable-transactions-event-sourcing-dynamodb/

@michi88

Contributor

michi88 commented Feb 1, 2017

SQS feels too involved (as it isn't an event source). I would like to look at DynamoDB Streams more closely; I've never worked with them myself.

I'm currently playing with just S3 as the event source. It feels like this might be enough to act as a (very) poor man's queue for simple async task-delay mechanisms.

What I basically do is serialize a task as JSON (in a structured way, like Celery does, for example), put it in an S3 'task' bucket, and register a generic task_handler on the object-created event source (s3:ObjectCreated:*) for that bucket. The task handler is responsible for mapping the event type to the right function within the Django app, executing it, and putting the result back on S3 (and/or deleting the task data from S3).

It basically has the zappa.queue.delay / get_result(...) interface described above.

I'm not calling Lambda functions directly, because that way you can easily burst through your maximum number of concurrent Lambdas. My assumption is that this won't happen with S3 events; at the very least you get retries.
Another good thing about S3 is that there is no limit on the task payload size. SNS alone, for example, is not an option for me for that reason.

This all works and is relatively straightforward (which I like). I do want to see what happens under load, when more events are created than the concurrency limit allows.
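
For reference, here's a minimal sketch of the kind of thing I mean; all names (bucket, key layout, helpers) are hypothetical, and the S3 event would need to be filtered to the pending/ prefix so result writes don't re-trigger the handler:

import json
import uuid
from importlib import import_module

import boto3

s3 = boto3.client('s3')
TASK_BUCKET = 'my-zappa-tasks'  # hypothetical bucket name

def delay(func_path, *args, **kwargs):
    # serialize the task as structured JSON and drop it in the task bucket
    task_id = str(uuid.uuid4())
    payload = {'id': task_id, 'func': func_path, 'args': args, 'kwargs': kwargs}
    s3.put_object(Bucket=TASK_BUCKET, Key='pending/%s.json' % task_id,
                  Body=json.dumps(payload))
    return task_id

def task_handler(event, context):
    # generic handler registered on s3:ObjectCreated:* for the task bucket
    for record in event['Records']:
        key = record['s3']['object']['key']
        payload = json.loads(
            s3.get_object(Bucket=TASK_BUCKET, Key=key)['Body'].read())
        module_path, func_name = payload['func'].rsplit('.', 1)
        func = getattr(import_module(module_path), func_name)
        result = func(*payload['args'], **payload['kwargs'])
        # store the result so get_result(task_id) can fetch it, then clean up
        s3.put_object(Bucket=TASK_BUCKET, Key='results/%s.json' % payload['id'],
                      Body=json.dumps({'result': result}))
        s3.delete_object(Bucket=TASK_BUCKET, Key=key)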

@michi88

Contributor

michi88 commented Feb 2, 2017

@Miserlou you mentioned this comment in the Slack channel: #61 (comment)

Do I understand correctly that the idea there was to create and deploy multiple/separate Lambda functions from the decorated functions? Wouldn't it make more sense to have one event source and one generic handler that routes the event/task payload to the appropriate function in the project?

@Miserlou

Owner

Miserlou commented Feb 3, 2017

Not multiple Lambdas; Zappa is completely monolithic by choice. This saves on complexity and on keeping things warm.

I should have said SNS rather than SQS. I always forget that SQS isn't a real event source.

@zappa
def make_pie(event, context):
    ingredients = get_ingredients()
    pie = bake(ingredients)
    deliver(pie)

@app.route('/api/order/pie')
def order_pie():
    make_pie()
    return "Your pie is being made!"

Then make_pie() would actually fire an SNS event, zappa deploy would create the SNS topic and register the event source, and it would all JustWork™.

This is all just spitballing, of course.
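
To make the spitball slightly more concrete, here is a minimal sketch of how such a decorator could publish to SNS (zappa_task, the env var, and the payload shape are all assumptions, not an actual API):

import json
import os

import boto3

sns = boto3.client('sns')
SNS_ARN = os.environ['SNS_ARN']  # assumed: topic ARN injected at deploy time

def zappa_task(func):
    # calling the decorated function publishes its payload to SNS instead of
    # running the body; a generic handler subscribed to the topic would look
    # the function up by name and execute it inside Lambda
    def wrapper(*args, **kwargs):
        sns.publish(
            TopicArn=SNS_ARN,
            Message=json.dumps({'task': func.__name__,
                                'args': args, 'kwargs': kwargs}),
        )
    wrapper.original = func  # keep the real body for the Lambda-side router
    return wrapper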

@richiverse

Contributor

richiverse commented Feb 3, 2017

Why not @zappa('fifosqs|sqs|sns|dynamo|kinesis'), and just implement the SNS one as a proof of concept?

SQS in this case would be a per-minute batching of events, as there is a polling hack involved.

I really like this idea, btw!

@michi88

Contributor

michi88 commented Feb 3, 2017

What do you all think about handling something like make_pie(countdown=20, eta=datetime_to_execute), as Celery has? Should we?

BTW, I'm not trying to advocate copying the full Celery interface, as we want to keep it simple.

@Miserlou

Owner

Miserlou commented Feb 3, 2017

What does countdown do? Is it for failure retries?

Part of my motivation for this project is how much I hate Celery (sorry, Celery devs, ilu), so I'm not super interested in just copying the interface, but let's also not re-invent the wheel.

@michi88

Contributor

michi88 commented Feb 3, 2017

countdown=20 would mean execute after a 20-second delay. In Celery you also have a retry delay (countdown) for when a task fails, but that is definitely too much. Getting retries to work at all with DLQs is probably as far as we should go in that direction.

@flux627

flux627 commented Feb 3, 2017

@michi88 I love your S3 solution. Do you have any code to share? I agree it's a bit hacky (and perhaps too much so to be the official Zappa solution), but maybe we can work this into a plugin of sorts. I'm willing to contribute.

@Miserlou

Owner

Miserlou commented Feb 3, 2017

I'm leaning strongly towards SNS over S3, for the reasons we discussed in Slack: this should be a very lightweight call that can complete within a single HTTP response time, and with fat S3 objects that won't be the case. S3 is also more indirect.

It's very simple already, just:

    import boto3

    sns_client = boto3.client('sns')
    sns_client.publish(
        TopicArn=SNS_ARN,
        Message=str(your_object_id),
    )

Then in your Zappa settings:

        "events": [
            {
                "function": "your_module_.your_async_func",
                "event_source": {
                    "arn":  YOUR_SNS_ARN,
                    "events": [
                        "sns:Publish"
                    ]
                }
            }
        ]
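
On the receiving side, your_async_func just unpacks the SNS envelope; a minimal sketch (process() stands in for whatever work you queued):

def your_async_func(event, context):
    # SNS delivers the published payload under Records[].Sns.Message
    for record in event['Records']:
        object_id = record['Sns']['Message']
        process(object_id)  # hypothetical worker function
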
@michi88

Contributor

michi88 commented Feb 6, 2017

@flux627 the code I was working on is strongly coupled to the app I tried it on and was only a POC for myself :). I would like to make something generic, which is why I opened this issue. Unfortunately I won't have time to start on this for the coming two weeks, so if anyone is eager, please go ahead...

@geeknam

Contributor

geeknam commented Feb 20, 2017

I'm donating some code that I'm using to achieve this: https://gist.github.com/geeknam/e5b4adf0a955748487f383cbe21211bd

The task decorator is inspired by Celery :)

@mp3il

mp3il commented Feb 20, 2017

@geeknam - cool. My guess is that supporting task responses would be a problem with SNS, right?

@flux627

flux627 commented Feb 20, 2017

@geeknam Thank you for this! I'm sorry if it's obvious to everyone else, but can you give a little more detail on how to use / integrate this into a project?

@geeknam

Contributor

geeknam commented Feb 21, 2017

@flux627

  1. Create an SNS topic (I do this through CloudFormation) and note its ARN (used in steps 4 and 5).
  2. Put my snippet in async.py in your Zappa project.
  3. Anywhere in your codebase, create a function:

from async import task

@task()
def time_consuming_task(*args, **kwargs):
    dosomethingforlessthanfiveminutes()

  4. Add SNS as an event source in zappa_settings.json:

    "events": [
        {
            "function": "async.route_task",
            "event_source": {
                "arn": "arn:aws:sns:{region}:{account_id}:{topic_name}",
                "events": [
                    "sns:Publish"
                ]
            }
        }
    ]

  5. Add an env var to Zappa:

SNS_ARN=arn:aws:sns:{region}:{account_id}:{topic_name}

  6. Invoke your task asynchronously (you might want to do this within the request-response cycle of API Gateway):

time_consuming_task.delay()

  7. zappa update dev
  8. Profit

@geeknam

Contributor

geeknam commented Feb 21, 2017

I'm planning to contribute this to Zappa:

  1. Add an extra Zappa setting: "async_tasks": true
  2. This automatically creates the SNS topic and subscribes the current Lambda
  3. SNS messages will hit the task_router

Of course, when I get some free time :)

@mcrowson

Collaborator

mcrowson commented Feb 27, 2017

I think SNS is overkill. You get the same retries with async Lambda calls, and I don't see what value SNS adds over async Lambda calls for this.

@aehlke

Contributor

aehlke commented Mar 3, 2017

> I think the SNS is overkill. You get the same retry with async lambda calls. I don't see what value SNS has over Async lambda calls for this.

How do you make an async lambda call from within a zappa lambda function?

edit: https://boto3.readthedocs.io/en/latest/reference/services/lambda.html#Lambda.Client.invoke InvocationType=Event
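
A minimal fire-and-forget sketch of that call (function name and payload shape are placeholders):

import json

import boto3

lambda_client = boto3.client('lambda')

# InvocationType='Event' queues the invocation and returns immediately;
# Lambda retries failed asynchronous invocations on its own
lambda_client.invoke(
    FunctionName='my-zappa-function',  # placeholder
    InvocationType='Event',
    Payload=json.dumps({'task': 'module.my_func', 'args': [], 'kwargs': {}}),
)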

@Miserlou

Owner

Miserlou commented Mar 3, 2017

This is a good question, but I think there are tangible benefits.

SNS is rate-limit safe. SNS provides stored delivery receipts. SNS has a configurable retry policy.

@aehlke

Contributor

aehlke commented Mar 7, 2017

Do lambda function invocations via InvocationType=Event not include those same or similar benefits?

@Miserlou

Owner

Miserlou commented Mar 7, 2017

That's a good question. I don't know if it's limit safe or if the retry policy is configurable. My gut says no.

@aehlke

Contributor

aehlke commented Mar 7, 2017

Some quick research re: retries: http://docs.aws.amazon.com/lambda/latest/dg/retries-on-errors.html

> Asynchronous invocation – Asynchronous events are queued before being used to invoke the Lambda function. If AWS Lambda is unable to fully process the event, it will automatically retry the invocation twice, with delays between retries. If you have specified a Dead Letter Queue for your function, then the failed event is sent to the specified Amazon SQS queue or Amazon SNS topic.

Seems this is the officially intended way to do retries: the configurable retry behavior would live in whatever handles the SQS or SNS message resulting from a failed invocation.
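
For completeness, attaching a Dead Letter Queue is a one-call configuration change; a sketch with placeholder names:

import boto3

lambda_client = boto3.client('lambda')

# events that still fail after the built-in retries land on this SQS queue
lambda_client.update_function_configuration(
    FunctionName='my-zappa-function',  # placeholder
    DeadLetterConfig={'TargetArn': 'arn:aws:sqs:us-east-1:123456789012:my-dlq'},
)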

@Miserlou

Owner

Miserlou commented Mar 28, 2017

This is now merged into master!

Read the docs here: https://github.com/Miserlou/Zappa#asynchronous-task-execution

@antwan

antwan commented Jun 9, 2017

Just continuing the discussion here, as the async part of Zappa is almost perfect, but it lacks an important feature: the ability to trigger functions with a delay.

From my reading I understand it's still dodgy to implement, mainly because SNS doesn't support delays and SQS doesn't support Lambda execution. But there have been hacks that successfully delay Lambda execution using CloudWatch alarms.

My question: is this something that has been contemplated? Any plans for Zappa, or are we just waiting for AWS to come up with a proper solution? Maybe using AWS Step Functions and wait_using_seconds?

@geeknam

Contributor

geeknam commented Jun 9, 2017

So I had this crazy idea of using DynamoDB TTL to expire an item, which then triggers a Lambda (leaving the delay responsibility to DynamoDB). I gave that a try; unfortunately, AWS doesn't guarantee that it expires and deletes the item at exactly the specified timestamp (it only guarantees deletion within 48 hours). I guess there's no way around a CloudWatch Events rule with a one-minute rate.

@andytwoods

Contributor

andytwoods commented Jun 19, 2017

Hi all,
We are using this in production to call functions after a delay / repeatedly (after a set delay): https://github.com/andytwoods/zappa-call-later/blob/master/zappa_call_later/models.py

The idea is that a checking function is called periodically via scheduling: https://github.com/Miserlou/Zappa#scheduling

Any thoughts? :)

@jordanmkoncz

jordanmkoncz commented Dec 18, 2017

Having the ability to schedule a task to run asynchronously at a specific time is the one missing piece preventing me from easily using Zappa in a project where I'm currently using django-q. A simple use case: a user makes a booking (an appointment) for some time in the future, e.g. 15/12/2018 at 5pm, and when this happens I want to schedule an async task that sends them a reminder email one hour before the booking, i.e. 15/12/2018 at 4pm. With django-q this is very easy; I just create a scheduled task that runs once at the time I want. It would be great if this sort of thing were just as easy with Zappa.

@prabhatpankaj

prabhatpankaj commented Jan 25, 2018

@geeknam, I am struggling to use Celery in a Lambda function.

I had used Celery + RabbitMQ + Django on EC2.

Our task is to move data from a temporary database table to a report database table. Since it requires a connection inside an AWS VPC, an external function will not work for this event.

Regarding the linked gist (https://gist.github.com/geeknam/e5b4adf0a955748487f383cbe21211bd), it would be very helpful if you could create a sample project using Zappa + Django for this kind of asynchronous task.

@spyoungtech

spyoungtech commented Apr 13, 2018

Similar to @jordanmkoncz, I'm trying to find an elegant way to implement dynamic scheduling for tasks to be run in the (possibly distant) future. Currently we satisfy this with django-celery-beat using the database scheduler.

One thought is to store these future tasks, with their desired approximate execution times, in a database table, then set up a regularly executing task that checks the table for any tasks that are ready to be invoked. I believe this is essentially the idea @andytwoods shared.

Maybe a model like this:

from django.db import models
from django.contrib.postgres.fields import ArrayField, JSONField

import zappa.async

class ScheduledTask(models.Model):
    sent_on = models.DateTimeField(null=True, default=None)
    task_name = models.CharField(max_length=128)  # fully qualified name
    exec_after = models.DateTimeField()
    args = ArrayField(models.TextField(), default=list)
    kwargs = JSONField(default=dict)
    # ...
    @property
    def func(self):
        return zappa.async.import_and_get_task(self.task_name)

Then a regularly occurring task that might look something like this:

from django.utils import timezone

now = timezone.now()
pending_tasks = ScheduledTask.objects.filter(sent_on=None, exec_after__lte=now)
for task in pending_tasks:
    zappa.async.run(task.func, task.args, task.kwargs)
    task.sent_on = now
    task.save()

As long as you don't need very granular time resolution, it seems like this should work.

But to make this work for Flask applications as well, perhaps DynamoDB or S3 could be used to store pending tasks instead of a Django model. Thoughts?
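
For the 'regularly occurring task' part, Zappa's scheduling support could drive the sweep; something like this in zappa_settings.json (module and function names are hypothetical):

    "events": [
        {
            "function": "tasks.run_pending_scheduled_tasks",
            "expression": "rate(1 minute)"
        }
    ]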

@hm1300

hm1300 commented Sep 18, 2018

So how can we achieve this feature (triggering functions with a delay), now that SQS is available as an event source?

Any ideas?

@jsharpe

jsharpe commented Nov 15, 2018

AWS Step Functions would be an obvious way to implement the delay and then trigger the Lambda. The state machine can handle retries as well.
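
A minimal Amazon States Language sketch of that idea (the Lambda ARN and state names are placeholders); the Wait state supplies the delay and the Task state's Retry block handles retries:

    {
        "StartAt": "WaitForDelay",
        "States": {
            "WaitForDelay": {
                "Type": "Wait",
                "SecondsPath": "$.delay_seconds",
                "Next": "RunTask"
            },
            "RunTask": {
                "Type": "Task",
                "Resource": "arn:aws:lambda:us-east-1:123456789012:function:my-zappa-function",
                "Retry": [{"ErrorEquals": ["States.ALL"], "MaxAttempts": 3}],
                "End": true
            }
        }
    }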
