
(Distributed Data Loader) The data loading time for sentiment140 is too long. I waited for more than half an hour. #4

Closed
chaoyanghe opened this issue Jan 16, 2021 · 27 comments

@chaoyanghe (Member)

Running this for loop takes extremely long.

for idx in range(data_attr["n_clients"]):

@yuchenlin (Collaborator)

It is expected to be long for SQuAD and sentiment140 because they are large; it's not a problem. @RaymondTseng can you double check?

@yuchenlin (Collaborator)

@chaoyanghe you can also go ahead and try an alternative implementation to see if it can be more efficient. Just make sure the format is consistent; then it should be fine.

@chaoyanghe (Member, Author)

I waited for more than 1 hour. It's stuck somewhere; debugging.

@yuchenlin (Collaborator)

Even for SQuAD, it's only a few minutes; it shouldn't be an hour. We don't do any preprocessing here, we simply load the data. You can put your exact running command here and @RaymondTseng and I will check it.

@chaoyanghe (Member, Author)

I have located the cause. Each client (a process) loads the same dataset, which contains all clients' data. This leads to OOM, as shown in this screenshot:

[Screenshot from 2021-01-16: per-process memory usage]

@chaoyanghe (Member, Author)

As we can see from the memory cost of each process, each one costs 30+ GB.

@yuchenlin (Collaborator) commented Jan 16, 2021

I see. I cannot think of a simple fix, because based on my understanding, we have to load all data in each process, right?

@yuchenlin (Collaborator)

Or is there any way to load only the necessary data for each process?

@chaoyanghe (Member, Author)

We solved this in previous research. We can support another API like we did for CV. An example is:
https://github.com/FedML-AI/FedML/blob/49a3c760c7d166d6730c118eb0aafae872c852bf/fedml_api/data_preprocessing/cifar100/data_loader.py#L200

This API distinguishes processes by their ID and loads only a specific client's data in each process.

@RaymondTseng Please follow this idea and fix it. Thanks.
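
For illustration, here is a minimal sketch of such a per-process loader, modeled on the CIFAR-100 example above. The function name, the per-client file layout (client_{idx}.pt, global_test.pt), and the server-holds-only-test-data convention are assumptions for this sketch, not FedNLP's actual code:

```python
# Hypothetical per-process loader: each process deserializes only its own
# shard, so memory scales with one client's data instead of the full dataset.
import torch

def load_partition_data_distributed(process_id, data_dir, n_clients):
    if process_id == 0:
        # Server process: evaluation happens here, so it holds only the
        # global test set.
        return None, torch.load(f"{data_dir}/global_test.pt")
    client_idx = process_id - 1  # worker i owns client i-1's shard
    assert 0 <= client_idx < n_clients
    return torch.load(f"{data_dir}/client_{client_idx}.pt"), None
```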

@chaoyanghe (Member, Author)

@yuchenlin Yeah, we can load only the data each process needs; I also did this in my current PipeTransformer research.
As for FL, our current testing happens only on the server side, so only the server needs to hold the global test data, which further saves memory.

@yuchenlin (Collaborator) commented Jan 16, 2021

Can you explain a little more about how it works? I thought a process is not mapped to a specific client (or a group of clients) but is instead assigned by dynamically sampling from the client pool? If so, how do we decide which clients to load for each process?

@yuchenlin (Collaborator)

> We solved this in previous research. We can support another API like we did for CV. An example is:
> https://github.com/FedML-AI/FedML/blob/49a3c760c7d166d6730c118eb0aafae872c852bf/fedml_api/data_preprocessing/cifar100/data_loader.py#L200
>
> This API distinguishes processes by their ID and loads only a specific client's data in each process.
>
> @RaymondTseng Please follow this idea and fix it. Thanks.

Are you suggesting that in this scenario one process serves exactly one client, so # of processes = # of clients?

@chaoyanghe (Member, Author) commented Jan 16, 2021

I can explain with the PyTorch API, which is easier to understand. Normally, we define the dataset, generate a Sampler, and pass it to a DataLoader.

In a distributed environment, PyTorch provides a DistributedSampler. What we can do is set num_replicas and rank so that each replica loads an exclusive piece of the dataset:

torch.utils.data.distributed.DistributedSampler(self.train_dataset, num_replicas=num_replicas, rank=local_rank)

In our case, if we use this API, we can set num_replicas = # clients and local_rank = client_index (starting from 0).
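
A minimal runnable sketch of that idea; the toy TensorDataset and the sizes are placeholders:

```python
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

n_clients = 4     # num_replicas = # clients
client_index = 0  # this process's client index (rank), starting from 0

# Placeholder dataset standing in for the real training data.
train_dataset = TensorDataset(torch.randn(1000, 8), torch.randint(0, 2, (1000,)))

# Each of the n_clients "replicas" gets a disjoint 1/n_clients shard.
sampler = DistributedSampler(train_dataset, num_replicas=n_clients,
                             rank=client_index, shuffle=True)
loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)

for epoch in range(3):
    sampler.set_epoch(epoch)  # re-shuffle every epoch; shards stay disjoint
    for x, y in loader:
        pass  # training step goes here
```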

@chaoyanghe (Member, Author)

We can copy PyTorch's source code and customize a DistributedSampler in our FedNLP, or simply implement our own as we did in FedML for CIFAR100/ImageNet100/GLD, etc.

@chaoyanghe (Member, Author)

> > We solved this in previous research. We can support another API like we did for CV. An example is:
> > https://github.com/FedML-AI/FedML/blob/49a3c760c7d166d6730c118eb0aafae872c852bf/fedml_api/data_preprocessing/cifar100/data_loader.py#L200
> > This API distinguishes processes by their ID and loads only a specific client's data in each process.
> > @RaymondTseng Please follow this idea and fix it. Thanks.
>
> Are you suggesting that in this scenario one process serves exactly one client, so # of processes = # of clients?

Yeah, # processes = # clients + 1 (server)

@chaoyanghe (Member, Author) commented Jan 16, 2021

For BERT/GPT-3 pretraining, the scenario will be more complex; we can talk more about this if you are interested. The data loading I did in my PipeTransformer research requires customization of low-level PyTorch code, which can either guarantee a shuffle in each epoch or save memory when sharing memory across processes and machines.

@chaoyanghe changed the title from "The data loading time for sentiment140 is too long. I wanted for more than half an hour." to "The data loading time for sentiment140 is too long. I waited for more than half an hour." on Jan 16, 2021
@yuchenlin (Collaborator) commented Jan 16, 2021

What if we have 10,000 clients in a dataset? Are we going to use 10,001 processes? The word "client" in our data loader is like the concept of a user, so it can be very large, right?

@chaoyanghe (Member, Author) commented Jan 16, 2021

> Can you explain a little more about how it works? I thought a process is not mapped to a specific client (or a group of clients) but is instead assigned by dynamically sampling from the client pool? If so, how do we decide which clients to load for each process?

In the FedML framework, the mapping between client and process is not random. We define a client index for each client and then directly let process_i create all classes related to client_i.

@chaoyanghe (Member, Author) commented Jan 16, 2021

> What if we have 10,000 clients in a dataset? Are we going to use 10,001 processes? The word "client" in our data loader is like the concept of a user, so it can be very large, right?

Good question. This explains why we need two args: WORKER_NUM and CLIENT_NUM. The first represents the number of processes system-wise, while the second represents the ML-wise concept (user, agent, etc.). When CLIENT_NUM >> WORKER_NUM, we activate "client sampling", which uniformly samples a fraction of the clients and asks for collaborative training in a smaller scope. This mitigates the system-wise challenge.

In a practical system, we have a profiling subsystem that checks client-side status and chooses users that are available (e.g., charging). This is the most naive way, so more research is required here. Salman has a paper that talks about the scalability issue from an ML convergence analysis, but it is still not enough for practice. A trade-off between system and ML optimization is hard to find or prove, especially in a distributed ML system: ML researchers require a system that does not hurt convergence, while a systems person may ask for a simpler design. MLSys research aims to reconcile the two.
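
For concreteness, a sketch of such uniform client sampling (the name sample_clients is illustrative; seeding with the round index is one way to keep all processes drawing the same subset):

```python
import numpy as np

def sample_clients(round_idx, client_num, worker_num):
    # If the workers can cover every client, use them all.
    if client_num <= worker_num:
        return list(range(client_num))
    # Same seed on every process -> every process agrees on the subset.
    np.random.seed(round_idx)
    return np.random.choice(range(client_num), worker_num, replace=False).tolist()

# e.g. 10,000 clients, 10 worker processes, round 0
print(sample_clients(0, 10000, 10))
```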

@chaoyanghe (Member, Author)

At Google, their current system design can only handle 200 parallel clients for training. Scaling up to more may hurt accuracy or require a distributed server for load balancing; this is still an open problem.

@RaymondTseng (Contributor) commented Jan 16, 2021

Are you saying that we need an additional function to map client id to process id? @chaoyanghe

@chaoyanghe (Member, Author) commented Jan 16, 2021

@RaymondTseng I mean we need a distributed data loader. Please check this function in CV as a reference: https://github.com/FedML-AI/FedML/blob/49a3c760c7d166d6730c118eb0aafae872c852bf/fedml_api/data_preprocessing/cifar100/data_loader.py#L200

@chaoyanghe changed the title from "The data loading time for sentiment140 is too long. I waited for more than half an hour." to "(Distributed Data Loader) The data loading time for sentiment140 is too long. I waited for more than half an hour." on Jan 16, 2021
@RaymondTseng (Contributor)

@chaoyanghe The current data loader can load data according to client_idx. My understanding is that we need a function that creates a mapping between process id and client id. In each round, the server can sample some clients for each process and create that mapping. If one process has more than one client index, we just use a for loop to load all the data.

@yuchenlin (Collaborator)

> @chaoyanghe The current data loader can load data according to client_idx. My understanding is that we need a function that creates a mapping between process id and client id. In each round, the server can sample some clients for each process and create that mapping. If one process has more than one client index, we just use a for loop to load all the data.

That's also my point. In our experiments, it is very common that # of clients (users) >> # of processes (workers). So we have to create a mapping from a process_id to a GROUP of client ids when we load data. Then we can load only the necessary data for each process.

@RaymondTseng Let's discuss more later when we finish FedOpt, and maybe refactor the data-loading part of the script into a single place so that we can optimize it once.
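
A hypothetical sketch of that process-to-group mapping; assign_clients_to_workers and load_client_data are illustrative names, with the existing per-client loader slotting in as load_client_data:

```python
def assign_clients_to_workers(client_indexes, worker_num):
    # Round-robin the sampled client ids onto worker processes.
    mapping = {w: [] for w in range(worker_num)}
    for i, client_idx in enumerate(client_indexes):
        mapping[i % worker_num].append(client_idx)
    return mapping

def load_worker_data(process_id, mapping, load_client_data):
    # A worker may own several clients; loop over just its own group.
    return {c: load_client_data(c) for c in mapping[process_id]}

# e.g. 5 sampled clients on 2 workers -> {0: [993, 37, 544], 1: [507, 642]}
print(assign_clients_to_workers([993, 507, 37, 642, 544], 2))
```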

@chaoyanghe (Member, Author) commented Jan 17, 2021

@RaymondTseng @yuchenlin We already did this for FedML-IoT and FedCV. I think this README.md is easier to understand and expresses what I mean well; it is the same thing Zihang said.

https://github.com/FedML-AI/FedML/tree/master/fedml_api/data_preprocessing/MNIST

Client Sampling Example

For each round of sampling (2 workers, 8 rounds):

client_indexes = [993 859], [507 818], [37 726], [642 762], [698 577], [544 515], [978 22], [778 334]

Then for device 0, the data includes:

["f_00993", "f_00507", "f_00037", "f_00642", "f_00698", "f_00544", "f_00978", "f_00778"]

For device 1, the data includes:

["f_00859", "f_00818", "f_00726", "f_00762", "f_00577", "f_00515", "f_00022", "f_00334"]

@chaoyanghe (Member, Author)

With the development of many subprojects, we already have many functionalities that can be unified into the FedML framework. This distributed data loader is a good example.

@Elliebababa Could you please help develop such a unified API (algorithmic primitive) for every project? So far, three projects have hit this issue because they use larger datasets.

https://github.com/FedML-AI/FedML/tree/master/fedml_api/data_preprocessing/MNIST

@Elliebababa (Contributor)

Sure. I will have a look at it.
