Skip to content

Commit

Permalink
add api to increase partitions of existing non-global partitioned-top…
Browse files Browse the repository at this point in the history
…ic (#299)

* add api to increase partitions of existing non-global partitioned-topic

* add documentation for ordering violation
  • Loading branch information
rdhabalia authored and merlimat committed May 23, 2017
1 parent 96b8c20 commit 5ba05cc
Show file tree
Hide file tree
Showing 8 changed files with 417 additions and 38 deletions.
74 changes: 54 additions & 20 deletions docs/AdminTools.md
Expand Up @@ -55,6 +55,7 @@
- [get retention](#get-retention)
- [Persistent](#persistent)
- [create partitioned topic](#create-partitioned-topic)
- [update partitioned topic](#update-partitioned-topic)
- [get partitioned topic](#get-partitioned-topic)
- [delete partitioned topic](#delete-partitioned-topic)
- [Delete topic](#delete-topic)
Expand Down Expand Up @@ -1405,6 +1406,39 @@ PUT /admin/persistent/{property}/{cluster}/{namespace}/{destination}/partitions
admin.persistentTopics().createPartitionedTopic(persistentTopic, numPartitions)
```

#### update partitioned topic


It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be already
exist and number of new partitions must be greater than existing number of partitions. Decrementing number of partitions
requires deletion of topic which is not supported.
Already created partitioned producers and consumers can't see newly created partitions and it requires to recreate them at
application so, newly created producers and consumers can connect to newly added partitions as well. Therefore, it can
violate partition ordering at producers until all producers are restarted at application.

###### CLI


```
$ pulsar-admin persistent update-partitioned-topic --partitions 4 persistent://test-property/cl1/ns1/pt1
```

```
N/A
```

###### REST

```
POST /admin/persistent/{property}/{cluster}/{namespace}/{destination}/partitions
```

###### Java

```java
admin.persistentTopics().updatePartitionedTopic(persistentTopic, numPartitions)
```

#### get partitioned topic

It gives metadata of created partitioned topic.
Expand Down Expand Up @@ -2401,58 +2435,58 @@ In your terminal, go to below directory to play with client tool.
<table>
<tbody>
<tr>
<td colspan="2">```pulsar-client produce```</td>
<td colspan="2">pulsar-client produce</td>
</tr>
<tr>
<th>options</th>
<th>description</th>
</tr>
<tr>
<td>```-f, --files```</td>
<td>```Comma separated file paths to send. Cannot be used with -m. Either -f or -m must be provided```</td>
<td>-f, --files</td>
<td>Comma separated file paths to send. Cannot be used with -m. Either -f or -m must be provided</td>
</tr>
<tr>
<td>```-m, --messages```</td>
<td>```Comma separted string messages to send. Cannot be used with -f. Either -m or -f must be provided```</td>
<td>-m, --message`</td>
<td>Comma separted string messages to send. Cannot be used with -f. Either -m or -f must be provided</td>
</tr>
<tr>
<td>```-n, --num-produce```</td>
<td>```Number of times to send message(s), Default: 1```</td>
<td>-n, --num-produce</td>
<td>Number of times to send message(s), Default: 1</td>
</tr>
<tr>
<td>```-r, --rate```</td>
<td>```Rate (in msg/sec) at which to produce. Value of 0 will produce messages as fast as possible, Default: 0.0```</td>
<td>-r, --rate</td>
<td>Rate (in msg/sec) at which to produce. Value of 0 will produce messages as fast as possible, Default: 0.0</td>
</tr>
<table>

#### consume message command
<table>
<tbody>
<tr>
<td colspan="2">```pulsar-client consume```</td>
<td colspan="2">pulsar-client consume</td>
</tr>
<tr>
<th>options</th>
<th>description</th>
</tr>
<tr>
<td>```--hex```</td>
<td>```Display binary messages in hex, Default: false```</td>
<td>--hex</td>
<td>Display binary messages in hex, Default: false</td>
</tr>
<tr>
<td>```-n, --num-messages```</td>
<td>```Number of messages to consume, Default: 1```</td>
<td>-n, --num-messages</td>
<td>Number of messages to consume, Default: 1</td>
</tr>
<tr>
<td>```-r, --rate```</td>
<td>```Rate (in msg/sec) at which to consume. Value of 0 will consume messages as fast as possible, Default: 0.0```</td>
<td>-r, --rate</td>
<td>Rate (in msg/sec) at which to consume. Value of 0 will consume messages as fast as possible, Default: 0.0</td>
</tr>
<tr>
<td>```-s, --subscription-name```</td>
<td>```Subscription name```</td>
<td>-s, --subscription-name</td>
<td>Subscription name</td>
</tr>
<tr>
<td>```-t, --subscription-type```</td>
<td>```Subscription type: Exclusive, Shared, Failover, Default: Exclusive```</td>
<td>-t, --subscription-type</td>
<td>Subscription type: Exclusive, Shared, Failover, Default: Exclusive</td>
</tr>
<table>
Expand Up @@ -56,8 +56,10 @@
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
Expand All @@ -79,6 +81,7 @@
import com.yahoo.pulsar.client.admin.PulsarAdminException.NotFoundException;
import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.util.FutureUtil;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.proto.PulsarApi.KeyValue;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand All @@ -94,6 +97,8 @@
import com.yahoo.pulsar.common.policies.data.PersistentTopicInternalStats;
import com.yahoo.pulsar.common.policies.data.PersistentTopicStats;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.util.Codec;

import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -390,6 +395,50 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path
}
}

/**
* It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be
* already exist and number of new partitions must be greater than existing number of partitions. Decrementing
* number of partitions requires deletion of topic which is not supported.
*
* Already created partitioned producers and consumers can't see newly created partitions and it requires to
* recreate them at application so, newly created producers and consumers can connect to newly added partitions as
* well. Therefore, it can violate partition ordering at producers until all producers are restarted at application.
*
* @param property
* @param cluster
* @param namespace
* @param destination
* @param numPartitions
*/
@POST
@Path("/{property}/{cluster}/{namespace}/{destination}/partitions")
@ApiOperation(value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist") })
public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
int numPartitions) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateAdminAccessOnProperty(dn.getProperty());
if (dn.isGlobal()) {
log.error("[{}] Update partitioned-topic is forbidden on global namespace {}", clientAppId(), dn);
throw new RestException(Status.FORBIDDEN, "Update forbidden on global namespace");
}
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
}
try {
updatePartitionedTopic(dn, numPartitions).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
log.error("[{}] Failed to update partitioned topic {}", clientAppId(), dn, e.getCause());
throw new RestException(e.getCause());
}
}

@GET
@Path("/{property}/{cluster}/{namespace}/{destination}/partitions")
@ApiOperation(value = "Get partitioned topic metadata.")
Expand Down Expand Up @@ -1203,4 +1252,122 @@ private PersistentReplicator getReplicatorReference(String replName, PersistentT
throw new RestException(Status.NOT_FOUND, "Replicator not found");
}
}

private CompletableFuture<Void> updatePartitionedTopic(DestinationName dn, int numPartitions) {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(), dn.getNamespacePortion(),
domain(), dn.getEncodedLocalName());

CompletableFuture<Void> updatePartition = new CompletableFuture<>();
createSubscriptions(dn, numPartitions).thenAccept(res -> {
try {
byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
globalZk().setData(path, data, -1, (rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
updatePartition.complete(null);
} else {
updatePartition.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc),
"failed to create update partitions"));
}
}, null);
} catch (Exception e) {
updatePartition.completeExceptionally(e);
}
}).exceptionally(ex -> {
updatePartition.completeExceptionally(ex);
return null;
});

