Skip to content

Commit

Permalink
Refactor subprocess mixin to control setting up and tearing down chil… (
Browse files Browse the repository at this point in the history
#894)

The Subprocess mixin can be used to control setting up and tearing down child process. 
The `TestServerBase` will leverage this and it will be re-used by a xDS configuration generator.

Signed-off-by: Kevin Baichoo <kbaichoo@google.com>
  • Loading branch information
KBaichoo committed Aug 12, 2022
1 parent 6f7ec58 commit 9882240
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 12 deletions.
1 change: 1 addition & 0 deletions test/integration/BUILD
Expand Up @@ -37,6 +37,7 @@ py_library(
"integration_test_fixtures.py",
"nighthawk_grpc_service.py",
"nighthawk_test_server.py",
"subprocess_mixin.py",
"utility.py",
],
data = [
Expand Down
21 changes: 9 additions & 12 deletions test/integration/nighthawk_test_server.py
Expand Up @@ -20,6 +20,7 @@
from rules_python.python.runfiles import runfiles

from test.integration.common import IpVersion, NighthawkException
from test.integration.subprocess_mixin import SubprocessMixin


def _substitute_yaml_values(runfiles_instance, obj, params):
Expand Down Expand Up @@ -102,7 +103,7 @@ class _TestCaseWarnErrorIgnoreList(
])


class TestServerBase(object):
class TestServerBase(SubprocessMixin):
"""Base class for running a server in a separate process.
Attributes:
Expand All @@ -127,6 +128,7 @@ def __init__(self, server_binary_path, config_template_path, server_ip, ip_versi
parameters (dict): Supply to provide configuration template parameter replacement values.
tag (str): Supply to get recognizeable output locations.
"""
SubprocessMixin.__init__(self)
assert ip_version != IpVersion.UNKNOWN
self.ip_version = ip_version
self.server_ip = server_ip
Expand Down Expand Up @@ -170,7 +172,7 @@ def _prepareForExecution(self):
dir=self.tmpdir) as tmp:
self._admin_address_path = tmp.name

def _serverThreadRunner(self):
def _argsForSubprocess(self):
args = []
if self.docker_image != "":
# TODO(#383): As of https://github.com/envoyproxy/envoy/commit/e8a2d1e24dc9a0da5273442204ec3cdfad1e7ca8
Expand All @@ -185,12 +187,10 @@ def _serverThreadRunner(self):
self._parameterized_config_path, "-l", "debug", "--base-id", self._instance_id,
"--admin-address-path", self._admin_address_path, "--concurrency", "1"
]
return args

logging.info("Test server popen() args: %s" % str.join(" ", args))
self._server_process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = self._server_process.communicate()
logging.info("Process stdout: %s", stdout.decode("utf-8"))
logging.info("Process stderr: %s", stderr.decode("utf-8"))
def _serverThreadRunner(self):
stdout, stderr = super()._serverThreadRunner()
warnings, errors = _extractWarningsAndErrors(stdout.decode() + stderr.decode(),
self._request.node.name,
_TEST_SERVER_WARN_ERROR_IGNORE_LIST)
Expand Down Expand Up @@ -271,8 +271,7 @@ def start(self):
Returns:
Bool: True iff the server started successfully.
"""
self._server_thread.daemon = True
self._server_thread.start()
self.launchSubprocess()
return self._waitUntilServerListening()

def stop(self):
Expand All @@ -282,9 +281,7 @@ def stop(self):
Int: exit code of the server process.
"""
os.remove(self._admin_address_path)
self._server_process.terminate()
self._server_thread.join()
return self._server_process.returncode
return self.stopSubprocess()


class NighthawkTestServer(TestServerBase):
Expand Down
81 changes: 81 additions & 0 deletions test/integration/subprocess_mixin.py
@@ -0,0 +1,81 @@
"""Mixin for managing child subprocess with an additional thread."""

from threading import Thread, Condition
import subprocess
import logging
from abc import ABC, abstractmethod


class SubprocessMixin(ABC):
"""Mixin used to manage launching subprocess using a separate Python thread.
See test_subprocess_captures_stdout and test_subprocess_stop to see usage
of the mixin.
"""

