Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions packages/engine/src/worker/runner/python/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
from batch import load_dataset


# TODO(permanently?): Keep in sync with PACKAGE_TYPE
def str_from_pkg_type(pkg_type):
return ["init", "context", "state", "output"][pkg_type]


def assert_eq(a, b):
assert a == b, (a, b)

Expand Down Expand Up @@ -71,8 +76,8 @@ def data(self):

class PyPackage:
def __init__(self, fb):
self.type = fb.Type()
self.name = fb.Name()
self.type = str_from_pkg_type(fb.Type())
self.name = fb.Name().decode('utf-8')
# TODO: Instead of using numpy, just replace `Serialized` with
# a string in the flatbuffers file?
self.payload = json_from_np(fb.InitPayload().InnerAsNumpy())
Expand Down
38 changes: 32 additions & 6 deletions packages/engine/src/worker/runner/python/package.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,54 @@
import importlib
from pathlib import Path
import sys
import traceback

import logging

def get_pkg_path(pkg_name, pkg_type):
return Path("../../../simulation/package/{}/packages/{}/package.py".format(
# The engine should be started from the engine's root directory in the repo.
return Path("./src/simulation/package/{}/packages/{}/package.py".format(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this works, it works. But it's something I'd like to revisit at some point, not happy about all of our relative paths and such, especially since things like running Python with -m can affect it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this does rely on running the engine from a specific directory.

pkg_type, pkg_name
))


def load_fns(pkg_name, pkg_type):
# Read code.
path = get_pkg_path(pkg_name, pkg_type)
code = "import ........worker.runner.python.hash_util\n"+path.read_text()

try:
# TODO: Discuss whether there is some significant drawback to
# changing `sys.path` here (vs some more complicated way
# of importing files from parent directories of the
# package file's directory).
code = ("import pathlib\n" +
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fixed in a later PR so it's okay

"import sys\n" +
"sys.path.insert(1, pathlib.Path.cwd()/'worker'/'runner'/'python')\n" +
"import hash_util\n" +
path.read_text())
except IOError:
logging.info("`" + str(path) + "` doesn't exist, possibly intentionally")
return [None, None, None]

# Run code.
pkg_globals = {}
bytecode = compile(code.decode("utf-8"), pkg_name, "exec")
exec(bytecode, pkg_globals)
try:
bytecode = compile(code, pkg_name, "exec")
exec(bytecode, pkg_globals)
except Exception:
# Have to catch generic Exception, because
# package author's code could throw anything
e = str(traceback.format_exception(*sys.exc_info()))
raise RuntimeError(
"Couldn't import package " + str(path) + ": " + e
)

# Extract functions.
fn_names = ["start_experiment", "start_sim", "run_task"]
fns = [(name, pkg_globals.get(name)) for name in fn_names]
fns = [pkg_globals.get(name) for name in fn_names]

# Validate functions.
for (fn_name, fn) in fns:
for (fn_name, fn) in zip(fn_names, fns):
if fn is not None and not callable(fn):
raise Exception(
"Couldn't import package {}: {} should be callable, not {}".format(
Expand Down
14 changes: 9 additions & 5 deletions packages/engine/src/worker/runner/python/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, experiment_id, worker_index):
try:
self.experiment_ctx = self.start_experiment()
except Exception as e: # Have to catch generic Exception
self.handle_runner_error(e, sys.exc_info())
self.handle_runner_error(sys.exc_info())
raise e

def start_experiment(self):
Expand All @@ -36,7 +36,7 @@ def start_experiment(self):
self.pkgs[pkg_id] = pkg = Package(
name=config.name,
pkg_type=config.type,
owned_fields=config.owned_fields
owned_fields=[] # TODO: Propagate `config.owned_fields` here.
)

if pkg.start_experiment is not None:
Expand Down Expand Up @@ -137,7 +137,11 @@ def kill(self):
self.batches.free()
self.messenger = None

def handle_runner_error(self, exc, tb):
# `exc_info` is whatever tuple of info `sys.exc_info()` returned about
# an exception. Calling `sys.exc_info()` inside `handle_runner_error`
# wouldn't work, because `sys.exc_info` can only return info about an
# exception that has just occurred.
def handle_runner_error(self, exc_info):
# User errors definitely need to be sent back to the Rust process, so
# they can be sent further to the user and displayed.

Expand All @@ -149,7 +153,7 @@ def handle_runner_error(self, exc, tb):
# Rust process that a runner error occurred so it immediately knows
# that the runner exited.

error = "Runner error: " + str(traceback.format_exception(type(exc), exc, tb))
error = "Runner error: " + str(traceback.format_exception(*exc_info))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we either pass the destructured args, or can we just type hint?

Or at least a comment saying it comes from the function so you can destructure

logging.error(error) # First make sure the error gets logged; then try to send it.
self.messenger.send_runner_error(error)
time.sleep(2) # Give the Rust process time to receive the error message.
Expand Down Expand Up @@ -202,4 +206,4 @@ def run(self):

except Exception as e:
# Catch generic Exception to make sure it's logged before the runner exits.
self.handle_runner_error(e, sys.exc_info())
self.handle_runner_error(sys.exc_info())
11 changes: 6 additions & 5 deletions packages/engine/src/worker/runner/python/sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ def maybe_add_custom_fns(self, to_add, custom_property, pkg):

custom_fns = getattr(self, pkg.type + '_' + custom_property)
for field_name in to_add:
if not pkg.owns_field.get(field_name):
raise RuntimeError(
"Packages can only specify " + custom_property + " for fields they own, not '" +
field_name + "' in " + pkg.name
)
# TODO: Uncomment after propagating owned_fields:
# if not pkg.owns_field.get(field_name):
# raise RuntimeError(
# "Packages can only specify " + custom_property + " for fields they own, not '" +
# field_name + "' in " + pkg.name
# )

if field_name in custom_fns:
raise RuntimeError(
Expand Down