Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue-596 Issue-583: Auto replication should honor ensemble placement policy #641

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions bookkeeper-server/conf/log4j.shell.properties
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n

# verbose console logging
log4j.appender.VERBOSECONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.VERBOSECONSOLE.Threshold=INFO
log4j.appender.VERBOSECONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.VERBOSECONSOLE.layout.ConversionPattern=%m%n

log4j.logger.verbose=INFO,VERBOSECONSOLE
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.bookkeeper=ERROR
log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
package org.apache.bookkeeper.client;

import io.netty.buffer.ByteBuf;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
Expand Down Expand Up @@ -81,39 +80,125 @@ public void readEntryComplete(int rc, long ledgerId, long entryId,
}
}

/**
* This will collect the bad bookies inside a ledger fragment.
*/
private static class LedgerFragmentCallback implements GenericCallback<LedgerFragment> {

private final LedgerFragment fragment;
private final int bookieIndex;
// bookie index -> return code
private final Map<Integer, Integer> badBookies;
private final AtomicInteger numBookies;
private final GenericCallback<LedgerFragment> cb;

LedgerFragmentCallback(LedgerFragment lf,
int bookieIndex,
GenericCallback<LedgerFragment> cb,
Map<Integer, Integer> badBookies,
AtomicInteger numBookies) {
this.fragment = lf;
this.bookieIndex = bookieIndex;
this.cb = cb;
this.badBookies = badBookies;
this.numBookies = numBookies;
}

@Override
public void operationComplete(int rc, LedgerFragment lf) {
if (BKException.Code.OK != rc) {
synchronized (badBookies) {
badBookies.put(bookieIndex, rc);
}
}
if (numBookies.decrementAndGet() == 0) {
if (badBookies.isEmpty()) {
cb.operationComplete(BKException.Code.OK, fragment);
} else {
int rcToReturn = BKException.Code.NoBookieAvailableException;
for (Map.Entry<Integer, Integer> entry : badBookies.entrySet()) {
rcToReturn = entry.getValue();
if (entry.getValue() == BKException.Code.ClientClosedException) {
break;
}
}
cb.operationComplete(rcToReturn,
fragment.subset(badBookies.keySet()));
}
}
}
}

public LedgerChecker(BookKeeper bkc) {
bookieClient = bkc.getBookieClient();
}

/**
* Verify a ledger fragment to collect bad bookies
*
* @param fragment
* fragment to verify
* @param cb
* callback
* @throws InvalidFragmentException
*/
private void verifyLedgerFragment(LedgerFragment fragment,
GenericCallback<LedgerFragment> cb)
throws InvalidFragmentException, BKException {
Set<Integer> bookiesToCheck = fragment.getBookiesIndexes();
if (bookiesToCheck.isEmpty()) {
cb.operationComplete(BKException.Code.OK, fragment);
return;
}

AtomicInteger numBookies = new AtomicInteger(bookiesToCheck.size());
Map<Integer, Integer> badBookies = new HashMap<Integer, Integer>();
for (Integer bookieIndex : bookiesToCheck) {
LedgerFragmentCallback lfCb = new LedgerFragmentCallback(
fragment, bookieIndex, cb, badBookies, numBookies);
verifyLedgerFragment(fragment, bookieIndex, lfCb);
}
}

/**
* Verify a bookie inside a ledger fragment.
*
* @param fragment
* ledger fragment
* @param bookieIndex
* bookie index in the fragment
* @param cb
* callback
* @throws InvalidFragmentException
*/
private void verifyLedgerFragment(LedgerFragment fragment,
GenericCallback<LedgerFragment> cb) throws InvalidFragmentException {
long firstStored = fragment.getFirstStoredEntryId();
long lastStored = fragment.getLastStoredEntryId();

// because of this if block, even if the bookie of the fragment is
// down, it considers Fragment is available/not-bad if firstStored
// and lastStored are LedgerHandle.INVALID_ENTRY_ID.
// So same logic is used in BookieShell.DecommissionBookieCmd.areEntriesOfSegmentStoredInTheBookie
// if any change is made here, then the changes should be in BookieShell also
int bookieIndex,
GenericCallback<LedgerFragment> cb)
throws InvalidFragmentException {
long firstStored = fragment.getFirstStoredEntryId(bookieIndex);
long lastStored = fragment.getLastStoredEntryId(bookieIndex);

BookieSocketAddress bookie = fragment.getAddress(bookieIndex);
if (null == bookie) {
throw new InvalidFragmentException();
}

if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
// this fragment is not on this bookie
if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
throw new InvalidFragmentException();
}
cb.operationComplete(BKException.Code.OK, fragment);
return;
}
if (firstStored == lastStored) {
} else if (firstStored == lastStored) {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
fragment, cb);
bookieClient.readEntry(fragment.getAddress(), fragment
bookieClient.readEntry(bookie, fragment
.getLedgerId(), firstStored, manycb, null);
} else {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(2,
fragment, cb);
bookieClient.readEntry(fragment.getAddress(), fragment
.getLedgerId(), firstStored, manycb, null);
bookieClient.readEntry(fragment.getAddress(), fragment
.getLedgerId(), lastStored, manycb, null);
bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored, manycb, null);
bookieClient.readEntry(bookie, fragment.getLedgerId(), lastStored, manycb, null);
}
}

