Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15183: Add more controller, loader, snapshot emitter metrics #14010

Merged
merged 6 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,4 @@ See [vagrant/README.md](vagrant/README.md).
Apache Kafka is interested in building the community; we would welcome any thoughts or [patches](https://issues.apache.org/jira/browse/KAFKA). You can reach us [on the Apache mailing lists](http://kafka.apache.org/contact.html).

To contribute follow the instructions here:
* https://kafka.apache.org/contributing.html
* https://kafka.apache.org/contributing.html
24 changes: 21 additions & 3 deletions checkstyle/import-control-metadata.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
</subpackage>

<subpackage name="controller">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.common.acl" />
Expand All @@ -73,7 +72,6 @@
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
Expand All @@ -93,13 +91,17 @@
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.mutable" />
<allow pkg="org.apache.kafka.server.policy"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>

<subpackage name="image">
Expand All @@ -122,6 +124,22 @@
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="loader">
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
<subpackage name="publisher">
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
</subpackage>

<subpackage name="metadata">
Expand Down
30 changes: 23 additions & 7 deletions core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
import org.apache.kafka.image.MetadataProvenance
import org.apache.kafka.image.loader.MetadataLoader
import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig.AddressSpec
Expand All @@ -34,7 +36,7 @@ import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, Process
import org.apache.kafka.server.metrics.KafkaYammerMetrics

import java.util
import java.util.{Collections, Optional}
import java.util.Optional
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

Expand Down Expand Up @@ -106,6 +108,7 @@ class SharedServer(
val snapshotsDisabledReason = new AtomicReference[String](null)
@volatile var snapshotEmitter: SnapshotEmitter = _
@volatile var snapshotGenerator: SnapshotGenerator = _
@volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _

def isUsed(): Boolean = synchronized {
usedByController || usedByBroker
Expand Down Expand Up @@ -259,15 +262,24 @@ class SharedServer(
raftManager = _raftManager
_raftManager.startup()

metadataLoaderMetrics = if (brokerMetrics != null) {
new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
batchSize => brokerMetrics.updateBatchSize(batchSize),
brokerMetrics.lastAppliedImageProvenance)
} else {
new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
_ => {},
_ => {},
new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY))
Comment on lines +272 to +274
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about these three parameters. Could you comment as to why they are set like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few broker-specific metrics hanging out in BrokerServerMetrics.scala and this is a way to connect MetadataLoaderMetrics to that class. So that the metadata loader only needs to interact with MetadataLoaderMetrics and not the broker-specific code.

Long-term, we probably want to move all the loader metrics into MetadataLoaderMetrics, and make them all accessible on the controller as well as broker. But that's out of scope for this change (and would need a KIP anyway)

}
val loaderBuilder = new MetadataLoader.Builder().
setNodeId(metaProps.nodeId).
setTime(time).
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
setFaultHandler(metadataLoaderFaultHandler).
setHighWaterMarkAccessor(() => _raftManager.client.highWatermark())
if (brokerMetrics != null) {
loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
}
setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
setMetrics(metadataLoaderMetrics)
loader = loaderBuilder.build()
snapshotEmitter = new SnapshotEmitter.Builder().
setNodeId(metaProps.nodeId).
Expand All @@ -282,15 +294,15 @@ class SharedServer(
setDisabledReason(snapshotsDisabledReason).
setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
build()
_raftManager.register(loader)
rondagostino marked this conversation as resolved.
Show resolved Hide resolved
try {
loader.installPublishers(Collections.singletonList(snapshotGenerator))
loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get()
rondagostino marked this conversation as resolved.
Show resolved Hide resolved
} catch {
case t: Throwable => {
error("Unable to install metadata publishers", t)
throw new RuntimeException("Unable to install metadata publishers.", t)
}
}
_raftManager.register(loader)
debug("Completed SharedServer startup.")
started = true
} catch {
Expand Down Expand Up @@ -326,6 +338,10 @@ class SharedServer(
CoreUtils.swallow(loader.close(), this)
loader = null
}
if (metadataLoaderMetrics != null) {
CoreUtils.swallow(metadataLoaderMetrics.close(), this)
metadataLoaderMetrics = null
}
if (snapshotGenerator != null) {
CoreUtils.swallow(snapshotGenerator.close(), this)
snapshotGenerator = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@ import org.apache.kafka.common.metrics.Gauge
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.MetricConfig
import org.apache.kafka.image.MetadataProvenance
import org.apache.kafka.image.loader.MetadataLoaderMetrics
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}

import java.util.Collections
import java.util.concurrent.TimeUnit.NANOSECONDS

final class BrokerServerMetrics private (
metrics: Metrics
) extends MetadataLoaderMetrics {
) extends AutoCloseable {
import BrokerServerMetrics._

private val batchProcessingTimeHistName = KafkaMetricsGroup.explicitMetricName("kafka.server",
Expand Down Expand Up @@ -123,15 +122,15 @@ final class BrokerServerMetrics private (
).foreach(metrics.removeMetric)
}

override def updateBatchProcessingTime(elapsedNs: Long): Unit =
def updateBatchProcessingTime(elapsedNs: Long): Unit =
batchProcessingTimeHist.update(NANOSECONDS.toMicros(elapsedNs))

override def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)

override def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
lastAppliedImageProvenance.set(provenance)

override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()

def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils}
import org.apache.kafka.image.ClusterImage
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer._
Expand Down Expand Up @@ -1153,7 +1153,7 @@ class KRaftClusterTest {
val controller = cluster.controllers().values().iterator().next()
controller.controller.waitForReadyBrokers(3).get()
TestUtils.retry(60000) {
val latch = controller.controller.asInstanceOf[QuorumController].pause()
val latch = QuorumControllerIntegrationTestUtils.pause(controller.controller.asInstanceOf[QuorumController])
Thread.sleep(1001)
latch.countDown()
assertEquals(0, controller.sharedServer.controllerServerMetrics.fencedBrokerCount())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -461,6 +460,12 @@ private void handleEventEnd(String name, long startProcessingTimeNs) {
private Throwable handleEventException(String name,
OptionalLong startProcessingTimeNs,
Throwable exception) {
if (!startProcessingTimeNs.isPresent() &&
ControllerExceptions.isTimeoutException(exception)) {
// If the event never started, and the exception is a timeout, increment the timed
// out metric.
controllerMetrics.incrementOperationsTimedOut();
}
Throwable externalException =
ControllerExceptions.toExternalException(exception, () -> latestController());
if (!startProcessingTimeNs.isPresent()) {
Expand Down Expand Up @@ -492,6 +497,15 @@ private Throwable handleEventException(String name,
return externalException;
}

/**
 * Record the start of event processing: increment the operations-started counter
 * and, when a creation timestamp is supplied, record how long the event waited
 * in the queue. Callers pass an empty {@code OptionalLong} for deferred events,
 * whose deferral time must not be counted as queue time.
 *
 * @param eventCreatedTimeNs the time the event was created, in nanoseconds,
 *                           or empty to skip the queue-time metric.
 * @return the current time in nanoseconds, to use as the processing start time.
 */
private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) {
    final long startNs = time.nanoseconds();
    controllerMetrics.incrementOperationsStarted();
    eventCreatedTimeNs.ifPresent(createdNs ->
        controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(startNs - createdNs)));
    return startNs;
}

/**
* A controller event for handling internal state changes, such as Raft inputs.
*/
Expand All @@ -508,9 +522,8 @@ class ControllerEvent implements EventQueue.Event {

@Override
public void run() throws Exception {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
startProcessingTimeNs = OptionalLong.of(now);
startProcessingTimeNs = OptionalLong.of(
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
log.debug("Executing {}.", this);
handler.run();
handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
Expand All @@ -527,11 +540,16 @@ public String toString() {
}
}

private void appendControlEvent(String name, Runnable handler) {
void appendControlEvent(String name, Runnable handler) {
ControllerEvent event = new ControllerEvent(name, handler);
queue.append(event);
}

/**
 * Append a control event to the controller event queue with a deadline.
 * NOTE(review): exact deadline semantics depend on EventQueue.appendWithDeadline —
 * presumably the event fails with a timeout if it has not started by deadlineNs
 * (handleEventException counts such never-started timeouts); confirm against EventQueue.
 *
 * @param name       the name of the event, used for logging.
 * @param handler    the code to run when the event is processed.
 * @param deadlineNs the deadline, as a monotonic timestamp in nanoseconds.
 */
void appendControlEventWithDeadline(String name, Runnable handler, long deadlineNs) {
    queue.appendWithDeadline(deadlineNs, new ControllerEvent(name, handler));
}

/**
* A controller event that reads the committed internal state in order to expose it
* to an API.
Expand All @@ -555,9 +573,8 @@ CompletableFuture<T> future() {

@Override
public void run() throws Exception {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
startProcessingTimeNs = OptionalLong.of(now);
startProcessingTimeNs = OptionalLong.of(
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
T value = handler.get();
handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
future.complete(value);
Expand Down Expand Up @@ -692,12 +709,11 @@ CompletableFuture<T> future() {

@Override
public void run() throws Exception {
long now = time.nanoseconds();
if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
// We exclude deferred events from the event queue time metric to prevent
// incorrectly including the deferral time in the queue time.
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
}
// Deferred events set the DOES_NOT_UPDATE_QUEUE_TIME flag to prevent incorrectly
// including their deferral time in the event queue time.
startProcessingTimeNs = OptionalLong.of(
updateEventStartMetricsAndGetTime(flags.contains(DOES_NOT_UPDATE_QUEUE_TIME) ?
OptionalLong.empty() : OptionalLong.of(eventCreatedTimeNs)));
int controllerEpoch = curClaimEpoch;
if (!isActiveController(controllerEpoch)) {
throw ControllerExceptions.newWrongControllerException(latestController());
Expand All @@ -706,7 +722,6 @@ public void run() throws Exception {
log.info("Cannot run write operation {} in pre-migration mode. Returning NOT_CONTROLLER.", name);
throw ControllerExceptions.newPreMigrationException(latestController());
}
startProcessingTimeNs = OptionalLong.of(now);
ControllerResult<T> result = op.generateRecordsAndResult();
if (result.records().isEmpty()) {
op.processBatchEndOffset(writeOffset);
Expand Down Expand Up @@ -1063,6 +1078,9 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> {
final String newLeaderName = newLeader.leaderId().isPresent() ?
String.valueOf(newLeader.leaderId().getAsInt()) : "(none)";
if (newLeader.leaderId().isPresent()) {
controllerMetrics.incrementNewActiveControllers();
}
if (isActiveController()) {
if (newLeader.isLeader(nodeId)) {
log.warn("We were the leader in epoch {}, and are still the leader " +
Expand Down Expand Up @@ -1308,7 +1326,7 @@ private void updateLastCommittedState(
}
}

private void renounce() {
void renounce() {
try {
if (curClaimEpoch == -1) {
throw new RuntimeException("Cannot renounce leadership because we are not the " +
Expand Down Expand Up @@ -2302,20 +2320,12 @@ public void close() throws InterruptedException {
}

// VisibleForTesting
public CountDownLatch pause() {
final CountDownLatch latch = new CountDownLatch(1);
appendControlEvent("pause", () -> {
try {
latch.await();
} catch (InterruptedException e) {
log.info("Interrupted while waiting for unpause.", e);
}
});
return latch;
Time time() {
return time;
}

// VisibleForTesting
Time time() {
return time;
QuorumControllerMetrics controllerMetrics() {
return controllerMetrics;
}
}