Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simulator end run for all clients #2514

Merged
merged 13 commits into from
Apr 19, 2024
Merged
2 changes: 2 additions & 0 deletions nvflare/private/fed/app/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def define_simulator_parser(simulator_parser):
simulator_parser.add_argument("-t", "--threads", type=int, help="number of parallel running clients")
simulator_parser.add_argument("-gpu", "--gpu", type=str, help="list of GPU Device Ids, comma separated")
simulator_parser.add_argument("-m", "--max_clients", type=int, default=100, help="max number of clients")
parser.add_argument("--end_run_all", default=False, action=argparse.BooleanOptionalAction)


def run_simulator(simulator_args):
Expand All @@ -41,6 +42,7 @@ def run_simulator(simulator_args):
threads=simulator_args.threads,
yhwen marked this conversation as resolved.
Show resolved Hide resolved
gpu=simulator_args.gpu,
max_clients=simulator_args.max_clients,
end_run_all=simulator_args.end_run_all,
)
run_status = simulator.run()

Expand Down
41 changes: 32 additions & 9 deletions nvflare/private/fed/app/simulator/simulator_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,15 @@

class SimulatorRunner(FLComponent):
def __init__(
self, job_folder: str, workspace: str, clients=None, n_clients=None, threads=None, gpu=None, max_clients=100
self,
job_folder: str,
workspace: str,
clients=None,
n_clients=None,
threads=None,
gpu=None,
max_clients=100,
end_run_all=False,
):
super().__init__()

Expand All @@ -80,6 +88,7 @@ def __init__(
self.threads = threads
self.gpu = gpu
self.max_clients = max_clients
self.end_run_all = end_run_all

self.ask_to_stop = False

Expand Down Expand Up @@ -145,6 +154,7 @@ def setup(self):
self.args.env = os.path.join("config", AppFolderConstants.CONFIG_ENV)
cwd = os.getcwd()
self.args.job_folder = os.path.join(cwd, self.args.job_folder)
self.args.end_run_all = self.end_run_all

if not os.path.exists(self.args.workspace):
os.makedirs(self.args.workspace)
Expand Down Expand Up @@ -519,17 +529,13 @@ def run(self, gpu):
lock = threading.Lock()
timeout = self.kv_list.get("simulator_worker_timeout", 60.0)
for i in range(self.args.threads):
executor.submit(lambda p: self.run_client_thread(*p), [self.args.threads, gpu, lock, i, timeout])
executor.submit(
lambda p: self.run_client_thread(*p), [self.args.threads, gpu, lock, self.args.end_run_all, timeout]
)

# wait for the server and client running thread to finish.
executor.shutdown()

for client in self.federated_clients:
yhwen marked this conversation as resolved.
Show resolved Hide resolved
if client.client_name not in self.end_run_clients:
self.do_one_task(
client, self.args.threads, gpu, lock, timeout=timeout, task_name=RunnerTask.END_RUN
)

except Exception as e:
self.logger.error(f"SimulatorClientRunner run error: {secure_format_exception(e)}")
finally:
Expand All @@ -548,7 +554,7 @@ def _shutdown_client(self, client):
# Ignore the exception for the simulator client shutdown
self.logger.warn(f"Exception happened to client{client.name} during shutdown ")

def run_client_thread(self, num_of_threads, gpu, lock, rank, timeout=60):
def run_client_thread(self, num_of_threads, gpu, lock, end_run_all, timeout=60):
stop_run = False
interval = 1
client_to_run = None # indicates the next client to run
Expand All @@ -571,9 +577,26 @@ def run_client_thread(self, num_of_threads, gpu, lock, rank, timeout=60):
self.end_run_clients.append(end_run_client)

client.simulate_running = False

if end_run_all:
self._end_run_clients(client, gpu, lock, num_of_threads, timeout)
except Exception as e:
self.logger.error(f"run_client_thread error: {secure_format_exception(e)}")

def _end_run_clients(self, client, gpu, lock, num_of_threads, timeout):
yhwen marked this conversation as resolved.
Show resolved Hide resolved
while len(self.end_run_clients) != len(self.federated_clients):
end_run_client = None
with lock:
for client in self.federated_clients:
if client.client_name not in self.end_run_clients and not client.simulate_running:
end_run_client = client
self.end_run_clients.append(end_run_client.client_name)
break
if end_run_client:
end_run_client.simulate_running = True
self.do_one_task(client, num_of_threads, gpu, lock, timeout=timeout, task_name=RunnerTask.END_RUN)
end_run_client.simulate_running = False

def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name=RunnerTask.TASK_EXEC):
open_port = get_open_ports(1)[0]
client_workspace = os.path.join(self.args.workspace, SimulatorConstants.JOB_NAME, "app_" + client.client_name)
Expand Down
Loading