
Do not emit negative lag because of stale offsets #14292

Merged 8 commits on Jul 5, 2023
@@ -4220,6 +4220,21 @@ protected void emitLag()
return;
}

// Try emitting lag even with stale metrics provided that none of the partitions has negative lag
Contributor (suggested change):
-// Try emitting lag even with stale metrics provided that none of the partitions has negative lag
+// Try emitting lag even with stale metrics provided that none of the partitions have negative lag

final long staleMillis = sequenceLastUpdated == null
    ? 0
    : DateTimes.nowUtc().getMillis()
      - (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis());
if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x < 0)) {
  // Log at most once every twenty supervisor runs to reduce noise in the logs
  if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) {
    log.warn("Lag is negative and will not be emitted because topic offsets have become stale. "
             + "This will not impact data processing. "
             + "Offsets may become stale because of connectivity issues.");
  }
  return;
@abhishekrb19 (Contributor), May 19, 2023:
Should we skip emitting lag metrics only for the stale partitions? I think in general, it'll be helpful to emit metrics for partitions that have non-negative lag. For example, if a topic's partitions are spread across multiple brokers and only some have connectivity issues. Or for a topic where some partitions receive little to no data, those may selectively be considered "stale".
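
(For illustration only: a rough sketch of the per-partition filtering being suggested here, not what the PR implements. The generic key type stands in for PartitionIdType, and partitionLags stands in for the map already available in emitLag().)

import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the alternative discussed above (not what this PR does):
// drop the negative-lag entries and emit metrics only for the remaining partitions.
class PerPartitionLagFilterSketch
{
  static <K> Map<K, Long> nonNegativeLags(Map<K, Long> partitionLags)
  {
    return partitionLags.entrySet()
                        .stream()
                        .filter(e -> e.getValue() >= 0)
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}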

Contributor:
If we do that, it will be very easy to go down the wrong debugging trail, since the overall lag might appear lower than it actually is. I am in favor of not emitting lag for any partition at all. The partition-level lag would still be available in the task reports.

Contributor:
I think there is a separate metric we could emit for partition-level lag, without affecting the reported overall lag at all. But I guess having them in the report should be enough too.

@abhishekrb19 (Contributor), May 22, 2023:
Yeah, a per-partition lag metric would complement the existing metrics. My main concern with not reporting any lag for a topic in this scenario is that we'd have periods of missing lag data for as long as there is at least one stale partition in the topic. The missing metric data can silently hide problems and affect existing downstream consumers in how they alert, visualize metrics, and so on. What do you think?

Contributor:
Missing metric data for a whole topic is easier to detect and be notified about than metric data that is missing some partitions. @AmatyaAvadhanula - do we already emit a lag metric for each partition in the topic?

Contributor (Author):
Yes, we do emit metrics for every partition.

Contributor:
These partitions usually go stale because the supervisor can't connect to Kafka. We can revisit this later if not having any metric becomes a pain point. Ideally, users should also be alerting on the missing metric.

}

LagStats lagStats = computeLags(partitionLags);
Map<String, Object> metricTags = spec.getContextValue(DruidMetrics.TAGS);
for (Map.Entry<PartitionIdType, Long> entry : partitionLags.entrySet()) {
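
To summarize the new logic in this hunk: offsets count as stale once the current time is more than one offset-fetch period past sequenceLastUpdated, and if any partition then shows negative lag, emitLag() returns without emitting anything, logging a throttled warning at most once every twenty supervisor runs. A minimal standalone restatement of that check, with made-up values in place of the supervisor's config:

import java.util.List;

// Hypothetical restatement of the staleness check; the constants below are
// assumptions for illustration, not values taken from the PR.
public class StaleLagCheckSketch
{
  public static void main(String[] args)
  {
    long nowMillis = System.currentTimeMillis();
    long offsetFetchPeriodMillis = 30_000L;                 // stands in for tuningConfig.getOffsetFetchPeriod()
    long sequenceLastUpdatedMillis = nowMillis - 120_000L;  // offsets last refreshed two minutes ago

    long staleMillis = nowMillis - (offsetFetchPeriodMillis + sequenceLastUpdatedMillis);
    // here: 120_000 - 30_000 = 90_000 ms of staleness, so staleMillis > 0

    List<Long> partitionLags = List.of(10L, -100L);
    boolean anyNegative = partitionLags.stream().anyMatch(x -> x < 0);

    if (staleMillis > 0 && anyNegative) {
      System.out.println("Offsets are stale and some lag is negative: skip emitting lag metrics");
    }
  }
}
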
@@ -62,6 +62,7 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -1035,6 +1036,30 @@ public void testGetStats()
);
}

@Test
public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
{
  expectEmitterSupervisor(false);

  CountDownLatch latch = new CountDownLatch(1);

  final TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
      latch,
      TestEmittingTestSeekableStreamSupervisor.LAG,
      // Record lag must not be emitted
      ImmutableMap.of("0", 10L, "1", -100L),
      null
  );
  supervisor.start();
  // Forcibly set the offsets to be stale
  supervisor.sequenceLastUpdated = DateTimes.nowUtc().minus(Integer.MAX_VALUE);

  latch.await();

  supervisor.emitLag();
  Assert.assertEquals(0, emitter.getEvents().size());
}
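
A quick sanity check on the forced-staleness trick in this test (a back-of-the-envelope sketch, assuming DateTimes.nowUtc() returns a Joda-Time DateTime whose minus(long) subtracts milliseconds):

// Hypothetical arithmetic, not part of the PR: Integer.MAX_VALUE milliseconds is
// roughly 24.9 days, so sequenceLastUpdated lands far enough in the past that
// staleMillis is positive for any realistic offset-fetch period.
public class StalenessArithmetic
{
  public static void main(String[] args)
  {
    long millis = Integer.MAX_VALUE;                   // 2,147,483,647 ms
    double days = millis / (1000.0 * 60 * 60 * 24);
    System.out.printf("Integer.MAX_VALUE ms is about %.1f days%n", days);  // ~24.9 days
  }
}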

private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
{
  List<Event> result = events.stream()