Skip to content

Commit

Permalink
[ISSUE #8300] Add more test coverage for DefaultMQProducer (#8301)
Browse files Browse the repository at this point in the history
* [ISSUE #8300] Add more test coverage for DefaultMQProducer

* Add more tests.

* Update
  • Loading branch information
yx9o committed Jun 18, 2024
1 parent 568950b commit 1511809
Showing 1 changed file with 182 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
Expand All @@ -49,6 +51,7 @@

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -82,15 +85,14 @@ public class DefaultMQProducerTest {
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private NettyRemotingClient nettyRemotingClient;

private DefaultMQProducer producer;
private Message message;
private Message zeroMsg;
private Message bigMessage;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
private final String topic = "FooBar";
private final String producerGroupPrefix = "FooBar_PID";
private final long defaultTimeout = 3000L;

@Before
public void init() throws Exception {
Expand Down Expand Up @@ -196,7 +198,7 @@ public void onException(Throwable e) {
countDownLatch.countDown();
}
});
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
}

@Test
Expand Down Expand Up @@ -240,7 +242,7 @@ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//this message is send success
producer.send(message, sendCallback, 1000);

countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(5);

// off enableBackpressureForAsyncMode
Expand All @@ -253,7 +255,7 @@ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//this message is send success
producer.send(message, sendCallback, 1000);

countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(10);
}

Expand Down Expand Up @@ -301,7 +303,7 @@ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// this message is send failed
producer.send(msgs, new MessageQueue(), sendCallback, 1000);

countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(1);

// off enableBackpressureForAsyncMode
Expand All @@ -312,7 +314,7 @@ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// this message is send failed
producer.send(msgs, new MessageQueue(), sendCallback, 1000);

countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(2);
}

Expand All @@ -333,7 +335,7 @@ public void onSuccess(SendResult sendResult) {
public void onException(Throwable e) {
}
});
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
}

@Test
Expand Down Expand Up @@ -472,7 +474,7 @@ public void onException(Throwable e) {
future.setSendRequestOk(true);
future.getRequestCallback().onSuccess(responseMsg);
}
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
}

@Test
Expand Down Expand Up @@ -509,7 +511,7 @@ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
future.getRequestCallback().onException(e);
}
}
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(1);
}

Expand All @@ -533,7 +535,7 @@ public void onException(Throwable e) {
}
});

countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
producer.setAutoBatch(false);
}

Expand Down Expand Up @@ -662,4 +664,171 @@ public void assertCreateDefaultMQProducer() {
assertTrue(producer5.isEnableTrace());
assertEquals("custom_trace_topic", producer5.getTraceTopic());
}

@Test
public void assertSend() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException {
setDefaultMQProducerImpl();
setOtherParam();
SendResult send = producer.send(message, defaultTimeout);
assertNull(send);
Collection<Message> msgs = Collections.singletonList(message);
send = producer.send(msgs);
assertNull(send);
send = producer.send(msgs, defaultTimeout);
assertNull(send);
}

@Test
public void assertSendOneway() throws RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException {
setDefaultMQProducerImpl();
producer.sendOneway(message);
MessageQueue mq = mock(MessageQueue.class);
producer.sendOneway(message, mq);
MessageQueueSelector selector = mock(MessageQueueSelector.class);
producer.sendOneway(message, selector, 1);
}

@Test
public void assertSendByQueue() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException {
setDefaultMQProducerImpl();
MessageQueue mq = mock(MessageQueue.class);
SendResult send = producer.send(message, mq);
assertNull(send);
send = producer.send(message, mq, defaultTimeout);
assertNull(send);
Collection<Message> msgs = Collections.singletonList(message);
send = producer.send(msgs, mq);
assertNull(send);
send = producer.send(msgs, mq, defaultTimeout);
assertNull(send);
}

@Test
public void assertSendByQueueSelector() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException {
setDefaultMQProducerImpl();
MessageQueueSelector selector = mock(MessageQueueSelector.class);
SendResult send = producer.send(message, selector, 1);
assertNull(send);
send = producer.send(message, selector, 1, defaultTimeout);
assertNull(send);
}

