KAFKA-4318 Migrate ProducerSendTest to the new consumer #2083
Conversation
Thanks for the PR, suggested a few changes.
    val overridingProps = new Properties()
    val numServers = 2
    overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
    TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
      trustStoreFile = trustStoreFile, saslProperties = saslProperties).map(KafkaConfig.fromProps(_, overridingProps))
  }

-  private var consumer1: SimpleConsumer = null
-  private var consumer2: SimpleConsumer = null
+  private var consumer1: KafkaConsumer[Array[Byte], Array[Byte]] = _
We can name this simply `consumer`.
  }
  if (iters > maxIters)
    throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
  iters += 1
A more concise and better way to do this is:

    TestUtils.waitUntilTrue(() => {
      for (record <- consumer.poll(50).asScala)
        records.add(record)
      records.size == numRecords
    }, "Failed to receive all expected records from the consumer")
Probably worth creating a helper method and reusing it.
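The suggested pattern can be sketched end to end with a fake consumer standing in for the Kafka one. This is only an illustration: `FakeConsumer`, the batch size of 3, and the simplified `waitUntilTrue` are stand-ins, not Kafka's actual `TestUtils` implementation.

```scala
import scala.collection.mutable.ArrayBuffer

object PollUntilSketch {
  // Simplified stand-in for TestUtils.waitUntilTrue: retry until the
  // condition holds or the iteration budget is exhausted.
  def waitUntilTrue(condition: () => Boolean, msg: String, maxIters: Int = 100): Unit = {
    var iters = 0
    while (!condition()) {
      iters += 1
      if (iters > maxIters) throw new IllegalStateException(msg)
    }
  }

  // Fake consumer: each poll() yields the next batch of up to 3 "records".
  class FakeConsumer(total: Int) {
    private var produced = 0
    def poll(): Seq[Int] = {
      val batch = produced until math.min(produced + 3, total)
      produced = math.min(produced + 3, total)
      batch
    }
  }

  // The helper the review suggests: keep polling until all records arrive.
  def pollUntilNumRecords(consumer: FakeConsumer, numRecords: Int): Seq[Int] = {
    val records = ArrayBuffer[Int]()
    waitUntilTrue(() => {
      records ++= consumer.poll()
      records.size == numRecords
    }, s"Consumed ${records.size} records before timeout, but expected $numRecords records")
    records.toSeq
  }

  def main(args: Array[String]): Unit = {
    val records = pollUntilNumRecords(new FakeConsumer(10), 10)
    assert(records == (0 until 10))
    println(records.size)
  }
}
```

The retry loop replaces the hand-rolled `maxIters`/`iters` bookkeeping that the original test carried around each call site.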
Thanks, I created a helper method for this.
    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
    val maxIters = 50
    var iters = 0
    consumer1.seek(new TopicPartition(topic, 0),0)
Space missing after `,`. Do we need the `seek` here given that we are seeking to offset 0?
@baluchicken, can you address the above comments?
Force-pushed 6376d7e to bff9eec
Refer to this link for build results (access rights to CI server needed):
    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
    val maxIters = 50
    var iters = 0
    consumer1.seek(new TopicPartition(topic, 0),0)
@ijuma thanks for the reply. I rechecked the code and the seek here was required: without it, the poll consumes only 100 records (ignoring the already-consumed ones), so the assert on line 472 failed. That assert is from the original code, which used the old consumer API. I think checking 100 records on every iteration does the same thing, so I rebased that part of the code too; with that, I can also remove the seek.
@baluchicken, thanks for the explanation. I think the seek is fine then. The error message is a bit more informative if we consume all of the data instead of just what was produced in the last iteration.
To close the loop on this one, using `seek` makes the test a lot slower, so the approach in the PR, where we just consume the records that have been produced since the last iteration, is better.
Thanks for the update. Left a few more comments.
-  val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer
-  assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
+  val records = pollAllExpectedRecord(numRecords)
+  assertEquals("Should have fetched " + numRecords + " messages", numRecords, records.size)
This is not needed since it will always be true; `pollAllExpectedRecord` ensures that.
@@ -83,6 +79,16 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     producer
   }

+  protected def pollAllExpectedRecord(numRecords: Int) = {
Should be `private`. Also, it's good to specify the return type in the method signature. Finally, maybe the method name should be `pollUntilNumRecords`.
    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
    TestUtils.waitUntilTrue(() => {
      for( record <- consumer.poll(50).asScala)
        records.add(record)
These two lines can be replaced by `records ++= consumer.poll(50).asScala` if we use `ArrayBuffer` as per my other comment.
      for( record <- consumer.poll(50).asScala)
        records.add(record)
      records.size == numRecords
    }, "Failed to receive all expected records from the consumer")
Maybe we can tweak this message to be: `s"Consumed ${records.size} records before timeout, but expected $numRecords records"`.
-  assertEquals(i.toLong, messageSet1(i).offset)
+  assertEquals(topic, records(i).topic())
+  assertEquals(partition, records(i).partition)
+  assertEquals(i.toLong, records(i).offset)
   }
A bit more idiomatic:

    records.zipWithIndex.foreach { case (record, i) =>
      assertEquals(topic, record.topic)
      assertEquals(partition, record.partition)
      assertEquals(i.toLong, record.offset)
    }
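A runnable version of the idiom, with a tiny case class in place of `ConsumerRecord` (the types and data here are illustrative only). Note that destructuring the `(record, index)` tuple in a Scala 2 function literal requires the `case` keyword:

```scala
object ZipWithIndexSketch {
  // Minimal stand-in for the ConsumerRecord accessors used by the test.
  case class Rec(topic: String, partition: Int, offset: Long)

  def main(args: Array[String]): Unit = {
    val topic = "test-topic"
    val partition = 0
    val records = (0 until 3).map(i => Rec(topic, partition, i.toLong))

    // `case` enables tuple destructuring in the function literal (Scala 2).
    records.zipWithIndex.foreach { case (record, i) =>
      assert(record.topic == topic)
      assert(record.partition == partition)
      assert(record.offset == i.toLong)
    }
    println("offsets are dense from 0")
  }
}
```

Compared with indexing `records(i)` inside a `for (i <- 0 until n)` loop, `zipWithIndex` avoids repeated lookups and keeps the record and its expected offset paired in one place.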
 import java.util.concurrent.TimeUnit

 import collection.JavaConverters._
+import collection.JavaConversions._
`JavaConversions` is deprecated in Scala 2.12 and we should not use it.
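The difference is visible in a small round trip: `JavaConverters` makes each Java/Scala boundary crossing explicit via `.asScala`/`.asJava`, whereas `JavaConversions` inserted those conversions implicitly (it was deprecated in 2.12 and removed in 2.13). A minimal sketch:

```scala
import scala.collection.JavaConverters._

object ConvertersSketch {
  def main(args: Array[String]): Unit = {
    val javaList = new java.util.ArrayList[Int]()
    javaList.add(1)
    javaList.add(2)

    // Explicit, greppable conversion at the Java/Scala boundary.
    val scalaList: List[Int] = javaList.asScala.toList
    assert(scalaList == List(1, 2))

    // And back to a Java list.
    val roundTrip: java.util.List[Int] = scalaList.asJava
    assert(roundTrip.size == 2)
    println(scalaList.sum)
  }
}
```

The explicit decorators also make it obvious where a wrapper (rather than a copy) is created, which matters when the underlying Java collection is later mutated.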
@@ -83,6 +79,16 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     producer
   }

+  protected def pollAllExpectedRecord(numRecords: Int) = {
+    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
Is there a reason why we are using `ArrayList` instead of `ArrayBuffer`?
@@ -276,20 +282,16 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     assertEquals(partition, recordMetadata.partition)
   }

-  val leader1 = leaders(partition)
+  consumer.subscribe(List(topic))
Is there a reason we are using `subscribe` instead of `assign`? The latter is closer to the simple consumer.
-  consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
+  val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+  for (record <- consumer.poll(50)) {
+    records.add(record)
Why do we do this instead of just using the result of `consumer.poll(50)`?
-    expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
+  val records = pollAllExpectedRecord(numRecords)
+  assertEquals("Fetch response to partition 0 should have %d messages.".format(numRecords),
+    numRecords, records.size())
This assert is not needed since it's already ensured by the previous line.
Force-pushed bff9eec to dfe5802
@ijuma thanks for the review, I updated the PR.
Thanks, LGTM. Will merge to trunk with a couple of trivial tweaks.
Thanks, and also thanks for your patience, I learned a lot from your comments.
This PR only changes a test and that test passed. The builds that failed are unrelated to this change.
Author: Balint Molnar <balintmolnar91@gmail.com> Reviewers: Ismael Juma <ismael@juma.me.uk> Closes apache#2083 from baluchicken/KAFKA-4318