From 158318b8937f5a59727f186f19eb3bf7d2adc20e Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Tue, 10 Dec 2024 14:25:12 -0500 Subject: [PATCH 1/3] slice out error logs in integration test --- .../test_consumer_rebalancing.py | 38 ++++++++----------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index 78cb82f8..fcd7f73b 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -203,10 +203,17 @@ def test_tasks_written_once_during_rebalancing() -> None: ) consumers_have_data = consumers_have_data and res >= max_pending_count // 3 - consumers_have_error = False + consumer_error_logs = [] for i in range(num_consumers): with open(str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r") as f: - consumers_have_error = consumers_have_error or "[31mERROR" in f.read() + lines = f.readlines() + for log_line_index, line in enumerate(lines): + if "[31mERROR" in line: + # If there is an error in log file, capture 10 lines before and after the error line + consumer_error_logs.append(f"Error found in consumer_{i}. Logging 10 lines before and after the error line:") + for j in range(max(0, log_line_index - 10), min(len(lines) - 1, log_line_index + 10)): + consumer_error_logs.append(lines[j].strip()) + consumer_error_logs.append("") if not all([row[3] == 0 for row in row_count]): print("Test failed! Got duplicate/missing kafka messages in sqlite") @@ -214,30 +221,15 @@ def test_tasks_written_once_during_rebalancing() -> None: if not consumers_have_data: print("Test failed! Lower than expected amount of kafka messages in sqlite") - if consumers_have_error: + if consumer_error_logs: print("Test failed! Errors in consumer logs") - - if ( - not all([row[3] == 0 for row in row_count]) - or not consumers_have_data - or consumers_have_error - ): - print() - print("Dumping logs") - print() - for i in range(num_consumers): - print(f"=== consumer {i} log ===") - with open( - str(TESTS_OUTPUT_PATH / f"consumer_{i}_{curr_time}.log"), "r" - ) as f: - print(f.read()) + for log in consumer_error_logs: + print(log) # Clean up test output files print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}") shutil.rmtree(TESTS_OUTPUT_PATH) - assert ( - all([row[3] == 0 for row in row_count]) - and consumers_have_data - and not consumers_have_error - ) + assert all([row[3] == 0 for row in row_count]) + assert consumers_have_data + assert not consumer_error_logs From a3f3a4c5d52070d4d972ec3899f02727218e7e9b Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Tue, 10 Dec 2024 14:33:02 -0500 Subject: [PATCH 2/3] clean up --- python/integration_tests/test_consumer_rebalancing.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index fcd7f73b..e791adb4 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -92,7 +92,7 @@ def test_tasks_written_once_during_rebalancing() -> None: update_topic_partitions(topic_name, num_partitions) # Create config files for consumers - print("Creating config files for consumers") + print("\nCreating config files for consumers") TESTS_OUTPUT_PATH.mkdir(exist_ok=True) consumer_configs = {} for i in range(num_consumers): @@ -216,13 +216,13 @@ def test_tasks_written_once_during_rebalancing() -> None: consumer_error_logs.append("") if not all([row[3] == 0 for row in row_count]): - print("Test failed! Got duplicate/missing kafka messages in sqlite") + print("\nTest failed! Got duplicate/missing kafka messages in sqlite") if not consumers_have_data: - print("Test failed! Lower than expected amount of kafka messages in sqlite") + print("\nTest failed! Lower than expected amount of kafka messages in sqlite") if consumer_error_logs: - print("Test failed! Errors in consumer logs") + print("\nTest failed! Errors in consumer logs") for log in consumer_error_logs: print(log) From f8577eb0971da63480d5d2b20def41f6e518dcde Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 12 Dec 2024 10:46:58 -0500 Subject: [PATCH 3/3] do not clean logs --- python/integration_tests/test_consumer_rebalancing.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/integration_tests/test_consumer_rebalancing.py b/python/integration_tests/test_consumer_rebalancing.py index e791adb4..0a7a95cb 100644 --- a/python/integration_tests/test_consumer_rebalancing.py +++ b/python/integration_tests/test_consumer_rebalancing.py @@ -226,10 +226,6 @@ def test_tasks_written_once_during_rebalancing() -> None: for log in consumer_error_logs: print(log) - # Clean up test output files - print(f"Cleaning up test output files in {TESTS_OUTPUT_PATH}") - shutil.rmtree(TESTS_OUTPUT_PATH) - assert all([row[3] == 0 for row in row_count]) assert consumers_have_data assert not consumer_error_logs