Communication c++ layer #926

Closed
felipeblazing opened this issue Aug 5, 2020 · 28 comments
@felipeblazing
Contributor

felipeblazing commented Aug 5, 2020

We had a pretty interesting experience trying to get performance and correctness when sending all of our messages between nodes using ucx-py and dask. Between the single-threaded nature of Python, the fact that dask uses tornado.ioloop, and seeing things like coroutines running at the same time while we were awaiting a ucx.send, it has been really hard to troubleshoot, and the performance isn't there for us.

We need to send and receive messages in the C++ layer to remove the issues we had. Seeing as we have often been hasty in trying to implement UCX as fast as possible, we are going to try to be smart and slow the heck down. Hell, if it takes 3 times as long to develop and 1/2 as long to debug, we will come out ahead :).

I kind of envision a few classes like this:


template <typename SerializerFunction, typename BufferCommunicator>
class SenderClass {
    SenderClass(SerializerFunction serializer, BufferCommunicator bufferCommunicator);

    // stuff like broadcast, send_to_node, ...
};

template <typename DeserializerFunction, typename BufferAssembler>
class ReceiverClass {
    ReceiverClass(DeserializerFunction deserializer, BufferAssembler bufferAssembler);
};

SerializerFunc ==> f(vector<column_view>): returns a list of views into rmm buffers, plus metadata for reassembling those buffers into views
DeserializerFunc ==> takes a list of buffers and metadata and gives us a unique_ptr to a BlazingTable
BufferAssembler ==> collects all the buffers of a message and associates them with their metadata
BufferCommunicator ==> can send a single buffer from one node to another and have it arrive at the other node's BufferAssembler

We can use a combination of these pieces to send a message and receive it on the other end with a listener that passes each buffer to the BufferAssembler. When the BufferAssembler is done, the deserializer converts the buffers into a cudf::table plus metadata that we can use to route the message to the appropriate class.
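To make the contract between these four pieces concrete, here is a minimal round-trip sketch using illustrative stand-in types (Buffer, Metadata, and Table are placeholders for the real rmm::device_buffer / BlazingTable types, not the actual API):

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-ins (not the real BlazingTable / rmm::device_buffer types).
using Buffer = std::vector<uint8_t>;
struct Metadata { std::string info; };
struct Table { std::vector<Buffer> columns; };

// SerializerFunc: table ==> buffers plus metadata for reassembly
std::pair<std::vector<Buffer>, Metadata> serialize(const Table & t) {
    return {t.columns, Metadata{"num_buffers=" + std::to_string(t.columns.size())}};
}

// DeserializerFunc: buffers plus metadata ==> table
Table deserialize(std::vector<Buffer> buffers, const Metadata &) {
    return Table{std::move(buffers)};
}

// BufferAssembler: collects all the buffers of one message with their metadata
struct BufferAssembler {
    Metadata metadata;
    std::vector<Buffer> collected;
    void add(Buffer b) { collected.push_back(std::move(b)); }
};
```

The BufferCommunicator would sit between `serialize` on one node and the BufferAssembler on the other, moving one buffer at a time.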

@felipeblazing felipeblazing added the ? - Needs Triage needs team to review and classify label Aug 5, 2020
@felipeblazing
Contributor Author

felipeblazing commented Aug 5, 2020

Other things that would be required: a listener for UCX or TCP. The only thing this listener does is wait for ANY message from ANY endpoint and pass that message along to the BufferAssembler. The BufferAssembler must be "started" before these messages start arriving. This is interesting because it identifies a couple of things.

We need functions like:


async_broadcast()  // broadcast something to all nodes, dispatched on another thread
async_send_node()  // send a message to a single node; we do this in most places, like join and order by

@felipeblazing
Contributor Author

felipeblazing commented Aug 5, 2020

BufferCommunicator
 void send(std::vector<std::string> node_ids, rmm::device_buffer payload, int transmission_id)
 int send_begin(std::vector<ucp_endpoint> eps, begin_transmission, payload_metadata) <== synchronous; returns a transmission id to be used by subsequent sends
private:
 std::map<node_id, ucp_endpoint> eps

The job of this class is to send a buffer from one endpoint to another. It should have no understanding of the payload it is sending. It takes in an rmm::device_buffer and an id that can be used by the BufferAssembler on the receiving side. It has a function to begin transmission so that we can associate each transmission with an assembler that gets made on the other side. It should block until we know the assemblers are made.

@wmalpica
Contributor

wmalpica commented Aug 5, 2020

The BufferCommunicator sends buffers, right? It is what knows about the transport protocol (UCX vs TCP)?
And similarly BufferAssembler is the opposite, right? It is what receives buffers?
Can I suggest renaming them to BufferSender and BufferReceiver? Minor point.

Also, for the listener of the messages: is that a responsibility or feature of the BufferAssembler, or of the ReceiverClass?

Would there be one BufferAssembler per ReceiverClass? Or per message? Per query? Or a global singleton? Similarly, is the ReceiverClass one per BlazingContext()?

Where do the incoming-message CacheMachine and outgoing-message CacheMachine come into play?

@aucahuasi
Member

What is the base UCX API that we will use in order to build all of those abstractions?
I remember @gcca did some demos with UCX C++; at the time the conclusions were that

  • whatever we build, we need to use the low-level UCX APIs (UCT)
  • we need to use the listeners defined by UCX (so I guess the listener you are proposing will be tied to UCX, right?)

@felipeblazing
Contributor Author

The BufferCommunicator sends buffers, right? It is what knows about the transport protocol (UCX vs TCP)?
And similarly BufferAssembler is the opposite, right? It is what receives buffers?
Can I suggest renaming them to BufferSender and BufferReceiver? Minor point.

Good idea

Also, for the listener of the messages: is that a responsibility or feature of the BufferAssembler, or of the ReceiverClass?

Neither. The callback on the UCX listener that responds to incoming messages will use a tag to route the information to the appropriate place. When a message is going to be sent, it first goes over a list of tags that will map to that message. The callback can access a global context where the mapping of tag to BufferAssembler is stored, and it sends the buffer over to the assembler.

Would there be one BufferAssembler per ReceiverClass? Or per message? Per query? Or a global singleton? Similarly, is the ReceiverClass one per BlazingContext()?

A ReceiverClass is generated by the first message, which is begin_transmission. Its assembler just takes each message and puts it into a vector of device_buffers. When it has accumulated them all (on a separate thread, with a condition variable waiting for them all to arrive), it then uses the deserializer to convert those buffers and the metadata from begin_transmission into our table representation.
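The "accumulate on a separate thread with a condition variable" step could look something like this sketch (placeholder types; the buffer count would come from the begin_transmission metadata, and real buffers would be rmm::device_buffer):

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Sketch of an assembler that blocks until the number of buffers announced by
// begin_transmission has arrived. Not the actual implementation.
class buffer_assembler {
public:
    explicit buffer_assembler(std::size_t expected) : expected_{expected} {}

    // Called by the listener for each incoming buffer of this message.
    void add_buffer(std::vector<char> buffer) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            buffers_.push_back(std::move(buffer));
        }
        cv_.notify_one();
    }

    // Runs on a separate thread; returns once every buffer has arrived, at
    // which point the deserializer can turn buffers plus metadata into a table.
    std::vector<std::vector<char>> wait_for_all() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return buffers_.size() >= expected_; });
        return std::move(buffers_);
    }

private:
    std::size_t expected_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::vector<std::vector<char>> buffers_;
};
```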

Where do the incoming-message CacheMachine and outgoing-message CacheMachine come into play?

A thread polls the outgoing one to call send to each message's specific destination.
The incoming message queue is the place we can route messages that we have not split off into their own cache and kernel. The Receiver handles pushing into the incoming message queue, or into the specific cache the message is being routed to.

@felipeblazing
Contributor Author

felipeblazing commented Aug 5, 2020

What is the base UCX API that we will use in order to build all of those abstractions?

UCP

I remember @gcca did some demos with UCX C++ at that time the conclusions were that

  • whatever we build we need to use low level UCX APIs (UCT)

This is no longer the case. A great deal of work has been done upstream to ensure that. The biggest risk here is proper configuration of the network.

  • we need to use the listeners defined by UCX (so I guess the listener you are proposed will be tied to UCX right)

Actually, we are using the ucp_ep_h class, like cuML does.

@aucahuasi
Member

I see, great. Another question:

The biggest risk here is proper configuration of the network

How are we going to test these new abstractions? I guess in case UCP doesn't detect any RDMA channel it will fall back to TCP automagically, right? Or do we need to implement some UCPTCPChannelMessageCarrierSomething in order to support TCP?

@wmalpica
Contributor

wmalpica commented Aug 5, 2020

Neither. The callback on the UCX listener that responds to incoming messages will use a tag to route the information to the appropriate place.

Is the listener in Python or C++?

will use a tag to route the information to the appropriate place.

This tag is the message metadata (CacheData metadata)? to route the information into the appropriate CacheMachine?

When a message is going to be sent, it first goes over a list of tags that will map to that message.

I don't understand.

The callback can access a global context where the mapping of tag to BufferAssembler is stored, and it sends the buffer over to the assembler

What is the mapping of tag to BufferAssembler? Is the tag associated with a CacheMachine (i.e. a destination)? Is there a BufferAssembler per CacheMachine? Is there one BufferAssembler or many?

@wmalpica
Contributor

wmalpica commented Aug 5, 2020

A thread polls the outgoing one to call send to each message's specific destination.

The polling thread: I imagine as soon as a message is pulled from the CacheMachine, it does the sending (serializing, BufferCommunicating) on a separate thread?

I imagine we also want that sending on a separate thread to be part of a thread pool with an upper limit on the max number of outgoing messages.

@felipeblazing
Contributor Author

How are we going to test these new abstractions? I guess in case UCP doesn't detect any RDMA channel it will fall back to TCP automagically, right? Or do we need to implement some UCPTCPChannelMessageCarrierSomething in order to support TCP?

That's a really good pair of questions.

We have tried to add UCX a few times before. Early on we had lots of issues with genuine bugs in UCX. We gave up several times because we got better performance from TCP. That is definitely no longer the case. We can actually test these new abstractions quite easily.

You can test the Sender and Receiver classes by making a version that takes a BufferSender or BufferReceiver that we KNOW will work, a dummy one, and the same thing for the Assembler, deserializer, and serializer. We can write performance tests with this and dask to verify that it can handle high concurrency and to validate the transfer rate on different hardware. But to do this we must make these things pluggable.

As far as falling back, that is handled by how you launched this to begin with. It depends on all the UCX_* environment variables.

@felipeblazing
Contributor Author

The polling thread: I imagine as soon as a message is pulled from the CacheMachine, it does the sending (serializing, BufferCommunicating) on a separate thread?

yes

I imagine we also want that sending on a separate thread to be part of a thread pool with an upper limit on the max number of outgoing messages.

Correct, it should use a thread pool. Only the begin-transmission message should be synchronous, to ensure that you don't start sending a message before it can be assembled.

@felipeblazing
Contributor Author

felipeblazing commented Aug 6, 2020

Here is the beginning of a working implementation of message_sender:

#pragma once

#include "execution_graph/logic_controllers/CacheMachine.h"
#include "blazingdb/concurrency/BlazingThread.h"

using gpu_raw_buffer_container = blazingdb::transport::gpu_raw_buffer_container;

/*
using gpu_raw_buffer_container = std::tuple<std::vector<std::size_t>, std::vector<const char *>,
    std::vector<ColumnTransport>,
    std::vector<std::unique_ptr<rmm::device_buffer>> >;

This is no good and needs to be replaced; it is so hard to tell what's going on in code when I have to see which part of the tuple has what.
*/

/**
 * A class that can be used to poll messages and then send them off.
 * Creating this class serves the purpose of allowing us to specify different combinations of serializers and buffer-to-frame conversions, combined with different methods for sending and addressing.
 * @tparam SerializerFunction A function that can convert whatever we pull from the cache into metadata and buffers to be sent over some protocol
 * @tparam BufferToFrameFunction A function that can convert an rmm::device_buffer into a frame that can be sent over a protocol; often a no-op
 * @tparam NodeAddress NodeAddress will be used to give the Sender a common api for getting a node's address
 * @tparam Sender A class that sends frames to a node address over a specific protocol
 */
template <typename SerializerFunction, typename BufferToFrameFunction, typename NodeAddress, typename Sender>
class message_sender {
public:
    message_sender(std::shared_ptr<ral::cache::CacheMachine> output_cache, size_t num_threads, std::map<std::string, NodeAddress> node_address_map)
        : pool{num_threads}, output_cache{std::move(output_cache)}, node_address_map{std::move(node_address_map)} {}

private:
    ctpl::thread_pool<BlazingThread> pool; /**< The thread pool used by the sender to send messages via some protocol */
    std::shared_ptr<ral::cache::CacheMachine> output_cache; /**< The cache this sender polls for messages to send */
    std::map<std::string, NodeAddress> node_address_map; /**< A map of worker_id to NodeAddress */
    
    /**
     * A polling function that listens on a cache for data to exist and then sends it off via some protocol
     */
    void run_polling();
};


template <typename SerializerFunction, typename BufferToFrameFunction, typename NodeAddress, typename Sender>
void message_sender<SerializerFunction, BufferToFrameFunction, NodeAddress, Sender>::run_polling() {
    while(true) {
        std::pair<std::unique_ptr<ral::frame::BlazingTable>, MetadataDictionary> data_and_metadata =
            std::unique_ptr<ral::cache::GPUCacheDataMetaData>(
                static_cast<ral::cache::GPUCacheDataMetaData *>(output_cache->pullCacheData().release())
                )->decacheWithMetaData();

        pool.push([this, message{std::move(data_and_metadata.first)}, metadata{data_and_metadata.second}](int thread_id) mutable {
            gpu_raw_buffer_container serialized = SerializerFunction{}(std::move(message));
            // TODO: this needs to change, we need to make the container a struct
            std::vector<std::size_t> buffer_sizes = std::get<0>(serialized);
            std::vector<const char *> buffers = std::get<1>(serialized);
            std::vector<ColumnTransport> column_transports = std::get<2>(serialized);
            std::vector<std::unique_ptr<rmm::device_buffer>> column_scopes = std::move(std::get<3>(serialized));

            try {
                // Initializes the sender with the information needed to begin transmission.
                // This constructor will make a ucx call with all the metadata and wait for it to complete.
                Sender sender(node_address_map, metadata, buffer_sizes, column_transports);

                for(size_t buffer_index = 0; buffer_index < buffers.size(); buffer_index++) {
                    sender.send(
                        BufferToFrameFunction{}(buffers[buffer_index]),
                        buffer_sizes[buffer_index]
                    );
                }
            } catch(const ral::exception::communication_initialization & e) {
                // TODO: handle initialization failures
            } catch(const ral::exception::communication_transmission & e) {
                // TODO: handle transmission failures
            } catch(const std::exception & e) {
                // TODO: handle unexpected failures
            }
        });
    }
}
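On the TODO about replacing the tuple with a struct, a named container could look like this (all names illustrative; ColumnTransport and rmm::device_buffer are stubbed here so the sketch is self-contained):

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Stand-ins so the sketch compiles on its own; the real code would use
// blazingdb's ColumnTransport and rmm::device_buffer.
struct ColumnTransport {};
namespace rmm { struct device_buffer {}; }

// Hypothetical named replacement for the gpu_raw_buffer_container tuple:
// the same four members, addressable by name instead of std::get<N>.
struct serialized_table {
    std::vector<std::size_t> buffer_sizes;
    std::vector<const char *> buffers;
    std::vector<ColumnTransport> column_transports;
    std::vector<std::unique_ptr<rmm::device_buffer>> column_scopes;
};
```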

@gcca
Contributor

gcca commented Aug 6, 2020

@felipeblazing, related to @percy's comment:

whatever we build we need to use low level UCX APIs (UCT)
This is no longer the case. A great deal of work has been done to ensure this is no longer the case. The biggest risk here is proper configuration of the network

I think we will need to test the UCX (high level, not UCP) behavior, because

  • in a machine with GDR and IPC support,
  • running a UCX demo so as to not deal with interface configuration (UCT),
  • even when ucx_info detected the IPC and GDR layers,

this UCX demo only used the cuda_copy layer.

@wmalpica wmalpica added Design and removed ? - Needs Triage needs team to review and classify labels Aug 6, 2020
@gcca
Contributor

gcca commented Aug 6, 2020

@felipeblazing, related to the Listener and BufferCommunicator (and possibly to GPUCacheDataMetadata):

  • UCX uses the concept of an IOV to improve sending multiple buffers
  • thinking about the distribution algorithm; in particular, receiving/sending the pivots or the scattered data from/to different nodes,
  • we could take advantage of (or prepare the API for) this feature by having something like SendBuffers

Another consideration (maybe for the future):

  • UCX performs a copy even when we use ipc or gdr,
  • I think it works that way so as not to deal with a communication protocol, i.e.,
    • open connections (sender, receiver)
    • send data, copy data locally (to receiver memory)
    • close connection (sender, receiver)
  • maybe we want to improve that, particularly for sending scattered data without a copy,
  • to do that, we need to keep the sender's pointers alive while we transfer the data
  • so the listener needs to consider a new type of message to tell the sender that it can release the pointers.

@felipeblazing
Contributor Author

@felipeblazing, related to @percy's comment:

whatever we build we need to use low level UCX APIs (UCT)
This is no longer the case. A great deal of work has been done to ensure this is no longer the case. The biggest risk here is proper configuration of the network

I think we will need to test the UCX (high level, not UCP) behavior, because

  • in a machine with GDR and IPC support,

This is how cuML does it. I am pretty sure they are taking advantage of these protocols. This is set by the endpoints and what capacity those endpoints have.

  • running a UCX demo so as to not deal with interface configuration (UCT),

We aren't configuring the interface at all. This is done by ucx-py.

  • even when ucx_info detected the IPC and GDR layers,

this UCX demo only used the cuda_copy layer.

This was an issue in the past and should now be resolved.
Let me know if I am missing something here @gcca

@felipeblazing
Contributor Author

@felipeblazing, related to the Listener and BufferCommunicator (and possibly to GPUCacheDataMetadata):

  • UCX uses the concept of an IOV to improve sending multiple buffers

Show me a link to what you mean.

  • thinking about the distribution algorithm; in particular, receiving/sending the pivots or the scattered data from/to different nodes,
  • we could take advantage of (or prepare the API for) this feature by having something like SendBuffers

Please look at the code above

Another consideration (maybe for the future):

  • UCX performs a copy even when we use ipc or gdr,

  • I think it works that way so as not to deal with a communication protocol, i.e.,

    • open connections (sender, receiver)
    • send data, copy data locally (to receiver memory)
    • close connection (sender, receiver)
  • maybe we want to improve that, particularly for sending scattered data without a copy,

  • to do that, we need to keep the sender's pointers alive while we transfer the data

  • so the listener needs to consider a new type of message to tell the sender that it can release the pointers.

Yes, right now that's exactly how it works in the example I provided above.

@jeanp413
Contributor

jeanp413 commented Aug 6, 2020

template <typename SerializerFunction, typename BufferToFrameFunction, typename NodeAddress, typename Sender >
class message_sender {

A couple of observations:

  1. SerializerFunction and BufferToFrameFunction should be part of one interface; they are related to each other.
  2. Are the template parameters really necessary? I also imagine NodeAddress as an interface with something like getTCPAddress and getUCXAddress, and the sender is the one who gets the corresponding address depending on whether it is sending data through TCP or UCX.
  3. Why is CacheMachine part of message_sender? I don't think they should be related.

@felipeblazing
Contributor Author

template <typename SerializerFunction, typename BufferToFrameFunction, typename NodeAddress, typename Sender >
class message_sender {

A couple of observations:

  1. SerializerFunction and BufferToFrameFunction should be part of one interface; they are related to each other.

The serializer converts a BlazingTable ==> metadata and buffers.
BufferToFrame converts each one of those buffers into something that can be sent; in the case of UCX this is a no-op, in the case of TCP it moves the GPU buffer to the CPU.

  1. Are the template parameters really necessary? I also imagine NodeAddress as an interface with something like getTCPAddress and getUCXAddress, and the sender is the one who gets the corresponding address depending on whether it is sending data through TCP or UCX.

The template parameters are there so that you can mix and match these pieces in case that becomes something that needs to be done. So imagine in the future there is a library FastSend. FastSend, like UCX, can send GPU buffers directly, but it has its own protocol for specifying information about how the buffer is going to be sent. We could change just the serializer here but use the same BufferToFrameFunction, and change out the NodeAddress from UCX to FastSend (again, that's a made-up name) and have a FastSend Sender. Say that FastSend can use both TCP and UCX endpoints: now the NodeAddress might be TCP or UCX, but the Sender needs to be FastSend.
The purpose here is to be able to change out only the minimum needed, so that each protocol only has to implement one or two new ones of these.

TCP can use the same Serializer as UCX.
BufferToFrameFunction is different between TCP and UCX.
NodeAddress could be the same or different between these two.
Sender is different.
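The mix-and-match idea can be illustrated with compilable stand-ins (every name here is hypothetical, including mini_message_sender; none of these are the real BlazingSQL classes):

```cpp
#include <string>
#include <vector>

// Hypothetical policy types: the serializer is shared between protocols,
// while the frame conversion (and, in real code, the Sender) differ.
struct shared_serializer {          // same serializer for TCP and UCX
    static std::vector<char> serialize(const std::string & s) {
        return std::vector<char>(s.begin(), s.end());
    }
};
struct ucx_frame {                  // UCX: GPU buffers can be sent as-is (no-op)
    static std::vector<char> to_frame(std::vector<char> b) { return b; }
};
struct tcp_frame {                  // TCP: a device-to-host copy would happen here
    static std::vector<char> to_frame(std::vector<char> b) { return b; }
};

// Swapping one template argument swaps one stage of the pipeline.
template <typename Serializer, typename ToFrame, typename Sender>
struct mini_message_sender {
    Sender sender;
    void send(const std::string & msg) {
        sender.send(ToFrame::to_frame(Serializer::serialize(msg)));
    }
};
```

Instantiating `mini_message_sender<shared_serializer, tcp_frame, SomeTcpSender>` versus `<shared_serializer, ucx_frame, SomeUcxSender>` changes only the stages that actually differ between protocols.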

@felipeblazing
Contributor Author

getTCPAddress, getUCXAddress

Remember, the MessageSender class doesn't necessarily know what kind of address it's getting.

@jeanp413
Contributor

jeanp413 commented Aug 6, 2020

The serializer converts a BlazingTable ==> metadata and buffers.
BufferToFrame converts each one of those buffers into something that can be sent; in the case of UCX this is a no-op, in the case of TCP it moves the GPU buffer to the CPU.

OK, then they should be just one function. Why would SerializerFunction need to return a pointer to a GPU buffer for TCP? Just return the converted CPU buffer directly.

The template parameters are there so that you can mix and match these pieces in case that becomes something that needs to be done. So imagine in the future there is a library FastSend. FastSend, like UCX, can send GPU buffers directly, but it has its own protocol for specifying information about how the buffer is going to be sent. We could change just the serializer here but use the same BufferToFrameFunction, and change out the NodeAddress from UCX to FastSend (again, that's a made-up name) and have a FastSend Sender. Say that FastSend can use both TCP and UCX endpoints: now the NodeAddress might be TCP or UCX, but the Sender needs to be FastSend.

That could also be done with interfaces, and it's more explicit about what methods are required to be implemented.

Remember, the MessageSender class doesn't necessarily know what kind of address it's getting.

That's right, only the Sender cares about it, so if it's a UCXSender it will call getUCXAddress.

@felipeblazing
Contributor Author

OK, then they should be just one function. Why would SerializerFunction need to return a pointer to a GPU buffer for TCP? Just return the converted CPU buffer directly.

Because if they both return GPU pointers, then you have only one serializer function for both of them.

The serializer function is complicated. The conversion from GPU to CPU is not. Separating these means that you have fewer differences between protocols.

That also could be done with interfaces and it's more explicit about what methods are required to be implemented.

The point is that in the future we might not want the NodeAddress to have both UCX and TCP connection information in it. For now we are doing this for expediency. So it might not have a getUCXAddress when it's TCP, or a getTCPAddress when it's UCX.

that's right only the Sender cares about it, so if it's an UCXSender it will call getUCXAddress

So I do see the point about the Address being tied to the Sender. I agree that Sender should be an interface. We could have NodeAddress be a pointer that gets typecast by the Sender.

@felipeblazing
Contributor Author

felipeblazing commented Aug 6, 2020

What about something that looks more like this:

template <typename SerializerFunction, typename BufferToFrameFunction, typename Sender>
class message_sender {
public:
    message_sender(std::shared_ptr<ral::cache::CacheMachine> output_cache, size_t num_threads, std::map<std::string, NodeAddress *> node_address_map)
        : pool{num_threads}, output_cache{std::move(output_cache)}, node_address_map{std::move(node_address_map)} {}

private:
    ctpl::thread_pool<BlazingThread> pool; /**< The thread pool used by the sender to send messages via some protocol */
    std::shared_ptr<ral::cache::CacheMachine> output_cache; /**< The cache this sender polls for messages to send */
    std::map<std::string, NodeAddress *> node_address_map; /**< A map of worker_id to NodeAddress */

    /**
     * A polling function that listens on a cache for data to exist and then sends it off via some protocol
     */
    void run_polling();
};


template <typename SerializerFunction, typename BufferToFrameFunction, typename Sender>
void message_sender<SerializerFunction, BufferToFrameFunction, Sender>::run_polling() {
    while(true) {
        std::pair<std::unique_ptr<ral::frame::BlazingTable>, MetadataDictionary> data_and_metadata =
            std::unique_ptr<ral::cache::GPUCacheDataMetaData>(
                static_cast<ral::cache::GPUCacheDataMetaData *>(output_cache->pullCacheData().release())
                )->decacheWithMetaData();

        pool.push([this, message{std::move(data_and_metadata.first)}, metadata{data_and_metadata.second}](int thread_id) mutable {
            gpu_raw_buffer_container serialized = SerializerFunction{}(std::move(message));
            // TODO: this needs to change, we need to make the container a struct
            std::vector<std::size_t> buffer_sizes = std::get<0>(serialized);
            std::vector<const char *> buffers = std::get<1>(serialized);
            std::vector<ColumnTransport> column_transports = std::get<2>(serialized);
            std::vector<std::unique_ptr<rmm::device_buffer>> column_scopes = std::move(std::get<3>(serialized));

            try {
                // Initializes the sender with the information needed to begin transmission.
                // This constructor will make a ucx call with all the metadata and wait for it to complete.
                Sender sender(node_address_map, metadata, buffer_sizes, column_transports);

                for(size_t buffer_index = 0; buffer_index < buffers.size(); buffer_index++) {
                    sender.send(
                        BufferToFrameFunction{}(buffers[buffer_index]),
                        buffer_sizes[buffer_index]
                    );
                }
            } catch(const ral::exception::communication_initialization & e) {
                // TODO: handle initialization failures
            } catch(const ral::exception::communication_transmission & e) {
                // TODO: handle transmission failures
            } catch(const std::exception & e) {
                // TODO: handle unexpected failures
            }
        });
    }
}



class buffer_sender {
public:
    buffer_sender(std::map<std::string, NodeAddress *> node_address_map,
                  MetadataDictionary metadata,
                  std::vector<std::size_t> buffer_sizes,
                  std::vector<ColumnTransport> column_transports) {
        // iterate over the workers this message is destined for
        for(auto worker_id : StringUtil::split(metadata[WORKER_IDS_METADATA_LABEL], ",")) {
            if(node_address_map.find(worker_id) == node_address_map.end()) {
                throw std::exception(); // TODO: make a real exception here
            }
            destinations.push_back(node_address_map[worker_id]);
        }
    }
protected:
    virtual void send_begin_transmission() = 0;
    virtual void send(buffer_frame * frame, size_t frame_size) = 0;
private:
    std::vector<NodeAddress *> destinations;
};

@jeanp413
Contributor

jeanp413 commented Aug 6, 2020

Because if they both return GPU pointers, then you have only one serializer function for both of them.
The serializer function is complicated. The conversion from GPU to CPU is not. Separating these means that you have fewer differences between protocols.

You could have serializeCudfTable as a pure function and call it inside SerializerFunction for both UCX and TCP, and in the TCP version add the conversion to CPU. You don't need to separate that into two template arguments. And make the SerializerFunction an interface with its corresponding deserialize counterpart.

The point is that in the future we might not want the NodeAddress to have both UCX and TCP connection information in it. For now we are doing this for expediency. So it might not have a getUCXAddress when it's TCP, or a getTCPAddress when it's UCX.

If at some point that happens, just change the interface.

@wmalpica wmalpica added this to Needs prioritizing in Scrum board Aug 10, 2020
@felipeblazing
Contributor Author

Right now this is how we send the metadata for a df:

void ucx_buffer_transport::send_begin_transmission() {
	std::vector<char> buffer_to_send = make_begin_transmission();

	std::vector<ucs_status_ptr_t> requests;
	for(auto const & node : destinations) {
		requests.push_back(ucp_tag_send_nb(
			node.get_ucp_endpoint(), buffer_to_send.data(), buffer_to_send.size(), ucp_dt_make_contig(1), tag, send_begin_callback_c));
	}

	for(auto & request : requests) {
		if(UCS_PTR_IS_ERR(request)) {
			// TODO: decide how to do cleanup; I think we just throw an initialization exception
		} else if(UCS_PTR_STATUS(request) == UCS_OK) {
			ucp_request_release(request);
			increment_begin_transmission();
		} else {
			// Message was not completed; we set the uid for the callback
			auto blazing_request = reinterpret_cast<ucx_request *>(request);
			blazing_request->uid = reinterpret_cast<blazing_ucp_tag *>(&tag)->message_id;
		}
	}
	// TODO: call ucp_worker_progress here
	wait_for_begin_transmission();
}

This is wait_for_begin_transmission:

void ucx_buffer_transport::wait_for_begin_transmission() {
	std::unique_lock<std::mutex> lock(mutex);
	completion_condition_variable.wait(lock, [this] {
		return transmitted_begin_frames >= destinations.size();
	});
}

While this guarantees that the message was delivered, it does not guarantee that the other side has initialized the message receiver. We should change this function to actually wait for an acknowledgement from the other side. We could do something along the lines of:

Server 1 sends begin_transmission to Server 2.
Server 2 returns an acknowledgement to Server 1 using the first 6 bytes of the tag. Only then do we increment transmitted_begin_frames, and not before.

So server 1 has to listen for the message it sent N times. We could use the final 2 bytes as a representative of which ral_id is acknowledging, if we want to track that.
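The tag arithmetic for that acknowledgement scheme might look like the following sketch (assuming an 8-byte tag where the low 2 bytes carry the acknowledging ral_id; the real blazing_ucp_tag layout may differ):

```cpp
#include <cstdint>

using ucp_tag = std::uint64_t;  // stand-in for ucp_tag_t

// Build the acknowledgement tag: keep the 6 high bytes that identify the
// message, and overwrite the low 2 bytes with the acknowledging ral_id
// (0xFFFF for a generic acknowledgement).
ucp_tag make_ack_tag(ucp_tag message_tag, std::uint16_t ral_id) {
    return (message_tag & 0xFFFFFFFFFFFF0000ULL) | ral_id;
}

// An ack matches a message when the 6 identifying bytes agree; the low
// 2 bytes only say who acknowledged.
bool ack_matches_message(ucp_tag ack_tag, ucp_tag message_tag) {
    return (ack_tag >> 16) == (message_tag >> 16);
}
```

Server 1 would then post one tag receive per destination against the ack form of the tag before counting a begin frame as transmitted.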

@felipeblazing
Contributor Author

Big bummer realization: there are certain things that we are expected to keep in scope throughout query execution, like info_tag, which, if I am not mistaken, must stay in scope until the callback is called.

void ucx_message_listener::poll_begin_message_tag(ucp_tag_t tag) {
    for(;;) {
        ucp_tag_recv_info_t info_tag;
        auto message_tag = ucp_tag_probe_nb(
            ucp_worker, tag, begin_tag_mask, 1, &info_tag);
        if(message_tag != NULL) {
            // we have a msg to process
            std::vector<char> message_metadata(info_tag.length);
            auto request = ucp_tag_msg_recv_nb(ucp_worker, message_metadata.data(), info_tag.length,
                                  ucp_dt_make_contig(1), message_tag,
                                  recv_begin_callback_c);


@felipeblazing
Contributor Author

There is no global place to store a representation of the graph. Because communication spans across queries, we need to do something like store a map of query_id ==> graph, so we can do things like get the output caches.
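Such a map could be as simple as a mutex-guarded registry along these lines (a sketch; `graph` stands in for the real execution-graph type, and the names are illustrative):

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <utility>

struct graph {};  // stand-in for the real execution-graph type

// Sketch: a process-wide map of query_id ==> graph, so that the message
// listener can find the output caches of a query that is still executing.
class graph_registry {
public:
    static graph_registry & instance() {
        static graph_registry registry;
        return registry;
    }

    void register_graph(std::int32_t query_id, std::shared_ptr<graph> g) {
        std::lock_guard<std::mutex> lock(mutex_);
        graphs_[query_id] = std::move(g);
    }

    // Returns nullptr when the query is unknown (e.g. already finished).
    std::shared_ptr<graph> get_graph(std::int32_t query_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = graphs_.find(query_id);
        return it == graphs_.end() ? nullptr : it->second;
    }

private:
    std::mutex mutex_;
    std::map<std::int32_t, std::shared_ptr<graph>> graphs_;
};
```

A real version would also need to erase entries when a query completes, so graphs don't outlive their queries.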

@felipeblazing
Contributor Author

felipeblazing commented Aug 17, 2020


void send_begin_callback_c(void * request, ucs_status_t status) {
	auto blazing_request = reinterpret_cast<ucx_request *>(request);
	auto transport = uid_to_buffer_transport[blazing_request->uid];
	transport->increment_begin_transmission();
	ucp_request_release(request);
}

Instead of incrementing transport->increment_begin_transmission(); here, we need to do it via a callback after receiving an acknowledgement message from the receiver. The acknowledgement will have the same tag as the sender's tag, with the last two bytes changed to 0xFFFF.

So you wait until you receive that acknowledgement using something like:

  blazing_status status;
  auto request = ucp_tag_msg_recv_nb(ucp_worker, 
				&status,
				sizeof(blazing_status),
				ucp_dt_make_contig(1), message_tag,
				recv_begin_confirm_callback_c);

and then it is recv_begin_confirm_callback_c that actually calls the increment function

@wmalpica wmalpica moved this from Needs prioritizing to Not Started in Scrum board Aug 19, 2020
@wmalpica wmalpica moved this from Not Started to Done in Scrum board Sep 28, 2020