From f4d204eefb698b5cd48aaea2975fcf5ba2c0ad48 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Mon, 27 Apr 2026 11:00:50 -0400 Subject: [PATCH 1/9] Fix quote support --- detection_rules/etc/test_toml.json | 10 ++++++++++ detection_rules/rule_formatter.py | 5 +++-- pyproject.toml | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/detection_rules/etc/test_toml.json b/detection_rules/etc/test_toml.json index 081222bbb20..af72ed62fa9 100644 --- a/detection_rules/etc/test_toml.json +++ b/detection_rules/etc/test_toml.json @@ -170,5 +170,15 @@ } ] } + }, + { + "metadata": { + "creation_date": "2020/02/26", + "maturity": "development", + "updated_date": "2020/02/26" + }, + "rule": { + "query": "file.path: \"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\" file.path: Hello\\:World" + } } ] diff --git a/detection_rules/rule_formatter.py b/detection_rules/rule_formatter.py index 16fe4733a5d..51302b8a214 100644 --- a/detection_rules/rule_formatter.py +++ b/detection_rules/rule_formatter.py @@ -151,12 +151,13 @@ def dump_str(self, v: str | NonformattedField) -> str: lines = wrap_text(v) multiline = len(lines) > 1 - raw = (multiline or (DQ in v and SQ not in v)) and TRIPLE_DQ not in v + raw = (multiline or (DQ in v and SQ not in v)) and TRIPLE_DQ not in v and "\\" not in v if multiline: if raw: return "".join([TRIPLE_DQ, *initial_newline, *lines, TRIPLE_DQ]) - return "\n".join([TRIPLE_SQ] + [json.dumps(line)[1:-1] for line in lines] + [TRIPLE_SQ]) + # Use literal triple-SQ to preserve backslashes and avoid invalid TOML escape sequences + return "".join([TRIPLE_SQ, *initial_newline, *lines, TRIPLE_SQ]) if raw: return f"'{lines[0]:s}'" # In the toml library there is a magic replace for \\\\x -> u00 that we wish to avoid until #4979 is resolved diff --git a/pyproject.toml b/pyproject.toml index a0afea00514..18368f55fe0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.6.27" +version = "1.6.28" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12" From 145003796a685fb5dc7af08932f44ebeca2c8f93 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Tue, 28 Apr 2026 21:10:13 -0400 Subject: [PATCH 2/9] preserve_all_strings --- detection_rules/rule_formatter.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/detection_rules/rule_formatter.py b/detection_rules/rule_formatter.py index 51302b8a214..4e9fc829fc1 100644 --- a/detection_rules/rule_formatter.py +++ b/detection_rules/rule_formatter.py @@ -126,6 +126,22 @@ def apply_preservation(target: OrderedDict[str, Any], keys: list[str]) -> None: return data +def preserve_all_strings(obj: Any) -> Any: + """Recursively mark every string leaf under ``obj`` as NonformattedField.""" + + # Used to keep arbitrary nested string content which would otherwise + # reflow/wrap the text via ``wrap_text``. + if isinstance(obj, NonformattedField): + return obj + if isinstance(obj, str): + return NonformattedField(obj) + if isinstance(obj, dict): + return {k: preserve_all_strings(v) for k, v in obj.items()} # type: ignore[reportUnknownVariableType] + if isinstance(obj, list): + return [preserve_all_strings(v) for v in obj] # type: ignore[reportUnknownVariableType] + return obj + + class RuleTomlEncoder(toml.TomlEncoder): # type: ignore[reportMissingTypeArgument] """Generate a pretty form of toml.""" @@ -247,6 +263,12 @@ def _do_write(f: TextIO | None, _data: str, _contents: dict[str, Any]) -> None: # explicitly preserve formatting for value field in filters preserved_fields = ["meta.value"] v = [preserve_formatting_for_fields(meta, preserved_fields) for meta in v] if v is not None else [] + # Preserve the verbatim content of any nested DSL query body under ``filters[].query`` + # (e.g. ``query.query_string.query``, ``query.match_phrase.*.query``, ``query.bool.*``). + # These are Elasticsearch query DSL strings and must not be reflowed by ``wrap_text``. + for filt in v: + if isinstance(filt, dict) and isinstance(filt.get("query"), dict): + filt["query"] = preserve_all_strings(filt["query"]) if k == "note" and isinstance(v, str): # Transform instances of \ to \\ as calling write will convert \\ to \. From fefd114cb42f5e207086d0795897fafd0d014bf0 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Tue, 28 Apr 2026 21:17:59 -0400 Subject: [PATCH 3/9] Reduce complexity --- detection_rules/rule_formatter.py | 34 +++++++------------------------ 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/detection_rules/rule_formatter.py b/detection_rules/rule_formatter.py index 4e9fc829fc1..c5dd4aef017 100644 --- a/detection_rules/rule_formatter.py +++ b/detection_rules/rule_formatter.py @@ -126,22 +126,6 @@ def apply_preservation(target: OrderedDict[str, Any], keys: list[str]) -> None: return data -def preserve_all_strings(obj: Any) -> Any: - """Recursively mark every string leaf under ``obj`` as NonformattedField.""" - - # Used to keep arbitrary nested string content which would otherwise - # reflow/wrap the text via ``wrap_text``. - if isinstance(obj, NonformattedField): - return obj - if isinstance(obj, str): - return NonformattedField(obj) - if isinstance(obj, dict): - return {k: preserve_all_strings(v) for k, v in obj.items()} # type: ignore[reportUnknownVariableType] - if isinstance(obj, list): - return [preserve_all_strings(v) for v in obj] # type: ignore[reportUnknownVariableType] - return obj - - class RuleTomlEncoder(toml.TomlEncoder): # type: ignore[reportMissingTypeArgument] """Generate a pretty form of toml.""" @@ -167,13 +151,15 @@ def dump_str(self, v: str | NonformattedField) -> str: lines = wrap_text(v) multiline = len(lines) > 1 - raw = (multiline or (DQ in v and SQ not in v)) and TRIPLE_DQ not in v and "\\" not in v + raw = (multiline or (DQ in v and SQ not in v)) and TRIPLE_DQ not in v if multiline: - if raw: - return "".join([TRIPLE_DQ, *initial_newline, *lines, TRIPLE_DQ]) - # Use literal triple-SQ to preserve backslashes and avoid invalid TOML escape sequences - return "".join([TRIPLE_SQ, *initial_newline, *lines, TRIPLE_SQ]) + # Triple-double-quoted basic strings allow literal newlines and literal ``"`` + # (as long as ``"""`` doesn't appear, which is guarded above via ``TRIPLE_DQ not in v``), + # but backslashes must be escaped so that e.g. ``Hello\:World`` is serialized as + # ``Hello\\:World`` -- otherwise ``\:`` is an invalid TOML escape sequence (issue #5182). + escaped_lines = [line.replace("\\", "\\\\") for line in lines] + return "".join([TRIPLE_DQ, *initial_newline, *escaped_lines, TRIPLE_DQ]) if raw: return f"'{lines[0]:s}'" # In the toml library there is a magic replace for \\\\x -> u00 that we wish to avoid until #4979 is resolved @@ -263,12 +249,6 @@ def _do_write(f: TextIO | None, _data: str, _contents: dict[str, Any]) -> None: # explicitly preserve formatting for value field in filters preserved_fields = ["meta.value"] v = [preserve_formatting_for_fields(meta, preserved_fields) for meta in v] if v is not None else [] - # Preserve the verbatim content of any nested DSL query body under ``filters[].query`` - # (e.g. ``query.query_string.query``, ``query.match_phrase.*.query``, ``query.bool.*``). - # These are Elasticsearch query DSL strings and must not be reflowed by ``wrap_text``. - for filt in v: - if isinstance(filt, dict) and isinstance(filt.get("query"), dict): - filt["query"] = preserve_all_strings(filt["query"]) if k == "note" and isinstance(v, str): # Transform instances of \ to \\ as calling write will convert \\ to \. From f65dd24d2da91d2023df7e4f889708919c9d0896 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Tue, 28 Apr 2026 21:45:31 -0400 Subject: [PATCH 4/9] patch bump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 18368f55fe0..8923f91a056 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.6.28" +version = "1.6.29" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12" From 0e034aa118584067d12b6211fe6be7b114d3cc7b Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Mon, 4 May 2026 12:33:52 -0400 Subject: [PATCH 5/9] Fix max string wrap --- detection_rules/rule_formatter.py | 35 ++++++++++++++++++++++++++++--- tests/test_toml_formatter.py | 35 +++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/detection_rules/rule_formatter.py b/detection_rules/rule_formatter.py index c5dd4aef017..30827c05627 100644 --- a/detection_rules/rule_formatter.py +++ b/detection_rules/rule_formatter.py @@ -23,6 +23,7 @@ DQ = '"' TRIPLE_SQ = SQ * 3 TRIPLE_DQ = DQ * 3 +TOML_STRING_WRAP_WIDTH = 120 # Fields from nested objects (not BaseRuleData fields) that need to be perserved. # NOTE: we treat these as globally unique which might not be true in all cases @@ -81,7 +82,7 @@ def wrap_text(v: str, block_indent: int = 0) -> list[str]: v, initial_indent=" " * block_indent, subsequent_indent=" " * block_indent, - width=120, + width=TOML_STRING_WRAP_WIDTH, break_long_words=False, break_on_hyphens=False, ) @@ -101,6 +102,10 @@ class NonformattedField(str): # noqa: SLOT000 """Non-formatting class.""" +class UnwrappedField(str): # noqa: SLOT000 + """String field that should not receive artificial line wrapping.""" + + def preserve_formatting_for_fields(data: OrderedDict[str, Any], fields_to_preserve: list[str]) -> OrderedDict[str, Any]: """Preserve formatting for specified nested fields in an action.""" @@ -126,6 +131,22 @@ def apply_preservation(target: OrderedDict[str, Any], keys: list[str]) -> None: return data +def preserve_filter_value_formatting(data: Any) -> Any: + """Preserve filter value strings so query DSL literals are not changed.""" + if isinstance(data, dict): + for key, value in data.items(): # type: ignore[reportUnknownVariableType] + if key == "value" and isinstance(value, str): + data[key] = UnwrappedField(value) # type: ignore[reportUnknownMemberType] + elif isinstance(value, dict | list): + preserve_filter_value_formatting(value) + elif isinstance(data, list): + for value in data: # type: ignore[reportUnknownVariableType] + if isinstance(value, dict | list): + preserve_filter_value_formatting(value) + + return data + + class RuleTomlEncoder(toml.TomlEncoder): # type: ignore[reportMissingTypeArgument] """Generate a pretty form of toml.""" @@ -137,6 +158,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.dump_funcs[str] = self.dump_str self.dump_funcs[list] = self.dump_list self.dump_funcs[NonformattedField] = self.dump_str + self.dump_funcs[UnwrappedField] = self.dump_unwrapped_str def dump_str(self, v: str | NonformattedField) -> str: """Change the TOML representation to multi-line or single quote when logical.""" @@ -166,6 +188,14 @@ def dump_str(self, v: str | NonformattedField) -> str: # Also addresses an issue where backslashes in certain strings are not properly escaped in self._old_dump_str(v) return json.dumps(v) + def dump_unwrapped_str(self, v: UnwrappedField) -> str: + """Serialize a string without inserting word-wrap newlines.""" + if TRIPLE_DQ not in v and ("\n" in v or "\r" in v or (len(v) > TOML_STRING_WRAP_WIDTH and " " in v)): + escaped_value = v.replace("\\", "\\\\") + return f"{TRIPLE_DQ}\n{escaped_value}\n{TRIPLE_DQ}" + + return json.dumps(v) + def _dump_flat_list(self, v: Iterable[Any]) -> str: """A slightly tweaked version of original dump_list, removing trailing commas.""" if not v: @@ -247,8 +277,7 @@ def _do_write(f: TextIO | None, _data: str, _contents: dict[str, Any]) -> None: if k == "filters": # explicitly preserve formatting for value field in filters - preserved_fields = ["meta.value"] - v = [preserve_formatting_for_fields(meta, preserved_fields) for meta in v] if v is not None else [] + v = [preserve_filter_value_formatting(filter_) for filter_ in v] if v is not None else [] if k == "note" and isinstance(v, str): # Transform instances of \ to \\ as calling write will convert \\ to \. diff --git a/tests/test_toml_formatter.py b/tests/test_toml_formatter.py index dea5e850eff..aeaba96fe4d 100644 --- a/tests/test_toml_formatter.py +++ b/tests/test_toml_formatter.py @@ -72,3 +72,38 @@ def test_formatter_rule(self): def test_formatter_deep(self): """Test that the data remains unchanged from formatting.""" self.compare_test_data(self.test_data[1:]) + + def test_filter_value_does_not_word_wrap(self): + """Test long filter values are not split across TOML lines.""" + filter_value = ( + r"C:\Program Files\Microsoft Monitoring Agent\Agent\Health Service State\Monitoring Host Temporary " + r"Files*\AvailabilityGroupMonitoring.ps1" + ) + data = { + "rule": { + "filters": [ + { + "meta": {"negate": True}, + "query": { + "wildcard": { + "file.path": { + "case_insensitive": True, + "value": filter_value, + } + } + }, + } + ] + } + } + tmp_path = Path(tmp_file) + + try: + toml_write(copy.deepcopy(data), tmp_path) + formatted_data = tmp_path.read_text() + + self.assertIn("Monitoring Host Temporary Files*", formatted_data) + self.assertNotIn("Monitoring Host Temporary\nFiles*", formatted_data) + finally: + if tmp_path.exists(): + tmp_path.unlink() From c5daa10853a1199bd4a11fe1c8e0a556ad45fbb5 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Mon, 4 May 2026 13:14:33 -0400 Subject: [PATCH 6/9] patch bump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8923f91a056..6f15cb4ac7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.6.29" +version = "1.6.30" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12" From e2d945aad6af0337be700ed95261e7e411dfd918 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Mon, 4 May 2026 13:30:52 -0400 Subject: [PATCH 7/9] Add noqa --- detection_rules/rule_formatter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/detection_rules/rule_formatter.py b/detection_rules/rule_formatter.py index 30827c05627..bb0e6387749 100644 --- a/detection_rules/rule_formatter.py +++ b/detection_rules/rule_formatter.py @@ -144,7 +144,7 @@ def preserve_filter_value_formatting(data: Any) -> Any: if isinstance(value, dict | list): preserve_filter_value_formatting(value) - return data + return data # type: ignore[reportUnknownVariableType] class RuleTomlEncoder(toml.TomlEncoder): # type: ignore[reportMissingTypeArgument] From fb49fb839554bf399c4c7f7c5a897950373f6319 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Mon, 4 May 2026 13:59:08 -0400 Subject: [PATCH 8/9] Revert rule formatter changes, handled in a different PR --- detection_rules/rule_formatter.py | 35 +++---------------------------- tests/test_toml_formatter.py | 35 ------------------------------- 2 files changed, 3 insertions(+), 67 deletions(-) diff --git a/detection_rules/rule_formatter.py b/detection_rules/rule_formatter.py index bb0e6387749..c5dd4aef017 100644 --- a/detection_rules/rule_formatter.py +++ b/detection_rules/rule_formatter.py @@ -23,7 +23,6 @@ DQ = '"' TRIPLE_SQ = SQ * 3 TRIPLE_DQ = DQ * 3 -TOML_STRING_WRAP_WIDTH = 120 # Fields from nested objects (not BaseRuleData fields) that need to be perserved. # NOTE: we treat these as globally unique which might not be true in all cases @@ -82,7 +81,7 @@ def wrap_text(v: str, block_indent: int = 0) -> list[str]: v, initial_indent=" " * block_indent, subsequent_indent=" " * block_indent, - width=TOML_STRING_WRAP_WIDTH, + width=120, break_long_words=False, break_on_hyphens=False, ) @@ -102,10 +101,6 @@ class NonformattedField(str): # noqa: SLOT000 """Non-formatting class.""" -class UnwrappedField(str): # noqa: SLOT000 - """String field that should not receive artificial line wrapping.""" - - def preserve_formatting_for_fields(data: OrderedDict[str, Any], fields_to_preserve: list[str]) -> OrderedDict[str, Any]: """Preserve formatting for specified nested fields in an action.""" @@ -131,22 +126,6 @@ def apply_preservation(target: OrderedDict[str, Any], keys: list[str]) -> None: return data -def preserve_filter_value_formatting(data: Any) -> Any: - """Preserve filter value strings so query DSL literals are not changed.""" - if isinstance(data, dict): - for key, value in data.items(): # type: ignore[reportUnknownVariableType] - if key == "value" and isinstance(value, str): - data[key] = UnwrappedField(value) # type: ignore[reportUnknownMemberType] - elif isinstance(value, dict | list): - preserve_filter_value_formatting(value) - elif isinstance(data, list): - for value in data: # type: ignore[reportUnknownVariableType] - if isinstance(value, dict | list): - preserve_filter_value_formatting(value) - - return data # type: ignore[reportUnknownVariableType] - - class RuleTomlEncoder(toml.TomlEncoder): # type: ignore[reportMissingTypeArgument] """Generate a pretty form of toml.""" @@ -158,7 +137,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.dump_funcs[str] = self.dump_str self.dump_funcs[list] = self.dump_list self.dump_funcs[NonformattedField] = self.dump_str - self.dump_funcs[UnwrappedField] = self.dump_unwrapped_str def dump_str(self, v: str | NonformattedField) -> str: """Change the TOML representation to multi-line or single quote when logical.""" @@ -188,14 +166,6 @@ def dump_str(self, v: str | NonformattedField) -> str: # Also addresses an issue where backslashes in certain strings are not properly escaped in self._old_dump_str(v) return json.dumps(v) - def dump_unwrapped_str(self, v: UnwrappedField) -> str: - """Serialize a string without inserting word-wrap newlines.""" - if TRIPLE_DQ not in v and ("\n" in v or "\r" in v or (len(v) > TOML_STRING_WRAP_WIDTH and " " in v)): - escaped_value = v.replace("\\", "\\\\") - return f"{TRIPLE_DQ}\n{escaped_value}\n{TRIPLE_DQ}" - - return json.dumps(v) - def _dump_flat_list(self, v: Iterable[Any]) -> str: """A slightly tweaked version of original dump_list, removing trailing commas.""" if not v: @@ -277,7 +247,8 @@ def _do_write(f: TextIO | None, _data: str, _contents: dict[str, Any]) -> None: if k == "filters": # explicitly preserve formatting for value field in filters - v = [preserve_filter_value_formatting(filter_) for filter_ in v] if v is not None else [] + preserved_fields = ["meta.value"] + v = [preserve_formatting_for_fields(meta, preserved_fields) for meta in v] if v is not None else [] if k == "note" and isinstance(v, str): # Transform instances of \ to \\ as calling write will convert \\ to \. diff --git a/tests/test_toml_formatter.py b/tests/test_toml_formatter.py index aeaba96fe4d..dea5e850eff 100644 --- a/tests/test_toml_formatter.py +++ b/tests/test_toml_formatter.py @@ -72,38 +72,3 @@ def test_formatter_rule(self): def test_formatter_deep(self): """Test that the data remains unchanged from formatting.""" self.compare_test_data(self.test_data[1:]) - - def test_filter_value_does_not_word_wrap(self): - """Test long filter values are not split across TOML lines.""" - filter_value = ( - r"C:\Program Files\Microsoft Monitoring Agent\Agent\Health Service State\Monitoring Host Temporary " - r"Files*\AvailabilityGroupMonitoring.ps1" - ) - data = { - "rule": { - "filters": [ - { - "meta": {"negate": True}, - "query": { - "wildcard": { - "file.path": { - "case_insensitive": True, - "value": filter_value, - } - } - }, - } - ] - } - } - tmp_path = Path(tmp_file) - - try: - toml_write(copy.deepcopy(data), tmp_path) - formatted_data = tmp_path.read_text() - - self.assertIn("Monitoring Host Temporary Files*", formatted_data) - self.assertNotIn("Monitoring Host Temporary\nFiles*", formatted_data) - finally: - if tmp_path.exists(): - tmp_path.unlink() From 5f08e5bc3e6f33016dd265932fce16e88811a711 Mon Sep 17 00:00:00 2001 From: eric-forte-elastic Date: Mon, 4 May 2026 14:48:20 -0400 Subject: [PATCH 9/9] fix bump --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 80d1555ddf7..e756a083069 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.6.35" +version = "1.6.34" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12"