Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions .github/workflows/nightly-extra-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,19 @@ jobs:
test-path: "tests/kafkatest/tests/connect"
storage-path: "/data/github-actions/reports"
runner: "extra"
streams_e2e_1:
name: "Run streams E2E Tests 1"
streams_e2e:
name: "Run streams E2E Tests"
uses: ./.github/workflows/e2e-run.yml
with:
suite-id: "streams1"
test-yaml: "tests/suites/stream_kos_test_suite1.yml"
storage-path: "/data/github-actions/reports"
runner: "extra"
streams_e2e_2:
name: "Run streams E2E Tests 2"
uses: ./.github/workflows/e2e-run.yml
with:
suite-id: "streams2"
test-yaml: "tests/suites/stream_kos_test_suite2.yml"
suite-id: "streams"
test-path: "tests/kafkatest/tests/streams"
storage-path: "/data/github-actions/reports"
runner: "extra"
e2e_summary:
name: "E2E Tests Summary"
runs-on: [ self-hosted, extra ]
if: ${{ always() }}
needs: [ benchmarks_e2e, connect_e2e, streams_e2e_1, streams_e2e_2 ]
needs: [ benchmarks_e2e, connect_e2e, streams_e2e ]
steps:
- name: Report results
run: python3 tests/report_e2e_results.py
Expand Down
15 changes: 10 additions & 5 deletions tests/kafkatest/tests/core/group_mode_transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def bounce_brokers(self, clean_shutdown):
self.kafka.await_no_under_replicated_partitions()

def create_and_start_message_copier(self, input_topic, output_topic, transactional_id,
producer_block_timeout_ms):
producer_block_timeout_ms, transaction_timeout):
message_copier = TransactionalMessageCopier(
context=self.test_context,
num_nodes=1,
Expand All @@ -125,7 +125,7 @@ def create_and_start_message_copier(self, input_topic, output_topic, transaction
output_topic=output_topic,
max_messages=-1,
transaction_size=self.transaction_size,
transaction_timeout=self.transaction_timeout,
transaction_timeout=transaction_timeout,
use_group_metadata=True,
group_mode=True,
producer_block_timeout_ms=producer_block_timeout_ms
Expand All @@ -147,14 +147,16 @@ def bounce_copiers(self, copiers, clean_shutdown, timeout_sec=240):
str(copier.progress_percent())))
copier.restart(clean_shutdown)

def create_and_start_copiers(self, input_topic, output_topic, num_copiers, producer_block_timeout_ms):
def create_and_start_copiers(self, input_topic, output_topic, num_copiers, producer_block_timeout_ms,
transaction_timeout):
copiers = []
for i in range(0, num_copiers):
copiers.append(self.create_and_start_message_copier(
input_topic=input_topic,
output_topic=output_topic,
transactional_id="copier-" + str(i),
producer_block_timeout_ms=producer_block_timeout_ms
producer_block_timeout_ms=producer_block_timeout_ms,
transaction_timeout=transaction_timeout
))
return copiers

Expand Down Expand Up @@ -231,13 +233,16 @@ def copy_messages_transactionally(self, failure_mode, bounce_target,
It returns the concurrently consumed messages.
"""
producer_block_timeout_ms = None
transaction_timeout = self.transaction_timeout
if failure_mode != "clean_bounce" and bounce_target == "brokers":
# change from the default 60s to 90s to wait for broker recovery
producer_block_timeout_ms = 90000
transaction_timeout = 90000
copiers = self.create_and_start_copiers(input_topic=input_topic,
output_topic=output_topic,
num_copiers=num_copiers,
producer_block_timeout_ms=producer_block_timeout_ms)
producer_block_timeout_ms=producer_block_timeout_ms,
transaction_timeout=transaction_timeout)
concurrent_consumer = self.start_consumer(output_topic,
group_id="concurrent_consumer")
clean_shutdown = False
Expand Down
33 changes: 0 additions & 33 deletions tests/suites/stream_kos_test_suite1.yml

This file was deleted.

23 changes: 0 additions & 23 deletions tests/suites/stream_kos_test_suite2.yml

This file was deleted.