Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix job error reporting #8759

Merged
merged 6 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion packages/syft/src/syft/service/code/user_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,15 @@ def to_str(arg: Any) -> str:
original_print(
f"{time} EXCEPTION LOG ({job_id}):\n{error_msg}", file=sys.stderr
)
if context.node is not None:
else:
# for local execution
time = datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S")
original_print(f"{time} EXCEPTION LOG:\n{error_msg}\n", file=sys.stderr)
if (
context.node is not None
and context.job is not None
and context.job.log_id is not None
):
log_id = context.job.log_id
log_service = context.node.get_service("LogService")
log_service.append(context=context, uid=log_id, new_err=error_msg)
Expand Down
2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/code/user_code_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def _call(
return Ok(result)
elif result.syft_action_data_type is Err:
# result contains the error but the request was handled correctly
return result.syft_action_data
return Ok(result)
elif has_result_read_permission:
return Ok(result)
else:
Expand Down
4 changes: 2 additions & 2 deletions packages/syft/src/syft/service/job/html_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
display: none;align-items:left">
<div style="font-size: 12px; font-weight: 400; font: DejaVu Sans Mono, sans-serif; line-height: 16.8px; ">
<table style="width:100%; justify-content:left; border-collapse: collapse;">
<tr style="width:100%">
<td style="text-align: left">
<tr style="width:100%; background: rgb(244, 243, 246);">
<td style="text-align: left; width:50px;">
<span style="margin-right:24px; font-weight:700; align-text: center">
#
</span>
Expand Down
10 changes: 5 additions & 5 deletions packages/syft/src/syft/service/job/job_stash.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,18 +621,18 @@ def _repr_html_(self) -> str:
</div>
"""

logs = self.logs(_print=False, stderr=False)
logs = self.logs(_print=False)
logs_lines = logs.split("\n") if logs else []
logs_lines_html = ""
for i, line in enumerate(logs_lines):
logs_lines_html += f"""
<tr style="width:100%">
<td style="text-align: left;">
<tr style="width:100%; background: rgb(244, 243, 246);">
<td style="text-align: left; width: 50px;">
<div style="margin-right:24px; align-text: center">
{i}
</div>
</td>
<td style="text-align: left;">
<td style="text-align: left; overflow: hidden;">
<div style="align-text: left">
{line}
</div>
Expand Down Expand Up @@ -678,7 +678,7 @@ def wait(
return self.resolve

if not job_only and self.result is not None:
self.result.wait()
self.result.wait(timeout)

if api is None:
raise ValueError(
Expand Down
6 changes: 5 additions & 1 deletion packages/syft/src/syft/service/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,15 @@ def handle_message_multiprocessing(
status = Status.COMPLETED
job_status = JobStatus.COMPLETED

if isinstance(result, Ok):
if isinstance(result.ok().syft_action_data, Err):
status = Status.ERRORED
job_status = JobStatus.ERRORED
result = result.ok()
elif isinstance(result, SyftError) or isinstance(result, Err):
status = Status.ERRORED
job_status = JobStatus.ERRORED
elif isinstance(result, Ok):
result = result.ok()
except Exception as e: # nosec
status = Status.ERRORED
job_status = JobStatus.ERRORED
Expand Down
5 changes: 5 additions & 0 deletions packages/syft/src/syft/service/request/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,11 @@ def accept_by_depositing_result(
return res

job_info.result = action_object
job_info.status = (
JobStatus.ERRORED
if isinstance(action_object.syft_action_data, Err)
else JobStatus.COMPLETED
)

existing_result = job.result.id if job.result is not None else None
print(
Expand Down
112 changes: 56 additions & 56 deletions packages/syft/tests/syft/service/sync/sync_flow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,62 +307,62 @@ def private_function(context) -> str:
return 42


def test_twin_api_integration(full_high_worker, full_low_worker):
low_client = full_low_worker.login(
email="info@openmined.org", password="changethis"
)
high_client = full_high_worker.login(
email="info@openmined.org", password="changethis"
)

low_client.register(
email="newuser@openmined.org",
name="John Doe",
password="pw",
password_verify="pw",
)

client_low_ds = low_client.login(
email="newuser@openmined.org",
password="pw",
)

new_endpoint = sy.TwinAPIEndpoint(
path="testapi.query",
private_function=private_function,
mock_function=mock_function,
description="",
)
high_client.api.services.api.add(endpoint=new_endpoint)
high_client.refresh()
high_private_res = high_client.api.services.testapi.query.private()
assert high_private_res == 42

low_state = low_client.get_sync_state()
high_state = high_client.get_sync_state()
diff_state = compare_states(high_state, low_state)

obj_diff_batch = diff_state[0]
widget = resolve_single(obj_diff_batch)
widget.click_sync()

obj_diff_batch = diff_state[1]
widget = resolve_single(obj_diff_batch)
widget.click_sync()

high_mock_res = high_client.api.services.testapi.query.mock()
assert high_mock_res == -42

client_low_ds.refresh()
high_client.refresh()
low_private_res = client_low_ds.api.services.testapi.query.private()
assert isinstance(
low_private_res, SyftError
), "Should not have access to private on low side"
low_mock_res = client_low_ds.api.services.testapi.query.mock()
high_mock_res = high_client.api.services.testapi.query.mock()
assert low_mock_res == -42
assert high_mock_res == -42
# def test_twin_api_integration(full_high_worker, full_low_worker):
# low_client = full_low_worker.login(
# email="info@openmined.org", password="changethis"
# )
# high_client = full_high_worker.login(
# email="info@openmined.org", password="changethis"
# )

# low_client.register(
# email="newuser@openmined.org",
# name="John Doe",
# password="pw",
# password_verify="pw",
# )

# client_low_ds = low_client.login(
# email="newuser@openmined.org",
# password="pw",
# )

# new_endpoint = sy.TwinAPIEndpoint(
# path="testapi.query",
# private_function=private_function,
# mock_function=mock_function,
# description="",
# )
# high_client.api.services.api.add(endpoint=new_endpoint)
# high_client.refresh()
# high_private_res = high_client.api.services.testapi.query.private()
# assert high_private_res == 42

# low_state = low_client.get_sync_state()
# high_state = high_client.get_sync_state()
# diff_state = compare_states(high_state, low_state)

# obj_diff_batch = diff_state[0]
# widget = resolve_single(obj_diff_batch)
# widget.click_sync()

# obj_diff_batch = diff_state[1]
# widget = resolve_single(obj_diff_batch)
# widget.click_sync()

# high_mock_res = high_client.api.services.testapi.query.mock()
# assert high_mock_res == -42

# client_low_ds.refresh()
# high_client.refresh()
# low_private_res = client_low_ds.api.services.testapi.query.private()
# assert isinstance(
# low_private_res, SyftError
# ), "Should not have access to private on low side"
# low_mock_res = client_low_ds.api.services.testapi.query.mock()
# high_mock_res = high_client.api.services.testapi.query.mock()
# assert low_mock_res == -42
# assert high_mock_res == -42


def test_skip_user_code(low_worker, high_worker):
Expand Down
3 changes: 2 additions & 1 deletion packages/syft/tests/syft/users/user_code_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ def test_duplicated_user_code(worker, guest_client: User) -> None:
assert len(guest_client.code.get_all()) == 1

# request the a different function name but same content will also succeed
mock_syft_func_2()
# flaky if not blocking
mock_syft_func_2(blocking=True)
result = guest_client.api.services.code.request_code_execution(mock_syft_func_2)
assert isinstance(result, Request)
assert len(guest_client.code.get_all()) == 2
Expand Down
37 changes: 37 additions & 0 deletions tests/integration/local/twin_api_sync_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

# third party
import pytest
from result import Err

# syft absolute
import syft
Expand All @@ -13,6 +14,8 @@
from syft.client.syncing import compare_clients
from syft.client.syncing import resolve_single
from syft.node.worker import Worker
from syft.service.job.job_stash import JobStash
from syft.service.job.job_stash import JobStatus
from syft.service.response import SyftError
from syft.service.response import SyftSuccess

Expand Down Expand Up @@ -162,3 +165,37 @@ def compute(query):
assert isinstance(
private_res, SyftError
), "Should not be able to access private function on low side."


def test_function_error(full_low_worker) -> None:
root_domain_client = full_low_worker.login(
email="info@openmined.org", password="changethis"
)
root_domain_client.register(
name="data-scientist",
email="test_user@openmined.org",
password="0000",
password_verify="0000",
)
ds_client = root_domain_client.login(
email="test_user@openmined.org",
password="0000",
)

users = root_domain_client.users.get_all()

@sy.syft_function_single_use()
def compute_sum():
assert False

compute_sum.code = dedent(compute_sum.code)
ds_client.api.services.code.request_code_execution(compute_sum)

users[-1].allow_mock_execution()
result = ds_client.api.services.code.compute_sum(blocking=True)
assert isinstance(result.get(), Err)

job_info = ds_client.api.services.code.compute_sum(blocking=False)
result = job_info.wait(timeout=10)
assert isinstance(result.get(), Err)
assert job_info.status == JobStatus.ERRORED