Skip to content

Commit

Permalink
[ISSUE apache#4384] Remove TimeUnit in nextDelayDuration
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma committed Jun 15, 2022
1 parent b298f62 commit 18afbec
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,18 @@ public String toString() {
* and old index is reconsumeTime + 3
*
* @param reconsumeTimes Message reconsumeTimes {@link org.apache.rocketmq.common.message.MessageExt#getReconsumeTimes}
* @param timeUnit {@link TimeUnit}
* @see <a href="https://github.com/apache/rocketmq/blob/3bddd514646826253a239f95959c14840a87034a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java#L210">org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor</a>
* @see <a href="https://github.com/apache/rocketmq/blob/3bddd514646826253a239f95959c14840a87034a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java#L242">org.apache.rocketmq.store.DefaultMessageStore</a>
*/
@Override
public long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit) {
public long nextDelayDuration(int reconsumeTimes) {
if (reconsumeTimes < 0) {
reconsumeTimes = 0;
}
int index = reconsumeTimes + 2;
if (index >= next.length) {
index = next.length - 1;
}
long nextDelayDurationInMillis = next[index];
return timeUnit.convert(nextDelayDurationInMillis, TimeUnit.MILLISECONDS);
return next[index];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,13 @@ public String toString() {
}

@Override
public long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit) {
public long nextDelayDuration(int reconsumeTimes) {
if (reconsumeTimes < 0) {
reconsumeTimes = 0;
}
if (reconsumeTimes > 32) {
reconsumeTimes = 32;
}
long nextDelayDurationInMillis = Math.min(max, initial * (long) Math.pow(multiplier, reconsumeTimes));
return timeUnit.convert(nextDelayDurationInMillis, TimeUnit.MILLISECONDS);
return Math.min(max, initial * (long) Math.pow(multiplier, reconsumeTimes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@

package org.apache.rocketmq.common.subscription;

import java.util.concurrent.TimeUnit;

public interface RetryPolicy {
/**
* Compute message's next delay duration by specify reconsumeTimes
*
* @param reconsumeTimes Message reconsumeTimes
* @param timeUnit Given timeUnit
* @return Message's nextDelayDuration in given timeUnit
* @return Message's nextDelayDuration in milliseconds
*/
long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit);
long nextDelayDuration(int reconsumeTimes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ public class CustomizedRetryPolicyTest {
@Test
public void testNextDelayDuration() {
CustomizedRetryPolicy customizedRetryPolicy = new CustomizedRetryPolicy();
long actual = customizedRetryPolicy.nextDelayDuration(0, TimeUnit.MILLISECONDS);
long actual = customizedRetryPolicy.nextDelayDuration(0);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(10));
actual = customizedRetryPolicy.nextDelayDuration(10, TimeUnit.MILLISECONDS);
actual = customizedRetryPolicy.nextDelayDuration(10);
assertThat(actual).isEqualTo(TimeUnit.MINUTES.toMillis(9));
}

@Test
public void testNextDelayDurationOutOfRange() {
CustomizedRetryPolicy customizedRetryPolicy = new CustomizedRetryPolicy();
long actual = customizedRetryPolicy.nextDelayDuration(-1, TimeUnit.MILLISECONDS);
long actual = customizedRetryPolicy.nextDelayDuration(-1);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(10));
actual = customizedRetryPolicy.nextDelayDuration(100, TimeUnit.MILLISECONDS);
actual = customizedRetryPolicy.nextDelayDuration(100);
assertThat(actual).isEqualTo(TimeUnit.HOURS.toMillis(2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ public class ExponentialRetryPolicyTest {
@Test
public void testNextDelayDuration() {
ExponentialRetryPolicy exponentialRetryPolicy = new ExponentialRetryPolicy();
long actual = exponentialRetryPolicy.nextDelayDuration(0, TimeUnit.MILLISECONDS);
long actual = exponentialRetryPolicy.nextDelayDuration(0);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(5));
actual = exponentialRetryPolicy.nextDelayDuration(10, TimeUnit.MILLISECONDS);
actual = exponentialRetryPolicy.nextDelayDuration(10);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(1024 * 5));
}

@Test
public void testNextDelayDurationOutOfRange() {
ExponentialRetryPolicy exponentialRetryPolicy = new ExponentialRetryPolicy();
long actual = exponentialRetryPolicy.nextDelayDuration(-1, TimeUnit.MILLISECONDS);
long actual = exponentialRetryPolicy.nextDelayDuration(-1);
assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(5));
actual = exponentialRetryPolicy.nextDelayDuration(100, TimeUnit.MILLISECONDS);
actual = exponentialRetryPolicy.nextDelayDuration(100);
assertThat(actual).isEqualTo(TimeUnit.HOURS.toMillis(2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ protected void renewMessage(String key, MessageReceiptHandle messageReceiptHandl
} else {
CompletableFuture<AckResult> future = messagingProcessor.changeInvisibleTime(ProxyContext.create(),
handle, messageReceiptHandle.getMessageId(), messageReceiptHandle.getGroup(),
messageReceiptHandle.getTopic(), retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes(), TimeUnit.MILLISECONDS));
messageReceiptHandle.getTopic(), retryPolicy.nextDelayDuration(messageReceiptHandle.getReconsumeTimes()));
future.thenAccept(ackResult -> {
if (AckStatus.OK.equals(ackResult.getStatus())) {
removeReceiptHandle(key, messageReceiptHandle.getOriginalReceiptHandle());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.rocketmq.proxy.processor;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
Expand Down Expand Up @@ -136,7 +135,7 @@ public void testRenewReceiptHandleWhenTimeout() {
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(messageId),
Mockito.eq(group), Mockito.eq(topic), Mockito.eq(groupConfig.getGroupRetryPolicy().getRetryPolicy().nextDelayDuration(reconsumeTimes, TimeUnit.MILLISECONDS)));
Mockito.eq(group), Mockito.eq(topic), Mockito.eq(groupConfig.getGroupRetryPolicy().getRetryPolicy().nextDelayDuration(reconsumeTimes)));
}


Expand Down

0 comments on commit 18afbec

Please sign in to comment.