Expand Down Expand Up @@ -191,44 +276,49 @@ public void checkLedger(LedgerHandle lh,
for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : lh
.getLedgerMetadata().getEnsembles().entrySet()) {
if (curEntryId != null) {
Set<Integer> bookieIndexes = new HashSet<Integer>();
for (int i = 0; i < curEnsemble.size(); i++) {
fragments.add(new LedgerFragment(lh, curEntryId,
e.getKey() - 1, i));
bookieIndexes.add(i);
}
fragments.add(new LedgerFragment(lh, curEntryId,
e.getKey() - 1, bookieIndexes));
}
curEntryId = e.getKey();
curEnsemble = e.getValue();
}

/* Checking the last fragment of the ledger can be complicated in some cases.



/* Checking the last segment of the ledger can be complicated in some cases.
* In the case that the ledger is closed, we can just check the fragments of
* the ledger as normal, except in the case that no entry was ever written,
* to the ledger, in which case we check no fragments.
* the segment as normal even if no data has ever been written to.
* In the case that the ledger is open, but enough entries have been written,
* for lastAddConfirmed to be set above the start entry of the fragment, we
* for lastAddConfirmed to be set above the start entry of the segment, we
* can also check as normal.
* However, if lastAddConfirmed cannot be trusted, such as when it's lower than
* the first entry id, or not set at all, we cannot be sure if there has been
* data written to the fragment. For this reason, we have to send a read request
* However, if ledger is open, sometimes lastAddConfirmed cannot be trusted,
* such as when it's lower than the first entry id, or not set at all,
* we cannot be sure if there has been data written to the segment.
* For this reason, we have to send a read request
* to the bookies which should have the first entry. If they respond with
* NoSuchEntry we can assume it was never written. If they respond with anything
* else, we must assume the entry has been written, so we run the check.
*/
if (curEntryId != null && !(lh.getLedgerMetadata().isClosed() && lh.getLastAddConfirmed() < curEntryId)) {
if (curEntryId != null) {
long lastEntry = lh.getLastAddConfirmed();

if (lastEntry < curEntryId) {
if (!lh.isClosed() && lastEntry < curEntryId) {
lastEntry = curEntryId;
}

final Set<LedgerFragment> finalFragments = new HashSet<LedgerFragment>();
Set<Integer> bookieIndexes = new HashSet<Integer>();
for (int i = 0; i < curEnsemble.size(); i++) {
finalFragments.add(new LedgerFragment(lh, curEntryId,
lastEntry, i));
bookieIndexes.add(i);
}
final LedgerFragment lastLedgerFragment = new LedgerFragment(lh, curEntryId,
lastEntry, bookieIndexes);

// Check for the case that no last confirmed entry has
// been set.
// Check for the case that no last confirmed entry has been set
if (curEntryId == lastEntry) {
final long entryToRead = curEntryId;

Expand All @@ -237,7 +327,7 @@ public void checkLedger(LedgerHandle lh,
new GenericCallback<Boolean>() {
public void operationComplete(int rc, Boolean result) {
if (result) {
fragments.addAll(finalFragments);
fragments.add(lastLedgerFragment);
}
checkFragments(fragments, cb);
}
Expand All @@ -250,7 +340,7 @@ public void operationComplete(int rc, Boolean result) {
}
return;
} else {
fragments.addAll(finalFragments);
fragments.add(lastLedgerFragment);
}
}

Expand All @@ -275,7 +365,10 @@ private void checkFragments(Set<LedgerFragment> fragments,
LOG.error("Invalid fragment found : {}", r);
allFragmentsCb.operationComplete(
BKException.Code.IncorrectParameterException, r);
} catch (BKException e) {
LOG.error("BKException when checking fragment : {}", r, e);
}
}
}

}