Skip to content

Commit

Permalink
KAFKA-16440: Update security_test.py to support KIP-848’s group proto…
Browse files Browse the repository at this point in the history
…col config (#15628)

Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Reviewers: Walker Carlson <wcarlson@apache.org>
  • Loading branch information
kirktrue committed Apr 3, 2024
1 parent 6569a35 commit 6bb9cac
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions tests/kafkatest/tests/core/security_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ducktape.utils.util import wait_until
from ducktape.errors import TimeoutError

from kafkatest.services.kafka import quorum
from kafkatest.services.kafka import quorum, consumer_group
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.security.security_config import SslStores
from kafkatest.tests.end_to_end import EndToEndTest
Expand Down Expand Up @@ -61,28 +61,30 @@ def producer_consumer_have_expected_error(self, error):
@matrix(
security_protocol=['PLAINTEXT'],
interbroker_security_protocol=['SSL'],
metadata_quorum=[quorum.zk],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
security_protocol=['PLAINTEXT'],
interbroker_security_protocol=['SSL'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False]
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
@matrix(
security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'],
metadata_quorum=[quorum.zk],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False]
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk, use_new_coordinator=False):
def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
"""
Test that invalid hostname in certificate results in connection failures.
When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
Expand Down Expand Up @@ -120,11 +122,11 @@ def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbr
# the inter-broker security protocol using TLS with a hostname verification failure
# doesn't impact a producer in case of a single broker with a KRaft Controller,
# so confirm that this is in fact the observed behavior
self.create_and_start_clients(log_level="INFO")
self.create_and_start_clients(log_level="INFO", group_protocol=group_protocol)
self.run_validation()
else:
# We need more verbose logging to catch the expected errors
self.create_and_start_clients(log_level="DEBUG")
self.create_and_start_clients(log_level="DEBUG", group_protocol=group_protocol)

try:
wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30)
Expand All @@ -143,26 +145,27 @@ def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbr

SecurityConfig.ssl_stores.valid_hostname = True
self.kafka.restart_cluster()
self.create_and_start_clients(log_level="INFO")
self.create_and_start_clients(log_level="INFO", group_protocol=group_protocol)
self.run_validation()

def create_and_start_clients(self, log_level):
def create_and_start_clients(self, log_level, group_protocol):
self.create_producer(log_level=log_level)
self.producer.start()

self.create_consumer(log_level=log_level)
self.create_consumer(log_level=log_level, group_protocol=group_protocol)
self.consumer.start()

@cluster(num_nodes=2)
@matrix(
metadata_quorum=[quorum.zk],
metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False]
)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False]
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
"""
Test that invalid hostname in ZooKeeper or KRaft Controller results in broker inability to start.
"""
Expand Down

0 comments on commit 6bb9cac

Please sign in to comment.