Skip to content

Commit

Permalink
Broker side deduplication (#751)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Sep 13, 2017
1 parent 817898e commit bdbb121
Show file tree
Hide file tree
Showing 17 changed files with 860 additions and 129 deletions.
23 changes: 21 additions & 2 deletions conf/broker.conf
Expand Up @@ -70,6 +70,25 @@ brokerDeleteInactiveTopicsFrequencySeconds=60
# How frequently to proactively check and purge expired messages # How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5 messageExpiryCheckIntervalInMinutes=5


# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
brokerDeduplicationEnabled=false

# Maximum number of producer information that it's going to be
# persisted for deduplication purposes
brokerDeduplicationMaxNumberOfProducers=10000

# Number of entries after which a dedup info snapshot is taken.
# A bigger interval will lead to less snapshots being taken though it would
# increase the topic recovery time, when the entries published after the
# snapshot need to be replayed
brokerDeduplicationEntriesInterval=1000

# Time of inactivity after which the broker will discard the deduplication information
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360

# Enable check for minimum allowed client library version # Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false clientLibraryVersionCheckEnabled=false


Expand Down Expand Up @@ -103,11 +122,11 @@ maxUnackedMessagesPerBroker=0


# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages # Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back # than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages # limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16


# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default # Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling # message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0 dispatchThrottlingRatePerTopicInMsg=0


# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling # Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling
Expand Down
50 changes: 35 additions & 15 deletions conf/standalone.conf
Expand Up @@ -60,13 +60,33 @@ brokerDeleteInactiveTopicsEnabled=true
# How often to check for inactive topics # How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60 brokerDeleteInactiveTopicsFrequencySeconds=60


# How frequently to proactively check and purge expired messages # How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5 messageExpiryCheckIntervalInMinutes=5


# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
brokerDeduplicationEnabled=false

# Maximum number of producer information that it's going to be
# persisted for deduplication purposes
brokerDeduplicationMaxNumberOfProducers=10000

# Number of entries after which a dedup info snapshot is taken.
# A bigger interval will lead to less snapshots being taken though it would
# increase the topic recovery time, when the entries published after the
# snapshot need to be replayed
brokerDeduplicationEntriesInterval=1000

# Time of inactivity after which the broker will discard the deduplication information
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360


# Enable check for minimum allowed client library version # Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false clientLibraryVersionCheckEnabled=false


# Allow client libraries with no version information # Allow client libraries with no version information
clientLibraryVersionCheckAllowUnversioned=true clientLibraryVersionCheckAllowUnversioned=true


# Path for the file used to determine the rotation status for the broker when responding # Path for the file used to determine the rotation status for the broker when responding
Expand All @@ -92,11 +112,11 @@ maxUnackedMessagesPerBroker=0


# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages # Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back # than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages # limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16


# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default # Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling # message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0 dispatchThrottlingRatePerTopicInMsg=0


# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling # Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling
Expand Down Expand Up @@ -162,7 +182,7 @@ bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationParametersName= bookkeeperClientAuthenticationParametersName=
bookkeeperClientAuthenticationParameters= bookkeeperClientAuthenticationParameters=


# Timeout for BK add / read operations # Timeout for BK add / read operations
bookkeeperClientTimeoutInSeconds=30 bookkeeperClientTimeoutInSeconds=30


# Speculative reads are initiated if a read request doesn't complete within a certain time # Speculative reads are initiated if a read request doesn't complete within a certain time
Expand All @@ -178,11 +198,11 @@ bookkeeperClientHealthCheckErrorThresholdPerInterval=5
bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800 bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800


# Enable rack-aware bookie selection policy. BK will chose bookies from different racks when # Enable rack-aware bookie selection policy. BK will chose bookies from different racks when
# forming a new bookie ensemble # forming a new bookie ensemble
bookkeeperClientRackawarePolicyEnabled=true bookkeeperClientRackawarePolicyEnabled=true


# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie # Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
# outside the specified groups will not be used by the broker # outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups= bookkeeperClientIsolationGroups=


### --- Managed Ledger --- ### ### --- Managed Ledger --- ###
Expand All @@ -198,7 +218,7 @@ managedLedgerDefaultAckQuorum=1


# Amount of memory to use for caching data payload in managed ledger. This memory # Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics # is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker # running in the same broker
managedLedgerCacheSizeMB=1024 managedLedgerCacheSizeMB=1024


# Threshold to which bring down the cache level when eviction is triggered # Threshold to which bring down the cache level when eviction is triggered
Expand All @@ -210,7 +230,7 @@ managedLedgerDefaultMarkDeleteRateLimit=0.1
# Max number of entries to append to a ledger before triggering a rollover # Max number of entries to append to a ledger before triggering a rollover
# A ledger rollover is triggered on these conditions # A ledger rollover is triggered on these conditions
# * Either the max rollover time has been reached # * Either the max rollover time has been reached
# * or max entries have been written to the ledged and at least min-time # * or max entries have been written to the ledged and at least min-time
# has passed # has passed
managedLedgerMaxEntriesPerLedger=50000 managedLedgerMaxEntriesPerLedger=50000


Expand All @@ -228,7 +248,7 @@ managedLedgerCursorRolloverTimeInSeconds=14400






### --- Load balancer --- ### ### --- Load balancer --- ###


# Enable load balancer # Enable load balancer
loadBalancerEnabled=false loadBalancerEnabled=false
Expand All @@ -246,13 +266,13 @@ loadBalancerReportUpdateMaxIntervalMinutes=15
loadBalancerHostUsageCheckIntervalMinutes=1 loadBalancerHostUsageCheckIntervalMinutes=1


# Load shedding interval. Broker periodically checks whether some traffic should be offload from # Load shedding interval. Broker periodically checks whether some traffic should be offload from
# some over-loaded broker to other under-loaded brokers # some over-loaded broker to other under-loaded brokers
loadBalancerSheddingIntervalMinutes=30 loadBalancerSheddingIntervalMinutes=30


# Prevent the same topics to be shed and moved to other broker more that once within this timeframe # Prevent the same topics to be shed and moved to other broker more that once within this timeframe
loadBalancerSheddingGracePeriodMinutes=30 loadBalancerSheddingGracePeriodMinutes=30


# Usage threshold to determine a broker as under-loaded # Usage threshold to determine a broker as under-loaded
loadBalancerBrokerUnderloadedThresholdPercentage=1 loadBalancerBrokerUnderloadedThresholdPercentage=1


# Usage threshold to determine a broker as over-loaded # Usage threshold to determine a broker as over-loaded
Expand Down Expand Up @@ -289,7 +309,7 @@ replicationMetricsEnabled=true


# Max number of connections to open for each broker in a remote cluster # Max number of connections to open for each broker in a remote cluster
# More connections host-to-host lead to better throughput over high-latency # More connections host-to-host lead to better throughput over high-latency
# links. # links.
replicationConnectionsPerBroker=16 replicationConnectionsPerBroker=16


# Replicator producer queue size # Replicator producer queue size
Expand Down
Expand Up @@ -78,6 +78,26 @@ public class ServiceConfiguration implements PulsarConfiguration {
private long brokerDeleteInactiveTopicsFrequencySeconds = 60; private long brokerDeleteInactiveTopicsFrequencySeconds = 60;
// How frequently to proactively check and purge expired messages // How frequently to proactively check and purge expired messages
private int messageExpiryCheckIntervalInMinutes = 5; private int messageExpiryCheckIntervalInMinutes = 5;

// Set the default behavior for message deduplication in the broker
// This can be overridden per-namespace. If enabled, broker will reject
// messages that were already stored in the topic
private boolean brokerDeduplicationEnabled = false;

// Maximum number of producer information that it's going to be
// persisted for deduplication purposes
private int brokerDeduplicationMaxNumberOfProducers = 10000;

// Number of entries after which a dedup info snapshot is taken.
// A bigger interval will lead to less snapshots being taken though it would
// increase the topic recovery time, when the entries published after the
// snapshot need to be replayed
private int brokerDeduplicationEntriesInterval = 1000;

// Time of inactivity after which the broker will discard the deduplication information
// relative to a disconnected producer. Default is 6 hours.
private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360;

// Enable check for minimum allowed client library version // Enable check for minimum allowed client library version
private boolean clientLibraryVersionCheckEnabled = false; private boolean clientLibraryVersionCheckEnabled = false;
// Allow client libraries with no version information // Allow client libraries with no version information
Expand Down Expand Up @@ -105,7 +125,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
// limit/2 messages // limit/2 messages
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16; private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
// Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default // Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default
// message dispatch-throttling // message dispatch-throttling
@FieldContext(dynamic = true) @FieldContext(dynamic = true)
private int dispatchThrottlingRatePerTopicInMsg = 0; private int dispatchThrottlingRatePerTopicInMsg = 0;
// Default number of message-bytes dispatching throttling-limit for every topic. Using a value of 0, is disabling // Default number of message-bytes dispatching throttling-limit for every topic. Using a value of 0, is disabling
Expand All @@ -124,10 +144,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int maxConcurrentTopicLoadRequest = 5000; private int maxConcurrentTopicLoadRequest = 5000;
// Max concurrent non-persistent message can be processed per connection // Max concurrent non-persistent message can be processed per connection
private int maxConcurrentNonPersistentMessagePerConnection = 1000; private int maxConcurrentNonPersistentMessagePerConnection = 1000;
// Number of worker threads to serve non-persistent topic // Number of worker threads to serve non-persistent topic
private int numWorkerThreadsForNonPersistentTopic = 8; private int numWorkerThreadsForNonPersistentTopic = 8;

// Enable broker to load persistent topics // Enable broker to load persistent topics
private boolean enablePersistentTopics = true; private boolean enablePersistentTopics = true;

// Enable broker to load non-persistent topics // Enable broker to load non-persistent topics
private boolean enableNonPersistentTopics = true; private boolean enableNonPersistentTopics = true;


Expand Down Expand Up @@ -437,6 +459,31 @@ public void setBrokerDeleteInactiveTopicsEnabled(boolean brokerDeleteInactiveTop
this.brokerDeleteInactiveTopicsEnabled = brokerDeleteInactiveTopicsEnabled; this.brokerDeleteInactiveTopicsEnabled = brokerDeleteInactiveTopicsEnabled;
} }


public int getBrokerDeduplicationMaxNumberOfProducers() {
return brokerDeduplicationMaxNumberOfProducers;
}

public void setBrokerDeduplicationMaxNumberOfProducers(int brokerDeduplicationMaxNumberOfProducers) {
this.brokerDeduplicationMaxNumberOfProducers = brokerDeduplicationMaxNumberOfProducers;
}

public int getBrokerDeduplicationEntriesInterval() {
return brokerDeduplicationEntriesInterval;
}

public void setBrokerDeduplicationEntriesInterval(int brokerDeduplicationEntriesInterval) {
this.brokerDeduplicationEntriesInterval = brokerDeduplicationEntriesInterval;
}

public int getBrokerDeduplicationProducerInactivityTimeoutMinutes() {
return brokerDeduplicationProducerInactivityTimeoutMinutes;
}

public void setBrokerDeduplicationProducerInactivityTimeoutMinutes(
int brokerDeduplicationProducerInactivityTimeoutMinutes) {
this.brokerDeduplicationProducerInactivityTimeoutMinutes = brokerDeduplicationProducerInactivityTimeoutMinutes;
}

public long getBrokerDeleteInactiveTopicsFrequencySeconds() { public long getBrokerDeleteInactiveTopicsFrequencySeconds() {
return brokerDeleteInactiveTopicsFrequencySeconds; return brokerDeleteInactiveTopicsFrequencySeconds;
} }
Expand All @@ -453,6 +500,14 @@ public void setMessageExpiryCheckIntervalInMinutes(int messageExpiryCheckInterva
this.messageExpiryCheckIntervalInMinutes = messageExpiryCheckIntervalInMinutes; this.messageExpiryCheckIntervalInMinutes = messageExpiryCheckIntervalInMinutes;
} }


