Skip to content

Commit

Permalink
Provide async version of markLedgerUnderreplicated for LedgerUnderrep…
Browse files Browse the repository at this point in the history
…licationManager

Descriptions of the changes in this PR:

 ### Motivation

The Auditor calls synchronous methods from within asynchronous callbacks in multiple places.
This raises the possibility of hitting a deadlock; issue #1578 is one such example.

After looking into the `LedgerUnderreplicationManager`, `markLedgerUnderreplicated`
is the only interface that will be called in async callbacks. This change is
to provide an async version of `markLedgerUnderreplicated`.

 ### Changes

- add `markLedgerUnderreplicatedAsync` interface in `LedgerUnderreplicationManager`.
- implement the logic of `markLedgerUnderreplicated` using async callbacks
- use `markLedgerUnderreplicatedAsync` in the Auditor

Related Issues: #1578
Master Issue: #1617

Author: Sijie Guo <sijie@apache.org>

Reviewers: Charan Reddy Guttapalem <reddycharan18@gmail.com>, Enrico Olivelli <eolivelli@gmail.com>, Matteo Merli <mmerli@apache.org>

This closes #1619 from sijie/async_sync_autorecovery
  • Loading branch information
sijie committed Aug 27, 2018
1 parent 6698ab8 commit 3e01125
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.client;

import java.util.function.Function;

/**
* Class that enumerates all the possible error conditions.
*
Expand All @@ -28,6 +30,19 @@
@SuppressWarnings("serial")
public abstract class BKException extends org.apache.bookkeeper.client.api.BKException {

/**
 * Maps an arbitrary {@link Throwable} to a {@link BKException}.
 *
 * <p>A {@code null} cause maps to {@code null}. A cause that already is a
 * {@code BKException} is returned unchanged. Any other cause is attached, via
 * {@code initCause}, to a newly created {@code BKUnexpectedConditionException}.
 */
public static final Function<Throwable, BKException> HANDLER = cause -> {
    // instanceof is false for null, so the null case falls through to the next guard.
    if (cause instanceof BKException) {
        return (BKException) cause;
    }
    if (cause == null) {
        return null;
    }
    final BKException wrapped = new BKUnexpectedConditionException();
    wrapped.initCause(cause);
    return wrapped;
};

/**
* Creates an exception with the given code, which is passed straight to the superclass constructor.
*
* @param code the code forwarded to the parent exception class
*/
BKException(int code) {
super(code);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,42 @@
*/
package org.apache.bookkeeper.meta;

import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.ReplicationException;

/**
* Interface for marking ledgers which need to be rereplicated.
*/
public interface LedgerUnderreplicationManager extends AutoCloseable {

/**
* Mark a ledger as underreplicated. The replication should
* then check which fragments are underreplicated and rereplicate them
*/
void markLedgerUnderreplicated(long ledgerId, String missingReplica)
throws ReplicationException.UnavailableException;
default void markLedgerUnderreplicated(long ledgerId, String missingReplica) throws ReplicationException {
    // Delegate to the async variant with a single-element list of replicas, then hand the
    // future to FutureUtils.result, using ReplicationException.EXCEPTION_HANDLER for failures.
    final CompletableFuture<Void> markFuture =
        markLedgerUnderreplicatedAsync(ledgerId, Lists.newArrayList(missingReplica));
    FutureUtils.result(markFuture, ReplicationException.EXCEPTION_HANDLER);
}

/**
* Mark a ledger as underreplicated with missing bookies. The replication should then
* check which fragments are underreplicated and rereplicate them.
*
* @param ledgerId ledger id
* @param missingReplicas the replicas (bookies) that are missing for the ledger
* @return a future that represents the result of the mark operation.
*/
CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas);

/**
* Mark a ledger as fully replicated. If the ledger is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,25 @@
import com.google.common.base.Joiner;
import com.google.protobuf.TextFormat;

import com.google.protobuf.TextFormat.ParseException;
import java.net.UnknownHostException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.DNS;
Expand All @@ -54,6 +58,7 @@
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
Expand Down Expand Up @@ -254,55 +259,89 @@ public UnderreplicatedLedgerFormat getLedgerUnreplicationInfo(long ledgerId)
}

@Override
public void markLedgerUnderreplicated(long ledgerId, String missingReplica)
throws ReplicationException.UnavailableException {
public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas) {
if (LOG.isDebugEnabled()) {
LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplica);
}
try {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
String znode = getUrLedgerZnode(ledgerId);
while (true) {
UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplicas);
}
final List<ACL> zkAcls = ZkUtils.getACLs(conf);
final String znode = getUrLedgerZnode(ledgerId);
final CompletableFuture<Void> createFuture = new CompletableFuture<>();
tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, createFuture);
return createFuture;
}

/**
 * Asynchronously tries to create the underreplicated-ledger znode containing the given
 * missing replicas.
 *
 * <p>The outcome is reported through {@code finalFuture}: it is completed normally when the
 * create returns OK, and completed exceptionally with a {@link KeeperException} for any
 * return code other than OK or NODEEXISTS. On NODEEXISTS the work is handed over to
 * {@code handleLedgerUnderreplicatedAlreadyMarked}, which receives the same future.
 *
 * @param znode path of the underreplicated-ledger znode to create
 * @param missingReplicas replicas to record in the znode data
 * @param zkAcls ACLs passed to the create call
 * @param finalFuture future that carries the overall result of the mark operation
 */
private void tryMarkLedgerUnderreplicatedAsync(final String znode,
                                               final Collection<String> missingReplicas,
                                               final List<ACL> zkAcls,
                                               final CompletableFuture<Void> finalFuture) {
    final UnderreplicatedLedgerFormat.Builder formatBuilder = UnderreplicatedLedgerFormat.newBuilder();
    for (String replica : missingReplicas) {
        formatBuilder.addReplica(replica);
    }
    final String serialized = TextFormat.printToString(formatBuilder.build());
    final byte[] payload = serialized.getBytes(UTF_8);
    ZkUtils.asyncCreateFullPathOptimistic(
        zkc, znode, payload, zkAcls, CreateMode.PERSISTENT,
        (rc, path, ctx, name) -> {
            if (Code.NODEEXISTS.intValue() == rc) {
                // we need to handle the case where the ledger has been marked as underreplicated
                handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
                return;
            }
            if (Code.OK.intValue() == rc) {
                FutureUtils.complete(finalFuture, null);
                return;
            }
            // Any other return code is surfaced to the caller as a KeeperException.
            FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(rc)));
        }, null);
}


