Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support replicator dispatch rate policy at the topic level #9175

Merged
merged 1 commit into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2725,6 +2725,18 @@ protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer ma
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected Optional<DispatchRate> internalGetReplicatorDispatchRate() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getReplicatorDispatchRate);
}

protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setReplicatorDispatchRate(dispatchRate);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

private void preValidation() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,90 @@ public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncR
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
@ApiOperation(value = "Get replicatorDispatchRate config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
Optional<DispatchRate> dispatchRate = internalGetReplicatorDispatchRate();
if (dispatchRate.isPresent()) {
asyncResponse.resume(dispatchRate.get());
} else {
asyncResponse.resume(Response.noContent().build());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
@ApiOperation(value = "Set replicatorDispatchRate config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Invalid value of replicatorDispatchRate")})
public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Replicator dispatch rate of the topic")
DispatchRate dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetReplicatorDispatchRate(dispatchRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Updating replicatorDispatchRate failed", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Updating replicatorDispatchRate failed", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
+ ", replicatorDispatchRate={}"
, clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate);
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
@ApiOperation(value = "Remove replicatorDispatchRate config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetReplicatorDispatchRate(null).whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove replicatorDispatchRate", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
clientAppId(), namespaceName, topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxProducers")
@ApiOperation(value = "Get maxProducers config for specified topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ private DispatchRate createDispatchRate() {
}

/**
* Update dispatch-throttling-rate. gives first priority to namespace-policy configured dispatch rate else applies
* default broker dispatch-throttling-rate
* Update dispatch-throttling-rate.
* Topic-level has the highest priority, then namespace-level, and finally use dispatch-throttling-rate in
* broker-level
*/
public void updateDispatchRate() {
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
Expand Down Expand Up @@ -189,6 +190,13 @@ public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService br
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getSubscriptionDispatchRate);
break;
case REPLICATOR:
dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getReplicatorDispatchRate);
break;
default:
break;
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies cache have not init.", topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,11 +497,13 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
});
}

@Override
protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch) {
long newEpoch = currentEpoch.orElse(-1L) + 1;
return setTopicEpoch(newEpoch);
}

@Override
protected CompletableFuture<Long> setTopicEpoch(long newEpoch) {
CompletableFuture<Long> future = new CompletableFuture<>();
ledger.asyncSetProperty(TOPIC_EPOCH_PROPERTY_NAME, String.valueOf(newEpoch), new UpdatePropertiesCallback() {
Expand Down Expand Up @@ -2059,7 +2061,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
}
});
replicators.forEach((name, replicator) ->
replicator.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data))
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
);
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
Expand Down Expand Up @@ -2662,10 +2664,12 @@ public void onUpdate(TopicPolicies policies) {
}

initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.subscribeRateLimiter.isPresent() && policies != null) {
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
}
replicators.forEach((name, replicator) -> replicator.getRateLimiter()
.ifPresent(DispatchRateLimiter::updateDispatchRate));
}

private Optional<Policies> getNamespacePolicies() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1387,4 +1387,25 @@ public void testMaxSubscriptionsPerTopic() throws Exception {
c.close();
}
}

@Test(timeOut = 30000)
public void testReplicatorRateApi() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

assertNull(admin.topics().getReplicatorDispatchRate(topic));

DispatchRate dispatchRate = new DispatchRate(100,200L,10);
admin.topics().setReplicatorDispatchRate(topic, dispatchRate);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
-> assertEquals(admin.topics().getReplicatorDispatchRate(topic), dispatchRate));

admin.topics().removeReplicatorDispatchRate(topic);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.topics().getReplicatorDispatchRate(topic)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -41,6 +43,10 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

/**
* Starts 3 brokers that are in 3 different clusters
*/
Expand Down Expand Up @@ -74,6 +80,73 @@ public Object[][] dispatchRateProvider() {
return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
}

@Test
public void testReplicatorRatePriority() throws Exception {
shutdown();
config1.setSystemTopicEnabled(true);
config1.setTopicLevelPoliciesEnabled(true);
config1.setDispatchThrottlingRatePerReplicatorInMsg(100);
config1.setDispatchThrottlingRatePerReplicatorInByte(200L);
setup();

final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
final String topicName = "persistent://" + namespace + "/ratechange";

admin1.namespaces().createNamespace(namespace);
// set 2 clusters, there will be 1 replicator in each topic
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
client1.newProducer().topic(topicName).create().close();
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));

//use broker-level by default
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100);
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 200L);

//set namespace-level policy, which should take effect
DispatchRate nsDispatchRate = new DispatchRate(50, 60L, 70);
admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 50);
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 60L);

//set topic-level policy, which should take effect
DispatchRate topicRate = new DispatchRate(10, 20L, 30);
admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);

//Set the namespace-level policy, which should not take effect
DispatchRate nsDispatchRate2 = new DispatchRate(500, 600L, 700);
admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate2);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate2));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);

//remove topic-level policy, namespace-level should take effect
admin1.topics().removeReplicatorDispatchRate(topicName);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 500);
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
600L);

//remove namespace-level policy, broker-level should take effect
admin1.namespaces().setReplicatorDispatchRate(namespace, null);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
200L);
}

/**
* verifies dispatch rate for replicators get changed once namespace policies changed.
*
Expand Down Expand Up @@ -103,7 +176,6 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception {
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.close();

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();

// 1. default replicator throttling not configured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,70 @@ void setInactiveTopicPolicies(String topic
*/
CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic);

/**
* Set replicatorDispatchRate for the topic.
* <p/>
* Replicator dispatch rate under this topic can dispatch this many messages per second
*
* @param topic
* @param dispatchRate
* number of messages per second
* @throws PulsarAdminException
* Unexpected error
*/
void setReplicatorDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException;

/**
* Set replicatorDispatchRate for the topic asynchronously.
* <p/>
* Replicator dispatch rate under this topic can dispatch this many messages per second.
*
* @param topic
* @param dispatchRate
* number of messages per second
*/
CompletableFuture<Void> setReplicatorDispatchRateAsync(String topic, DispatchRate dispatchRate);

/**
* Get replicatorDispatchRate for the topic.
* <p/>
* Replicator dispatch rate under this topic can dispatch this many messages per second.
*
* @param topic
* @returns DispatchRate
* number of messages per second
* @throws PulsarAdminException
* Unexpected error
*/
DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException;

/**
* Get replicatorDispatchRate asynchronously.
* <p/>
* Replicator dispatch rate under this topic can dispatch this many messages per second.
*
* @param topic
* @returns DispatchRate
* number of messages per second
*/
CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic);

/**
* Remove replicatorDispatchRate for a topic.
* @param topic
* Topic name
* @throws PulsarAdminException
* Unexpected error
*/
void removeReplicatorDispatchRate(String topic) throws PulsarAdminException;

/**
* Remove replicatorDispatchRate for a topic asynchronously.
* @param topic
* Topic name
*/
CompletableFuture<Void> removeReplicatorDispatchRateAsync(String topic);

/**
* Get the compactionThreshold for a topic. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
Expand Down
Loading