public boolean isBrokerDeduplicationEnabled() {
return brokerDeduplicationEnabled;
}

public void setBrokerDeduplicationEnabled(boolean brokerDeduplicationEnabled) {
this.brokerDeduplicationEnabled = brokerDeduplicationEnabled;
}

public boolean isClientLibraryVersionCheckEnabled() { public boolean isClientLibraryVersionCheckEnabled() {
return clientLibraryVersionCheckEnabled; return clientLibraryVersionCheckEnabled;
} }
Expand Down
Expand Up @@ -653,6 +653,49 @@ public void setNamespaceMessageTTL(@PathParam("property") String property, @Path
} }
} }


@POST
@Path("/{property}/{cluster}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, boolean enableDeduplication) {
validateAdminAccessOnProperty(property);
validatePoliciesReadOnlyAccess();

NamespaceName nsName = new NamespaceName(property, cluster, namespace);
Entry<Policies, Stat> policiesNode = null;

try {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path(POLICIES, property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
policiesNode.getKey().deduplicationEnabled = enableDeduplication;

// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, property, cluster, namespace),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, property, cluster, namespace));

log.info("[{}] Successfully {} on namespace {}/{}/{}", clientAppId(),
enableDeduplication ? "enabled" : "disabled", property, cluster, namespace);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to modify deplication status for namespace {}/{}/{}: does not exist", clientAppId(),
property, cluster, namespace);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to modify deplication status on namespace {}/{}/{} expected policy node version={} : concurrent modification",
clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to modify deplication status on namespace {}/{}/{}", clientAppId(), property,
cluster, namespace, e);
throw new RestException(e);
}
}

@GET @GET
@Path("/{property}/{cluster}/{namespace}/bundles") @Path("/{property}/{cluster}/{namespace}/bundles")
@ApiOperation(value = "Get the bundles split data.") @ApiOperation(value = "Get the bundles split data.")
Expand Down

0 comments on commit bdbb121

Please sign in to comment.