Open search integrations #34693

Closed. Wants to merge 6 commits; showing changes from 5 commits.
130 changes: 130 additions & 0 deletions airflow/providers/amazon/aws/hooks/opensearch.py
@@ -0,0 +1,130 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import json
from typing import Any

from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class OpenSearchHook(AwsBaseHook):
Contributor: I guess this hook needs to be modified after #34705 is merged?

Contributor Author: I am working on that now; everything in the Amazon provider will have a prefix of Aws. This hook will be a child of the OpenSearch hook, and potentially of AwsBaseHook as well, to avoid rewriting the logic for getting a boto3 client.

"""
This Hook provides a thin wrapper around the OpenSearch client.
Comment on lines +29 to +31

Contributor: AwsBaseHook/AwsGenericHook is used only for boto3/botocore clients. The internals of this hook are intended to be used with those libraries and may work incorrectly if someone calls the public methods of AwsBaseHook/AwsGenericHook.

Contributor Author: Would it make sense then to create a base OpenSearch hook class in an OpenSearch provider package and use that as the base class for an AWS-based OpenSearch hook? Most of the logic here could be moved to a base OpenSearch hook, with the AWS-specific one only overriding the auth method used.

Contributor: Yeah, that is how it works in general. There are some exceptions, but most (if not all) of them were created years ago. There are also hooks in the Amazon provider that do not depend on AwsBaseHook. AwsBaseHook is a wrapper around one of the boto3 (botocore) clients.

Contributor Author: Would there be a good reason to have any operators that do things like create domains in a DAG? With the above, that seems like where an OpenSearch hook that is a subclass of AwsBaseHook would come in.

Contributor: If you think it might be useful then I don't see any obstacles to adding wrappers around these boto3 clients. For a hook it is pretty simple, just create a thin wrapper:

class SqsHook(AwsBaseHook):
    """
    Interact with Amazon Simple Queue Service.

    Provide thin wrapper around :external+boto3:py:class:`boto3.client("sqs") <SQS.Client>`.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def __init__(self, *args, **kwargs) -> None:
        kwargs["client_type"] = "sqs"
        super().__init__(*args, **kwargs)

And optionally add useful methods.
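The refactor discussed above (a vendor-neutral base hook, with an AWS subclass that overrides only authentication) can be sketched in plain Python. This is a structural sketch under stated assumptions: the class names, the `_get_auth` override point, and the placeholder auth values are hypothetical, not the final provider API.

```python
# Structural sketch only: hypothetical names, no Airflow or opensearch-py imports.


class BaseOpenSearchHook:
    """Vendor-neutral hook holding the shared client-building logic."""

    def __init__(self, host: str, port: int = 9200) -> None:
        self.host = host
        self.port = port

    def _get_auth(self):
        # Default auth (e.g. login/password from the connection); subclasses override.
        return ("login", "password")

    def client_kwargs(self) -> dict:
        # The shared logic lives here once, instead of in every vendor-specific hook.
        return {
            "hosts": [{"host": self.host, "port": self.port}],
            "http_auth": self._get_auth(),
        }


class AwsOpenSearchHook(BaseOpenSearchHook):
    """AWS-specific subclass: only the auth method differs."""

    def _get_auth(self):
        # A real implementation would return AWSV4SignerAuth(credentials, region, "es").
        return "sigv4-signer-placeholder"
```

The real AWS subclass would build an `AWSV4SignerAuth` from boto3 credentials rather than the placeholder string; everything else would come from the base class unchanged.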


:param open_search_conn_id: AWS connection to use with OpenSearch.
:param log_query: Whether to log the query used for OpenSearch.
"""
conn_name_attr = "opensearch_conn_id"
default_conn_name = "opensearch_default"
conn_type = "opensearch"
hook_name = "AWS Open Search Hook"

def __init__(self, *args: Any, open_search_conn_id: str, log_query: bool, **kwargs: Any):
Contributor (suggested change):

-    def __init__(self, *args: Any, open_search_conn_id: str, log_query: bool, **kwargs: Any):
+    def __init__(self, *, open_search_conn_id: str, log_query: bool, **kwargs: Any):

I would recommend accepting only keyword arguments in the hook constructor; positional arguments make some future changes impossible without breaking compatibility.

super().__init__(*args, **kwargs)
self.conn_id = open_search_conn_id
self.log_query = log_query

conn = self.get_connection(self.conn_id)
self.use_ssl = conn.extra_dejson.get("use_ssl", False)
self.verify_certs = conn.extra_dejson.get("verify_certs", False)

self.__SERVICE = "es"
self._credentials = self.get_credentials(self.region_name)
self._auth = AWSV4SignerAuth(self._credentials, self.region_name, self.__SERVICE)

self.client = OpenSearch(
hosts=[{"host": conn.host, "port": conn.port}],
http_auth=self._auth,
use_ssl=self.use_ssl,
verify_certs=self.verify_certs,
connection_class=RequestsHttpConnection,
)

def get_client(self) -> OpenSearch:
"""
Return the underlying OpenSearch client.

This method is intended for Operators that take in arguments and use the high-level
OpenSearch client, which allows using Python objects to perform searches.
"""
return self.client

Check failure (GitHub Actions / Static checks) on line 69 in airflow/providers/amazon/aws/hooks/opensearch.py, Ruff (D205): airflow/providers/amazon/aws/hooks/opensearch.py:64:9: D205 1 blank line required between summary line and description

def search(self, query: dict, index_name: str, **kwargs: Any) -> Any:
"""
Run a search query against the connected OpenSearch cluster.

:param query: The query to run against OpenSearch.
:param index_name: The name of the index to search against.
"""
if self.log_query:
self.log.info("Searching %s with Query: %s", index_name, query)
return self.client.search(body=query, index=index_name, **kwargs)
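For illustration, the `query` argument is a standard OpenSearch query-DSL body; the field name, index name, and size below are made-up example values, not anything defined by this hook.

```python
# A minimal OpenSearch query-DSL body, of the shape OpenSearchHook.search() accepts.
# The "message" field and the size limit are illustrative values.
query = {
    "size": 5,
    "query": {"match": {"message": "airflow"}},
}

# With a configured hook this would be called as:
# hook.search(query=query, index_name="system-logs")
```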

def index(self, document: dict, index_name: str, doc_id: int, **kwargs: Any) -> Any:
"""
Index a document in OpenSearch.

:param document: A dictionary representation of the document.
:param index_name: The name of the index this document will be associated with.
:param doc_id: The numerical identifier that will be used to identify the document in the index.
"""
return self.client.index(index=index_name, id=doc_id, body=document, **kwargs)

def delete(self, index_name: str, query: dict | None = None, doc_id: int | None = None):
"""
Delete from an index, either by a query or by document id.

:param index_name: The name of the index to delete from.
:param query: If deleting by query, a dict representation of the query used to
identify the documents to delete.
:param doc_id: The identifier of the document to delete.
"""
if query is not None:
if self.log_query:
self.log.info("Deleting from %s using Query: %s", index_name, query)
return self.client.delete_by_query(index=index_name, body=query)
elif doc_id is not None:
return self.client.delete(index=index_name, id=doc_id)
else:
raise AirflowException("To delete a document you must include either a query or a document id.")
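A delete-by-query body uses the same query-DSL shape as `search`. A minimal sketch (the `status` field and its value are illustrative, not part of the hook's API):

```python
# Body for the delete-by-query path: removes every document matching the clause.
# Passing doc_id instead targets a single document by its identifier.
delete_query = {"query": {"term": {"status": "obsolete"}}}

# With a configured hook:
# hook.delete(index_name="system-logs", query=delete_query)  # delete by query
# hook.delete(index_name="system-logs", doc_id=42)           # delete by id
```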

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Return custom UI field behaviour for the Amazon OpenSearch connection."""
return {
"hidden_fields": ["schema"],
"relabeling": {
"host": "OpenSearch Cluster Endpoint",
"login": "AWS Access Key ID",
"password": "AWS Secret Access Key",
"extra": "Open Search Configuration",
},
"placeholders": {
"extra": json.dumps(
{
"use_ssl": True,
"verify_certs": True,
},
indent=2,
),
},
}
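The `extra` placeholder above is plain JSON that the hook later reads back via `conn.extra_dejson`; round-tripping it with the standard library shows the two keys the constructor consumes (this mimics, rather than calls, Airflow's `extra_dejson`):

```python
import json

# The same JSON the UI placeholder suggests for the "extra" field.
extra = json.dumps({"use_ssl": True, "verify_certs": True}, indent=2)

# extra_dejson is essentially json.loads() of this field, so the hook sees:
parsed = json.loads(extra)
use_ssl = parsed.get("use_ssl", False)
verify_certs = parsed.get("verify_certs", False)
```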
189 changes: 189 additions & 0 deletions airflow/providers/amazon/aws/operators/opensearch.py
@@ -0,0 +1,189 @@
#
Contributor: Quite related to this comment thread, but are these operators specific to the AWS managed service? Shouldn't these operators be in the opensearch provider?

Contributor Author: This PR is on hold right now until the other PR is ready. Once that is ready I will update this PR so that the operators for the managed service are subclasses of the OpenSearch provider ones, with the hook replaced by the one required for the managed service. I am open to other ideas here.

Contributor: Sounds very good to me 👍

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.opensearch import OpenSearchHook

if TYPE_CHECKING:
from opensearch_dsl.document import Document
from opensearch_dsl.search import Search

from airflow.utils.context import Context


class OpenSearchQueryOperator(BaseOperator):
"""
Run a search query against a given index on an AWS OpenSearch cluster and return the results.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:OpenSearchQueryOperator`

:param query: A dictionary containing the OpenSearch DSL query.
:param search_object: A Search object from opensearch-dsl.
:param index_name: The name of the index to search for documents.
:param opensearch_conn_id: The OpenSearch connection to use.
:param log_query: Whether to log the query used. Defaults to True and logs the query used.
"""

template_fields: Sequence[str] = ["query"]

def __init__(
self,
*,
query: dict | None = None,
search_object: Search | None = None,
index_name: str | None = None,
opensearch_conn_id: str = "opensearch_default",
log_query: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.query = query
self.index_name = index_name
self.opensearch_conn_id = opensearch_conn_id
self.log_query = log_query
self.search_object = search_object

@cached_property
def hook(self) -> OpenSearchHook:
"""Gets an instance of an OpenSearchHook."""
return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=self.log_query)

def execute(self, context: Context) -> Any:
"""Executes a search against a given index or a Search object on an AWS OpenSearch Cluster."""
result = None

if self.query is not None:
if not self.query.get("query"):
raise AirflowException("Query input is missing required field Query in dictionary")
if self.index_name is None:
raise AirflowException("Index name is required when using the query input.")
try:
result = self.hook.search(index_name=self.index_name, query=self.query)
except Exception as e:
raise AirflowException(e)
elif self.search_object is not None:
try:
result = self.search_object.using(self.hook.get_client()).execute()
except Exception as e:
raise AirflowException(e)
else:
raise AirflowException(
"Missing required input: either query or search_object must be provided."
)
return result
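The input validation in `execute` can be restated as a small standalone function, shown here only to illustrate the branching; the helper name is hypothetical and not part of the operator.

```python
# Hypothetical helper restating OpenSearchQueryOperator.execute's input checks.
def validate_search_inputs(query, index_name, search_object) -> str:
    """Return which search path would run, or raise ValueError."""
    if query is not None:
        # A dict query must carry a top-level "query" clause and a target index.
        if not query.get("query"):
            raise ValueError("Query input is missing required field 'query'")
        if index_name is None:
            raise ValueError("Index name is required when using the query input")
        return "query"
    if search_object is not None:
        # A Search object carries its own index, so nothing else is required.
        return "search_object"
    raise ValueError("Either query or search_object is required")
```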


class OpenSearchCreateIndexOperator(BaseOperator):
"""
Create a new index on an AWS OpenSearch cluster with a given index name.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:OpenSearchCreateIndexOperator`

:param index_name: The name of the index to be created.
:param index_body: A dictionary that defines index settings.
:param opensearch_conn_id: The OpenSearch connection to use.
"""

def __init__(
self, *, index_name: str, index_body: dict[str, Any], opensearch_conn_id: str = "opensearch_default", **kwargs
) -> None:
super().__init__(**kwargs)
self.index_name = index_name
self.index_body = index_body
self.opensearch_conn_id = opensearch_conn_id

@cached_property
def hook(self) -> OpenSearchHook:
"""Gets an instance of an OpenSearchHook."""
return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=False)

def execute(self, context: Context) -> Any:
"""Creates an index on an AWS Open Search cluster."""
try:
self.hook.get_client().indices.create(index=self.index_name, body=self.index_body)
except Exception as e:
raise AirflowException(e)


class OpenSearchAddDocumentOperator(BaseOperator):
"""
Add a document to a given index on an AWS OpenSearch cluster.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:OpenSearchAddDocumentOperator`

:param index_name: The name of the index to put the document in.
:param document: A dictionary representation of the document.
:param doc_id: The id for the document in the index.
:param doc_class: A Document subclassed object using opensearch-dsl.
:param opensearch_conn_id: The OpenSearch connection to use.
"""

def __init__(
self,
*,
index_name: str | None = None,
document: dict[str, Any] | None = None,
doc_id: int | None = None,
doc_class: Document | None = None,
opensearch_conn_id: str = "opensearch_default",
**kwargs,
) -> None:
super().__init__(**kwargs)
self.index_name = index_name
self.document = document
self.doc_id = doc_id
self.doc_class = doc_class
self.opensearch_conn_id = opensearch_conn_id

@cached_property
def hook(self) -> OpenSearchHook:
"""Gets an instance of an OpenSearchHook."""
return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=False)

def execute(self, context: Context) -> Any:
"""Saves a document to a given index on an AWS OpenSearch cluster."""
if self.doc_class is not None:
try:
result = self.doc_class.save(using=self.hook.get_client())
except Exception as e:
raise AirflowException(e)
elif self.index_name is not None and self.document is not None and self.doc_id is not None:
try:
result = self.hook.index(
index_name=self.index_name, document=self.document, doc_id=self.doc_id
)
except Exception as e:
raise AirflowException(e)
else:
raise AirflowException(
"Index name, document dictionary and doc_id or a Document subclassed object is required."
)

return result
29 changes: 25 additions & 4 deletions airflow/providers/amazon/provider.yaml
Contributor: I think that we should create support for OpenSearch (as a product) in a separate provider. Otherwise this feature would be available exclusively for AWS. After that, we could add support for AWS OpenSearch (as a managed service) to the Amazon provider.

Contributor Author: Working now on creating a provider package for OpenSearch and moving the main hook logic there, with the hook for AWS OpenSearch taking that as a base class and only overriding the AWS OpenSearch-specific requirements, such as the auth and the extras requiring a region_name.

Contributor (@eladkal, Sep 30, 2023): Let's have a separate PR for the OpenSearch integration and a dependent PR for the AWS-related code on top of it. It also requires following the protocol for adding a new provider (https://github.com/apache/airflow/blob/main/PROVIDERS.rst#accepting-new-community-providers) in terms of a mailing list thread.

Contributor Author: Okay, let me revert the changes that added the new OpenSearch provider.

Contributor Author: Changes are reverted. @eladkal @Taragolis What would be your preference on the approach for the new provider? I can get everything onto a new branch, cut a different PR, and then mail the dev list for lazy consensus, given OpenSearch is an Apache-licensed product, or do we need a full discussion and vote?

Contributor: I don't think a discussion is needed; it seems to fit the criteria for a lazy consensus vote as a popular, vendor-neutral open source project.

Contributor Author: Draft PR #34705

@@ -72,9 +72,9 @@
- apache-airflow-providers-http
# We should update minimum version of boto3 and here regularly to avoid `pip` backtracking with the number
# of candidates to consider. We should also make sure that all the below related packages have also the
# same minimum version specified. Boto3 1.28.0 has been released on July 6 2023. We should also make sure we
# set it to the version that `aiobotocore` supports (see `aiobotocore` optional dependency at the end
# of this file). Currently we set aiobotocore as minimum 2.5.3 - as this is was the first version
# same minimum version specified. Boto3 1.28.0 has been released on July 6, 2023. We should also make
# sure we set it to the version that `aiobotocore` supports (see `aiobotocore` optional dependency at
# the end of this file). Currently, we set aiobotocore as minimum 2.5.3 - as it was the first version
# that supported boto3 1.28. NOTE!!! BOTOCORE VERSIONS ARE SHIFTED BY 3 MINOR VERSIONS
# NOTE!!! Make sure to update _MIN_BOTO3_VERSION in setup.py when you update it here
- boto3>=1.28.0
@@ -89,7 +89,14 @@
- jsonpath_ng>=1.5.3
- redshift_connector>=2.0.888
- sqlalchemy_redshift>=0.8.6
- mypy-boto3-rds>=1.24.0
- mypy-boto3-redshift-data>=1.24.0
- mypy-boto3-appflow>=1.24.0
- asgiref
- mypy-boto3-s3>=1.24.0
- opensearch-py>=2.2.0
- opensearch-dsl>=2.1.0


integrations:
- integration-name: Amazon Athena
@@ -292,6 +299,12 @@
how-to-guide:
- /docs/apache-airflow-providers-amazon/operators/appflow.rst
tags: [aws]
- integration-name: AWS Open Search
external-doc-url: https://aws.amazon.com/opensearch-service/
logo: /integration-logos/aws/Amazon-OpenSearch-light.png
how-to-guide:
- /docs/apache-airflow-providers-amazon/operators/opensearch.rst
tags: [aws]

operators:
- integration-name: Amazon Athena
@@ -365,6 +378,9 @@
- integration-name: Amazon Appflow
python-modules:
- airflow.providers.amazon.aws.operators.appflow
- integration-name: AWS Open Search
python-modules:
- airflow.providers.amazon.aws.operators.opensearch

sensors:
- integration-name: Amazon Athena
@@ -538,6 +554,9 @@
- integration-name: Amazon Appflow
python-modules:
- airflow.providers.amazon.aws.hooks.appflow
- integration-name: Amazon Open Search
python-modules:
- airflow.providers.amazon.aws.hooks.opensearch

triggers:
- integration-name: Amazon Web Services
@@ -554,7 +573,7 @@
- airflow.providers.amazon.aws.triggers.ec2
- integration-name: AWS Lambda
python-modules:
- airflow.providers.amazon.aws.triggers.lambda_function
- airflow.providers.amazon.aws.triggers.lambda_function

Check failure (GitHub Actions / Static checks) on line 576 in airflow/providers/amazon/provider.yaml: 576:9 [indentation] wrong indentation: expected 6 but found 8
- integration-name: Amazon Redshift
python-modules:
- airflow.providers.amazon.aws.triggers.redshift_cluster
@@ -685,6 +704,8 @@
connection-type: emr
- hook-class-name: airflow.providers.amazon.aws.hooks.redshift_sql.RedshiftSQLHook
connection-type: redshift
- hook-class-name: airflow.providers.amazon.aws.hooks.opensearch.OpenSearchHook
connection-type: opensearch

notifications:
- airflow.providers.amazon.aws.notifications.chime.ChimeNotifier