Skip to content

Commit

Permalink
Enable ML binary format from broker configuration (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Mar 10, 2017
1 parent 1fa4508 commit 146738c
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 70 deletions.
Expand Up @@ -21,6 +21,8 @@ public class ManagedLedgerFactoryConfig {
private long maxCacheSize = 128 * MB;
private double cacheEvictionWatermark = 0.90;

private boolean useProtobufBinaryFormatInZK = false;

public long getMaxCacheSize() {
return maxCacheSize;
}
Expand Down Expand Up @@ -50,4 +52,12 @@ public ManagedLedgerFactoryConfig setCacheEvictionWatermark(double cacheEviction
return this;
}

public boolean useProtobufBinaryFormatInZK() {
return useProtobufBinaryFormatInZK;
}

public void setUseProtobufBinaryFormatInZK(boolean useProtobufBinaryFormatInZK) {
this.useProtobufBinaryFormatInZK = useProtobufBinaryFormatInZK;
}

}
Expand Up @@ -18,6 +18,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.ZNodeProtobufFormat;

import java.util.ArrayDeque;
import java.util.Collections;
Expand Down Expand Up @@ -111,6 +112,8 @@ public class ManagedCursorImpl implements ManagedCursor {
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final RateLimiter markDeleteLimiter;

private final ZNodeProtobufFormat protobufFormat;

class PendingMarkDeleteEntry {
final PositionImpl newPosition;
Expand Down Expand Up @@ -164,6 +167,9 @@ public interface VoidCallback {
RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
WAITING_READ_OP_UPDATER.set(this, null);
this.lastLedgerSwitchTimestamp = System.currentTimeMillis();
this.protobufFormat = ledger.factory.getConfig().useProtobufBinaryFormatInZK() ? //
ZNodeProtobufFormat.Binary : //
ZNodeProtobufFormat.Text;

if (config.getThrottleMarkDelete() > 0.0) {
markDeleteLimiter = RateLimiter.create(config.getThrottleMarkDelete());
Expand Down Expand Up @@ -1666,21 +1672,20 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
MetaStoreCallback<Void> callback) {
// When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
// hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
ManagedCursorInfo info = ManagedCursorInfo.newBuilder() //
ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
.setCursorsLedgerId(cursorsLedgerId) //
.setMarkDeleteLedgerId(position.getLedgerId()) //
.setMarkDeleteEntryId(position.getEntryId()) //
.setMarkDeleteEntryId(position.getEntryId()); //

if (protobufFormat == ZNodeProtobufFormat.Binary) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
}

// Do not add individually deleted messages in text format since it would break
// backward compatibility.
// TODO: Add this again, when binary format is enabled
// .addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()) //
.build();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, markDeletePosition);
}

ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, info, cursorLedgerVersion,
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, info.build(), cursorLedgerVersion,
new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Version version) {
Expand All @@ -1705,7 +1710,8 @@ public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object

lock.readLock().lock();
try {
if (!individualDeletedMessages.isEmpty()) {
if (cursorLedger != null && protobufFormat == ZNodeProtobufFormat.Text
&& !individualDeletedMessages.isEmpty()) {
// To save individualDeletedMessages status, we don't want to dump the information in text format into
// the z-node. Until we switch to binary format, just flush the mark-delete + the
// individualDeletedMessages into the ledger.
Expand Down Expand Up @@ -1921,6 +1927,7 @@ void persistPosition(final LedgerHandle lh, final PositionImpl position, final V
position);
}

checkNotNull(lh);
lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
Expand Down
Expand Up @@ -170,7 +170,7 @@ enum PositionBound {

private final ScheduledExecutorService scheduledExecutor;
private final OrderedSafeExecutor executor;
private final ManagedLedgerFactoryImpl factory;
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;

/**
Expand Down
Expand Up @@ -42,7 +42,11 @@
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;

class MetaStoreImplZookeeper implements MetaStore {
public class MetaStoreImplZookeeper implements MetaStore {

public static enum ZNodeProtobufFormat {
Text, Binary
}

private static final Charset Encoding = Charsets.UTF_8;
private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
Expand All @@ -51,6 +55,7 @@ class MetaStoreImplZookeeper implements MetaStore {
private static final String prefix = prefixName + "/";

private final ZooKeeper zk;
private final ZNodeProtobufFormat protobufFormat;
private final OrderedSafeExecutor executor;

private static class ZKVersion implements Version {
Expand All @@ -62,7 +67,13 @@ private static class ZKVersion implements Version {
}

public MetaStoreImplZookeeper(ZooKeeper zk, OrderedSafeExecutor executor) throws Exception {
this(zk, ZNodeProtobufFormat.Text, executor);
}

public MetaStoreImplZookeeper(ZooKeeper zk, ZNodeProtobufFormat protobufFormat, OrderedSafeExecutor executor)
throws Exception {
this.zk = zk;
this.protobufFormat = protobufFormat;
this.executor = executor;

if (zk.exists(prefixName, false) == null) {
Expand Down Expand Up @@ -100,12 +111,10 @@ public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallbac
zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.submit(safeRun(() -> {
if (rc == Code.OK.intValue()) {
try {
ManagedLedgerInfo.Builder builder = ManagedLedgerInfo.newBuilder();
TextFormat.merge(new String(readData, Encoding), builder);
ManagedLedgerInfo info = builder.build();
ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
info = updateMLInfoTimestamp(info);
callback.operationComplete(info, new ZKVersion(stat.getVersion()));
} catch (ParseException e) {
} catch (ParseException | InvalidProtocolBufferException e) {
callback.operationFailed(new MetaStoreException(e));
}
} else if (rc == Code.NONODE.intValue()) {
Expand All @@ -116,16 +125,14 @@ public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallbac
ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance();
callback.operationComplete(info, new ZKVersion(0));
} else {
callback.operationFailed(
new MetaStoreException(KeeperException.create(Code.get(rc1))));
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc1))));
}
};

ZkUtils.asyncCreateFullPathOptimistic(zk, prefix + ledgerName, new byte[0], Acl,
CreateMode.PERSISTENT, createcb, null);
ZkUtils.asyncCreateFullPathOptimistic(zk, prefix + ledgerName, new byte[0], Acl, CreateMode.PERSISTENT,
createcb, null);
} else {
callback.operationFailed(
new MetaStoreException(KeeperException.create(Code.get(rc))));
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
}
})), null);
}
Expand All @@ -139,23 +146,28 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Ve
log.debug("[{}] Updating metadata version={} with content={}", ledgerName, zkVersion.version, mlInfo);
}

zk.setData(prefix + ledgerName, mlInfo.toString().getBytes(Encoding), zkVersion.version, (rc, path, zkCtx, stat) -> executor.submit(safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName,
Code.get(rc), stat != null ? stat.getVersion() : "null");
}
MetaStoreException status = null;
if (rc == Code.BADVERSION.intValue()) {
// Content has been modified on ZK since our last read
status = new BadVersionException(KeeperException.create(Code.get(rc)));
callback.operationFailed(status);
} else if (rc != Code.OK.intValue()) {
status = new MetaStoreException(KeeperException.create(Code.get(rc)));
callback.operationFailed(status);
} else {
callback.operationComplete(null, new ZKVersion(stat.getVersion()));
}
})), null);
byte[] serializedMlInfo = protobufFormat == ZNodeProtobufFormat.Text ? //
mlInfo.toString().getBytes(Encoding) : // Text format
mlInfo.toByteArray(); // Binary format

zk.setData(prefix + ledgerName, serializedMlInfo, zkVersion.version,
(rc, path, zkCtx, stat) -> executor.submit(safeRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName,
Code.get(rc), stat != null ? stat.getVersion() : "null");
}
MetaStoreException status = null;
if (rc == Code.BADVERSION.intValue()) {
// Content has been modified on ZK since our last read
status = new BadVersionException(KeeperException.create(Code.get(rc)));
callback.operationFailed(status);
} else if (rc != Code.OK.intValue()) {
status = new MetaStoreException(KeeperException.create(Code.get(rc)));
callback.operationFailed(status);
} else {
callback.operationComplete(null, new ZKVersion(stat.getVersion()));
}
})), null);
}

@Override
Expand All @@ -168,8 +180,7 @@ public void getCursors(final String ledgerName, final MetaStoreCallback<List<Str
log.debug("[{}] getConsumers complete rc={} children={}", ledgerName, Code.get(rc), children);
}
if (rc != Code.OK.intValue()) {
callback.operationFailed(
new MetaStoreException(KeeperException.create(Code.get(rc))));
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
return;
}

Expand All @@ -191,14 +202,12 @@ public void asyncGetCursorInfo(String ledgerName, String consumerName,

zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.submit(safeRun(() -> {
if (rc != Code.OK.intValue()) {
callback.operationFailed(
new MetaStoreException(KeeperException.create(Code.get(rc))));
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
} else {
try {
ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder();
TextFormat.merge(new String(data, Encoding), info);
callback.operationComplete(info.build(), new ZKVersion(stat.getVersion()));
} catch (ParseException e) {
ManagedCursorInfo info = parseManagedCursorInfo(data);
callback.operationComplete(info, new ZKVersion(stat.getVersion()));
} catch (ParseException | InvalidProtocolBufferException e) {
callback.operationFailed(new MetaStoreException(e));
}
}
Expand All @@ -216,38 +225,38 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa
info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());

String path = prefix + ledgerName + "/" + cursorName;
byte[] content = info.toString().getBytes(Encoding);
byte[] content = protobufFormat == ZNodeProtobufFormat.Text ? //
info.toString().getBytes(Encoding) : // Text format
info.toByteArray(); // Binary format

if (version == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
}
zk.create(path, content, Acl, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> executor.submit(safeRun(() -> {
if (rc != Code.OK.intValue()) {
log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName,
cursorName, info, Code.get(rc));
callback.operationFailed(
new MetaStoreException(KeeperException.create(Code.get(rc))));
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Created consumer {} on meta-data store with {}", ledgerName, cursorName,
info);
}
callback.operationComplete(null, new ZKVersion(0));
}
})), null);
zk.create(path, content, Acl, CreateMode.PERSISTENT,
(rc, path1, ctx, name) -> executor.submit(safeRun(() -> {
if (rc != Code.OK.intValue()) {
log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName,
cursorName, info, Code.get(rc));
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Created consumer {} on meta-data store with {}", ledgerName, cursorName,
info);
}
callback.operationComplete(null, new ZKVersion(0));
}
})), null);
} else {
ZKVersion zkVersion = (ZKVersion) version;
if (log.isDebugEnabled()) {
log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
}
zk.setData(path, content, zkVersion.version, (rc, path1, ctx, stat) -> executor.submit(safeRun(() -> {
if (rc == Code.BADVERSION.intValue()) {
callback.operationFailed(
new BadVersionException(KeeperException.create(Code.get(rc))));
callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc))));
} else if (rc != Code.OK.intValue()) {
callback.operationFailed(
new MetaStoreException(KeeperException.create(Code.get(rc))));
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
} else {
callback.operationComplete(null, new ZKVersion(stat.getVersion()));
}
Expand All @@ -266,8 +275,7 @@ public void asyncRemoveCursor(final String ledgerName, final String consumerName
if (rc == Code.OK.intValue()) {
callback.operationComplete(null, null);
} else {
callback.operationFailed(
new MetaStoreException(KeeperException.create(Code.get(rc))));
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
}
})), null);
}
Expand All @@ -282,8 +290,7 @@ public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void> callb
if (rc == Code.OK.intValue()) {
callback.operationComplete(null, null);
} else {
callback.operationFailed(
new MetaStoreException(KeeperException.create(Code.get(rc))));
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
}
})), null);
}
Expand All @@ -297,5 +304,63 @@ public Iterable<String> getManagedLedgers() throws MetaStoreException {
}
}

private ManagedLedgerInfo parseManagedLedgerInfo(byte[] data)
throws ParseException, InvalidProtocolBufferException {
if (protobufFormat == ZNodeProtobufFormat.Text) {
// First try text format, then fallback to binary
try {
return parseManagedLedgerInfoFromText(data);
} catch (ParseException e) {
return parseManagedLedgerInfoFromBinary(data);
}
} else {
// First try binary format, then fallback to text
try {
return parseManagedLedgerInfoFromBinary(data);
} catch (InvalidProtocolBufferException e) {
return parseManagedLedgerInfoFromText(data);
}
}
}

private ManagedLedgerInfo parseManagedLedgerInfoFromText(byte[] data) throws ParseException {
ManagedLedgerInfo.Builder builder = ManagedLedgerInfo.newBuilder();
TextFormat.merge(new String(data, Encoding), builder);
return builder.build();
}

private ManagedLedgerInfo parseManagedLedgerInfoFromBinary(byte[] data) throws InvalidProtocolBufferException {
return ManagedLedgerInfo.newBuilder().mergeFrom(data).build();
}

private ManagedCursorInfo parseManagedCursorInfo(byte[] data)
throws ParseException, InvalidProtocolBufferException {
if (protobufFormat == ZNodeProtobufFormat.Text) {
// First try text format, then fallback to binary
try {
return parseManagedCursorInfoFromText(data);
} catch (ParseException e) {
return parseManagedCursorInfoFromBinary(data);
}
} else {
// First try binary format, then fallback to text
try {
return parseManagedCursorInfoFromBinary(data);
} catch (InvalidProtocolBufferException e) {
return parseManagedCursorInfoFromText(data);
}
}
}

private ManagedCursorInfo parseManagedCursorInfoFromText(byte[] data) throws ParseException {
ManagedCursorInfo.Builder builder = ManagedCursorInfo.newBuilder();
TextFormat.merge(new String(data, Encoding), builder);
return builder.build();
}

private ManagedCursorInfo parseManagedCursorInfoFromBinary(byte[] data) throws InvalidProtocolBufferException {
return ManagedCursorInfo.newBuilder().mergeFrom(data).build();
}

private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class);
}

0 comments on commit 146738c

Please sign in to comment.