Skip to content

Commit

Permalink
Merge pull request #168 from LUMC/fixutf8crash
Browse files Browse the repository at this point in the history
Fix crash when the stderr bytes chunk is not properly aligned with the encoding
  • Loading branch information
rhpvorderman committed Jan 13, 2023
2 parents b823378 + 8005c66 commit 73d90e3
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 17 deletions.
6 changes: 6 additions & 0 deletions HISTORY.rst
Expand Up @@ -7,6 +7,12 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.
version 2.1.0-dev
---------------------------
+ Fixed a bug where pytest-workflow would crash on logs that used non-ASCII
characters when the chunk of size ``--stderr-bytes`` did not properly align
with the used encoding.

version 2.0.0
---------------------------
This major release greatly cleans up the output of pytest-workflow in case of
Expand Down
42 changes: 28 additions & 14 deletions src/pytest_workflow/plugin.py
Expand Up @@ -30,7 +30,8 @@
from .content_tests import ContentTestCollector
from .file_tests import FileTestCollector
from .schema import WorkflowTest, workflow_tests_from_schema
from .util import duplicate_tree, is_in_dir, replace_whitespace
from .util import (decode_unaligned, duplicate_tree, is_in_dir,
replace_whitespace)
from .workflow import Workflow, WorkflowQueue


Expand Down Expand Up @@ -450,7 +451,10 @@ def collect(self):
tests += [ExitCodeTest.from_parent(
parent=self,
workflow=workflow,
stderr_bytes=self.config.getoption("stderr_bytes"))]
stderr_bytes=self.config.getoption("stderr_bytes"),
stdout_encoding=self.workflow_test.stdout.encoding,
stderr_encoding=self.workflow_test.stderr.encoding,
)]

