
allreduce/broadcast cache #98

Open
wants to merge 50 commits into
base: master
from

Conversation

@chenqin
Contributor

commented Jul 2, 2019

The goal of this PR is to implement an immutable cache in rabit to help a failed worker recover allreduce/broadcast results that were not synced at bootstrap time. Examples of such are:

  • distributed xgboost loads the dataset and syncs the number of columns before loadcheckpoint.
  • histogram initialization in the fast-hist algorithm only runs before the first iteration's checkpoint completes.

It's specifically designed to help a recovered node catch up with the rest of the nodes, with minimal overhead in non-recovery mode.

  • setcache only writes locally after the allreduce/broadcast completes
  • getcache sends/receives two integers in one allreduce call to decide whether all nodes are running in sync (see the sketch below)
  • when nodes are in sync, reading the cache is skipped; only recovering nodes in bootstrap do a one-time cache rebuild from the nearest nodes
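A minimal sketch of that getcache consensus; the struct and field names here are illustrative, not the PR's actual API:

```cpp
// Illustrative sketch only; names do not match the PR's actual API.
// Each node packs two integers: "do I already hold this cache entry?"
// and "how many cache entries do I hold?", then runs one allreduce.
#include <algorithm>
#include <cstdio>

struct CacheVote {
  int has_entry;    // 1 if this node already holds the entry
  int entry_count;  // number of cache entries this node holds
};

// Pairwise reducer the allreduce would apply: AND (min over 0/1) on the
// first field, max on the second.
CacheVote Reduce(CacheVote a, CacheVote b) {
  return {std::min(a.has_entry, b.has_entry),
          std::max(a.entry_count, b.entry_count)};
}

int main() {
  // Three healthy nodes and one freshly restarted node with an empty cache.
  CacheVote nodes[] = {{1, 5}, {1, 5}, {1, 5}, {0, 0}};
  CacheVote result = nodes[0];
  for (int i = 1; i < 4; ++i) result = Reduce(result, nodes[i]);
  // has_entry == 0 -> not everyone is in sync, so the nodes holding
  // entry_count (5) entries backfill the restarted node; if it were 1,
  // the cache read would be skipped entirely.
  std::printf("all in sync: %d, entries to backfill: %d\n",
              result.has_entry, result.entry_count);
  return 0;
}
```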

Chen Qin and others added some commits Jun 5, 2019

clarify rabit cache strategy: a cache entry is set only by a successful collective
call involving all nodes with a unique cache key. If all nodes call
getcache at the same time, rabit keeps running the collective call. If some nodes
call getcache while others do not, we backfill the cache from the nodes with
the most entries

@chenqin chenqin marked this pull request as ready for review Jul 2, 2019

@chenqin

Contributor Author

commented Jul 9, 2019

It also needs to be pointed out that rabit's initial design splits the checkpoint from the result cache. There are a good amount of trade-offs behind those differences.

it's different, the result cache in the original rabit is more of an optimization to avoid recompute on the healthy nodes and it is completely hidden from the user... if we do not have the result cache, we can also achieve the same FT functionality, which is to coordinate every worker to start from a checkpoint and re-compute

If I understand correctly, it's the same as failing all nodes and having loadcheckpoint start from the same code path. In this case the checkpoint has to be stored in a remote file system, since all nodes fail and restart. If we decide to go with this approach, we can get rid of AllreduceRobust and link against AllreduceBase. Checkpoint and loadcheckpoint can be done relatively easily with a dmlc::Stream pointing to a URL path.
I wasn't familiar with how the interface between XGB-Spark and xgboost is done. How can we make sure an xgboost node failure will not cause the Spark task to fail?

but this PR exposes SetCache/GetCache to the XGBoost layer, which means, as I said, when I develop a new tree-growing algorithm in xgboost, I need to worry about what is to come from the cache (so that I should set a cache instead of a checkpoint) and what is checkpointed (so that I can expect it to be recovered without needing anything else)

That's a fair point. Another way is to eliminate the cache proposed in this PR, keep the resbuf and checkpoint payloads in rocksdb (on disk), and always keep a unique list of allreduce/broadcast call payloads identified by signature (hidden from the user). Taking fast hist as an example, the kv store keeps three categories of results on disk:
```
bootstrap section (before first success checkpoint) (essentially used to serve as cache)
'dmatrix::load': 127
'columnsamper::init': 3
'histogram::init' : 'binary'
...

iteration section (after last checkpoint section) (essentially what resbuf query and pushtemp did)
synchisto:seq0: ''
...
buildhistorgram:seq10:''

last success checkpoint (essentially checkpoint payload)
version x: ''
```

So the checkpoint would be: any node does an async snapshot of all three sections and uploads it to HDFS while executing the next iterations, which keep mutating the content of the table. https://rocksdb.org/blog/2015/11/10/use-checkpoints-for-efficient-snapshots.html
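A minimal sketch of that layout on top of the RocksDB checkpoint API; the key names and paths are illustrative and error handling is trimmed:

```cpp
// Sketch only: key names and paths are illustrative; error handling trimmed.
#include <rocksdb/db.h>
#include <rocksdb/utilities/checkpoint.h>

int main() {
  rocksdb::Options opts;
  opts.create_if_missing = true;
  rocksdb::DB *db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(opts, "/tmp/rabit_kv", &db);
  if (!s.ok()) return 1;

  // bootstrap section (results reused before the first successful checkpoint)
  db->Put(rocksdb::WriteOptions(), "bootstrap/dmatrix::load", "127");
  // iteration section (per-iteration allreduce/broadcast result payloads)
  db->Put(rocksdb::WriteOptions(), "iter/synchisto:seq0", "...");
  // last successful checkpoint payload
  db->Put(rocksdb::WriteOptions(), "ckpt/version_x", "...");

  // Consistent snapshot of all three sections taken while the next iteration
  // keeps mutating the table; the snapshot directory can then be shipped to
  // HDFS asynchronously.
  rocksdb::Checkpoint *cp = nullptr;
  if (rocksdb::Checkpoint::Create(db, &cp).ok()) {
    cp->CreateCheckpoint("/tmp/rabit_kv_snapshot");
    delete cp;
  }
  delete db;
  return 0;
}
```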

fix things broken for years in distributed recovery

I am not sure whether it has fixed the issue

first of all, someone still complains about different errors in the xgboost repo based on your changes, and it is not tested with anything in production

second, I am seeing changes in DMatrix::Load() and nothing in the approx algorithm; that means nothing is changed in the current XGB-Spark code path, but obviously XGB-Spark single-task recovery is not working...

third, there is something weird like https://github.com/dmlc/xgboost/pull/4636/files#diff-3d55545d6652c5ed31b10509913e0f48R210 and https://github.com/dmlc/xgboost/pull/4636/files#diff-10aac201acca00858b390ff0d679434fR227

The example PR is actually work in progress.

As a new feature, I haven't exposed it to either the c_api or the Java side, for the good reason of keeping failure recovery stable.

I am not talking about XGB users, I am talking about rabit users; you have exposed new APIs to XGBoost devs, who (again) have to worry about what is to come from the cache (so that I should set a cache instead of a checkpoint) and what is checkpointed (so that I can expect it to be recovered without needing anything else)

back to xgboost's usage of allreduce, a quick glance at the PR tells me some things are missing from the original recovery

1. column_number: if you loaded a checkpoint of the model, it should be recovered from the model itself instead of relying on another allreduce operation

2. sketch: for approx it is recalculated every time, which looks fine, but fast-histogram reuses the init results, so we need to checkpoint them => obviously, what is to be checkpointed differs between tree-growing algorithms. As a user of rabit, I should not be using different APIs for this situation but only change what is passed to SaveRabitCheckpoint()... the logic is that HBase provides a single set of APIs like Put/Get; whether to add a random salt to my key is my business logic, not something I should defer to HBase by demanding both Put and PutWithHashSalt() APIs.

Same for the rocksdb-like approach: I am inclined to have rocksdb as a library to deal with consistency and snapshot generation. XGB-Spark might be able to integrate by uploading the local binary to remote storage and vice versa.

@CodingCat

Member

commented Jul 9, 2019

it's same as fail all nodes and load checkpoint starts from same code path. in this case checkpoint has to be stored in remote file system since all nodes are failing and restart.

no, my proposal is actually inspired by flink's rollback mechanism http://www.vldb.org/pvldb/vol10/p1718-carbone.pdf (3.2.2)

@chenqin

Contributor Author

commented Jul 9, 2019

it's same as fail all nodes and load checkpoint starts from same code path. in this case checkpoint has to be stored in remote file system since all nodes are failing and restart.

no, my proposal is actually inspired by flink's rollback mechanism http://www.vldb.org/pvldb/vol10/p1718-carbone.pdf (3.2.2)

Yes, I read about it before. Two key points IMO:

  1. the source can be rewound at any time
  2. restart all hosts once a single point of failure happens.
@CodingCat

Member

commented Jul 10, 2019

it's same as fail all nodes and load checkpoint starts from same code path. in this case checkpoint has to be stored in remote file system since all nodes are failing and restart.

no, my proposal is actually inspired by flink's rollback mechanism http://www.vldb.org/pvldb/vol10/p1718-carbone.pdf (3.2.2)

Yes, I read about it before, two key points IMO

  1. source can be rewind anytime
  2. restart all hosts once single point of failure happens.
  1. there is no addressable source for machine learning, so as long as the partition data is there, it should be fine

  2. we are not going to move flink's mechanism here directly; let's focus on this part: upon initialization, retrieve their allocated shard of state. In the rabit + xgb context, the steps that happen only in the first iteration are treated as initialization, and the model from the last iteration is the allocated shard of state... so once there is a failure, the tracker can detect that and then broadcast a command to all workers

this command instructs the workers to load the checkpoint (this step actually rolls the model back to the last iteration), and then the workers run initialization and load the result buffer or rerun allreduce/broadcast....
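A rough sketch of that flow; every function here is a hypothetical stub standing in for the steps above, not an existing rabit or tracker API:

```cpp
// Hypothetical sketch of the tracker-driven rollback; none of these
// functions exist in rabit today, they only stand in for the steps above.
#include <cstdio>

enum class Command { kProceed, kRollback };

Command WaitForTrackerCommand() { return Command::kRollback; }  // stub
void LoadCheckpoint()    { std::puts("roll model back to last iteration"); }
void RunInitialization() { std::puts("rerun first-iteration steps"); }
void ReplayBootstrap()   { std::puts("load result buffer or rerun allreduce/broadcast"); }
void RunNextIteration()  { std::puts("continue training"); }

int main() {
  // On failure the tracker broadcasts kRollback; every worker then rolls
  // back and replays initialization before continuing.
  if (WaitForTrackerCommand() == Command::kRollback) {
    LoadCheckpoint();
    RunInitialization();
    ReplayBootstrap();
  }
  RunNextIteration();
  return 0;
}
```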

but I found the current code structure makes it very difficult to pull workers back when they are blocked on an allreduce/broadcast (<= if we can do this.... hmmm....)

@chenqin

Contributor Author

commented Jul 10, 2019

it's same as fail all nodes and load checkpoint starts from same code path. in this case checkpoint has to be stored in remote file system since all nodes are failing and restart.

no, my proposal is actually inspired by flink's rollback mechanism http://www.vldb.org/pvldb/vol10/p1718-carbone.pdf (3.2.2)

Yes, I read about it before, two key points IMO

  1. source can be rewind anytime
  2. restart all hosts once single point of failure happens.
  1. there is no addressable source for machine learning, so as long as the partition data is there , it should be fine

that's true

  1. we are not to move flink's mechanism to here directly, let's focus on this part upon initialization, retrieve their allocated shard of state. , in rabit + xgb context, those steps happened only in the first iteration is treated as initialization, the model from the last iteration is allocated shard of state...so once there is a failure, tracker can detect that, and then broadcast a command to all workers

Just to double-confirm: allreduce and broadcast are synchronous blocking calls and the results are the same across all nodes, unlike a partitioned operator keyed by some partition key that holds part of the state of a logical operation. Rabit state is much simpler. So if we want to apply what the paper described, we could also do the checkpoint on any rank, really.

Yeah, we can instrument init at the rabit level, but the logic actually runs in the framework, e.g. xgboost. A certain amount of in-memory objects (user state in flink terms) in xgboost need to be recomputed from the last checkpoint, and then we are back to the restart strategy.

chenqin added some commits Jul 11, 2019

try to avoid seqno overflowing to negative by offsetting the special flag value
adding cache seqno to checkpoint/loadcheckpoint/checkpoint ack to avoid
confusion from cache recovery
Merge pull request #2 from chenqin/test
Merge back debugging and stall checkpoint fixes back to master
explicit ActionSummary(int flag, int minseqno = kSpecialOp) {
seqcode = (minseqno << 4) | flag;
explicit ActionSummary(int action_flag, int role_diff_flag = 0,
int minseqno = kSpecialOp, int maxseqno = kSpecialOp) {

@CodingCat

CodingCat Jul 18, 2019

Member

why do we need to regulate the max seq no?

@CodingCat

CodingCat Jul 18, 2019

Member

I don't think there is anything like an int overflow,

if seqno is incremented by 1 for each operation: in fast histo with depthwise growing (for iterations after the first one) we only have 1 + tree-height syncs for the histogram; for loss-guided, we have at most number-of-leaf-nodes syncs; for approx, the number of syncs is the tree height (* 2?),

why is it overflowing?

@chenqin

chenqin Jul 18, 2019

Author Contributor

why we need to regulate max seq no

maxseqno is keeping cache_seq and separating it from the allreduce seqno; what it does is get the number of cache entries on the healthy nodes and decide whether backfilling the recovered host is needed.

why it is overflowing?

First, what overflows is minseqno, see dmlc/xgboost#4250 (comment). What minseqno does is keep the minimal seqno across all nodes, with one exception:

kSpecialOp, which is a very large number: https://github.com/dmlc/rabit/blob/master/src/allreduce_robust.h#L165
When the summary is constructed, the seqno is shifted left by 4 bits to leave space for the four flag masks, and later by 5 bits as we tried to add one more mask.

https://github.com/dmlc/rabit/blob/master/src/allreduce_robust.h#L188
This likely causes signed int overflow: (1 << 26) << 5 is already 2^31, which exceeds 2^31 - 1 even before OR-ing in the flag bits.
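A minimal illustration of that arithmetic, assuming kSpecialOp is 1 << 26 as referenced above:

```cpp
// Minimal illustration of the overflow: shifting kSpecialOp left by 5 flag
// bits no longer fits in a signed 32-bit int.
#include <climits>
#include <cstdint>
#include <cstdio>

int main() {
  const int32_t kSpecialOp = 1 << 26;                            // 67108864
  const int64_t packed = static_cast<int64_t>(kSpecialOp) << 5;  // 2^31
  std::printf("packed = %lld, INT_MAX = %d\n",
              static_cast<long long>(packed), INT_MAX);
  // packed (2147483648) > INT_MAX (2147483647): doing the same shift on a
  // plain int is signed overflow, which is why the seqno can show up as a
  // negative number. Widening the field (e.g. uint32_t) or shrinking the
  // special constant avoids it.
  return 0;
}
```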

@CodingCat

CodingCat Jul 18, 2019

Member

why is the relationship defined as min and max? you just need to distinguish operations from different sources

same for SeqType, why is it called OR/AND?

@CodingCat
Copy link
Member

left a comment

there are many changes with no clear reason in this PR; it's nowhere close to ready

Resolved review threads: src/allreduce_robust.cc (outdated), src/allreduce_robust.h (outdated), src/engine_empty.cc
// there are difference sequence number the nodes proposed
// this means we want to do recover execution of the lower sequence
// action instead of normal execution
static const int kDiffSeq = 8;
static const int kDiffSeq = 16;

@CodingCat

CodingCat Jul 18, 2019

Member

do we really need to add a bit? maybe make the new operation 9, and just do XOR to check?

@chenqin

chenqin Jul 18, 2019

Author Contributor

those bits could exist at the same time (2 | 4 | 8 | 16);
XOR loses its property since the reducer might run multiple times on different nodes

@chenqin

chenqin Jul 18, 2019

Author Contributor

Hmm, one minute, I think we should use u_int32_t (2^32 - 1).

For one, it gives us a bigger address space to place the seqno.
Second, it avoids the overflow.

Regarding that extra bit, let me dig a bit.

@CodingCat

CodingCat Jul 18, 2019

Member

I searched the code base a bit and I didn't see that the different bits in the flag can be set at the same time (except the bit indicating difference)? Can you point me to the code if there is any?

@chenqin

chenqin Jul 19, 2019

Author Contributor

Okay, I am afraid we can't use 9; ActionSummary goes through each bit and applies either & or | to get the permutation of flag sets (1111, 1101, etc.).
1
10
100
1000

About doing XOR: I had the same thought when I tried to find out whether all nodes are calling getcache at the same time. Here is what might happen: assume four nodes with load_cache bits <1, 1, 0, 0>; the expected result would be 1. If we apply XOR in the reducer, it will produce 0, the same as for <0,0,0,0> or <1,1,1,1>.
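A tiny standalone sketch, with plain loops standing in for the allreduce reducer, of why XOR cannot distinguish these cases while AND/OR can:

```cpp
// Sketch: XOR collapses "all set", "none set", and "half set" to the same
// value, while AND/OR keep them apart.
#include <cstdio>
#include <vector>

int ReduceXor(const std::vector<int>& bits) { int a = 0; for (int b : bits) a ^= b; return a; }
int ReduceAnd(const std::vector<int>& bits) { int a = 1; for (int b : bits) a &= b; return a; }
int ReduceOr (const std::vector<int>& bits) { int a = 0; for (int b : bits) a |= b; return a; }

int main() {
  std::vector<int> half = {1, 1, 0, 0}, none = {0, 0, 0, 0}, all = {1, 1, 1, 1};
  // XOR: all three cases reduce to 0, so "everyone called getcache" and
  // "nobody called getcache" become indistinguishable; it also breaks if
  // the reducer runs more than once on a node.
  std::printf("xor: %d %d %d\n", ReduceXor(half), ReduceXor(none), ReduceXor(all));
  // AND is 1 only when every node raised the bit; OR is 1 when any node did.
  std::printf("and: %d %d %d\n", ReduceAnd(half), ReduceAnd(none), ReduceAnd(all));
  std::printf("or:  %d %d %d\n", ReduceOr(half),  ReduceOr(none),  ReduceOr(all));
  return 0;
}
```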


// special sequnce number for local state checkpoint ack signal
static const int kLocalCheckAck = (1 << 26) - 1;
static const int kLocalCheckAck = (1 << 20) - 1;

@CodingCat

CodingCat Jul 18, 2019

Member

why did we change it to 20?

@chenqin

chenqin Jul 19, 2019

Author Contributor

So we changed it to 20:

This likely causes signed int overflow: (1 << 26) << 5 is already 2^31, which exceeds 2^31 - 1

actually I plan to use u_int32_t and keep the original special consts: chenqin@b71f662

@CodingCat

CodingCat Jul 19, 2019

Member

I mean the choice of 20 is pretty random; if it is just 1 bit added to the flag, why didn't we choose 25?


option(RABIT_BUILD_TESTS "Build rabit tests" OFF)
option(RABIT_BUILD_MPI "Build MPI" OFF)
option(RABIT_BUILD_DMLC "Include DMLC_CORE in build" ON)
option(RABIT_BUILD_DMLC "Include DMLC_CORE in build" OFF)

@CodingCat

CodingCat Jul 18, 2019

Member

why does this PR involve changes to the make/cmake files?

@chenqin

chenqin Jul 19, 2019

Author Contributor

This is a bug fix. Setting it to ON means we expect a dmlc-core subdirectory inside rabit. That is the case for Travis CI, where we run a script and download the project. In the xgboost project, rabit should by default use the dmlc-core subproject at the same directory level as rabit.

@CodingCat

CodingCat Jul 19, 2019

Member

but the build in xgboost always succeeds?

@chenqin

Contributor Author

commented Jul 19, 2019

I searched code base a bit, I didn't see the different bits in the flag can be set at the same time (except the bit indicating difference) ? can you point me to the code if there is any?

  1. checking whether all nodes have the same seqno, and recovering the node whose seqno equals the minimal seqno from act (allreduce acknowledgement)
    https://github.com/chenqin/rabit/blob/master/src/allreduce_robust.cc#L1095

  2. collective checkpointing: one or more nodes call checkpointing by setting the checkpoint flag as well as seqno to kSpecialOp; if the others are calling the same thing, all nodes can checkpoint at the same time. Otherwise, it might indicate at least one node is falling behind in calling checkpoint; more comments in the code: https://github.com/chenqin/rabit/blob/master/src/allreduce_robust.cc#L1019

https://github.com/chenqin/rabit/blob/master/src/allreduce_robust.cc#L1016

  3. if all nodes are calling load checkpoint, no one can offer a checkpoint, so return false
    https://github.com/chenqin/rabit/blob/master/src/allreduce_robust.cc#L1064

According to this comment: #98 (comment)

maxseqno / diff_seq is not used for now. What we can use it for is to check whether everyone has the same max cache seq; if so, there is no need to run cache restore at all.

I guess the question is why we introduce this and how it functions.
Let me break down what we had before this PR and how ActionSummary works as the single most important primitive in rabit. We want a quick way to reason about which state each node is in by calling allreduce on an ActionSummary.

ActionSummary act takes the minimal seqno over all nodes
https://github.com/chenqin/rabit/blob/master/src/allreduce_robust.h#L266
ActionSummary act applies an OR relationship over all nodes on each flag bit.
https://github.com/chenqin/rabit/blob/master/src/allreduce_robust.h#L268

Two requirements are slightly different from the seqno-minimum / OR operations in the ActionSummary reducer function.

The cache crosses seqno reset lifecycles, and we want to restore the cache only if not all nodes are calling getcache. This translates to a bitwise AND: (1,1,1,1) = 1, (1,0,1,1) = 0.

As we want to recover a node without any cache using all cache entries, all nodes need to know the "max" number of cache entries. The nodes with the max value offer data to those that don't have it. In case nodes hold different cache entries, the node with the largest cache entry count wins (immutable) and the rest get their cache entries reset to match the winner's.

Another question is why we need this max instead of using min. The answer lies in an edge case: if a node recovers, its cache is empty, so it will always be the min with cache_seq = 0; that does not mean the rest of the nodes actually have anything to offer (they may also be 0). With "max" we can decide whether we need to do cache recovery at all.

In the future, we might be able to recover selectively based on some partition or hashing scheme. (This is one of the features in mind; there are still lots of questions about how to implement it. But at a high level, if we can partition and replicate the DMatrix, we can recover a host without reloading data from HDFS per se.)

On top of everything we talked about, we want to optimize this consensus allreduce to a minimal payload in one call, versus multiple ad-hoc TryAllreduce calls to get the "max" or the OR of the flags. That leads to two separate integers plus the flags in SeqType. I guess the naming is confusing at this point; a rough sketch of the combined reducer is below.
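A sketch of what that combined reducer could look like; the field layout, bit positions, and names are illustrative, not the PR's exact encoding:

```cpp
// Sketch of the combined reducer; field layout and bit positions are
// illustrative, not the PR's exact encoding.
#include <algorithm>
#include <cstdint>
#include <cstdio>

constexpr int kFlagBits = 5;
constexpr int32_t kLoadCache = 1 << 4;  // illustrative bit position

struct Summary {
  int32_t seqcode;    // (seqno << kFlagBits) | flags
  int32_t cache_seq;  // number of cache entries held by this node
};

void Reduce(Summary &dst, const Summary &src) {
  const int32_t flag_mask = (1 << kFlagBits) - 1;
  // min over the seqno part: recovery targets the lowest pending seqno.
  int32_t seq = std::min(dst.seqcode >> kFlagBits, src.seqcode >> kFlagBits);
  // OR over the ordinary action flags: set if any node raised them ...
  int32_t flags = (dst.seqcode | src.seqcode) & flag_mask;
  // ... but AND over the load-cache bit: it survives only if all nodes set it.
  int32_t load = (dst.seqcode & src.seqcode) & kLoadCache;
  dst.seqcode = (seq << kFlagBits) | (flags & ~kLoadCache) | load;
  // max over cache_seq so a restarted node (0 entries) still learns how many
  // entries the healthy nodes can backfill.
  dst.cache_seq = std::max(dst.cache_seq, src.cache_seq);
}

int main() {
  Summary healthy   = {(7 << kFlagBits) | kLoadCache, 5};
  Summary restarted = {(0 << kFlagBits), 0};  // empty cache, no load-cache bit
  Reduce(healthy, restarted);
  std::printf("load-cache bit: %d, max cache entries: %d\n",
              (healthy.seqcode & kLoadCache) != 0, healthy.cache_seq);
  return 0;
}
```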

chenqin added some commits Jul 19, 2019

Merge pull request #3 from chenqin/test
Per feedback, adding stdout logging
bool AllreduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno,
int cache_seqno, const char* caller) {
// skip load cache state as we isolated with assertions
if (flag != 0 && flag != ActionSummary::kLoadCache) {

@CodingCat

CodingCat Jul 19, 2019

Member

what's the harm in checking kSpecialOp here? from what you wrote in ActionSummary, kLoadCache is treated equally with the other actions, so it's confusing why we want to skip it here

std::string msg = std::string(caller) + " pass negative seqno "
+ std::to_string(seqno) + " flag " + std::to_string(flag)
+ " version " + std::to_string(version_number);
utils::Assert(seqno >=0, msg.c_str());

@CodingCat

CodingCat Jul 19, 2019

Member

if you can only get caller on Linux, then discard it; otherwise, on Mac, it shows not supported in non linux pass negative seqno, which is pretty odd

@chenqin

Contributor Author

commented Jul 19, 2019

Overall, I think a lot of the confusion is caused by the missing detailed design doc. I plan to work on one, get everyone on the same page, and get sign-off. Stay tuned.
