Skip to content

Commit

Permalink
ARTEMIS-2364 collision avoidance for redelivery
Browse files Browse the repository at this point in the history
This is a feature from 5.x implemented via
https://issues.apache.org/jira/browse/AMQ-747.
  • Loading branch information
jbertram authored and clebertsuconic committed Aug 26, 2019
1 parent 16adc8d commit 449f032
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ public void validate(final String name, final Object value) {
}
};

public static final Validator LE_ONE = new Validator() {
@Override
public void validate(final String name, final Object value) {
Number val = (Number) value;
if (val.doubleValue() <= 1) {
// OK
} else {
throw ActiveMQMessageBundle.BUNDLE.lessThanOrEqualToOne(name, val);
}
}
};

public static final Validator MINUS_ONE_OR_GT_ZERO = new Validator() {
@Override
public void validate(final String name, final Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String REDELIVERY_DELAY_MULTIPLIER_NODE_NAME = "redelivery-delay-multiplier";

private static final String REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME = "redelivery-collision-avoidance-factor";

private static final String MAX_REDELIVERY_DELAY_NODE_NAME = "max-redelivery-delay";

private static final String MAX_DELIVERY_ATTEMPTS = "max-delivery-attempts";
Expand Down Expand Up @@ -1046,6 +1048,11 @@ protected Pair<String, AddressSettings> parseAddressSettings(final Node node) {
addressSettings.setRedeliveryDelay(XMLUtil.parseLong(child));
} else if (REDELIVERY_DELAY_MULTIPLIER_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setRedeliveryMultiplier(XMLUtil.parseDouble(child));
} else if (REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME.equalsIgnoreCase(name)) {
double redeliveryCollisionAvoidanceFactor = XMLUtil.parseDouble(child);
Validators.GE_ZERO.validate(REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME, redeliveryCollisionAvoidanceFactor);
Validators.LE_ONE.validate(REDELIVERY_COLLISION_AVOIDANCE_FACTOR_NODE_NAME, redeliveryCollisionAvoidanceFactor);
addressSettings.setRedeliveryCollisionAvoidanceFactor(redeliveryCollisionAvoidanceFactor);
} else if (MAX_REDELIVERY_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMaxRedeliveryDelay(XMLUtil.parseLong(child));
} else if (MAX_SIZE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,4 +479,7 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,

@Message(id = 229227, value = "{0} must be equals to -1 or greater than 0 and less than or equal to Integer.MAX_VALUE (actual value: {1})", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException inRangeOfPositiveIntThanMinusOne(String name, Number val);

@Message(id = 229228, value = "{0} must be less than or equal to 1 (actual value: {1})", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException lessThanOrEqualToOne(String name, Number val);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -3617,9 +3619,15 @@ private long calculateRedeliveryDelay(final AddressSettings addressSettings, fin
long redeliveryDelay = addressSettings.getRedeliveryDelay();
long maxRedeliveryDelay = addressSettings.getMaxRedeliveryDelay();
double redeliveryMultiplier = addressSettings.getRedeliveryMultiplier();
double collisionAvoidanceFactor = addressSettings.getRedeliveryCollisionAvoidanceFactor();

int tmpDeliveryCount = deliveryCount > 0 ? deliveryCount - 1 : 0;
long delay = (long) (redeliveryDelay * (Math.pow(redeliveryMultiplier, tmpDeliveryCount)));
if (collisionAvoidanceFactor > 0) {
Random random = ThreadLocalRandom.current();
double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
delay += (delay * variance);
}

if (delay > maxRedeliveryDelay) {
delay = maxRedeliveryDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable

public static final double DEFAULT_REDELIVER_MULTIPLIER = 1.0;

public static final double DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR = 0.0;

public static final boolean DEFAULT_LAST_VALUE_QUEUE = false;

@Deprecated
Expand Down Expand Up @@ -125,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable

private Double redeliveryMultiplier = null;

private Double redeliveryCollisionAvoidanceFactor = null;

private Long maxRedeliveryDelay = null;

private SimpleString deadLetterAddress = null;
Expand Down Expand Up @@ -223,6 +227,7 @@ public AddressSettings(AddressSettings other) {
this.messageCounterHistoryDayLimit = other.messageCounterHistoryDayLimit;
this.redeliveryDelay = other.redeliveryDelay;
this.redeliveryMultiplier = other.redeliveryMultiplier;
this.redeliveryCollisionAvoidanceFactor = other.redeliveryCollisionAvoidanceFactor;
this.maxRedeliveryDelay = other.maxRedeliveryDelay;
this.deadLetterAddress = other.deadLetterAddress;
this.expiryAddress = other.expiryAddress;
Expand Down Expand Up @@ -566,6 +571,15 @@ public AddressSettings setRedeliveryMultiplier(final double redeliveryMultiplier
return this;
}

public double getRedeliveryCollisionAvoidanceFactor() {
return redeliveryCollisionAvoidanceFactor != null ? redeliveryCollisionAvoidanceFactor : AddressSettings.DEFAULT_REDELIVER_COLLISION_AVOIDANCE_FACTOR;
}

public AddressSettings setRedeliveryCollisionAvoidanceFactor(final double redeliveryCollisionAvoidanceFactor) {
this.redeliveryCollisionAvoidanceFactor = redeliveryCollisionAvoidanceFactor;
return this;
}

public long getMaxRedeliveryDelay() {
// default is redelivery-delay * 10 as specified on the docs and at this JIRA:
// https://issues.jboss.org/browse/HORNETQ-1263
Expand Down Expand Up @@ -776,6 +790,9 @@ public void merge(final AddressSettings merged) {
if (redeliveryMultiplier == null) {
redeliveryMultiplier = merged.redeliveryMultiplier;
}
if (redeliveryCollisionAvoidanceFactor == null) {
redeliveryCollisionAvoidanceFactor = merged.redeliveryCollisionAvoidanceFactor;
}
if (maxRedeliveryDelay == null) {
maxRedeliveryDelay = merged.maxRedeliveryDelay;
}
Expand Down Expand Up @@ -1059,6 +1076,10 @@ public void decode(ActiveMQBuffer buffer, boolean tryCompatible) {
if (buffer.readableBytes() > 0) {
defaultRingSize = BufferHelper.readNullableLong(buffer);
}

if (buffer.readableBytes() > 0) {
redeliveryCollisionAvoidanceFactor = BufferHelper.readNullableDouble(buffer);
}
}

@Override
Expand All @@ -1073,6 +1094,7 @@ public int getEncodeSize() {
BufferHelper.sizeOfNullableInteger(messageCounterHistoryDayLimit) +
BufferHelper.sizeOfNullableLong(redeliveryDelay) +
BufferHelper.sizeOfNullableDouble(redeliveryMultiplier) +
BufferHelper.sizeOfNullableDouble(redeliveryCollisionAvoidanceFactor) +
BufferHelper.sizeOfNullableLong(maxRedeliveryDelay) +
SimpleString.sizeofNullableString(deadLetterAddress) +
SimpleString.sizeofNullableString(expiryAddress) +
Expand Down Expand Up @@ -1210,6 +1232,8 @@ public void encode(ActiveMQBuffer buffer) {

BufferHelper.writeNullableLong(buffer, defaultRingSize);

BufferHelper.writeNullableDouble(buffer, redeliveryCollisionAvoidanceFactor);

}

/* (non-Javadoc)
Expand All @@ -1235,6 +1259,7 @@ public int hashCode() {
result = prime * result + ((pageMaxCache == null) ? 0 : pageMaxCache.hashCode());
result = prime * result + ((redeliveryDelay == null) ? 0 : redeliveryDelay.hashCode());
result = prime * result + ((redeliveryMultiplier == null) ? 0 : redeliveryMultiplier.hashCode());
result = prime * result + ((redeliveryCollisionAvoidanceFactor == null) ? 0 : redeliveryCollisionAvoidanceFactor.hashCode());
result = prime * result + ((maxRedeliveryDelay == null) ? 0 : maxRedeliveryDelay.hashCode());
result = prime * result + ((redistributionDelay == null) ? 0 : redistributionDelay.hashCode());
result = prime * result + ((sendToDLAOnNoRoute == null) ? 0 : sendToDLAOnNoRoute.hashCode());
Expand Down Expand Up @@ -1363,6 +1388,11 @@ public boolean equals(Object obj) {
return false;
} else if (!redeliveryMultiplier.equals(other.redeliveryMultiplier))
return false;
if (redeliveryCollisionAvoidanceFactor == null) {
if (other.redeliveryCollisionAvoidanceFactor != null)
return false;
} else if (!redeliveryCollisionAvoidanceFactor.equals(other.redeliveryCollisionAvoidanceFactor))
return false;
if (maxRedeliveryDelay == null) {
if (other.maxRedeliveryDelay != null)
return false;
Expand Down Expand Up @@ -1580,6 +1610,8 @@ public String toString() {
redeliveryDelay +
", redeliveryMultiplier=" +
redeliveryMultiplier +
", redeliveryCollisionAvoidanceFactor=" +
redeliveryCollisionAvoidanceFactor +
", maxRedeliveryDelay=" +
maxRedeliveryDelay +
", redistributionDelay=" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2991,6 +2991,14 @@
</xsd:annotation>
</xsd:element>

<xsd:element name="redelivery-collision-avoidance-factor" type="xsd:double" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
factor by which to modify the redelivery delay slightly to avoid collisions
</xsd:documentation>
</xsd:annotation>
</xsd:element>

<xsd:element name="max-redelivery-delay" type="xsd:long" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ public void testDefaults() {
assertEquals("a1.1", conf.getAddressesSettings().get("a1").getDeadLetterAddress().toString());
assertEquals("a1.2", conf.getAddressesSettings().get("a1").getExpiryAddress().toString());
assertEquals(1, conf.getAddressesSettings().get("a1").getRedeliveryDelay());
assertEquals(0.5, conf.getAddressesSettings().get("a1").getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(856686592L, conf.getAddressesSettings().get("a1").getMaxSizeBytes());
assertEquals(817381738L, conf.getAddressesSettings().get("a1").getPageSizeBytes());
assertEquals(10, conf.getAddressesSettings().get("a1").getPageCacheMaxSize());
Expand All @@ -365,6 +366,7 @@ public void testDefaults() {
assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
assertEquals(5, conf.getAddressesSettings().get("a2").getRedeliveryDelay());
assertEquals(0.0, conf.getAddressesSettings().get("a2").getRedeliveryCollisionAvoidanceFactor(), 0);
assertEquals(932489234928324L, conf.getAddressesSettings().get("a2").getMaxSizeBytes());
assertEquals(712671626L, conf.getAddressesSettings().get("a2").getPageSizeBytes());
assertEquals(20, conf.getAddressesSettings().get("a2").getPageCacheMaxSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@
<dead-letter-address>a1.1</dead-letter-address>
<expiry-address>a1.2</expiry-address>
<redelivery-delay>1</redelivery-delay>
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>
<page-size-bytes>817381738</page-size-bytes>
<page-max-cache-size>10</page-max-cache-size>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<dead-letter-address>a1.1</dead-letter-address>
<expiry-address>a1.2</expiry-address>
<redelivery-delay>1</redelivery-delay>
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>
<page-size-bytes>817381738</page-size-bytes>
<page-max-cache-size>10</page-max-cache-size>
Expand Down
8 changes: 8 additions & 0 deletions artemis-tools/src/test/resources/artemis-configuration.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -2957,6 +2957,14 @@
</xsd:annotation>
</xsd:element>

<xsd:element name="redelivery-collision-avoidance-factor" type="xsd:double" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
factor by which to modify the redelivery delay slightly to avoid collisions
</xsd:documentation>
</xsd:annotation>
</xsd:element>

<xsd:element name="max-redelivery-delay" type="xsd:long" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand Down
6 changes: 6 additions & 0 deletions docs/user-manual/en/address-model.md
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ that would be found in the `broker.xml` file.
<expiry-delay>123</expiry-delay>
<redelivery-delay>5000</redelivery-delay>
<redelivery-delay-multiplier>1.0</redelivery-delay-multiplier>
<redelivery-collision-avoidance-factor>0.0</redelivery-collision-avoidance-factor>
<max-redelivery-delay>10000</max-redelivery-delay>
<max-delivery-attempts>3</max-delivery-attempts>
<max-size-bytes>100000</max-size-bytes>
Expand Down Expand Up @@ -660,6 +661,11 @@ messages](undelivered-messages.md#configuring-delayed-redelivery).
Default is `1.0`. Read more about [undelivered
messages](undelivered-messages.md#configuring-delayed-redelivery).

`redelivery-collision-avoidance-factor` defines an additional factor used to
calculate an adjustment to the `redelivery-delay` (up or down). Default is
`0.0`. Valid values are between 0.0 and 1.0. Read more about [undelivered
messages](undelivered-messages.md#configuring-delayed-redelivery).

`max-size-bytes`, `page-size-bytes`, & `page-max-cache-size` are used to
configure paging on an address. This is explained
[here](paging.md#configuration).
Expand Down
7 changes: 4 additions & 3 deletions docs/user-manual/en/configuration-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,11 @@ Name | Description | Default
[match](address-model.md) | The filter to apply to the setting | n/a
[dead-letter-address](undelivered-messages.md) | Dead letter address | n/a
[expiry-address](message-expiry.md) | Expired messages address | n/a
[expiry-delay](address-model.md) | Expiration time override; -1 don't override | -1
[expiry-delay](message-expiry.md) | Expiration time override; -1 don't override | -1
[redelivery-delay](undelivered-messages.md) | Time to wait before redelivering a message | 0
[redelivery-delay-multiplier](address-model.md) | Multiplier to apply to the `redelivery-delay` | 1.0
[max-redelivery-delay](address-model.md) | Max value for the `redelivery-delay` | 10 \* `redelivery-delay`
[redelivery-delay-multiplier](undelivered-messages.md) | Multiplier to apply to the `redelivery-delay` | 1.0
[redelivery-collision-avoidance-factor](undelivered-messages.md) | an additional factor used to calculate an adjustment to the `redelivery-delay` (up or down) | 0.0
[max-redelivery-delay](undelivered-messages.md) | Max value for the `redelivery-delay` | 10 \* `redelivery-delay`
[max-delivery-attempts](undelivered-messages.md)| Number of retries before dead letter address| 10
[max-size-bytes](paging.md)| Max size a queue can be before invoking `address-full-policy` | -1
[max-size-bytes-reject-threshold]() | Used with `BLOCK`, the max size an address can reach before messages are rejected; works in combination with `max-size-bytes` **for AMQP clients only**. | -1
Expand Down
58 changes: 48 additions & 10 deletions docs/user-manual/en/undelivered-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Delayed redelivery is defined in the address-setting configuration:
<redelivery-delay-multiplier>1.5</redelivery-delay-multiplier>
<!-- default is 0 (no delay) -->
<redelivery-delay>5000</redelivery-delay>
<!-- default is 0.0) -->
<redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
<!-- default is redelivery-delay * 10 -->
<max-redelivery-delay>50000</max-redelivery-delay>
</address-setting>
Expand All @@ -59,24 +61,60 @@ message will be sent asynchronously back to the queue after the delay.
You can specify a multiplier (the `redelivery-delay-multiplier`) that will
take effect on top of the `redelivery-delay`. Each time a message is redelivered
the delay period will be equal to the previous delay * `redelivery-delay-multiplier`.
A max-redelivery-delay can be set to prevent the delay from becoming too large.
The max-redelivery-delay is defaulted to redelivery-delay \* 10.
A `max-redelivery-delay` can be set to prevent the delay from becoming too large.
The `max-redelivery-delay` is defaulted to `redelivery-delay` \* 10.

Example:
**Example:**

- redelivery-delay=5000, redelivery-delay-multiplier=2, max-redelivery-delay=15000
- redelivery-delay=5000, redelivery-delay-multiplier=2, max-redelivery-delay=15000,
redelivery-collision-avoidance-factor=0.0

1. Delivery Attempt 1. (Unsuccessful)
2. Wait Delay Period: 5000
3. Delivery Attempt 2. (Unsuccessful)
4. Wait Delay Period: 10000 // (5000 * 2) < max-delay-period. Use 10000
5. Delivery Attempt 3: (Unsuccessful)
6. Wait Delay Period: 15000 // (10000 * 2) > max-delay-period: Use max-delay-delivery
1. Delivery Attempt 1. (Unsuccessful)
2. Wait Delay Period: 5000
3. Delivery Attempt 2. (Unsuccessful)
4. Wait Delay Period: 10000 // (5000 * 2) < max-delay-period. Use 10000
5. Delivery Attempt 3: (Unsuccessful)
6. Wait Delay Period: 15000 // (10000 * 2) > max-delay-period: Use max-delay-delivery

Address wildcards can be used to configure redelivery delay for a set of
addresses (see [Understanding the Wildcard Syntax](wildcard-syntax.md)), so you don't have to specify redelivery delay
individually for each address.

The `redelivery-delay` can be also be modified by configuring the
`redelivery-collision-avoidance-factor`. This factor will be made either
positive or negative at random to control whether the ultimate value will
increase or decrease the `redelivery-delay`. Then it's multiplied by a random
number between 0.0 and 1.0. This result is then multiplied by the
`redelivery-delay` and then added to the `redelivery-delay` to arrive at the
final value.

The algorithm may sound complicated but the bottom line is quite simple: the
larger `redelivery-collision-avoidance-factor` you choose the larger the variance
of the `redelivery-delay` will be. The `redelivery-collision-avoidance-factor`
must be between 0.0 and 1.0.

**Example:**

- redelivery-delay=1000, redelivery-delay-multiplier=1, max-redelivery-delay=15000,
redelivery-collision-avoidance-factor=0.5, (bold values chosen using
`java.util.Random`)

1. Delivery Attempt 1. (Unsuccessful)
2. Wait Delay Period: 875 // 1000 + (1000 * ((0.5 * __-1__) * __.25__)
3. Delivery Attempt 2. (Unsuccessful)
4. Wait Delay Period: 1375 // 1000 + (1000 * ((0.5 * __1__) * __.75__)
5. Delivery Attempt 3: (Unsuccessful)
6. Wait Delay Period: 975 // 1000 + (1000 * ((0.5 * __-1__) * __.05__)

This feature can be particularly useful in environments where there are
multiple consumers on the same queue all interacting transactionally
with the same external system (e.g. a database). If there is overlapping
data in messages which are consumed concurrently then one transaction can
succeed while all the rest fail. If those failed messages are redelivered
at the same time then this process where one consumer succeeds and the
rest fail will continue. By randomly padding the redelivery-delay by a
small, configurable amount these redelivery "collisions" can be avoided.

### Example

See [the examples chapter](examples.md) for an example which shows how delayed redelivery is configured
Expand Down
Loading

0 comments on commit 449f032

Please sign in to comment.