Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16272: Adding new coordinator related changes for connect_distributed.py #15594

Merged
merged 3 commits into from Apr 19, 2024

Conversation

vamossagar12
Copy link
Collaborator

@vamossagar12 vamossagar12 commented Mar 25, 2024

Summary of the changes:

  1. Parameterizes the tests to use new coordinator and pass in consumer group protocol. This would be applicable to sink connectors only.
  2. Enhances the sink connector creation code in system tests to accept a new optional parameter for consumer group protocol to be used.
  3. Sets the consumer group protocol via consumer.override. override config when the new group coordinator is enabled.

Note about testing: There are 288 tests that need to be run and running on my local takes a lot of time. I will try to post the test results once I have a full run.

Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @vamossagar12!

This looks good to me. In terms of running the tests, does the figure of 288 tests include the test permutations that aren't affected by this change? It is possible to create a test suite file that lists out the parameterized permutations to test.

Thanks!

Copy link
Contributor

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @vamossagar12!

@kirktrue
Copy link
Contributor

@lucasbru—can you take a look at this system test change? Thanks!

@vamossagar12
Copy link
Collaborator Author

vamossagar12 commented Mar 31, 2024

Thanks @kirktrue . I ran a single test by passing a parameter

 TC_PATHS="tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector" _DUCKTAPE_OPTIONS='--parameters '\''{"exactly_once_source":"False","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":"True","group_protocol":"consumer"}'\' bash tests/docker/run_tests.sh | tee debug_logs.txt

The test suite file approach doesn't seem to be working when passing json parameters as suggested here:

    - './another_tests_dir/test_file.py::TestClass.parametrized_method@{"x": 100}'  # params are supported too

as the tests aren't getting identified.

@lucasbru
Copy link
Member

@vamossagar12 did the test you ran pass?

Here is an example how I run parameterized tests using a test suite file:

consumer_test:
- tests/kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}

The change looks fine to me. If you want to be sure that the test set up works, you may want to run the parameter combinations and post the results here. However, if you have tested one parameter combination successfully, and you are confident that the general test setup is working, I am fine with merging it like this (please confirm).

@vamossagar12
Copy link
Collaborator Author

@lucasbru , that test did pass. However, let me try again with the snippet you shared above and see if it works. Let me get back to you.

@vamossagar12
Copy link
Collaborator Author

hey @lucasbru , I ran the following test suite

my_test_suite:
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector@{"exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_connector@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_pause_and_resume_sink@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_file_source_and_sink@{"security_protocol":"PLAINTEXT","exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_bounce@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_exactly_once_source@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_transformations@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}

and here are the results:

================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id:       2024-04-18--001
run time:         18 minutes 5.491 seconds
tests run:        8
passed:           7
flaky:            0
failed:           1
ignored:          0
================================================================================
test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     PASS
run time:   6 minutes 7.875 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     FAIL
run time:   7 minutes 59.232 seconds


    InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__
    self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes
    self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc
    allocated = self.do_alloc(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc
    good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec
    raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0

--------------------------------------------------------------------------------
test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_file_source_and_sink.security_protocol=PLAINTEXT.exactly_once_source=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     PASS
run time:   50.770 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_transformations.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     PASS
run time:   50.424 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_restart_connector_and_tasks_failed_connector.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     PASS
run time:   31.785 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_restart_connector_and_tasks_failed_task.connector_type=sink.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     PASS
run time:   34.508 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_restart_failed_connector.exactly_once_source=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     PASS
run time:   32.580 seconds
--------------------------------------------------------------------------------
test_id:    kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_restart_failed_task.connector_type=sink.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status:     PASS
run time:   37.332 seconds
--------------------------------------------------------------------------------
================================================================================
FAILED TEST SUITE
Test suite to rerun failed tests: /opt/kafka-dev/results/2024-04-18--001/rerun-failed.yml
================================================================================
FAILED TEST SYMBOLS
Pass the test symbols below to your ducktape run
'tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_exactly_once_source@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}'
ducker-ak test failed

The failing test test_exactly_once_source is already reported in KAFKA-16578 and I have removed the changes for new group protocol from this PR. Also, I ran a sink connector by setting the config consumer.override.group.protocol as consumer and I see AsyncConsumer being loaded while a connector without the above config, loads the LegacyKafkaConsumer.

The other thing I noticed is that the kafka consumers created in the workers don't adhere to the above group.protocol setting but that has got nothing to do with this PR and I will fix it separatelty. Let me know what you think.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@lucasbru lucasbru merged commit f22ad66 into apache:trunk Apr 19, 2024
1 check failed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants