Skip to content

Commit

Permalink
Rest API for Ledger Offloading (#1639)
Browse files Browse the repository at this point in the history
* Rest API for Ledger Offloading

Implemented for both V1 and V2 topic name formats. API takes a message
ID, up to which the broker will try to offload messages. It returns
the message ID of the first message in the topic which has not been
offloaded.

This patch also adds basic support for setting the Offloader
implementation in the broker (needed for testing). Subsequent patches
will make this configurable through ServiceConfiguration.

* Split compaction endpoint into two

One for triggering and one for getting the current status.

* Add conflict to rest api doc

* Fixed build
  • Loading branch information
ivankelly authored and sijie committed May 2, 2018
1 parent b837af8 commit 5f678e0
Show file tree
Hide file tree
Showing 15 changed files with 426 additions and 43 deletions.
Expand Up @@ -46,7 +46,9 @@
import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.admin.AdminResource;
Expand Down Expand Up @@ -635,6 +637,10 @@ public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerClientFactory.getManagedLedgerFactory(); return managedLedgerClientFactory.getManagedLedgerFactory();
} }


public LedgerOffloader getManagedLedgerOffloader() {
return NullLedgerOffloader.INSTANCE;
}

public ZooKeeperCache getLocalZkCache() { public ZooKeeperCache getLocalZkCache() {
return localZkCache; return localZkCache;
} }
Expand Down
Expand Up @@ -55,6 +55,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand All @@ -75,6 +76,8 @@
import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
Expand All @@ -86,7 +89,6 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.compaction.CompactionStatus;
import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -1110,12 +1112,30 @@ protected void internalTriggerCompaction(boolean authoritative) {
} }
} }


protected CompactionStatus internalCompactionStatus(boolean authoritative) { protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
validateAdminOperationOnTopic(authoritative); validateAdminOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.compactionStatus(); return topic.compactionStatus();
} }


protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) {
validateAdminOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
topic.triggerOffload(messageId);
} catch (AlreadyRunningException e) {
throw new RestException(Status.CONFLICT, e.getMessage());
} catch (Exception e) {
throw new RestException(e);
}
}

protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.offloadStatus();
}

public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar, public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar,
String clientAppId, AuthenticationDataSource authenticationData, TopicName topicName) { String clientAppId, AuthenticationDataSource authenticationData, TopicName topicName) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>(); CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
Expand Down
Expand Up @@ -38,9 +38,10 @@
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;


import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.compaction.CompactionStatus;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
Expand Down Expand Up @@ -445,11 +446,43 @@ public void compact(@PathParam("property") String property, @PathParam("cluster"
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") }) @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") })
public CompactionStatus compactionStatus( public LongRunningProcessStatus compactionStatus(
@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic); validateTopicName(property, cluster, namespace, encodedTopic);
return internalCompactionStatus(authoritative); return internalCompactionStatus(authoritative);
} }

@PUT
@Path("/{tenant}/{cluster}/{namespace}/{topic}/offload")
@ApiOperation(value = "Offload a prefix of a topic to long term storage")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Offload already running")})
public void triggerOffload(@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
MessageIdImpl messageId) {
validateTopicName(tenant, cluster, namespace, encodedTopic);
internalTriggerOffload(authoritative, messageId);
}

@GET
@Path("/{tenant}/{cluster}/{namespace}/{topic}/offload")
@ApiOperation(value = "Offload a prefix of a topic to long term storage")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist")})
public OffloadProcessStatus offloadStatus(@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, cluster, namespace, encodedTopic);
return internalOffloadStatus(authoritative);
}
} }
Expand Up @@ -38,9 +38,10 @@
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;


import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.compaction.CompactionStatus;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
Expand Down Expand Up @@ -430,11 +431,41 @@ public void compact(@PathParam("tenant") String tenant,
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") }) @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") })
public CompactionStatus compactionStatus( public LongRunningProcessStatus compactionStatus(
@PathParam("tenant") String tenant, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic); validateTopicName(tenant, namespace, encodedTopic);
return internalCompactionStatus(authoritative); return internalCompactionStatus(authoritative);
} }

