Skip to content

Commit

Permalink
Add persistent-policies validation and dynamic policy update on manag…
Browse files Browse the repository at this point in the history
…ed-ledger/cursor (#781)

* Add persistent-policies validation and dynamic policy update on managed-ledger/cursor

* addressed comments: handle/setting markDelete
  • Loading branch information
rdhabalia authored and merlimat committed Sep 26, 2017
1 parent a98e818 commit c359905
Show file tree
Hide file tree
Showing 11 changed files with 503 additions and 157 deletions.
Expand Up @@ -475,4 +475,18 @@ public List<Entry> replayEntries(Set<? extends Position> positions)
* @return * @return
*/ */
int getTotalNonContiguousDeletedMessagesRange(); int getTotalNonContiguousDeletedMessagesRange();

/**
* Returns cursor throttle mark-delete rate
*
* @return
*/
double getThrottleMarkDelete();

/**
* Update throttle mark delete rate
*
*/
void setThrottleMarkDelete(double throttleMarkDelete);

} }
Expand Up @@ -321,4 +321,16 @@ public interface ManagedLedger {
* Returns whether the managed ledger was terminated * Returns whether the managed ledger was terminated
*/ */
public boolean isTerminated(); public boolean isTerminated();

/**
* Returns managed-ledger config
*/
ManagedLedgerConfig getConfig();

/**
* Updates managed-ledger config
*
* @param config
*/
void setConfig(ManagedLedgerConfig config);
} }
Expand Up @@ -121,7 +121,7 @@ public class ManagedCursorImpl implements ManagedCursor {
private final RangeSet<PositionImpl> individualDeletedMessages = TreeRangeSet.create(); private final RangeSet<PositionImpl> individualDeletedMessages = TreeRangeSet.create();
private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ReadWriteLock lock = new ReentrantReadWriteLock();


private final RateLimiter markDeleteLimiter; private RateLimiter markDeleteLimiter;


class MarkDeleteEntry { class MarkDeleteEntry {
final PositionImpl newPosition; final PositionImpl newPosition;
Expand Down Expand Up @@ -2287,5 +2287,24 @@ public String getState() {
return STATE_UPDATER.get(this).toString(); return STATE_UPDATER.get(this).toString();
} }


@Override
public double getThrottleMarkDelete() {
return this.markDeleteLimiter.getRate();
}

@Override
public void setThrottleMarkDelete(double throttleMarkDelete) {
if (throttleMarkDelete > 0.0) {
if (markDeleteLimiter == null) {
markDeleteLimiter = RateLimiter.create(throttleMarkDelete);
} else {
this.markDeleteLimiter.setRate(throttleMarkDelete);
}
} else {
// Disable mark-delete rate limiter
markDeleteLimiter = null;
}
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class); private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
} }
Expand Up @@ -99,7 +99,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final BookKeeper bookKeeper; private final BookKeeper bookKeeper;
private final String name; private final String name;


private final ManagedLedgerConfig config; private ManagedLedgerConfig config;
private final MetaStore store; private final MetaStore store;


private final ConcurrentLongHashMap<CompletableFuture<LedgerHandle>> ledgerCache = new ConcurrentLongHashMap<>(); private final ConcurrentLongHashMap<CompletableFuture<LedgerHandle>> ledgerCache = new ConcurrentLongHashMap<>();
Expand Down Expand Up @@ -2092,10 +2092,17 @@ MetaStore getStore() {
return store; return store;
} }


ManagedLedgerConfig getConfig() { @Override
public ManagedLedgerConfig getConfig() {
return config; return config;
} }


@Override
public void setConfig(ManagedLedgerConfig config) {
this.config = config;
this.cursors.forEach(c -> c.setThrottleMarkDelete(config.getThrottleMarkDelete()));
}

static interface ManagedLedgerInitializeLedgerCallback { static interface ManagedLedgerInitializeLedgerCallback {
public void initializeComplete(); public void initializeComplete();


Expand Down
Expand Up @@ -263,6 +263,15 @@ public long getNumberOfEntriesSinceFirstNotAckedMessage() {
public int getTotalNonContiguousDeletedMessagesRange() { public int getTotalNonContiguousDeletedMessagesRange() {
return 0; return 0;
} }

@Override
public void setThrottleMarkDelete(double throttleMarkDelete) {
}

@Override
public double getThrottleMarkDelete() {
return -1;
}
} }


@Test @Test
Expand Down
Expand Up @@ -226,6 +226,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Number of guaranteed copies (acks to wait before write is complete) // Number of guaranteed copies (acks to wait before write is complete)
@FieldContext(minValue = 1) @FieldContext(minValue = 1)
private int managedLedgerDefaultAckQuorum = 1; private int managedLedgerDefaultAckQuorum = 1;
// Max number of bookies to use when creating a ledger
@FieldContext(minValue = 1)
private int managedLedgerMaxEnsembleSize = 5;
// Max number of copies to store for each message
@FieldContext(minValue = 1)
private int managedLedgerMaxWriteQuorum = 5;
// Max number of guaranteed copies (acks to wait before write is complete)
@FieldContext(minValue = 1)
private int managedLedgerMaxAckQuorum = 5;
// Amount of memory to use for caching data payload in managed ledger. This // Amount of memory to use for caching data payload in managed ledger. This
// memory // memory
// is allocated from JVM direct memory and it's shared across all the topics // is allocated from JVM direct memory and it's shared across all the topics
Expand Down Expand Up @@ -851,6 +860,30 @@ public void setManagedLedgerDefaultAckQuorum(int managedLedgerDefaultAckQuorum)
this.managedLedgerDefaultAckQuorum = managedLedgerDefaultAckQuorum; this.managedLedgerDefaultAckQuorum = managedLedgerDefaultAckQuorum;
} }


