KAFKA-9388: Refactor integration tests to always use different application ids #8530
@@ -119,7 +124,7 @@ public void shouldRestoreStateFromSourceTopic() throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();

-        final Properties props = props(APPID);
+        final Properties props = props(APPID + name.getMethodName());
The following app id should be changed as well. (I can't add a comment to that line in this PR.)
private void setCommittedOffset(final String topic, final int limitDelta) {
    final Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
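One way to address this comment is to pass the per-test application id into the helper instead of reading the shared constant. The sketch below is only an illustration under assumptions: the class and method names are hypothetical, and the string keys stand in for `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG` and `ConsumerConfig.GROUP_ID_CONFIG` so that it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical sketch, not the code from this PR.
class CommittedOffsetConfigSketch {

    // Builds the consumer config used to commit offsets, taking the group id
    // as a parameter so it can match the per-test application id.
    static Properties consumerConfig(final String bootstrapServers, final String applicationId) {
        final Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", bootstrapServers); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        consumerConfig.put("group.id", applicationId);             // ConsumerConfig.GROUP_ID_CONFIG
        return consumerConfig;
    }

    public static void main(final String[] args) {
        // In the test this would be APPID + name.getMethodName();
        // the base id and broker address here are placeholders.
        final String appId = "restore-test" + "shouldRestoreStateFromSourceTopic";
        System.out.println(consumerConfig("localhost:9092", appId).getProperty("group.id"));
    }
}
```

Because a Streams application uses its application id as the consumer group id, committing offsets under the old shared `APPID` would write them to a group the test's Streams instance never reads.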
@@ -107,6 +109,9 @@ private Properties props(final String applicationId) {
         return streamsConfiguration;
     }

+    @Rule
+    public TestName name = new TestName();
The changelog topics are created by BeforeClass, so it seems we need to add Before to create the changelog topics for each method name.
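To make the concern concrete: Kafka Streams names a store's changelog topic `<application.id>-<storeName>-changelog`, so once the application id includes the method name, each test method needs its own changelog topic. A minimal sketch of computing those names follows; the class, helper names, and the store name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the code from this PR.
class ChangelogTopicNamesSketch {

    // Changelog topic name for one store of one Streams application.
    static String changelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Topics that a per-method setup step would need to create for one test.
    static List<String> changelogTopicsFor(final String applicationId, final List<String> storeNames) {
        final List<String> topics = new ArrayList<>();
        for (final String storeName : storeNames) {
            topics.add(changelogTopic(applicationId, storeName));
        }
        return topics;
    }

    public static void main(final String[] args) {
        // "restore-test" and "store" are placeholder names.
        System.out.println(changelogTopic("restore-test" + "shouldRestoreStateFromSourceTopic", "store"));
    }
}
```

A `@Before` method could then create these topics using the current method name, whereas a `@BeforeClass` method runs once and has no access to it.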
import static org.hamcrest.MatcherAssert.assertThat;


public class KTableKTableForeignKeyJoinPseudoTopicTest {
I moved this test into the non-integration unit tests.
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.kafka.streams.integration;
I merged this test with another as a non-integration test, since it uses TTD and does not really create a cluster :)
@@ -80,14 +83,18 @@ public static void setUpBeforeAllTests() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
         STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
For this test, we did need to reuse the created topics and hence I reduced the session / heartbeat timeout so that their rebalance timeout could be much smaller.
@@ -104,6 +104,8 @@ private Properties props(final String applicationId) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
Similar here, for this test I reduced the session / heartbeat timeout so that their rebalance timeout could be much smaller. I think it is simpler than changing a bunch of changelogs / source / sink / and app ids.
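For tests that keep a shared application id, the timeout approach described in these two comments might look roughly like the sketch below. It is an illustration under assumptions: the class and helper are hypothetical, the string keys stand in for `ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG` and `ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG`, and the heartbeat value (one third of the session timeout) is an assumed choice, since the diff only shows the session timeout line.

```java
import java.util.Properties;

// Hypothetical sketch, not the code from this PR.
class FastRebalanceConfigSketch {

    // Returns a copy of the base config with a short session timeout and a
    // heartbeat interval below it, so members left over from an earlier test
    // are expired from the group quickly.
    static Properties withShortTimeouts(final Properties base, final int sessionTimeoutMs) {
        final Properties props = new Properties();
        props.putAll(base);
        props.put("session.timeout.ms", sessionTimeoutMs);        // ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
        // Assumption: the heartbeat interval must stay below the session timeout;
        // one third of it is a commonly recommended upper bound.
        props.put("heartbeat.interval.ms", sessionTimeoutMs / 3); // ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG
        return props;
    }

    public static void main(final String[] args) {
        final Properties base = new Properties();
        base.put("application.id", "shared-app-id"); // placeholder id
        System.out.println(withShortTimeouts(base, 1000));
    }
}
```

Note that a broker can reject a session timeout below its configured minimum (`group.min.session.timeout.ms`), so the test cluster has to allow a value this low.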
Thanks @guozhangwang, I did a quick pass, and this looks awesome to me. That TestName thing is pretty neat.
The Jenkins failures are due to known flaky tests, so I'm going to merge the PR as is.
When debugging KAFKA-9388, I found that the second test method takes much longer (10s) than the previous one (~500ms) because they use the same app.id. When the previous clients are shut down, they do not send a leave-group request, so we still depend on the session timeout (10s) for those members to be removed from the group.
When the second test is triggered, its clients join the same group because of the shared application id, and the prepare-rebalance phase waits for the full rebalance timeout before it kicks out the previous members.
Setting different application ids resolves this issue for integration tests. I did a quick search and found some other integration tests with the same issue. After this PR, my local unit test runtime dropped from about 14min to 7min.
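The core of the fix can be sketched without JUnit or Kafka on the classpath. The snippet below is a minimal illustration, not the PR's code: the class and helper are hypothetical, and in the real tests the method name comes from the JUnit 4 `TestName` rule via `name.getMethodName()`, as in the diff above. The second method name is made up for the example.

```java
// Hypothetical sketch, not the code from this PR.
class UniqueAppIdSketch {

    // Mirrors APPID + name.getMethodName() from the diff: a shared base id
    // concatenated with the name of the currently running test method.
    static String appIdFor(final String baseAppId, final String testMethodName) {
        return baseAppId + testMethodName;
    }

    public static void main(final String[] args) {
        // "restore-test" is a placeholder base id.
        final String first = appIdFor("restore-test", "shouldRestoreStateFromSourceTopic");
        final String second = appIdFor("restore-test", "shouldRestoreStateFromChangelogTopic");
        // Distinct ids mean distinct consumer groups, so the second test never
        // waits for the first test's members to time out.
        System.out.println(first.equals(second));
    }
}
```

Since the application id also serves as the consumer group id, each test method then starts with an empty group and skips the wait for stale members.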