-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open search integrations #34693
Open search integrations #34693
Changes from all commits
9a18ce6
6e45261
5df0196
1cbca1e
851c3ab
4d3edab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,130 @@ | ||||||||||||||||||||||||||||||||||||||
# | ||||||||||||||||||||||||||||||||||||||
# Licensed to the Apache Software Foundation (ASF) under one | ||||||||||||||||||||||||||||||||||||||
# or more contributor license agreements. See the NOTICE file | ||||||||||||||||||||||||||||||||||||||
# distributed with this work for additional information | ||||||||||||||||||||||||||||||||||||||
# regarding copyright ownership. The ASF licenses this file | ||||||||||||||||||||||||||||||||||||||
# to you under the Apache License, Version 2.0 (the | ||||||||||||||||||||||||||||||||||||||
# "License"); you may not use this file except in compliance | ||||||||||||||||||||||||||||||||||||||
# with the License. You may obtain a copy of the License at | ||||||||||||||||||||||||||||||||||||||
# | ||||||||||||||||||||||||||||||||||||||
# http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||||||||||
# | ||||||||||||||||||||||||||||||||||||||
# Unless required by applicable law or agreed to in writing, | ||||||||||||||||||||||||||||||||||||||
# software distributed under the License is distributed on an | ||||||||||||||||||||||||||||||||||||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||||||||||||||||||||||||||||||||
# KIND, either express or implied. See the License for the | ||||||||||||||||||||||||||||||||||||||
# specific language governing permissions and limitations | ||||||||||||||||||||||||||||||||||||||
# under the License. | ||||||||||||||||||||||||||||||||||||||
from __future__ import annotations | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
import json | ||||||||||||||||||||||||||||||||||||||
from typing import Any | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
from airflow.exceptions import AirflowException | ||||||||||||||||||||||||||||||||||||||
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
class OpenSearchHook(AwsBaseHook): | ||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
This Hook provides a thin wrapper around the OpenSearch client. | ||||||||||||||||||||||||||||||||||||||
Comment on lines
+29
to
+31
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense then to create a base OpenSearch hook class in an OpenSearch providers package and use that as the base class for an AWS based Hook for OpenSearch? Most of the logic in this could be moved to a base hook for Open Search and then have the AWS Specific one just override the auth method used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah it how it works in general. There is some exceptions but most (if not all) of them created years ago. there is also exists hooks in Amazon Provider which not depends on
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would there be a good reason to have any operators that do things like create domains in a DAG? With the above that seems like where an open search hook that is a sub class of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you think it might be useful then I don't see any obstacles for add wrappers around this boto3 clients. For hook it is pretty simple, just create simple thin wrapper airflow/airflow/providers/amazon/aws/hooks/sqs.py Lines 23 to 40 in bd2f156
And optionally add useful methods |
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
:param: open_search_conn_id: AWS Connection to use with Open Search | ||||||||||||||||||||||||||||||||||||||
:param: log_query: Whether to log the query used for Open Search | ||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
conn_name_attr = "opensearch_conn_id" | ||||||||||||||||||||||||||||||||||||||
default_conn_name = "opensearch_default" | ||||||||||||||||||||||||||||||||||||||
conn_type = "opensearch" | ||||||||||||||||||||||||||||||||||||||
hook_name = "AWS Open Search Hook" | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
def __init__(self, *args: Any, open_search_conn_id: str, log_query: bool, **kwargs: Any): | ||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I would recommend to accept in hook constructor keyword only arguments, positional arguments prevent to do some changes in the future without breaking changes. |
||||||||||||||||||||||||||||||||||||||
super().__init__(*args, **kwargs) | ||||||||||||||||||||||||||||||||||||||
self.conn_id = open_search_conn_id | ||||||||||||||||||||||||||||||||||||||
self.log_query = log_query | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
conn = self.get_connection(self.conn_id) | ||||||||||||||||||||||||||||||||||||||
self.use_ssl = conn.extra_dejson.get("use_ssl", False) | ||||||||||||||||||||||||||||||||||||||
self.verify_certs = conn.extra_dejson.get("verify_certs", False) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
self.__SERVICE = "es" | ||||||||||||||||||||||||||||||||||||||
self._credentials = self.get_credentials(self.region_name) | ||||||||||||||||||||||||||||||||||||||
self._auth = AWSV4SignerAuth(self._credentials, self.region_name, self.__SERVICE) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
self.client = OpenSearch( | ||||||||||||||||||||||||||||||||||||||
hosts=[{"host": conn.host, "port": conn.port}], | ||||||||||||||||||||||||||||||||||||||
http_auth=self._auth, | ||||||||||||||||||||||||||||||||||||||
use_ssl=self.use_ssl, | ||||||||||||||||||||||||||||||||||||||
verify_certs=self.verify_certs, | ||||||||||||||||||||||||||||||||||||||
connection_class=RequestsHttpConnection, | ||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||
cjames23 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
def get_client(self) -> OpenSearch: | ||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
This function is intended for Operators that will take in arguments and use the high level | ||||||||||||||||||||||||||||||||||||||
OpenSearch client which allows using Python objects to perform searches. | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
return self.client | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
def search(self, query: dict, index_name: str, **kwargs: Any) -> Any: | ||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
Runs a search query against the connected OpenSearch cluster. | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
:param: query: The query for the search against OpenSearch. | ||||||||||||||||||||||||||||||||||||||
:param: index_name: The name of the index to search against | ||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
if self.log_query: | ||||||||||||||||||||||||||||||||||||||
self.log.info("Searching %s with Query: %s", index_name, query) | ||||||||||||||||||||||||||||||||||||||
return self.client.search(body=query, index=index_name, **kwargs) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
def index(self, document: dict, index_name: str, doc_id: int, **kwargs: Any) -> Any: | ||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
Index a document on open search. | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
:param: document: A dictionary representation of the document | ||||||||||||||||||||||||||||||||||||||
:param: index_name: the name of the index that this document will be associated with | ||||||||||||||||||||||||||||||||||||||
:param: doc_id: the numerical identifier that will be used to identify the document on the index. | ||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
return self.client.index(index=index_name, id=doc_id, body=document, **kwargs) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
def delete(self, index_name: str, query: dict | None = None, doc_id: int | None = None): | ||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
Delete from an index by either a query or by the document id. | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
:param: index_name: the name of the index to delete from | ||||||||||||||||||||||||||||||||||||||
:param: query: If deleting by query a dict representation of the query to run to | ||||||||||||||||||||||||||||||||||||||
identify documents to delete. | ||||||||||||||||||||||||||||||||||||||
:param: doc_id: The identifier of the document to delete. | ||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||
if query is not None: | ||||||||||||||||||||||||||||||||||||||
if self.log_query: | ||||||||||||||||||||||||||||||||||||||
self.log.info("Deleting from %s using Query: %s", index_name, query) | ||||||||||||||||||||||||||||||||||||||
return self.client.delete_by_query(index=index_name, body=query) | ||||||||||||||||||||||||||||||||||||||
elif doc_id is not None: | ||||||||||||||||||||||||||||||||||||||
return self.client.delete(index=index_name, id=doc_id) | ||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||
AirflowException("To delete a document you must include one of either a query or a document id. ") | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||||||||||||||
def get_ui_field_behaviour() -> dict[str, Any]: | ||||||||||||||||||||||||||||||||||||||
"""Returns custom UI field behaviour for Amazon Open Search Connection.""" | ||||||||||||||||||||||||||||||||||||||
return { | ||||||||||||||||||||||||||||||||||||||
"hidden_fields": ["schema"], | ||||||||||||||||||||||||||||||||||||||
"relabeling": { | ||||||||||||||||||||||||||||||||||||||
"host": "OpenSearch Cluster Endpoint", | ||||||||||||||||||||||||||||||||||||||
"login": "AWS Access Key ID", | ||||||||||||||||||||||||||||||||||||||
"password": "AWS Secret Access Key", | ||||||||||||||||||||||||||||||||||||||
"extra": "Open Search Configuration", | ||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||
"placeholders": { | ||||||||||||||||||||||||||||||||||||||
"extra": json.dumps( | ||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||
"use_ssl": True, | ||||||||||||||||||||||||||||||||||||||
"verify_certs": True, | ||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||
indent=2, | ||||||||||||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
# | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Quite related to this comment thread but are these operators specific to the AWS managed service? Should not these operators be in the opensearch provider? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR is on hold right now until the other PR is ready. Once that is ready I will be updating this PR with pieces in place that handle the separate provider and really what my thought process here is going to be is that the particular Operators for the managed service just need to be sub classes of the Open Search provider ones and replace the hook with the one required for the managed service piece. I am open to other ideas here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like very good to me 👍 |
||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
from __future__ import annotations | ||
|
||
from functools import cached_property | ||
from typing import TYPE_CHECKING, Any, Sequence | ||
|
||
from airflow.exceptions import AirflowException | ||
from airflow.models import BaseOperator | ||
from airflow.providers.amazon.aws.hooks.opensearch import OpenSearchHook | ||
|
||
if TYPE_CHECKING: | ||
from opensearch_dsl.document import Document | ||
from opensearch_dsl.search import Search | ||
|
||
from airflow.utils.context import Context | ||
|
||
|
||
class OpenSearchQueryOperator(BaseOperator): | ||
""" | ||
Runs a query search against a given index on an AWS OpenSearch cluster and returns results. | ||
|
||
.. seealso:: | ||
For more information on how to use this operator, take a look at the guide: | ||
:ref:`howto/operator:OpenSearchSearchOperator` | ||
|
||
:param: query: A Dictionary Open Search DSL query. | ||
:param: search_object: A Search object from opensearch-dsl. | ||
:param: index_name: The name of the index to search for documents. | ||
:param: opensearch_conn_id: opensearch connection to use | ||
:param: log_query: Whether to log the query used. Defaults to True and logs query used. | ||
""" | ||
|
||
template_fields: Sequence[str] = ["query"] | ||
|
||
def __init__( | ||
self, | ||
*, | ||
query: dict | None = None, | ||
search_object: Search | None = None, | ||
index_name: str | None = None, | ||
opensearch_conn_id: str = "opensearch_default", | ||
log_query: bool = True, | ||
**kwargs, | ||
) -> None: | ||
super().__init__(**kwargs) | ||
self.query = query | ||
self.index_name = index_name | ||
self.opensearch_conn_id = opensearch_conn_id | ||
self.log_query = log_query | ||
self.search_object = search_object | ||
|
||
@cached_property | ||
def hook(self) -> OpenSearchHook: | ||
"""Gets an instance of an OpenSearchHook.""" | ||
return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=self.log_query) | ||
|
||
def execute(self, context: Context) -> Any: | ||
"""Executes a search against a given index or a Search object on an AWS OpenSearch Cluster.""" | ||
result = None | ||
|
||
if self.query is not None: | ||
if not self.query.get("query"): | ||
raise AirflowException("Query input is missing required field Query in dictionary") | ||
if self.index_name is None: | ||
raise AirflowException("Index name is required when using the query input.") | ||
try: | ||
result = self.hook.search(index_name=self.index_name, query=self.query) | ||
except Exception as e: | ||
raise AirflowException(e) | ||
elif self.search_object is not None: | ||
try: | ||
result = self.search_object.using(self.hook.get_client()).execute() | ||
except Exception as e: | ||
raise AirflowException(e) | ||
else: | ||
raise AirflowException( | ||
"""Input missing required input of query or search_object. | ||
Either query or search_object is required.""" | ||
) | ||
return result | ||
|
||
|
||
class OpenSearchCreateIndexOperator(BaseOperator): | ||
""" | ||
Creates a new index on an AWS Open Search cluster with a given index name. | ||
|
||
.. seealso:: | ||
For more information on how to use this operator, take a look at the guide: | ||
:ref:`howto/operator:OpenSearchCreateIndexOperator` | ||
|
||
:param: index_name: The name of the index to be created. | ||
:param: index_body: A dictionary that defines index settings | ||
:param: opensearch_conn_id: opensearch connection to use | ||
""" | ||
|
||
def __init__( | ||
self, *, index_name: str, index_body: dict[str, Any], opensearch_conn_id: str = "opensearch_default", **kwargs | ||
) -> None: | ||
super().__init__(**kwargs) | ||
self.index_name = index_name | ||
self.index_body = index_body | ||
self.opensearch_conn_id = opensearch_conn_id | ||
|
||
@cached_property | ||
def hook(self) -> OpenSearchHook: | ||
"""Gets an instance of an OpenSearchHook.""" | ||
return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=False) | ||
|
||
def execute(self, context: Context) -> Any: | ||
"""Creates an index on an AWS Open Search cluster.""" | ||
try: | ||
self.hook.get_client().indices.create(index=self.index_name, body=self.index_body) | ||
except Exception as e: | ||
raise AirflowException(e) | ||
|
||
|
||
class OpenSearchAddDocumentOperator(BaseOperator): | ||
""" | ||
Runs a query search against a given index on an AWS OpenSearch cluster and returns results. | ||
|
||
.. seealso:: | ||
For more information on how to use this operator, take a look at the guide: | ||
:ref:`howto/operator:OpenSearchAddDocumentOperator` | ||
|
||
:param: index_name: The name of the index to put the document. | ||
:param: document: A dictionary representation of the document. | ||
:param: document_id: The id for the document in the index. | ||
:param: doc_class: A Document subclassed object using opensearch-dsl | ||
:param: opensearch_conn_id: opensearch connection to use | ||
""" | ||
|
||
def __init__( | ||
self, | ||
*, | ||
index_name: str | None = None, | ||
document: dict[str, Any] | None = None, | ||
doc_id: int | None = None, | ||
doc_class: Document | None = None, | ||
opensearch_conn_id: str = "opensearch_default", | ||
**kwargs, | ||
) -> None: | ||
super().__init__(**kwargs) | ||
self.index_name = index_name | ||
self.document = document | ||
self.doc_id = doc_id | ||
self.doc_class = doc_class | ||
self.opensearch_conn_id = opensearch_conn_id | ||
|
||
@cached_property | ||
def hook(self) -> OpenSearchHook: | ||
"""Gets an instance of an OpenSearchHook.""" | ||
return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=False) | ||
|
||
def execute(self, context: Context) -> Any: | ||
"""Saves a document to a given index on an AWS OpenSearch cluster.""" | ||
if self.doc_class is not None: | ||
try: | ||
result = self.doc_class.save(using=self.hook.get_client()) | ||
except Exception as e: | ||
raise AirflowException(e) | ||
elif self.index_name is not None and self.document is not None and self.doc_id is not None: | ||
try: | ||
result = self.hook.index( | ||
index_name=self.index_name, document=self.document, doc_id=self.doc_id | ||
) | ||
except Exception as e: | ||
raise AirflowException(e) | ||
else: | ||
raise AirflowException( | ||
"Index name, document dictionary and doc_id or a Document subclassed object is required." | ||
) | ||
|
||
return result |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that we should create support of OpenSearch (as product) in a separate provider. Otherwise this feature would be exclusively available only for AWS. And after that we could add support of AWS OpenSearch (as managed service) into Amazon provider. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Working now on creating a providers package for OpenSearch and moving the main Hook logic there with the hook for AWS Open Search taking that as a base class and only doing overrides of the AWS Open Search specific requirements such as the auth and the extras requiring a region_name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets have seperate PR for open search integration and depend PR for AWS related code on top of it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay let me revert the changes that added the new open search provider. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changes are reverted. @eladkal @Taragolis What would be your preference on the approach for the new provider, I can get everything onto a new branch and cut a different PR and then mail the dev list for lazy consensus given open search is an apache licensed product, or do we need a full discussion and vote? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think a discussion is needed it seems as it fits the criteria for lazy consensus vote as popular vendor neutral open source. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Draft PR #34705 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this hook needs to be modified after #34705 is merged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am working on that now, everything in the amazon provider will have a prefix of Aws. And this hook will be a child of the open search hook and potentially the AwsBaseHook as well to avoid rewriting the logic for getting a boto3 client.