Skip to content

Commit

Permalink
[dagster-shell] Fix deadlock with large output (#6435) (#6490)
Browse files Browse the repository at this point in the history
  • Loading branch information
kbd committed Feb 24, 2022
1 parent 7495fea commit f700471
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 23 deletions.
45 changes: 22 additions & 23 deletions python_modules/libraries/dagster-shell/dagster_shell/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from dagster import check
from dagster.utils import safe_tempfile_path

OUTPUT_LOGGING_OPTIONS = ["STREAM", "BUFFER", "NONE"]


def execute_script_file(shell_script_path, output_logging, log, cwd=None, env=None):
"""Execute a shell script file specified by the argument ``shell_command``. The script will be
Expand Down Expand Up @@ -55,6 +57,9 @@ def execute_script_file(shell_script_path, output_logging, log, cwd=None, env=No
check.opt_str_param(cwd, "cwd", default=os.path.dirname(shell_script_path))
env = check.opt_dict_param(env, "env")

if output_logging not in OUTPUT_LOGGING_OPTIONS:
raise Exception("Unrecognized output_logging %s" % output_logging)

def pre_exec():
# Restore default signal disposition and invoke setsid
for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
Expand All @@ -70,43 +75,37 @@ def pre_exec():
# pylint: disable=subprocess-popen-preexec-fn
sub_process = None
try:
stdout_pipe = PIPE
stderr_pipe = STDOUT
if output_logging == "NONE":
stdout_pipe = stderr_pipe = None

sub_process = Popen(
["bash", shell_script_path],
stdout=PIPE,
stderr=STDOUT,
stdout=stdout_pipe,
stderr=stderr_pipe,
cwd=cwd,
env=env,
preexec_fn=pre_exec,
encoding="UTF-8",
)

log.info(f"Command pid: {sub_process.pid}")

# Will return the string result of reading stdout of the shell command
output = ""

if output_logging not in ["STREAM", "BUFFER", "NONE"]:
raise Exception("Unrecognized output_logging %s" % output_logging)

# Stream back logs as they are emitted
if output_logging == "STREAM":
for raw_line in iter(sub_process.stdout.readline, b""):
line = raw_line.decode("utf-8")
# Stream back logs as they are emitted
lines = []
for line in sub_process.stdout:
log.info(line.rstrip())
output += line

sub_process.wait()

# Collect and buffer all logs, then emit
if output_logging == "BUFFER":
output = "".join(
[raw_line.decode("utf-8") for raw_line in iter(sub_process.stdout.readline, b"")]
)
lines.append(line)
output = "".join(lines)
elif output_logging == "BUFFER":
# Collect and buffer all logs, then emit
output, _ = sub_process.communicate()
log.info(output)

# no logging in this case
elif output_logging == "NONE":
pass

sub_process.wait()
log.info("Command exited with return code {retcode}".format(retcode=sub_process.returncode))

return output, sub_process.returncode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,36 @@ def test_execute_file(tmp_file):
assert retcode == 0


def test_execute_file_large_buffered_output(tmp_file):
large_string = "0123456789" * (6600) # bigger than 2**16 buffer
with tmp_file(f"echo -n {large_string}") as (tmp_path, tmp_file):
output, retcode = execute_script_file(
tmp_file, output_logging="BUFFER", log=logging, cwd=tmp_path
)
assert retcode == 0
assert output == large_string


def test_execute_file_large_output_no_logging(tmp_file):
large_string = "0123456789" * (6600) # bigger than 2**16 buffer
with tmp_file(f"echo -n {large_string}") as (tmp_path, tmp_file):
output, retcode = execute_script_file(
tmp_file, output_logging="NONE", log=logging, cwd=tmp_path
)
assert retcode == 0
assert output == ""


def test_execute_file_large_line_stream_output(tmp_file):
large_string = "0123456789" * (100000) # one giant line > 2**16 buffer
with tmp_file(f"echo -n {large_string}") as (tmp_path, tmp_file):
output, retcode = execute_script_file(
tmp_file, output_logging="STREAM", log=logging, cwd=tmp_path
)
assert retcode == 0
assert output == large_string


def test_env(tmp_file):
cmd = "echo $TEST_VAR"
res, retcode = execute(
Expand Down

0 comments on commit f700471

Please sign in to comment.