Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 21 additions & 33 deletions python/integration_tests/test_consumer_rebalancing.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_tasks_written_once_during_rebalancing() -> None:
update_topic_partitions(topic_name, num_partitions)

# Create config files for consumers
print("Creating config files for consumers")
print("\nCreating config files for consumers")
TESTS_OUTPUT_PATH.mkdir(exist_ok=True)
consumer_configs = {}
for i in range(num_consumers):
Expand Down Expand Up @@ -203,41 +203,29 @@ def test_tasks_written_once_during_rebalancing() -> None:
)
consumers_have_data = consumers_have_data and res >= max_pending_count // 3

consumers_have_error = False
consumer_error_logs = []
for i in range(num_consumers):
with open(str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f:
consumers_have_error = consumers_have_error or "[31mERROR" in f.read()
lines = f.readlines()
for log_line_index, line in enumerate(lines):
if "[31mERROR" in line:
# If there is an error in log file, capture 10 lines before and after the error line
consumer_error_logs.append(f"Error found in consumer_{i}. Logging 10 lines before and after the error line:")
for j in range(max(0, log_line_index - 10), min(len(lines) - 1, log_line_index + 10)):
consumer_error_logs.append(lines[j].strip())
consumer_error_logs.append("")

if not all([row[3] == 0 for row in row_count]):
print("Test failed! Got duplicate/missing kafka messages in sqlite")
print("\nTest failed! Got duplicate/missing kafka messages in sqlite")

if not consumers_have_data:
print("Test failed! Lower than expected amount of kafka messages in sqlite")

if consumers_have_error:
print("Test failed! Errors in consumer logs")

if (
not all([row[3] == 0 for row in row_count])
or not consumers_have_data
or consumers_have_error
):
print()
print("Dumping logs")
print()
for i in range(num_consumers):
print(f"=== consumer {i} log ===")
with open(
str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r"
) as f:
print(f.read())

# Clean up test output files
print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}")
shutil.rmtree(TESTS_OUTPUT_PATH)

assert (
all([row[3] == 0 for row in row_count])
and consumers_have_data
and not consumers_have_error
)
print("\nTest failed! Lower than expected amount of kafka messages in sqlite")

if consumer_error_logs:
print("\nTest failed! Errors in consumer logs")
for log in consumer_error_logs:
print(log)

assert all([row[3] == 0 for row in row_count])
assert consumers_have_data
assert not consumer_error_logs
Loading