Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

synchronous run and resume functionality + nbrun for runner API #1845

Merged
merged 5 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions metaflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class and related decorators.
# Runner API
if sys.version_info >= (3, 7):
from .runner.metaflow_runner import Runner
from .runner.nbrun import NBRunner

__version_addl__ = []
_ext_debug("Loading top-level modules")
Expand Down
161 changes: 122 additions & 39 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import sys
import time
import asyncio
import tempfile
from typing import Dict, Iterator, Optional, Tuple
from metaflow import Run
Expand Down Expand Up @@ -86,6 +85,20 @@ async def wait(
await self.command_obj.wait(timeout, stream)
return self

@property
def returncode(self) -> Optional[int]:
    """
    Exit code of the subprocess that is executing this run.

    Returns
    -------
    Optional[int]
        The return code reported by the underlying subprocess, or None
        while that process is still running.
    """
    process = self.command_obj.process
    return process.returncode

@property
def status(self) -> str:
"""
Expand Down Expand Up @@ -214,32 +227,49 @@ def __init__(
from metaflow.runner.click_api import MetaflowAPI

self.flow_file = flow_file
self.env_vars = os.environ.copy().update(env or {})
self.env_vars = os.environ.copy()
self.env_vars.update(env or {})
if profile:
self.env_vars["METAFLOW_PROFILE"] = profile
self.spm = SubprocessManager()
self.top_level_kwargs = kwargs
self.api = MetaflowAPI.from_cli(self.flow_file, start)
self.runner = self.api(**kwargs).run

def __enter__(self) -> "Runner":
    """
    Enter a synchronous `with` block.

    Returns
    -------
    Runner
        This same object, so it can be bound with `as`.
    """
    return self

async def __aenter__(self) -> "Runner":
    """
    Enter an `async with` block.

    Returns
    -------
    Runner
        This same object, so it can be bound with `as`.
    """
    return self

def __exit__(self, exc_type, exc_value, traceback):
self.spm.cleanup()

async def __aexit__(self, exc_type, exc_value, traceback):
self.spm.cleanup()

def run(self, **kwargs) -> ExecutingRun:
def __get_executing_run(self, tfp_pathspec, command_obj):
    """
    Build the ExecutingRun for a launched command once its pathspec is known.

    Parameters
    ----------
    tfp_pathspec : file object
        Temporary file whose `name` was passed to the command as
        `pathspec_file`; the run's pathspec is read back from that path.
    command_obj : object
        Command handle obtained from `self.spm.get(pid)`. Its `log_files`
        mapping ("stdout" / "stderr" paths) and `command` list are only
        read when the pathspec could not be obtained.

    Returns
    -------
    ExecutingRun
        Wrapper around this runner, the command handle and the `Run`
        built from the pathspec.

    Raises
    ------
    RuntimeError
        If a TimeoutError is raised while waiting for the pathspec
        (timeout=10). The message contains the command line and any
        non-blank stdout/stderr captured in the log files.
    """
    try:
        pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=10)
        run_object = Run(pathspec, _namespace_check=False)
        return ExecutingRun(self, command_obj, run_object)
    except TimeoutError as e:
        # Read the logs through context managers so the handles are closed
        # deterministically, and decode them as UTF-8 rather than with the
        # platform's locale encoding.
        with open(command_obj.log_files["stdout"], encoding="utf-8") as log:
            stdout_log = log.read()
        with open(command_obj.log_files["stderr"], encoding="utf-8") as log:
            stderr_log = log.read()

        command = " ".join(command_obj.command)
        error_message = "Error executing: '%s':\n" % command
        if stdout_log.strip():
            error_message += "\nStdout:\n%s\n" % stdout_log
        if stderr_log.strip():
            error_message += "\nStderr:\n%s\n" % stderr_log
        raise RuntimeError(error_message) from e

def run(self, show_output: bool = False, **kwargs) -> ExecutingRun:
"""
Synchronous execution of the run. This method will *block* until
the run has completed execution.

Parameters
----------
show_output : bool, default False
Suppress the 'stdout' and 'stderr' to the console by default.
They can be accessed later by reading the files present in the
ExecutingRun object (referenced as 'result' below) returned:
- result.stdout
- result.stderr
**kwargs : Any
Additional arguments that you would pass to `python ./myflow.py` after
the `run` command.
Expand All @@ -249,15 +279,53 @@ def run(self, **kwargs) -> ExecutingRun:
ExecutingRun
ExecutingRun object for this run.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
with tempfile.TemporaryDirectory() as temp_dir:
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
command = self.api(**self.top_level_kwargs).run(
pathspec_file=tfp_pathspec.name, **kwargs
)

try:
result = loop.run_until_complete(self.async_run(**kwargs))
result = loop.run_until_complete(result.wait())
return result
finally:
loop.close()
pid = self.spm.run_command(
[sys.executable, *command], env=self.env_vars, show_output=show_output
)
command_obj = self.spm.get(pid)

return self.__get_executing_run(tfp_pathspec, command_obj)

def resume(self, show_output: bool = False, **kwargs) -> "ExecutingRun":
    """
    Synchronous resume execution of the run.
    This method will *block* until the resumed run has completed execution.

    Parameters
    ----------
    show_output : bool, default False
        When False, 'stdout' and 'stderr' are not echoed to the console.
        They can be accessed later by reading the files present in the
        ExecutingRun object (referenced as 'result' below) returned:
            - result.stdout
            - result.stderr
    **kwargs : Any
        Additional arguments that you would pass to `python ./myflow.py` after
        the `resume` command.

    Returns
    -------
    ExecutingRun
        ExecutingRun object for this resumed run.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        # Only the *path* of this file is needed: it is passed to the command
        # as `pathspec_file` and read back by name afterwards. Close our own
        # handle immediately instead of leaving it open until garbage
        # collection; `delete=False` keeps the file on disk until the
        # enclosing temporary directory is removed.
        with tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) as tfp_pathspec:
            pass

        command = self.api(**self.top_level_kwargs).resume(
            pathspec_file=tfp_pathspec.name, **kwargs
        )

        pid = self.spm.run_command(
            [sys.executable, *command], env=self.env_vars, show_output=show_output
        )
        command_obj = self.spm.get(pid)

        return self.__get_executing_run(tfp_pathspec, command_obj)

async def async_run(self, **kwargs) -> ExecutingRun:
"""
Expand All @@ -277,33 +345,48 @@ async def async_run(self, **kwargs) -> ExecutingRun:
"""
with tempfile.TemporaryDirectory() as temp_dir:
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
command = self.api(**self.top_level_kwargs).run(
pathspec_file=tfp_pathspec.name, **kwargs
)

command = self.runner(pathspec_file=tfp_pathspec.name, **kwargs)

pid = await self.spm.run_command(
pid = await self.spm.async_run_command(
[sys.executable, *command], env=self.env_vars
)
command_obj = self.spm.get(pid)

try:
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=5)
run_object = Run(pathspec, _namespace_check=False)
return ExecutingRun(self, command_obj, run_object)
except TimeoutError as e:
stdout_log = open(
command_obj.log_files["stdout"], encoding="utf-8"
).read()
stderr_log = open(
command_obj.log_files["stderr"], encoding="utf-8"
).read()
command = " ".join(command_obj.command)
return self.__get_executing_run(tfp_pathspec, command_obj)

async def async_resume(self, **kwargs):
"""
Asynchronous resume execution of the run.
This method will return as soon as the resume has launched.

Parameters
----------
**kwargs : Any
Additional arguments that you would pass to `python ./myflow.py` after
the `resume` command.

Returns
-------
ExecutingRun
ExecutingRun object for this resumed run.
"""
with tempfile.TemporaryDirectory() as temp_dir:
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
command = self.api(**self.top_level_kwargs).resume(
pathspec_file=tfp_pathspec.name, **kwargs
)

error_message = "Error executing: '%s':\n" % command
pid = await self.spm.async_run_command(
[sys.executable, *command], env=self.env_vars
)
command_obj = self.spm.get(pid)

if stdout_log.strip():
error_message += "\nStdout:\n%s\n" % stdout_log
return self.__get_executing_run(tfp_pathspec, command_obj)

if stderr_log.strip():
error_message += "\nStderr:\n%s\n" % stderr_log
def __exit__(self, exc_type, exc_value, traceback):
    """
    Leave a synchronous `with` block by calling `self.spm.cleanup()`.

    The exception details passed in are not inspected and nothing is
    returned, so an exception raised inside the block keeps propagating.
    """
    self.spm.cleanup()

raise RuntimeError(error_message) from e
async def __aexit__(self, exc_type, exc_value, traceback):
    """
    Leave an `async with` block by calling `self.spm.cleanup()`.

    The exception details passed in are not inspected and nothing is
    returned, so an exception raised inside the block keeps propagating.
    """
    self.spm.cleanup()
131 changes: 131 additions & 0 deletions metaflow/runner/nbrun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import os
import ast
import shutil
import tempfile
from typing import Optional, Dict
from metaflow import Runner

# Resolve the active IPython shell once at import time. `ipython` is always
# bound afterwards: it is None when IPython is not installed, so code that
# checks `if ipython:` degrades gracefully instead of raising NameError.
try:
    from IPython import get_ipython

    ipython = get_ipython()
except ModuleNotFoundError:
    ipython = None
    print("'nbrun' requires an interactive python environment (such as Jupyter)")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'NBRunner'?



def get_current_cell():
    """
    Return the latest raw input recorded by the IPython history manager.

    Returns None when the module-level `ipython` shell object is falsy,
    i.e. no interactive shell is available.
    """
    if not ipython:
        return None
    return ipython.history_manager.input_hist_raw[-1]


def _is_flowspec_base(base):
    """Return True when a class-base expression refers to `FlowSpec`."""
    if isinstance(base, ast.Name):
        return base.id == "FlowSpec"
    if isinstance(base, ast.Attribute):
        # Qualified reference such as `metaflow.FlowSpec`.
        return base.attr == "FlowSpec"
    # Calls, subscripts, etc. carry no plain name to compare against.
    return False


def format_flowfile(cell):
    """
    Formats the given cell content to create a valid Python script that can be
    executed as a Metaflow flow.

    Everything up to the end of the first class that inherits from `FlowSpec`
    is kept, anything after it is dropped, and a `__main__` guard that
    instantiates that class is appended.

    Parameters
    ----------
    cell : str
        Source code of a notebook cell.

    Returns
    -------
    str
        The trimmed source followed by the `if __name__ == '__main__':` guard.

    Raises
    ------
    ModuleNotFoundError
        If no top-level class in the cell inherits from `FlowSpec`.
    SyntaxError
        If the cell cannot be parsed by `ast.parse`.
    """
    flowspec = [
        node
        for node in ast.parse(cell).body
        if isinstance(node, ast.ClassDef)
        and any(_is_flowspec_base(base) for base in node.bases)
    ]

    if not flowspec:
        raise ModuleNotFoundError(
            "The cell doesn't contain any class that inherits from 'FlowSpec'"
        )

    flow_class = flowspec[0]
    # `end_lineno` is 1-based and inclusive, so it doubles as the slice stop.
    lines = cell.splitlines()[: flow_class.end_lineno]
    lines += ["if __name__ == '__main__':", f" {flow_class.name}()"]
    return "\n".join(lines)


class NBRunner(object):
    """
    A class to run Metaflow flows from Jupyter notebook cells.

    On construction the source of the current cell is trimmed with
    `format_flowfile`, written to a `.py` file and wrapped in a `Runner`
    that all run/resume methods delegate to.

    Parameters
    ----------
    flow : class
        The flow class defined in the cell; its `__name__` is used as the
        prefix of the generated file name.
    profile : Optional[str], default None
        When truthy, exported to the subprocess environment as
        `METAFLOW_PROFILE` and forwarded to `Runner`.
    env : Optional[Dict], default None
        Extra environment variables layered on top of `os.environ`.
    base_dir : Optional[str], default None
        Directory in which the flow file is written. When None, a fresh
        temporary directory is created (and removed again by `cleanup`).
    **kwargs : Any
        Forwarded unchanged to `Runner`.

    Raises
    ------
    ValueError
        If no current cell could be determined.
    """

    def __init__(
        self,
        flow,
        profile: Optional[str] = None,
        env: Optional[Dict] = None,
        base_dir: Optional[str] = None,
        **kwargs
    ):
        self.cell = get_current_cell()
        self.flow = flow

        self.env_vars = os.environ.copy()
        self.env_vars.update(env or {})
        # NOTE(review): blanking JPY_PARENT_PID presumably stops the child
        # process from behaving as if it ran inside the kernel -- confirm.
        self.env_vars.update({"JPY_PARENT_PID": ""})
        if profile:
            self.env_vars["METAFLOW_PROFILE"] = profile

        self.base_dir = base_dir

        if not self.cell:
            raise ValueError("Couldn't find a cell.")

        # Build the script text before touching the filesystem, so a cell
        # that cannot be converted leaves no temp directory or open file
        # handle behind.
        flow_source = format_flowfile(self.cell)

        if self.base_dir is None:
            # for some reason, using this is much faster
            self.tempdir = tempfile.mkdtemp()
        else:
            self.tempdir = self.base_dir

        # `delete=False` keeps the file on disk after the handle is closed;
        # only its path is handed to the Runner.
        with tempfile.NamedTemporaryFile(
            prefix=self.flow.__name__,
            suffix=".py",
            mode="w",
            dir=self.tempdir,
            delete=False,
        ) as flow_file:
            self.tmp_flow_file = flow_file
            flow_file.write(flow_source)

        self.runner = Runner(
            flow_file=self.tmp_flow_file.name,
            profile=profile,
            env=self.env_vars,
            **kwargs
        )

    def nbrun(self, **kwargs):
        """
        Run the flow with output shown, then clean up the subprocess manager.

        `self.runner.spm.cleanup()` is called even when the run raises,
        matching what `Runner` does when used as a context manager.

        Returns
        -------
        The `run` attribute of the result returned by `Runner.run`.
        """
        try:
            result = self.runner.run(show_output=True, **kwargs)
        finally:
            self.runner.spm.cleanup()
        return result.run

    def nbresume(self, **kwargs):
        """
        Resume the flow with output shown, then clean up the subprocess manager.

        `self.runner.spm.cleanup()` is called even when the resume raises,
        matching what `Runner` does when used as a context manager.

        Returns
        -------
        The `run` attribute of the result returned by `Runner.resume`.
        """
        try:
            result = self.runner.resume(show_output=True, **kwargs)
        finally:
            self.runner.spm.cleanup()
        return result.run

    def cleanup(self):
        """Cleans up the temporary directory used to store the flow script"""
        # A caller-supplied `base_dir` is never removed; only a directory
        # created by this object is deleted.
        if self.base_dir is None:
            shutil.rmtree(self.tempdir, ignore_errors=True)

    def run(self, **kwargs):
        """
        Runs the flow.

        All keyword arguments are forwarded to `Runner.run`, whose result is
        returned unchanged.
        """
        return self.runner.run(**kwargs)

    def resume(self, **kwargs):
        """
        Resumes the flow.

        All keyword arguments are forwarded to `Runner.resume`, whose result
        is returned unchanged.
        """
        return self.runner.resume(**kwargs)

    async def async_run(self, **kwargs):
        """
        Asynchronously runs the flow.

        All keyword arguments are forwarded to `Runner.async_run`, whose
        awaited result is returned unchanged.
        """
        return await self.runner.async_run(**kwargs)

    async def async_resume(self, **kwargs):
        """
        Asynchronously resumes the flow.

        All keyword arguments are forwarded to `Runner.async_resume`, whose
        awaited result is returned unchanged.
        """
        return await self.runner.async_resume(**kwargs)