Skip to content
Permalink
Browse files
[BOOKIE-LEDGER-RECOVERY] Fix bookie recovery stuck even with enough a…
…ck-quorum response

### Motivation
As discussed at apache/pulsar#6505 

Bk-client was not able to recover ledger which has 2 write/ack quorum and one of the bookie went down and recovery was kept failing and bookkeeper client was not able to recover the ledger.

**BK-Client log**

```
20:44:43.721 [BookKeeperClientWorker-OrderedExecutor-1-0] ERROR org.apache.bookkeeper.client.ReadLastConfirmedOp - While readLastConfirmed ledger: 1234567 did not hear success responses from all quorums
20:44:43.721 [bookkeeper-io-12-27] ERROR org.apache.bookkeeper.proto.PerChannelBookieClient - Could not connect to bookie: [id: 0xb8b97441, L:/1.1.1.1:1234]/1.1.1.2:3181, current s
tate CONNECTING : 
io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: No route to host: /1.1.1.2:3181
        at io.netty.channel.unix.Errors.throwConnectException(Errors.java:112) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.unix.Socket.finishConnect(Socket.java:269) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:665) [netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
        at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:642) [netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
        at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:522) [netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:423) [netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:330) [netty-transport-native-epoll-4.1.31.Final.jar:4.1.31.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897) [netty-common-4.1.31.Final.jar:4.1.31.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.31.Final.jar:4.1.31.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.net.ConnectException: finishConnect(..) failed: No route to host
```

**Ledger metadata**

```
BookieMetadataFormatVersion 2
quorumSize: 2
ensembleSize: 2
length: 0
lastEntryId: -1
state: IN_RECOVERY
segment {
  ensembleMember: "1.1.1.1:3181"
  ensembleMember: "1.1.1.2:3181"
  firstEntryId: 0
}
digestType: CRC32
```

**Root cause:**
Bookie should be able to recover ledger once it receives the response from total N (`(Qw - Qa)+1`) bookies. But it was waiting for a successful response from both quorums.
Reference: https://bookkeeper.apache.org/docs/4.5.0/development/protocol/

### Modification
Bookie should be able to recover ledger once it receives the response from total N (`(Qw - Qa)+1`) bookies.


Reviewers: Diego Salvi <diego.salvi@diennea.com>,  Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

This closes #2281 from rdhabalia/recover_q
  • Loading branch information
rdhabalia committed Mar 23, 2020
1 parent c69772b commit 86f58e8a7269c8e51f246386968ceed051375b8f
Showing 3 changed files with 107 additions and 13 deletions.
@@ -38,6 +38,7 @@ class ReadLastConfirmedOp implements ReadEntryCallback {
LedgerHandle lh;
BookieClient bookieClient;
int numResponsesPending;
int numSuccessfulResponse;
RecoveryData maxRecoveredData;
volatile boolean completed = false;
int lastSeenError = BKException.Code.ReadException;
@@ -59,6 +60,7 @@ public ReadLastConfirmedOp(LedgerHandle lh, BookieClient bookieClient,
this.bookieClient = bookieClient;
this.maxRecoveredData = new RecoveryData(LedgerHandle.INVALID_ENTRY_ID, 0);
this.lh = lh;
this.numSuccessfulResponse = 0;
this.numResponsesPending = lh.getLedgerMetadata().getEnsembleSize();
this.coverageSet = lh.distributionSchedule.getCoverageSet();
this.currentEnsemble = ensemble;
@@ -99,6 +101,7 @@ public synchronized void readEntryComplete(final int rc, final long ledgerId, fi
maxRecoveredData = recoveryData;
}
heardValidResponse = true;
numSuccessfulResponse++;
} catch (BKDigestMatchException e) {
// Too bad, this bookie didn't give us a valid answer, we
// still might be able to recover though so continue
@@ -137,8 +140,15 @@ public synchronized void readEntryComplete(final int rc, final long ledgerId, fi
}

if (numResponsesPending == 0 && !completed) {
int totalExepctedResponse = lh.getLedgerMetadata().getWriteQuorumSize()
- lh.getLedgerMetadata().getAckQuorumSize() + 1;
if (numSuccessfulResponse >= totalExepctedResponse) {
cb.readLastConfirmedDataComplete(BKException.Code.OK, maxRecoveredData);
return;
}
// Have got all responses back but was still not enough, just fail the operation
LOG.error("While readLastConfirmed ledger: {} did not hear success responses from all quorums", ledgerId);
LOG.error("While readLastConfirmed ledger: {} did not hear success responses from all quorums {}", ledgerId,
lastSeenError);
cb.readLastConfirmedDataComplete(lastSeenError, maxRecoveredData);
}

@@ -66,6 +66,7 @@
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
@@ -693,4 +694,93 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
readBk.close();
}

/**
* Validate ledger can recover with response: (Qw - Qa)+1.
* @throws Exception
*/
@Test
public void testRecoveryWithUnavailableBookie() throws Exception {

byte[] passwd = "".getBytes(UTF_8);
ClientConfiguration newConf = new ClientConfiguration();
newConf.addConfiguration(baseClientConf);
final BookKeeper readBk = new BookKeeper(newConf);
final BookKeeper newBk0 = new BookKeeper(newConf);

/**
* Test Group-1 : Expected Response for recovery: Qr = (Qw - Qa)+1 = (3
* -2) + 1 = 2
*/
int ensembleSize = 3;
int writeQuorumSize = 3;
int ackQuormSize = 2;
LedgerHandle lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize, ackQuormSize, DigestType.DUMMY, passwd);
LedgerHandle readLh = readBk.openLedgerNoRecovery(lh0.getId(), DigestType.DUMMY, passwd);
// Test 1: bookie response: OK, NO_SUCH_LEDGER_EXISTS, NOT_AVAILABLE
// Expected: Recovery successful Q(response) = 2
int responseCode = readLACFromQuorum(readLh, BKException.Code.BookieHandleNotAvailableException,
BKException.Code.OK, BKException.Code.NoSuchLedgerExistsException);
assertEquals(responseCode, BKException.Code.OK);
// Test 2: bookie response: OK, NOT_AVAILABLE, NOT_AVAILABLE
// Expected: Recovery fail Q(response) = 1
responseCode = readLACFromQuorum(readLh, BKException.Code.BookieHandleNotAvailableException,
BKException.Code.OK, BKException.Code.BookieHandleNotAvailableException);
assertEquals(responseCode, BKException.Code.BookieHandleNotAvailableException);

/**
* Test Group-2 : Expected Response for recovery: Qr = (Qw - Qa)+1 = (2
* -2) + 1 = 1
*/
ensembleSize = 2;
writeQuorumSize = 2;
ackQuormSize = 2;
lh0 = newBk0.createLedger(ensembleSize, writeQuorumSize, ackQuormSize, DigestType.DUMMY, passwd);
readLh = readBk.openLedgerNoRecovery(lh0.getId(), DigestType.DUMMY, passwd);
// Test 1: bookie response: OK, NOT_AVAILABLE
// Expected: Recovery successful Q(response) = 1
responseCode = readLACFromQuorum(readLh, BKException.Code.BookieHandleNotAvailableException,
BKException.Code.OK);
assertEquals(responseCode, BKException.Code.OK);

// Test 1: bookie response: OK, NO_SUCH_LEDGER_EXISTS
// Expected: Recovery successful Q(response) = 2
responseCode = readLACFromQuorum(readLh, BKException.Code.NoSuchLedgerExistsException, BKException.Code.OK);
assertEquals(responseCode, BKException.Code.OK);

// Test 3: bookie response: NOT_AVAILABLE, NOT_AVAILABLE
// Expected: Recovery fail Q(response) = 0
responseCode = readLACFromQuorum(readLh, BKException.Code.BookieHandleNotAvailableException,
BKException.Code.BookieHandleNotAvailableException);
assertEquals(responseCode, BKException.Code.BookieHandleNotAvailableException);

newBk0.close();
readBk.close();
}

