Skip to content

Commit

Permalink
NonDurable cursor for managed ledger
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Apr 21, 2017
1 parent 136fb7b commit 98e687e
Show file tree
Hide file tree
Showing 8 changed files with 758 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -417,4 +417,9 @@ public List<Entry> replayEntries(Set<? extends Position> positions)
* @return
*/
public boolean isActive();

/**
* Tells whether the cursor is durable or just kept in memory
*/
public boolean isDurable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,23 @@ public interface ManagedLedger {
*/
public ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException;

/**
* Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
* exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
* were deleted. Also it does not prevent data from being deleted.
* <p>
* The cursor is anonymous and can be positioned on an arbitrary position.
* <p>
* This method is not-blocking.
*
* @param startCursorPosition
* the position where the cursor should be initialized, or null to start from the current latest entry.
* When starting on a particular cursor position, the first entry to be returned will be the entry next
* to the specified position
* @return the new NonDurableCursor
*/
public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;

/**
* Delete a ManagedCursor asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedLedgerImpl ledger;
private final String name;

private volatile PositionImpl markDeletePosition;
private volatile PositionImpl readPosition;
protected volatile PositionImpl markDeletePosition;
protected volatile PositionImpl readPosition;

protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp");
Expand All @@ -101,7 +101,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// This counters are used to compute the numberOfEntries and numberOfEntriesInBacklog values, without having to look
// at the list of ledgers in the ml. They are initialized to (-backlog) at opening, and will be incremented each
// time a message is read or deleted.
private volatile long messagesConsumedCounter;
protected volatile long messagesConsumedCounter;

// Current ledger used to append the mark-delete position
private volatile LedgerHandle cursorLedger;
Expand Down Expand Up @@ -303,7 +303,6 @@ private void recoveredCursor(PositionImpl position) {
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);

messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));

markDeletePosition = position;
readPosition = ledger.getNextValidPosition(position);
STATE_UPDATER.set(this, State.NoLedger);
Expand Down Expand Up @@ -1151,7 +1150,7 @@ boolean hasMoreEntries(PositionImpl position) {

void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.first);
markDeletePosition = PositionImpl.get(lastPositionCounter.first);
markDeletePosition = lastPositionCounter.first;

// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is 0, to ensure the initial backlog count is 0.
Expand Down Expand Up @@ -1257,7 +1256,7 @@ public void asyncMarkDelete(final Position position, final MarkDeleteCallback ca
internalAsyncMarkDelete(newPosition, callback, ctx);
}

private void internalAsyncMarkDelete(final PositionImpl newPosition, final MarkDeleteCallback callback,
protected void internalAsyncMarkDelete(final PositionImpl newPosition, final MarkDeleteCallback callback,
final Object ctx) {
ledger.mbean.addMarkDeleteOp();

Expand Down Expand Up @@ -1574,6 +1573,11 @@ public String getName() {
return name;
}

@Override
public boolean isDurable() {
return true;
}

@Override
public Position getReadPosition() {
return PositionImpl.get(readPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,14 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
}
}

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException {
checkManagedLedgerIsOpen();
checkFenced();

return new NonDurableCursorImpl(bookKeeper, config, this, null, (PositionImpl) startCursorPosition);
}

@Override
public Iterable<ManagedCursor> getCursors() {
return cursors;
Expand Down Expand Up @@ -1393,6 +1401,13 @@ void internalTrimConsumedLedgers() {
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
boolean overRetentionQuota = TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024;

if (log.isDebugEnabled()) {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- expired: {} -- over-quota: {} -- current-ledger: {}",
name, ls.getLedgerId(), (System.currentTimeMillis() - ls.getTimestamp()) / 1000.0, expired,
overRetentionQuota, currentLedger.getId());
}
if (ls.getLedgerId() == currentLedger.getId() || (!expired && !overRetentionQuota)) {
if (log.isDebugEnabled()) {
if (!expired) {
Expand Down Expand Up @@ -1868,7 +1883,7 @@ public void deactivateCursor(ManagedCursor cursor) {
}

public boolean isCursorActive(ManagedCursor cursor) {
return activeCursors.get(cursor.getName()) != null;
return cursor.isDurable() && activeCursors.get(cursor.getName()) != null;
}

private boolean currentLedgerIsFull() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Objects;
import com.google.common.collect.Range;

public class NonDurableCursorImpl extends ManagedCursorImpl {

NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
PositionImpl startCursorPosition) {
super(bookkeeper, config, ledger, cursorName);

if (startCursorPosition == null || startCursorPosition.equals(PositionImpl.latest)) {
// Start from last entry
initializeCursorPosition(ledger.getLastPositionAndCounter());
} else if (startCursorPosition.equals(PositionImpl.earliest)) {
// Start from invalid ledger to read from first available entry
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
} else {
// Since the cursor is positioning on the mark-delete position, we need to take 1 step back from the desired
// read-position
recoverCursor(startCursorPosition);
}

log.info("[{}] Created non-durable cursor read-position={} mark-delete-position={}", ledger.getName(),
readPosition, markDeletePosition);
}

private void recoverCursor(PositionImpl mdPosition) {
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
this.readPosition = ledger.getNextValidPosition(mdPosition);
markDeletePosition = mdPosition;

// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is equal to the current backlog (negated).
long initialBacklog = readPosition.compareTo(lastEntryAndCounter.first) < 0
? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.first)) : 0;
messagesConsumedCounter = lastEntryAndCounter.second - initialBacklog;
}

@Override
public boolean isDurable() {
return false;
}

/// Overridden methods from ManagedCursorImpl. Void implementation to skip cursor persistence

@Override
void recover(final VoidCallback callback) {
/// No-Op
}

@Override
protected void internalAsyncMarkDelete(final PositionImpl newPosition, final MarkDeleteCallback callback,
final Object ctx) {
// Bypass persistence of mark-delete position and individually deleted messages info
callback.markDeleteComplete(ctx);
}

@Override
public void setActive() {
/// No-Op
}

@Override
public boolean isActive() {
return false;
}

@Override
public void setInactive() {
/// No-Op
}

@Override
public void asyncClose(CloseCallback callback, Object ctx) {
// No-Op
callback.closeComplete(ctx);
}

public void asyncDeleteCursor(final String consumerName, final DeleteCursorCallback callback, final Object ctx) {
/// No-Op
callback.deleteCursorComplete(ctx);
}

@Override
public synchronized String toString() {
return Objects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
.add("readPos", readPosition).toString();
}

private static final Logger log = LoggerFactory.getLogger(NonDurableCursorImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,21 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.RangeSet;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;

import com.google.common.base.Objects;
import com.google.common.collect.ComparisonChain;

import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

public class PositionImpl implements Position, Comparable<PositionImpl> {

private final long ledgerId;
private final long entryId;

public static Position earliest = new PositionImpl(-1, -1);
public static Position latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);

public PositionImpl(PositionInfo pi) {
this.ledgerId = pi.getLedgerId();
this.entryId = pi.getEntryId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public MockManagedCursor(ManagedCursorContainer container, String name, Position
this.position = position;
}

@Override
public boolean isDurable() {
return true;
}

@Override
public List<Entry> readEntries(int numberOfEntriesToRead) throws ManagedLedgerException {
return Lists.newArrayList();
Expand Down
Loading

0 comments on commit 98e687e

Please sign in to comment.