-
Notifications
You must be signed in to change notification settings - Fork 0
Enforce byte-accurate stdin limits for strip-markup #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,9 @@ def print_usage() -> None: | |
| ) | ||
|
|
||
|
|
||
| MAX_INPUT_BYTES = 1024 * 1024 | ||
|
|
||
|
|
||
| def main() -> int: | ||
| """ | ||
| Main function. | ||
|
|
@@ -52,13 +55,36 @@ def main() -> int: | |
| print_usage() | ||
| return 1 | ||
| untrusted_string = arg_list[0] | ||
| if len(untrusted_string.encode()) > MAX_INPUT_BYTES: | ||
| print( | ||
| f"strip-markup: input exceeds maximum size of {MAX_INPUT_BYTES} bytes.", | ||
| file=sys.stderr, | ||
| ) | ||
| return 1 | ||
|
|
||
| ## Read untrusted_string from stdin if needed | ||
| if untrusted_string is None: | ||
| if sys.stdin is not None: | ||
| if "pytest" not in sys.modules: | ||
| sys.stdin.reconfigure(errors="ignore") # type: ignore | ||
| untrusted_string = sys.stdin.read() | ||
| if hasattr(sys.stdin, "buffer"): | ||
| raw_stdin = sys.stdin.buffer.read(MAX_INPUT_BYTES + 1) | ||
| if len(raw_stdin) > MAX_INPUT_BYTES: | ||
| print( | ||
| f"strip-markup: input exceeds maximum size of {MAX_INPUT_BYTES} bytes.", | ||
| file=sys.stderr, | ||
| ) | ||
| return 1 | ||
| encoding = getattr(sys.stdin, "encoding", None) or "utf-8" | ||
| untrusted_string = raw_stdin.decode(encoding, errors="ignore") | ||
| else: | ||
| if "pytest" not in sys.modules and hasattr(sys.stdin, "reconfigure"): | ||
| sys.stdin.reconfigure(errors="ignore") # type: ignore | ||
| untrusted_string = sys.stdin.read(MAX_INPUT_BYTES + 1) | ||
| if len(untrusted_string.encode()) > MAX_INPUT_BYTES: | ||
| print( | ||
| f"strip-markup: input exceeds maximum size of {MAX_INPUT_BYTES} bytes.", | ||
| file=sys.stderr, | ||
| ) | ||
| return 1 | ||
|
Comment on lines
-59
to
+87
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems similarly wrong IMO. |
||
| else: | ||
| ## No way to get an untrusted string, print nothing and | ||
| ## exit successfully | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,17 +42,27 @@ def get_data(self) -> str: | |
| return self.text.getvalue() | ||
|
|
||
|
|
||
| def _strip_control_characters(untrusted_string: str) -> str: | ||
| """ | ||
| Remove control characters that could be used for terminal escapes. | ||
| """ | ||
|
|
||
| return "".join( | ||
| char for char in untrusted_string if char.isprintable() or char in "\n\t" | ||
| ) | ||
|
|
||
|
|
||
|
Comment on lines
+45
to
+54
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks possibly useful, but it makes me wonder where the job of strip_markup stops and sanitize_string starts. Doing this will essentially do a lot of the work sanitize_string does, except it won't insert underscores in place of Unicode and control characters. The underscores seem valuable at least to me because it tells the user something weird is happening, rather than just protecting them from it and leaving them clueless about it. It's also likely that the regex matching is substantially faster than this. Patrick and I have already been talking about the possibility that strip_markup is unnecessary now that we have sanitize_string. If we have to add something like this to make strip_markup even safe, that seems like a strong argument in favor of just getting rid of strip_markup and using sanitize_string instead. It does its job much better, and should be fairly fast at it. |
||
| def strip_markup(untrusted_string: str) -> str: | ||
| """ | ||
| Stripping function. | ||
| """ | ||
|
|
||
| markup_stripper: StripMarkupEngine = StripMarkupEngine() | ||
| markup_stripper.feed(untrusted_string) | ||
| strip_one_string: str = markup_stripper.get_data() | ||
| strip_one_string: str = _strip_control_characters(markup_stripper.get_data()) | ||
| markup_stripper = StripMarkupEngine() | ||
| markup_stripper.feed(strip_one_string) | ||
| strip_two_string: str = markup_stripper.get_data() | ||
| strip_two_string: str = _strip_control_characters(markup_stripper.get_data()) | ||
| if strip_one_string == strip_two_string: | ||
| return strip_one_string | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,12 +5,12 @@ | |
|
|
||
| # pylint: disable=missing-module-docstring,fixme | ||
|
|
||
| import unittest | ||
| import io | ||
| import sys | ||
| from io import StringIO | ||
| import unittest | ||
| from typing import Callable | ||
| from unittest import mock | ||
| from strip_markup.strip_markup import main as strip_markup_main | ||
| from strip_markup.strip_markup import MAX_INPUT_BYTES, main as strip_markup_main | ||
|
|
||
|
|
||
| class TestStripMarkupBase(unittest.TestCase): | ||
|
|
@@ -36,8 +36,8 @@ def _test_args( | |
| """ | ||
|
|
||
| args_arr: list[str] = [argv0, *args] | ||
| stdout_buf: StringIO = StringIO() | ||
| stderr_buf: StringIO = StringIO() | ||
| stdout_buf: io.StringIO = io.StringIO() | ||
| stderr_buf: io.StringIO = io.StringIO() | ||
| with ( | ||
| mock.patch.object(sys, "argv", args_arr), | ||
| mock.patch.object(sys, "stdout", stdout_buf), | ||
|
|
@@ -50,6 +50,35 @@ def _test_args( | |
| stdout_buf.close() | ||
| stderr_buf.close() | ||
|
|
||
| # pylint: disable=too-many-arguments,too-many-positional-arguments | ||
| def _test_args_failure( | ||
| self, | ||
| main_func: Callable[[], int], | ||
| argv0: str, | ||
| stdout_string: str, | ||
| stderr_string: str, | ||
| args: list[str], | ||
| exit_code: int = 1, | ||
| ) -> None: | ||
| """ | ||
| Executes the provided main function expecting failure output. | ||
| """ | ||
|
|
||
| args_arr: list[str] = [argv0, *args] | ||
| stdout_buf: io.StringIO = io.StringIO() | ||
| stderr_buf: io.StringIO = io.StringIO() | ||
| with ( | ||
| mock.patch.object(sys, "argv", args_arr), | ||
| mock.patch.object(sys, "stdout", stdout_buf), | ||
| mock.patch.object(sys, "stderr", stderr_buf), | ||
| ): | ||
| returned_exit_code: int = main_func() | ||
| self.assertEqual(stdout_buf.getvalue(), stdout_string) | ||
| self.assertEqual(stderr_buf.getvalue(), stderr_string) | ||
| self.assertEqual(returned_exit_code, exit_code) | ||
| stdout_buf.close() | ||
| stderr_buf.close() | ||
|
|
||
|
Comment on lines
+53
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is superfluous, we can just augment the existing |
||
| # pylint: disable=too-many-arguments,too-many-positional-arguments | ||
| def _test_stdin( | ||
| self, | ||
|
|
@@ -65,11 +94,12 @@ def _test_stdin( | |
| ensures its output matches an expected value. | ||
| """ | ||
|
|
||
| stdout_buf: StringIO = StringIO() | ||
| stderr_buf: StringIO = StringIO() | ||
| stdin_buf: StringIO = StringIO() | ||
| stdin_buf.write(stdin_string) | ||
| stdin_buf.seek(0) | ||
| stdout_buf: io.StringIO = io.StringIO() | ||
| stderr_buf: io.StringIO = io.StringIO() | ||
| stdin_bytes: io.BytesIO = io.BytesIO() | ||
| stdin_bytes.write(stdin_string.encode("utf-8")) | ||
| stdin_bytes.seek(0) | ||
| stdin_buf: io.TextIOWrapper = io.TextIOWrapper(stdin_bytes, encoding="utf-8") | ||
|
Comment on lines
-68
to
+102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bit overcomplicated. Earlier I ended up reimplementing this in a way that is simpler and uses TextIOWrappers for all of stdout, stderr, and stdin, so this isn't necessary. |
||
| args_arr: list[str] = [argv0, *args] | ||
| with ( | ||
| mock.patch.object(sys, "argv", args_arr), | ||
|
|
@@ -84,6 +114,44 @@ def _test_stdin( | |
| stdout_buf.close() | ||
| stderr_buf.close() | ||
| stdin_buf.close() | ||
| stdin_bytes.close() | ||
|
|
||
| # pylint: disable=too-many-arguments,too-many-positional-arguments | ||
| def _test_stdin_failure( | ||
| self, | ||
| main_func: Callable[[], int], | ||
| argv0: str, | ||
| stdout_string: str, | ||
| stderr_string: str, | ||
| args: list[str], | ||
| stdin_string: str, | ||
| exit_code: int = 1, | ||
| ) -> None: | ||
| """ | ||
| Executes the provided main function expecting failure output when using stdin. | ||
| """ | ||
|
|
||
| stdout_buf: io.StringIO = io.StringIO() | ||
| stderr_buf: io.StringIO = io.StringIO() | ||
| stdin_bytes: io.BytesIO = io.BytesIO() | ||
| stdin_bytes.write(stdin_string.encode("utf-8")) | ||
| stdin_bytes.seek(0) | ||
| stdin_buf: io.TextIOWrapper = io.TextIOWrapper(stdin_bytes, encoding="utf-8") | ||
| args_arr: list[str] = [argv0, *args] | ||
| with ( | ||
| mock.patch.object(sys, "argv", args_arr), | ||
| mock.patch.object(sys, "stdin", stdin_buf), | ||
| mock.patch.object(sys, "stdout", stdout_buf), | ||
| mock.patch.object(sys, "stderr", stderr_buf), | ||
| ): | ||
| returned_exit_code: int = main_func() | ||
| self.assertEqual(stdout_buf.getvalue(), stdout_string) | ||
| self.assertEqual(stderr_buf.getvalue(), stderr_string) | ||
| self.assertEqual(returned_exit_code, exit_code) | ||
| stdout_buf.close() | ||
| stderr_buf.close() | ||
| stdin_buf.close() | ||
| stdin_bytes.close() | ||
|
Comment on lines
+119
to
+154
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This, again, could be implemented much more simply by augmenting the existing |
||
|
|
||
| def _test_safe_strings( | ||
| self, | ||
|
|
@@ -369,3 +437,85 @@ def test_malicious_markup_strings(self) -> None: | |
| """ | ||
|
|
||
| self._test_malicious_markup_strings(strip_markup_main, self.argv0) | ||
|
|
||
| def test_control_characters_are_removed(self) -> None: | ||
| """ | ||
| Ensure decoded control characters are stripped from output. | ||
| """ | ||
|
|
||
| self._test_args( | ||
| main_func=strip_markup_main, | ||
| argv0=self.argv0, | ||
| stdout_string="control sequence", | ||
| stderr_string="", | ||
| args=["control  sequence"], | ||
| ) | ||
|
|
||
| def test_carriage_return_is_removed(self) -> None: | ||
| """ | ||
| Ensure carriage returns cannot be used for overprinting attacks. | ||
| """ | ||
|
|
||
| self._test_args( | ||
| main_func=strip_markup_main, | ||
| argv0=self.argv0, | ||
| stdout_string="line one overwritten", | ||
| stderr_string="", | ||
| args=["line one overwritten"], | ||
| ) | ||
|
|
||
| def test_rejects_oversized_argument(self) -> None: | ||
| """ | ||
| Ensure overly large arguments are rejected to avoid excessive memory use. | ||
| """ | ||
|
|
||
| oversized_input: str = "x" * (MAX_INPUT_BYTES + 1) | ||
| error_msg: str = ( | ||
| f"strip-markup: input exceeds maximum size of {MAX_INPUT_BYTES} bytes.\n" | ||
| ) | ||
| self._test_args_failure( | ||
| main_func=strip_markup_main, | ||
| argv0=self.argv0, | ||
| stdout_string="", | ||
| stderr_string=error_msg, | ||
| args=[oversized_input], | ||
| ) | ||
|
|
||
| def test_rejects_oversized_stdin(self) -> None: | ||
| """ | ||
| Ensure overly large stdin payloads are rejected. | ||
| """ | ||
|
|
||
| oversized_input: str = "y" * (MAX_INPUT_BYTES + 1) | ||
| error_msg: str = ( | ||
| f"strip-markup: input exceeds maximum size of {MAX_INPUT_BYTES} bytes.\n" | ||
| ) | ||
| self._test_stdin_failure( | ||
| main_func=strip_markup_main, | ||
| argv0=self.argv0, | ||
| stdout_string="", | ||
| stderr_string=error_msg, | ||
| args=[], | ||
| stdin_string=oversized_input, | ||
| ) | ||
|
|
||
| def test_rejects_multibyte_oversized_stdin(self) -> None: | ||
| """ | ||
| Ensure byte-size enforcement applies when stdin includes multibyte characters. | ||
| """ | ||
|
|
||
| multibyte_char: str = "€" | ||
| multibyte_oversized_input: str = multibyte_char * ( | ||
| (MAX_INPUT_BYTES // len(multibyte_char.encode("utf-8"))) + 1 | ||
| ) | ||
| error_msg: str = ( | ||
| f"strip-markup: input exceeds maximum size of {MAX_INPUT_BYTES} bytes.\n" | ||
| ) | ||
| self._test_stdin_failure( | ||
| main_func=strip_markup_main, | ||
| argv0=self.argv0, | ||
| stdout_string="", | ||
| stderr_string=error_msg, | ||
| args=[], | ||
| stdin_string=multibyte_oversized_input, | ||
| ) | ||
|
Comment on lines
+467
to
+521
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As explained earlier, we don't want to reject large inputs. |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using this to help prevent a DoS when using strip_markup is... questionable at best. We're taking UTF-8 text, running it through an encoder to generate a copy of the entire input in a different format, then taking the length of that generated data? This immediately doubles memory consumption, so if we're worried about receiving multi-gigabyte strings causing a DoS, this is not in our best interest.
I also feel like trying to fight against DoS attacks here is wrong. sanitize_string supports truncating an overlong input, but that's to ensure that the output is a suitable size for passing to other things thereafter. It's not to prevent an attacker from passing massive input data, arguably the caller should be the one ensuring massive input data is rejected early on. (In some instances, like when receiving data from the network, the caller is the only one who can ensure massive input data is properly rejected, trying to defend against it here wouldn't even work.) On the other hand, a user might legitimately want to process a massive file with this or similar tools. Why prevent that?