Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3539: KafkaProducer.send() should not block #3478

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -699,12 +701,25 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
/**
 * Asynchronously send a record to a topic, invoking {@code callback} when the
 * send has been acknowledged or has failed.
 *
 * @param record   the record to send (must not be null)
 * @param callback callback invoked on completion; may be null
 * @return a Future holding the metadata for the sent record
 */
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // Give interceptors a chance to observe/modify the record first; per the
    // original comment, onSend() does not throw exceptions.
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    // NOTE(review): the scraped diff showed both the old `doSend(...)` call and
    // the new `asyncSend(...)` call; this body resolves to the PR's intended
    // post-change behavior of delegating to the non-blocking asyncSend().
    return asyncSend(interceptedRecord, callback);
}

/**
* Implementation of asynchronously send a record to a topic.
*/
/**
 * Asynchronously sends a record to a topic without blocking the calling thread
 * on metadata fetches inside {@link #doSend}.
 *
 * @param record   the (already intercepted) record to send
 * @param callback callback forwarded to doSend; may be null
 * @return a Future that completes with the record's metadata once doSend's
 *         own future resolves
 */
private Future<RecordMetadata> asyncSend(final ProducerRecord<K, V> record, final Callback callback) {
    final FutureTask<RecordMetadata> task = new FutureTask<>(new Callable<RecordMetadata>() {
        @Override
        public RecordMetadata call() throws Exception {
            // doSend() may block waiting for metadata (see its Javadoc); that
            // blocking now happens on the task's thread, not the caller's.
            return doSend(record, callback).get();
        }
    });
    // BUG FIX: the original returned the FutureTask without ever executing it,
    // so doSend() was never invoked — the record was silently never sent and
    // Future.get() would hang forever. Run the task so the Future completes.
    // NOTE(review): a producer-owned executor would be preferable to a thread
    // per send; kept thread-local here to avoid adding class-level state.
    final Thread runner = new Thread(task, "kafka-producer-async-send");
    runner.setDaemon(true);
    runner.start();
    return task;
}

/**
* Send a record to a topic. Can potentially block for maxBlockTimeMs milliseconds due to
* {@link #waitOnMetadata(String, Integer, long)} blocking call.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.kafka.test.MockSerializer;
import org.apache.kafka.test.MockPartitioner;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -473,4 +474,47 @@ public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {

}

@PrepareOnlyThisForTest(Metadata.class)
@Test
public void testSendShouldNotBlock() throws Exception {
// Regression test for KAFKA-3539: send() must return without waiting for
// Metadata.awaitUpdate(), which is simulated below with a 3s sleep.
final Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
final KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
// Replace the producer's private metadata field with a PowerMock nice mock
// so unstubbed calls return defaults instead of failing.
final Metadata metadata = PowerMock.createNiceMock(Metadata.class);
MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata);

final String topic = "topic";
final ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
final Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
// Cluster with no partition info for the topic: forces the producer down the
// metadata-refresh path on the first fetch().
final Cluster emptyCluster = new Cluster(null, nodes,
Collections.<PartitionInfo>emptySet(),
Collections.<String>emptySet(),
Collections.<String>emptySet());
// Fully-populated cluster returned on the second fetch(), after the
// (simulated) metadata update completes.
final Cluster cluster = new Cluster(
"dummy",
Collections.singletonList(new Node(0, "host1", 1000)),
Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
Collections.<String>emptySet(),
Collections.<String>emptySet());

// First fetch sees the empty cluster, the next one the populated cluster.
EasyMock.expect(metadata.fetch()).andReturn(emptyCluster);
EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
metadata.awaitUpdate(EasyMock.anyInt(), EasyMock.anyLong());
final long awaitLatencyMillis = 3000;
// Simulate a slow metadata update: awaitUpdate() blocks for 3 seconds.
EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
@Override
public Void answer() throws Throwable {
Thread.sleep(awaitLatencyMillis);
return null;
}
});
PowerMock.replay(metadata);
final long begin = System.nanoTime();
producer.send(record);
final long end = System.nanoTime();
// send() must return in well under the simulated awaitUpdate() latency
// (comparison is in nanoseconds: millis * 1_000_000).
Assert.assertTrue("KafkaProducer.send() should not wait for Metadata.awaitUpdate()",
end - begin < awaitLatencyMillis * 1000000);
PowerMock.verify(metadata);
}

}