Merge pull request securefederatedai#16 from scngupta-dsp/testflow_internalloops

Test case for validating internal loops
psfoley committed Dec 3, 2022
2 parents d723723 + a7006d0 commit 50707dd
Showing 1 changed file with 251 additions and 0 deletions.
251 changes: 251 additions & 0 deletions tests/github/experimental/testflow_internalloop.py
@@ -0,0 +1,251 @@
# Copyright (C) 2020-2022 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from openfl.experimental.interface.fl_spec import FLSpec
from openfl.experimental.interface.participants import Aggregator, Collaborator
from openfl.experimental.interface.fl_spec import LocalRuntime
from openfl.experimental.placement.placement import aggregator, collaborator
import numpy as np


class bcolors:
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"


class TestFlow_InternalLoop(FLSpec):
def __init__(self, model=None, optimizer=None, rounds=None, **kwargs):
super().__init__(**kwargs)
self.training_rounds = rounds
self.train_count = 0
self.end_count = 0

@aggregator
def start(self):
"""
Flow start.
"""
print(
f"{bcolors.OKBLUE}Testing FederatedFlow - Test for Internal Loops - Round: {self.train_count} of Training Rounds: {self.training_rounds}{bcolors.ENDC}"
)
self.model = np.zeros((10, 10, 10)) # Test model
self.collaborators = self.runtime.collaborators
self.next(self.agg_model_mean, foreach="collaborators")
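        # foreach="collaborators" fans agg_model_mean out to every name in
        # self.collaborators; the per-collaborator results are gathered back
        # together in the aggregator 'join' step below.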

@collaborator
def agg_model_mean(self):
"""
Calculating the mean of the model created in start.
"""

self.agg_mean_value = np.mean(self.model)
print(
f"<Collab>: {self.input} Mean of Agg model: {self.agg_mean_value} "
)
self.next(self.collab_model_update)

@collaborator
def collab_model_update(self):
"""
Initializing the model with random numbers.
"""
print(f"<Collab>: {self.input} Initializing the model randomly ")
self.model = np.random.randint(1, len(self.input), (10, 10, 10))
self.next(self.local_model_mean)

@collaborator
def local_model_mean(self):
"""
        Calculating the mean of the model created in collab_model_update.
"""
self.local_mean_value = np.mean(self.model)
print(f"<Collab>: {self.input} Local mean: {self.local_mean_value} ")
self.next(self.join)

@aggregator
def join(self, inputs):
"""
Joining inputs from collaborators
"""
self.agg_mean = sum(input.local_mean_value for input in inputs) / len(
inputs
)
print(f"Aggregated mean : {self.agg_mean}")
self.next(self.internal_loop)

@aggregator
def internal_loop(self):
"""
        Internal loop over the training rounds.
"""
self.train_count = self.train_count + 1
if self.training_rounds == self.train_count:
self.next(self.end)
else:
self.next(self.start)
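        # Looping back to 'start' re-runs the aggregator step and fans out to all
        # collaborators again, so one round covers start -> agg_model_mean ->
        # collab_model_update -> local_model_mean -> join -> internal_loop.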

@aggregator
def end(self):
"""
This is the 'end' step. All flows must have an 'end' step, which is the
last step in the flow.
"""
self.end_count = self.end_count + 1
print(f"This is the end of the flow")


def validate_flow(flow_obj, expected_flow_steps):
"""
Validate:
1. If the given training round were completed
2. If all the steps were executed
3. If each collaborator step was executed
4. If end was executed once
"""
validate_flow_error = [] # List to capture any errors in the flow

    from metaflow import Flow

cli_flow_obj = Flow("TestFlow_InternalLoop") # Flow object from CLI
cli_flow_steps = list(cli_flow_obj.latest_run) # Steps from CLI
cli_step_names = [step.id for step in cli_flow_steps]

    # 1. Check that the configured number of training rounds was completed
    if flow_obj.training_rounds != flow_obj.train_count:
        validate_flow_error.append(
            f"{bcolors.FAIL}... Error : Number of training rounds completed is not equal to the configured training rounds {bcolors.ENDC} \n"
        )

    for step in cli_flow_steps:
        task_count = len(list(step))
        func = getattr(flow_obj, step.id)

        # Each aggregator step should be executed training_rounds times
        if func.aggregator_step and task_count != flow_obj.training_rounds:
            validate_flow_error.append(
                f"{bcolors.FAIL}... Error : Incorrect number of executions detected for Aggregator Step: {step}. Expected: {flow_obj.training_rounds} Actual: {task_count}{bcolors.ENDC} \n"
            )

        # Each collaborator step should be executed (training rounds) * (number of collaborators) times
        if func.collaborator_step and task_count != len(flow_obj.collaborators) * flow_obj.training_rounds:
            validate_flow_error.append(
                f"{bcolors.FAIL}... Error : Incorrect number of executions detected for Collaborator Step: {step}. Expected: {flow_obj.training_rounds * len(flow_obj.collaborators)} Actual: {task_count}{bcolors.ENDC} \n"
            )
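        # Worked example for the configuration used under __main__ below:
        # with 8 collaborators and training_rounds=5, each aggregator step is
        # expected to record 5 tasks and each collaborator step 8 * 5 = 40 tasks.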

steps_present_in_cli = [
step for step in expected_flow_steps if step in cli_step_names
]
missing_steps_in_cli = [
step for step in expected_flow_steps if step not in cli_step_names
]
extra_steps_in_cli = [
step for step in cli_step_names if step not in expected_flow_steps
]

if len(steps_present_in_cli) != len(expected_flow_steps):
validate_flow_error.append(
f"{bcolors.FAIL}... Error : Number of steps fetched from Datastore through CLI do not match the Expected steps provided {bcolors.ENDC} \n"
)

if len(missing_steps_in_cli) != 0:
validate_flow_error.append(
f"{bcolors.FAIL}... Error : Following steps missing from Datastore: {missing_steps_in_cli} {bcolors.ENDC} \n"
)

if len(extra_steps_in_cli) != 0:
validate_flow_error.append(
f"{bcolors.FAIL}... Error : Following steps are extra in Datastore: {extra_steps_in_cli} {bcolors.ENDC} \n"
)

    if flow_obj.end_count != 1:
        validate_flow_error.append(
            f"{bcolors.FAIL}... Error : End function was not executed exactly once...{bcolors.ENDC}"
        )

if validate_flow_error:
display_validate_errors(validate_flow_error)
        raise Exception(f"{bcolors.FAIL}Test for Internal Loop FAILED{bcolors.ENDC}")
else:
print(
f"""{bcolors.OKGREEN}\n **** Summary of internal flow testing ****
        No issues found; the following checks passed successfully
        1. Number of training rounds completed equals the configured training rounds
        2. CLI steps and expected steps match
        3. Number of tasks aligns with the number of rounds and collaborators
        4. End function executed exactly once {bcolors.ENDC}"""
)


def display_validate_errors(validate_flow_error):
"""
    Display the errors captured during the flow test.
"""
print("".join(validate_flow_error))


if __name__ == "__main__":

# Setup participants
aggregator = Aggregator()
aggregator.private_attributes = {}

# Setup collaborators with private attributes
collaborator_names = [
"Portland",
"Seattle",
"Chandler",
"Bangalore",
"Delhi",
"Paris",
"London",
"New York",
]
collaborators = [Collaborator(name=name) for name in collaborator_names]

local_runtime = LocalRuntime(
aggregator=aggregator,
collaborators=collaborators
)
print(f"Local runtime collaborators = {local_runtime._collaborators}")

model = None
best_model = None
optimizer = None
top_model_accuracy = 0

flflow = TestFlow_InternalLoop(model, optimizer, 5, checkpoint=True)
flflow.runtime = local_runtime
flflow.run()

# Flow Test Begins
expected_flow_steps = [
"join",
"internal_loop",
"agg_model_mean",
"collab_model_update",
"local_model_mean",
"start",
"end",
] # List to verify expected steps
try:
validate_flow(
flflow, expected_flow_steps
) # Function to validate the internal flow
except Exception as e:
raise e
# Flow Test Ends
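
Note: validate_flow reads results back through the Metaflow client API, which is why the flow above is created with checkpoint=True, presumably so the run is recorded in the local Metaflow datastore that validate_flow queries. A minimal standalone sketch of the same query (assuming a checkpointed run of TestFlow_InternalLoop has already completed on this machine):

from metaflow import Flow

run = Flow("TestFlow_InternalLoop").latest_run  # most recent recorded run
for step in run:                                # a Run iterates over its Steps
    print(step.id, len(list(step)))             # step name and recorded task count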
