From 3f9122fa4b1c1a46f24ee2cea4e45e6601e97d31 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Wed, 7 Feb 2024 21:50:53 +0800 Subject: [PATCH] fix(e2e): fix transaction timeout; merge streams tests Signed-off-by: Curtis Wan --- .github/workflows/nightly-extra-e2e.yml | 18 +++------- .../core/group_mode_transactions_test.py | 15 ++++++--- tests/suites/stream_kos_test_suite1.yml | 33 ------------------- tests/suites/stream_kos_test_suite2.yml | 23 ------------- 4 files changed, 15 insertions(+), 74 deletions(-) delete mode 100644 tests/suites/stream_kos_test_suite1.yml delete mode 100644 tests/suites/stream_kos_test_suite2.yml diff --git a/.github/workflows/nightly-extra-e2e.yml b/.github/workflows/nightly-extra-e2e.yml index 44723ec1bc..d164a723aa 100644 --- a/.github/workflows/nightly-extra-e2e.yml +++ b/.github/workflows/nightly-extra-e2e.yml @@ -21,27 +21,19 @@ jobs: test-path: "tests/kafkatest/tests/connect" storage-path: "/data/github-actions/reports" runner: "extra" - streams_e2e_1: - name: "Run streams E2E Tests 1" + streams_e2e: + name: "Run streams E2E Tests" uses: ./.github/workflows/e2e-run.yml with: - suite-id: "streams1" - test-yaml: "tests/suites/stream_kos_test_suite1.yml" - storage-path: "/data/github-actions/reports" - runner: "extra" - streams_e2e_2: - name: "Run streams E2E Tests 2" - uses: ./.github/workflows/e2e-run.yml - with: - suite-id: "streams2" - test-yaml: "tests/suites/stream_kos_test_suite2.yml" + suite-id: "streams" + test-path: "tests/kafkatest/tests/streams" storage-path: "/data/github-actions/reports" runner: "extra" e2e_summary: name: "E2E Tests Summary" runs-on: [ self-hosted, extra ] if: ${{ always() }} - needs: [ benchmarks_e2e, connect_e2e, streams_e2e_1, streams_e2e_2 ] + needs: [ benchmarks_e2e, connect_e2e, streams_e2e ] steps: - name: Report results run: python3 tests/report_e2e_results.py diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py index bb75578025..62361130af 100644 --- a/tests/kafkatest/tests/core/group_mode_transactions_test.py +++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py @@ -113,7 +113,7 @@ def bounce_brokers(self, clean_shutdown): self.kafka.await_no_under_replicated_partitions() def create_and_start_message_copier(self, input_topic, output_topic, transactional_id, - producer_block_timeout_ms): + producer_block_timeout_ms, transaction_timeout): message_copier = TransactionalMessageCopier( context=self.test_context, num_nodes=1, @@ -125,7 +125,7 @@ def create_and_start_message_copier(self, input_topic, output_topic, transaction output_topic=output_topic, max_messages=-1, transaction_size=self.transaction_size, - transaction_timeout=self.transaction_timeout, + transaction_timeout=transaction_timeout, use_group_metadata=True, group_mode=True, producer_block_timeout_ms=producer_block_timeout_ms @@ -147,14 +147,16 @@ def bounce_copiers(self, copiers, clean_shutdown, timeout_sec=240): str(copier.progress_percent()))) copier.restart(clean_shutdown) - def create_and_start_copiers(self, input_topic, output_topic, num_copiers, producer_block_timeout_ms): + def create_and_start_copiers(self, input_topic, output_topic, num_copiers, producer_block_timeout_ms, + transaction_timeout): copiers = [] for i in range(0, num_copiers): copiers.append(self.create_and_start_message_copier( input_topic=input_topic, output_topic=output_topic, transactional_id="copier-" + str(i), - producer_block_timeout_ms=producer_block_timeout_ms + producer_block_timeout_ms=producer_block_timeout_ms, + transaction_timeout=transaction_timeout )) return copiers @@ -231,13 +233,16 @@ def copy_messages_transactionally(self, failure_mode, bounce_target, It returns the concurrently consumed messages. """ producer_block_timeout_ms = None + transaction_timeout = self.transaction_timeout if failure_mode != "clean_bounce" and bounce_target == "brokers": # change from the default 60s to 90s to wait for broker recovery producer_block_timeout_ms = 90000 + transaction_timeout = 90000 copiers = self.create_and_start_copiers(input_topic=input_topic, output_topic=output_topic, num_copiers=num_copiers, - producer_block_timeout_ms=producer_block_timeout_ms) + producer_block_timeout_ms=producer_block_timeout_ms, + transaction_timeout=transaction_timeout) concurrent_consumer = self.start_consumer(output_topic, group_id="concurrent_consumer") clean_shutdown = False diff --git a/tests/suites/stream_kos_test_suite1.yml b/tests/suites/stream_kos_test_suite1.yml deleted file mode 100644 index 50be917304..0000000000 --- a/tests/suites/stream_kos_test_suite1.yml +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# list all streams tests that are part of the suite under the test suite name: -streams_test_suite: - included: - - ../kafkatest/tests/streams/base_streams_test.py - - ../kafkatest/tests/streams/streams_application_upgrade_test.py - - ../kafkatest/tests/streams/streams_broker_bounce_test.py - - ../kafkatest/tests/streams/streams_broker_compatibility_test.py - - ../kafkatest/tests/streams/streams_broker_down_resilience_test.py - - ../kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py - - ../kafkatest/tests/streams/streams_eos_test.py - - ../kafkatest/tests/streams/streams_named_repartition_topic_test.py - - ../kafkatest/tests/streams/streams_optimized_test.py - - ../kafkatest/tests/streams/streams_relational_smoke_test.py - - ../kafkatest/tests/streams/streams_shutdown_deadlock_test.py - - ../kafkatest/tests/streams/streams_smoke_test.py - - - diff --git a/tests/suites/stream_kos_test_suite2.yml b/tests/suites/stream_kos_test_suite2.yml deleted file mode 100644 index 8b7e32534e..0000000000 --- a/tests/suites/stream_kos_test_suite2.yml +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# list all streams tests that are part of the suite under the test suite name: -streams_test_suite: - included: - - ../kafkatest/tests/streams/streams_standby_replica_test.py - - ../kafkatest/tests/streams/streams_static_membership_test.py - - ../kafkatest/tests/streams/streams_upgrade_test.py - -