@PUT
@Path("/{tenant}/{namespace}/{topic}/offload")
@ApiOperation(value = "Offload a prefix of a topic to long term storage")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Offload already running")})
public void triggerOffload(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
MessageIdImpl messageId) {
validateTopicName(tenant, namespace, encodedTopic);
internalTriggerOffload(authoritative, messageId);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/offload")
@ApiOperation(value = "Offload a prefix of a topic to long term storage")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist")})
public OffloadProcessStatus offloadStatus(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalOffloadStatus(authoritative);
}
} }
Expand Up @@ -722,6 +722,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());


managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader());

future.complete(managedLedgerConfig); future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception))); }, (exception) -> future.completeExceptionally(exception)));


Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Entry;
Expand Down Expand Up @@ -81,13 +82,14 @@
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.compaction.CompactionStatus;
import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.ConsumerStats;
Expand Down Expand Up @@ -173,6 +175,9 @@ public class PersistentTopic implements Topic, AddEntryCallback {
CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN); CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
final CompactedTopic compactedTopic; final CompactedTopic compactedTopic;


CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
(MessageIdImpl)MessageId.earliest);

// Whether messages published must be encrypted or not in this topic // Whether messages published must be encrypted or not in this topic
private volatile boolean isEncryptionRequired = false; private volatile boolean isEncryptionRequired = false;


Expand Down Expand Up @@ -1683,23 +1688,61 @@ public synchronized void triggerCompaction()
} }
} }



public synchronized LongRunningProcessStatus compactionStatus() {
public synchronized CompactionStatus compactionStatus() {
final CompletableFuture<Long> current; final CompletableFuture<Long> current;
synchronized (this) { synchronized (this) {
current = currentCompaction; current = currentCompaction;
} }
if (!current.isDone()) { if (!current.isDone()) {
return CompactionStatus.forStatus(CompactionStatus.Status.RUNNING); return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
} else { } else {
try { try {
if (current.join() == COMPACTION_NEVER_RUN) { if (current.join() == COMPACTION_NEVER_RUN) {
return CompactionStatus.forStatus(CompactionStatus.Status.NOT_RUN); return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
} else {
return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
}
} catch (CancellationException | CompletionException e) {
return LongRunningProcessStatus.forError(e.getMessage());
}
}
}

public synchronized void triggerOffload(MessageIdImpl messageId) throws AlreadyRunningException {
if (currentOffload.isDone()) {
CompletableFuture<MessageIdImpl> promise = currentOffload = new CompletableFuture<>();
getManagedLedger().asyncOffloadPrefix(
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()),
new OffloadCallback() {
@Override
public void offloadComplete(Position pos, Object ctx) {
PositionImpl impl = (PositionImpl)pos;

promise.complete(new MessageIdImpl(impl.getLedgerId(), impl.getEntryId(), -1));
}

@Override
public void offloadFailed(ManagedLedgerException exception, Object ctx) {
promise.completeExceptionally(exception);
}
}, null);
} else {
throw new AlreadyRunningException("Offload already in progress");
}
}

public synchronized OffloadProcessStatus offloadStatus() {
if (!currentOffload.isDone()) {
return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
} else {
try {
if (currentOffload.join() == MessageId.earliest) {
return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
} else { } else {
return CompactionStatus.forStatus(CompactionStatus.Status.SUCCESS); return OffloadProcessStatus.forSuccess(currentOffload.join());
} }
} catch (CancellationException | CompletionException e) { } catch (CancellationException | CompletionException e) {
return CompactionStatus.forError(e.getMessage()); return OffloadProcessStatus.forError(e.getMessage());
} }
} }
} }
Expand Down

0 comments on commit 5f678e0

Please sign in to comment.