Skip to content

Commit

Permalink
Instead of writing stdout/stderr to temp files, write to artifact path
Browse files Browse the repository at this point in the history
On the slave service, we are currently writing stdout and stderr to two
separate temporary files, and later copying their contents to the final
path in the artifact directory.

Writing the output directly to its final path is not only slightly more
efficient, but it also allows us to implement returning the console
output of in-progress builds from slaves.
  • Loading branch information
TJ Lee committed Jul 1, 2015
1 parent a89619b commit df3ba3c
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 47 deletions.
72 changes: 44 additions & 28 deletions app/project_type/project_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import re
import signal
from subprocess import TimeoutExpired
from subprocess import TimeoutExpired, STDOUT
from tempfile import TemporaryFile
from threading import Event
import time
Expand Down Expand Up @@ -170,7 +170,8 @@ def command_in_project(self, command):
"""
return command

def execute_command_in_project(self, command, extra_environment_vars=None, timeout=None, **popen_kwargs):
def execute_command_in_project(self, command, extra_environment_vars=None, timeout=None, output_file=None,
**popen_kwargs):
"""
Execute a command in the context of the project
Expand All @@ -180,6 +181,9 @@ def execute_command_in_project(self, command, extra_environment_vars=None, timeo
:type extra_environment_vars: dict[str, str]
:param timeout: A maximum number of seconds before the process is terminated, or None for no timeout
:type timeout: int | None
:param output_file: The file to write console output to (both stdout and stderr). If not specified,
will generate a TemporaryFile. This method will close the file.
:type output_file: BufferedRandom | None
:param popen_kwargs: additional keyword arguments to pass through to subprocess.Popen
:type popen_kwargs: dict[str, mixed]
:return: a tuple of (the string output from the command, the exit code of the command)
Expand All @@ -190,17 +194,50 @@ def execute_command_in_project(self, command, extra_environment_vars=None, timeo
self._logger.debug('Executing command in project: {}', command)

# Redirect output to files instead of using pipes to avoid: https://github.com/box/ClusterRunner/issues/57
stdout_file = TemporaryFile()
stderr_file = TemporaryFile()
output_file = output_file if output_file is not None else TemporaryFile()
pipe = Popen_with_delayed_expansion(
command,
shell=True,
stdout=stdout_file,
stderr=stderr_file,
stdout=output_file,
stderr=STDOUT, # Redirect stderr to stdout, as we do not care to distinguish the two.
start_new_session=True, # Starts a new process group (so we can kill it without killing clusterrunner).
**popen_kwargs
)

clusterrunner_error_msgs = self._wait_for_pipe_to_close(pipe, command, timeout)
console_output = self._read_file_contents_and_close(output_file)
exit_code = pipe.returncode

if exit_code != 0:
max_log_length = 300
logged_console_output = console_output
if len(console_output) > max_log_length:
logged_console_output = '{}... (total output length: {})'.format(console_output[:max_log_length],
len(console_output))

# Note we are intentionally not logging at error or warning level here. Interpreting a non-zero return
# code as a failure is context-dependent, so we can't make that determination here.
self._logger.notice(
'Command exited with non-zero exit code.\nCommand: {}\nExit code: {}\nConsole output: {}\n',
command, exit_code, logged_console_output)
else:
self._logger.debug('Command completed with exit code {}.', exit_code)

exit_code = exit_code if exit_code is not None else -1 # Make sure we always return an int.
combined_command_output = '\n'.join([console_output] + clusterrunner_error_msgs)
return combined_command_output, exit_code

def _wait_for_pipe_to_close(self, pipe, command, timeout):
"""
Wait for the pipe to close (after command completes) or until timeout. If timeout is reached, then
kill the pipe as well as any child processes spawned.
:type pipe: Popen
:type command: str
:type timeout: int | None
:return: the list of errors encountered while waiting for the pipe to close.
:rtype: list[str]
"""
clusterrunner_error_msgs = []
command_completed = False
timeout_time = time.time() + (timeout or float('inf'))
Expand Down Expand Up @@ -246,28 +283,7 @@ def execute_command_in_project(self, command, extra_environment_vars=None, timeo
clusterrunner_error_msgs.append(
'ClusterRunner: {} ({}: "{}")'.format(error_message, type(ex).__name__, ex))

stdout, stderr = [self._read_file_contents_and_close(f) for f in [stdout_file, stderr_file]]
exit_code = pipe.returncode

if exit_code != 0:
max_log_length = 300
logged_stdout, logged_stderr = stdout, stderr
if len(stdout) > max_log_length:
logged_stdout = '{}... (total stdout length: {})'.format(stdout[:max_log_length], len(stdout))
if len(stderr) > max_log_length:
logged_stderr = '{}... (total stderr length: {})'.format(stderr[:max_log_length], len(stderr))

# Note we are intentionally not logging at error or warning level here. Interpreting a non-zero return code
# as a failure is context-dependent, so we can't make that determination here.
self._logger.notice(
'Command exited with non-zero exit code.\nCommand: {}\nExit code: {}\nStdout: {}\nStderr: {}\n',
command, exit_code, logged_stdout, logged_stderr)
else:
self._logger.debug('Command completed with exit code {}.', exit_code)

exit_code = exit_code if exit_code is not None else -1 # Make sure we always return an int.
combined_command_output = '\n'.join([stdout, stderr] + clusterrunner_error_msgs)
return combined_command_output, exit_code
return clusterrunner_error_msgs

def _read_file_contents_and_close(self, file):
"""
Expand Down
9 changes: 5 additions & 4 deletions app/slave/subjob_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,15 @@ def _execute_atom_command(self, atomic_command, atom_environment_vars, atom_arti
:rtype: int
"""
fs_util.create_dir(atom_artifact_dir)
# This console_output_file must be opened in 'w+b' mode in order to be interchangeable with the
# TemporaryFile instance that gets instantiated in self._project_type.execute_command_in_project.
console_output_file = open(os.path.join(atom_artifact_dir, Subjob.OUTPUT_FILE), mode='w+b')

start_time = time.time()
output, exit_code = self._project_type.execute_command_in_project(atomic_command, atom_environment_vars)
_, exit_code = self._project_type.execute_command_in_project(atomic_command, atom_environment_vars,
output_file=console_output_file)
elapsed_time = time.time() - start_time

console_output_path = os.path.join(atom_artifact_dir, Subjob.OUTPUT_FILE)
fs_util.write_file(output, console_output_path)

exit_code_output_path = os.path.join(atom_artifact_dir, Subjob.EXIT_CODE_FILE)
fs_util.write_file(str(exit_code) + '\n', exit_code_output_path)

Expand Down
1 change: 0 additions & 1 deletion test/unit/project_type/test_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ def fake_popen_constructor(command, stdout, stderr, *args, **kwargs):
fake_result = command_to_result_map[command_regex]
break
stdout.read.return_value = fake_result.stdout.encode()
stderr.read.return_value = fake_result.stderr.encode()
return Mock(spec=Popen, returncode=fake_result.return_code)

project_type_popen_patch.side_effect = fake_popen_constructor
Expand Down
23 changes: 11 additions & 12 deletions test/unit/project_type/test_project_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ def _mock_next_tempfile(self, contents):
next_tempfile_mock.read.return_value = contents
self.mock_temporary_files.append(next_tempfile_mock)

def _mock_stdout_and_stderr(self, stdout_content, stderr_content):
self._mock_next_tempfile(contents=stdout_content)
self._mock_next_tempfile(contents=stderr_content)
def _mock_console_output(self, console_output):
self._mock_next_tempfile(contents=console_output)

def test_required_constructor_args_are_correctly_detected_without_defaults(self):
actual_required_args = _FakeEnvWithoutDefaultArgs.required_constructor_argument_names()
Expand Down Expand Up @@ -78,7 +77,7 @@ def test_teardown_build_runs_teardown(self):

def test_execute_command_in_project_does_not_choke_on_weird_command_output(self):
some_weird_output = b'\xbf\xe2\x98\x82' # the byte \xbf is invalid unicode
self._mock_stdout_and_stderr(some_weird_output, b'')
self._mock_console_output(some_weird_output)
self.mock_popen.returncode = 0

project_type = ProjectType()
Expand All @@ -103,7 +102,7 @@ def test_constructor_argument_info_with_blacklist(
self.assertEqual(arg_name in arg_mapping, expected)

def test_calling_kill_subprocesses_will_break_out_of_command_execution_wait_loop(self):
self._mock_stdout_and_stderr(b'fake_output', b'fake_error')
self._mock_console_output(b'fake_output')
self.mock_popen.pid = 55555
self._simulate_hanging_popen_process()

Expand Down Expand Up @@ -132,13 +131,13 @@ def test_command_exiting_normally_will_break_out_of_command_execution_wait_loop(
# Simulate Popen.wait() timing out twice before command completes and returns output.
self.mock_popen.wait.side_effect = [timeout_exc, timeout_exc, expected_return_code]
self.mock_popen.returncode = expected_return_code
self._mock_stdout_and_stderr(b'fake_output', b'fake_error')
self._mock_console_output(b'fake_output')

project_type = ProjectType()
actual_output, actual_return_code = project_type.execute_command_in_project('echo The power is yours!')

self.assertEqual(self.mock_kill.call_count, 0, 'os.killpg should not be called when command exits normally.')
self.assertEqual(actual_output, 'fake_output\nfake_error', 'Output should contain stdout and stderr.')
self.assertEqual(actual_output, 'fake_output', 'Output did not contain expected contents.')
self.assertEqual(actual_return_code, expected_return_code, 'Actual return code should match expected.')
self.assertTrue(all([file.close.called for file in self.mock_temporary_files]),
'All created TemporaryFiles should be closed so that they are removed from the filesystem.')
Expand All @@ -149,14 +148,14 @@ def test_timing_out_will_break_out_of_command_execution_wait_loop_and_kill_subpr
expected_return_code = 1
self._simulate_hanging_popen_process(fake_returncode=expected_return_code)
self.mock_popen.pid = 55555
self._mock_stdout_and_stderr(b'fake output', b'fake error')
self._mock_console_output(b'fake output')

project_type = ProjectType()
actual_output, actual_return_code = project_type.execute_command_in_project(
command='sleep 99', timeout=250)

self.assertEqual(self.mock_kill.call_count, 1, 'os.killpg should be called when execution times out.')
self.assertEqual(actual_output, 'fake output\nfake error', 'Output should contain stdout and stderr.')
self.assertEqual(actual_output, 'fake output', 'Output did not contain expected contents.')
self.assertEqual(actual_return_code, expected_return_code, 'Actual return code should match expected.')
self.assertTrue(all([file.close.called for file in self.mock_temporary_files]),
'All created TemporaryFiles should be closed so that they are removed from the filesystem.')
Expand All @@ -170,7 +169,7 @@ def test_exception_raised_while_waiting_causes_termination_and_adds_error_messag
self.mock_popen.wait.side_effect = [timeout_exc, timeout_exc, value_err_exc, fake_failing_return_code]
self.mock_popen.returncode = fake_failing_return_code
self.mock_popen.pid = 55555
self._mock_stdout_and_stderr(b'', b'')
self._mock_console_output(b'')

project_type = ProjectType()
actual_output, actual_return_code = project_type.execute_command_in_project('echo The power is yours!')
Expand All @@ -184,7 +183,7 @@ def test_exception_raised_while_waiting_for_termination_adds_error_message_to_ou
mock_time.side_effect = [0.0, 100.0, 200.0, 300.0] # time increases by 100 seconds with each loop
fake_failing_return_code = -15
self.mock_popen.pid = 55555
self._mock_stdout_and_stderr(b'', b'')
self._mock_console_output(b'')
exception_message = 'Something terribly horrible just happened!'
self._simulate_hanging_popen_process(
fake_returncode=fake_failing_return_code, wait_exception=ValueError(exception_message))
Expand All @@ -198,7 +197,7 @@ def test_exception_raised_while_waiting_for_termination_adds_error_message_to_ou

def test_failing_exit_code_is_injected_when_no_return_code_available(self):
self.mock_popen.returncode = None # This will happen if we were not able to terminate the process.
self._mock_stdout_and_stderr(b'', b'')
self._mock_console_output(b'')

project_type = ProjectType()
actual_output, actual_return_code = project_type.execute_command_in_project('echo The power is yours!')
Expand Down
6 changes: 4 additions & 2 deletions test/unit/slave/test_subjob_executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest.mock import Mock
from unittest.mock import Mock, mock_open

from app.slave.subjob_executor import SubjobExecutor
from test.framework.base_unit_test_case import BaseUnitTestCase
Expand Down Expand Up @@ -40,6 +40,7 @@ def test_execute_subjob_passes_correct_build_executor_index_to_execute_command_i
executor._project_type.execute_command_in_project = Mock(return_value=(1, 2))
self.patch('app.slave.subjob_executor.fs_util')
self.patch('app.slave.subjob_executor.shutil')
output_file_mock = self.patch('app.slave.subjob_executor.open', new=mock_open(read_data=''), create=True).return_value
os = self.patch('app.slave.subjob_executor.os')
os.path = Mock()
os.path.join = Mock(return_value='path')
Expand All @@ -56,4 +57,5 @@ def test_execute_subjob_passes_correct_build_executor_index_to_execute_command_i
executor.execute_subjob(build_id=1, subjob_id=2, subjob_artifact_dir='dir', atomic_commands=atomic_commands,
base_executor_index=6)

executor._project_type.execute_command_in_project.assert_called_with('command', expected_env_vars)
executor._project_type.execute_command_in_project.assert_called_with('command', expected_env_vars,
output_file=output_file_mock)

0 comments on commit df3ba3c

Please sign in to comment.