return updatePartition;
}

/**
* It creates subscriptions for new partitions of existing partitioned-topics
*
* @param dn : topic-name: persistent://prop/cluster/ns/topic
* @param numPartitions : number partitions for the topics
*/
private CompletableFuture<Void> createSubscriptions(DestinationName dn, int numPartitions) {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, dn.getProperty(), dn.getCluster(), dn.getNamespacePortion(),
domain(), dn.getEncodedLocalName());
CompletableFuture<Void> result = new CompletableFuture<>();
fetchPartitionedTopicMetadataAsync(pulsar(), path).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions <= 1) {
result.completeExceptionally(new RestException(Status.CONFLICT, "Topic is not partitioned topic"));
return;
}

if (partitionMetadata.partitions >= numPartitions) {
result.completeExceptionally(new RestException(Status.CONFLICT,
"number of partitions must be more than existing " + partitionMetadata.partitions));
return;
}

// get list of cursors name of partition-1
final String ledgerName = dn.getPartition(1).getPersistenceNamingEncoding();
((ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory()).getMetaStore().getCursors(ledgerName,
new MetaStoreCallback<List<String>>() {

@Override
public void operationComplete(List<String> cursors,
org.apache.bookkeeper.mledger.impl.MetaStore.Stat stat) {
List<CompletableFuture<Void>> topicCreationFuture = Lists.newArrayList();
// create subscriptions for all new partition-topics
cursors.forEach(cursor -> {
String subName = Codec.decode(cursor);
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicName = dn.getPartition(i).toString();
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopic(topicName).handle((topic, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to create topic {}", clientAppId(), topicName);
future.completeExceptionally(ex);
return null;
} else {
topic.createSubscription(subName).handle((sub, e) -> {
if (e != null) {
log.warn("[{}] Failed to create subsciption {} {}", clientAppId(),
topicName, subName);
future.completeExceptionally(e);
return null;
} else {
log.info("[{}] Successfully create subsciption {} {}",
clientAppId(), topicName, subName);
// close topic
topic.close();
future.complete(null);
return null;
}
});
return null;
}
});
topicCreationFuture.add(future);
}
});
// wait for all subscriptions to be created
FutureUtil.waitForAll(topicCreationFuture).handle((res, e) -> {
if (e != null) {
result.completeExceptionally(e);
} else {
log.info("[{}] Successfully create new partitions {}", clientAppId(),
dn.toString());
result.complete(null);
}
return null;
});
}

@Override
public void operationFailed(MetaStoreException ex) {
log.warn("[{}] Failed to get list of cursors of {}", clientAppId(), ledgerName);
result.completeExceptionally(ex);
}
});
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partition metadata for {}", clientAppId(), dn.toString());
result.completeExceptionally(ex);
return null;
});
return result;
}
}
Expand Up @@ -41,6 +41,8 @@ public interface PublishCallback {

CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId);

