
[Feature] asynchronous training strategies #225

Merged: 4 commits into alibaba:master on Jul 17, 2022

Conversation

@xieyxclack (Collaborator) commented on Jul 11, 2022

This PR provides several asynchronous training strategies in FederatedScope, together with the necessary supporting modifications.

Resource information

  • We adopt the device information provided in FedScale to simulate clients' communication and computation abilities. The file can be downloaded from here, and users can specify federate.resource_info_file=PATH_TO_DEVICE_FILE to adopt it (see the config sketch after this list).
  • The device information is randomly assigned to the server and clients. For federate.mode=standalone, the clients' device information is directly exposed to the server in FedRunner for efficient simulation; for federate.mode=distributed, clients send their device information to the server during the join-in process via join_in_info (please specify federate.join_in_info = ['client_resource']).
  • In this version, the device information is used to calculate the time cost and by the group sampler introduced in this PR.
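
For concreteness, here is a minimal sketch of how these options might be set in a yacs-style Python config. The option names come from this PR's description; the values and the CfgNode scaffolding are illustrative assumptions, and the placeholder path is left unfilled:

```python
from yacs.config import CfgNode

# Illustrative scaffolding; in FederatedScope these keys live in the global cfg.
cfg = CfgNode(new_allowed=True)
cfg.federate = CfgNode(new_allowed=True)
cfg.federate.mode = 'distributed'
cfg.federate.resource_info_file = 'PATH_TO_DEVICE_FILE'  # placeholder path
# In distributed mode, clients report their capability while joining in:
cfg.federate.join_in_info = ['client_resource']
```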

Timestamp

  • Add a new attribute timestamp to Message, with a default value of 0.
  • A client updates the timestamp to the received one plus the execution time of local training and communication, which is estimated from the device information. The time cost at the server is ignored.
  • The server handles the received messages in the order of their timestamps (we adopt a message cache in fed_runner.py to implement this mechanism, sketched below), and lets the next broadcast inherit the timestamp from the message that triggers it.
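
To illustrate the ordering mechanism, here is a simplified, self-contained sketch of a timestamp-ordered message cache built on heapq, in the spirit of (but not copied from) fed_runner.py; this Message class is a stripped-down stand-in for the real one:

```python
import heapq

class Message:
    def __init__(self, msg_type, sender, receiver, timestamp=0, content=None):
        self.msg_type = msg_type
        self.sender = sender
        self.receiver = receiver
        self.timestamp = timestamp  # simulated time; defaults to 0
        self.content = content

    def __lt__(self, other):
        # Order messages by simulated timestamp so the heap pops the earliest.
        return self.timestamp < other.timestamp

server_msg_cache = []
heapq.heappush(server_msg_cache, Message('model_para', sender=1, receiver=0, timestamp=4.2))
heapq.heappush(server_msg_cache, Message('model_para', sender=2, receiver=0, timestamp=1.7))
earliest = heapq.heappop(server_msg_cache)  # the message with timestamp 1.7
```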

Asynchronous Training Strategies

With the above features, we implement several asynchronous training strategies in FederatedScope, which introduce the following behaviors (see the configuration sketch after this list):

  • Aggregation condition (asyn.aggregator): 'goal_achieved' or 'time_up'
    • 'goal_achieved': perform aggregation once the defined number of feedback messages has been received;
    • 'time_up': perform aggregation once the allocated time budget has run out.
  • Broadcast manner (asyn.broadcast_manner): 'after_aggregating' or 'after_receiving'
    • 'after_aggregating': broadcast the up-to-date global model after performing federated aggregation;
    • 'after_receiving': broadcast the up-to-date global model after receiving a model update from a client.
  • Staleness toleration and discount (asyn.staleness_toleration, asyn.staleness_discount_factor):
    • a threshold for tolerating stale updates from the clients;
    • stale updates are discounted during federated aggregation (in aggregator.py).
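
A hedged configuration sketch tying these options together; only `cfg.asyn.overselection = False` is quoted verbatim from this PR, and all values are illustrative:

```python
from yacs.config import CfgNode

cfg = CfgNode(new_allowed=True)
cfg.asyn = CfgNode(new_allowed=True)
cfg.asyn.use = True
cfg.asyn.aggregator = 'goal_achieved'            # or 'time_up'
cfg.asyn.broadcast_manner = 'after_aggregating'  # or 'after_receiving'
cfg.asyn.staleness_toleration = 10               # assumed threshold
cfg.asyn.staleness_discount_factor = 0.2         # assumed factor
cfg.asyn.overselection = False
```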

Others:

  • Fix Confusing server identifier #110: only preserve the server identifier in the logger when it is needed for parsing results.
  • Merge test data: for efficient simulation, clients' test data are merged and exposed to the server in fed_runner.py, so evaluation can be executed at the server rather than at every client (which becomes intolerably slow when client_num is large). Users can specify federate.merge_test_data=True to adopt this mechanism.
  • Re-format the code.

ToDo:

  • Implement asynchronous training in distributed mode
  • Share local model for efficient simulation


```python
self.msg_buffer['train'][round][sender] = content
if round >= self.state - self.staleness_toleration:
```

Collaborator:

As a subclass of Server, is it possible to inherit this piece of code?

Collaborator Author (@xieyxclack):

self.staleness_toleration would be set to 0 when asyn.use=False, as shown in lines 145-147 of server.py.
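
A minimal, runnable illustration of the described behavior (assumed semantics, not the PR's code): with the toleration forced to 0, the subclass condition degenerates to the ordinary synchronous check.

```python
use_asyn = False
staleness_toleration = 10 if use_asyn else 0  # forced to 0 in sync mode

state, received_round = 5, 5
# With toleration 0, `round >= state - toleration` reduces to `round >= state`:
assert (received_round >= state - staleness_toleration) == (received_round >= state)
```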

```python
avg_model = self._para_weighted_avg(models, recover_fun=recover_fun)

return avg_model
staleness = [x[1]
```

Collaborator:

It seems that, in most cases, each aggregator class corresponds to a specific FL optimization algorithm. Thus, I am wondering whether it would be better to make this class dedicated to FedAvg and define a separate AsyncFedAvg/FedBuff/etc.

Collaborator Author (@xieyxclack):

I have implemented AsynClientsAvgAggregator for such a purpose.
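
For readers unfamiliar with the idea, here is a hedged sketch of staleness-discounted weighted averaging in the spirit of AsynClientsAvgAggregator. The flat per-update discount is one plausible scheme; the actual rule in aggregator.py may differ:

```python
def discounted_avg(models, discount_factor=0.2):
    """Average updates given as (sample_size, params, staleness) tuples,
    down-weighting any update that is stale (one plausible scheme)."""
    total_weight = 0.0
    avg = None
    for sample_size, params, staleness in models:
        weight = sample_size * (1.0 if staleness == 0 else discount_factor)
        total_weight += weight
        if avg is None:
            avg = {k: v * weight for k, v in params.items()}
        else:
            for k, v in params.items():
                avg[k] += v * weight
    return {k: v / total_weight for k, v in avg.items()}

# Example: a fresh update and a stale one; the stale one contributes less.
fresh = (100, {'w': 1.0}, 0)
stale = (100, {'w': 3.0}, 2)
print(discounted_avg([fresh, stale]))  # {'w': 1.333...}
```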

```
@@ -611,3 +611,17 @@ def merge_data(all_data):
                merged_data[d_name][elem_name])

    return merged_data


def merge_test_data(all_data):
```

Collaborator:

Please provide a docstring for this method.

Collaborator Author (@xieyxclack):

Done


```python
merged_data = dict()
merged_data['test'] = all_data[1]['test']
num_of_sample_per_client = [len(all_data[1]['test'].dataset)]
```

Collaborator:

@rayrayraykk Is this general enough? E.g., for a node classification dataset?

Collaborator:

Since global eval is True in node/link-level tasks, we do not need to set federate.merge_test_data=True. But if global eval is False in node/link-level tasks, federate.merge_test_data=True might raise errors.

Collaborator Author (@xieyxclack):

IMO, the format of the test set in graph federated learning is quite different from the others.

Collaborator:

If node/link-level tasks are exceptional, we'd better exclude them in assertions about merge_test_data.

Collaborator Author (@xieyxclack):

I have reused merge_data in this PR, which raises NotImplementedError for node/link-level tasks.
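
A hypothetical sketch of such a merge helper, assuming PyTorch DataLoader-based test splits keyed by client id (with client 0 as the server slot); this is not the PR's exact code:

```python
from torch.utils.data import ConcatDataset, DataLoader

def merge_test_data(all_data):
    """Merge every client's 'test' split into one server-side DataLoader."""
    test_sets = []
    for client_id, data in all_data.items():
        if client_id == 0:  # skip the server slot
            continue
        if not isinstance(data, dict) or 'test' not in data:
            # e.g., node/link-level graph tasks use a different format
            raise NotImplementedError('Unsupported data format for merging.')
        test_sets.append(data['test'].dataset)
    return DataLoader(ConcatDataset(test_sets), batch_size=32)
```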

```python
def get_device_info(filename):
    if filename is None or not os.path.exists(filename):
        logger.info('The device information file is not provided')
    print(filename)
```

Collaborator:

Is it necessary?

Collaborator Author (@xieyxclack):

It was used for testing and I have removed it.

```python
# model after performing federated aggregation;
# 'after_receiving': broadcast the up-to-date global model after receiving
# the model update from clients
cfg.asyn.overselection = False
```

Collaborator:

To my knowledge, the overselection strategy also works for sync mode.

Collaborator Author (@xieyxclack):

Overselection should be used together with staleness_toleration in this version, and sync+overselection can be regarded as a specific case of asynchronous training: sampled_num >= aggregation goal, toleration threshold = 0, and broadcast after aggregation.
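
Expressed as a config sketch; sample_client_num and min_received_num are illustrative key names, not necessarily the PR's exact options:

```python
from yacs.config import CfgNode

cfg = CfgNode(new_allowed=True)
cfg.asyn = CfgNode(new_allowed=True)
cfg.federate = CfgNode(new_allowed=True)
# sync + overselection as a special case of the asynchronous machinery:
cfg.asyn.use = True
cfg.asyn.overselection = True
cfg.federate.sample_client_num = 30   # sampled_num ... (hypothetical key)
cfg.asyn.min_received_num = 20        # ... >= the aggregation goal (hypothetical key)
cfg.asyn.staleness_toleration = 0     # tolerate no stale updates
cfg.asyn.broadcast_manner = 'after_aggregating'
```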

```
@@ -41,6 +44,9 @@ def __init__(self,
        self.gpu_manager = GPUManager(gpu_available=self.cfg.use_gpu,
                                      specified_device=self.cfg.device)

        # get device information
        self.device_info = get_device_info(config.federate.device_info_file)
```

Collaborator:

It looks confusing: "device" always makes me think about GPU and CPU.

Collaborator Author (@xieyxclack):

Changed device_info to resource_info.

```python
            heapq.heappush(server_msg_cache, msg)
        else:
            self._handle_msg(msg)
    else:
```

Collaborator:

There is no chance to enter this branch.

Collaborator Author (@xieyxclack):

Modified the while loop.

Collaborator:

This piece of code involves too many conditional branches (from lines 186 to 261). It would be better to extract and organize them into sub-function(s) to improve readability.

Collaborator (@joneswong):

We'd better make the data type and format clearer.

"""
Set up the server
"""
self.server_id = 0
if self.mode == 'standalone':
if self.server_id in self.data:
if self.cfg.federate.merge_test_data:
from federatedscope.core.auxiliaries.data_builder import \

Collaborator (@joneswong, Jul 12, 2022):

Does this module involve backend-related dependencies? If not, there is no need to put the import here.

Collaborator Author (@xieyxclack):

I have moved the import statement to the top of the file.

```python
else:
    self.comp_speed = None
    self.comm_bandwidth = None
self.model_size = sys.getsizeof(pickle.dumps(self.model)) / 1024.0 * 8.
```

Collaborator:

What unit? MB? Please add a comment.

Collaborator Author (@xieyxclack):

done
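
The unit question resolves as follows for the formula quoted above: dividing the serialized size in bytes by 1024 gives kilobytes, and multiplying by 8 gives kilobits. A runnable illustration:

```python
import pickle
import sys

model = {'w': [0.0] * 1000}  # stand-in for a real model object

# bytes -> kilobytes (/1024) -> kilobits (*8)
size_bytes = sys.getsizeof(pickle.dumps(model))
model_size_kbit = size_bytes / 1024.0 * 8.0
print(f'{model_size_kbit:.1f} kbit')
```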

```python
assert self.comm_bandwidth is not None and self.comp_speed \
    is not None, "The requirement join_in_info " \
    "'client_info' does not exist."
join_in_info['client_info'] = self.model_size / \
```

Collaborator:

The key 'client_info' is much more general than what the corresponding value really is, right? BTW, I am wondering what happens if an FL algorithm exchanges information other than model parameters.

Collaborator Author (@xieyxclack):

Changed client_info to client_resource.

Collaborator (@joneswong):

It seems that some changes to Server were inherited by other subclasses (somehow, like "copy and paste"). Please check this for Client as well, since there are also several subclasses of it in our package.

@joneswong previously approved these changes on Jul 13, 2022

Collaborator (@joneswong):

LGTM. As this PR is huge, we'd better ask another reviewer to double-check these changes.

Collaborator Author (@xieyxclack):

I have rebased the branch onto master and resolved the conflicts. Please double-check the modifications, thx @yxdyc

Collaborator (@yxdyc):

LGTM. It seems that most of the changes will not affect runs in the non-asynchronous mode. Plz see the inline comments.

```
@@ -146,7 +146,7 @@ python federatedscope/main.py --cfg federatedscope/example_configs/femnist.yaml
 Then you can observe some monitored metrics during the training process as:

-INFO: Server #0 has been set up ...
+INFO: Server has been set up ...
```

Collaborator:

IMHO, we need to retain the server id, for two reasons:

  1. When parsing the log results, the worker id x is parsed via "[Server|Client] #x". If we remove the server id, we may have to modify some historical parsing code to specifically handle the Server case;
  2. For compatibility with potential support of decentralized FL, where multiple workers may act as servers.

Collaborator Author (@xieyxclack):

OK, I will preserve the server id (default #0).
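
A sketch of the parsing convention at stake; this is hypothetical parsing code, shown only to illustrate why the "#0" should be preserved:

```python
import re

# Recover the worker role and id from log lines of the form "[Server|Client] #x".
pattern = re.compile(r'(Server|Client) #(\d+)')

line = 'INFO: Server #0 has been set up ...'
match = pattern.search(line)
if match:
    role, worker_id = match.group(1), int(match.group(2))
    print(role, worker_id)  # Server 0
```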

```python
                        augmentation_factor=3.0):
    # Served as an example, this cost model is adapted from FedScale at
    # https://github.com/SymbioticLab/FedScale/blob/master/fedscale/core/
    # internal/client.py#L35 (MIT License)
```

Collaborator:

MIT or Apache License?

Collaborator Author (@xieyxclack):

Sorry for the mistake, I have corrected it.



```python
@timestamp.setter
def timestamp(self, value):
    self._timestamp = value
```

Collaborator:

It would be better to support multiple timestamp formats. For example, here we could automatically convert value to a uniform format with the help of the Python datetime package.

Collaborator Author (@xieyxclack):

I have added an assertion here in this version.
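
A plausible shape of the setter after this change; the exact assertion in the PR may differ:

```python
class Message:
    def __init__(self, timestamp=0):
        self.timestamp = timestamp  # goes through the setter below

    @property
    def timestamp(self):
        return self._timestamp

    @timestamp.setter
    def timestamp(self, value):
        # Keep all timestamps in one numeric format for simulated time.
        assert isinstance(value, (int, float)), \
            'timestamp must be a number representing simulated time'
        self._timestamp = value

msg = Message()          # timestamp defaults to 0
msg.timestamp = 3.5      # OK
# msg.timestamp = '3.5'  # would fail the assertion
```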

```
@@ -276,13 +306,15 @@ def check_and_move_on(self,
        # round or finishing the evaluation
        if self.check_buffer(self.state, min_received_num, check_eval_result):
            if not check_eval_result:  # in the training process
```

Collaborator:

With the new PR, this "if" branch is too long (lines 308 to 406), which is error-prone if we add more conditional branches within it. Plz try to simplify it into several sub-functions.

```python
from federatedscope.core.auxiliaries.worker_builder import get_server_cls, get_client_cls


class AsynCIFAR10Test(unittest.TestCase):
```

Collaborator:

If possible, consider adding a simply constructed asynchronous example (as shown in Figure 2 of the paper) that asserts that the order of printed client ids is consistent with the order maintained by the new PR's simulator.



```python
def calculate_time_cost(instance_number,
                        model_size,
```

Collaborator:

model_size could be given a more general name, such as comm_size.

Collaborator Author (@xieyxclack):

Done
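
For reference, a hedged sketch of a FedScale-style cost model consistent with this thread; the formulas are adapted from FedScale's example client, the parameter names follow the renamed signature, and the units are assumptions:

```python
def calculate_time_cost(instance_number, comm_size, comp_speed,
                        comm_bandwidth, augmentation_factor=3.0):
    # computation: roughly `augmentation_factor` passes over the local data,
    # scaled by the client's computation speed
    comp_cost = augmentation_factor * instance_number / comp_speed
    # communication: payload size divided by the client's bandwidth
    comm_cost = comm_size / comm_bandwidth
    return comp_cost + comm_cost

# Illustrative call with made-up resource values:
total = calculate_time_cost(instance_number=600, comm_size=200.0,
                            comp_speed=50.0, comm_bandwidth=100.0)
print(total)
```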

Commits: "simplify and modificate according to comments", "preserver server_id"

Collaborator (@joneswong):

Approved!

@joneswong merged commit c518a31 into alibaba:master on Jul 17, 2022.
Schichael pushed a commit to Schichael/FederatedScope_thesis that referenced this pull request on Sep 7, 2022.

Labels: Feature (New feature)

Successfully merging this pull request may close these issues:

  • Confusing server identifier
  • Mechanisms to support asynchronous training protocol in FL