[RPC] add C++ RPC infrastructure and distributed sampler #408
I assume that I only need to port Re unit test: I think it's better to have some tests; otherwise I wouldn't know whether Windows works. IMO they don't have to be integrated into CI (like the unit tests), but I need some scripts to set up a simulated distributed environment, or to connect to an existing one.
Also, what about PR #325?
I closed this PR. It is out of date.
Yes, only the TCPSocket class needs to be changed. The other components are all platform-agnostic. I will add some unit tests for the distributed sampler.
src/graph/graph_apis.cc
Outdated
DGL_REGISTER_GLOBAL("graph_index._CAPI_ReceiverRecvSubgraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // The data buffer will be allocated just once
    static char* data_buffer = new char[kMaxBufferSize];
The memory is allocated when the process starts, regardless of how people use DGL. Maybe allocate the memory more explicitly, e.g., only when people start to use the distributed module.
Changed it to use a global pointer, which is allocated only by the CreateReceiver API.
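A minimal sketch of the lazy allocation described in this reply, assuming a global pointer that stays null until a receiver is created. The names `g_recv_buffer`, `FinalizeReceiver`, and `BufferAllocated`, as well as the buffer size, are hypothetical; the thread only names `kMaxBufferSize` and `CreateReceiver`.

```cpp
#include <cassert>
#include <cstddef>

namespace lazy_buffer_sketch {

// Hypothetical value; the real kMaxBufferSize is not shown in the thread.
constexpr std::size_t kMaxBufferSize = 1 << 20;

// Global receive buffer. It stays null until a receiver is created, so a
// process that never touches the distributed module allocates nothing.
char* g_recv_buffer = nullptr;

// Hypothetical stand-in for the CreateReceiver API mentioned above.
void CreateReceiver() {
  if (g_recv_buffer == nullptr) {
    g_recv_buffer = new char[kMaxBufferSize];
  }
}

// Hypothetical counterpart that releases the buffer again.
void FinalizeReceiver() {
  delete[] g_recv_buffer;
  g_recv_buffer = nullptr;
}

// Helper used only to observe the allocation state.
bool BufferAllocated() { return g_recv_buffer != nullptr; }

}  // namespace lazy_buffer_sketch
```

Compared with a function-local `static char*`, this keeps the allocation tied to an explicit call and gives the module a place to free the buffer.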
bool SocketCommunicator::InitSender(const char* ip, int port) {
  // Sender only has a client socket
  socket_.resize(1);
  socket_[0] = new TCPSocket();
Why not use shared_ptr?
I think it is better to manage the pointer by hand and release it in the Finalize() method.
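For comparison, here is a sketch of how the two positions in this exchange could be combined: a smart pointer owns each socket, while `Finalize()` still releases them at an explicit point. `std::unique_ptr` is used rather than the `shared_ptr` raised in the question, because the sockets have a single owner here. The `TCPSocket` class below is a hypothetical stand-in that only counts live instances, and `InitSender` drops the `ip` and `port` arguments since no connection is made.

```cpp
#include <cassert>
#include <memory>
#include <vector>

namespace ownership_sketch {

// Hypothetical stand-in for the PR's TCPSocket; it only counts live objects.
class TCPSocket {
 public:
  TCPSocket() { ++live_count; }
  ~TCPSocket() { --live_count; }
  static int live_count;
};
int TCPSocket::live_count = 0;

class SocketCommunicator {
 public:
  // Sender only has a client socket, as in the snippet under review.
  bool InitSender() {
    socket_.clear();
    socket_.push_back(std::unique_ptr<TCPSocket>(new TCPSocket()));
    return true;
  }

  // Explicit release point; the destructor covers the case where the
  // caller forgets to call it.
  void Finalize() { socket_.clear(); }

 private:
  std::vector<std::unique_ptr<TCPSocket>> socket_;
};

}  // namespace ownership_sketch
```

With this layout a missed `Finalize()` call no longer leaks the socket, which is the main risk of the raw `new` in the reviewed code.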
src/graph/network/msg_queue.cc
Outdated
  }
}

int MessageQueue::Add(const char* src, int size, bool is_blocking) {
This is a large memory copy.
It's not very easy to implement zero-copy here.
How much does the memory copy affect performance?
Let me evaluate it by removing the message queue for a test.
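To make the trade-off in this thread concrete, the sketch below contrasts a copying `Add` with a variant that takes ownership of an already-filled buffer. This is not the PR's `MessageQueue`: the class, the `std::string` payload type, and the method names `AddCopy`, `AddMove`, and `Remove` are assumptions for illustration, and blocking behavior is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

namespace queue_sketch {

class MessageQueue {
 public:
  // Copying variant: duplicates `size` bytes from `src` into the queue,
  // similar in spirit to Add(const char* src, int size, ...).
  void AddCopy(const char* src, std::size_t size) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(std::string(src, size));
  }

  // Moving variant: the queue takes over the caller's buffer, so large
  // payloads are not duplicated.
  void AddMove(std::string&& msg) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(std::move(msg));
  }

  // Pops the oldest message into *out; returns false if the queue is empty.
  bool Remove(std::string* out) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return false;
    *out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

 private:
  std::mutex mutex_;
  std::queue<std::string> queue_;
};

}  // namespace queue_sketch
```

The moving variant only helps if the producer can build its message directly in a buffer it is willing to hand over, which may be why zero-copy is described as hard to retrofit here.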
I have updated
It seems that the NodeFlow and Sampler APIs have changed. I need to update the distributed sampler code.
@jermainewang Could we change the branch from I have also pushed another PR to Da's local dgl branch.
* refactor. * accelerate update_all in nodeflow. * fix. * refactor. * fix lint. * fix lint. * reorganize. * reorg. * remove. * add doc. * impl block_incidence_matrix * fix lint. * fix. * simple fix. * fix test. * fix interface. * fix eid. * fix comments.
@@ -105,18 +82,18 @@ DGL_REGISTER_GLOBAL("network._CAPI_ReceiverRecvSubgraph")
    if (size <= 0) {
      LOG(ERROR) << "Receive error: (size: " << size << ")";
    }
    NodeFlow nf;
    NodeFlow* nf = new NodeFlow();
How is the memory freed?
How does the sampler free the memory?
They are manually freed from Python code after instantiation of NodeFlow objects (python/dgl/nodeflow.py:59).
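The ownership hand-off described here can be sketched as a pair of C++ entry points: one returns a heap-allocated object as an opaque handle, and the other frees it when the caller (Python, in the PR) is done. The function names `ReceiveNodeFlow` and `FreeNodeFlow` and the instance-counting `NodeFlow` are hypothetical; the thread does not show the actual free function.

```cpp
#include <cassert>

namespace handle_sketch {

// Hypothetical stand-in for NodeFlow; it only counts live instances.
struct NodeFlow {
  NodeFlow() { ++live; }
  ~NodeFlow() { --live; }
  static int live;
};
int NodeFlow::live = 0;

typedef void* NodeFlowHandle;

// The C++ side allocates and hands the object out as an opaque handle.
NodeFlowHandle ReceiveNodeFlow() { return new NodeFlow(); }

// The caller must invoke this exactly once per handle it received.
void FreeNodeFlow(NodeFlowHandle handle) {
  delete static_cast<NodeFlow*>(handle);
}

}  // namespace handle_sketch
```

Under this scheme nothing on the C++ side tracks the object after it is returned, so a handle that is never passed back to the free function leaks.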
* update version * update news
* fix cython bug * fix * fix
* update README performance number * news
…mutable graph. (#437) * fix rgcn tutorial * small fix * upd * findedge/s * upd * upd * upd * upd * add test * remove redundancy * upd * upd * upd * upd * add edge_subgraph * explicit cast * add test immutable subg * reformat * reformat * fix bug * upd * hotfix * subgraph docs * fix adj mat * fix edges() order * add test * revert * upd * fix * upd
…449) * fix send with builtin bug * test cases * remove todo comment
* refactored sampler code * docstring * fix tutorial
* fix performance bug in EdgeSoftmax * minor
* fix bug in pickling * fix lint
…#457) * forgot prefetch switch * yet another stupid bug
    const IdArray& node_mapping,
    const IdArray& edge_mapping,
    const IdArray& layer_offsets,
    const IdArray& flow_offsets) {
This interface makes changes to the NodeFlow structure fragile: one also needs to change the interface here, along with the deserialization and every call site.
Is it possible to change this to something like size_t SerializeNodeFlow(char *buffer, const NodeFlow &nf)?
EDIT: or maybe make these methods of NodeFlow instead (e.g. size_t NodeFlow::Serialize(char *buffer) and void NodeFlow::Deserialize(const char *buffer, size_t len)).
No problem. I'll make this change after the new NodeFlow structure is merged.
If "new NodeFlow structure" refers to the one with optional tensor data (which is the one I'm using) then it won't be merged until after PR #453. It's your call but I would prefer changing the interface now so later on NodeFlow could be extended further more easily.
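The method-based interface proposed in this thread could look roughly like the sketch below. It is an illustration under stated assumptions, not the PR's code: `IdVector` (a `std::vector<int64_t>`) stands in for `IdArray`, the wire format (an int64 length followed by the raw values, per field) is invented for the example, and `SerializedSize` is a hypothetical helper for sizing the buffer. Bounds checks use `assert` to stay short.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

namespace nodeflow_sketch {

using IdVector = std::vector<int64_t>;  // assumed stand-in for IdArray

struct NodeFlow {
  IdVector node_mapping, edge_mapping, layer_offsets, flow_offsets;

  // Bytes needed by Serialize(): one int64 length per field plus the data.
  std::size_t SerializedSize() const {
    const std::size_t n = node_mapping.size() + edge_mapping.size() +
                          layer_offsets.size() + flow_offsets.size();
    return (4 + n) * sizeof(int64_t);
  }

  // Writes all fields into `buffer` (caller guarantees enough room) and
  // returns the number of bytes written.
  std::size_t Serialize(char* buffer) const {
    char* p = buffer;
    p = Write(p, node_mapping);
    p = Write(p, edge_mapping);
    p = Write(p, layer_offsets);
    p = Write(p, flow_offsets);
    return static_cast<std::size_t>(p - buffer);
  }

  // Restores all fields from a buffer produced by Serialize().
  void Deserialize(const char* buffer, std::size_t len) {
    const char* p = buffer;
    const char* end = buffer + len;
    p = Read(p, end, &node_mapping);
    p = Read(p, end, &edge_mapping);
    p = Read(p, end, &layer_offsets);
    p = Read(p, end, &flow_offsets);
  }

 private:
  static char* Write(char* p, const IdVector& v) {
    const int64_t n = static_cast<int64_t>(v.size());
    const std::size_t bytes = v.size() * sizeof(int64_t);
    std::memcpy(p, &n, sizeof(n));
    p += sizeof(n);
    if (bytes > 0) std::memcpy(p, v.data(), bytes);
    return p + bytes;
  }

  static const char* Read(const char* p, const char* end, IdVector* v) {
    int64_t n = 0;
    assert(static_cast<std::size_t>(end - p) >= sizeof(n));
    std::memcpy(&n, p, sizeof(n));
    p += sizeof(n);
    assert(n >= 0);
    const std::size_t bytes = static_cast<std::size_t>(n) * sizeof(int64_t);
    assert(static_cast<std::size_t>(end - p) >= bytes);
    v->resize(static_cast<std::size_t>(n));
    if (bytes > 0) std::memcpy(v->data(), p, bytes);
    return p + bytes;
  }
};

}  // namespace nodeflow_sketch
```

Because the field list lives in one place, adding a member such as optional tensor data would only touch these two methods rather than every caller of a free function with a long parameter list.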
Description
This PR adds the C++ RPC infrastructure and the distributed sampler API.
We still need a Windows wrapper around the C socket API used in tcp_socket.cc. Can anyone help with this? I don't have a Windows development environment.
This code has also been tested on multiple machines, outside the unit tests. Do we need to add unit tests for the network code?
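As a starting point for the Windows wrapper requested above, the sketch below shows the usual differences between the POSIX and Winsock socket APIs: Winsock needs `WSAStartup`/`WSACleanup`, uses the `SOCKET` type with `INVALID_SOCKET`, and closes sockets with `closesocket` instead of `close`. The wrapper names (`SocketHandle`, `NetworkStartup`, `NetworkCleanup`, `OpenTcpSocket`, `CloseSocket`) are hypothetical and not taken from tcp_socket.cc, and the Windows branch has not been exercised here.

```cpp
#include <cassert>

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "Ws2_32.lib")
#else
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#endif

namespace socket_compat_sketch {

#ifdef _WIN32
typedef SOCKET SocketHandle;
const SocketHandle kInvalidSocket = INVALID_SOCKET;
#else
typedef int SocketHandle;
const SocketHandle kInvalidSocket = -1;
#endif

// Winsock must be initialized once per process; POSIX needs nothing.
inline bool NetworkStartup() {
#ifdef _WIN32
  WSADATA data;
  return WSAStartup(MAKEWORD(2, 2), &data) == 0;
#else
  return true;
#endif
}

inline void NetworkCleanup() {
#ifdef _WIN32
  WSACleanup();
#endif
}

// Creates an unconnected IPv4 TCP socket.
inline SocketHandle OpenTcpSocket() {
  return ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
}

// Closes a socket with the platform's own call.
inline bool CloseSocket(SocketHandle s) {
#ifdef _WIN32
  return ::closesocket(s) == 0;
#else
  return ::close(s) == 0;
#endif
}

}  // namespace socket_compat_sketch
```

If only the TCPSocket class is platform-specific, as stated in the conversation, confining the `#ifdef` blocks to a small layer like this would leave the rest of the communicator code unchanged.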
Checklist
Please feel free to remove inapplicable items for your PR.
or have been fixed to be compatible with this change
Changes