Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NonDurable cursor for managed ledger #366

Merged
merged 1 commit into from
May 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,4 +417,9 @@ public List<Entry> replayEntries(Set<? extends Position> positions)
* @return
*/
public boolean isActive();

/**
* Tells whether the cursor is durable or just kept in memory
*/
public boolean isDurable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,23 @@ public interface ManagedLedger {
*/
public ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException;

/**
* Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
* exactly like a normal cursor, with the only difference in that after restart it will not remember which entries
* were deleted. Also it does not prevent data from being deleted.
Copy link
Contributor

@rdhabalia rdhabalia Apr 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it does not prevent data from being deleted.

Isn't it create unexpected behavior for client when slowest durable cursor moves to next ledger and ml trims the ledger. Let's say if this cursor is slower than other durable cursor then ManagedLedger will immediately delete data once durable-cursor consumeAndAck from that ml-ledger and switch the ledger, so non-durable cursor may not get expected chunk of data in stream.

  • Instead can't we treat non-durable cursor like durable and not delete until it consume messages. Because if non-durable cursor/consumer disconnects then broker is anyway removes this non-durable cursor and can move position according to durable-cursor's markDeletePosition.

* <p>
* The cursor is anonymous and can be positioned on an arbitrary position.
* <p>
* This method is not-blocking.
*
* @param startCursorPosition
* the position where the cursor should be initialized, or null to start from the current latest entry.
* When starting on a particular cursor position, the first entry to be returned will be the entry next
* to the specified position
* @return the new NonDurableCursor
*/
public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;

/**
* Delete a ManagedCursor asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedLedgerImpl ledger;
private final String name;

private volatile PositionImpl markDeletePosition;
private volatile PositionImpl readPosition;
protected volatile PositionImpl markDeletePosition;
protected volatile PositionImpl readPosition;

protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp");
Expand All @@ -101,7 +101,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// This counters are used to compute the numberOfEntries and numberOfEntriesInBacklog values, without having to look
// at the list of ledgers in the ml. They are initialized to (-backlog) at opening, and will be incremented each
// time a message is read or deleted.
private volatile long messagesConsumedCounter;
protected volatile long messagesConsumedCounter;

// Current ledger used to append the mark-delete position
private volatile LedgerHandle cursorLedger;
Expand Down Expand Up @@ -303,7 +303,6 @@ private void recoveredCursor(PositionImpl position) {
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);

messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));

markDeletePosition = position;
readPosition = ledger.getNextValidPosition(position);
STATE_UPDATER.set(this, State.NoLedger);
Expand Down Expand Up @@ -1151,7 +1150,7 @@ boolean hasMoreEntries(PositionImpl position) {

void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
readPosition = ledger.getNextValidPosition(lastPositionCounter.first);
markDeletePosition = PositionImpl.get(lastPositionCounter.first);
markDeletePosition = lastPositionCounter.first;

// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is 0, to ensure the initial backlog count is 0.
Expand Down Expand Up @@ -1257,7 +1256,7 @@ public void asyncMarkDelete(final Position position, final MarkDeleteCallback ca
internalAsyncMarkDelete(newPosition, callback, ctx);
}

private void internalAsyncMarkDelete(final PositionImpl newPosition, final MarkDeleteCallback callback,
protected void internalAsyncMarkDelete(final PositionImpl newPosition, final MarkDeleteCallback callback,
final Object ctx) {
ledger.mbean.addMarkDeleteOp();

Expand Down Expand Up @@ -1574,6 +1573,11 @@ public String getName() {
return name;
}

@Override
public boolean isDurable() {
return true;
}

@Override
public Position getReadPosition() {
return PositionImpl.get(readPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,14 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
}
}

@Override
public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException {
checkManagedLedgerIsOpen();
checkFenced();

return new NonDurableCursorImpl(bookKeeper, config, this, null, (PositionImpl) startCursorPosition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for debugging, should broker generate or expect cursor-name or id rather passing null?

}

@Override
public Iterable<ManagedCursor> getCursors() {
return cursors;
Expand Down Expand Up @@ -1393,6 +1401,13 @@ void internalTrimConsumedLedgers() {
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
boolean overRetentionQuota = TOTAL_SIZE_UPDATER.get(this) > ((long) config.getRetentionSizeInMB()) * 1024 * 1024;

if (log.isDebugEnabled()) {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- expired: {} -- over-quota: {} -- current-ledger: {}",
name, ls.getLedgerId(), (System.currentTimeMillis() - ls.getTimestamp()) / 1000.0, expired,
overRetentionQuota, currentLedger.getId());
}
if (ls.getLedgerId() == currentLedger.getId() || (!expired && !overRetentionQuota)) {
if (log.isDebugEnabled()) {
if (!expired) {
Expand Down Expand Up @@ -1868,7 +1883,7 @@ public void deactivateCursor(ManagedCursor cursor) {
}

public boolean isCursorActive(ManagedCursor cursor) {
return activeCursors.get(cursor.getName()) != null;
return cursor.isDurable() && activeCursors.get(cursor.getName()) != null;
}

private boolean currentLedgerIsFull() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Objects;
import com.google.common.collect.Range;

public class NonDurableCursorImpl extends ManagedCursorImpl {

NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
PositionImpl startCursorPosition) {
super(bookkeeper, config, ledger, cursorName);

if (startCursorPosition == null || startCursorPosition.equals(PositionImpl.latest)) {
// Start from last entry
initializeCursorPosition(ledger.getLastPositionAndCounter());
} else if (startCursorPosition.equals(PositionImpl.earliest)) {
// Start from invalid ledger to read from first available entry
recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
} else {
// Since the cursor is positioning on the mark-delete position, we need to take 1 step back from the desired
// read-position
recoverCursor(startCursorPosition);
}

log.info("[{}] Created non-durable cursor read-position={} mark-delete-position={}", ledger.getName(),
readPosition, markDeletePosition);
}

private void recoverCursor(PositionImpl mdPosition) {
Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
this.readPosition = ledger.getNextValidPosition(mdPosition);
markDeletePosition = mdPosition;

// Initialize the counter such that the difference between the messages written on the ML and the
// messagesConsumed is equal to the current backlog (negated).
long initialBacklog = readPosition.compareTo(lastEntryAndCounter.first) < 0
? ledger.getNumberOfEntries(Range.closed(readPosition, lastEntryAndCounter.first)) : 0;
messagesConsumedCounter = lastEntryAndCounter.second - initialBacklog;
}

@Override
public boolean isDurable() {
return false;
}

/// Overridden methods from ManagedCursorImpl. Void implementation to skip cursor persistence

@Override
void recover(final VoidCallback callback) {
/// No-Op
}

@Override
protected void internalAsyncMarkDelete(final PositionImpl newPosition, final MarkDeleteCallback callback,
final Object ctx) {
// Bypass persistence of mark-delete position and individually deleted messages info
callback.markDeleteComplete(ctx);
}

@Override
public void setActive() {
/// No-Op
}

@Override
public boolean isActive() {
return false;
}

@Override
public void setInactive() {
/// No-Op
}

@Override
public void asyncClose(CloseCallback callback, Object ctx) {
// No-Op
callback.closeComplete(ctx);
}

public void asyncDeleteCursor(final String consumerName, final DeleteCursorCallback callback, final Object ctx) {
/// No-Op
callback.deleteCursorComplete(ctx);
}

@Override
public synchronized String toString() {
return Objects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
.add("readPos", readPosition).toString();
}

private static final Logger log = LoggerFactory.getLogger(NonDurableCursorImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,21 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.RangeSet;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;

import com.google.common.base.Objects;
import com.google.common.collect.ComparisonChain;

import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

public class PositionImpl implements Position, Comparable<PositionImpl> {

private final long ledgerId;
private final long entryId;

public static Position earliest = new PositionImpl(-1, -1);
public static Position latest = new PositionImpl(Long.MAX_VALUE, Long.MAX_VALUE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MIN_POSITION / MAX_POSITION?


public PositionImpl(PositionInfo pi) {
this.ledgerId = pi.getLedgerId();
this.entryId = pi.getEntryId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public MockManagedCursor(ManagedCursorContainer container, String name, Position
this.position = position;
}

@Override
public boolean isDurable() {
return true;
}

@Override
public List<Entry> readEntries(int numberOfEntriesToRead) throws ManagedLedgerException {
return Lists.newArrayList();
Expand Down
Loading