Skip to content

Commit

Permalink
[ISSUE apache#4384] Add RetryPolicy interface
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma committed Jun 1, 2022
1 parent 3bddd51 commit be83813
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@

package org.apache.rocketmq.common.subscription;

import java.util.Arrays;
import com.google.common.base.MoreObjects;
import java.util.concurrent.TimeUnit;

public class CustomizedRetryPolicy {
/**
* CustomizedRetryPolicy is aim to make group's behavior compatible with messageDelayLevel
*
* @see <a href="https://github.com/apache/rocketmq/blob/3bd4b2b2f61a824196f19b03146e2c929c62777b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java#L137">org.apache.rocketmq.store.config.MessageStoreConfig</a>
*/
public class CustomizedRetryPolicy implements RetryPolicy {
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private long[] next = new long[]{
private long[] next = new long[] {
TimeUnit.SECONDS.toMillis(1),
TimeUnit.SECONDS.toMillis(5),
TimeUnit.SECONDS.toMillis(10),
Expand Down Expand Up @@ -51,10 +56,30 @@ public void setNext(long[] next) {
this.next = next;
}

@Override
public String toString() {
return "CustomizedRetryPolicy{" +
"next=" + Arrays.toString(next) +
'}';
@Override public String toString() {
return MoreObjects.toStringHelper(this)
.add("next", next)
.toString();
}

/**
* Index = reconsumeTimes + 2 is compatible logic, cause old delayLevelTable starts from index 1,
* and old index is reconsumeTime + 3
*
* @param reconsumeTimes Message reconsumeTimes {@link org.apache.rocketmq.common.message.MessageExt#getReconsumeTimes}
* @param timeUnit {@link TimeUnit}
* @see <a href="https://github.com/apache/rocketmq/blob/3bddd514646826253a239f95959c14840a87034a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java#L210">org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor</a>
* @see <a href="https://github.com/apache/rocketmq/blob/3bddd514646826253a239f95959c14840a87034a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java#L242">org.apache.rocketmq.store.DefaultMessageStore</a>
*/
@Override public long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit) {
if (reconsumeTimes < 0) {
reconsumeTimes = 0;
}
int index = reconsumeTimes + 2;
if (index >= next.length) {
index = next.length - 1;
}
long nextDelayDurationInMillis = next[index];
return timeUnit.convert(nextDelayDurationInMillis, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.rocketmq.common.subscription;

import com.google.common.base.MoreObjects;
import java.util.concurrent.TimeUnit;

/**
* next delay time = min(max, initial * multiplier^reconsumeTimes)
*/
public class ExponentialRetryPolicy {
public class ExponentialRetryPolicy implements RetryPolicy {
private long initial = TimeUnit.SECONDS.toMillis(5);
private long max = TimeUnit.HOURS.toMillis(2);
private long multiplier = 2;
Expand Down Expand Up @@ -51,12 +52,22 @@ public void setMultiplier(long multiplier) {
this.multiplier = multiplier;
}

@Override
public String toString() {
return "ExponentialRetryPolicy{" +
"initial=" + initial +
", max=" + max +
", multiplier=" + multiplier +
'}';
@Override public String toString() {
return MoreObjects.toStringHelper(this)
.add("initial", initial)
.add("max", max)
.add("multiplier", multiplier)
.toString();
}

@Override public long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit) {
if (reconsumeTimes < 0) {
reconsumeTimes = 0;
}
if (reconsumeTimes > 32) {
reconsumeTimes = 32;
}
long nextDelayDurationInMillis = Math.min(max, initial * (long) Math.pow(multiplier, reconsumeTimes));
return timeUnit.convert(nextDelayDurationInMillis, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.rocketmq.common.subscription;

import com.alibaba.fastjson.annotation.JSONField;
import com.google.common.base.MoreObjects;

public class GroupRetryPolicy {
private GroupRetryPolicyType type = GroupRetryPolicyType.EXPONENTIAL;
private final static RetryPolicy DEFAULT_RETRY_POLICY = new CustomizedRetryPolicy();
private GroupRetryPolicyType type = GroupRetryPolicyType.CUSTOMIZED;
private ExponentialRetryPolicy exponentialRetryPolicy;
private CustomizedRetryPolicy customizedRetryPolicy;

Expand Down Expand Up @@ -46,12 +50,28 @@ public void setCustomizedRetryPolicy(CustomizedRetryPolicy customizedRetryPolicy
this.customizedRetryPolicy = customizedRetryPolicy;
}

@Override
public String toString() {
return "GroupRetryPolicy{" +
"type=" + type +
", exponentialRetryPolicy=" + exponentialRetryPolicy +
", customizedRetryPolicy=" + customizedRetryPolicy +
'}';
@JSONField(serialize = false, deserialize = false)
public RetryPolicy getRetryPolicy() {
if (GroupRetryPolicyType.EXPONENTIAL.equals(type)) {
if (exponentialRetryPolicy == null) {
return DEFAULT_RETRY_POLICY;
}
return exponentialRetryPolicy;
} else if (GroupRetryPolicyType.CUSTOMIZED.equals(type)) {
if (customizedRetryPolicy == null) {
return DEFAULT_RETRY_POLICY;
}
return customizedRetryPolicy;
} else {
return DEFAULT_RETRY_POLICY;
}
}

@Override public String toString() {
return MoreObjects.toStringHelper(this)
.add("type", type)
.add("exponentialRetryPolicy", exponentialRetryPolicy)
.add("customizedRetryPolicy", customizedRetryPolicy)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.subscription;

import java.util.concurrent.TimeUnit;

public interface RetryPolicy {
long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.subscription;

import java.util.concurrent.TimeUnit;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class CustomizedRetryPolicyTest {

@Test
public void testNextDelayDuration() {
CustomizedRetryPolicy customizedRetryPolicy = new CustomizedRetryPolicy();
long actual = customizedRetryPolicy.nextDelayDuration(0, TimeUnit.MILLISECONDS);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(10));
actual = customizedRetryPolicy.nextDelayDuration(10, TimeUnit.MILLISECONDS);
assertThat(actual).isEqualTo(TimeUnit.MINUTES.toMillis(9));
}

@Test
public void testNextDelayDurationOutOfRange() {
CustomizedRetryPolicy customizedRetryPolicy = new CustomizedRetryPolicy();
long actual = customizedRetryPolicy.nextDelayDuration(-1, TimeUnit.MILLISECONDS);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(10));
actual = customizedRetryPolicy.nextDelayDuration(100, TimeUnit.MILLISECONDS);
assertThat(actual).isEqualTo(TimeUnit.HOURS.toMillis(2));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.subscription;

import java.util.concurrent.TimeUnit;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class ExponentialRetryPolicyTest {

@Test
public void testNextDelayDuration() {
ExponentialRetryPolicy exponentialRetryPolicy = new ExponentialRetryPolicy();
long actual = exponentialRetryPolicy.nextDelayDuration(0, TimeUnit.MILLISECONDS);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(5));
actual = exponentialRetryPolicy.nextDelayDuration(10, TimeUnit.MILLISECONDS);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(1024 * 5));
}

@Test
public void testNextDelayDurationOutOfRange() {
ExponentialRetryPolicy exponentialRetryPolicy = new ExponentialRetryPolicy();
long actual = exponentialRetryPolicy.nextDelayDuration(-1, TimeUnit.MILLISECONDS);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(5));
actual = exponentialRetryPolicy.nextDelayDuration(100, TimeUnit.MILLISECONDS);
assertThat(actual).isEqualTo(TimeUnit.HOURS.toMillis(2));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common.subscription;

import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class GroupRetryPolicyTest {

@Test
public void testGetRetryPolicy() {
GroupRetryPolicy groupRetryPolicy = new GroupRetryPolicy();
RetryPolicy retryPolicy = groupRetryPolicy.getRetryPolicy();
assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);
groupRetryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
retryPolicy = groupRetryPolicy.getRetryPolicy();
assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);

groupRetryPolicy.setType(GroupRetryPolicyType.CUSTOMIZED);
groupRetryPolicy.setCustomizedRetryPolicy(new CustomizedRetryPolicy());
retryPolicy = groupRetryPolicy.getRetryPolicy();
assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);

groupRetryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
groupRetryPolicy.setExponentialRetryPolicy(new ExponentialRetryPolicy());
retryPolicy = groupRetryPolicy.getRetryPolicy();
assertThat(retryPolicy).isInstanceOf(ExponentialRetryPolicy.class);
}
}

0 comments on commit be83813

Please sign in to comment.