Skip to content

Commit

Permalink
remove asserts
Browse files Browse the repository at this point in the history
  • Loading branch information
SYangster committed May 7, 2024
1 parent 95a9be5 commit 2eeeaed
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
8 changes: 4 additions & 4 deletions job_templates/sag_cse_ccwf_pt/config_fed_client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@
}
}
{
# All tasks prefixed with wf1_ are routed to this ClientControllerExecutor
tasks = ["wf1_*"]
# All tasks prefixed with wf_ are routed to this ClientControllerExecutor
tasks = ["wf_*"]
executor {
id = "client_controller_executor1"
id = "client_controller_executor"
path = "nvflare.app_common.ccwf.client_controller_executor.ClientControllerExecutor"
# ClientControllerExecutor for running controllers on client-side.
args {
# list of controller ids from components to be run in order
controller_id_list = ["sag_ctl", "cse_ctl"]
task_name_prefix = "wf1"
task_name_prefix = "wf"
# persistor used to distribute and save final results for clients
persistor_id = "persistor"
}
Expand Down
2 changes: 1 addition & 1 deletion job_templates/sag_cse_ccwf_pt/config_fed_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
path = "nvflare.app_common.ccwf.server_ctl.ServerSideController"
args {
# the prefix for task names of this workflow
task_name_prefix = "wf1"
task_name_prefix = "wf"
# the maximum amount of time allowed for a client to miss a status report
max_status_report_interval = 300
# policy to choose which client to run the controller logic from
Expand Down
21 changes: 15 additions & 6 deletions nvflare/app_common/ccwf/client_controller_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def start_run(self, fl_ctx: FLContext):
self.persistor = self.engine.get_component(self.persistor_id)
if not isinstance(self.persistor, LearnablePersistor):
self.log_warning(
fl_ctx, f"Persistor {self.persistor_id} must be a Persistor instance, but got {type(self.persistor)}"
fl_ctx, f"Persistor {self.persistor_id} must be a Persistor instance but got {type(self.persistor)}"
)
self.persistor = None

Expand Down Expand Up @@ -240,7 +240,7 @@ def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort

self.update_status(action=f"finished_{controller_id}", error=None, all_done=True)

self.update_status(action=f"finished_start_task", error=None, all_done=True)
self.update_status(action="finished_start_task", error=None, all_done=True)

return make_reply(ReturnCode.OK)

Expand Down Expand Up @@ -288,8 +288,11 @@ def update_status(self, last_round=None, action=None, error=None, all_done=False

def _process_final_result(self, request: Shareable, fl_ctx: FLContext) -> Shareable:
peer_ctx = fl_ctx.get_peer_context()
assert isinstance(peer_ctx, FLContext)
client_name = peer_ctx.get_identity_name()
if peer_ctx:
client_name = peer_ctx.get_identity_name()
else:
self.log_error(fl_ctx, "Request from unknown client")
return make_reply(ReturnCode.BAD_REQUEST_DATA)
result = request.get(Constant.RESULT)

if not result:
Expand Down Expand Up @@ -330,7 +333,10 @@ def is_task_secure(self, fl_ctx: FLContext) -> bool:
def broadcast_final_result(self, result: Learnable, fl_ctx: FLContext):
targets = self.get_config_prop(Constant.RESULT_CLIENTS)

assert isinstance(targets, list)
if not isinstance(targets, list):
self.log_warning(fl_ctx, f"expected targets of result clients to be type list, but got {type(targets)}")
return None

if self.me in targets:
targets.remove(self.me)

Expand Down Expand Up @@ -360,7 +366,10 @@ def broadcast_final_result(self, result: Learnable, fl_ctx: FLContext):
fl_ctx=fl_ctx,
)

assert isinstance(resp, dict)
if not isinstance(resp, dict):
self.log_error(fl_ctx, f"bad response for final result from clients, expected dict but got {type(resp)}")
return

num_errors = 0
for t in targets:
reply = resp.get(t)
Expand Down

0 comments on commit 2eeeaed

Please sign in to comment.