Pass arbitrary kwargs from run_docker/run_k8s to run_native #68

Merged
nbren12 merged 12 commits into master from feature/centralize-run-native
Mar 24, 2020

Conversation

nbren12
Contributor

@nbren12 nbren12 commented Mar 20, 2020

Changing the signature of run_native (e.g. to add an option that changes the logging behavior) requires editing three other files: run_kubernetes, run_docker, and __main__.py. This PR allows run_kubernetes and run_docker to pass arbitrary arguments to run_native, provided that they are JSON-serializable. A side effect is that run_kubernetes can now take an actual configuration dictionary rather than a URL pointing to one.

It also tidies up the implementation, especially that of run_kubernetes. run_docker still has some unavoidable logic for creating the bind mounts, but it is slightly cleaner now too.
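To make the JSON constraint in the description concrete, here is a minimal sketch of the round trip. The helper names `encode_call` and `decode_call` and the sample configuration are hypothetical, chosen only for illustration; they are not part of the package.

```python
import json


def encode_call(*args, **kwargs):
    """Serialize positional and keyword arguments into one JSON string."""
    return json.dumps([args, kwargs])


def decode_call(serialized):
    """Recover (args, kwargs) from a string produced by encode_call."""
    args, kwargs = json.loads(serialized)
    return args, kwargs


# Hypothetical configuration dictionary, used only for illustration.
config = {"namelist": {"coupler_nml": {"days": 1}}}
args, kwargs = decode_call(encode_call(config, "gs://bucket/outdir", runfile=None))

# Values that JSON cannot represent fail early, when the string is built.
try:
    encode_call(object())
except TypeError as err:
    serialization_error = err
```

Note that the positional arguments come back as a list rather than a tuple, and any tuples nested inside the arguments become lists as well, so the receiving function should not rely on tuple identity.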

Collaborator

@mcgibbon mcgibbon left a comment

Looks neat! We had a video chat about some small changes and future plans; I wrote down my notes in the particular spots where they were relevant.

@@ -13,7 +14,7 @@


 def run_docker(
-    config_dict_or_location, outdir, docker_image, runfile=None, keyfile=None
+    config_dict_or_location, outdir, docker_image, runfile=None, keyfile=None, **kwargs
Collaborator

The new arg should be explicit here.

Contributor Author

There is no new argument right now; one will be added in the future. I guess I could remove this **kwargs for now.

Collaborator

Yes, please remove **kwargs for now.

Comment on lines 40 to 44
if isinstance(config_dict_or_location, str):
    with fsspec.open(config_dict_or_location) as f:
        config_dict = yaml.safe_load(f.read())
else:
    config_dict = config_dict_or_location
Collaborator

Please split this into a helper function; that should be enough to solve this function being a little long.

Contributor Author

I don't really want to hide this if-statement in a helper function, but I did move the I/O into a helper function called _load_yaml(url).

Comment on lines -43 to -45
config_location = _get_config_args(
    config_dict_or_location, config_tempfile, bind_mount_args
)
Collaborator

I can't find where config_location is being added to bind_mount_args now. Can you point me to it or add it?

Contributor Author

It is no longer necessary to save the config dict to a temporary file. It is serialized to JSON by run_native.command, as you can see if you print the contents of python_command.

Collaborator

OK, just be aware that maximum command lengths do exist (though on the order of a hundred KB or so).
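One way to act on this caution is to measure the serialized payload before building the command. The sketch below is an illustration only: the helper names are hypothetical, and the 128 KiB fallback is an assumption based on the "hundred KB or so" estimate above, not a value taken from the package. Actual limits depend on the operating system and on the size of the environment.

```python
import json
import os

# Assumed fallback limit, loosely matching the "hundred KB or so" estimate.
DEFAULT_LIMIT_BYTES = 128 * 1024


def serialized_size(*args, **kwargs):
    """Return the size in bytes of the JSON payload for these arguments."""
    return len(json.dumps([args, kwargs]).encode("utf-8"))


def argument_limit(default=DEFAULT_LIMIT_BYTES):
    """Return the system's reported argument-size limit, or a fallback."""
    try:
        limit = os.sysconf("SC_ARG_MAX")
    except (AttributeError, ValueError, OSError):
        # os.sysconf is unavailable, or the name is unknown on this platform.
        return default
    return limit if limit > 0 else default


def fits_on_command_line(*args, **kwargs):
    """Conservatively check the payload against the smaller of both limits."""
    return serialized_size(*args, **kwargs) < min(argument_limit(), DEFAULT_LIMIT_BYTES)
```

A caller could fall back to writing the configuration to a file when `fits_on_command_line` returns False.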

@@ -143,6 +123,7 @@ def _get_local_data_bind_mounts(config_dict, bind_mount_args):
def _get_docker_args(docker_args, bind_mount_args, outdir):
    bind_mount_args += ["-v", f"{os.path.abspath(outdir)}:{DOCKER_OUTDIR}"]
    docker_args += ["--rm", "--user", f"{os.getuid()}:{os.getgid()}"]
    return DOCKER_OUTDIR
Collaborator

You shouldn't need to return a global constant in a module-private function.

Contributor Author

Okay. I did it this way so that DOCKER_OUTDIR would essentially become an implementation detail of _get_docker_args, but it is not a big deal to change.


outdir_in_docker = _get_docker_args(docker_args, bind_mount_args, outdir)
Collaborator

You should just use DOCKER_OUTDIR in place of this local variable (which currently will always be equal to DOCKER_OUTDIR) wherever you would use it.

Contributor Author

sure.

Comment on lines +69 to +73
if filesystem._is_local_path(outdir):
    warnings.warn(
        f"Output directory {outdir} is a local path, so it will not be accessible "
        "once the job finishes."
    )
Collaborator

This warning might not be possible with a command-centric approach, though we could put it instead in the function that generates the command? Or maybe we'll keep this API the same and it will just make use of the new interface. idk.

Contributor Author

This warning is specific to k8s, so we wouldn't want it to appear if run_native.command is called for another execution context. I would be fine leaving this, since I don't see a problem with run_k8s doing some introspection into its arguments. Ultimately, we will probably use run_k8s less as we move to the more general interface for making k8s jobs.

Collaborator

OK, this was mostly a note about an issue you'll run into in the future: you'll basically need a special command for submitting Kubernetes fv3gfs runs so that you can post this warning.

@@ -117,12 +119,12 @@ def _submit_job(job, namespace):
api.create_namespaced_job(body=job, namespace=namespace)


-def _create_job_object(config_location, outdir, docker_image, runfile, kube_config):
+def _create_job_object(command, docker_image, kube_config):
Collaborator

Good!

Comment on lines 24 to 39
def call_via_subprocess(func):
    this_module = func.__module__

    def main(argv):
        import json

        args, kwargs = json.loads(argv[1])
        func(*args, **kwargs)

    def command(*args, **kwargs) -> str:
        serialized = json.dumps([args, kwargs])
        return ["python", "-m", this_module, serialized]

    func.main = main
    func.command = command
    return func
Collaborator

Neato

        args, kwargs = json.loads(argv[1])
        func(*args, **kwargs)

    def command(*args, **kwargs) -> str:
Collaborator

You can use @functools.wraps to give this function the same call signature as run_native. You may want to figure out how to modify just the call signature without modifying the name or docstring, and see if there are any side effects I don't know about.

Contributor Author

I added the decorator. However, after reading the docs, I don't think functools.wraps does any sort of argument checking. Do you think that feature is important to include, since atm this is not a public API, and we can just assume that run_k8s and run_docker know what they are doing?

Collaborator

It does add argument checking. Looks good.

Comment on lines +438 to +448
@call_via_subprocess
def dummy_function(*obj):
    output.append(hash(obj))

inputs = (1, 2, 3)

# first call is via "main"
serialized_args = dummy_function.command(*inputs)[-1]
argv = [None, serialized_args]
dummy_function.main(argv)
dummy_ans = output.pop()
Collaborator

If you do add the code that modifies the signature of command to be the same as the wrapped function, be sure to test it :)

Contributor Author

I would rather leave this safety guarantee to another PR unless you think it is very important.

Collaborator

It's an easy test, just something like: with pytest.raises(TypeError): command(bad=None, args=True). I think it's important enough to include, since weird, hard-to-track bugs can result if this regresses unintentionally.

Contributor Author

@nbren12 nbren12 left a comment

another "comment"

@@ -223,43 +217,6 @@ def maybe_get_file(*args, **kwargs):
pass


@pytest.mark.parametrize(
Contributor Author

This only tested the "fv3config.yml" bind-mount, which is no longer necessary since the configuration dictionary is serialized and passed to run_native directly.

Collaborator

OK

@@ -151,13 +153,6 @@ def _check_toleration(toleration):
assert toleration.value == "climate-sim-pool"


def _check_command(command, outdir, config_location, runfile):
Contributor Author

This is now covered by the run_native tests, since run_kubernetes no longer has primary responsibility for constructing its "command".

Collaborator

OK, as long as it's tested.

@@ -5,7 +5,7 @@

from .. import filesystem
from .._exceptions import DelayedImportError
from ._docker import _get_python_command
from ._native import run_native
Contributor Author

thanks

@@ -26,6 +26,8 @@ def run_kubernetes(
gcp_secret=None,
image_pull_policy="IfNotPresent",
job_labels=None,
submit=True,
**kwargs,
Contributor Author

I will remove the kwargs, since this PR doesn't actually add any new ones yet. The stderr/out handling was gonna be a new PR.

Collaborator

@mcgibbon mcgibbon left a comment

Two remaining small fixes (the first and last comments).

Contributor Author

nbren12 commented Mar 24, 2020

@mcgibbon FYI, functools.wraps doesn't enforce that the signatures match:

>>> from functools import wraps
>>> def f(a,b):
...     pass
...
>>> @wraps(f)
... def g(a):
...     pass
...
>>> g(10)
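
As the session above shows, functools.wraps copies metadata but does not validate call arguments. One way to get that check is to bind the arguments against the wrapped function's signature with inspect.signature. The following is a sketch under stated assumptions: the decorator name `call_via_subprocess_checked` and the function `run` are hypothetical, and this is not necessarily the code that was merged.

```python
import functools
import inspect
import json


def call_via_subprocess_checked(func):
    """Hypothetical decorator variant whose command() validates arguments."""
    signature = inspect.signature(func)

    @functools.wraps(func)
    def command(*args, **kwargs):
        # bind() raises TypeError if the arguments do not fit func's signature.
        signature.bind(*args, **kwargs)
        serialized = json.dumps([args, kwargs])
        return ["python", "-m", func.__module__, serialized]

    func.command = command
    return func


@call_via_subprocess_checked
def run(config, outdir, runfile=None):
    """Stand-in for a function such as run_native (illustration only)."""


good_command = run.command({"a": 1}, "outdir")

try:
    run.command(bad=None, args=True)
except TypeError:
    rejected = True
else:
    rejected = False
```

With this in place, the test suggested earlier in the review (calling command with unexpected keyword arguments inside pytest.raises(TypeError)) would pass, because the mismatch is caught when the command is built rather than inside the subprocess.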

Collaborator

@mcgibbon mcgibbon left a comment

Looks good! Just one stray line which can be removed, then good to merge.

    signature = inspect.signature(func)

    def main(argv):
        import json
Collaborator

import not needed here

@nbren12 nbren12 merged commit bb1c1fa into master Mar 24, 2020
@nbren12 nbren12 deleted the feature/centralize-run-native branch March 24, 2020 19:56
nbren12 added a commit that referenced this pull request Mar 31, 2020
nbren12 added a commit that referenced this pull request Mar 31, 2020
* Update history.rst with changes from #68

* Update HISTORY.rst

Co-Authored-By: Jeremy McGibbon <jeremym@vulcan.com>

* Update HISTORY.rst

Co-Authored-By: Jeremy McGibbon <jeremym@vulcan.com>

Co-authored-by: Jeremy McGibbon <jeremym@vulcan.com>