
Barriers Required for Distributed execution. #923

Closed
felipeblazing opened this issue Aug 4, 2020 · 13 comments

@felipeblazing
Contributor

Right now, kernels handle distribution and ensure completeness themselves so that they can continue when they have to communicate. Here is an example of what that looks like in aggregation.

Below we are iterating through batches that this kernel gets from its input cache, partitioning them and sending each node its corresponding partition. We store a count of how many partitions we sent to each node and how many we kept for ourselves.

while (input.wait_for_next()) {
    auto batch = input.next();
    CudfTableView batch_view = batch->view();
    std::vector<CudfTableView> partitioned;
    std::unique_ptr<CudfTable> hashed_data; // Keep table alive in this scope
    if (batch_view.num_rows() > 0) {
        std::vector<cudf::size_type> hashed_data_offsets;
        std::tie(hashed_data, hashed_data_offsets) = cudf::hash_partition(batch->view(), columns_to_hash, num_partitions);
        // the offsets returned by hash_partition always start at 0, which is a value we want to ignore for cudf::split
        std::vector<cudf::size_type> split_indexes(hashed_data_offsets.begin() + 1, hashed_data_offsets.end());
        partitioned = cudf::split(hashed_data->view(), split_indexes);
    } else {
        //  copy empty view
        for (auto i = 0; i < num_partitions; i++) {
            partitioned.push_back(batch_view);
        }
    }

    ral::cache::MetadataDictionary metadata;
    for(int i = 0; i < this->context->getTotalNodes(); i++ ){
        auto partition = std::make_unique<ral::frame::BlazingTable>(partitioned[i], batch->names());
        if (this->context->getNode(i) == self_node){
            this->output_.get_cache()->addToCache(std::move(partition),"",true);
            node_count[self_node.id()]++;
        } else {
            node_count[this->context->getNode(i).id()]++;
            output_cache->addCacheData(std::make_unique<ral::cache::GPUCacheDataMetaData>(std::move(partition), metadata),"",true);
        }
    }
    batch_count++;
}

After this code executes, we send each node a count of how many partitions we sent it.


auto self_node = ral::communication::CommunicationData::getInstance().getSelfNode();
auto nodes = context->getAllNodes();
std::string worker_ids = "";


for(std::size_t i = 0; i < nodes.size(); ++i) {
    if(!(nodes[i] == self_node)) {
        ral::cache::MetadataDictionary metadata;
        // (the metadata values referenced below, such as the query id, kernel id,
        // worker ids and partition count, are populated before this point; elided here)
        messages_to_wait_for.push_back(metadata.get_values()[ral::cache::QUERY_ID_METADATA_LABEL] + "_" +
                                metadata.get_values()[ral::cache::KERNEL_ID_METADATA_LABEL] + "_" +
                                metadata.get_values()[ral::cache::WORKER_IDS_METADATA_LABEL]);
        this->query_graph->get_output_cache()->addCacheData(
            std::make_unique<ral::cache::GPUCacheDataMetaData>(ral::utilities::create_empty_table({}, {}), metadata),
            "", true);
    }
}


Then we collect the partition counts from each worker node, sum them up, and wait for our output cache to contain that many partitions before we can say this kernel is finished.


auto self_node = ral::communication::CommunicationData::getInstance().getSelfNode();
int total_count = node_count[self_node.id()];
for (auto message : messages_to_wait_for){
    auto meta_message = this->query_graph->get_input_cache()->pullCacheData(message);
    total_count += std::stoi(static_cast<ral::cache::GPUCacheDataMetaData *>(meta_message.get())->getMetadata().get_values()[ral::cache::PARTITION_COUNT]);
}
this->output_cache()->wait_for_count(total_count);

We want to abstract away a few of the things that are happening here. We are often following this pattern of spreading data out and then hitting a barrier before we can continue. We want to remove this code from the kernel run function itself and have a more generic way of expressing it.

As we discuss and implement the movement towards scheduling tasks to be run, we need primitives that can do things like:

  • create a "broadcast to all and expect a broadcast from all" primitive
  • create a method for preventing tasks from either being scheduled or run by the scheduler until some condition is met (e.g. wait_for_count, but disassociated from the actual run function so that it can be "injected", preferably through something like composition; see the sketch after this list)
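A rough sketch of what that injectable condition could look like; the names Precondition and CountBarrier are only illustrative, not existing RAL classes, and this assumes a scheduler that checks preconditions before marking a kernel finished:

// Hypothetical sketch only: an injectable condition that a scheduler could
// poll (or a task could block on) instead of hard-coding the wait inside
// the kernel's run() function.
#include <condition_variable>
#include <mutex>

class Precondition {
public:
    virtual ~Precondition() = default;
    virtual bool is_satisfied() = 0;  // polled by the scheduler
    virtual void wait() = 0;          // or blocked on by a task
};

// A count-based barrier, equivalent in spirit to wait_for_count but
// decoupled from any particular kernel.
class CountBarrier : public Precondition {
public:
    explicit CountBarrier(int expected) : expected_{expected} {}

    void arrive() {  // called once per partition/batch/message that lands
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        condition_.notify_all();
    }

    bool is_satisfied() override {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_ >= expected_;
    }

    void wait() override {
        std::unique_lock<std::mutex> lock(mutex_);
        condition_.wait(lock, [this] { return count_ >= expected_; });
    }

private:
    const int expected_;
    int count_ = 0;
    std::mutex mutex_;
    std::condition_variable condition_;
};

// A kernel (or the scheduler) could then hold a list of these and only be
// considered finished once every precondition reports is_satisfied().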
@felipeblazing felipeblazing added the ? - Needs Triage label Aug 4, 2020
@wmalpica
Contributor

wmalpica commented Aug 5, 2020

To add to the design needs: in the spirit of moving towards concepts that will allow a batch to be executed as a job, we also need something that can tell us when all the batches or jobs that correspond to a kernel are done. If most of the work is done by an executor, which runs on a separate thread (or threads), the kernel needs a way to know when it is done, so that it can set the output cache to finish().

@wmalpica
Contributor

wmalpica commented Aug 5, 2020

For keeping track of batches that are done, I think we should add some member functions and variables to the kernel interface.

functions:

void increment_batch_tracker(){
    std::lock_guard<std::mutex> lock(batch_tracker_mutex);
    batch_tracker++;
    batch_tracker_condition.notify_all(); // wake up wait_for_count
}
void wait_for_count(int num_batches_launched){
    std::unique_lock<std::mutex> lock(batch_tracker_mutex);
    batch_tracker_condition.wait(lock, [this, num_batches_launched] {
        return batch_tracker == num_batches_launched;
    });
}

variables:

int batch_tracker = 0;
std::condition_variable batch_tracker_condition;
std::mutex batch_tracker_mutex;

This would be used in something like this:

kstatus run() {
    int num_batches_launched = 0;
    while input has next {
        num_batches_launched++;
        run_batch_in_executor(); // this runs on another thread in an executor and it will call increment_batch_tracker()
    }
    wait_for_count(num_batches_launched); // this allows us to wait until all the batches have been executed before finishing the kernel
}
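As a rough illustration of the hand-off, something like the following could work; run_batch_in_executor and process_batch are placeholders here, and a real implementation would submit work to a shared executor/thread pool rather than spawning a detached thread:

// Hypothetical sketch: each batch is handed to a worker thread, which calls
// increment_batch_tracker() when it finishes so wait_for_count() can return.
#include <memory>
#include <thread>

void kernel::run_batch_in_executor(std::unique_ptr<ral::frame::BlazingTable> batch) {
    std::thread([this, batch = std::move(batch)]() mutable {
        process_batch(std::move(batch)); // placeholder for the kernel's per-batch work
        increment_batch_tracker();       // signals the condition variable above
    }).detach();
}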

@wmalpica
Contributor

wmalpica commented Aug 5, 2020

For the patterns of distributing data, we could have a messenger class that is part of a kernel, holds state, and enables this sort of functionality. This class would have a couple of features:

In its constructor, it would receive information about the kernel id, input id, and output ids, or anything else it needs for the metadata header that is part of the messages. It would also get a pointer to the outgoing message CacheMachine, as well as a list of all the nodes, or maybe the context, which has all of that.

It would have functions for common sending paradigms:

  • void broadcast(std::unique_ptr<BlazingTable> data): sends one table to all nodes
  • void scatter(std::vector<std::unique_ptr<BlazingTable>> datas): sends each table to its corresponding node. It assumes that the size of the vector is the same as the number of nodes.

Whenever it sends data, it would update internal counters tracking how many messages it has sent to each node.

It would also have a function for sending metadata to all the nodes, such as the partition counts to expect:
void send_total_partition_counts()
This function could use the counts it maintains internally.

It would also have a function for getting, from all nodes, how much it should expect to receive, similar to what was stated as needed above:

int get_total_partition_counts(){
    auto self_node = ral::communication::CommunicationData::getInstance().getSelfNode();
    int total_count = node_count[self_node.id()];
    for (auto message : messages_to_wait_for){
        auto meta_message = this->query_graph->get_input_cache()->pullCacheData(message);
        total_count += std::stoi(static_cast<ral::cache::GPUCacheDataMetaData *>(meta_message.get())->getMetadata().get_values()[ral::cache::PARTITION_COUNT]);
    }
    return total_count;
}
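Putting these pieces together, the interface could look roughly like the sketch below. The class name, constructor arguments, and member names are placeholders for discussion, not an existing API; BlazingTable and CacheMachine refer to the existing RAL types.

// Illustrative sketch of the proposed messenger class.
#include <map>
#include <memory>
#include <string>
#include <vector>

class Messenger {
public:
    // Identifiers are needed to build the metadata headers; the context
    // provides the list of nodes, and the outgoing message cache is where
    // messages get placed for sending.
    Messenger(std::string query_id,
              std::string kernel_id,
              std::shared_ptr<Context> context,
              std::shared_ptr<ral::cache::CacheMachine> outgoing_message_cache);

    // Send one table to every other node.
    void broadcast(std::unique_ptr<ral::frame::BlazingTable> data);

    // Send datas[i] to node i; assumes datas.size() == number of nodes.
    void scatter(std::vector<std::unique_ptr<ral::frame::BlazingTable>> datas);

    // Tell every other node how many partitions to expect from us, using the
    // per-node counters maintained by broadcast()/scatter().
    void send_total_partition_counts();

    // Collect the counts the other nodes sent us and return the total number
    // of partitions this node should wait for (including those kept locally).
    int get_total_partition_counts();

private:
    std::map<std::string, int> node_count;          // messages sent per node id
    std::vector<std::string> messages_to_wait_for;  // message ids of incoming counts
    // plus the metadata fields (kernel id, input/output ids, worker ids, ...)
};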

@wmalpica wmalpica added this to Needs prioritizing in Scrum board Aug 5, 2020
@wmalpica wmalpica removed the ? - Needs Triage label Aug 5, 2020
@felipeblazing
Contributor Author

@williamBlazing
for

void increment_batch_tracker(){
    std::lock_guard<std::mutex> lock(batch_tracker_mutex);
    batch_tracker++;
    batch_tracker_condition.notify_all(); // wake up wait_for_count
}

we should just use an atomic here. There's no reason to block on the mutex.

for

void wait_for_count(int num_batches_launched){
    std::unique_lock<std::mutex> lock(batch_tracker_mutex);
    batch_tracker_condition.wait(lock, [this, num_batches_launched] {
        return batch_tracker == num_batches_launched;
    });
}

The Linux select function comes to mind (https://www.tutorialspoint.com/unix_system_calls/_newselect.htm), as does the way Go works with channels and its select statement. The logic we are using here is kind of ugly.

The executing thread says to some cache: hey, I am going to wait for you to be done, and I am going to stop doing things until you are. Two things can happen here that are hard to deal with.

  1. We never get that count and this function blocks forever. You can add a timeout or something that tracks that it didn't work, but the main problem is that we have a thread waiting forever.
  2. We have to make a different kind of wait function for the cases where we don't know what that total count will be.
kstatus run() {
    int num_batches_launched = 0;
    while input has next {
        num_batches_launched++;
        run_batch_in_executor(); // this runs on another thread in an executor and it will call increment_batch_tracker()
    }
    wait_for_count(num_batches_launched); // this allows us to wait until all the batches have been executed before finishing the kernel
}

Here, wait_for_count is being used differently from how it was used before. Remember, it tells you how much to wait for, depending not only on what comes from your own node but also on what comes from OTHER nodes. So you have to collect this count from all the other nodes.

@wmalpica
Contributor

wmalpica commented Aug 5, 2020

@felipeblazing

For increment_batch_tracker, I was not going to use an atomic, because we are already going to be using a mutex for the condition variable. But we could just use an atomic; I don't care either way.

As for your suggestion of using the Linux select function: I think we already have patterns in our code for waiting functions, and I don't think we should introduce a new way of doing things.

The wait_for_count function I am talking about is not for node_count or the node messages. It is for tracking the batches that will be run by a kernel, and only finishing the kernel when all the batches are done. That is its sole purpose.

@wmalpica
Contributor

wmalpica commented Aug 5, 2020

  1. We never get that count and this function blocks forever. You can add a timeout or something that tracks that it didn't work, but the main problem is that we have a thread waiting forever.

If we don't get the count, it's because something went wrong in the job. The job should throw an error and we should handle that error.

@wmalpica
Contributor

wmalpica commented Aug 5, 2020

  1. We have to make a different kind of wait function for the cases where we don't know what that total count will be.

As I mentioned before, this is for tracking batches run by a kernel. We will always know what the count is, because you increment the counter when you create the job or batch.

@felipeblazing
Contributor Author

  1. We have to make a different kind of wait function for the cases where we don't know what that total count will be.

As I mentioned before, this is for tracking batches run by a kernel. We will always know what the count is, because you increment the counter when you create the job or batch.

What about a situation where wait_for_count doesn't just depend on things that came from my node? For example, for my join to be ready to proceed past the partitionKernel, it needs to wait for all of its batches to be processed, but it ALSO must wait for all other nodes to have sent over the batches that correspond to me, and for those to have been added to the output_cache.

@wmalpica
Contributor

wmalpica commented Aug 5, 2020

  1. We have to make a different kind of wait function for the cases where we don't know what that total count will be.

As I mentioned before, this is for tracking batches run by a kernel. We will always know what the count is, because you increment the counter when you create the job or batch.

What about a situation where wait_for_count doesn't just depend on things that came from my node? For example, for my join to be ready to proceed past the partitionKernel, it needs to wait for all of its batches to be processed, but it ALSO must wait for all other nodes to have sent over the batches that correspond to me, and for those to have been added to the output_cache.

The wait_for_count I am talking about above is for tracking batches run by a kernel. That is something that would be used both by kernels that distribute and by those that don't. Given that CacheMachine has a similar function with the same name, let's rename it to: kernel::wait_for_batches_count()

For kernels that distribute, if you need to wait for all the data and messages you are expecting to receive, that would be accomplished by
int get_total_partition_counts() from the proposed messenger class.
Sidenote: let's call it the MessageManager class instead (to differentiate it from a class that actually does communication; this class only deals with creating messages and putting them into outgoing message caches, or receiving said messages).

So in the case of waiting for all the batches of the kernel to finish AND also waiting for it to receive all the expected messages, it would look something like this:

// num_batches_launched is a variable which has been incremented for every batch job run
this->wait_for_batches_count(num_batches_launched);  // this allows us to wait until all the batches have been executed.  (this is the kernel)
int total_partition_counts = messageManager->get_total_partition_counts();
this->output_cache()->wait_for_count(total_partition_counts);

Note that this example is not exactly what the join would do, because the join has two outputs, but the principle is the same. In a join kernel, you would basically have two messageManagers, one for each table.
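For the join that would look roughly like the following; the left_/right_ member names are only illustrative:

// Illustrative sketch for a distributing kernel feeding a join: wait for our
// own batches, then for everything the other nodes sent us, per table.
this->wait_for_batches_count(num_batches_launched);
this->left_output_cache()->wait_for_count(left_message_manager->get_total_partition_counts());
this->right_output_cache()->wait_for_count(right_message_manager->get_total_partition_counts());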

@felipeblazing
Contributor Author

felipeblazing commented Aug 5, 2020

it to: kernel::wait_for_batches_count()

could that be wait_for_processed_count() ?

let's call it the MessageManager class instead

What about MessageFactory? It really is a factory method, right?
@williamBlazing

@wmalpica wmalpica added the Design label Aug 6, 2020
@wmalpica
Contributor

wmalpica commented Aug 6, 2020

it to: kernel::wait_for_batches_count()

could that be wait_for_processed_count() ?

Absolutely

let's call it the MessageManager class instead

What about MessageFactory? It really is a factory method, right?
@williamBlazing

No, it's not only a factory. It has sending and receiving functions, and it has state.

@felipeblazing
Contributor Author

No, it's not only a factory. It has sending and receiving functions, and it has state.

Maybe I am misunderstanding. I thought the MessageManager's purpose was to generate the Metadata class in CacheMachine.h. It sends by adding to the output cache, and the messages it receives end up in the appropriate cache. In the case of messages that need to be retrieved during a kernel's execution, they go into the general input cache, where the message can be pulled by name.

@wmalpica
Contributor

The class would be able to send and receive messages as you mention, but
if you look at my comments above, I talk about the class ALSO being able to do:
send_total_partition_counts
and
get_total_partition_counts

This requires it to also keep state about how many messages it has sent and all that.

@wmalpica wmalpica moved this from Needs prioritizing to Not Started in Scrum board Aug 19, 2020
@rommelDB rommelDB self-assigned this Aug 20, 2020
@rommelDB rommelDB moved this from Not Started to WIP in Scrum board Aug 20, 2020
@roaramburu roaramburu moved this from WIP to Paused in Scrum board Sep 2, 2020
@wmalpica wmalpica moved this from Paused to Done in Scrum board Dec 3, 2020