Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AttributeError: 'tuple' object has no attribute 'startswith' #883

Open
mpenkov opened this issue May 31, 2018 · 5 comments
Open

AttributeError: 'tuple' object has no attribute 'startswith' #883

mpenkov opened this issue May 31, 2018 · 5 comments

Comments

@mpenkov
Copy link

mpenkov commented May 31, 2018

I get the following exception when sending a task to a non-default queue (top line of the trace is my own code):

  File "/Users/misha/pn/dbi2/dbi2/server.py", line 581, in add_input
    celery.absorb_input_file.apply_async(kwargs=task_kwargs)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/celery/app/task.py", line 535, in apply_async
    **options
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/celery/app/base.py", line 729, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/celery/app/amqp.py", line 552, in send_task_message
    **properties
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/connection.py", line 494, in _ensured
    return fun(*args, **kwargs)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 194, in _publish
    [maybe_declare(entity) for entity in declare]
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 194, in <listcomp>
    [maybe_declare(entity) for entity in declare]
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/messaging.py", line 102, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/common.py", line 129, in maybe_declare
    return _maybe_declare(entity, declared, ident, channel, orig)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/common.py", line 135, in _maybe_declare
    entity.declare(channel=channel)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 605, in declare
    self._create_exchange(nowait=nowait, channel=channel)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 612, in _create_exchange
    self.exchange.declare(nowait=nowait, channel=channel)
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 181, in declare
    if self._can_declare():
  File "/Users/misha/envs/dbi2/lib/python3.6/site-packages/kombu/entity.py", line 168, in _can_declare
    self.name and not self.name.startswith(
AttributeError: 'tuple' object has no attribute 'startswith'

Now, the code that raises the exception is:

    def _can_declare(self):
        print(self.name)  # my own debugging
        return not self.no_declare and (
            self.name and not self.name.startswith(
                INTERNAL_EXCHANGE_PREFIX))

The above print function gives me:

('celery_slow',)

so it is indeed a tuple, whereas the code is expecting a string.

If I remove my task routing by deleting this code in my celery initialization, as described here:

    app.conf.task_routes = {
        'dbi2.celery.absorb_input_file': {'queue': 'celery_slow'},
    }

or, alternatively,

    app.conf.task_routes = ([
        ('dbi2.celery.absorb_input_file', {'queue': 'celery_slow'}),
    ],)

then everything works.

How can I get my task to work with the non-default queue?

@mpenkov
Copy link
Author

mpenkov commented May 31, 2018

I put in some print statements at various points of the stack trace, and got this:

apply_async.options: {'queue': None, 'routing_key': None, 'exchange': None, 'priority': None, 'expires': None, 'serializer': 'json', 'delivery_mode': None, 'compression': None, 'time_limit': None, 'soft_time_limit': None, 'immediate'
: None, 'mandatory': None}
send_task.options: {'queue': <unbound Queue 'celery_slow', -> <unbound Exchange 'celery_slow',(direct)> -> 'celery_slow',>, 'serializer': 'json'}
send_task.route_name: None name: 'dbi2.celery.absorb_input_file' args: None kwargs: {'input_id': '5b0f7da01ea23059c974d2be', 'filepath': '/Users/misha/pn/dbi2/gitignore/tmp/Added via Web UI_LHHQTP', 'keep_file': False} task_type: <@t
ask: dbi2.celery.absorb_input_file of tasks at 0x10ad3eb70>

Not sure if that helps.

@mpenkov
Copy link
Author

mpenkov commented May 31, 2018

(dbi2) sergeyich:dbi2 misha$ pip freeze | grep celery
celery==4.1.1
(dbi2) sergeyich:dbi2 misha$ pip freeze | grep kombu
kombu==4.2.1```

@georgepsarakis
Copy link
Contributor

@mpenkov you can try debugging at a higher level, perhaps from celery.app.routes where the configured routes are being setup. I did have a look with your examples and it seems to work properly. I am guessing that the problem is on Celery initialization steps instead of Kombu.

@mpenkov
Copy link
Author

mpenkov commented May 31, 2018

Thank you for the fast reply.

I managed to get it working by specifying queue as a keyword argument to the apply_async function. I'm happy to debug via celery.app.routes, but it's not on the stack trace. Where would I start? For reference, here's my celery init code:

def _connect_to_celery():
    broker = 'pyamqp://%s@%s' % (config.CONFIG.ampq_user, config.CONFIG.ampq_host)
    app = celery.Celery('tasks', backend='rpc://', broker=broker)
    app.conf.enable_utc = True
    app.conf.timezone = 'UTC'
    app.conf.worker_log_format = WORKER_LOG_FORMAT
    app.conf.task_routes = {
        'dbi2.celery.start_job': {'queue': SLOW_QUEUE_NAME},
        'dbi2.celery._absorb_input_file': {'queue': SLOW_QUEUE_NAME},
    }
    return app

app = _connect_to_celery()

def _absorb_input_file(input_id, filepath, keep_file=False):
    pass # do application-specific stuff here

absorb_input_file = app.task(_absorb_input_file)

@auvipy
Copy link
Member

auvipy commented Apr 17, 2022

would love to know if this issue still persists

@auvipy auvipy modified the milestones: 5.3, 5.3.x Jun 3, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants