Add more metrics for processing latency #5974

Merged (3 commits) on Dec 14, 2020
StreamProcessorMetrics.java
@@ -35,7 +35,15 @@ public final class StreamProcessorMetrics {
      Histogram.build()
          .namespace(NAMESPACE)
          .name("stream_processor_latency")
-          .help("Latency of processing in seconds")
+          .help(
+              "Time between a record is written until it is picked up for processing (in seconds)")
          .labelNames("recordType", "partition")
          .register();
+  private static final Histogram PROCESSING_DURATION =
+      Histogram.build()
+          .namespace(NAMESPACE)
+          .name("stream_processor_processing_duration")
+          .help("Time for processing a record (in seconds)")
+          .labelNames("recordType", "partition")
+          .register();

@@ -64,6 +72,13 @@ public void processingLatency(
        .observe((processed - written) / 1000f);
  }

+  public void processingDuration(
+      final RecordType recordType, final long started, final long processed) {
+    PROCESSING_DURATION
+        .labels(recordType.name(), partitionIdLabel)

Member: I think it would be interesting to find hotspots by adding value type/intent as labels - however I'm not sure if that creates too many dimensions/data explosion. We can definitely do that as a second iteration though.

Contributor Author: Yes. We can improve it when we need it.

+        .observe((processed - started) / 1000f);
+  }

  public void eventProcessed() {
    event("processed");
  }
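
A possible follow-up to the label discussion above, not part of this PR: a value-type breakdown of the duration histogram. A minimal sketch, assuming the Zeebe RecordType and ValueType enums from io.zeebe.protocol.record; the class name, metric name, and constructor are made up for illustration, and the extra label multiplies the number of time series per partition.

// Hypothetical sketch only: add a "valueType" label to the duration histogram
// to find hotspots per value type, as suggested in the review thread above.
import io.prometheus.client.Histogram;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;

public final class StreamProcessorMetricsSketch {

  private static final String NAMESPACE = "zeebe";

  private static final Histogram PROCESSING_DURATION_BY_VALUE_TYPE =
      Histogram.build()
          .namespace(NAMESPACE)
          .name("stream_processor_processing_duration_by_value_type")
          .help("Time for processing a record (in seconds)")
          // one extra label dimension: cardinality grows to recordTypes x valueTypes x partitions
          .labelNames("recordType", "valueType", "partition")
          .register();

  private final String partitionIdLabel;

  public StreamProcessorMetricsSketch(final int partitionId) {
    partitionIdLabel = String.valueOf(partitionId);
  }

  public void processingDuration(
      final RecordType recordType,
      final ValueType valueType,
      final long startedMillis,
      final long processedMillis) {
    // timestamps are milliseconds, observations are recorded in seconds
    PROCESSING_DURATION_BY_VALUE_TYPE
        .labels(recordType.name(), valueType.name(), partitionIdLabel)
        .observe((processedMillis - startedMillis) / 1000f);
  }
}
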
ProcessingStateMachine.java
@@ -136,6 +136,8 @@ public final class ProcessingStateMachine {
  private long errorRecordPosition = StreamProcessor.UNSET_POSITION;
  private volatile boolean onErrorHandlingLoop;
  private int onErrorRetries;
+  // Used for processing duration metrics
+  private long processingStartTime;

  public ProcessingStateMachine(
      final ProcessingContext context, final BooleanSupplier shouldProcessNext) {
@@ -217,8 +219,8 @@ private void processEvent(final LoggedEvent event) {
      return;
    }

-    metrics.processingLatency(
-        metadata.getRecordType(), event.getTimestamp(), ActorClock.currentTimeMillis());
+    processingStartTime = ActorClock.currentTimeMillis();
+    metrics.processingLatency(metadata.getRecordType(), event.getTimestamp(), processingStartTime);

Member: Can you explain the choice to include deserialization as part of the processing time?

Contributor Author: As a first step I did not want to narrow the scope of the metric. If the processing duration is higher than we expect, we can then narrow the metric down to smaller blocks. Would you like to take deserialization out of the processing time?

Member (@npepinpe, Dec 7, 2020): I mostly wanted to know if it was a conscious choice, and if so, why. Do you think it might be confusing/unexpected that deserializing is part of the processing time? I think it might be a little unexpected, but once you know, it doesn't sound out of place imho - you can make a case that it is part of the processing.

Contributor Author: Actually, I wanted to include the complete processing - from when this event is ready to process until the next event is ready. This would give us an idea of how much time we spend in the StreamProcessor. So it should include the steps updateState and writeEvent, which are not currently included in the processing time. Wdyt? Shall I update it? Then it wouldn't be weird to have deserializing also be part of the processing time.

Member: That makes more sense - we can make it more granular by adding a metric per step later on (i.e. updateState, writeFollowUpEvents, etc.). If we add one here, can we also add one in reprocessing? Both will be very useful when refactoring how we do stream processing next quarter 👍

    try {
      final UnifiedRecordValue value = recordValues.readRecordValue(event, metadata.getValueType());

@@ -426,6 +428,8 @@ private void executeSideEffects() {

    notifyListener();

+    metrics.processingDuration(
+        metadata.getRecordType(), processingStartTime, ActorClock.currentTimeMillis());
    // continue with next event
    currentProcessor = null;
    actor.submit(this::readNextEvent);
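
The reviewers also float a per-step metric (updateState, writeFollowUpEvents, ...) and a reprocessing counterpart as later iterations. A minimal sketch of that idea, assuming a single histogram with a "step" label so cardinality stays at steps x partitions; the class, metric, and method names are hypothetical and not part of this PR.

// Hypothetical follow-up sketch: one histogram with a "step" label instead of
// a separate metric per processing step.
import io.prometheus.client.Histogram;

public final class StepDurationMetricsSketch {

  private static final Histogram STEP_DURATION =
      Histogram.build()
          .namespace("zeebe")
          .name("stream_processor_step_duration")
          .help("Time spent in a single processing step (in seconds)")
          .labelNames("step", "partition")
          .register();

  private final String partitionIdLabel;

  public StepDurationMetricsSketch(final int partitionId) {
    partitionIdLabel = String.valueOf(partitionId);
  }

  // startedMillis/endedMillis would come from ActorClock.currentTimeMillis(),
  // mirroring how processingStartTime is captured in processEvent above.
  public void stepDuration(final String step, final long startedMillis, final long endedMillis) {
    STEP_DURATION.labels(step, partitionIdLabel).observe((endedMillis - startedMillis) / 1000f);
  }
}

Each phase in ProcessingStateMachine could then be bracketed with a start timestamp and a call such as stepDuration("updateState", start, ActorClock.currentTimeMillis()).
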
AppenderMetrics.java
@@ -8,6 +8,7 @@
package io.zeebe.logstreams.impl.log;

import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;

public class AppenderMetrics {

@@ -27,6 +28,21 @@ public class AppenderMetrics {
          .labelNames("partition")
          .register();

+  private static final Histogram WRITE_LATENCY =
+      Histogram.build()
+          .namespace("zeebe")
+          .name("log_appender_append_latency")
+          .help("Latency to append an event to the log in seconds")
+          .labelNames("partition")
+          .register();
+  private static final Histogram COMMIT_LATENCY =
+      Histogram.build()
+          .namespace("zeebe")
+          .name("log_appender_commit_latency")
+          .help("Latency to commit an event to the log in seconds")
+          .labelNames("partition")
+          .register();

  private final String partitionLabel;

  public AppenderMetrics(final String partitionLabel) {
@@ -40,4 +56,12 @@ public void setLastCommittedPosition(final long position) {
  public void setLastAppendedPosition(final long position) {
    LAST_APPENDED_POSITION.labels(partitionLabel).set(position);
  }

+  public void appendLatency(final long startTime, final long currentTime) {
+    WRITE_LATENCY.labels(partitionLabel).observe((currentTime - startTime) / 1000f);
+  }

+  public void commitLatency(final long startTime, final long currentTime) {
+    COMMIT_LATENCY.labels(partitionLabel).observe((currentTime - startTime) / 1000f);
+  }
}
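
A standalone usage sketch of the new helpers, to make the unit handling explicit: timestamps go in as milliseconds and observations are recorded in seconds under the namespaced metric names. The partition label "1", the sleep, and the use of System.currentTimeMillis() are illustrative only; in the appender itself the timestamps come from ActorClock, as the following diffs show.

// Hypothetical usage sketch for the new AppenderMetrics helpers.
import io.prometheus.client.CollectorRegistry;
import io.zeebe.logstreams.impl.log.AppenderMetrics;

public final class AppenderMetricsUsageSketch {

  public static void main(final String[] args) throws InterruptedException {
    final AppenderMetrics metrics = new AppenderMetrics("1");

    final long start = System.currentTimeMillis();
    Thread.sleep(5); // stand-in for the time between append and write/commit
    metrics.appendLatency(start, System.currentTimeMillis());
    metrics.commitLatency(start, System.currentTimeMillis());

    // Both histograms record seconds under zeebe_log_appender_append_latency
    // and zeebe_log_appender_commit_latency, labeled by partition.
    final Double observations =
        CollectorRegistry.defaultRegistry.getSampleValue(
            "zeebe_log_appender_append_latency_count",
            new String[] {"partition"},
            new String[] {"1"});
    System.out.println("append latency observations: " + observations);
  }
}
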
Listener.java
@@ -12,18 +12,20 @@
import java.util.NoSuchElementException;

public final class Listener implements AppendListener {

  private final LogStorageAppender appender;
  private final long highestPosition;
+  private final long startTime;

-  public Listener(final LogStorageAppender appender, final long highestPosition) {
+  public Listener(
+      final LogStorageAppender appender, final long highestPosition, final long startTime) {
    this.appender = appender;
    this.highestPosition = highestPosition;
+    this.startTime = startTime;
  }

  @Override
  public void onWrite(final long address) {
-    appender.notifyWritePosition(highestPosition);
+    appender.notifyWritePosition(highestPosition, startTime);
  }

  @Override
@@ -45,7 +47,7 @@ public void onWriteError(final Throwable error) {
  @Override
  public void onCommit(final long address) {
    releaseBackPressure();
-    appender.notifyCommitPosition(highestPosition);
+    appender.notifyCommitPosition(highestPosition, startTime);
  }

  @Override
LogStorageAppender.java
@@ -27,6 +27,7 @@
import io.zeebe.util.health.HealthMonitorable;
import io.zeebe.util.health.HealthStatus;
import io.zeebe.util.sched.Actor;
+import io.zeebe.util.sched.clock.ActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.nio.ByteBuffer;
@@ -116,7 +117,7 @@ private void appendBlock(final BlockPeek blockPeek) {
    // Commit position is the position of the last event.
    appendBackpressureMetrics.newEntryToAppend();
    if (appendEntryLimiter.tryAcquire(positions.getRight())) {
-      final var listener = new Listener(this, positions.getRight());
+      final var listener = new Listener(this, positions.getRight(), ActorClock.currentTimeMillis());
      logStorage.append(positions.getLeft(), positions.getRight(), copiedBuffer, listener);

      blockPeek.markCompleted();
@@ -215,18 +216,20 @@ void releaseBackPressure(final long highestPosition) {
    actor.run(() -> appendEntryLimiter.onCommit(highestPosition));
  }

-  void notifyWritePosition(final long highestPosition) {
+  void notifyWritePosition(final long highestPosition, final long startTime) {
    actor.run(
        () -> {
          appenderMetrics.setLastAppendedPosition(highestPosition);
+          appenderMetrics.appendLatency(startTime, ActorClock.currentTimeMillis());
        });
  }

-  void notifyCommitPosition(final long highestPosition) {
+  void notifyCommitPosition(final long highestPosition, final long startTime) {
    actor.run(
        () -> {
          commitPositionListener.accept(highestPosition);
          appenderMetrics.setLastCommittedPosition(highestPosition);
+          appenderMetrics.commitLatency(startTime, ActorClock.currentTimeMillis());
        });
  }
}