Skip to content

Commit

Permalink
inline runner no longer needs to use sandbox()
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave Marin committed Mar 25, 2019
1 parent e5f626f commit 0f0297b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 10 deletions.
29 changes: 19 additions & 10 deletions mrjob/inline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from mrjob.util import save_current_environment
from mrjob.util import save_cwd
from mrjob.util import save_sys_path
from mrjob.util import save_sys_std

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -98,12 +99,18 @@ def _invoke_task_func(self, task_type, step_num, task_num):

# Don't care about pickleability since this runs in the same process
def invoke_task(stdin, stdout, stderr, wd, env):
with save_current_environment(), save_cwd(), save_sys_path():
with save_current_environment(), save_cwd(), save_sys_path(), \
save_sys_std():
# pretend we're running the script in the working dir
os.environ.update(env)
os.chdir(wd)
sys.path = [os.getcwd()] + sys.path

# pretend we've redirected stdin/stdout/stderr
sys.stdin = stdin
sys.stdout = stdout
sys.stderr = stderr

input_uri = None
try:
args = self._args_for_task(step_num, task_type)
Expand All @@ -117,8 +124,6 @@ def invoke_task(stdin, stdout, stderr, wd, env):
args = list(args) + [input_uri, input_uri]

task = self._mrjob_cls(args)
task.sandbox(stdin=stdin, stdout=stdout, stderr=stderr)

task.execute()
except:
# so users can figure out where the exception came from;
Expand Down Expand Up @@ -158,15 +163,19 @@ def _run_step_on_spark(self, step, step_num):
# use abspath() on input URIs before changing working dir
task_args = self._spark_script_args(step_num)

with save_current_environment(), save_cwd(), save_sys_path():
os.environ.update(_fix_env(self._opts['cmdenv']))
os.chdir(wd)
sys.path = [os.getcwd()] + sys.path
with open(stdout_path, 'wb') as stdout, \
open(stderr_path, 'wb') as stderr:
with save_current_environment(), save_cwd(), save_sys_path(), \
save_sys_std():
os.environ.update(_fix_env(self._opts['cmdenv']))
os.chdir(wd)
sys.path = [os.getcwd()] + sys.path

task = self._mrjob_cls(task_args)
task.sandbox(stdout=stdout_path, stderr=stderr_path)
# pretend we redirected stdout and stderr
sys.stdout, sys.stderr = stdout, stderr

task.execute()
task = self._mrjob_cls(task_args)
task.execute()

def _log_cause_of_error(self, ex):
"""Just tell what file we were reading from (since they'll see
Expand Down
23 changes: 23 additions & 0 deletions mrjob/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,29 @@ def save_cwd():
finally:
os.chdir(original_cwd)

@contextmanager
def save_sys_std():
    """Context manager that saves the current values of `sys.stdin`,
    `sys.stdout`, and `sys.stderr`, and flushes these filehandles before
    and after switching them out.

    The original `sys.stdout`/`sys.stderr` are flushed on entry. On exit
    (whether the body finished normally or raised), whatever objects are
    *currently* installed as `sys.stdout`/`sys.stderr` are flushed on a
    best-effort basis, and then the three original filehandles are restored.
    """
    stdin, stdout, stderr = sys.stdin, sys.stdout, sys.stderr

    try:
        sys.stdout.flush()
        sys.stderr.flush()

        yield
    finally:
        try:
            # at this point, sys.stdout/stderr may have been patched, so
            # flushing is best-effort. Each stream is flushed separately so
            # that a failure on one doesn't skip the other, and only
            # Exception is swallowed so that KeyboardInterrupt/SystemExit
            # still propagate.
            for stream in (sys.stdout, sys.stderr):
                try:
                    stream.flush()
                except Exception:
                    pass
        finally:
            # always restore, even if flushing was interrupted
            sys.stdin, sys.stdout, sys.stderr = stdin, stdout, stderr


@contextmanager
def save_sys_path():
Expand Down

0 comments on commit 0f0297b

Please sign in to comment.