Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ void enableLedgerReplication()
boolean isLedgerReplicationEnabled()
throws ReplicationException.UnavailableException;

/**
* Receive notification asynchronously when the ledger replication process
* is changed.
*
* @param cb
* - callback implementation to receive the notification
*/
void notifyLedgerReplicationStatusChanged(GenericCallback<Void> cb)
throws ReplicationException.UnavailableException;

/**
* Receive notification asynchronously when the ledger replication process
* is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ public boolean isLedgerReplicationEnabled() {
return false;
}
@Override
public void notifyLedgerReplicationStatusChanged(GenericCallback<Void> cb) {}
@Override
public void notifyLedgerReplicationEnabled(GenericCallback<Void> cb) {}
@Override
public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,37 @@ public boolean isLedgerReplicationEnabled()
}
}

@Override
public void notifyLedgerReplicationStatusChanged(final GenericCallback<Void> cb)
throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
LOG.debug("notifyLedgerReplicationStatusChanged()");
}
Watcher w = e -> {
if (e.getType() == Watcher.Event.EventType.NodeDeleted) {
LOG.info("LedgerReplication is enabled externally through Zookeeper, "
+ "since DISABLE_NODE ZNode is deleted");
cb.operationComplete(0, null);
} else if (e.getType() == Watcher.Event.EventType.NodeCreated) {
LOG.info("LedgerReplication is disabled externally through Zookeeper, "
+ "since DISABLE_NODE ZNode is created");
cb.operationComplete(1, null);
}
};
try {
zkc.addWatch(basePath + '/'
+ BookKeeperConstants.DISABLE_NODE, w, AddWatchMode.PERSISTENT);
} catch (KeeperException ke) {
LOG.error("Error while checking the state of "
+ "ledger re-replication", ke);
throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
"Interrupted while contacting zookeeper", ie);
}
}

@Override
public void notifyLedgerReplicationEnabled(final GenericCallback<Void> cb)
throws ReplicationException.UnavailableException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -166,6 +167,7 @@ public class Auditor implements AutoCloseable {
private final int zkOpTimeoutMs;
private final Semaphore openLedgerNoRecoverySemaphore;
private final int openLedgerNoRecoverySemaphoreWaitTimeoutMSec;
private AtomicBoolean isLedgerReplicationEnabled;

private final StatsLogger statsLogger;
@StatsDoc(
Expand Down Expand Up @@ -714,6 +716,10 @@ public void start() {
try {
this.ledgerUnderreplicationManager.notifyUnderReplicationLedgerChanged(
new UnderReplicatedLedgersChangedCb());
this.isLedgerReplicationEnabled =
new AtomicBoolean(this.ledgerUnderreplicationManager.isLedgerReplicationEnabled());
this.ledgerUnderreplicationManager
.notifyLedgerReplicationStatusChanged(new ReplicationEnabledChangedCb());
} catch (UnavailableException ue) {
LOG.error("Exception while registering for under-replicated ledgers change notification, so exiting",
ue);
Expand Down Expand Up @@ -781,7 +787,7 @@ private void scheduleCheckAllLedgersTask(){
Stopwatch stopwatch = Stopwatch.createStarted();
boolean checkSuccess = false;
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
if (!isLedgerReplicationEnabled.get()) {
LOG.info("Ledger replication disabled, skipping checkAllLedgers");
return;
}
Expand All @@ -799,11 +805,6 @@ private void scheduleCheckAllLedgersTask(){
LOG.error("Exception running periodic check", bke);
} catch (IOException ioe) {
LOG.error("I/O exception running periodic check", ioe);
} catch (ReplicationException.NonRecoverableReplicationException nre) {
LOG.error("Non Recoverable Exception while reading from ZK", nre);
submitShutdownTask();
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
} finally {
if (!checkSuccess) {
long checkAllLedgersDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -856,7 +857,7 @@ private void schedulePlacementPolicyCheckTask(){

executor.scheduleAtFixedRate(() -> {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
if (!isLedgerReplicationEnabled.get()) {
LOG.info("Ledger replication disabled, skipping placementPolicyCheck");
return;
}
Expand Down Expand Up @@ -936,8 +937,6 @@ private void schedulePlacementPolicyCheckTask(){
numOfLedgersFoundInPlacementPolicyCheckValue,
numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue,
numOfURLedgersElapsedRecoveryGracePeriodValue, e);
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
}
}, initialDelay, interval, TimeUnit.SECONDS);
} else {
Expand Down Expand Up @@ -986,7 +985,7 @@ private void scheduleReplicasCheckTask() {

executor.scheduleAtFixedRate(() -> {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
if (!isLedgerReplicationEnabled.get()) {
LOG.info("Ledger replication disabled, skipping replicasCheck task.");
return;
}
Expand Down Expand Up @@ -1045,8 +1044,6 @@ private void scheduleReplicasCheckTask() {
numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue
.set(numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue);
}
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
}
}, initialDelay, interval, TimeUnit.SECONDS);
}
Expand All @@ -1061,6 +1058,13 @@ public void operationComplete(int rc, Void result) {
}
}

private class ReplicationEnabledChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
isLedgerReplicationEnabled.set(rc == 0);
}
}

private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
@Override
public void operationComplete(int rc, Void result) {
Expand All @@ -1077,14 +1081,14 @@ public void operationComplete(int rc, Void result) {
}
}

private void waitIfLedgerReplicationDisabled() throws UnavailableException,
InterruptedException {
ReplicationEnableCb cb = new ReplicationEnableCb();
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
private void waitIfLedgerReplicationDisabled() throws UnavailableException, InterruptedException {
while (!isLedgerReplicationEnabled.get()) {
LOG.info("LedgerReplication is disabled externally through Zookeeper, "
+ "since DISABLE_NODE ZNode is created, so waiting untill it is enabled");
ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(cb);
cb.await();
Thread.sleep(1000);
if (isLedgerReplicationEnabled.get()) {
break;
}
}
}

Expand Down Expand Up @@ -1147,16 +1151,10 @@ private void auditBookies()
Stopwatch stopwatch = Stopwatch.createStarted();
// put exit cases here
Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
// has been disabled while we were generating the index
// discard this run, and schedule a new one
executor.submit(bookieCheck);
return;
}
} catch (UnavailableException ue) {
LOG.error("Underreplication unavailable, skipping audit."
+ "Will retry after a period");
if (!isLedgerReplicationEnabled.get()) {
// has been disabled while we were generating the index
// discard this run, and schedule a new one
executor.submit(bookieCheck);
return;
}

Expand Down Expand Up @@ -1308,18 +1306,8 @@ void checkAllLedgers() throws BKException, IOException, InterruptedException {
final CompletableFuture<Void> processFuture = new CompletableFuture<>();

Processor<Long> checkLedgersProcessor = (ledgerId, callback) -> {
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger rereplication has been disabled, aborting periodic check");
FutureUtils.complete(processFuture, null);
return;
}
} catch (ReplicationException.NonRecoverableReplicationException nre) {
LOG.error("Non Recoverable Exception while reading from ZK", nre);
submitShutdownTask();
return;
} catch (UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
if (!isLedgerReplicationEnabled.get()) {
LOG.info("Ledger rereplication has been disabled, aborting periodic check");
FutureUtils.complete(processFuture, null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ public void testGetLedgerFromZookeeperThrottled() throws Exception {

Auditor auditor1 = new Auditor(BookieImpl.getBookieId(configuration).toString(),
configuration, NullStatsLogger.INSTANCE);
auditor1.start();
Auditor auditor = Mockito.spy(auditor1);

BookKeeper bookKeeper = Mockito.spy(auditor.getBookKeeper(configuration));
Expand Down