def __init__(self):
"""Create SubprocessMixin."""
self._server_thread = Thread(target=self._serverThreadRunner)
self._server_process = None
self._has_launched = False
self._has_launched_cv = Condition()

@abstractmethod
def _argsForSubprocess(self) -> list[str]:
"""Return the args to launch the subprocess."""
pass

def _serverThreadRunner(self):
"""Routine executed by the python thread that launches the child subprocess.
Derived classes may wish to extend this to use the stdout, stderr of
the child process.
"""
args = self._argsForSubprocess()
logging.info("Test server popen() args: %s" % str.join(" ", args))
self._server_process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
with self._has_launched_cv:
self._has_launched = True
self._has_launched_cv.notify_all()
stdout, stderr = self._server_process.communicate()
logging.info("Process stdout: %s", stdout.decode("utf-8"))
logging.info("Process stderr: %s", stderr.decode("utf-8"))
return stdout, stderr

def launchSubprocess(self):
"""Start the subprocess."""
self._server_thread.daemon = True
self._server_thread.start()

def waitUntilSubprocessLaunched(self):
"""Blocks until the subprocess has launched at least once."""

def hasLaunched():
return self._has_launched

with self._has_launched_cv:
self._has_launched_cv.wait_for(hasLaunched)
assert self._has_launched

def waitForSubprocessNotRunning(self):
"""Wait for the subprocess to not be running assuming it exits."""
if not self._has_launched or not self._server_thread.is_alive():
return
self._server_thread.join()

def hasLaunched():
return self._has_launched

with self._has_launched_cv:
self._has_launched_cv.wait_for(hasLaunched)
assert self._has_launched

def stopSubprocess(self) -> int:
"""Stop the subprocess.
Returns:
Int: exit code of the server process.
"""
self._server_process.terminate()
self._server_thread.join()
return self._server_process.returncode
8 changes: 8 additions & 0 deletions test/integration/unit_tests/BUILD
Expand Up @@ -17,3 +17,11 @@ py_test(
"//test/integration:integration_test_base_lean",
],
)

py_test(
name = "test_subprocess_mixin",
srcs = ["test_subprocess_mixin.py"],
deps = [
"//test/integration:integration_test_base_lean",
],
)
54 changes: 54 additions & 0 deletions test/integration/unit_tests/test_subprocess_mixin.py
@@ -0,0 +1,54 @@
"""Contains unit tests for subprocess_mixin.py."""

import pytest

from test.integration.subprocess_mixin import SubprocessMixin


class TestSubprocessMixin(SubprocessMixin):
"""Helper class for testing the SubprocessMixin."""

def __init__(self, args):
"""Create the TestSubprocessMixin."""
super().__init__()
self.args = args
self.stdout = b''
self.stderr = b''

def _argsForSubprocess(self) -> list[str]:
return self.args

def _serverThreadRunner(self):
self.stdout, self.stderr = super()._serverThreadRunner()


def test_subprocess_captures_stdout():
"""Test the subprocess captures stdout."""
child_process = TestSubprocessMixin(['echo', 'stdout'])
child_process.launchSubprocess()
child_process.waitUntilSubprocessLaunched()
child_process.waitForSubprocessNotRunning()
assert b'stdout' in child_process.stdout


def test_subprocess_captures_stderr():
"""Test the subprocess captures stderr."""
child_process = TestSubprocessMixin(['logger', '--no-act', '-s', 'stderr'])
child_process.launchSubprocess()
child_process.waitUntilSubprocessLaunched()
child_process.waitForSubprocessNotRunning()
assert child_process.stderr != b''


def test_subprocess_stop():
"""Test the subprocess can be stopped."""
child_process = TestSubprocessMixin(['sleep', '120'])
child_process.launchSubprocess()
child_process.waitUntilSubprocessLaunched()
ret_code = child_process.stopSubprocess()
# Non-zero exit is expected as the subprocess should be killed.
assert ret_code != 0


if __name__ == '__main__':
raise SystemExit(pytest.main([__file__, '--assert=plain']))

0 comments on commit 9882240

Please sign in to comment.