
Support message sizes > 256KB by using a third-party backend... #279

Open
diranged opened this issue Nov 20, 2013 · 13 comments

Comments

@diranged
Contributor

SQS only supports messages up to 256KB. Given that limitation, it's very easy to hit the limit and have a task submission fail. Here's a simple example:

group(benchmark_tasks.WaitTask().s(args=[0, data]) for i in xrange(1024)).apply_async()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/celery/canvas.py", line 194, in apply_async
    return self._apply_async(args, kwargs, **options)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/celery/app/builtins.py", line 213, in apply_async
    list(tasks), result.serializable(), gid, args), **options
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/celery/app/task.py", line 474, in apply_async
    **options)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/celery/app/amqp.py", line 250, in publish_task
    **kwargs
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/messaging.py", line 164, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/connection.py", line 453, in _ensured
    return fun(*args, **kwargs)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/messaging.py", line 180, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 469, in basic_publish
    exchange, routing_key, **kwargs)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/transport/virtual/exchange.py", line 61, in deliver
    _put(queue, message, **kwargs)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/kombu/transport/SQS.py", line 232, in _put
    q.write(m)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/boto/sqs/queue.py", line 220, in write
    delay_seconds)
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/boto/sqs/connection.py", line 253, in send_message
    queue.id, verb='POST')
  File "/opt/nextdoor-ve/lib/python2.7/site-packages/boto/connection.py", line 1062, in get_object
    raise self.ResponseError(response.status, response.reason, body)
SQSError: SQSError: 400 Bad Request

<ErrorResponse><Error><Type>Sender</Type><Code>InvalidParameterValue</Code><Message>Value for parameter MessageBody is invalid. Reason: Message body must be shorter than 262144 bytes.</Message></Error><RequestId>a34e4e32-406a-5652-b527-0537f7db8afe</RequestId></ErrorResponse>

There are two ways to fix this that I see.

  1. Fix only the group() call by having it automatically break groups up into smaller chunks whenever the message-size limit is reached. This is probably simple to do in the Kombu code.
  2. Create an alternative data backend (memcache, redis, or better yet ... dynamodb) where messages > 256KB are stored, and a pointer is used to let Kombu know where to fetch its raw message data from (see the sketch after this list).
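
For illustration, option 2 amounts to the "claim check" pattern. A minimal sketch using boto3, with a hypothetical bucket name and helper function (not an actual Kombu API):

# Claim-check sketch: spill oversized bodies to S3 and enqueue a pointer.
# The bucket name and send_message helper are hypothetical, not Kombu's.
import json
import uuid

import boto3

SQS_LIMIT = 262144  # SQS rejects bodies of 262144 bytes or more

s3 = boto3.client("s3")
sqs = boto3.client("sqs")

def send_message(queue_url, body, bucket="my-large-payload-bucket"):
    """Send `body` to SQS, storing it in S3 first if it is too big."""
    if len(body.encode("utf-8")) >= SQS_LIMIT:
        key = "payloads/%s" % uuid.uuid4()
        s3.put_object(Bucket=bucket, Key=key, Body=body.encode("utf-8"))
        # The queue now only carries a small pointer the consumer resolves.
        body = json.dumps({"s3_bucket": bucket, "s3_key": key})
    sqs.send_message(QueueUrl=queue_url, MessageBody=body)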
@diwu1989

Option 1 seems much better than option 2, because option 2 adds another dependency and another place where things can break.

@ask
Contributor

ask commented Dec 20, 2013

Not sure how option 1 would work, as there is no safe place to put the knife. If it needs to divide an object, then to recover the object at the other end all of the pieces must be received by the same process. And then what happens if one or more of the pieces are lost, or the process dies while processing them? There may be more places where things can break in option 1. Option 2 would have to be optional, and a user can always choose to store big objects somewhere else and pass URLs manually (a sketch of that workaround follows).
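
The manual workaround could look like this (a minimal sketch; the bucket, key, and task names are hypothetical):

# Manual workaround sketch: the application stores the big object itself
# and passes only a reference through Celery/SQS. All names are made up.
import boto3
from celery import shared_task

s3 = boto3.client("s3")

@shared_task
def process_blob(bucket, key):
    payload = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    ...  # operate on the raw payload here

# Producer side: upload once, then enqueue a tiny message.
# s3.put_object(Bucket="my-bucket", Key="jobs/123", Body=big_payload)
# process_blob.delay("my-bucket", "jobs/123")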

@ask ask added the SQS label May 21, 2014
@ask ask added this to the v4.1 milestone Jul 9, 2016
@auvipy auvipy modified the milestones: v4.1, 5.0 Jan 13, 2018
@shuklaabhi

shuklaabhi commented Mar 10, 2020

Will you be accepting pull requests for option 2?

@auvipy auvipy removed the Not Funded label Mar 10, 2020
@auvipy
Member

auvipy commented Mar 10, 2020

We will review the PR, of course. Please come with proper unit and integration tests :) and mention me

@auvipy auvipy modified the milestones: 5.0, 4.7 May 5, 2020
@jmsmkn

jmsmkn commented Aug 25, 2021

I don't know if this helps anyone who is thinking of adding this, but there is a Python implementation of the option 2 pattern using S3 for storage, influenced by the AWS Java extended client for SQS: https://github.com/archetype-digital/aws-sqs-extended. It extends boto3 with extra calls that could be used as a basis for a transport (I think). I've not reviewed the code in detail, but it is tested under Python 3.7, 3.8, and 3.9, has 99% test coverage, and is MIT licensed.

I'm a long-time user of Celery+SQS and don't know my way around the internals, but I'd really be interested in a solution to this issue and would be happy to help out where I can.

@auvipy auvipy modified the milestones: 5.1.0, 5.3 Sep 11, 2021
@auvipy
Member

auvipy commented Sep 11, 2021

I found this article very useful https://walid.dev/blog/saving-costs-asking-for-forgiveness-in-python/

@auvipy
Member

auvipy commented Sep 11, 2021

> I don't know if this helps anyone who is thinking of adding this, but there is a Python implementation of the option 2 pattern using S3 for storage, influenced by the AWS Java extended client for SQS: https://github.com/archetype-digital/aws-sqs-extended. It extends boto3 with extra calls that could be used as a basis for a transport (I think). I've not reviewed the code in detail, but it is tested under Python 3.7, 3.8, and 3.9, has 99% test coverage, and is MIT licensed.
>
> I'm a long-time user of Celery+SQS and don't know my way around the internals, but I'd really be interested in a solution to this issue and would be happy to help out where I can.

This could be useful for some use cases.

@jmsmkn

jmsmkn commented Mar 31, 2023

AWS have now released the extended client library for Python, allowing up to 2GB messages on SNS via S3:

@auvipy
Member

auvipy commented Mar 31, 2023

Based on the new library, can we close this, or do we need to integrate it and ensure it is supported in Kombu?

@jmsmkn

jmsmkn commented Mar 31, 2023

Ah, wait, sorry, the new lib is for addressing the same problem for SNS and not SQS, so not helpful here. Apologies.

@auvipy auvipy modified the milestones: 5.3, 5.4 Jun 17, 2023
@terrykfwong

AWS have now released the extended client library for Python, allowing up to 2GB messages on SQS via S3:

Announcement
GitHub Repo
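
Based on that repo's README, usage appears to be import-time patching of the boto3 SQS client. A sketch, with placeholder queue URL and bucket; the attribute names are taken from the README from memory, so verify against the library:

# Sketch based on the amazon-sqs-python-extended-client-lib README:
# importing sqs_extended_client patches boto3's SQS client so oversized
# bodies are offloaded to S3 transparently. Names are placeholders.
import boto3
import sqs_extended_client  # noqa: F401  (patches the client on import)

sqs = boto3.client("sqs")
sqs.large_payload_support = "my-large-payload-bucket"  # S3 spill bucket

sqs.send_message(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
    MessageBody="x" * 300000,  # over 256 KB, so it goes via S3
)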

Amwam added a commit to Amwam/kombu that referenced this issue Sep 18, 2024
This adds support for handling large payloads in SQS. The 'sqs_extended_client' is imported and used to fetch the file from S3 as the payload when necessary.

As Kombu fetches new messages from the queue asynchronously, not via the standard boto3 APIs, we have to fetch the S3 file manually rather than rely on the sqs_extended_client to perform that action

Relates to: celery#279
@Amwam

Amwam commented Sep 18, 2024

I've made a start on attempting to integrate the sqs_extended_client. You can view that here: https://github.com/celery/kombu/compare/main...Amwam:kombu:add-sqs-large-payload-support?expand=1

While this appears to work, I'm not sure if there are issues in the implementation. There are also features missing, such as automatically deleting the payload after the task has been completed.

The core issue I've run into is that the way Kombu fetches messages from SQS is via the HTTP API, rather than boto3, so the extended client isn't used when retrieving messages, only for publishing. Another PR references a desire to convert the calls to boto3, but it seems a bigger refactor is required to make that happen in a performant way. As a result, decoding the SQS message requires some manual handling to mimic how the sqs_extended_client behaves (roughly as sketched below).
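
Roughly, that receive-side handling looks like this (a sketch; the pointer format follows the payload-offloading convention used by the extended clients, so treat the details as an assumption, not Kombu's API):

# Sketch of receive-side handling: detect the S3 pointer the extended
# client writes and fetch the real body. The pointer format is assumed
# from the payload-offloading convention, not taken from Kombu.
import json

import boto3

s3 = boto3.client("s3")
POINTER_CLASS = "software.amazon.payloadoffloading.PayloadS3Pointer"

def resolve_body(raw_body):
    """Return the real payload, fetching from S3 when given a pointer."""
    try:
        parsed = json.loads(raw_body)
    except ValueError:
        return raw_body  # not JSON, so not a pointer
    if isinstance(parsed, list) and parsed and parsed[0] == POINTER_CLASS:
        pointer = parsed[1]
        obj = s3.get_object(Bucket=pointer["s3BucketName"], Key=pointer["s3Key"])
        return obj["Body"].read().decode("utf-8")
    return raw_body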

@auvipy
Member

auvipy commented Oct 7, 2024

> I've made a start on attempting to integrate the sqs_extended_client. You can view that here: https://github.com/celery/kombu/compare/main...Amwam:kombu:add-sqs-large-payload-support?expand=1
>
> While this appears to work, I'm not sure if there are issues in the implementation. There are also features missing, such as automatically deleting the payload after the task has been completed.
>
> The core issue I've run into is that the way Kombu fetches messages from SQS is via the HTTP API, rather than boto3, so the extended client isn't used when retrieving messages, only for publishing. Another PR references a desire to convert the calls to boto3, but it seems a bigger refactor is required to make that happen in a performant way. As a result, decoding the SQS message requires some manual handling to mimic how the sqs_extended_client behaves.

Good job on starting work on it.
