-
Notifications
You must be signed in to change notification settings - Fork 14.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create opensearch provider and hook as base for amazon open search hook
- Loading branch information
Showing
9 changed files
with
272 additions
and
69 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
.. Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
.. http://www.apache.org/licenses/LICENSE-2.0 | ||
.. Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
.. NOTE TO CONTRIBUTORS: | ||
Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes | ||
and you want to add an explanation to the users on how they are supposed to deal with them. | ||
The changelog is updated and maintained semi-automatically by release manager. | ||
``apache-airflow-providers-opensearch`` | ||
|
||
Changelog | ||
--------- |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
from __future__ import annotations | ||
|
||
import json | ||
from functools import cached_property | ||
from typing import Any | ||
|
||
from opensearchpy import OpenSearch, RequestsHttpConnection | ||
|
||
from airflow.exceptions import AirflowException | ||
from airflow.hooks.base import BaseHook | ||
|
||
|
||
class OpenSearchHook(BaseHook): | ||
""" | ||
This Hook provides a thin wrapper around the OpenSearch client. | ||
:param: open_search_conn_id: Connection to use with Open Search | ||
:param: log_query: Whether to log the query used for Open Search | ||
""" | ||
conn_name_attr = "opensearch_conn_id" | ||
default_conn_name = "opensearch_default" | ||
conn_type = "opensearch" | ||
hook_name = "OpenSearch Hook" | ||
|
||
def __init__(self, *args: Any, open_search_conn_id: str, log_query: bool, **kwargs: Any): | ||
super().__init__(*args, **kwargs) | ||
self.client = None | ||
self.conn_id = open_search_conn_id | ||
self.log_query = log_query | ||
|
||
self.conn = self.get_connection(self.conn_id) | ||
self.use_ssl = self.conn.extra_dejson.get("use_ssl", False) | ||
self.verify_certs = self.conn.extra_dejson.get("verify_certs", False) | ||
self.__SERVICE = "es" | ||
|
||
@cached_property | ||
def get_client(self) -> OpenSearch: | ||
""" | ||
This function is intended for Operators that will take in arguments and use the high level | ||
OpenSearch client which allows using Python objects to perform searches. | ||
""" | ||
auth = (self.conn.login, self.conn.password) | ||
self.client = OpenSearch( | ||
hosts=[{"host": self.conn.host, "port": self.conn.port}], | ||
http_auth=auth, | ||
use_ssl=self.use_ssl, | ||
verify_certs=self.verify_certs, | ||
connection_class=RequestsHttpConnection, | ||
) | ||
return self.client | ||
|
||
def search(self, query: dict, index_name: str, **kwargs: Any) -> Any: | ||
""" | ||
Runs a search query against the connected OpenSearch cluster. | ||
:param: query: The query for the search against OpenSearch. | ||
:param: index_name: The name of the index to search against | ||
""" | ||
if self.log_query: | ||
self.log.info("Searching %s with Query: %s", index_name, query) | ||
return self.client.search(body=query, index=index_name, **kwargs) | ||
|
||
def index(self, document: dict, index_name: str, doc_id: int, **kwargs: Any) -> Any: | ||
""" | ||
Index a document on open search. | ||
:param: document: A dictionary representation of the document | ||
:param: index_name: the name of the index that this document will be associated with | ||
:param: doc_id: the numerical identifier that will be used to identify the document on the index. | ||
""" | ||
return self.client.index(index=index_name, id=doc_id, body=document, **kwargs) | ||
|
||
def delete(self, index_name: str, query: dict | None = None, doc_id: int | None = None): | ||
""" | ||
Delete from an index by either a query or by the document id. | ||
:param: index_name: the name of the index to delete from | ||
:param: query: If deleting by query a dict representation of the query to run to | ||
identify documents to delete. | ||
:param: doc_id: The identifier of the document to delete. | ||
""" | ||
if query is not None: | ||
if self.log_query: | ||
self.log.info("Deleting from %s using Query: %s", index_name, query) | ||
return self.client.delete_by_query(index=index_name, body=query) | ||
elif doc_id is not None: | ||
return self.client.delete(index=index_name, id=doc_id) | ||
else: | ||
AirflowException("To delete a document you must include one of either a query or a document id. ") | ||
|
||
@staticmethod | ||
def get_ui_field_behaviour() -> dict[str, Any]: | ||
"""Returns custom UI field behaviour for Open Search Connection.""" | ||
return { | ||
"hidden_fields": ["schema"], | ||
"relabeling": { | ||
"extra": "Open Search Configuration", | ||
}, | ||
"placeholders": { | ||
"extra": json.dumps( | ||
{ | ||
"use_ssl": True, | ||
"verify_certs": True | ||
}, | ||
indent=2, | ||
), | ||
}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
--- | ||
package-name: apache-airflow-providers-opensearch | ||
name: Opensearch | ||
description: | | ||
`Opensearch <https://opensearch.org/>`__ | ||
suspended: false | ||
versions: | ||
- 1.0.0 | ||
|
||
dependencies: | ||
- apache-airflow>=2.4.3 | ||
- opensearch-py>=2.2.0 | ||
- opensearch-dsl>=2.1.0 | ||
|
||
integrations: | ||
- integration-name: Opensearch | ||
external-doc-url: https://opensearch.org/ | ||
logo: /integration-logos/opensearch/opensearch.png | ||
tags: [software] | ||
|
||
hooks: | ||
- integration-name: Opensearch | ||
python-modules: | ||
- airflow.providers.opensearch.hooks.opensearch | ||
|
||
connection-types: | ||
- hook-class-name: airflow.providers.opensearch.hooks.opensearch.OpenSearchHook | ||
connection-type: opensearch |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.