Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 2912][pulsar-admin] add get-message-by-id cmd into pulsar-admin #6331

Merged
merged 12 commits into from
Apr 22, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import static org.apache.pulsar.common.util.Codec.decode;

Expand All @@ -31,7 +38,6 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -52,11 +58,6 @@
import javax.ws.rs.core.StreamingOutput;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -1495,6 +1496,37 @@ protected void internalResetCursorOnPosition(String subName, boolean authoritati
}
}

protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId, long entryId,
boolean authoritative) {
verifyReadOperation(authoritative);

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
try {
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(new RestException(exception));
}

@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
asyncResponse.resume(generateResponseWithEntry(entry));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to release the Entry it after using it.

} catch (IOException exception) {
asyncResponse.resume(new RestException(exception));
}
}
}, null);
} catch (NullPointerException npe) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Message not found"));
} catch (Exception exception) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}",
clientAppId(), ledgerId, entryId, topicName, exception);
asyncResponse.resume(new RestException(exception));
}
}

protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand All @@ -1503,13 +1535,18 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
}
}

protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
verifyReadOperation(authoritative);
validateAdminAccessForSubscriber(subName, authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
subName);
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Skip messages on a non-persistent topic is not allowed");
}

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
PersistentReplicator repl = null;
PersistentSubscription sub = null;
Expand All @@ -1525,48 +1562,7 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
} else {
entry = sub.peekNthMessage(messagePosition).get();
}
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

// moves the readerIndex to the payload
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
for (KeyValue keyValue : metadata.getPropertiesList()) {
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
}

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());

// Copy into a heap buffer for output stream compatibility
ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
uncompressedPayload.readableBytes());
data.writeBytes(uncompressedPayload);
uncompressedPayload.release();

StreamingOutput stream = new StreamingOutput() {

@Override
public void write(OutputStream output) throws IOException, WebApplicationException {
output.write(data.array(), data.arrayOffset(), data.readableBytes());
data.release();
}
};

return responseBuilder.entity(stream).build();
return generateResponseWithEntry(entry);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Message not found");
} catch (Exception exception) {
Expand All @@ -1580,6 +1576,57 @@ public void write(OutputStream output) throws IOException, WebApplicationExcepti
}
}

private void verifyReadOperation(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
}
}

private Response generateResponseWithEntry(Entry entry) throws IOException {
checkNotNull(entry);
PositionImpl pos = (PositionImpl) entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();

// moves the readerIndex to the payload
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);

ResponseBuilder responseBuilder = Response.ok();
responseBuilder.header("X-Pulsar-Message-ID", pos.toString());
for (KeyValue keyValue : metadata.getPropertiesList()) {
responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue());
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime()));
}
if (metadata.hasEventTime()) {
responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime()));
}
if (metadata.hasNumMessagesInBatch()) {
responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch());
}

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize());

// Copy into a heap buffer for output stream compatibility
ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(),
uncompressedPayload.readableBytes());
data.writeBytes(uncompressedPayload);
uncompressedPayload.release();

StreamingOutput stream = output -> {
output.write(data.array(), data.arrayOffset(), data.readableBytes());
data.release();
};

return responseBuilder.entity(stream).build();
}

protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,28 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara
return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
}

@GET
@Path("/{property}/{cluster}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
@ApiOperation(hidden = true, value = "Get message by its messageId.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't java admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the messageId does not exist")
})
public void getMessageByID(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, @PathParam("ledgerId") Long ledgerId,
@PathParam("entryId") Long entryId, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("{property}/{cluster}/{namespace}/{topic}/backlog")
@ApiOperation(hidden = true, value = "Get estimated backlog for offline topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,43 @@ public Response peekNthMessage(
return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
@ApiOperation(value = "Get message by its messageId.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist"),
@ApiResponse(code = 405, message = "Skipping messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void getMessageById(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The ledger id", required = true)
@PathParam("ledgerId") long ledgerId,
@ApiParam(value = "The entry id", required = true)
@PathParam("entryId") long entryId,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("{tenant}/{namespace}/{topic}/backlog")
@ApiOperation(value = "Get estimated backlog for offline topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,32 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
*/
CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages);

/**
* Get a message by its messageId via a topic subscription
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @return the message indexed by the messageId
* @throws PulsarAdminException
* Unexpected error
*/
Message<byte[]> getMessageById(String topic, long ledgerId, long entryId) throws PulsarAdminException;

/**
* Get a message by its messageId via a topic subscription asynchronously
* @param topic
* Topic name
* @param ledgerId
* Ledger id
* @param entryId
* Entry id
* @return a future that can be used to track when the message is returned
*/
CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId);

/**
* Create a new subscription on a topic
*
Expand Down
Loading