@Test
public void assertRequest() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException, RequestTimeoutException {
setDefaultMQProducerImpl();
MessageQueueSelector selector = mock(MessageQueueSelector.class);
Message replyNsg = producer.request(message, selector, 1, defaultTimeout);
assertNull(replyNsg);
RequestCallback requestCallback = mock(RequestCallback.class);
producer.request(message, selector, 1, requestCallback, defaultTimeout);
MessageQueue mq = mock(MessageQueue.class);
producer.request(message, mq, defaultTimeout);
producer.request(message, mq, requestCallback, defaultTimeout);
}

@Test(expected = RuntimeException.class)
public void assertSendMessageInTransaction() throws MQClientException {
TransactionSendResult result = producer.sendMessageInTransaction(message, 1);
assertNull(result);
}

@Test
public void assertSearchOffset() throws MQClientException, NoSuchFieldException, IllegalAccessException {
setDefaultMQProducerImpl();
MessageQueue mq = mock(MessageQueue.class);
long result = producer.searchOffset(mq, System.currentTimeMillis());
assertEquals(0L, result);
}

@Test
public void assertBatchMaxDelayMs() throws NoSuchFieldException, IllegalAccessException {
setProduceAccumulator(true);
assertEquals(0, producer.getBatchMaxDelayMs());
setProduceAccumulator(false);
assertEquals(10, producer.getBatchMaxDelayMs());
producer.batchMaxDelayMs(1000);
assertEquals(1000, producer.getBatchMaxDelayMs());
}

@Test
public void assertBatchMaxBytes() throws NoSuchFieldException, IllegalAccessException {
setProduceAccumulator(true);
assertEquals(0L, producer.getBatchMaxBytes());
setProduceAccumulator(false);
assertEquals(32 * 1024L, producer.getBatchMaxBytes());
producer.batchMaxBytes(64 * 1024L);
assertEquals(64 * 1024L, producer.getBatchMaxBytes());
}

@Test
public void assertTotalBatchMaxBytes() throws NoSuchFieldException, IllegalAccessException {
setProduceAccumulator(true);
assertEquals(0L, producer.getTotalBatchMaxBytes());
}

@Test
public void assertGetRetryResponseCodes() {
assertNotNull(producer.getRetryResponseCodes());
assertEquals(7, producer.getRetryResponseCodes().size());
}

@Test
public void assertIsSendLatencyFaultEnable() {
assertFalse(producer.isSendLatencyFaultEnable());
}

@Test
public void assertGetLatencyMax() {
assertNotNull(producer.getLatencyMax());
}

@Test
public void assertGetNotAvailableDuration() {
assertNotNull(producer.getNotAvailableDuration());
}

@Test
public void assertIsRetryAnotherBrokerWhenNotStoreOK() {
assertFalse(producer.isRetryAnotherBrokerWhenNotStoreOK());
}

private void setOtherParam() {
producer.setCreateTopicKey("createTopicKey");
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
producer.setDefaultTopicQueueNums(6);
producer.setRetryTimesWhenSendFailed(1);
producer.setSendMessageWithVIPChannel(false);
producer.setNotAvailableDuration(new long[1]);
producer.setLatencyMax(new long[1]);
producer.setSendLatencyFaultEnable(false);
producer.setRetryTimesWhenSendAsyncFailed(1);
producer.setTopics(Collections.singletonList(topic));
producer.setStartDetectorEnable(false);
producer.setCompressLevel(5);
producer.setCompressType(CompressionType.LZ4);
producer.addRetryResponseCode(0);
ExecutorService executorService = mock(ExecutorService.class);
producer.setAsyncSenderExecutor(executorService);
}

private void setProduceAccumulator(final boolean isDefault) throws NoSuchFieldException, IllegalAccessException {
ProduceAccumulator accumulator = null;
if (!isDefault) {
accumulator = new ProduceAccumulator("instanceName");
}
setField(producer, "produceAccumulator", accumulator);
}

private void setDefaultMQProducerImpl() throws NoSuchFieldException, IllegalAccessException {
DefaultMQProducerImpl producerImpl = mock(DefaultMQProducerImpl.class);
setField(producer, "defaultMQProducerImpl", producerImpl);
when(producerImpl.getMqFaultStrategy()).thenReturn(mock(MQFaultStrategy.class));
}

private void setField(final Object target, final String fieldName, final Object newValue) throws NoSuchFieldException, IllegalAccessException {
Class<?> clazz = target.getClass();
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, newValue);
}
}

0 comments on commit 1511809

Please sign in to comment.