Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide async version of markLedgerUnderreplicated for LedgerUnderreplicationManager #1619

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.client;

import java.util.function.Function;

/**
* Class the enumerates all the possible error conditions.
*
Expand All @@ -28,6 +30,19 @@
@SuppressWarnings("serial")
public abstract class BKException extends org.apache.bookkeeper.client.api.BKException {

public static final Function<Throwable, BKException> HANDLER = cause -> {
if (cause == null) {
return null;
}
if (cause instanceof BKException) {
return (BKException) cause;
} else {
BKException ex = new BKUnexpectedConditionException();
ex.initCause(cause);
return ex;
}
};

BKException(int code) {
super(code);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,41 @@
*/
package org.apache.bookkeeper.meta;

import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.ReplicationException;

/**
* Interface for marking ledgers which need to be rereplicated.
*/
public interface LedgerUnderreplicationManager extends AutoCloseable {

/**
* Mark a ledger as underreplicated. The replication should
* then check which fragments are underreplicated and rereplicate them
*/
void markLedgerUnderreplicated(long ledgerId, String missingReplica)
throws ReplicationException.UnavailableException;
default void markLedgerUnderreplicated(long ledgerId, String missingReplica) throws ReplicationException {
FutureUtils.result(
markLedgerUnderreplicatedAsync(
ledgerId, Lists.newArrayList(missingReplica)), ReplicationException.EXCEPTION_HANDLER);
}

/**
* Mark a ledger as underreplicated with missing bookies. The replication should then
* check which fragements are underreplicated and rereplicate them.
*
* @param ledgerId ledger id
* @param missingReplicas missing replicas
* @return a future presents the mark result.
*/
CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: dont we need to have async version of "void markLedgerReplicated(long ledgerId)"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't. markLedgerReplicated is used by replication worker, which replication worker is single threaded and using sync methods. so markLedgerReplicated is fine at the context.
only markLedgerUnderreplicated is the issue.

ideally, LedgerUnderreplicationManager should be splitted into at least 2 interfaces, one for Auditor, the other one for ReplicationWorker. That would make things much clearer.

Copy link
Contributor

@reddycharan reddycharan Aug 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is there any need of making 'missingReplicas' collection, instead of just String missingReplica

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this for https://github.com/apache/bookkeeper/pull/1619/files#diff-7525f06ad3a1ad0a00a462df4deb4698L581 . then it doesn't need to do multiple zk calls for creating an UL.


/**
* Mark a ledger as fully replicated. If the ledger is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@

import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.DNS;
Expand All @@ -54,6 +57,7 @@
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
Expand Down Expand Up @@ -254,58 +258,95 @@ public UnderreplicatedLedgerFormat getLedgerUnreplicationInfo(long ledgerId)
}

@Override
public void markLedgerUnderreplicated(long ledgerId, String missingReplica)
throws ReplicationException.UnavailableException {
public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas) {
if (LOG.isDebugEnabled()) {
LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplica);
}
try {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
String znode = getUrLedgerZnode(ledgerId);
while (true) {
UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplicas);
}
final List<ACL> zkAcls = ZkUtils.getACLs(conf);
final String znode = getUrLedgerZnode(ledgerId);
final CompletableFuture<Void> createFuture = new CompletableFuture<>();
tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, createFuture);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why does caller has to create 'createFuture' (CompletableFuture) and pass it, why cann't method returns the CompletableFuture

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this allows the method tryMarkLedgerUnderreplicatedAsync to be attempted with same CompleteableFuture.

return createFuture;
}

private void tryMarkLedgerUnderreplicatedAsync(final String znode,
final Collection<String> missingReplicas,
final List<ACL> zkAcls,
final CompletableFuture<Void> finalFuture) {
final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
builder.setCtime(System.currentTimeMillis());
}
missingReplicas.forEach(builder::addReplica);
final byte[] urLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8);
ZkUtils.asyncCreateFullPathOptimistic(
zkc, znode, urLedgerData, zkAcls, CreateMode.PERSISTENT,
(rc, path, ctx, name) -> {
if (Code.OK.intValue() == rc) {
FutureUtils.complete(finalFuture, null);
} else if (Code.NODEEXISTS.intValue() == rc) {
// we need to handle the case where the ledger has been marked as underreplicated
handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
} else {
FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(rc)));
}
}, null);
}


private void handleLedgerUnderreplicatedAlreadyMarked(final String znode,
final Collection<String> missingReplicas,
final List<ACL> zkAcls,
final CompletableFuture<Void> finalFuture) {
// get the existing underreplicated ledger data
zkc.getData(znode, false, (getRc, getPath, getCtx, existingUrLedgerData, getStat) -> {
if (Code.OK.intValue() == getRc) {
// deserialize existing underreplicated ledger data
final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
try {
if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
builder.setCtime(System.currentTimeMillis());
}
builder.addReplica(missingReplica);
ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat
.printToString(builder.build()).getBytes(UTF_8),
zkAcls, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
Stat s = zkc.exists(znode, false);
if (s == null) {
TextFormat.merge(new String(existingUrLedgerData, UTF_8), builder);
} catch (ParseException e) {
// corrupted metadata in zookeeper
FutureUtils.completeExceptionally(finalFuture,
new ReplicationException.UnavailableException(
"Invalid underreplicated ledger data for ledger " + znode, e));
return;
}
UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build();
boolean replicaAdded = false;
for (String missingReplica : missingReplicas) {
if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) {
continue;
}
try {
byte[] bytes = zkc.getData(znode, false, s);
builder.clear();
TextFormat.merge(new String(bytes, UTF_8), builder);
UnderreplicatedLedgerFormat data = builder.build();
if (data.getReplicaList().contains(missingReplica)) {
return; // nothing to add
}
} else {
builder.addReplica(missingReplica);
zkc.setData(znode,
TextFormat.printToString(builder.build()).getBytes(UTF_8),
s.getVersion());
} catch (KeeperException.NoNodeException nne) {
continue;
} catch (KeeperException.BadVersionException bve) {
continue;
} catch (TextFormat.ParseException pe) {
throw new ReplicationException.UnavailableException(
"Invalid data found", pe);
replicaAdded = true;
}
}
break;
if (!replicaAdded) { // no new missing replica is added
FutureUtils.complete(finalFuture, null);
return;
}
if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
builder.setCtime(System.currentTimeMillis());
}
final byte[] newUrLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8);
zkc.setData(znode, newUrLedgerData, getStat.getVersion(), (setRc, setPath, setCtx, setStat) -> {
if (Code.OK.intValue() == setRc) {
FutureUtils.complete(finalFuture, null);
} else if (Code.NONODE.intValue() == setRc) {
tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
} else if (Code.BADVERSION.intValue() == setRc) {
handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
} else {
FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(setRc)));
}
}, null);
} else if (Code.NONODE.intValue() == getRc) {
tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
} else {
FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(getRc)));
}
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
}
}, null);
}

@Override
Expand Down
Loading