Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Authentication support for Elasticsearch Result Backend #7538

Open
4 tasks done
sbhatm1213 opened this issue May 27, 2022 · 2 comments
Open
4 tasks done

Authentication support for Elasticsearch Result Backend #7538

sbhatm1213 opened this issue May 27, 2022 · 2 comments

Comments

@sbhatm1213
Copy link

Checklist

  • I have checked the issues list
    for similar or identical feature requests.
  • I have checked the pull requests list
    for existing proposed implementations of this feature.
  • I have checked the commit log
    to find out if the same feature was already implemented in the
    master branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).

Related Issues and Possible Duplicates

Related Issues

  • None

Possible Duplicates

  • None

Brief Summary

Enable Authentication support for Elasticsearch Result Backend

Design

Architectural Considerations

None

Proposed Behavior

Please provide a way to configure Elasticsearch (version8 that has Username - Password auth ) as Celery-Result-Backend

Proposed UI/UX

It could be something similar to the current Cassandra Result Backend config like so:
cassandra_auth_kwargs = { username: 'cassandra', password: 'cassandra' }
Or similar to the current CouchDB Result Backend config like :
result_backend = 'couchdb://username:password@host:port/container'

Diagrams

N/A

Alternatives

None

@open-collective-bot
Copy link

Hey @sbhatm1213 👋,
Thank you for opening an issue. We will get back to you as soon as we can.
Also, check out our Open Collective and consider backing us - every little helps!

We also offer priority support for our sponsors.
If you require immediate assistance please consider sponsoring us.

@sbhatm1213
Copy link
Author

I was able to fix the username-password auth with some custom code.

Later, this was the error : ( elasticsearch.exceptions.RequestError: RequestError(400, 'no handler found for uri [/celery-results-index/backend/celery-task-meta-2721a4a5-f289-4903-96e8-af259b8c3c0a] and method [GET]', 'no handler found for uri [/celery-results-index/backend/celery-task-meta-2721a4a5-f289-4903-96e8-af259b8c3c0a] and method [GET]')

I realized it was because doc_type is being used in all methods in celery. But Elasticsearch 8 does not have doc_types .

I used monkey-patching to redefine methods (of class ElasticsearchBackend) which had doc_type in them


from kombu.utils.encoding import bytes_to_str
import celery.backends.elasticsearch as celeryelastic
class ElasticsearchBackendsubclass(celeryelastic.ElasticsearchBackend):

    def _get(self, key):
        return self.server.get(
            index=self.index,
            id=key,
        )

    def _index(self, id, body, **kwargs):
        body = {bytes_to_str(k): v for k, v in body.items()}
        return self.server.index(
            id=bytes_to_str(id),
            index=self.index,
            body=body,
            params={'op_type': 'create'},
            **kwargs
        )

    def _update(self, id, body, state, **kwargs):
        body = {bytes_to_str(k): v for k, v in body.items()}

        try:
            res_get = self._get(key=id)
            if not res_get.get('found'):
                return self._index(id, body, **kwargs)
            # document disappeared between index and get calls.
        except elasticsearch.exceptions.NotFoundError:
            return self._index(id, body, **kwargs)
        try:
            meta_present_on_backend = self.decode_result(res_get['_source']['result'])
        except (TypeError, KeyError):
            pass
        else:
            if meta_present_on_backend['status'] == states.SUCCESS:
                # if stored state is already in success, do nothing
                return {'result': 'noop'}
            elif meta_present_on_backend['status'] in states.READY_STATES and state in states.UNREADY_STATES:
                # if stored state is in ready state and current not, do nothing
                return {'result': 'noop'}

        seq_no = res_get.get('_seq_no', 1)
        prim_term = res_get.get('_primary_term', 1)

        res = self.server.update(
            id=bytes_to_str(id),
            index=self.index,
            # doc_type=self.doc_type,
            body={'doc': body},
            params={'if_primary_term': prim_term, 'if_seq_no': seq_no},
            **kwargs
        )

        if res['result'] == 'noop':
            raise elasticsearch.exceptions.ConflictError(409, 'conflicting update occurred concurrently', {})
        return res
    def delete(self, key):
        self.server.delete(index=self.index, id=key)

celeryelastic.ElasticsearchBackend = ElasticsearchBackendsubclass

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant