Skip to content

Conversation

@CodingCat
Copy link
Member

No description provided.

@CodingCat
Copy link
Member Author

@mli @piiswrong

Copy link
Member

@yzhliu yzhliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to track customer_id in the scheduler (for detecting nodes from the same ip:port), that's why customer_id is recorded everywhere. Am I right?

void Accept(const Message& recved) { recv_queue_.Push(recved); }
void Accept(const Message& recved) {
recv_queue_.Push(recved);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to make these functions inline
(app_id(), customer_id(), Accept())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/**
* \brief whether it is ready for sending. thread safe
*/
bool IsReady() { return ready_; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline my_node, GetTimestamp, IsReady

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

void ProcessDataMsg(Message* msg);

void UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set, Meta* nodes,
Meta* recovery_nodes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls add comments for these functions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* \param recv_handle the functino for processing a received message
*/
Customer(int id, const RecvHandle& recv_handle);
Customer(int app_id, int customer_id, const RecvHandle& recv_handle);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc for customer_id

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

bool is_worker_, is_server_, is_scheduler_;
int num_servers_, num_workers_;
bool barrier_done_;
std::unordered_map<int, std::unordered_map<int, bool>> barrier_done_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to insert space between two >, i.e., > >.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

int drop_rate_ = 0;
std::atomic<int> timestamp_{0};
std::mutex start_mu_;
int init_stage = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init_stage_
is it the variable to deal with multiple instances starting in one process?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

customers_[id] = customer;
int app_id = CHECK_NOTNULL(customer)->app_id();
if (customers_.find(app_id) == customers_.end()) {
customers_[app_id] = *(new std::unordered_map<int, Customer*>());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

memory leak

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how? please forgive that I have nearly forgotten everything of c++, but this should be alive as long as the whole process

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

customers_[...] = will call copy constructor of the right one, i.e., *(new ...). Then you just lost the pointer and have no chance to free the memory.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I understand correctly, it is because of the [] operator of unordered_map: http://www.cplusplus.com/reference/unordered_map/unordered_map/operator[]/? (If k does not match the key of any element in the container, the function inserts a new element with that key and returns a reference to its mapped value. Notice that this always increases the container size by one, even if no mapped value is assigned to the element (the element is constructed using its default constructor).)

56c90af should fix it

<< customer_id << " already exists\n";
customers_[app_id][customer_id] = customer;
if (barrier_done_.find(app_id) == barrier_done_.end()) {
barrier_done_[app_id] = *(new std::unordered_map<int, bool>());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

memory leak

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how? please forgive that I have nearly forgotten everything of c++, but this should be alive as long as the whole process

src/van.cc Outdated
new std::thread(&Van::Receiving, this));
init_stage++;
}
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why sleep here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like some leftover after some changes, removed

app.set_request_handle(ReqHandle);

Start();
Start(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it is always 0 for ps::Start(), do we have to add arg customer_id for it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's just a test file, in the following PR (against mxnet, this will be different numbers)

@CodingCat
Copy link
Member Author

We need to track customer_id in the scheduler (for detecting nodes from the same ip:port), that's why customer_id is recorded everywhere. Am I right?

yeah, that's part of the reason, we track customer_id everywhere because multiple customers would share the same I/O channel (ip_addr:port) and we need to dispatch the msg to the right thread running the customer

@CodingCat
Copy link
Member Author

@Javelinjs thanks for the review, I addressed some and replied in the comments

@CodingCat
Copy link
Member Author

@Javelinjs more comments about this?

@yzhliu
Copy link
Member

yzhliu commented Dec 11, 2017

good to me @mli

@CodingCat
Copy link
Member Author

thank @Javelinjs , ping @mli


// node's address string (i.e. ip:port) -> node id
// this map is updated when ip:port is received for the first time
std::unordered_map<std::string, int> connected_nodes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add '_' at the end for private variables

msg.meta.head = req_head;
if (req_body.size()) msg.meta.body = req_body;
int ts = obj_->NewRequest(recv_id);
int ts = obj_ -> NewRequest(recv_id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no space around ->

@mli
Copy link
Member

mli commented Dec 18, 2017

Is there a test case to test running multiple workers in a single process

@CodingCat
Copy link
Member Author

@mli , no test case here, but I have tested with mxnet-spark (modified version)

will add one here

@CodingCat
Copy link
Member Author

looks like the behavior of the program is quite different in Mac and Travis...I will look into it further

@CodingCat
Copy link
Member Author

please do not merge for now, there is some race condition which I am still looking into

@CodingCat CodingCat force-pushed the unique_id_2 branch 2 times, most recently from a856a8c to f7bb369 Compare December 24, 2017 17:08
@CodingCat
Copy link
Member Author

@mli @Javelinjs thanks for the review

it turns out that there is an issue when connecting to the client socket creator for more than one time, I fixed the duplicate connections and repeated tests for many times (with the newly added multi-worker tests)

now it is ready for the further review

Happy Christmas!

@CodingCat
Copy link
Member Author

@mli @Javelinjs ping ....

@mli mli merged commit aee3252 into dmlc:master Jan 2, 2018
@CodingCat
Copy link
Member Author

thanks for the review and merging, will move forward to submit a patch in mxnet repo this week @Javelinjs @mli

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants