Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -697,13 +697,31 @@ def _parse_raw_log(self, log: str, log_id: str) -> list[dict[str, Any]]:
offset = 1
for line in logs:
# Make sure line is not empty
if line.strip():
# construct log_id which is {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
# also construct the offset field (default is 'offset')
if not line.strip():
continue

try:
log_dict = json.loads(line)
log_dict.update({"log_id": log_id, self.offset_field: offset})
offset += 1
parsed_logs.append(log_dict)
except json.JSONDecodeError:
# Best-effort fallback: preserve the raw line
self.log.debug("Failed to parse log line as JSON", exc_info=True)
log_dict = {
"message": line,
"unparsed": True,
}

# Ensure minimal compatibility with Airflow log expectations
if "event" not in log_dict and "message" not in log_dict:
log_dict["message"] = str(line)

log_dict.update(
{
"log_id": log_id,
self.offset_field: offset,
}
)
offset += 1
parsed_logs.append(log_dict)

return parsed_logs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,50 @@ def test_raw_log_contains_log_id_and_offset(self, tmp_json_file, ti):
assert [line["offset"] for line in json_log_lines] == [1, 2, 3]
assert all(line["log_id"] == log_id for line in json_log_lines)

def test_raw_log_handles_invalid_json_line(self, ti):

raw_log = '{"message": "ok"}\nINVALID_JSON\n{"message": "ok2"}\n'
log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, ti.try_number)

json_log_lines = self.elasticsearch_io._parse_raw_log(raw_log, log_id)

assert len(json_log_lines) == 3

assert json_log_lines[1]["message"] == "INVALID_JSON"
assert json_log_lines[1]["unparsed"] is True

assert [line["offset"] for line in json_log_lines] == [1, 2, 3]

def test_raw_log_all_plain_text(self, ti):
raw_log = "line1\nline2\nline3\n"
log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, ti.try_number)

json_log_lines = self.elasticsearch_io._parse_raw_log(raw_log, log_id)

assert len(json_log_lines) == 3
assert all(line["unparsed"] for line in json_log_lines)
assert [line["message"] for line in json_log_lines] == ["line1", "line2", "line3"]

def test_raw_log_mixed_content(self, ti):
raw_log = '{"event": "ok"}\nplain text\n{"message": "done"}\n'
log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, ti.try_number)

json_log_lines = self.elasticsearch_io._parse_raw_log(raw_log, log_id)

assert json_log_lines[0]["event"] == "ok"
assert json_log_lines[1]["message"] == "plain text"
assert json_log_lines[1]["unparsed"] is True
assert json_log_lines[2]["message"] == "done"

def test_raw_log_ignores_empty_lines(self, ti):
raw_log = '\n{"message": "ok"}\n\n'
log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti, ti.try_number)

json_log_lines = self.elasticsearch_io._parse_raw_log(raw_log, log_id)

assert len(json_log_lines) == 1
assert json_log_lines[0]["message"] == "ok"

def test_get_source_includes(self):
assert self.elasticsearch_io._get_source_includes() == [
"@timestamp",
Expand Down
Loading