
InvalidResponse: Protocol Error: b'1' #6335

Closed
HosseyNJF opened this issue Sep 6, 2020 · 32 comments

Comments

@HosseyNJF

HosseyNJF commented Sep 6, 2020

Checklist

  • I have verified that the issue exists against the master branch of Celery. (I cannot test this because it's a production server)
  • This has already been asked to the discussion group first.
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the master branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the master branch of Celery. (I cannot test this because it's a production server)
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation.
  • I have tried reproducing the issue on more than one message broker and/or
    result backend.
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system.
  • I have tried reproducing the issue on more than one workers pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies.

Related Issues and Possible Duplicates

Related Issues

  • None

Possible Duplicates

Environment & Settings

Celery version: 4.4.7 (cliffs)

celery report Output:

software -> celery:4.4.7 (cliffs) kombu:4.6.11 py:3.6.9
            billiard:3.6.3.0 redis:3.5.3
platform -> system:Linux arch:64bit, ELF
            kernel version:4.15.0-115-generic imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:redis results:redis+socket:///var/run/redis/redis-server.sock

ABSOLUTE_URL_OVERRIDES: {
 }
ADMINS: []
ALLOWED_HOSTS: ['proj.com']
APPEND_SLASH: True
AUTHENTICATION_BACKENDS: ['django.contrib.auth.backends.ModelBackend']
AUTH_PASSWORD_VALIDATORS: '********'
AUTH_USER_MODEL: 'accounts.CustomUser'
BASE_DIR: '/home/proj/website'
CACHES: {
    'default': {   'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
                   'LOCATION': 'unix:/var/run/memcached/memcached.sock'}}
CACHE_MIDDLEWARE_ALIAS: 'default'
CACHE_MIDDLEWARE_KEY_PREFIX: '********'
CACHE_MIDDLEWARE_SECONDS: 600
CELERY_BROKER_URL: 'redis+socket:///var/run/redis/redis-server.sock'
CELERY_RESULT_BACKEND: 'redis+socket:///var/run/redis/redis-server.sock'
CSRF_COOKIE_AGE: 31449600
CSRF_COOKIE_DOMAIN: None
CSRF_COOKIE_HTTPONLY: False
CSRF_COOKIE_NAME: 'csrftoken'
CSRF_COOKIE_PATH: '/'
CSRF_COOKIE_SAMESITE: 'Lax'
CSRF_COOKIE_SECURE: False
CSRF_FAILURE_VIEW: 'django.views.csrf.csrf_failure'
CSRF_HEADER_NAME: 'HTTP_X_CSRFTOKEN'
CSRF_TRUSTED_ORIGINS: []
CSRF_USE_SESSIONS: False
DATABASES: {
    'default': {   'ATOMIC_REQUESTS': False,
                   'AUTOCOMMIT': True,
                   'CONN_MAX_AGE': 0,
                   'ENGINE': 'django.db.backends.postgresql',
                   'HOST': 'localhost',
                   'NAME': 'proj',
                   'OPTIONS': {},
                   'PASSWORD': '********',
                   'PORT': '',
                   'TEST': {   'CHARSET': None,
                               'COLLATION': None,
                               'MIRROR': None,
                               'NAME': None},
                   'TIME_ZONE': None,
                   'USER': 'postgres'}}
DATABASE_ROUTERS: '********'
DATA_UPLOAD_MAX_MEMORY_SIZE: 2621440
DATA_UPLOAD_MAX_NUMBER_FIELDS: 1000
DATETIME_FORMAT: 'N j, Y, P'
DATETIME_INPUT_FORMATS: ['%Y-%m-%d %H:%M:%S',
 '%Y-%m-%d %H:%M:%S.%f',
 '%Y-%m-%d %H:%M',
 '%Y-%m-%d',
 '%m/%d/%Y %H:%M:%S',
 '%m/%d/%Y %H:%M:%S.%f',
 '%m/%d/%Y %H:%M',
 '%m/%d/%Y',
 '%m/%d/%y %H:%M:%S',
 '%m/%d/%y %H:%M:%S.%f',
 '%m/%d/%y %H:%M',
 '%m/%d/%y']
DATE_FORMAT: 'N j, Y'
DATE_INPUT_FORMATS: ['%Y-%m-%d',
 '%m/%d/%Y',
 '%m/%d/%y',
 '%b %d %Y',
 '%b %d, %Y',
 '%d %b %Y',
 '%d %b, %Y',
 '%B %d %Y',
 '%B %d, %Y',
 '%d %B %Y',
 '%d %B, %Y']
DEBUG: False
DEBUG_PROPAGATE_EXCEPTIONS: False
DECIMAL_SEPARATOR: '.'
DEFAULT_CHARSET: 'utf-8'
DEFAULT_EXCEPTION_REPORTER_FILTER: 'django.views.debug.SafeExceptionReporterFilter'
DEFAULT_FILE_STORAGE: 'django.core.files.storage.FileSystemStorage'
DEFAULT_FROM_EMAIL: 'PROJ <noreply@proj.com>'
DEFAULT_INDEX_TABLESPACE: ''
DEFAULT_TABLESPACE: ''
DISALLOWED_USER_AGENTS: []
EMAIL_BACKEND: 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST: 'localhost'
EMAIL_HOST_PASSWORD: '********'
EMAIL_HOST_USER: ''
EMAIL_PORT: 25
EMAIL_SSL_CERTFILE: None
EMAIL_SSL_KEYFILE: '********'
EMAIL_SUBJECT_PREFIX: '[Django] '
EMAIL_TIMEOUT: None
EMAIL_USE_LOCALTIME: False
EMAIL_USE_SSL: False
EMAIL_USE_TLS: False
FILE_CHARSET: 'utf-8'
FILE_UPLOAD_DIRECTORY_PERMISSIONS: None
FILE_UPLOAD_HANDLERS: ['django.core.files.uploadhandler.MemoryFileUploadHandler',
 'django.core.files.uploadhandler.TemporaryFileUploadHandler']
FILE_UPLOAD_MAX_MEMORY_SIZE: 2621440
FILE_UPLOAD_PERMISSIONS: 420
FILE_UPLOAD_TEMP_DIR: None
FIRST_DAY_OF_WEEK: 0
FIXTURE_DIRS: []
FORCE_SCRIPT_NAME: None
FORMAT_MODULE_PATH: None
FORM_RENDERER: 'django.forms.renderers.TemplatesSetting'
IGNORABLE_404_URLS: []
INSTALLED_APPS: ['proj.apps.SuitConfig',
 'fullurl',
 'django_telegrambot',
 'django_jalali',
 'django_extensions',
 'debug_toolbar',
 'django_user_agents',
 'htmlemailer',
 'qsessions',
 'django.forms',
 'django.contrib.admin',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.messages',
 'django.contrib.staticfiles',
 'django.contrib.humanize',
  # .... my proj apps here
 ]
INTERNAL_IPS: ['127.0.0.1']
LANGUAGES: 
    (('fa', 'Farsi'), ('en', 'انگلیسی'))
LANGUAGES_BIDI: ['he', 'ar', 'fa', 'ur']
LANGUAGE_CODE: 'fa'
LANGUAGE_COOKIE_AGE: None
LANGUAGE_COOKIE_DOMAIN: None
LANGUAGE_COOKIE_HTTPONLY: False
LANGUAGE_COOKIE_NAME: 'django_language'
LANGUAGE_COOKIE_PATH: '/'
LANGUAGE_COOKIE_SAMESITE: None
LANGUAGE_COOKIE_SECURE: False
LOCALE_PATHS: []
LOGGING: {
    'disable_existing_loggers': False,
    'formatters': {   'standard': {   'format': '%(asctime)s [%(levelname)s] '
                                                '%(name)s: %(message)s'}},
    'handlers': {   'file': {   'backupCount': 10,
                                'class': 'logging.handlers.RotatingFileHandler',
                                'filename': '/var/log/proj/website.log',
                                'formatter': 'standard',
                                'level': 'INFO',
                                'maxBytes': 10485760}},
    'loggers': {   'django': {   'handlers': ['file'],
                                 'level': 'INFO',
                                 'propagate': True}},
    'version': 1}
LOGGING_CONFIG: 'logging.config.dictConfig'
LOGIN_REDIRECT_URL: 'dashboard'
LOGIN_URL: 'login'
LOGOUT_REDIRECT_URL: 'index'
MANAGERS: []
MAX_STRING_LENGTH: 65536
MEDIA_ROOT: '/home/proj/website/media'
MEDIA_URL: '/media/'
MESSAGE_STORAGE: 'django.contrib.messages.storage.fallback.FallbackStorage'
MESSAGE_TAGS: {
 25: 'success', 40: 'danger'}
MIDDLEWARE: ['htmlmin.middleware.MarkRequestMiddleware',
 'htmlmin.middleware.HtmlMinifyMiddleware',
 'debug_toolbar.middleware.DebugToolbarMiddleware',
 'django.middleware.security.SecurityMiddleware',
 'qsessions.middleware.SessionMiddleware',
 'django.middleware.locale.LocaleMiddleware',
 'django.middleware.common.CommonMiddleware',
 'django.middleware.csrf.CsrfViewMiddleware',
 'django.contrib.auth.middleware.AuthenticationMiddleware',
 'django.contrib.messages.middleware.MessageMiddleware',
 'django.middleware.clickjacking.XFrameOptionsMiddleware',
 'django_user_agents.middleware.UserAgentMiddleware']
MIGRATION_MODULES: {
 }
MONTH_DAY_FORMAT: 'F j'
NUMBER_GROUPING: 0
PASSWORD_HASHERS: '********'
PASSWORD_RESET_TIMEOUT_DAYS: '********'
PREPEND_WWW: False
ROOT_URLCONF: 'proj.urls'
SECRET_KEY: '********'
SECURE_BROWSER_XSS_FILTER: False
SECURE_CONTENT_TYPE_NOSNIFF: True
SECURE_HSTS_INCLUDE_SUBDOMAINS: False
SECURE_HSTS_PRELOAD: False
SECURE_HSTS_SECONDS: 0
SECURE_PROXY_SSL_HEADER: None
SECURE_REDIRECT_EXEMPT: []
SECURE_REFERRER_POLICY: None
SECURE_SSL_HOST: None
SECURE_SSL_REDIRECT: False
SENTRY_API_KEY: '********'
SENTRY_TRACES_SAMPLE_RATE: 0.0001
SERVER_EMAIL: 'root@localhost'
SESSION_CACHE_ALIAS: 'default'
SESSION_COOKIE_AGE: 1209600
SESSION_COOKIE_DOMAIN: None
SESSION_COOKIE_HTTPONLY: True
SESSION_COOKIE_NAME: 'sessionid'
SESSION_COOKIE_PATH: '/'
SESSION_COOKIE_SAMESITE: 'Lax'
SESSION_COOKIE_SECURE: False
SESSION_ENGINE: 'qsessions.backends.cached_db'
SESSION_EXPIRE_AT_BROWSER_CLOSE: False
SESSION_FILE_PATH: None
SESSION_SAVE_EVERY_REQUEST: False
SESSION_SERIALIZER: 'django.contrib.sessions.serializers.JSONSerializer'
SETTINGS_MODULE: 'proj.settings'
SHORT_DATETIME_FORMAT: 'm/d/Y P'
SHORT_DATE_FORMAT: 'm/d/Y'
SIGNING_BACKEND: 'django.core.signing.TimestampSigner'
SILENCED_SYSTEM_CHECKS: []
STATICFILES_DIRS: ['/home/proj/website/proj/static']
STATICFILES_FINDERS: ['django.contrib.staticfiles.finders.FileSystemFinder',
 'django.contrib.staticfiles.finders.AppDirectoriesFinder']
STATICFILES_STORAGE: 'proj.storages.ForgivingManifestStaticFilesStorage'
STATIC_ROOT: '/home/proj/website/static'
STATIC_URL: '/static/'
TEMPLATES: [{'APP_DIRS': True,
  'BACKEND': 'django.template.backends.django.DjangoTemplates',
  'DIRS': ['/home/proj/website/templates'],
  'OPTIONS': {'context_processors': ['django.template.context_processors.debug',
                                     'django.template.context_processors.request',
                                     'django.contrib.auth.context_processors.auth',
                                     'django.contrib.messages.context_processors.messages']}}]
TEST_NON_SERIALIZED_APPS: []
TEST_RUNNER: 'django.test.runner.DiscoverRunner'
THOUSAND_SEPARATOR: ','
TIME_FORMAT: 'P'
TIME_INPUT_FORMATS: ['%H:%M:%S', '%H:%M:%S.%f', '%H:%M']
TIME_ZONE: 'Asia/Tehran'
USE_I18N: True
USE_L10N: True
USE_THOUSAND_SEPARATOR: False
USE_TZ: True
USE_X_FORWARDED_HOST: False
USE_X_FORWARDED_PORT: False
WSGI_APPLICATION: 'proj.wsgi.application'
X_FRAME_OPTIONS: 'DENY'
YEAR_MONTH_FORMAT: 'F Y'
is_overridden: <bound method Settings.is_overridden of <Settings "proj.settings">>

Steps to Reproduce

Required Dependencies

  • Minimal Python Version: 3.6
  • Minimal Celery Version: 4.4
  • Minimal Kombu Version: 4.6.11
  • Minimal Broker Version: 4.0.9
  • Minimal Result Backend Version: 4.0.9
  • Minimal OS and/or Kernel Version: Ubuntu 18.04
  • Minimal Broker Client Version: 3.5.3
  • Minimal Result Backend Client Version: 3.5.3

Python Packages

pip freeze Output:

aiohttp==3.6.2
amqp==2.6.1 (I don't think that I'm using this)
asgiref==3.2.5
async-timeout==3.0.1
attrs==19.3.0
beautifulsoup4==4.8.2
billiard==3.6.3.0
celery==4.4.7
certifi==2019.11.28
cffi==1.14.0
chardet==3.0.4
commonmark==0.9.1
commonmarkextensions==0.0.5
cryptography==2.8
cssutils==1.0.2
decorator==4.4.2
Django==3.0.8
django-debug-toolbar==2.2
django-extensions==2.2.8
django-forgiving-collectstatic==1.0.0
django-fullurl==1.1
django-html-emailer==0.0.8.1
django-htmlmin==0.11.0
django-ipware==3.0.1
django-jalali==4.0.0
django-qsessions==1.0.0
django-suit @ https://github.com/darklow/django-suit/tarball/v2
-e git://github.com/telebotter/django-telegrambot.git@76742ec4cff775bf4e31df481face7662f5f412f#egg=django_telegrambot
django-user-agents==0.4.0
future==0.18.2
geoip2==4.0.2
html5lib==1.0.1
htpasswd==2.3
idna==2.10
idna-ssl==1.1.0
importlib-metadata==1.7.0
jdatetime==3.6.2
kombu==4.6.11
maxminddb==2.0.2
multidict==4.7.6
orderedmultidict==1.0.1
pkg-resources==0.0.0
psycopg2-binary==2.8.5
pycparser==2.20
pynliner==0.8.0
python-memcached==1.59
python-telegram-bot==12.4.2
pytz==2019.3
redis==3.5.3
requests==2.24.0
sentry-sdk==0.17.3
six==1.14.0
soupsieve==2.0
sqlparse==0.3.1
tornado==6.0.4
typing-extensions==3.7.4.2
ua-parser==0.10.0
urllib3==1.25.10
user-agents==2.1
vine==1.3.0
webencodings==0.5.1
yarl==1.5.1
zipp==3.1.0

Other Dependencies

I'm using Celery with Django behind an Apache2 server via mod_wsgi. The WSGI directives are:

WSGIDaemonProcess proj python-home=/home/proj/website/.venv user=proj group=proj socket-user=proj processes=3 threads=15
WSGIProcessGroup proj
WSGIScriptAlias / /home/proj/website/proj/wsgi.py

Minimally Reproducible Test Case

Use Celery with Redis and Django behind Apache2 (mod_wsgi) in multi-threaded mode; a hypothetical sketch of such a setup follows.
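
To make that concrete, here is a minimal, hypothetical sketch of such a setup (the task, module names, and URL are illustrative and not taken from the report): a Django view enqueues a task and synchronously waits for its result while mod_wsgi serves the view from multiple threads.

# views.py (hypothetical) -- enqueue a task and block on its Redis-backed result
from django.http import JsonResponse

from myproj.celery import app  # assumed Celery app instance


@app.task
def add(x, y):
    return x + y


def add_view(request):
    async_result = add.delay(2, 2)
    # .get() reads from the result backend's shared pubsub connection; with
    # mod_wsgi threads > 1, several request threads may do this concurrently.
    return JsonResponse({'value': async_result.get(timeout=10)})

Hitting such a view concurrently, e.g. with ab -c 10 -n 200 against its URL, should surface the intermittent InvalidResponse in setups like the one described here.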

Expected Behavior

I expect Celery to work every time I run a task.

Actual Behavior

Most of the time, starting a task works fine, but sometimes I get this error (or something similar with different bytes in the protocol error):

InvalidResponse: Protocol Error: b'1'
  File "django/core/handlers/exception.py", line 34, in inner
    response = get_response(request)

Full stacktrace here

@thedrow
Member

thedrow commented Sep 9, 2020

Thank you for the very detailed bug report.

This exception does not get handled. I think we should catch it and log an error instead. Actually, I'm not so sure, since this is happening on the publisher's side.
Furthermore, I'm not sure how this is related to thread safety, since the Redis connection pool is thread-safe.

@andymccurdy I'm not very familiar with the Redis protocol. Do you have a clue on what's going on here?

@HosseyNJF
Author

I can confirm that there is no error when the wsgi settings are changed to this:

WSGIDaemonProcess proj python-home=/home/proj/website/.venv user=proj group=proj socket-user=proj processes=3 threads=1

(Note the single thread per process.)
So there is probably something going on with the threads.

@andymccurdy

My guess is that this is a race condition.

All Redis messages start with a control character that indicates what type of data comes next. For some reason, the stack trace above shows that the control character is not present. The only time I’ve seen this happen is when multiple threads are reading from the same socket at the same time.

Keep in mind that while the connection pool is indeed thread-safe, the connections that the pool hands out are not thread-safe. This includes PubSub objects, as they hold a reference to a single connection that's in pubsub mode.

When celery enqueues a task, does it use a single pubsub connection? Multiple threads are likely enqueueing at the same time and if celery is enqueueing those tasks with a single pubsub connection without guarding access with a threading.Lock, that would lead to an error like this.

@HosseyNJF
Author

HosseyNJF commented Sep 9, 2020

I'm not really familiar with Celery's internals, so there is nothing I can add to your response. But I'm fairly sure this is a race condition that would be fixed by some kind of internal locking, because I have faced similar problems (nonsense responses from the server) while working with sockets from multiple threads and fixed them this way.

Is there any workaround in the meantime, so that I can use WSGI multi-threading? The single-threaded setup is performing poorly.

@thedrow
Member

thedrow commented Sep 10, 2020

Is there any workaround in the meantime, so that I can use WSGI multi-threading? The single-threaded setup is performing poorly.
Not that I'm aware of.

@thedrow
Member

thedrow commented Sep 10, 2020

My guess is that this is a race condition.

All Redis messages start with a control character that indicates what type of data comes next. For some reason, the stack trace above shows that the control character is not present. The only time I’ve seen this happen is when multiple threads are reading from the same socket at the same time.

Keep in mind that while the connection pool is indeed thread-safe, the connections that the pool hands out are not thread-safe. This includes PubSub objects, as they hold a reference to a single connection that's in pubsub mode.

When celery enqueues a task, does it use a single pubsub connection? Multiple threads are likely enqueueing at the same time and if celery is enqueueing those tasks with a single pubsub connection without guarding access with a threading.Lock, that would lead to an error like this.

This is how we subscribe to a channel.

@andymccurdy

How do ResultConsumers work within Celery? Is there a singleton ResultConsumer instance for the application? Or are there multiple ResultConsumer instances, perhaps one per thread or one per task invocation?

@thedrow
Member

thedrow commented Sep 10, 2020

There's only one.
Actually, I think #5145 was intended to fix this, but it hasn't been merged yet.

If there's a better way, I'm all ears.

@andymccurdy

@thedrow Ah, that makes sense and explains this issue. Two threads are calling pubsub.get_message() at the same time.

You could use a thread local as #5145 suggests. Remember that each thread will have its own socket connection. If Celery still puts all those file descriptors in a list for a select(), you’ll need to include the descriptor from each of those.

An easier way might be to keep the single consumer instance and add a threading.Lock() context manager in drain_events() around the get_message() call. That should make sure that multiple threads aren’t reading off the same socket at the same time. You’d still have to deal with making sure the correct thread gets the message it’s looking for, but I presume you’re already doing this.

Alternatively, you could abandon pubsub completely and replace it with Redis streams (added in 5.0). Streams are a time ordered data structure that holds messages. They also remember which consumers have read which messages. They’re far more durable than pubsub. If a connection is severed, the client can pick up where it left off with no message loss.
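
A minimal sketch of the locking idea above (illustrative only; the class and method names are hypothetical and this is not Celery's actual ResultConsumer code):

import threading

import redis


class LockedConsumer:
    """Serialize all reads of a shared pubsub connection with one lock."""

    def __init__(self, url='redis://localhost:6379/0'):
        self._pubsub = redis.Redis.from_url(url).pubsub(
            ignore_subscribe_messages=True)
        self._lock = threading.Lock()

    def subscribe(self, channel):
        with self._lock:
            self._pubsub.subscribe(channel)

    def drain_events(self, timeout=1.0):
        # only one thread at a time may read from the underlying socket
        with self._lock:
            return self._pubsub.get_message(timeout=timeout)

As noted above, this guarantees that two threads never interleave reads of the same socket, but the caller still has to route each message to the thread that is actually waiting for it.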

@thedrow
Member

thedrow commented Sep 13, 2020

We are going for a new implementation of the Redis broker, but in the meantime we need to support the current, broken one.

@andymccurdy You make good points. Thank you.
Why don't we have thread-safe connections from the Redis client BTW?

@andymccurdy

@thedrow The ConnectionPool is thread-safe. The connections the pool hands out are not thread safe. Connections would incur a massive performance penalty to make them thread safe. You'd either have to synchronize the entire request/response round trip (which would block all other threads while waiting for the socket) or synchronize each send/read and keep track of the order each thread made a request so that responses can be distributed to the appropriate thread. In both cases it's far more performant for each thread to have its own connection.

The docs (last paragraph of this section) explicitly state that PubSub objects are not thread-safe.
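
To illustrate that split (hypothetical snippet, not taken from redis-py's docs): the pool can be shared freely, while each thread creates its own client and PubSub object and therefore reads its own socket.

import threading
import time

import redis

pool = redis.ConnectionPool(host='localhost', port=6379, db=0)  # thread-safe, shared


def worker(channel):
    client = redis.Redis(connection_pool=pool)  # commands check connections out of the shared pool
    pubsub = client.pubsub()                    # holds a connection owned by this thread only
    pubsub.subscribe(channel)
    deadline = time.time() + 5
    while time.time() < deadline:
        message = pubsub.get_message(timeout=1.0)
        if message:
            print(threading.current_thread().name, message)


for i in range(4):
    threading.Thread(target=worker, args=('results-%d' % i,)).start()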

@dmmatson

I encountered this same issue with the Redis result backend. Some time after completing a large number of tasks, my API queries task results like so: AsyncResult(TASK_ID).get(), in a gunicorn process that runs with --threads 5. If multiple clients simultaneously hit the REST endpoint that executes the get(), the error pops up fairly quickly.

Thought I would chime in that I can't reproduce this when load testing with hundreds of concurrent threads after downgrading to these dependencies:

amqp==1.4.9
anyjson==0.3.3
billiard==3.3.0.23
celery==3.1.25
kombu==3.0.37
redis==2.10.6

I may attempt to introduce a global lock when retrieving results (per the previous comment) in my gunicorn process so I can stick with Celery 4.x until this is fixed.
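
For reference, a global lock of that kind might look like the following sketch (the function and variable names are assumptions, not an official Celery API):

import threading

from celery.result import AsyncResult

_result_lock = threading.Lock()  # one lock per web process


def get_task_result(task_id, timeout=10):
    # serialize all result fetches so only one thread drains the shared
    # result-backend connection at a time
    with _result_lock:
        return AsyncResult(task_id).get(timeout=timeout)

As discussed above, this trades throughput for safety: every other thread blocks for the full round trip while one result is being fetched.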

@mskogorevrmc

Hi. I encountered this same issue with the Redis result backend. Most of the time starting a task works fine, but sometimes I get this error. If I understand correctly, these are similar errors.

My error:

Protocol Error: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00*3'
Traceback (most recent call last):
  File "/var/www/qwer/app/mod_axmsg/dispatch.py", line 376, in execute
    result_value = func_meta.func(*task.args_list, **task.kwargs_dict)
  File "/var/www/qwer/app/mod_rmc/libs/health.py", line 91, in update_all_health
    health.update_all_health()
  File "/var/www/qwer/app/mod_core/libs/health.py", line 597, in update_all_health
    result.join()
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/celery/result.py", line 781, in join
    disable_sync_subtasks=disable_sync_subtasks,
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/celery/result.py", line 237, in get
    on_message=on_message,
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 200, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 268, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 55, in drain_events_until
    yield self.wait_for(p, wait, timeout=interval)
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 64, in wait_for
    wait(timeout=timeout)
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/celery/backends/redis.py", line 161, in drain_events
    message = self._pubsub.get_message(timeout=timeout)
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/redis/client.py", line 3617, in get_message
    response = self.parse_response(block=False, timeout=timeout)
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/redis/client.py", line 3505, in parse_response
    response = self._execute(conn, conn.read_response)
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/redis/client.py", line 3479, in _execute
    return command(*args, **kwargs)
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/redis/connection.py", line 739, in read_response
    response = self._parser.read_response()
  File "/usr/local/myqpp/qwer/lib/python3.5/site-packages/redis/connection.py", line 331, in read_response
    raise InvalidResponse("Protocol Error: %r" % raw)
redis.exceptions.InvalidResponse: Protocol Error: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00*3'

@thedrow
Member

thedrow commented Oct 6, 2020

Yes, that's correct.
I'd rather have a patch with locking instead of the connection-per-thread model, as it is the safer approach.
Unfortunately, I don't have time to work on it right now.

If someone can, please do.

@HosseyNJF
Author

@thedrow I guess that would have a massive impact on performance (in environments with a high request rate), as @andymccurdy said, because the lock would block all other threads from accessing Redis until the entire request/response cycle is finished for the one active thread.

@mskogorevrmc

@thedrow @HosseyNJF I'm sorry, but I didn't really understand.
Is there any solution now? Will changing the WSGI settings help me?

@thedrow
Member

thedrow commented Oct 8, 2020

In that case we should test the patch we do have and complete it.
It lacks tests.

@HosseyNJF
Author

@mskogorevrmc If you want to work around this bug temporarily, just configure WSGI like this:
WSGIDaemonProcess [...........] processes=5 threads=1
The difference is the threads=1 part: each process should run only one thread in order to prevent this problem.

@mskogorevrmc

I changed the WSGI settings, but I still get errors.

My WSGI configuration:
WSGIDaemonProcess apiSM80 python-path=/var/www/myapp:/usr/local/app/lib/python3.5/site-packages processes=20 threads=1

I currently use the following versions of packages:

celery                 4.4.6
kombu                  4.6.11
Flask                  1.1.2
redis                  3.5.3

Errors:

Traceback (most recent call last):
  File "/var/www/myapp/app/mod_axmsg/dispatch.py", line 376, in execute
    result_value = func_meta.func(*task.args_list, **task.kwargs_dict)
  File "/var/www/myapp/app/mod_automation/libs/health_digest.py", line 29, in build_client_health_digest
    res.get()
  File "/usr/local/app/lib/python3.5/site-packages/celery/result.py", line 237, in get
    on_message=on_message,
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 200, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 268, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 55, in drain_events_until
    yield self.wait_for(p, wait, timeout=interval)
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 64, in wait_for
    wait(timeout=timeout)
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/redis.py", line 161, in drain_events
    message = self._pubsub.get_message(timeout=timeout)
  File "/usr/local/app/lib/python3.5/site-packages/redis/client.py", line 3617, in get_message
    response = self.parse_response(block=False, timeout=timeout)
  File "/usr/local/app/lib/python3.5/site-packages/redis/client.py", line 3505, in parse_response
    response = self._execute(conn, conn.read_response)
  File "/usr/local/app/lib/python3.5/site-packages/redis/client.py", line 3479, in _execute
    return command(*args, **kwargs)
  File "/usr/local/app/lib/python3.5/site-packages/redis/connection.py", line 739, in read_response
    response = self._parser.read_response()
  File "/usr/local/app/lib/python3.5/site-packages/redis/connection.py", line 331, in read_response
    raise InvalidResponse("Protocol Error: %r" % raw)
redis.exceptions.InvalidResponse: Protocol Error: b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00*3'

When I use RabbitMQ as a broker, I get these errors:

 Received 0x7b while expecting 0xce
Traceback (most recent call last):
  File "/var/www/myapp/mod_axmsg/dispatch.py", line 376, in execute
    result_value = func_meta.func(*task.args_list, **task.kwargs_dict)
  File "/var/www/myapp/mod_rmc/libs/health.py", line 91, in update_all_health
    health.update_all_health()
  File "/var/www/myapp/mod_core/libs/health.py", line 597, in update_all_health
    result.join()
  File "/usr/local/app/lib/python3.5/site-packages/celery/result.py", line 781, in join
    disable_sync_subtasks=disable_sync_subtasks,
  File "/usr/local/app/lib/python3.5/site-packages/celery/result.py", line 237, in get
    on_message=on_message,
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 200, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 268, in _wait_for_pending
    on_interval=on_interval):
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 55, in drain_events_until
    yield self.wait_for(p, wait, timeout=interval)
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 64, in wait_for
    wait(timeout=timeout)
  File "/usr/local/app/lib/python3.5/site-packages/celery/backends/rpc.py", line 63, in drain_events
    return self._connection.drain_events(timeout=timeout)
  File "/usr/local/app/lib/python3.5/site-packages/kombu/connection.py", line 324, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/usr/local/app/lib/python3.5/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
  File "/usr/local/app/lib/python3.5/site-packages/amqp/connection.py", line 508, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/app/lib/python3.5/site-packages/amqp/connection.py", line 513, in blocking_read
    frame = self.transport.read_frame()
  File "/usr/local/app/lib/python3.5/site-packages/amqp/transport.py", line 301, in read_frame
    'Received {0:#04x} while expecting 0xce'.format(ch))
amqp.exceptions.UnexpectedFrame: Received 0x7b while expecting 0xce

Could someone tell me if there is a fix?

@mskogorevrmc

Also, I use the mpm_event module for Apache; can that cause problems with threads?

@thedrow
Member

thedrow commented Nov 1, 2020

I'm not familiar with that module but a quick Google search shows that it shouldn't.
@matusvalo has a WIP in #6416.

@matusvalo
Member

Reading the Apache docs:

The event Multi-Processing Module (MPM) is designed to allow more requests to be served simultaneously by passing off some processing work to the listeners threads, freeing up the worker threads to serve new requests.

So it seems that requests are served by multiple threads. @mskogorevrmc, can you check using the prefork module [1]? Please let us know whether it helps.

[1] https://httpd.apache.org/docs/2.4/mod/prefork.html

@mskogorevrmc

@matusvalo With the prefork module for Apache, I haven't gotten any errors for about a week, but it uses a lot of resources. Are there any plans to fix this?

@matusvalo
Member

#6416 has been merged to master. @mskogorevrmc, could you test your bug against Celery's master branch to check whether it is fixed for you?

@charlesoconor

I've had a similar issue with 4.4.2. Updating to the current master has fixed it. I was locking around all .get() calls, so I would love to have this released.

@thedrow thedrow added this to the 5.0.3 milestone Dec 9, 2020
@kkito

kkito commented Dec 14, 2020

Same problem when using Celery with Tornado. I know it is not a recommended approach.

To make it async:

# enqueue the task, then resolve its result on an executor thread
async_result = celery_task.apply_async(args, kwargs)
result = await tornado.ioloop.IOLoop.current().run_in_executor(None, async_result.get)

It works fine when there is no concurrency, but when using a benchmark tool, InvalidResponse: Protocol Error happened:

ab -c 5 -n 100 localhost:8080/celery_using_url

It still happened after upgrading to 5.0.4.


My current solution is to use a dedicated process as the executor:

# a single-worker process pool, shared globally
executor = concurrent.futures.ProcessPoolExecutor(1)
# ... omitted
result = await tornado.ioloop.IOLoop.current().run_in_executor(executor, async_result.get)

ProcessPoolExecutor is OK, but ThreadPoolExecutor still produces this error.
I am not familiar with Celery's internals, but I guess something goes wrong when Redis reads content from multiple threads or in a non-blocking situation.

Just some experiments; hope it helps.

@thedrow
Member

thedrow commented Dec 14, 2020

@kkito Can you please provide a minimally reproducible test case?

@kkito

kkito commented Dec 15, 2020

@kkito Can you please provide a minimally reproducible test case?

https://github.com/kkito/celery-tornado-intergration

Here is a demo. @thedrow

@thedrow thedrow reopened this Dec 15, 2020
@matusvalo matusvalo self-assigned this Dec 15, 2020
@matusvalo
Member

matusvalo commented Dec 18, 2020

I have checked the reproducer and the code, and here are my findings:

  1. I am able to reproduce the case and I confirm the report.
  2. The issue occurs because the producer code is called as part of asyncio:
    https://github.com/kkito/celery-tornado-intergration/blob/bf22d9e4ec4e21cc3d716e1d458d3c93a3bca1ce/run.py#L8-L10
  3. I suppose the issue is caused by the backend and oid attributes again being shared between multiple tasks. They run in a single process/thread and are not asyncio-aware, so it is natural that they are shared between multiple requests. Moving them to thread-local storage supports multi-threaded task execution, but it does not support parallelism via asyncio.
  4. I was able to fix the issue simply by storing the backend and oid attributes in contextvars [1] instead of thread-local storage:
# file celery/app/base.py
import contextvars  # stdlib; needed for ContextVar below

class Celery:
    # code skipped
    def __init__(self, main=None, loader=None, backend=None,
                 amqp=None, events=None, log=None, control=None,
                 set_as_current=True, tasks=None, broker=None, include=None,
                 changes=None, config_source=None, fixups=None, task_cls=None,
                 autofinalize=True, namespace=None, strict_typing=True,
                 **kwargs):
        self.context_oid = contextvars.ContextVar('context_oid')
        self.context_backend = contextvars.ContextVar('context_backend')

    # code skipped
    @property
    def thread_oid(self):
        """Per-thread unique identifier for this app."""
        return self.context_oid.get(oid_from(self, threads=True))

    # code skipped
    @property
    def backend(self):
        """Current backend instance."""
        return self.context_backend.get(self._get_backend())
    # code skipped

The proposed solution is just a workaround and needs to be verified in depth to make sure nothing else breaks (e.g. multithreaded/multiprocess calling). @thedrow, does Celery support asyncio? If not, this issue should still be fixed, but as part of a bigger effort to migrate the whole Celery stack to asyncio.

[1] https://docs.python.org/3/library/contextvars.html
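
As a standalone illustration of why contextvars (rather than thread-locals) help here (Python 3.7+; not Celery code): a threading.local is shared by every coroutine running on the same thread, while a ContextVar gets an independent value per asyncio task.

import asyncio
import contextvars

current_backend = contextvars.ContextVar('current_backend', default=None)


async def handle_request(name):
    # each asyncio task runs in its own copy of the context, so this set()
    # is invisible to the other concurrently running tasks
    current_backend.set('backend-for-' + name)
    await asyncio.sleep(0)  # yield so the other tasks run and set their own values
    print(name, current_backend.get())


async def main():
    await asyncio.gather(*(handle_request('req%d' % i) for i in range(3)))


asyncio.run(main())

With a threading.local in place of the ContextVar, all three tasks would share the same slot, because they run on the same thread.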

@thedrow
Member

thedrow commented Dec 18, 2020

Celery currently does not support asyncio.

@thedrow thedrow closed this as completed Dec 18, 2020
kkito added a commit to kkito/celery-tornado-intergration that referenced this issue Dec 20, 2020
@kkito

kkito commented Dec 20, 2020

@matusvalo

Can I use it like kkito/celery-tornado-intergration@7e6eb39?

@matusvalo
Member

Theoretically, yes. I tested it and the test script works with this fix. BUT recent versions of Tornado are built on top of the asyncio loop (as far as I understand), and the Celery stack is not adapted to asyncio, so there can be other problems such as crashes in other corner cases or performance degradation. You can use it, but you need to understand what you are doing :-)
