Skip to content

Commit

Permalink
synchronous run with logs
Browse files Browse the repository at this point in the history
  • Loading branch information
madhur-ob committed May 23, 2024
1 parent 790e270 commit 4d6460a
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 19 deletions.
35 changes: 22 additions & 13 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import sys
import time
import asyncio
import tempfile
from typing import Dict, Iterator, Optional, Tuple
from metaflow import Run
Expand Down Expand Up @@ -210,7 +209,6 @@ def __init__(
self.env_vars["METAFLOW_PROFILE"] = profile
self.spm = SubprocessManager()
self.api = MetaflowAPI.from_cli(self.flow_file, start)
self.runner = self.api(**kwargs).run

def __enter__(self) -> "Runner":
return self
Expand Down Expand Up @@ -240,15 +238,27 @@ def run(self, **kwargs) -> ExecutingRun:
ExecutingRun
ExecutingRun object for this run.
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
with tempfile.TemporaryDirectory() as temp_dir:
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
command = self.api.run(pathspec_file=tfp_pathspec.name, **kwargs)

try:
result = loop.run_until_complete(self.async_run(**kwargs))
result = loop.run_until_complete(result.wait())
return result
finally:
loop.close()
pid = self.spm.run_command([sys.executable, *command], env=self.env_vars)
command_obj = self.spm.get(pid)

try:
pathspec = read_from_file_when_ready(tfp_pathspec.name, timeout=5)
run_object = Run(pathspec, _namespace_check=False)
return ExecutingRun(self, command_obj, run_object)
except TimeoutError as e:
stdout_log = open(command_obj.log_files["stdout"]).read()
stderr_log = open(command_obj.log_files["stderr"]).read()
command = " ".join(command_obj.command)
error_message = "Error executing: '%s':\n" % command
if stdout_log.strip():
error_message += "\nStdout:\n%s\n" % stdout_log
if stderr_log.strip():
error_message += "\nStderr:\n%s\n" % stderr_log
raise RuntimeError(error_message) from e

async def async_run(self, **kwargs) -> ExecutingRun:
"""
Expand All @@ -268,10 +278,9 @@ async def async_run(self, **kwargs) -> ExecutingRun:
"""
with tempfile.TemporaryDirectory() as temp_dir:
tfp_pathspec = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
command = self.api.run(pathspec_file=tfp_pathspec.name, **kwargs)

command = self.runner(pathspec_file=tfp_pathspec.name, **kwargs)

pid = await self.spm.run_command(
pid = await self.spm.async_run_command(
[sys.executable, *command], env=self.env_vars
)
command_obj = self.spm.get(pid)
Expand Down
122 changes: 116 additions & 6 deletions metaflow/runner/subprocess_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import shutil
import asyncio
import tempfile
import threading
import subprocess
from typing import List, Dict, Optional, Callable, Iterator, Tuple

Expand Down Expand Up @@ -42,7 +43,44 @@ async def __aenter__(self) -> "SubprocessManager":
async def __aexit__(self, exc_type, exc_value, traceback):
self.cleanup()

async def run_command(
def run_command(
self,
command: List[str],
env: Optional[Dict[str, str]] = None,
cwd: Optional[str] = None,
show_output: bool = False,
) -> int:
"""
Run a command synchronously and return its process ID.
Parameters
----------
command : List[str]
The command to run in List form.
env : Optional[Dict[str, str]], default None
Environment variables to set for the subprocess; if not specified,
the current enviornment variables are used.
cwd : Optional[str], default None
The directory to run the subprocess in; if not specified, the current
directory is used.
show_output : bool, default False
Suppress the 'stdout' and 'stderr' to the console by default.
They can be accessed later by reading the files present in the
CommandManager object:
- command_obj.log_files["stdout"]
- command_obj.log_files["stderr"]
Returns
-------
int
The process ID of the subprocess.
"""

command_obj = CommandManager(command, env, cwd)
pid = command_obj.run(show_output=show_output)
self.commands[pid] = command_obj
return pid

async def async_run_command(
self,
command: List[str],
env: Optional[Dict[str, str]] = None,
Expand All @@ -69,7 +107,7 @@ async def run_command(
"""

command_obj = CommandManager(command, env, cwd)
pid = await command_obj.run()
pid = await command_obj.async_run()
self.commands[pid] = command_obj
return pid

Expand All @@ -80,7 +118,7 @@ def get(self, pid: int) -> Optional["CommandManager"]:
Parameters
----------
pid : int
The process ID of the subprocess (returned by run_command).
The process ID of the subprocess (returned by run_command or async_run_command).
Returns
-------
Expand Down Expand Up @@ -144,7 +182,7 @@ async def wait(
Wait for the subprocess to finish, optionally with a timeout
and optionally streaming its output.
You can only call `wait` if `run` has already been called.
You can only call `wait` if `async_run` has already been called.
Parameters
----------
Expand Down Expand Up @@ -178,7 +216,79 @@ async def wait(
"within %s seconds." % (self.process.pid, command_string, timeout)
)

async def run(self):
def run(self, show_output: bool = False):
"""
Run the subprocess synchronously. This can only be called once.
This also waits on the process implicitly.
Parameters
----------
show_output : bool, default False
Suppress the 'stdout' and 'stderr' to the console by default.
They can be accessed later by reading the files present in:
- self.log_files["stdout"]
- self.log_files["stderr"]
"""

if not self.run_called:
self.temp_dir = tempfile.mkdtemp()
stdout_logfile = os.path.join(self.temp_dir, "stdout.log")
stderr_logfile = os.path.join(self.temp_dir, "stderr.log")

def stream_to_stdout_and_file(pipe, log_file):
with open(log_file, "w") as file:
for line in iter(pipe.readline, ""):
if show_output:
sys.stdout.write(line)
file.write(line)
pipe.close()

try:
self.process = subprocess.Popen(
self.command,
cwd=self.cwd,
env=self.env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
)

self.log_files["stdout"] = stdout_logfile
self.log_files["stderr"] = stderr_logfile

self.run_called = True

stdout_thread = threading.Thread(
target=stream_to_stdout_and_file,
args=(self.process.stdout, stdout_logfile),
)
stderr_thread = threading.Thread(
target=stream_to_stdout_and_file,
args=(self.process.stderr, stderr_logfile),
)

stdout_thread.start()
stderr_thread.start()

self.process.wait()

stdout_thread.join()
stderr_thread.join()

return self.process.pid
except Exception as e:
print("Error starting subprocess: %s" % e)
self.cleanup()
else:
command_string = " ".join(self.command)
print(
"Command '%s' has already been called. Please create another "
"CommandManager object." % command_string
)

async def async_run(self):
"""
Run the subprocess asynchronously. This can only be called once.
Expand Down Expand Up @@ -357,7 +467,7 @@ async def main():

async with SubprocessManager() as spm:
# returns immediately
pid = await spm.run_command(cmd)
pid = await spm.async_run_command(cmd)
command_obj = spm.get(pid)

print(pid)
Expand Down

0 comments on commit 4d6460a

Please sign in to comment.