New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15189: only init remote topic metrics when enabled #14133
Changes from 6 commits
e7fa43e
6f7a39c
f7696e8
6b76e3b
9efb0e2
aa89b79
7ba060c
3241b60
5dc462b
c0536c8
4987727
4f37d9c
4c2d335
86ed825
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,9 +24,12 @@ import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPart | |
import org.apache.kafka.common.network.ListenerName | ||
import org.apache.kafka.common.security.auth.SecurityProtocol | ||
import org.apache.kafka.common.security.authenticator.TestJaasConfig | ||
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics} | ||
import org.apache.kafka.server.metrics.KafkaYammerMetrics | ||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} | ||
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} | ||
import org.junit.jupiter.api.Assertions._ | ||
import org.junit.jupiter.params.ParameterizedTest | ||
import org.junit.jupiter.params.provider.ValueSource | ||
|
||
import scala.annotation.nowarn | ||
import scala.jdk.CollectionConverters._ | ||
|
@@ -54,6 +57,12 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { | |
|
||
@BeforeEach | ||
override def setUp(testInfo: TestInfo): Unit = { | ||
if (testInfo.getDisplayName.endsWith("true")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't this enable RemoteStorage for all tests which use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right! Currently there's only 1 test in this test suite, but we should make it much clear. Updating now. |
||
// systemRemoteStorageEnabled is enabled | ||
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true") | ||
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName) | ||
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) | ||
} | ||
verifyNoRequestMetrics("Request metrics not removed in a previous test") | ||
startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName)) | ||
super.setUp(testInfo) | ||
|
@@ -70,8 +79,9 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { | |
* Verifies some of the metrics of producer, consumer as well as server. | ||
*/ | ||
@nowarn("cat=deprecation") | ||
@Test | ||
def testMetrics(): Unit = { | ||
@ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {0}") | ||
@ValueSource(booleans = Array(true, false)) | ||
def testMetrics(systemRemoteStorageEnabled: Boolean): Unit = { | ||
val topic = "topicWithOldMessageFormat" | ||
val props = new Properties | ||
props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0") | ||
|
@@ -103,6 +113,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { | |
|
||
generateAuthenticationFailure(tp) | ||
verifyBrokerAuthenticationMetrics(server) | ||
verifyRemoteStorageMetrics(systemRemoteStorageEnabled) | ||
} | ||
|
||
private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int, | ||
|
@@ -308,4 +319,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { | |
} | ||
assertTrue(metrics.isEmpty, s"$errorMessage: ${metrics.keys}") | ||
} | ||
|
||
private def verifyRemoteStorageMetrics(shouldContainMetrics: Boolean): Unit = { | ||
val metrics = RemoteStorageMetrics.allMetrics().asScala.filter(name => | ||
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala.find(metric => { | ||
metric._1.getMBeanName().equals(name.getMBeanName) | ||
}).isDefined | ||
).toList | ||
if (shouldContainMetrics) { | ||
assertEquals(RemoteStorageMetrics.allMetrics().size(), metrics.size, s"Only $metrics appear in the metrics") | ||
} else { | ||
assertEquals(0, metrics.size, s"$metrics should not appear in the metrics") | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need these 2 packages in storage module to access
KafkaYammerMetrics
class and yammer metrics