Skip to content

Commit

Permalink
BP-14 force() API
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 24, 2018
1 parent e475ac3 commit 6034b74
Show file tree
Hide file tree
Showing 16 changed files with 775 additions and 35 deletions.
Expand Up @@ -117,6 +117,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
private OpStatsLogger readLacAndEntryOpLogger;
private OpStatsLogger readLacAndEntryRespLogger;
private OpStatsLogger addOpLogger;
private OpStatsLogger forceOpLogger;
private OpStatsLogger writeLacOpLogger;
private OpStatsLogger readLacOpLogger;
private OpStatsLogger recoverAddEntriesStats;
Expand Down Expand Up @@ -1501,6 +1502,7 @@ private void initOpLoggers(StatsLogger stats) {
readLacAndEntryRespLogger = stats.getOpStatsLogger(
BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE);
addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP);
forceOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.FORCE_OP);
addOpUrCounter = stats.getCounter(BookKeeperClientStats.ADD_OP_UR);
writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP);
readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP);
Expand Down Expand Up @@ -1534,6 +1536,9 @@ OpStatsLogger getReadLacAndEntryRespLogger() {
OpStatsLogger getAddOpLogger() {
return addOpLogger;
}
OpStatsLogger getForceOpLogger() {
return forceOpLogger;
}
OpStatsLogger getWriteLacOpLogger() {
return writeLacOpLogger;
}
Expand Down
Expand Up @@ -41,6 +41,7 @@ public interface BookKeeperClientStats {

String ADD_OP = "ADD_ENTRY";
String ADD_OP_UR = "ADD_ENTRY_UR"; // Under Replicated during AddEntry.
String FORCE_OP = "FORCE";
String READ_OP = "READ_ENTRY";
// Corrupted entry (Digest Mismatch/ Under Replication) detected during ReadEntry
String READ_OP_DM = "READ_ENTRY_DM";
Expand All @@ -63,7 +64,9 @@ public interface BookKeeperClientStats {
String CHANNEL_ADD_OP = "ADD_ENTRY";
String CHANNEL_TIMEOUT_ADD = "TIMEOUT_ADD_ENTRY";
String CHANNEL_WRITE_LAC_OP = "WRITE_LAC";
String CHANNEL_FORCE_OP = "FORCE";
String CHANNEL_TIMEOUT_WRITE_LAC = "TIMEOUT_WRITE_LAC";
String CHANNEL_TIMEOUT_FORCE = "TIMEOUT_FORCE";
String CHANNEL_READ_LAC_OP = "READ_LAC";
String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC";
String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
Expand Down
Expand Up @@ -149,6 +149,13 @@ public WriteSet copy() {
*/
WriteSet getWriteSetForLongPoll(long entryId);

/**
* Return the set of bookies indices to send the messages to for force ledger.
*
* @return the set of bookies indices to send force request to.
*/
WriteSet getWriteSetForForceLedger();

/**
* An ack set represents the set of bookies from which
* a response must be received so that an entry can be
Expand Down Expand Up @@ -197,6 +204,11 @@ interface AckSet {
*/
AckSet getAckSet();

/**
* Returns an ackset object useful to wait for all bookies in the ensemble,
* responses should be checked against this.
*/
AckSet getAckSetForForceLedger();

/**
* Interface to keep track of which bookies in an ensemble, an action
Expand Down
@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This represents a request to sync the ledger on every bookie.
*/
class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback {

private static final Logger LOG = LoggerFactory.getLogger(ForceLedgerOp.class);
CompletableFuture<Void> cb;
BitSet receivedResponseSet;

DistributionSchedule.AckSet ackSet;
boolean completed = false;
int lastSeenError = BKException.Code.WriteException;
ArrayList<BookieSocketAddress> currentEnsemble;

long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID;

LedgerHandle lh;

ForceLedgerOp(LedgerHandle lh, CompletableFuture<Void> cb) {
this.lh = lh;
this.cb = cb;
}

void sendForceLedgerRequest(int bookieIndex) {
lh.bk.getBookieClient().forceLedger(currentEnsemble.get(bookieIndex), lh.ledgerId, this, bookieIndex);
}

@Override
public void safeRun() {
initiate();
}

void initiate() {

// capture currentNonDurableLastAddConfirmed, we are inside OrderedExecutor
this.currentNonDurableLastAddConfirmed = lh.pendingAddsSequenceHead;
LOG.debug("start force() on ledger {} capturing {} ", lh.ledgerId, currentNonDurableLastAddConfirmed);

// we need to send the request to every bookie in the ensamble
this.currentEnsemble = lh.metadata.currentEnsemble;
this.ackSet = lh.distributionSchedule.getAckSetForForceLedger();
this.receivedResponseSet = new BitSet(
lh.getLedgerMetadata().getEnsembleSize());
this.receivedResponseSet.set(0,
lh.getLedgerMetadata().getEnsembleSize());

DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSetForForceLedger();
try {
for (int i = 0; i < writeSet.size(); i++) {
sendForceLedgerRequest(writeSet.get(i));
}
} finally {
writeSet.recycle();
}
}

@Override
public void forceLedgerComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) {
int bookieIndex = (Integer) ctx;

if (completed) {
return;
}

if (BKException.Code.OK != rc) {
lastSeenError = rc;
}

// We got response.
receivedResponseSet.clear(bookieIndex);

if (rc == BKException.Code.OK) {
if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) {
completed = true;
// we are able to say that every bookie sync'd its own journal
// for every ackknowledged entry before issuing the force() call
LOG.info("After force on ledger {} updating LastAddConfirmed to {} ",
ledgerId, currentNonDurableLastAddConfirmed);
lh.lastAddConfirmed = currentNonDurableLastAddConfirmed;
FutureUtils.complete(cb, null);
return;
}
} else {
LOG.warn("ForceLedger did not succeed: Ledger {} on {}", new Object[]{ledgerId, addr});
}

if (receivedResponseSet.isEmpty()) {
completed = true;
FutureUtils.completeExceptionally(cb, BKException.create(lastSeenError));
}
}
}
Expand Up @@ -1099,6 +1099,67 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse
cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx);
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Void> force() {
CompletableFuture<Void> result = new CompletableFuture<>();
ForceLedgerOp op = new ForceLedgerOp(this, result);
boolean wasClosed = false;
synchronized (this) {
// synchronized on this to ensure that
// the ledger isn't closed between checking and
// updating lastAddPushed
if (metadata.isClosed()) {
wasClosed = true;
}
}

if (wasClosed) {
// make sure the callback is triggered in main worker pool
try {
bk.getMainWorkerPool().submit(new SafeRunnable() {
@Override
public void safeRun() {
LOG.warn("Attempt to use a closed ledger: {}", ledgerId);
result.completeExceptionally(new BKException.BKLedgerClosedException());
}

@Override
public String toString() {
return String.format("force(lid=%d)", ledgerId);
}
});
} catch (RejectedExecutionException e) {
result.completeExceptionally(new BKException.BKInterruptedException());
}
return result;
}

if (pendingAddsSequenceHead == INVALID_ENTRY_ID) {
bk.getMainWorkerPool().submit(new SafeRunnable() {
@Override
public void safeRun() {
FutureUtils.complete(result, null);
}

@Override
public String toString() {
return String.format("force(lid=%d)", ledgerId);
}
});
return result;
}

try {
bk.getMainWorkerPool().executeOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
result.completeExceptionally(new BKException.BKInterruptedException());
}
return result;
}

/**
* Make a recovery add entry request. Recovery adds can add to a ledger even
* if it has been fenced.
Expand Down Expand Up @@ -1685,6 +1746,10 @@ void errorOutPendingAdds(int rc, List<PendingAddOp> ops) {
}
}

void forceCompleted(long entryId) {

}

void sendAddSuccessCallbacks() {
// Start from the head of the queue and proceed while there are
// entries that have had all their responses come back
Expand Down
Expand Up @@ -60,6 +60,11 @@ public WriteSet getWriteSetForLongPoll(long entryId) {
return WriteSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, entryId);
}

@Override
public WriteSet getWriteSetForForceLedger() {
return WriteSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, 0);
}

@VisibleForTesting
static WriteSet writeSetFromValues(Integer... values) {
WriteSetImpl writeSet = WriteSetImpl.create(0, 0, 0);
Expand Down Expand Up @@ -252,6 +257,11 @@ public AckSet getAckSet() {
return AckSetImpl.create(ensembleSize, writeQuorumSize, ackQuorumSize);
}

@Override
public AckSet getAckSetForForceLedger() {
return AckSetImpl.create(ensembleSize, ensembleSize, ensembleSize);
}

private static class AckSetImpl implements AckSet {
private int writeQuorumSize;
private int ackQuorumSize;
Expand Down
@@ -0,0 +1,50 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client.api;

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;

/**
* Provide the ability to enforce durability guarantees to the writer.
*
* @see WriteAdvHandle
* @see WriteHandle
*
* @since 4.8
*/
@Public
@Unstable
public interface ForceableHandle {

/**
* Enforce durability to the entries written by this handle.
* <p>This API is useful with {@link WriteFlag#DEFERRED_SYNC}, because with
* that flag writes are acknowledged by the bookie without waiting for a
* durable write
* </p>
*
* @return an handle to the result
*/
CompletableFuture<Void> force();

}
Expand Up @@ -39,7 +39,7 @@
*/
@Public
@Unstable
public interface WriteAdvHandle extends ReadHandle {
public interface WriteAdvHandle extends ReadHandle, ForceableHandle {

/**
* Add entry asynchronously to an open ledger.
Expand Down
Expand Up @@ -32,6 +32,8 @@ public enum WriteFlag {
/**
* Writes will be acknowledged after writing to the filesystem
* but not yet been persisted to disks.
*
* @see ForceableHandle#force()
*/
DEFERRED_SYNC(0x1 << 0);

Expand Down
Expand Up @@ -38,7 +38,7 @@
*/
@Public
@Unstable
public interface WriteHandle extends ReadHandle {
public interface WriteHandle extends ReadHandle, ForceableHandle {

/**
* Add entry asynchronously to an open ledger.
Expand Down

0 comments on commit 6034b74

Please sign in to comment.