Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ public void start(final boolean startFactory) throws MQClientException {

this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

defaultMQProducer.initProduceAccumulator();

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private int backPressureForAsyncSendSize = 100 * 1024 * 1024;

/**
* Maximum hold time of accumulator.
*/
private int batchMaxDelayMs = -1;

/**
* Maximum accumulation message body size for a single messageAccumulation.
*/
private long batchMaxBytes = -1;

/**
* Maximum message body size for produceAccumulator.
*/
private long totalBatchMaxBytes = -1;

private RPCHook rpcHook = null;

/**
Expand Down Expand Up @@ -293,7 +308,6 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, final List
this.enableTrace = enableMsgTrace;
this.traceTopic = customizedTraceTopic;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}

/**
Expand All @@ -320,7 +334,6 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
this.producerGroup = producerGroup;
this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}

/**
Expand Down Expand Up @@ -1168,10 +1181,10 @@ public int getBatchMaxDelayMs() {
}

public void batchMaxDelayMs(int holdMs) {
if (this.produceAccumulator == null) {
throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
this.batchMaxDelayMs = holdMs;
if (this.produceAccumulator != null) {
this.produceAccumulator.batchMaxDelayMs(holdMs);
}
this.produceAccumulator.batchMaxDelayMs(holdMs);
}

public long getBatchMaxBytes() {
Expand All @@ -1182,10 +1195,10 @@ public long getBatchMaxBytes() {
}

public void batchMaxBytes(long holdSize) {
if (this.produceAccumulator == null) {
throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
this.batchMaxBytes = holdSize;
if (this.produceAccumulator != null) {
this.produceAccumulator.batchMaxBytes(holdSize);
}
this.produceAccumulator.batchMaxBytes(holdSize);
}

public long getTotalBatchMaxBytes() {
Expand All @@ -1196,10 +1209,10 @@ public long getTotalBatchMaxBytes() {
}

public void totalBatchMaxBytes(long totalHoldSize) {
if (this.produceAccumulator == null) {
throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
this.totalBatchMaxBytes = totalHoldSize;
if (this.produceAccumulator != null) {
this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
}
this.produceAccumulator.totalBatchMaxBytes(totalHoldSize);
}

public boolean getAutoBatch() {
Expand All @@ -1210,9 +1223,6 @@ public boolean getAutoBatch() {
}

public void setAutoBatch(boolean autoBatch) {
if (this.produceAccumulator == null) {
throw new UnsupportedOperationException("The currently constructed producer does not support autoBatch");
}
this.autoBatch = autoBatch;
}

Expand Down Expand Up @@ -1439,4 +1449,21 @@ public void setCompressType(CompressionType compressType) {
public Compressor getCompressor() {
return compressor;
}

public void initProduceAccumulator() {
this.produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);

if (this.batchMaxDelayMs > -1) {
this.produceAccumulator.batchMaxDelayMs(this.batchMaxDelayMs);
}

if (this.batchMaxBytes > -1) {
this.produceAccumulator.batchMaxBytes(this.batchMaxBytes);
}

if (this.totalBatchMaxBytes > -1) {
this.produceAccumulator.totalBatchMaxBytes(this.totalBatchMaxBytes);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -659,29 +660,29 @@ public void assertCreateDefaultMQProducer() {
assertNotNull(producer1);
assertEquals(producerGroupTemp, producer1.getProducerGroup());
assertNotNull(producer1.getDefaultMQProducerImpl());
assertTrue(producer1.getTotalBatchMaxBytes() > 0);
assertTrue(producer1.getBatchMaxBytes() > 0);
assertTrue(producer1.getBatchMaxDelayMs() > 0);
assertEquals(0, producer1.getTotalBatchMaxBytes());
assertEquals(0, producer1.getBatchMaxBytes());
assertEquals(0, producer1.getBatchMaxDelayMs());
assertNull(producer1.getTopics());
assertFalse(producer1.isEnableTrace());
assertTrue(UtilAll.isBlank(producer1.getTraceTopic()));
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class));
assertNotNull(producer2);
assertEquals(producerGroupTemp, producer2.getProducerGroup());
assertNotNull(producer2.getDefaultMQProducerImpl());
assertTrue(producer2.getTotalBatchMaxBytes() > 0);
assertTrue(producer2.getBatchMaxBytes() > 0);
assertTrue(producer2.getBatchMaxDelayMs() > 0);
assertEquals(0, producer2.getTotalBatchMaxBytes());
assertEquals(0, producer2.getBatchMaxBytes());
assertEquals(0, producer2.getBatchMaxDelayMs());
assertNull(producer2.getTopics());
assertFalse(producer2.isEnableTrace());
assertTrue(UtilAll.isBlank(producer2.getTraceTopic()));
DefaultMQProducer producer3 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), Collections.singletonList("custom_topic"));
assertNotNull(producer3);
assertEquals(producerGroupTemp, producer3.getProducerGroup());
assertNotNull(producer3.getDefaultMQProducerImpl());
assertTrue(producer3.getTotalBatchMaxBytes() > 0);
assertTrue(producer3.getBatchMaxBytes() > 0);
assertTrue(producer3.getBatchMaxDelayMs() > 0);
assertEquals(0, producer3.getTotalBatchMaxBytes());
assertEquals(0, producer3.getBatchMaxBytes());
assertEquals(0, producer3.getBatchMaxDelayMs());
assertNotNull(producer3.getTopics());
assertEquals(1, producer3.getTopics().size());
assertFalse(producer3.isEnableTrace());
Expand All @@ -690,19 +691,19 @@ public void assertCreateDefaultMQProducer() {
assertNotNull(producer4);
assertEquals(producerGroupTemp, producer4.getProducerGroup());
assertNotNull(producer4.getDefaultMQProducerImpl());
assertTrue(producer4.getTotalBatchMaxBytes() > 0);
assertTrue(producer4.getBatchMaxBytes() > 0);
assertTrue(producer4.getBatchMaxDelayMs() > 0);
assertEquals(0, producer4.getTotalBatchMaxBytes());
assertEquals(0, producer4.getBatchMaxBytes());
assertEquals(0, producer4.getBatchMaxDelayMs());
assertNull(producer4.getTopics());
assertTrue(producer4.isEnableTrace());
assertEquals("custom_trace_topic", producer4.getTraceTopic());
DefaultMQProducer producer5 = new DefaultMQProducer(producerGroupTemp, mock(RPCHook.class), Collections.singletonList("custom_topic"), true, "custom_trace_topic");
assertNotNull(producer5);
assertEquals(producerGroupTemp, producer5.getProducerGroup());
assertNotNull(producer5.getDefaultMQProducerImpl());
assertTrue(producer5.getTotalBatchMaxBytes() > 0);
assertTrue(producer5.getBatchMaxBytes() > 0);
assertTrue(producer5.getBatchMaxDelayMs() > 0);
assertEquals(0, producer5.getTotalBatchMaxBytes());
assertEquals(0, producer5.getBatchMaxBytes());
assertEquals(0, producer5.getBatchMaxDelayMs());
assertNotNull(producer5.getTopics());
assertEquals(1, producer5.getTopics().size());
assertTrue(producer5.isEnableTrace());
Expand Down Expand Up @@ -810,6 +811,136 @@ public void assertTotalBatchMaxBytes() throws NoSuchFieldException, IllegalAcces
assertEquals(0L, producer.getTotalBatchMaxBytes());
}

@Test
public void assertProduceAccumulatorStart() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
assertEquals(0, producer.getTotalBatchMaxBytes());
assertEquals(0, producer.getBatchMaxBytes());
assertEquals(0, producer.getBatchMaxDelayMs());
assertNull(getField(producer, "produceAccumulator", ProduceAccumulator.class));
producer.start();
assertTrue(producer.getTotalBatchMaxBytes() > 0);
assertTrue(producer.getBatchMaxBytes() > 0);
assertTrue(producer.getBatchMaxDelayMs() > 0);
assertNotNull(getField(producer, "produceAccumulator", ProduceAccumulator.class));
}

@Test
public void assertProduceAccumulatorBeforeStartSet() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
producer.totalBatchMaxBytes(64 * 1024 * 100);
producer.batchMaxBytes(64 * 1024);
producer.batchMaxDelayMs(10);

producer.start();
assertEquals(64 * 1024, producer.getBatchMaxBytes());
assertEquals(10, producer.getBatchMaxDelayMs());
assertNotNull(getField(producer, "produceAccumulator", ProduceAccumulator.class));
}

@Test
public void assertProduceAccumulatorAfterStartSet() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp);
producer.start();

assertNotNull(getField(producer, "produceAccumulator", ProduceAccumulator.class));

producer.totalBatchMaxBytes(64 * 1024 * 100);
producer.batchMaxBytes(64 * 1024);
producer.batchMaxDelayMs(10);

assertEquals(64 * 1024, producer.getBatchMaxBytes());
assertEquals(10, producer.getBatchMaxDelayMs());
}

@Test
public void assertProduceAccumulatorUnit() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp);
producer1.setUnitName("unit1");
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp);
producer2.setUnitName("unit2");

producer1.start();
producer2.start();

ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class);
ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class);

assertNotNull(producer1Accumulator);
assertNotNull(producer2Accumulator);

assertNotEquals(producer1Accumulator, producer2Accumulator);
}

@Test
public void assertProduceAccumulator() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp1);
producer1.setInstanceName("instanceName1");
String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp2);
producer2.setInstanceName("instanceName2");

producer1.start();
producer2.start();

ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class);
ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class);

assertNotNull(producer1Accumulator);
assertNotNull(producer2Accumulator);

assertNotEquals(producer1Accumulator, producer2Accumulator);
}

@Test
public void assertProduceAccumulatorInstanceEqual() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp1);
producer1.setInstanceName("equalInstance");
String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp2);
producer2.setInstanceName("equalInstance");

producer1.start();
producer2.start();

ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class);
ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class);

assertNotNull(producer1Accumulator);
assertNotNull(producer2Accumulator);

assertEquals(producer1Accumulator, producer2Accumulator);
}

@Test
public void assertProduceAccumulatorInstanceAndUnitNameEqual() throws NoSuchFieldException, IllegalAccessException, MQClientException {
String producerGroupTemp1 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer1 = new DefaultMQProducer(producerGroupTemp1);
producer1.setInstanceName("equalInstance");
producer1.setUnitName("equalUnitName");
String producerGroupTemp2 = producerGroupPrefix + System.nanoTime();
DefaultMQProducer producer2 = new DefaultMQProducer(producerGroupTemp2);
producer2.setInstanceName("equalInstance");
producer2.setUnitName("equalUnitName");

producer1.start();
producer2.start();

ProduceAccumulator producer1Accumulator = getField(producer1, "produceAccumulator", ProduceAccumulator.class);
ProduceAccumulator producer2Accumulator = getField(producer2, "produceAccumulator", ProduceAccumulator.class);

assertNotNull(producer1Accumulator);
assertNotNull(producer2Accumulator);

assertEquals(producer1Accumulator, producer2Accumulator);
}

@Test
public void assertGetRetryResponseCodes() {
assertNotNull(producer.getRetryResponseCodes());
Expand Down Expand Up @@ -875,4 +1006,11 @@ private void setField(final Object target, final String fieldName, final Object
field.setAccessible(true);
field.set(target, newValue);
}

private <T> T getField(final Object target, final String fieldName, final Class<T> fieldClassType) throws NoSuchFieldException, IllegalAccessException {
Class<?> targetClazz = target.getClass();
Field field = targetClazz.getDeclaredField(fieldName);
field.setAccessible(true);
return fieldClassType.cast(field.get(target));
}
}