diff --git a/metaflow/__init__.py b/metaflow/__init__.py
index 961571d9e5..a13d40e7d4 100644
--- a/metaflow/__init__.py
+++ b/metaflow/__init__.py
@@ -149,6 +149,7 @@ class and related decorators.
 # Runner API
 if sys.version_info >= (3, 7):
     from .runner.metaflow_runner import Runner
+    from .runner.nbrun import NBRunner
 
 __version_addl__ = []
 _ext_debug("Loading top-level modules")
diff --git a/metaflow/runner/metaflow_runner.py b/metaflow/runner/metaflow_runner.py
index 43d82233b1..d37aec52f1 100644
--- a/metaflow/runner/metaflow_runner.py
+++ b/metaflow/runner/metaflow_runner.py
@@ -1,7 +1,6 @@
 import os
 import sys
 import time
-import asyncio
 import tempfile
 from typing import Dict, Iterator, Optional, Tuple
 from metaflow import Run
@@ -86,6 +85,20 @@ async def wait(
         await self.command_obj.wait(timeout, stream)
         return self
 
+    @property
+    def returncode(self) -> Optional[int]:
+        """
+        Gets the returncode of the underlying subprocess that is responsible
+        for executing the run.
+
+        Returns
+        -------
+        Optional[int]
+            The returncode for the subprocess that executes the run.
+            (None if process is still running)
+        """
+        return self.command_obj.process.returncode
+
     @property
     def status(self) -> str:
         """
@@ -214,12 +227,13 @@ def __init__(
         from metaflow.runner.click_api import MetaflowAPI
 
         self.flow_file = flow_file
-        self.env_vars = os.environ.copy().update(env or {})
+        self.env_vars = os.environ.copy()
+        self.env_vars.update(env or {})
         if profile:
             self.env_vars["METAFLOW_PROFILE"] = profile
         self.spm = SubprocessManager()
+        self.top_level_kwargs = kwargs
         self.api = MetaflowAPI.from_cli(self.flow_file, start)
-        self.runner = self.api(**kwargs).run
 
     def __enter__(self) -> "Runner":
         return self
@@ -227,19 +241,35 @@ def __enter__(self) -> "Runner":
     async def __aenter__(self) -> "Runner":
         return self
 
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.spm.cleanup()
-
-    async def __aexit__(self, exc_type, exc_value, traceback):
-        self.spm.cleanup()
-
-    def run(self, **kwargs) -> ExecutingRun:
+    def __get_executing_run(self, tfp_pathspec, command_obj):
+        try:
+            pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=10)
+            run_object = Run(pathspec, _namespace_check=False)
+            return ExecutingRun(self, command_obj, run_object)
+        except TimeoutError as e:
+            stdout_log = open(command_obj.log_files["stdout"]).read()
+            stderr_log = open(command_obj.log_files["stderr"]).read()
+            command = " ".join(command_obj.command)
+            error_message = "Error executing: '%s':\n" % command
+            if stdout_log.strip():
+                error_message += "\nStdout:\n%s\n" % stdout_log
+            if stderr_log.strip():
+                error_message += "\nStderr:\n%s\n" % stderr_log
+            raise RuntimeError(error_message) from e
+
+    def run(self, show_output: bool = False, **kwargs) -> ExecutingRun:
         """
         Synchronous execution of the run. This method will *block* until
         the run has completed execution.
 
         Parameters
         ----------
+        show_output : bool, default False
+            Suppress the 'stdout' and 'stderr' to the console by default.
+            They can be accessed later by reading the files present in the
+            ExecutingRun object (referenced as 'result' below) returned:
+                - result.stdout
+                - result.stderr
         **kwargs : Any
             Additional arguments that you would pass to `python ./myflow.py` after
             the `run` command.
@@ -249,15 +279,53 @@ def run(self, **kwargs) -> ExecutingRun:
         ExecutingRun
             ExecutingRun object for this run.
         """
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
+        with tempfile.TemporaryDirectory() as temp_dir:
+            tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
+            command = self.api(**self.top_level_kwargs).run(
+                pathspec_file=tfp_pathspec.name, **kwargs
+            )
 
-        try:
-            result = loop.run_until_complete(self.async_run(**kwargs))
-            result = loop.run_until_complete(result.wait())
-            return result
-        finally:
-            loop.close()
+            pid = self.spm.run_command(
+                [sys.executable, *command], env=self.env_vars, show_output=show_output
+            )
+            command_obj = self.spm.get(pid)
+
+            return self.__get_executing_run(tfp_pathspec, command_obj)
+
+    def resume(self, show_output: bool = False, **kwargs):
+        """
+        Synchronous resume execution of the run.
+        This method will *block* until the resumed run has completed execution.
+
+        Parameters
+        ----------
+        show_output : bool, default False
+            Suppress the 'stdout' and 'stderr' to the console by default.
+            They can be accessed later by reading the files present in the
+            ExecutingRun object (referenced as 'result' below) returned:
+                - result.stdout
+                - result.stderr
+        **kwargs : Any
+            Additional arguments that you would pass to `python ./myflow.py` after
+            the `resume` command.
+
+        Returns
+        -------
+        ExecutingRun
+            ExecutingRun object for this resumed run.
+        """
+        with tempfile.TemporaryDirectory() as temp_dir:
+            tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
+            command = self.api(**self.top_level_kwargs).resume(
+                pathspec_file=tfp_pathspec.name, **kwargs
+            )
+
+            pid = self.spm.run_command(
+                [sys.executable, *command], env=self.env_vars, show_output=show_output
+            )
+            command_obj = self.spm.get(pid)
+
+            return self.__get_executing_run(tfp_pathspec, command_obj)
 
     async def async_run(self, **kwargs) -> ExecutingRun:
         """
@@ -277,33 +345,48 @@ async def async_run(self, **kwargs) -> ExecutingRun:
         """
         with tempfile.TemporaryDirectory() as temp_dir:
             tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
+            command = self.api(**self.top_level_kwargs).run(
+                pathspec_file=tfp_pathspec.name, **kwargs
+            )
 
-            command = self.runner(pathspec_file=tfp_pathspec.name, **kwargs)
-
-            pid = await self.spm.run_command(
+            pid = await self.spm.async_run_command(
                 [sys.executable, *command], env=self.env_vars
             )
             command_obj = self.spm.get(pid)
 
-            try:
-                pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=5)
-                run_object = Run(pathspec, _namespace_check=False)
-                return ExecutingRun(self, command_obj, run_object)
-            except TimeoutError as e:
-                stdout_log = open(
-                    command_obj.log_files["stdout"], encoding="utf-8"
-                ).read()
-                stderr_log = open(
-                    command_obj.log_files["stderr"], encoding="utf-8"
-                ).read()
-                command = " ".join(command_obj.command)
+            return self.__get_executing_run(tfp_pathspec, command_obj)
+
+    async def async_resume(self, **kwargs):
+        """
+        Asynchronous resume execution of the run.
+        This method will return as soon as the resume has launched.
+
+        Parameters
+        ----------
+        **kwargs : Any
+            Additional arguments that you would pass to `python ./myflow.py` after
+            the `resume` command.
+
+        Returns
+        -------
+        ExecutingRun
+            ExecutingRun object for this resumed run.
+        """
+        with tempfile.TemporaryDirectory() as temp_dir:
+            tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
+            command = self.api(**self.top_level_kwargs).resume(
+                pathspec_file=tfp_pathspec.name, **kwargs
+            )
 
-                error_message = "Error executing: '%s':\n" % command
+            pid = await self.spm.async_run_command(
+                [sys.executable, *command], env=self.env_vars
+            )
+            command_obj = self.spm.get(pid)
 
-                if stdout_log.strip():
-                    error_message += "\nStdout:\n%s\n" % stdout_log
+            return self.__get_executing_run(tfp_pathspec, command_obj)
 
-                if stderr_log.strip():
-                    error_message += "\nStderr:\n%s\n" % stderr_log
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.spm.cleanup()
 
-                raise RuntimeError(error_message) from e
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        self.spm.cleanup()
diff --git a/metaflow/runner/nbrun.py b/metaflow/runner/nbrun.py
new file mode 100644
index 0000000000..fde1a2eafe
--- /dev/null
+++ b/metaflow/runner/nbrun.py
@@ -0,0 +1,131 @@
+import os
+import ast
+import shutil
+import tempfile
+from typing import Optional, Dict
+from metaflow import Runner
+
+try:
+    from IPython import get_ipython
+
+    ipython = get_ipython()
+except ModuleNotFoundError:
+    print("'nbrun' requires an interactive python environment (such as Jupyter)")
+
+
+def get_current_cell():
+    if ipython:
+        return ipython.history_manager.input_hist_raw[-1]
+    return None
+
+
+def format_flowfile(cell):
+    """
+    Formats the given cell content to create a valid Python script that can be executed as a Metaflow flow.
+    """
+    flowspec = [
+        x
+        for x in ast.parse(cell).body
+        if isinstance(x, ast.ClassDef) and any(b.id == "FlowSpec" for b in x.bases)
+    ]
+
+    if not flowspec:
+        raise ModuleNotFoundError(
+            "The cell doesn't contain any class that inherits from 'FlowSpec'"
+        )
+
+    lines = cell.splitlines()[: flowspec[0].end_lineno]
+    lines += ["if __name__ == '__main__':", f"    {flowspec[0].name}()"]
+    return "\n".join(lines)
+
+
+class NBRunner(object):
+    """
+    A class to run Metaflow flows from Jupyter notebook cells.
+    """
+
+    def __init__(
+        self,
+        flow,
+        profile: Optional[str] = None,
+        env: Optional[Dict] = None,
+        base_dir: Optional[str] = None,
+        **kwargs
+    ):
+        self.cell = get_current_cell()
+        self.flow = flow
+
+        self.env_vars = os.environ.copy()
+        self.env_vars.update(env or {})
+        self.env_vars.update({"JPY_PARENT_PID": ""})
+        if profile:
+            self.env_vars["METAFLOW_PROFILE"] = profile
+
+        self.base_dir = base_dir
+
+        if not self.cell:
+            raise ValueError("Couldn't find a cell.")
+
+        if self.base_dir is None:
+            # for some reason, using this is much faster
+            self.tempdir = tempfile.mkdtemp()
+        else:
+            self.tempdir = self.base_dir
+
+        self.tmp_flow_file = tempfile.NamedTemporaryFile(
+            prefix=self.flow.__name__,
+            suffix=".py",
+            mode="w",
+            dir=self.tempdir,
+            delete=False,
+        )
+
+        self.tmp_flow_file.write(format_flowfile(self.cell))
+        self.tmp_flow_file.flush()
+        self.tmp_flow_file.close()
+
+        self.runner = Runner(
+            flow_file=self.tmp_flow_file.name,
+            profile=profile,
+            env=self.env_vars,
+            **kwargs
+        )
+
+    def nbrun(self, **kwargs):
+        result = self.runner.run(show_output=True, **kwargs)
+        self.runner.spm.cleanup()
+        return result.run
+
+    def nbresume(self, **kwargs):
+        result = self.runner.resume(show_output=True, **kwargs)
+        self.runner.spm.cleanup()
+        return result.run
+
+    def cleanup(self):
+        """Cleans up the temporary directory used to store the flow script"""
+        if self.base_dir is None:
+            shutil.rmtree(self.tempdir, ignore_errors=True)
+
+    def run(self, **kwargs):
+        """
+        Runs the flow.
+        """
+        return self.runner.run(**kwargs)
+
+    def resume(self, **kwargs):
+        """
+        Resumes the flow.
+        """
+        return self.runner.resume(**kwargs)
+
+    async def async_run(self, **kwargs):
+        """
+        Asynchronously runs the flow.
+        """
+        return await self.runner.async_run(**kwargs)
+
+    async def async_resume(self, **kwargs):
+        """
+        Asynchronously resumes the flow.
+        """
+        return await self.runner.async_resume(**kwargs)
diff --git a/metaflow/runner/subprocess_manager.py b/metaflow/runner/subprocess_manager.py
index 440084e757..84a6083f6c 100644
--- a/metaflow/runner/subprocess_manager.py
+++ b/metaflow/runner/subprocess_manager.py
@@ -5,6 +5,7 @@
 import shutil
 import asyncio
 import tempfile
+import threading
 import subprocess
 from typing import List, Dict, Optional, Callable, Iterator, Tuple
 
@@ -42,7 +43,44 @@ async def __aenter__(self) -> "SubprocessManager":
     async def __aexit__(self, exc_type, exc_value, traceback):
         self.cleanup()
 
-    async def run_command(
+    def run_command(
+        self,
+        command: List[str],
+        env: Optional[Dict[str, str]] = None,
+        cwd: Optional[str] = None,
+        show_output: bool = False,
+    ) -> int:
+        """
+        Run a command synchronously and return its process ID.
+
+        Parameters
+        ----------
+        command : List[str]
+            The command to run in List form.
+        env : Optional[Dict[str, str]], default None
+            Environment variables to set for the subprocess; if not specified,
+            the current environment variables are used.
+        cwd : Optional[str], default None
+            The directory to run the subprocess in; if not specified, the current
+            directory is used.
+        show_output : bool, default False
+            Suppress the 'stdout' and 'stderr' to the console by default.
+            They can be accessed later by reading the files present in the
+            CommandManager object:
+                - command_obj.log_files["stdout"]
+                - command_obj.log_files["stderr"]
+        Returns
+        -------
+        int
+            The process ID of the subprocess.
+        """
+
+        command_obj = CommandManager(command, env, cwd)
+        pid = command_obj.run(show_output=show_output)
+        self.commands[pid] = command_obj
+        return pid
+
+    async def async_run_command(
         self,
         command: List[str],
         env: Optional[Dict[str, str]] = None,
@@ -69,7 +107,7 @@ async def run_command(
         """
 
         command_obj = CommandManager(command, env, cwd)
-        pid = await command_obj.run()
+        pid = await command_obj.async_run()
         self.commands[pid] = command_obj
         return pid
 
@@ -80,7 +118,7 @@ def get(self, pid: int) -> Optional["CommandManager"]:
         Parameters
         ----------
         pid : int
-            The process ID of the subprocess (returned by run_command).
+            The process ID of the subprocess (returned by run_command or async_run_command).
 
         Returns
         -------
@@ -144,7 +182,7 @@ async def wait(
         Wait for the subprocess to finish, optionally with a timeout
         and optionally streaming its output.
 
-        You can only call `wait` if `run` has already been called.
+        You can only call `wait` if `async_run` has already been called.
 
         Parameters
         ----------
@@ -178,7 +216,79 @@ async def wait(
             "within %s seconds." % (self.process.pid, command_string, timeout)
         )
 
-    async def run(self):
+    def run(self, show_output: bool = False):
+        """
+        Run the subprocess synchronously. This can only be called once.
+
+        This also waits on the process implicitly.
+
+        Parameters
+        ----------
+        show_output : bool, default False
+            Suppress the 'stdout' and 'stderr' to the console by default.
+            They can be accessed later by reading the files present in:
+                - self.log_files["stdout"]
+                - self.log_files["stderr"]
+        """
+
+        if not self.run_called:
+            self.temp_dir = tempfile.mkdtemp()
+            stdout_logfile = os.path.join(self.temp_dir, "stdout.log")
+            stderr_logfile = os.path.join(self.temp_dir, "stderr.log")
+
+            def stream_to_stdout_and_file(pipe, log_file):
+                with open(log_file, "w") as file:
+                    for line in iter(pipe.readline, ""):
+                        if show_output:
+                            sys.stdout.write(line)
+                        file.write(line)
+                pipe.close()
+
+            try:
+                self.process = subprocess.Popen(
+                    self.command,
+                    cwd=self.cwd,
+                    env=self.env,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    bufsize=1,
+                    universal_newlines=True,
+                )
+
+                self.log_files["stdout"] = stdout_logfile
+                self.log_files["stderr"] = stderr_logfile
+
+                self.run_called = True
+
+                stdout_thread = threading.Thread(
+                    target=stream_to_stdout_and_file,
+                    args=(self.process.stdout, stdout_logfile),
+                )
+                stderr_thread = threading.Thread(
+                    target=stream_to_stdout_and_file,
+                    args=(self.process.stderr, stderr_logfile),
+                )
+
+                stdout_thread.start()
+                stderr_thread.start()
+
+                self.process.wait()
+
+                stdout_thread.join()
+                stderr_thread.join()
+
+                return self.process.pid
+            except Exception as e:
+                print("Error starting subprocess: %s" % e)
+                self.cleanup()
+        else:
+            command_string = " ".join(self.command)
+            print(
+                "Command '%s' has already been called. Please create another "
+                "CommandManager object." % command_string
+            )
+
+    async def async_run(self):
         """
         Run the subprocess asynchronously. This can only be called once.
 
@@ -357,7 +467,7 @@ async def main():
 
     async with SubprocessManager() as spm:
         # returns immediately
-        pid = await spm.run_command(cmd)
+        pid = await spm.async_run_command(cmd)
         command_obj = spm.get(pid)
         print(pid)
 