private void handleLedgerUnderreplicatedAlreadyMarked(final String znode,
final Collection<String> missingReplicas,
final List<ACL> zkAcls,
final CompletableFuture<Void> finalFuture) {
// get the existing underreplicated ledger data
zkc.getData(znode, false, (getRc, getPath, getCtx, existingUrLedgerData, getStat) -> {
if (Code.OK.intValue() == getRc) {
// deserialize existing underreplicated ledger data
final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
try {
builder.addReplica(missingReplica);
ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat
.printToString(builder.build()).getBytes(UTF_8),
zkAcls, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
Stat s = zkc.exists(znode, false);
if (s == null) {
TextFormat.merge(new String(existingUrLedgerData, UTF_8), builder);
} catch (ParseException e) {
// corrupted metadata in zookeeper
FutureUtils.completeExceptionally(finalFuture,
new ReplicationException.UnavailableException(
"Invalid underreplicated ledger data for ledger " + znode, e));
return;
}
UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build();
boolean replicaAdded = false;
for (String missingReplica : missingReplicas) {
if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) {
continue;
}
try {
byte[] bytes = zkc.getData(znode, false, s);
builder.clear();
TextFormat.merge(new String(bytes, UTF_8), builder);
UnderreplicatedLedgerFormat data = builder.build();
if (data.getReplicaList().contains(missingReplica)) {
return; // nothing to add
}
} else {
builder.addReplica(missingReplica);
zkc.setData(znode,
TextFormat.printToString(builder.build()).getBytes(UTF_8),
s.getVersion());
} catch (KeeperException.NoNodeException nne) {
continue;
} catch (KeeperException.BadVersionException bve) {
continue;
} catch (TextFormat.ParseException pe) {
throw new ReplicationException.UnavailableException(
"Invalid data found", pe);
replicaAdded = true;
}
}
break;
if (!replicaAdded) { // no new missing replica is added
FutureUtils.complete(finalFuture, null);
return;
}
final byte[] newUrLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8);
zkc.setData(znode, newUrLedgerData, getStat.getVersion(), (setRc, setPath, setCtx, setStat) -> {
if (Code.OK.intValue() == setRc) {
FutureUtils.complete(finalFuture, null);
} else if (Code.NONODE.intValue() == setRc) {
tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
} else if (Code.BADVERSION.intValue() == setRc) {
handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
} else {
FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(setRc)));
}
}, null);
} else if (Code.NONODE.intValue() == getRc) {
tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
} else {
FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(getRc)));
}
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
}
}, null);
}

@Override
Expand Down
Loading

0 comments on commit 3e01125

Please sign in to comment.