Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions providers/elasticsearch/docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
Changelog
---------

When the ``[elasticsearch] host`` config embeds credentials
(``https://user:password@elk.example.com:9200``), the log-source label
shown in task logs is now the host URL with the ``user:password@`` portion
stripped. Previously the full URL (including credentials) could appear as
a dictionary key in the task-log output when log-hits did not carry a
``host`` field. The Elasticsearch client is still connected using the
full URL, so authentication is unaffected.

6.5.2
.....

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,28 @@ def getattr_nested(obj, item, default):
return default


def _strip_userinfo(url: str) -> str:
"""
Return ``url`` with any ``user:password@`` userinfo removed.

The Elasticsearch ``[elasticsearch] host`` config commonly embeds
credentials (``https://user:password@elk.example.com:9200``). This
value is reused as a display label for log-source grouping, so the
credentials would otherwise end up in task logs. Anything that is
not a valid URL is returned unchanged.
"""
try:
parsed = urlparse(url)
except (TypeError, ValueError):
return url
if not parsed.hostname or (not parsed.username and not parsed.password):
return url
netloc = parsed.hostname
if parsed.port is not None:
netloc = f"{netloc}:{parsed.port}"
return parsed._replace(netloc=netloc).geturl()


def _render_log_id(log_id_template: str, ti: TaskInstance | TaskInstanceKey, try_number: int) -> str:
return log_id_template.format(
dag_id=ti.dag_id,
Expand Down Expand Up @@ -801,8 +823,9 @@ def _get_index_patterns(self, ti: RuntimeTI | None) -> str:

def _group_logs_by_host(self, response: ElasticSearchResponse) -> dict[str, list[Hit]]:
grouped_logs = defaultdict(list)
host_fallback = _strip_userinfo(self.host)
for hit in response:
key = getattr_nested(hit, self.host_field, None) or self.host
key = getattr_nested(hit, self.host_field, None) or host_fallback
grouped_logs[key].append(hit)
return grouped_logs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
_clean_date,
_format_error_detail,
_render_log_id,
_strip_userinfo,
get_es_kwargs_from_config,
getattr_nested,
)
Expand Down Expand Up @@ -228,6 +229,21 @@ def test_format_url(self, host, expected):
else:
assert ElasticsearchTaskHandler.format_url(host) == expected

@pytest.mark.parametrize(
("host", "expected"),
[
("https://user:pass@elk.example.com:9200", "https://elk.example.com:9200"),
("http://USER:PASS@elk.example.com", "http://elk.example.com"),
("https://elk.example.com:9200", "https://elk.example.com:9200"),
("http://localhost:9200", "http://localhost:9200"),
("https://user@elk.example.com", "https://elk.example.com"),
("not-a-url", "not-a-url"),
("", ""),
],
)
def test_strip_userinfo(self, host, expected):
assert _strip_userinfo(host) == expected

def test_client(self):
assert isinstance(self.es_task_handler.client, elasticsearch.Elasticsearch)
assert self.es_task_handler.index_patterns == "_all"
Expand Down
Loading