Issue-596 Issue-583: Auto replication should honor ensemble placement policy #641
Conversation
…n using local bookie - bk ledger checker should consider striping - remove bookie address from replication worker, so the replication worker would choose bookies from cluster rather than using local bookie.
- handle BKNotEnoughBookiesException
@jvrao - this pull request is to port the autorecovery changes from the twitter branch to oss. it will fix the ledger checker issue on striping ensembles. I tried to separate the changes, but because they were originally merged as one change in the twitter branch, it is hard for me to separate them out now. so I ported the change in one pull request as it was in the twitter branch. If you feel strongly about splitting this into multiple smaller changes for review, I can try again, but it will take a while. let me know your preference.
Refer to this link for build results (access rights to CI server needed):
first commit: Minor stuff to change. Overall looks good.
@@ -495,9 +498,20 @@ public SyncObject() {
     */
    public void recoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest)
We should deprecate this call, since it's no different to recoverBookieData.
or remove it completely, since the patch does remove the async version, though this would be a BC break.
I will remove this method. we don't define binary backward compatibility for bookkeeper admin, and I doubt people depend on bookkeeper admin programmatically.
actually I kept this method unchanged because BookieShell is using it. I am changing the bookie shell in a subsequent change, so I left this method unchanged here.
Good decision.
     * @param cb
     *            RecoverCallback to invoke once all of the data on the dead
     *            bookie has been recovered and replicated.
     * @param context
     *            Context for the RecoverCallback to call.
     */
    public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
    public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc,
BC break.
this change is a semantic breaking change anyway. the original semantics were wrong and didn't respect the placement policy, so we have to break BC.
Well, if the operator knows what they are doing, it can be performed while respecting the placement policy. But I agree with the change; it is good to be fungible and let the engine do the right thing.
    private void getActiveLedgers(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
            final RecoverCallback cb, final Object context, final List<BookieSocketAddress> availableBookies) {
    private void getActiveLedgers(final Set<BookieSocketAddress> bookiesSrc, final boolean dryrun,
            final boolean skipOpenLedgers, final RecoverCallback cb, final Object context) {
        // Wrapper class around the RecoverCallback so it can be used
        // as the final VoidCallback to process ledgers
        class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
Doesn't need to happen as part of this PR, but this could be replaced by a lambda.
created: #655
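For illustration, the wrapper-vs-lambda point can be shown with a minimal self-contained sketch. `VoidCallback` below is a local stand-in for BookKeeper's `AsyncCallback.VoidCallback`, declared here only so the snippet compiles on its own:

```java
// Stand-in for org.apache.bookkeeper.client.AsyncCallback.VoidCallback,
// declared locally so this sketch is self-contained.
interface VoidCallback {
    void processResult(int rc, String path, Object ctx);
}

public class LambdaCallbackSketch {

    // Current style: a wrapper (or anonymous) class implementing the callback.
    static VoidCallback wrapperStyle() {
        return new VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                System.out.println("rc=" + rc);
            }
        };
    }

    // Suggested style: VoidCallback has a single abstract method,
    // so a lambda can replace the wrapper class entirely.
    static VoidCallback lambdaStyle() {
        return (rc, path, ctx) -> System.out.println("rc=" + rc);
    }

    public static void main(String[] args) {
        wrapperStyle().processResult(0, "/ledgers/1", null);
        lambdaStyle().processResult(0, "/ledgers/1", null);
    }
}
```

Both forms behave identically; the lambda just drops the ceremony of a named wrapper class.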
@@ -663,7 +610,7 @@ public void processResult(int rc, String path, Object ctx) {
        Processor<Long> ledgerProcessor = new Processor<Long>() {
Likewise, we don't need to create an instance, just use a lambda
(ledgerId, cb) -> { recoverLedger(bookiesSrc, ledgerId, dryrun, skipOpenLedgers, iterCallback); }
            return;
        }

        boolean fenceRequired = false;
final boolean fenceRequired = containBookies(lm.getEnsembles().lastEntry(), bookieSrc);
and make containBookies handle null.
not sure I understand this comment. I don't see the difference between your suggestion and my change. there isn't null validation here though.
The change is to simplify the code and only update fenceRequired once. Rather than having containBookies take a List, change it to take a Map.Entry, which avoids all the lastKey stuff.
still isn't clear to me. fenceRequired is only set when 1) a ledger is not closed and 2) the last ensemble contains the src bookie. are you suggesting pushing both 1) and 2) into the containBookies function? that doesn't sound correct to me though.
actually, in that case i'd do
final boolean fenceRequired = !lm.isClosed() && containBookies(lm.getEnsembles().lastEntry(), bookieSrc);
What I want to avoid is setting fenceRequired more than once. The logic here is complex. When reading the code you need to check what a value should be set to, and this is harder if it is set in multiple places.
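The single-assignment style being suggested can be sketched in a self-contained way; `containBookies` and the `closed` flag below are simplified stand-ins for the real LedgerMetadata logic in BookKeeperAdmin:

```java
// Sketch of computing fenceRequired exactly once instead of mutating it
// in several places. String bookie ids stand in for BookieSocketAddress.
import java.util.List;
import java.util.Set;

public class FenceDecisionSketch {

    // Null-tolerant membership check, as suggested for containBookies.
    static boolean containBookies(List<String> lastEnsemble, Set<String> bookiesSrc) {
        if (lastEnsemble == null) {
            return false;
        }
        for (String bookie : lastEnsemble) {
            if (bookiesSrc.contains(bookie)) {
                return true;
            }
        }
        return false;
    }

    // fenceRequired is assigned once: fence only if the ledger is still
    // open AND its last ensemble contains one of the source bookies.
    static boolean fenceRequired(boolean closed, List<String> lastEnsemble, Set<String> bookiesSrc) {
        return !closed && containBookies(lastEnsemble, bookiesSrc);
    }

    public static void main(String[] args) {
        System.out.println(fenceRequired(false, List.of("b1", "b2"), Set.of("b2")));
        System.out.println(fenceRequired(true, List.of("b1", "b2"), Set.of("b2")));
        System.out.println(fenceRequired(false, null, Set.of("b2")));
    }
}
```

Because the value is computed in one expression, a reader never has to scan for later reassignments.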
        lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookies);
    }

    private Map<Integer, BookieSocketAddress> getReplacedBookies(
Ah, this actually mutates the ensemble. I would call this replaceBookiesAndGet, so that it is clear that mutation is happening.
Longterm, I think we should stop mutating the ensembles like this. The ensemble for a segment is the ensemble of bookies that "vote" on an entry. It should be immutable. When we rereplicate, we should add another field to the metadata for "extraReplicas", and we can then merge on the read. This would simplify a lot of the recovery logic around ensemble changes.
hmm, where do you see the mutation happening? this method calculates the bookies to replace based on the ensemble. the actual mutation of the ledger metadata only happens after successful re-replication.
`extraReplicas` would actually introduce complexity on reading, garbage collection, etc. it is not too bad for ledgers based on a metadata store, but it can be bad for `log0` stuff; the ledgers in `log0` can be a special case that disables ensemble change, where a ledger is effectively an ensemble.
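For context, a minimal sketch of the `extraReplicas` idea being debated here (hypothetical names, not the current BookKeeper metadata format): the voting ensemble stays immutable, and re-replicated copies are merged in at read time.

```java
// Hypothetical sketch: the effective read set for an entry is the
// immutable voting ensemble plus any re-replicated copies recorded in a
// separate "extraReplicas" map (ensemble index -> replacement bookie).
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ExtraReplicasSketch {

    static List<String> readSet(List<String> ensemble, Map<Integer, String> extraReplicas) {
        List<String> merged = new ArrayList<>(ensemble);
        merged.addAll(extraReplicas.values());
        return merged;
    }

    public static void main(String[] args) {
        List<String> ensemble = List.of("b1", "b2", "b3");
        // the copy at ensemble index 1 was re-replicated to b4
        Map<Integer, String> extra = Map.of(1, "b4");
        System.out.println(readSet(ensemble, extra));
    }
}
```

The trade-off raised in the thread is that every reader (and the garbage collector) now has to consult two structures instead of one.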
                lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate));
    }

    private Map<Integer, BookieSocketAddress> getReplacedBookiesByIndexes(
likewise
            final LedgerFragment ledgerFragment,
            final BookieSocketAddress targetBookieAddress)
            final Map<Integer, BookieSocketAddress> targetBookieAddresses)
why are there multiple bookies being passed for a single fragment? A fragment is the subdivision of a segment that is stored on a single bookie.
Ah, you're changing the definition of fragment later on too.
yes, a fragment is a range of ledger entries; it is not per bookie. so when we replicate a fragment, it can be more efficient to replicate it to multiple bookies in one pass.
                    + ") is still part of the current"
                    + " active ensemble for ledgerId: " + lh.getId());
            ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
            return;
        }
        if (startEntryId > endEntryId) {
        if (startEntryId > endEntryId || endEntryId <= -1L) {
use INVALID_ENTRY_ID rather than -1
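The suggested guard can be sketched with the constant mirrored locally so the snippet stands alone (in BookKeeper the constant's value is -1; the class that hosts it is not shown here):

```java
// Sketch of replacing the magic -1 with a named constant in the
// empty-range check from the diff above.
public class EntryRangeSketch {

    // Mirrors BookKeeper's invalid-entry sentinel (value -1).
    static final long INVALID_ENTRY_ID = -1L;

    // An empty or invalid fragment range needs no replication.
    static boolean isEmptyRange(long startEntryId, long endEntryId) {
        return startEntryId > endEntryId || endEntryId <= INVALID_ENTRY_ID;
    }

    public static void main(String[] args) {
        System.out.println(isEmptyRange(0L, INVALID_ENTRY_ID));
        System.out.println(isEmptyRange(5L, 3L));
        System.out.println(isEmptyRange(0L, 10L));
    }
}
```

The behavior is identical to `endEntryId <= -1L`; the named constant just documents intent.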
@@ -43,6 +43,7 @@
    public final static String NUM_BYTES_READ = "NUM_BYTES_READ";
    public final static String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
    public final static String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
    public final static String REPLICATE_EXCEPTION = "exceptions";
tab
                underreplicationManager
                        .releaseUnderreplicatedLedger(ledgerIdToReplicate);
                getExceptionCounter(e.getClass().getSimpleName()).inc();

                if (e instanceof BKNotEnoughBookiesException) {
Not a big fan of this. Maybe create a logErrorAndRelease(BKException e, long ledgerId) method to move out the common stuff and handle BKNotEnoughBookiesException in a different catch clause.
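The refactor being proposed can be sketched self-contained; the exception classes, `logErrorAndRelease`, and the log buffer below are local stand-ins for the real BookKeeper types and the release/counter logic:

```java
// Sketch: common error handling in one helper, with
// BKNotEnoughBookiesException handled in its own catch clause instead of
// an instanceof check inside a shared catch block.
public class ReplicationErrorHandlingSketch {

    static class BKException extends Exception {
        BKException(String msg) { super(msg); }
    }

    static class BKNotEnoughBookiesException extends BKException {
        BKNotEnoughBookiesException(String msg) { super(msg); }
    }

    static final StringBuilder log = new StringBuilder();

    // Common path: count the exception and release the ledger lock.
    static void logErrorAndRelease(BKException e, long ledgerId) {
        log.append("released ").append(ledgerId)
           .append(" after ").append(e.getClass().getSimpleName()).append('\n');
    }

    static void replicate(long ledgerId, boolean enoughBookies) {
        try {
            if (!enoughBookies) {
                throw new BKNotEnoughBookiesException("not enough bookies");
            }
            throw new BKException("read failed");
        } catch (BKNotEnoughBookiesException e) {
            // Special case gets its own clause: defer instead of retrying.
            logErrorAndRelease(e, ledgerId);
            log.append("deferred ").append(ledgerId).append('\n');
        } catch (BKException e) {
            logErrorAndRelease(e, ledgerId);
        }
    }

    public static void main(String[] args) {
        replicate(1L, false);
        replicate(2L, true);
        System.out.print(log);
    }
}
```

The `instanceof` branch disappears; the compiler now dispatches on the exception type.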
@ivankelly I used merge rather than rebase, because I had the impression that github will lose commit-related comments when using rebase. the change was actually only one commit in the twitter branch, so eventually it will be squashed anyway.
@sijie will review it today.
@jvrao thank you
- change `replaced` to `replacement` - fix the tab indent
@ivankelly - I addressed your comments. please review it when you have time.
Refer to this link for build results (access rights to CI server needed):
retest this please
Refer to this link for build results (access rights to CI server needed):
This looks good to go; the things I'm commenting on are all sugar, though I would like to see System.out avoided.
        for (Integer bookieIndex : bookieIndexesToRereplicate) {
            BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
            BookieSocketAddress newBookie =
                bkc.getPlacementPolicy().replaceBookie(
placement policy is a property of the client. In theory, we can have multiple clients connected to the cluster with multiple placement policies. Should this be part of metadata, then? So it looks like our assumption is a cluster-wide placement policy, and it needs to be set properly on the auditor/replication worker nodes too?
yes and no. it is sort of cluster-wide, but it is actually per client. if your clients share the same configuration, then it is the same placement policy.
This placement policy has to be set in the configuration for auditors and replication workers.
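To make the `replaceBookie` call in the diff above concrete, here is a minimal self-contained sketch of what a placement policy's replacement step conceptually does; the method name mirrors the real `EnsemblePlacementPolicy.replaceBookie`, but the types and selection logic here are simplified stand-ins:

```java
// Hypothetical sketch: pick a replacement from the cluster's writable
// bookies, skipping bookies already in the ensemble and excluded (e.g.
// dead) bookies. String ids stand in for BookieSocketAddress.
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class ReplaceBookieSketch {

    static String replaceBookie(List<String> ensemble, String bookieToReplace,
                                Set<String> writableBookies, Set<String> excluded) {
        for (String candidate : writableBookies) {
            if (!ensemble.contains(candidate) && !excluded.contains(candidate)) {
                return candidate;
            }
        }
        // The real code surfaces this as BKNotEnoughBookiesException.
        throw new IllegalStateException("not enough bookies");
    }

    public static void main(String[] args) {
        Set<String> writable = new TreeSet<>(List.of("b1", "b2", "b3", "b4"));
        // b2 is dead; replace it without reusing ensemble members or b2 itself.
        String replacement = replaceBookie(List.of("b1", "b2", "b3"), "b2",
                                           writable, Set.of("b2"));
        System.out.println(replacement);
    }
}
```

A rack- or region-aware policy would add constraints to the candidate filter, which is exactly why the auditor and replication workers must be configured with the same policy as the writers.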
@ivankelly I pushed a change to address your comments.
Refer to this link for build results (access rights to CI server needed):
CI passed.
#shipit
Trying to merge but getting a strange error:
@ivankelly interesting. encountered same error here. so I guess the merge script doesn't find any approvals. something is wrong in the json response, i guess. you can comment
I checked the json response of getting all the comments. it turns out the response doesn't have your recent comments.
seems the comment
…mble placement policy

Descriptions of the changes in this PR:

This pull request ports the changes from twitter-archive/bookkeeper@fc7e171

The changes include:
1. when bookkeeper admin re-replicates a ledger, it will pick a bookie from the available bookies in the cluster to satisfy the placement constraint. (apache#596)
2. hence, remove `targetBookie` from ReplicationWorker, because the parameter would violate the placement constraint. (apache#583)
3. at the same time, change `LedgerFragment` to represent the bookies that need to be checked and replicated, so that: a) the ledger checker can use the correct bookie index for verifying the existence of entries on a bookie (for the striping case); b) entries are read only once when they need to be replicated to multiple bookies.

Author: Sijie Guo <sijie@apache.org>
Reviewers: Ivan Kelly <ivank@apache.org>

This closes apache#641 from sijie/twitter_autorecovery_fixes, closes apache#596, closes apache#583
Descriptions of the changes in this PR:

This pull request ports the changes from twitter/bookkeeper@fc7e171

The changes include:
1. when bookkeeper admin re-replicates a ledger, it will pick a bookie from the available bookies in the cluster to satisfy the placement constraint. (#596)
2. remove `targetBookie` from ReplicationWorker, because the parameter would violate the placement constraint. (ReplicationWorker should choose bookies from cluster rather than using local bookie #583)
3. change `LedgerFragment` to represent the bookies that need to be checked and replicated: a) the ledger checker can use the correct bookie index for verifying the existence of entries on a bookie (for the striping case); b) entries are read only once when they need to be replicated to multiple bookies.