public int getManagedLedgerMaxEnsembleSize() {
return managedLedgerMaxEnsembleSize;
}

public void setManagedLedgerMaxEnsembleSize(int managedLedgerMaxEnsembleSize) {
this.managedLedgerMaxEnsembleSize = managedLedgerMaxEnsembleSize;
}

public int getManagedLedgerMaxWriteQuorum() {
return managedLedgerMaxWriteQuorum;
}

public void setManagedLedgerMaxWriteQuorum(int managedLedgerMaxWriteQuorum) {
this.managedLedgerMaxWriteQuorum = managedLedgerMaxWriteQuorum;
}

public int getManagedLedgerMaxAckQuorum() {
return managedLedgerMaxAckQuorum;
}

public void setManagedLedgerMaxAckQuorum(int managedLedgerMaxAckQuorum) {
this.managedLedgerMaxAckQuorum = managedLedgerMaxAckQuorum;
}

public int getManagedLedgerCacheSizeMB() { public int getManagedLedgerCacheSizeMB() {
return managedLedgerCacheSizeMB; return managedLedgerCacheSizeMB;
} }
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin; package org.apache.pulsar.broker.admin;


import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;


import java.net.URI; import java.net.URI;
Expand Down Expand Up @@ -48,6 +49,7 @@
import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriBuilder;


import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -1121,10 +1123,12 @@ private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
@ApiOperation(value = "Set the persistence configuration for all the destinations on a namespace.") @ApiOperation(value = "Set the persistence configuration for all the destinations on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"), @ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") }) @ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 400, message = "Invalid persistence policies") })
public void setPersistence(@PathParam("property") String property, @PathParam("cluster") String cluster, public void setPersistence(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, PersistencePolicies persistence) { @PathParam("namespace") String namespace, PersistencePolicies persistence) {
validatePoliciesReadOnlyAccess(); validatePoliciesReadOnlyAccess();
validatePersistencePolicies(persistence);


try { try {
Stat nodeStat = new Stat(); Stat nodeStat = new Stat();
Expand Down Expand Up @@ -1152,6 +1156,26 @@ public void setPersistence(@PathParam("property") String property, @PathParam("c
} }
} }


private void validatePersistencePolicies(PersistencePolicies persistence) {
try {
checkNotNull(persistence);
final ServiceConfiguration config = pulsar().getConfiguration();
checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
"Bookkeeper-Ensemble must be <= %s", config.getManagedLedgerMaxEnsembleSize());
checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
"Bookkeeper-WriteQuorum must be <= %s", config.getManagedLedgerMaxWriteQuorum());
checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
"Bookkeeper-AckQuorum must be <= %s", config.getManagedLedgerMaxAckQuorum());
checkArgument(
(persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
"Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)", persistence.getBookkeeperEnsemble(),
persistence.getBookkeeperWriteQuorum(), persistence.getBookkeeperAckQuorum());
}catch(NullPointerException | IllegalArgumentException e) {
throw new RestException(Status.PRECONDITION_FAILED, e.getMessage());
}
}

@GET @GET
@Path("/{property}/{cluster}/{namespace}/persistence") @Path("/{property}/{cluster}/{namespace}/persistence")
@ApiOperation(value = "Get the persistence configuration for a namespace.") @ApiOperation(value = "Get the persistence configuration for a namespace.")
Expand Down
Expand Up @@ -947,7 +947,7 @@ public ConcurrentOpenHashMap<String, CompletableFuture<Topic>> getTopics() {
public void onUpdate(String path, Policies data, Stat stat) { public void onUpdate(String path, Policies data, Stat stat) {
final NamespaceName namespace = new NamespaceName(NamespaceBundleFactory.getNamespaceFromPoliciesPath(path)); final NamespaceName namespace = new NamespaceName(NamespaceBundleFactory.getNamespaceFromPoliciesPath(path));


log.info("Updated {}", path); log.info("{} updating with {}", path, data);


topics.forEach((name, topicFuture) -> { topics.forEach((name, topicFuture) -> {
if (namespace.includes(DestinationName.get(name))) { if (namespace.includes(DestinationName.get(name))) {
Expand Down
Expand Up @@ -730,6 +730,21 @@ public CompletableFuture<Void> checkDeduplicationStatus() {
return messageDeduplication.checkStatus(); return messageDeduplication.checkStatus();
} }


private CompletableFuture<Void> checkPersistencePolicies() {
DestinationName topicName = DestinationName.get(topic);
CompletableFuture<Void> future = new CompletableFuture<>();
brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
// update managed-ledger config and managed-cursor.markDeleteRate
this.ledger.setConfig(config);
future.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to update persistence-policies {}", topic, ex.getMessage());
future.completeExceptionally(ex);
return null;
});
return future;
}

@Override @Override
public CompletableFuture<Void> checkReplication() { public CompletableFuture<Void> checkReplication() {
DestinationName name = DestinationName.get(topic); DestinationName name = DestinationName.get(topic);
Expand Down Expand Up @@ -1331,7 +1346,8 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
checkMessageExpiry(); checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure(); CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus(); CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
return CompletableFuture.allOf(replicationFuture, dedupFuture); CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
} }


/** /**
Expand Down

0 comments on commit c359905

Please sign in to comment.