Skip to content

Commit

Permalink
Remove Managed Ledger metadata text format (#394)
Browse files Browse the repository at this point in the history
* Remove Managed Ledger metadata text format

* MockZookeeper should store values as byte[] and not convert to String
  • Loading branch information
merlimat committed Aug 29, 2017
1 parent 1a6ffbf commit 3751baf
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 298 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ public class ManagedLedgerFactoryConfig {
private long maxCacheSize = 128 * MB; private long maxCacheSize = 128 * MB;
private double cacheEvictionWatermark = 0.90; private double cacheEvictionWatermark = 0.90;


private boolean useProtobufBinaryFormatInZK = false;

public long getMaxCacheSize() { public long getMaxCacheSize() {
return maxCacheSize; return maxCacheSize;
} }
Expand Down Expand Up @@ -55,12 +53,4 @@ public ManagedLedgerFactoryConfig setCacheEvictionWatermark(double cacheEviction
return this; return this;
} }


public boolean useProtobufBinaryFormatInZK() {
return useProtobufBinaryFormatInZK;
}

public void setUseProtobufBinaryFormatInZK(boolean useProtobufBinaryFormatInZK) {
this.useProtobufBinaryFormatInZK = useProtobufBinaryFormatInZK;
}

} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Stat; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.ZNodeProtobufFormat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
Expand Down Expand Up @@ -91,15 +90,18 @@ public class ManagedCursorImpl implements ManagedCursor {


protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER = protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp"); AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp");
@SuppressWarnings("unused")
private volatile OpReadEntry waitingReadOp = null; private volatile OpReadEntry waitingReadOp = null;


private static final int FALSE = 0; private static final int FALSE = 0;
private static final int TRUE = 1; private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> RESET_CURSOR_IN_PROGRESS_UPDATER = AtomicIntegerFieldUpdater private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> RESET_CURSOR_IN_PROGRESS_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(ManagedCursorImpl.class, "resetCursorInProgress"); .newUpdater(ManagedCursorImpl.class, "resetCursorInProgress");
@SuppressWarnings("unused")
private volatile int resetCursorInProgress = FALSE; private volatile int resetCursorInProgress = FALSE;
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_READ_OPS_UPDATER = private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_READ_OPS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingReadOps"); AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingReadOps");
@SuppressWarnings("unused")
private volatile int pendingReadOps = 0; private volatile int pendingReadOps = 0;


// This counters are used to compute the numberOfEntries and numberOfEntriesInBacklog values, without having to look // This counters are used to compute the numberOfEntries and numberOfEntriesInBacklog values, without having to look
Expand All @@ -117,8 +119,6 @@ public class ManagedCursorImpl implements ManagedCursor {


private final RateLimiter markDeleteLimiter; private final RateLimiter markDeleteLimiter;


private final ZNodeProtobufFormat protobufFormat;

class PendingMarkDeleteEntry { class PendingMarkDeleteEntry {
final PositionImpl newPosition; final PositionImpl newPosition;
final MarkDeleteCallback callback; final MarkDeleteCallback callback;
Expand All @@ -139,6 +139,7 @@ public PendingMarkDeleteEntry(PositionImpl newPosition, MarkDeleteCallback callb
private final ArrayDeque<PendingMarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>(); private final ArrayDeque<PendingMarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque<>();
private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER = private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount"); AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
@SuppressWarnings("unused")
private volatile int pendingMarkDeletedSubmittedCount = 0; private volatile int pendingMarkDeletedSubmittedCount = 0;
private long lastLedgerSwitchTimestamp; private long lastLedgerSwitchTimestamp;


Expand Down Expand Up @@ -171,9 +172,6 @@ public interface VoidCallback {
RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE); RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
WAITING_READ_OP_UPDATER.set(this, null); WAITING_READ_OP_UPDATER.set(this, null);
this.lastLedgerSwitchTimestamp = System.currentTimeMillis(); this.lastLedgerSwitchTimestamp = System.currentTimeMillis();
this.protobufFormat = ledger.factory.getConfig().useProtobufBinaryFormatInZK() ? //
ZNodeProtobufFormat.Binary : //
ZNodeProtobufFormat.Text;


if (config.getThrottleMarkDelete() > 0.0) { if (config.getThrottleMarkDelete() > 0.0) {
markDeleteLimiter = RateLimiter.create(config.getThrottleMarkDelete()); markDeleteLimiter = RateLimiter.create(config.getThrottleMarkDelete());
Expand Down Expand Up @@ -1234,7 +1232,7 @@ public void asyncMarkDelete(final Position position, final MarkDeleteCallback ca
if (RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == TRUE) { if (RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == TRUE) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] cursor reset in progress - ignoring mark delete on position [{}] for cursor [{}]", log.debug("[{}] cursor reset in progress - ignoring mark delete on position [{}] for cursor [{}]",
ledger.getName(), (PositionImpl) position, name); ledger.getName(), position, name);
} }
callback.markDeleteFailed( callback.markDeleteFailed(
new ManagedLedgerException("Reset cursor in progress - unable to mark delete position " new ManagedLedgerException("Reset cursor in progress - unable to mark delete position "
Expand Down Expand Up @@ -1674,9 +1672,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
.setMarkDeleteLedgerId(position.getLedgerId()) // .setMarkDeleteLedgerId(position.getLedgerId()) //
.setMarkDeleteEntryId(position.getEntryId()); // .setMarkDeleteEntryId(position.getEntryId()); //


if (protobufFormat == ZNodeProtobufFormat.Binary) { info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
}


if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, markDeletePosition); log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, markDeletePosition);
Expand Down Expand Up @@ -1705,36 +1701,6 @@ public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object
return; return;
} }


lock.readLock().lock();
try {
if (cursorLedger != null && protobufFormat == ZNodeProtobufFormat.Text
&& !individualDeletedMessages.isEmpty()) {
// To save individualDeletedMessages status, we don't want to dump the information in text format into
// the z-node. Until we switch to binary format, just flush the mark-delete + the
// individualDeletedMessages into the ledger.
persistPosition(cursorLedger, markDeletePosition, new VoidCallback() {
@Override
public void operationComplete() {
cursorLedger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
callback.closeComplete(ctx);
}
}, ctx);
}

@Override
public void operationFailed(ManagedLedgerException exception) {
callback.closeFailed(exception, ctx);
}
});

return;
}
} finally {
lock.readLock().unlock();
}

