Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic ElasticSearch / ElasticClient 8.x Support #8519

Merged
merged 11 commits into from
Oct 4, 2023
95 changes: 65 additions & 30 deletions celery/backends/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
except ImportError:
elasticsearch = None

try:
import elastic_transport
except ImportError:
elastic_transport = None

__all__ = ('ElasticsearchBackend',)

E_LIB_MISSING = """\
Expand All @@ -31,7 +36,7 @@ class ElasticsearchBackend(KeyValueStoreBackend):
"""

index = 'celery'
doc_type = 'backend'
doc_type = None
scheme = 'http'
host = 'localhost'
port = 9200
Expand Down Expand Up @@ -83,17 +88,17 @@ def __init__(self, url=None, *args, **kwargs):
self._server = None

def exception_safe_to_retry(self, exc):
if isinstance(exc, (elasticsearch.exceptions.TransportError)):
if isinstance(exc, elasticsearch.exceptions.ApiError):
# 401: Unauthorized
# 409: Conflict
# 429: Too Many Requests
# 500: Internal Server Error
# 502: Bad Gateway
# 503: Service Unavailable
# 504: Gateway Timeout
# N/A: Low-level exception (e.g. socket exception)
if exc.status_code in {401, 409, 429, 500, 502, 503, 504, 'N/A'}:
if exc.status_code in {401, 409, 500, 502, 504, 'N/A'}:
return True
if isinstance(exc , elasticsearch.exceptions.TransportError):
return True
return False

def get(self, key):
Expand All @@ -108,11 +113,17 @@ def get(self, key):
pass

def _get(self, key):
return self.server.get(
index=self.index,
doc_type=self.doc_type,
id=key,
)
if self.doc_type:
return self.server.get(
index=self.index,
id=key,
doc_type=self.doc_type,
)
else:
return self.server.get(
index=self.index,
id=key,
)

def _set_with_state(self, key, value, state):
body = {
Expand All @@ -135,14 +146,23 @@ def set(self, key, value):

def _index(self, id, body, **kwargs):
body = {bytes_to_str(k): v for k, v in body.items()}
return self.server.index(
id=bytes_to_str(id),
index=self.index,
doc_type=self.doc_type,
body=body,
params={'op_type': 'create'},
**kwargs
)
if self.doc_type:
return self.server.index(
id=bytes_to_str(id),
index=self.index,
doc_type=self.doc_type,
body=body,
params={'op_type': 'create'},
**kwargs
)
else:
return self.server.index(
id=bytes_to_str(id),
index=self.index,
body=body,
params={'op_type': 'create'},
**kwargs
)

def _update(self, id, body, state, **kwargs):
"""Update state in a conflict free manner.
Expand Down Expand Up @@ -182,19 +202,32 @@ def _update(self, id, body, state, **kwargs):
prim_term = res_get.get('_primary_term', 1)

# try to update document with current seq_no and primary_term
res = self.server.update(
id=bytes_to_str(id),
index=self.index,
doc_type=self.doc_type,
body={'doc': body},
params={'if_primary_term': prim_term, 'if_seq_no': seq_no},
**kwargs
)
if self.doc_type:
res = self.server.update(
id=bytes_to_str(id),
index=self.index,
doc_type=self.doc_type,
body={'doc': body},
params={'if_primary_term': prim_term, 'if_seq_no': seq_no},
**kwargs
)
else:
res = self.server.update(
id=bytes_to_str(id),
index=self.index,
body={'doc': body},
params={'if_primary_term': prim_term, 'if_seq_no': seq_no},
**kwargs
)
# result is elastic search update query result
# noop = query did not update any document
# updated = at least one document got updated
if res['result'] == 'noop':
raise elasticsearch.exceptions.ConflictError(409, 'conflicting update occurred concurrently', {})
raise elasticsearch.exceptions.ConflictError(
"conflicting update occurred concurrently",
elastic_transport.ApiResponseMeta(409, "HTTP/1.1",
elastic_transport.HttpHeaders(), 0, elastic_transport.NodeConfig(
self.scheme, self.host, self.port)), None)
return res

def encode(self, data):
Expand Down Expand Up @@ -225,19 +258,21 @@ def mget(self, keys):
return [self.get(key) for key in keys]

def delete(self, key):
self.server.delete(index=self.index, doc_type=self.doc_type, id=key)
if self.doc_type:
self.server.delete(index=self.index, id=key, doc_type=self.doc_type)
else:
self.server.delete(index=self.index, id=key)

def _get_server(self):
"""Connect to the Elasticsearch server."""
http_auth = None
if self.username and self.password:
http_auth = (self.username, self.password)
return elasticsearch.Elasticsearch(
f'{self.host}:{self.port}',
f'{self.scheme}://{self.host}:{self.port}',
retry_on_timeout=self.es_retry_on_timeout,
max_retries=self.es_max_retries,
timeout=self.es_timeout,
scheme=self.scheme,
http_auth=http_auth,
)

Expand Down
3 changes: 2 additions & 1 deletion requirements/extras/elasticsearch.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
elasticsearch<8.0
elasticsearch<=8.10.0
elastic-transport==8.4.1