Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -59,6 +61,18 @@
@InterfaceStability.Stable
public interface ManagedLedger {

boolean isValidPosition(PositionImpl nextReadPosition);

boolean hasMoreEntries(PositionImpl nextReadPosition);

void addWaitingEntryCallBack(WaitingEntryCallBack streamingEntryReader);

// define boundaries for position based seeks and searches
enum PositionBound {
startIncluded, startExcluded
}


/**
* @return the unique name of this ManagedLedger
*/
Expand Down Expand Up @@ -387,6 +401,23 @@ public interface ManagedLedger {
*/
long getEstimatedBacklogSize();

/**
* Get estimated backlog size from a specific position.
* @param pos
* @return
*/
long getEstimatedBacklogSize(PositionImpl pos);

/**
* number of entries are in add progress
*/
int getPendingAddEntriesCount();

/**
* Get the total size in bytes of all the entries stored in this cache.
*/
long getCacheSize();

/**
* Return the size of all ledgers offloaded to 2nd tier storage
*/
Expand Down Expand Up @@ -590,6 +621,25 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
* */
CompletableFuture<Position> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);

/**
* Get the entry position at a given distance from a given position.
*
* @param startPosition
* starting position
* @param n
* number of entries to skip ahead
* @param startRange
* specifies whether or not to include the start position in calculating the distance
* @return the new position that is n entries ahead
*/
PositionImpl getPositionAfterN(final PositionImpl startPosition, long n,
ManagedLedgerImpl.PositionBound startRange);

/**
* first position of current topic
*/
PositionImpl getFirstPosition();

/**
* Get the ManagedLedgerInterceptor for ManagedLedger.
* */
Expand All @@ -600,4 +650,10 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
* will got null if corresponding ledger not exists.
*/
CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);

/**
* get the valid position next to the given one
* @param position current postion
*/
PositionImpl getNextValidPosition(final PositionImpl position);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
Expand All @@ -38,9 +37,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;

import io.netty.util.concurrent.FastThreadLocal;

import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand All @@ -62,7 +59,6 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
Expand All @@ -81,13 +77,13 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedger.PositionBound;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -77,7 +75,6 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.JsonUtil;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.Retries;
Expand Down Expand Up @@ -232,11 +229,6 @@ enum State {
// After handling the BK write failure, managed ledger will get signalled to create a new ledger
}

// define boundaries for position based seeks and searches
public enum PositionBound {
startIncluded, startExcluded
}

private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ManagedLedgerImpl.class, State.class, "state");
protected volatile State state = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
package org.apache.bookkeeper.mledger.impl;

import com.google.common.base.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;

import java.util.Optional;
import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger.PositionBound;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;

@Slf4j
class OpFindNewest implements ReadEntryCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
package org.apache.bookkeeper.mledger.impl;

import com.google.common.collect.Range;

import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger.PositionBound;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;

@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -2193,38 +2193,37 @@ private void getEntryBatchSize(CompletableFuture<Integer> batchSizeFuture, Persi
MessageIdImpl messageId, int batchIndex) {
if (batchIndex >= 0) {
try {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
ledger.asyncReadEntry(new PositionImpl(messageId.getLedgerId(),
messageId.getEntryId()), new AsyncCallbacks.ReadEntryCallback() {
ManagedLedger ledger = topic.getManagedLedger();
final PositionImpl position = new PositionImpl(messageId.getLedgerId(),
messageId.getEntryId());
ledger.newNonDurableCursor(position).asyncReadEntries(1, new AsyncCallbacks.ReadEntriesCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
// Since we can't read the message from the storage layer,
// it might be an already delete message ID or an invalid message ID
// We should fall back to non batch index seek.
batchSizeFuture.complete(0);
}

@Override
public void readEntryComplete(Entry entry, Object ctx) {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
try {
try {
if (entry == null) {
batchSizeFuture.complete(0);
} else {
MessageMetadata metadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
batchSizeFuture.complete(metadata.getNumMessagesInBatch());
}
} catch (Exception e) {
batchSizeFuture.completeExceptionally(new RestException(e));
if (entries.isEmpty()) {
batchSizeFuture.complete(0);
} else {
MessageMetadata metadata =
Commands.parseMessageMetadata(entries.get(0).getDataBuffer());
batchSizeFuture.complete(metadata.getNumMessagesInBatch());
}
} catch (Exception e) {
batchSizeFuture.completeExceptionally(new RestException(e));
} finally {
if (entry != null) {
for (Entry entry : entries) {
entry.release();
}
}
}
}, null);

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
// Since we can't read the message from the storage layer,
// it might be an already delete message ID or an invalid message ID
// We should fall back to non batch index seek.
batchSizeFuture.complete(0);
}
}, null, PositionImpl.latest);
} catch (NullPointerException npe) {
batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
} catch (Exception exception) {
Expand Down Expand Up @@ -2278,26 +2277,27 @@ protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId
// will redirect if the topic not owned by current broker
validateReadOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(new RestException(exception));
}

ManagedLedger ledger = topic.getManagedLedger();
final PositionImpl position = new PositionImpl(ledgerId, entryId);
ledger.newNonDurableCursor(position).asyncReadEntries(1, new AsyncCallbacks.ReadEntriesCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
try {
asyncResponse.resume(generateResponseWithEntry(entry));
} catch (IOException exception) {
asyncResponse.resume(new RestException(exception));
asyncResponse.resume(generateResponseWithEntry(entries.get(0)));
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
} finally {
if (entry != null) {
for (Entry entry : entries) {
entry.release();
}
}
}
}, null);

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(new RestException(exception));
}
}, null, PositionImpl.latest);
} catch (NullPointerException npe) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Message not found"));
} catch (Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -132,7 +132,7 @@ private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {

// Get estimated unconsumed size for the managed ledger associated with this topic. Estimated size is more
// useful than the actual storage size. Actual storage size gets updated only when managed ledger is trimmed.
ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
ManagedLedger mLedger = persistentTopic.getManagedLedger();
long backlogSize = mLedger.getEstimatedBacklogSize();

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Promise;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
Expand All @@ -47,9 +48,9 @@
import javax.net.ssl.SSLSession;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -1594,7 +1595,7 @@ private void getLargestBatchIndexWhenPossible(
String subscriptionName) {

PersistentTopic persistentTopic = (PersistentTopic) topic;
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
ManagedLedger ml = persistentTopic.getManagedLedger();

// If it's not pointing to a valid entry, respond messageId of the current position.
if (position.getEntryId() == -1) {
Expand All @@ -1605,17 +1606,25 @@ private void getLargestBatchIndexWhenPossible(

// For a valid position, we read the entry out and parse the batch size from its metadata.
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
entryFuture.complete(entry);
}
try {
ml.newNonDurableCursor(position).asyncReadEntries(1, new AsyncCallbacks.ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
if (!entries.isEmpty()) {
entryFuture.complete(entries.get(0));
} else {
entryFuture.complete(null);
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
entryFuture.completeExceptionally(exception);
}
}, null);
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
entryFuture.completeExceptionally(exception);
}
}, null, PositionImpl.latest);
} catch (ManagedLedgerException e) {
entryFuture.completeExceptionally(e);
}

CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
Expand Down
Loading