CompletableFuture<PersistentSubscription> createSubscription(String subscriptionName);

CompletableFuture<Void> unsubscribe(String subName);

Expand Down
Expand Up @@ -358,7 +358,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
lock.readLock().unlock();
}

CompletableFuture<Subscription> subscriptionFuture = isDurable ? //
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
getDurableSubscription(subscriptionName) //
: getNonDurableSubscription(subscriptionName, startMessageId);

Expand Down Expand Up @@ -402,7 +402,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
return future;
}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName) {
private CompletableFuture<? extends Subscription> getDurableSubscription(String subscriptionName) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
ledger.asyncOpenCursor(Codec.encode(subscriptionName), new OpenCursorCallback() {
@Override
Expand All @@ -425,7 +425,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
return subscriptionFuture;
}

private CompletableFuture<Subscription> getNonDurableSubscription(String subscriptionName, MessageId startMessageId) {
private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName, MessageId startMessageId) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();

Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
Expand All @@ -451,6 +451,12 @@ private CompletableFuture<Subscription> getNonDurableSubscription(String subscri
return subscriptionFuture;
}

@SuppressWarnings("unchecked")
@Override
public CompletableFuture<PersistentSubscription> createSubscription(String subscriptionName) {
return (CompletableFuture<PersistentSubscription>) getDurableSubscription(subscriptionName);
}

/**
* Delete the cursor ledger for a given subscription
*
Expand Down

0 comments on commit 5ba05cc

Please sign in to comment.