Upgrade Elasticsearch to 8 (#33135)
Owen-CH-Leung committed Aug 8, 2023
1 parent 515179f commit ad9d8d4
Showing 12 changed files with 368 additions and 129 deletions.
92 changes: 0 additions & 92 deletions airflow/config_templates/config.yml
@@ -2317,98 +2317,6 @@ kerberos:
type: boolean
example: ~
default: "True"
elasticsearch:
description: ~
options:
host:
description: |
Elasticsearch host
version_added: 1.10.4
type: string
example: ~
default: ""
log_id_template:
description: |
Format of the log_id, which is used to query for a given task's logs
version_added: 1.10.4
type: string
example: ~
is_template: true
default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
end_of_log_mark:
description: |
Used to mark the end of a log stream for a task
version_added: 1.10.4
type: string
example: ~
default: "end_of_log"
frontend:
description: |
Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
Code will construct log_id using the log_id template from the argument above.
NOTE: scheme will default to https if one is not provided
version_added: 1.10.4
type: string
example: "http://localhost:5601/app/kibana#/discover\
?_a=(columns:!(message),query:(language:kuery,query:'log_id: \"{log_id}\"'),sort:!(log.offset,asc))"
default: ""
write_stdout:
description: |
Write the task logs to the stdout of the worker, rather than the default files
version_added: 1.10.4
type: string
example: ~
default: "False"
json_format:
description: |
Instead of the default log formatter, write the log lines as JSON
version_added: 1.10.4
type: string
example: ~
default: "False"
json_fields:
description: |
Log fields to also attach to the json output, if enabled
version_added: 1.10.4
type: string
example: ~
default: "asctime, filename, lineno, levelname, message"
host_field:
description: |
The field where host name is stored (normally either `host` or `host.name`)
version_added: 2.1.1
type: string
example: ~
default: "host"
offset_field:
description: |
The field where offset is stored (normally either `offset` or `log.offset`)
version_added: 2.1.1
type: string
example: ~
default: "offset"
index_patterns:
description: |
Comma separated list of index patterns to use when searching for logs (default: `_all`).
version_added: 2.6.0
type: string
example: something-*
default: "_all"
elasticsearch_configs:
description: ~
options:
use_ssl:
description: ~
version_added: 1.10.5
type: string
example: ~
default: "False"
verify_certs:
description: ~
version_added: 1.10.5
type: string
example: ~
default: "True"
sensors:
description: ~
options:
6 changes: 6 additions & 0 deletions airflow/providers/elasticsearch/CHANGELOG.rst
@@ -27,6 +27,12 @@
Changelog
---------

.. note::
    Upgrade to Elasticsearch 8. The ElasticsearchTaskHandler and ElasticsearchSQLHook now use the Elasticsearch 8 package.
    As explained at https://elasticsearch-py.readthedocs.io/en/stable, Elasticsearch language clients are only backwards
    compatible with default distributions, and no guarantees are made beyond that; we therefore recommend upgrading the
    Elasticsearch database to version 8 to ensure compatibility with the language client.
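    A minimal sketch, assuming a locally running cluster, of what the 8.x client now expects (hosts must be scheme-qualified URLs); this example is illustrative and not part of the commit:

    .. code-block:: python

        from elasticsearch import Elasticsearch

        # elasticsearch>=8 rejects bare "host:port" strings such as "localhost:9200"
        client = Elasticsearch("http://localhost:9200")
        client.info()  # quick connectivity check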

5.0.0
.....

19 changes: 8 additions & 11 deletions airflow/providers/elasticsearch/log/es_task_handler.py
@@ -30,7 +30,7 @@
# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
import elasticsearch
import pendulum
from elasticsearch.exceptions import ElasticsearchException, NotFoundError
from elasticsearch.exceptions import NotFoundError

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -89,7 +89,7 @@ def __init__(
json_fields: str,
host_field: str = "host",
offset_field: str = "offset",
host: str = "localhost:9200",
host: str = "http://localhost:9200",
frontend: str = "localhost:5601",
index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"),
es_kwargs: dict | None = conf.getsection("elasticsearch_configs"),
@@ -101,8 +101,8 @@
super().__init__(base_log_folder, filename_template)
self.closed = False

self.client = elasticsearch.Elasticsearch(host.split(";"), **es_kwargs) # type: ignore[attr-defined]

self.client = elasticsearch.Elasticsearch(host, **es_kwargs) # type: ignore[attr-defined]
# In airflow.cfg, the Elasticsearch host must now include a scheme, e.g. http://dockerhostXxxx:9200
if USE_PER_RUN_LOG_ID and log_id_template is not None:
warnings.warn(
"Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
@@ -292,27 +292,24 @@ def es_read(self, log_id: str, offset: int | str, metadata: dict) -> list | ElasticSearchResponse:
}

try:
max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
max_log_line = self.client.count(index=self.index_patterns, body=query)["count"] # type: ignore
except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist", self.index_patterns)
raise e
except ElasticsearchException as e:
self.log.exception("Could not get current log size with log_id: %s", log_id)
raise e

logs: list[Any] | ElasticSearchResponse = []
if max_log_line != 0:
try:
query.update({"sort": [self.offset_field]})
res = self.client.search(
res = self.client.search( # type: ignore
index=self.index_patterns,
body=query,
size=self.MAX_LINE_PER_PAGE,
from_=self.MAX_LINE_PER_PAGE * self.PAGE,
)
logs = ElasticSearchResponse(self, res)
except elasticsearch.exceptions.ElasticsearchException:
self.log.exception("Could not read log with log_id: %s", log_id)
except Exception as err:
self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err)

return logs
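Two changes above are worth noting: the broad `except Exception` replaces `ElasticsearchException`, which elasticsearch-py 8 removed, and the `body=` calls gain `# type: ignore` because the 8.x client types its keyword arguments individually. A hedged sketch of the same count-then-paginate read pattern, with index name, query, and page size as illustrative assumptions:

    from elasticsearch import Elasticsearch

    client = Elasticsearch("http://localhost:9200")
    query = {"query": {"match_phrase": {"log_id": "example-dag-example-task"}}}

    # count first so empty results skip the search round-trip
    total = client.count(index="_all", body=query)["count"]
    if total:
        query["sort"] = ["offset"]  # page through hits in offset order
        page = client.search(index="_all", body=query, size=1000, from_=0)
        for hit in page["hits"]["hits"]:
            print(hit["_source"].get("message", ""))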

96 changes: 95 additions & 1 deletion airflow/providers/elasticsearch/provider.yaml
@@ -53,7 +53,7 @@ versions:
dependencies:
- apache-airflow>=2.4.0
- apache-airflow-providers-common-sql>=1.3.1
- elasticsearch>7,<7.15.0
- elasticsearch>8,<9

integrations:
- integration-name: Elasticsearch
@@ -72,3 +72,97 @@

logging:
- airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler

config:
elasticsearch:
description: ~
options:
host:
description: |
Elasticsearch host
version_added: 1.10.4
type: string
example: ~
default: ""
log_id_template:
description: |
Format of the log_id, which is used to query for a given task's logs
version_added: 1.10.4
type: string
example: ~
is_template: true
default: "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
end_of_log_mark:
description: |
Used to mark the end of a log stream for a task
version_added: 1.10.4
type: string
example: ~
default: "end_of_log"
frontend:
description: |
Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
Code will construct log_id using the log_id template from the argument above.
NOTE: scheme will default to https if one is not provided
version_added: 1.10.4
type: string
example: "http://localhost:5601/app/kibana#/discover\
?_a=(columns:!(message),query:(language:kuery,query:'log_id: \"{log_id}\"'),sort:!(log.offset,asc))"
default: ""
write_stdout:
description: |
Write the task logs to the stdout of the worker, rather than the default files
version_added: 1.10.4
type: string
example: ~
default: "False"
json_format:
description: |
Instead of the default log formatter, write the log lines as JSON
version_added: 1.10.4
type: string
example: ~
default: "False"
json_fields:
description: |
Log fields to also attach to the json output, if enabled
version_added: 1.10.4
type: string
example: ~
default: "asctime, filename, lineno, levelname, message"
host_field:
description: |
The field where host name is stored (normally either `host` or `host.name`)
version_added: 2.1.1
type: string
example: ~
default: "host"
offset_field:
description: |
The field where offset is stored (normally either `offset` or `log.offset`)
version_added: 2.1.1
type: string
example: ~
default: "offset"
index_patterns:
description: |
Comma separated list of index patterns to use when searching for logs (default: `_all`).
version_added: 2.6.0
type: string
example: something-*
default: "_all"
elasticsearch_configs:
description: ~
options:
http_compress:
description: ~
version_added: 1.10.5
type: string
example: ~
default: "False"
verify_certs:
description: ~
version_added: 1.10.5
type: string
example: ~
default: "True"
18 changes: 18 additions & 0 deletions docs/apache-airflow-providers-elasticsearch/configurations-ref.rst
@@ -0,0 +1,18 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. include:: ../exts/includes/providers-configurations-ref.rst
3 changes: 2 additions & 1 deletion docs/apache-airflow-providers-elasticsearch/index.rst
@@ -43,6 +43,7 @@
:maxdepth: 1
:caption: References

Configuration <configurations-ref>
Python API <_api/airflow/providers/elasticsearch/index>

.. toctree::
@@ -103,7 +104,7 @@ PIP package Version required
======================================= ==================
``apache-airflow`` ``>=2.4.0``
``apache-airflow-providers-common-sql`` ``>=1.3.1``
``elasticsearch`` ``>7,<7.15.0``
``elasticsearch`` ``>8,<9``
======================================= ==================

Cross provider package dependencies
1 change: 1 addition & 0 deletions docs/apache-airflow/configurations-ref.rst
@@ -41,6 +41,7 @@ in the provider's documentation. The pre-installed providers that you may want to
* :doc:`Configuration Reference for SMTP Provider <apache-airflow-providers-smtp:configurations-ref>`
* :doc:`Configuration Reference for IMAP Provider <apache-airflow-providers-imap:configurations-ref>`
* :doc:`Configuration Reference for OpenLineage Provider <apache-airflow-providers-openlineage:configurations-ref>`
* :doc:`Configuration Reference for Elasticsearch Provider <apache-airflow-providers-elasticsearch:configurations-ref>`

.. note::
For more information see :doc:`/howto/set-config`.
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
@@ -358,7 +358,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.3.1",
"apache-airflow>=2.4.0",
"elasticsearch>7,<7.15.0"
"elasticsearch>8,<9"
],
"cross-providers-deps": [
"common.sql"
44 changes: 41 additions & 3 deletions tests/providers/elasticsearch/log/elasticmock/__init__.py
@@ -41,17 +41,55 @@
"""Elastic mock module used for testing"""
from functools import wraps
from unittest.mock import patch

from elasticsearch.client.utils import _normalize_hosts
from urllib.parse import unquote, urlparse

from .fake_elasticsearch import FakeElasticsearch

ELASTIC_INSTANCES: dict[str, FakeElasticsearch] = {}


def _normalize_hosts(hosts):
    """
    Helper function to transform the hosts argument of
    :class:`~elasticsearch.Elasticsearch` into a list of dicts.
    """
    # if hosts are empty, just defer to defaults down the line
    if hosts is None:
        return [{}]

    # a single string becomes a one-element list
    if isinstance(hosts, str):
        hosts = [hosts]

    out = []

    for host in hosts:
        if isinstance(host, str):
            if "://" not in host:
                host = f"//{host}"

            parsed_url = urlparse(host)
            h = {"host": parsed_url.hostname}

            if parsed_url.port:
                h["port"] = parsed_url.port

            if parsed_url.scheme == "https":
                h["port"] = parsed_url.port or 443
                h["use_ssl"] = True

            if parsed_url.username or parsed_url.password:
                h["http_auth"] = f"{unquote(parsed_url.username)}:{unquote(parsed_url.password)}"

            if parsed_url.path and parsed_url.path != "/":
                h["url_prefix"] = parsed_url.path

            out.append(h)
        else:
            # dicts and other pre-normalized specs pass through unchanged
            out.append(host)
    return out


def _get_elasticmock(hosts=None, *args, **kwargs):
host = _normalize_hosts(hosts)[0]
elastic_key = f"{host.get('host', 'localhost')}:{host.get('port', 9200)}"
elastic_key = f"http://{host.get('host', 'localhost')}:{host.get('port', 9200)}"

if elastic_key in ELASTIC_INSTANCES:
connection = ELASTIC_INSTANCES.get(elastic_key)
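A hedged usage sketch of the `_normalize_hosts` helper added above; the inputs and the expected outputs are illustrative:

    _normalize_hosts("localhost")
    # -> [{'host': 'localhost'}]

    _normalize_hosts("https://user:secret@es.example.com:9200/prefix")
    # -> [{'host': 'es.example.com', 'port': 9200, 'use_ssl': True,
    #     'http_auth': 'user:secret', 'url_prefix': '/prefix'}]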