tests += [
FileTestCollector.from_parent(
Expand All @@ -476,11 +480,16 @@ def collect(self):

class ExitCodeTest(pytest.Item):
def __init__(self, parent: pytest.Collector,
workflow: Workflow, stderr_bytes: int):
workflow: Workflow,
stderr_bytes: int,
stdout_encoding: Optional[str] = None,
stderr_encoding: Optional[str] = None):
name = f"exit code should be {workflow.desired_exit_code}"
super().__init__(name, parent=parent)
self.stderr_bytes = stderr_bytes
self.workflow = workflow
self.stdout_encoding = stdout_encoding
self.stderr_encoding = stderr_encoding

def runtest(self):
# workflow.exit_code waits for workflow to finish.
Expand All @@ -489,16 +498,21 @@ def runtest(self):
def repr_failure(self, excinfo, style=None):
standerr = self.workflow.stderr_file
standout = self.workflow.stdout_file
with open(standout, "rb") as standout_file, \
open(standerr, "rb") as standerr_file:
if os.path.getsize(standerr) >= self.stderr_bytes:
standerr_file.seek(-self.stderr_bytes, os.SEEK_END)

with open(standout, "rb") as standout_file:
if os.path.getsize(standout) >= self.stderr_bytes:
standout_file.seek(-self.stderr_bytes, os.SEEK_END)
message = (f"'{self.workflow.name}' exited with exit code " +
f"'{self.workflow.exit_code}' instead of "
f"'{self.workflow.desired_exit_code}'.\nstderr: "
f"{standerr_file.read().strip().decode('utf-8')}"
f"\nstdout: "
f"{standout_file.read().strip().decode('utf-8')}")
return message
stdout_text = decode_unaligned(standout_file.read().strip(),
encoding=self.stdout_encoding)
with open(standerr, "rb") as standerr_file:
if os.path.getsize(standerr) >= self.stderr_bytes:
standerr_file.seek(-self.stderr_bytes, os.SEEK_END)
stderr_text = decode_unaligned(standerr_file.read().strip(),
encoding=self.stderr_encoding)

return (
f"'{self.workflow.name}' exited with exit code " +
f"'{self.workflow.exit_code}' instead of "
f"'{self.workflow.desired_exit_code}'.\n"
f"stderr: {stderr_text}\n"
f"stdout: {stdout_text}")
16 changes: 15 additions & 1 deletion src/pytest_workflow/util.py
Expand Up @@ -7,7 +7,7 @@
import sys
import warnings
from pathlib import Path
from typing import Callable, Iterator, List, Set, Tuple, Union
from typing import Callable, Iterator, List, Optional, Set, Tuple, Union

Filepath = Union[str, os.PathLike]

Expand Down Expand Up @@ -209,3 +209,17 @@ def file_md5sum(filepath: Path, block_size=64 * 1024) -> str:
for block in iter(lambda: file_handler.read(block_size), b''):
hasher.update(block)
return hasher.hexdigest()


def decode_unaligned(data: bytes, encoding: Optional[str] = None):
    """Decode bytes that may begin part-way through an encoded character.

    Decoding is attempted on ``data`` with 0, 1, 2 and then 3 leading bytes
    dropped; the first attempt that decodes strictly is returned.

    :param data: The raw bytes to decode.
    :param encoding: Name of the codec to use. When ``None`` the value of
        ``sys.getdefaultencoding()`` is used.
    :return: The decoded text of the first slice that decodes cleanly.
    :raises UnicodeDecodeError: When none of the four slices can be decoded.
    """
    codec = sys.getdefaultencoding() if encoding is None else encoding
    skip = 0
    while skip < 4:
        try:
            return data[skip:].decode(encoding=codec, errors="strict")
        except UnicodeDecodeError:
            skip += 1
    # Every candidate offset failed. Decode the untouched input once more so
    # that the resulting UnicodeDecodeError is not caught and reaches the
    # user.
    return data.decode(encoding=codec)
14 changes: 14 additions & 0 deletions tests/test_miscellaneous_crashes.py
Expand Up @@ -14,6 +14,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with pytest-workflow. If not, see <https://www.gnu.org/licenses/

import textwrap

from pytest import ExitCode

from .test_success_messages import SIMPLE_ECHO


Expand All @@ -27,3 +31,13 @@ def test_same_name_different_files(pytester):
conflicting_message = (
"Conflicting tests: test_b.yml::simple echo, test_a.yml::simple echo.")
assert conflicting_message in result.stdout.str()


# Regression test: run a failing workflow that prints non-ASCII text while
# pytest-workflow is limited to a 7-byte log tail via ``--stderr-bytes``.
def test_non_ascii_logs_stderr_bytes(pytester):
# Workflow definition: the command prints nine "è" characters and then exits
# with status 1, so the workflow's exit code test fails.
test = textwrap.dedent("""
- name: print non-ascii
command: bash -c 'printf èèèèèèèèè && exit 1'
""")
# Written out as test_non_ascii.yml in the pytester working directory.
pytester.makefile(".yml", test_non_ascii=test)
# NOTE(review): presumably 7 is chosen because "è" takes two bytes in UTF-8,
# so an odd-sized tail starts in the middle of a character - confirm.
result = pytester.runpytest("--stderr-bytes", "7")
# The run must end with the regular "tests failed" exit code.
assert result.ret == ExitCode.TESTS_FAILED
23 changes: 21 additions & 2 deletions tests/test_utils.py
Expand Up @@ -14,16 +14,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with pytest-workflow. If not, see <https://www.gnu.org/licenses/
import hashlib
import itertools
import os
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path

import pytest

from pytest_workflow.util import duplicate_tree, file_md5sum, \
git_check_submodules_cloned, git_root, \
from pytest_workflow.util import decode_unaligned, duplicate_tree, \
file_md5sum, git_check_submodules_cloned, git_root, \
is_in_dir, link_tree, replace_whitespace

WHITESPACE_TESTS = [
Expand Down Expand Up @@ -227,3 +229,20 @@ def test_duplicate_git_tree_submodule_symlinks(git_repo_with_submodules):
assert link.exists()
assert link.is_symlink()
assert link.resolve() == dest / "bird" / "sub"


@pytest.mark.parametrize(
    ["offset", "encoding"],
    list(itertools.product(range(4), (None, "utf-8", "utf-16", "utf-32"))),
)
def test_decode_unaligned(offset, encoding):
    """Check that dropping leading bytes still yields a suffix of the text.

    The sample text is encoded with ``encoding`` (or the interpreter default
    when ``encoding`` is ``None``), ``offset`` leading bytes are removed, and
    the output of ``decode_unaligned`` must be a suffix of the sample.
    """
    expected = "èèèèèèèèèèè"
    codec = encoding or sys.getdefaultencoding()
    truncated = expected.encode(codec)[offset:]
    result = decode_unaligned(truncated, encoding)
    assert expected.endswith(result)


def test_decode_unaligned_wrong_encoding_throws_error():
    """Decoding UTF-8 bytes as UTF-32-LE must raise UnicodeDecodeError."""
    utf8_payload = "hello".encode("utf-8")
    with pytest.raises(UnicodeDecodeError):
        decode_unaligned(utf8_payload, "utf-32-le")

0 comments on commit 73d90e3

Please sign in to comment.