private int readLACFromQuorum(LedgerHandle ledger, int... bookieLACResponse) throws Exception {
MutableInt responseCode = new MutableInt(100);
CountDownLatch responseLatch = new CountDownLatch(1);
ReadLastConfirmedOp readLCOp = new ReadLastConfirmedOp(ledger, bkc.getBookieClient(),
ledger.getLedgerMetadata().getAllEnsembles().lastEntry().getValue(),
new ReadLastConfirmedOp.LastConfirmedDataCallback() {
@Override
public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
System.out.println("response = " + rc);
responseCode.setValue(rc);
responseLatch.countDown();
}
});
byte[] lac = new byte[Long.SIZE * 3];
ByteBuf data = Unpooled.wrappedBuffer(lac, 0, lac.length);
int writerIndex = data.writerIndex();
data.resetWriterIndex();
data.writeLong(ledger.getId());
data.writeLong(0L);
data.writerIndex(writerIndex);
for (int i = 0; i < bookieLACResponse.length; i++) {
readLCOp.readEntryComplete(bookieLACResponse[i], 0, 0, data, i);
}
responseLatch.await();
return responseCode.intValue();
}
}
@@ -313,12 +313,9 @@ public void testLedgerNoRecoveryOpenAfterBKCrashed() throws Exception {
killBookie(0);

// try to open ledger no recovery
try {
bkc.openLedgerNoRecovery(beforelh2.getId(), digestType, "".getBytes());
fail("Should have thrown exception");
} catch (BKException.BKReadException e) {
// correct behaviour
}
// should be able to open ledger with one bookie down:
// Total bookies available 3 >= 1 (Qw(4) - Qa(4) + 1)
bkc.openLedgerNoRecovery(beforelh2.getId(), digestType, "".getBytes());
}

@Test
@@ -351,12 +348,9 @@ public void testLedgerOpenAfterBKCrashed() throws Exception {
killBookie(0);

// try to open ledger no recovery
try {
bkc.openLedger(beforelh2.getId(), digestType, "".getBytes());
fail("Should have thrown exception");
} catch (BKException.BKLedgerRecoveryException e) {
// correct behaviour
}
// should be able to open ledger with one bookie down:
// Total bookies available 3 >= 1 (Qw(4) - Qa(4) + 1)
bkc.openLedger(beforelh2.getId(), digestType, "".getBytes());
}

/**

0 comments on commit 86f58e8

Please sign in to comment.