persistPositionMetaStore(-1, markDeletePosition, new MetaStoreCallback<Void>() { persistPositionMetaStore(-1, markDeletePosition, new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Stat stat) { public void operationComplete(Void result, Stat stat) {
Expand Down Expand Up @@ -2182,7 +2148,7 @@ public String getIndividuallyDeletedMessages() {
/** /**
* Checks given position is part of deleted-range and returns next position of upper-end as all the messages are * Checks given position is part of deleted-range and returns next position of upper-end as all the messages are
* deleted up to that point * deleted up to that point
* *
* @param position * @param position
* @return next available position * @return next available position
*/ */
Expand All @@ -2194,7 +2160,7 @@ public PositionImpl getNextAvailablePosition(PositionImpl position) {
} }
return position.getNext(); return position.getNext();
} }

public boolean isIndividuallyDeletedEntriesEmpty() { public boolean isIndividuallyDeletedEntriesEmpty() {
lock.readLock().lock(); lock.readLock().lock();
try { try {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -47,18 +47,13 @@


public class MetaStoreImplZookeeper implements MetaStore { public class MetaStoreImplZookeeper implements MetaStore {


public static enum ZNodeProtobufFormat {
Text, Binary
}

private static final Charset Encoding = Charsets.UTF_8; private static final Charset Encoding = Charsets.UTF_8;
private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;


private static final String prefixName = "/managed-ledgers"; private static final String prefixName = "/managed-ledgers";
private static final String prefix = prefixName + "/"; private static final String prefix = prefixName + "/";


private final ZooKeeper zk; private final ZooKeeper zk;
private final ZNodeProtobufFormat protobufFormat;
private final OrderedSafeExecutor executor; private final OrderedSafeExecutor executor;


private static class ZKStat implements Stat { private static class ZKStat implements Stat {
Expand Down Expand Up @@ -94,14 +89,9 @@ public long getModificationTimestamp() {
} }
} }


public MetaStoreImplZookeeper(ZooKeeper zk, OrderedSafeExecutor executor) throws Exception { public MetaStoreImplZookeeper(ZooKeeper zk, OrderedSafeExecutor executor)
this(zk, ZNodeProtobufFormat.Text, executor);
}

public MetaStoreImplZookeeper(ZooKeeper zk, ZNodeProtobufFormat protobufFormat, OrderedSafeExecutor executor)
throws Exception { throws Exception {
this.zk = zk; this.zk = zk;
this.protobufFormat = protobufFormat;
this.executor = executor; this.executor = executor;


if (zk.exists(prefixName, false) == null) { if (zk.exists(prefixName, false) == null) {
Expand Down Expand Up @@ -177,9 +167,7 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St
log.debug("[{}] Updating metadata version={} with content={}", ledgerName, zkStat.version, mlInfo); log.debug("[{}] Updating metadata version={} with content={}", ledgerName, zkStat.version, mlInfo);
} }


byte[] serializedMlInfo = protobufFormat == ZNodeProtobufFormat.Text ? // byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
mlInfo.toString().getBytes(Encoding) : // Text format
mlInfo.toByteArray(); // Binary format


zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(), zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(),
(rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> { (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> {
Expand Down Expand Up @@ -255,9 +243,7 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa
info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId()); info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());


String path = prefix + ledgerName + "/" + cursorName; String path = prefix + ledgerName + "/" + cursorName;
byte[] content = protobufFormat == ZNodeProtobufFormat.Text ? // byte[] content = info.toByteArray(); // Binary format
info.toString().getBytes(Encoding) : // Text format
info.toByteArray(); // Binary format


if (stat == null) { if (stat == null) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -336,60 +322,29 @@ public Iterable<String> getManagedLedgers() throws MetaStoreException {


private ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) private ManagedLedgerInfo parseManagedLedgerInfo(byte[] data)
throws ParseException, InvalidProtocolBufferException { throws ParseException, InvalidProtocolBufferException {
if (protobufFormat == ZNodeProtobufFormat.Text) { // First try binary format, then fallback to text
// First try text format, then fallback to binary try {
try { return ManagedLedgerInfo.parseFrom(data);
return parseManagedLedgerInfoFromText(data); } catch (InvalidProtocolBufferException e) {
} catch (ParseException e) { // Fallback to parsing protobuf text format
return parseManagedLedgerInfoFromBinary(data); ManagedLedgerInfo.Builder builder = ManagedLedgerInfo.newBuilder();
} TextFormat.merge(new String(data, Encoding), builder);
} else { return builder.build();
// First try binary format, then fallback to text
try {
return parseManagedLedgerInfoFromBinary(data);
} catch (InvalidProtocolBufferException e) {
return parseManagedLedgerInfoFromText(data);
}
} }
} }


private ManagedLedgerInfo parseManagedLedgerInfoFromText(byte[] data) throws ParseException {
ManagedLedgerInfo.Builder builder = ManagedLedgerInfo.newBuilder();
TextFormat.merge(new String(data, Encoding), builder);
return builder.build();
}

private ManagedLedgerInfo parseManagedLedgerInfoFromBinary(byte[] data) throws InvalidProtocolBufferException {
return ManagedLedgerInfo.newBuilder().mergeFrom(data).build();
}

private ManagedCursorInfo parseManagedCursorInfo(byte[] data) private ManagedCursorInfo parseManagedCursorInfo(byte[] data)
throws ParseException, InvalidProtocolBufferException { throws ParseException, InvalidProtocolBufferException {
if (protobufFormat == ZNodeProtobufFormat.Text) { // First try binary format, then fallback to text
// First try text format, then fallback to binary try {
try { return ManagedCursorInfo.parseFrom(data);
return parseManagedCursorInfoFromText(data); } catch (InvalidProtocolBufferException e) {
} catch (ParseException e) { // Fallback to parsing protobuf text format
return parseManagedCursorInfoFromBinary(data); ManagedCursorInfo.Builder builder = ManagedCursorInfo.newBuilder();
} TextFormat.merge(new String(data, Encoding), builder);
} else { return builder.build();
// First try binary format, then fallback to text
try {
return parseManagedCursorInfoFromBinary(data);
} catch (InvalidProtocolBufferException e) {
return parseManagedCursorInfoFromText(data);
}
} }
}

private ManagedCursorInfo parseManagedCursorInfoFromText(byte[] data) throws ParseException {
ManagedCursorInfo.Builder builder = ManagedCursorInfo.newBuilder();
TextFormat.merge(new String(data, Encoding), builder);
return builder.build();
}


private ManagedCursorInfo parseManagedCursorInfoFromBinary(byte[] data) throws InvalidProtocolBufferException {
return ManagedCursorInfo.newBuilder().mergeFrom(data).build();
} }


private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class); private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class);
Expand Down
Loading

0 comments on commit 3751baf

Please sign in to comment.