Skip to content

Commit

Permalink
[ML] Avoid multiple queued quantiles documents in renormalizer (#85555)…
Browse files Browse the repository at this point in the history
… (#85599)

It's possible that an anomaly detection job can output a flurry
of quantiles documents for use in renormalization in the time that
a single renormalization takes.

When this happens we only use the most recent quantiles document
for the next renormalization, but the previous code queued up all
the quantiles documents until the next renormalization started.

The new approach that this PR implements is to only keep the most
recent new quantiles document while the current renormalization is
in progress. This avoids wasting memory on a queue that will be
largely discarded (apart from one element) later on.

Fixes #85539
  • Loading branch information
droberts195 committed Apr 1, 2022
1 parent e72fb71 commit d47642d
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 135 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/85555.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 85555
summary: Avoid multiple queued quantiles documents in renormalizer
area: Machine Learning
type: bug
issues:
- 85539
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,12 @@ private void readResults() {

public void setProcessKilled() {
processKilled = true;
renormalizer.shutdown();
try {
renormalizer.shutdown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

void handleOpenForecasts() {
Expand Down Expand Up @@ -538,7 +543,7 @@ public void clearAwaitingFlush(String flushId) {
flushListener.clear(flushId);
}

public void waitUntilRenormalizerIsIdle() {
public void waitUntilRenormalizerIsIdle() throws InterruptedException {
renormalizer.waitUntilIdle();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ public interface Renormalizer {
/**
* Blocks until the renormalizer is idle and no further quantiles updates are pending.
*/
void waitUntilIdle();
void waitUntilIdle() throws InterruptedException;

/**
* Shut down the renormalization ASAP. Do not wait for it to fully complete.
*/
void shutdown();
void shutdown() throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,40 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;

import java.util.Deque;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.Phaser;

/**
* Renormalizer that discards outdated quantiles if even newer ones are received while waiting for a prior renormalization to complete.
* Renormalizer for one job that discards outdated quantiles if even newer ones are received while waiting for a prior renormalization
* to complete. Only one normalization must run at any time for any particular job. Quantiles documents can be large so it's important
* that we don't retain them unnecessarily.
*/
public class ShortCircuitingRenormalizer implements Renormalizer {

private static final Logger LOGGER = LogManager.getLogger(ShortCircuitingRenormalizer.class);
private static final Logger logger = LogManager.getLogger(ShortCircuitingRenormalizer.class);

private final String jobId;
private final ScoresUpdater scoresUpdater;
private final ExecutorService executorService;
private final Deque<QuantilesWithLatch> quantilesDeque = new ConcurrentLinkedDeque<>();
private final Deque<CountDownLatch> latchDeque = new ConcurrentLinkedDeque<>();
/**
* Each job may only have 1 normalization in progress at any time; the semaphore enforces this
* Each job may only have 1 normalization in progress at any time; the phaser enforces this.
* Registrations and arrivals must be synchronized.
*/
private final Semaphore semaphore = new Semaphore(1);
private final Phaser phaser = new Phaser() {
/**
* Don't terminate when registrations drops to zero.
*/
protected boolean onAdvance(int phase, int parties) {
return false;
}
};
/**
* Access to this must be synchronized.
*/
private AugmentedQuantiles latestQuantilesHolder;

public ShortCircuitingRenormalizer(String jobId, ScoresUpdater scoresUpdater, ExecutorService executorService) {
this.jobId = jobId;
Expand All @@ -51,169 +62,136 @@ public void renormalize(Quantiles quantiles) {
return;
}

// This will throw NPE if quantiles is null, so do it first
QuantilesWithLatch quantilesWithLatch = new QuantilesWithLatch(quantiles, new CountDownLatch(1));
// Needed to ensure work is not added while the tryFinishWork() method is running
synchronized (quantilesDeque) {
// Must add to latchDeque before quantilesDeque
latchDeque.addLast(quantilesWithLatch.getLatch());
quantilesDeque.addLast(quantilesWithLatch);
executorService.submit(() -> doRenormalizations());
synchronized (this) {
latestQuantilesHolder = (latestQuantilesHolder == null)
? new AugmentedQuantiles(quantiles, null, new CountDownLatch(1))
: new AugmentedQuantiles(quantiles, latestQuantilesHolder.getEvictedTimestamp(), latestQuantilesHolder.getLatch());
// Don't start a thread if another normalization thread is still working. The existing thread will
// do this normalization when it finishes its current one. This means we serialise normalizations
// without hogging threads or queuing up many large quantiles documents.
if (tryStartWork()) {
executorService.submit(this::doRenormalizations);
}
}
}

@Override
public void waitUntilIdle() {
try {
// We cannot tolerate more than one thread running this loop at any time,
// but need a different lock to the other synchronized parts of the code
synchronized (latchDeque) {
for (CountDownLatch latchToAwait = latchDeque.pollFirst(); latchToAwait != null; latchToAwait = latchDeque.pollFirst()) {
latchToAwait.await();
}
public void waitUntilIdle() throws InterruptedException {
CountDownLatch latch;
do {
// The first bit waits for any not-yet-started renormalization to complete.
synchronized (this) {
latch = (latestQuantilesHolder != null) ? latestQuantilesHolder.getLatch() : null;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (latch != null) {
latch.await();
}
// This next bit waits for any thread that's been started to run doRenormalizations() to exit the loop in that method.
// If no doRenormalizations() thread is running then we'll wait for the previous phase, and a call to do that should
// return immediately.
int phaseToWaitFor;
synchronized (this) {
phaseToWaitFor = phaser.getPhase() - 1 + phaser.getUnarrivedParties();
}
phaser.awaitAdvanceInterruptibly(phaseToWaitFor);
} while (latch != null);
}

@Override
public void shutdown() {
public void shutdown() throws InterruptedException {
scoresUpdater.shutdown();
// We have to wait until idle to avoid a raft of exceptions as other parts of the
// system are stopped after this method returns. However, shutting down the
// scoresUpdater first means it won't do all pending work; it will stop as soon
// as it can without causing further errors.
waitUntilIdle();
phaser.forceTermination();
}

private Quantiles getEarliestQuantiles() {
QuantilesWithLatch earliestQuantilesWithLatch = quantilesDeque.peekFirst();
return (earliestQuantilesWithLatch != null) ? earliestQuantilesWithLatch.getQuantiles() : null;
}

private QuantilesWithLatch getLatestQuantilesWithLatchAndClear() {
// We discard all but the latest quantiles
QuantilesWithLatch latestQuantilesWithLatch = null;
for (QuantilesWithLatch quantilesWithLatch = quantilesDeque.pollFirst(); quantilesWithLatch != null; quantilesWithLatch =
quantilesDeque.pollFirst()) {
// Count down the latches associated with any discarded quantiles
if (latestQuantilesWithLatch != null) {
latestQuantilesWithLatch.getLatch().countDown();
}
latestQuantilesWithLatch = quantilesWithLatch;
}
return latestQuantilesWithLatch;
}

private boolean tryStartWork() {
return semaphore.tryAcquire();
private synchronized AugmentedQuantiles getLatestAugmentedQuantilesAndClear() {
AugmentedQuantiles latest = latestQuantilesHolder;
latestQuantilesHolder = null;
return latest;
}

private boolean tryFinishWork() {
// We cannot tolerate new work being added in between the isEmpty() check and releasing the semaphore
synchronized (quantilesDeque) {
if (quantilesDeque.isEmpty() == false) {
return false;
}
semaphore.release();
return true;
private synchronized boolean tryStartWork() {
if (phaser.getUnarrivedParties() > 0) {
return false;
}
return phaser.register() >= 0;
}

private void forceFinishWork() {
// We cannot allow new quantiles to be added while we are failing from a previous renormalization failure.
synchronized (quantilesDeque) {
// We discard all but the earliest quantiles, if they exist
QuantilesWithLatch earliestQuantileWithLatch = null;
for (QuantilesWithLatch quantilesWithLatch = quantilesDeque.pollFirst(); quantilesWithLatch != null; quantilesWithLatch =
quantilesDeque.pollFirst()) {
if (earliestQuantileWithLatch == null) {
earliestQuantileWithLatch = quantilesWithLatch;
}
// Count down all the latches as they no longer matter since we failed
quantilesWithLatch.latch.countDown();
}
// Keep the earliest quantile so that the next call to doRenormalizations() will include as much as the failed normalization
// window as possible.
// Since this latch is already countedDown, there is no reason to put it in the `latchDeque` again
if (earliestQuantileWithLatch != null) {
quantilesDeque.addLast(earliestQuantileWithLatch);
}
semaphore.release();
private synchronized boolean tryFinishWork() {
// Synchronized because we cannot tolerate new work being added in between the null check and releasing the semaphore
if (latestQuantilesHolder != null) {
return false;
}
phaser.arriveAndDeregister();
return true;
}

private void doRenormalizations() {
// Exit immediately if another normalization is in progress. This means we don't hog threads.
if (tryStartWork() == false) {
return;
}

CountDownLatch latch = null;
try {
do {
// Note that if there is only one set of quantiles in the queue then both these references will point to the same quantiles.
Quantiles earliestQuantiles = getEarliestQuantiles();
QuantilesWithLatch latestQuantilesWithLatch = getLatestQuantilesWithLatchAndClear();
// We could end up with latestQuantilesWithLatch being null if the thread running this method
// was preempted before the tryStartWork() call, another thread already running this method
// did the work and exited, and then this thread got true returned by tryStartWork().
if (latestQuantilesWithLatch != null) {
Quantiles latestQuantiles = latestQuantilesWithLatch.getQuantiles();
latch = latestQuantilesWithLatch.getLatch();
// We could end up with earliestQuantiles being null if quantiles were
// added between getting the earliest and latest quantiles.
if (earliestQuantiles == null) {
earliestQuantiles = latestQuantiles;
}
long earliestBucketTimeMs = earliestQuantiles.getTimestamp().getTime();
long latestBucketTimeMs = latestQuantiles.getTimestamp().getTime();
// If we're going to skip quantiles, renormalize using the latest quantiles
// over the time ranges implied by all quantiles that were provided.
long windowExtensionMs = latestBucketTimeMs - earliestBucketTimeMs;
if (windowExtensionMs < 0) {
LOGGER.warn(
"[{}] Quantiles not supplied in time order - {} after {}",
jobId,
latestBucketTimeMs,
earliestBucketTimeMs
);
windowExtensionMs = 0;
}
scoresUpdater.update(latestQuantiles.getQuantileState(), latestBucketTimeMs, windowExtensionMs);
latch.countDown();
latch = null;
do {
AugmentedQuantiles latestAugmentedQuantiles = getLatestAugmentedQuantilesAndClear();
assert latestAugmentedQuantiles != null;
if (latestAugmentedQuantiles != null) {
Quantiles latestQuantiles = latestAugmentedQuantiles.getQuantiles();
CountDownLatch latch = latestAugmentedQuantiles.getLatch();
try {
scoresUpdater.update(
latestQuantiles.getQuantileState(),
latestQuantiles.getTimestamp().getTime(),
latestAugmentedQuantiles.getWindowExtensionMs()
);
} catch (Exception e) {
logger.error("[" + jobId + "] Normalization failed", e);
}
// Loop if more work has become available while we were working, because the
// tasks originally submitted to do that work will have exited early.
} while (tryFinishWork() == false);
} catch (Exception e) {
LOGGER.error("[" + jobId + "] Normalization failed", e);
if (latch != null) {
latch.countDown();
}
forceFinishWork();
}
// Loop if more work has become available while we were working, because the
// tasks originally submitted to do that work will have exited early.
} while (tryFinishWork() == false);
}

/**
* Simple grouping of a {@linkplain Quantiles} object with its corresponding {@linkplain CountDownLatch} object.
*/
private static class QuantilesWithLatch {
private class AugmentedQuantiles {
private final Quantiles quantiles;
private final Date earliestEvictedTimestamp;
private final CountDownLatch latch;

QuantilesWithLatch(Quantiles quantiles, CountDownLatch latch) {
AugmentedQuantiles(Quantiles quantiles, Date earliestEvictedTimestamp, CountDownLatch latch) {
this.quantiles = Objects.requireNonNull(quantiles);
this.earliestEvictedTimestamp = earliestEvictedTimestamp;
this.latch = Objects.requireNonNull(latch);
}

Quantiles getQuantiles() {
return quantiles;
}

Date getEvictedTimestamp() {
return (earliestEvictedTimestamp != null) ? earliestEvictedTimestamp : quantiles.getTimestamp();
}

long getWindowExtensionMs() {
if (earliestEvictedTimestamp == null) {
return 0;
}
long earliestTimeMs = earliestEvictedTimestamp.getTime();
long latestTimeMs = quantiles.getTimestamp().getTime();
// If we're going to skip quantiles, renormalize using the latest quantiles
// over the time ranges implied by all quantiles that were provided.
long windowExtensionMs = latestTimeMs - earliestTimeMs;
if (windowExtensionMs < 0) {
logger.warn("[{}] Quantiles not supplied in time order - {} after {}", jobId, latestTimeMs, earliestTimeMs);
return 0;
}
return windowExtensionMs;
}

CountDownLatch getLatch() {
return latch;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -139,7 +138,7 @@ public void cleanup() {
executor.shutdown();
}

public void testProcess() throws TimeoutException {
public void testProcess() throws Exception {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator());

Expand Down Expand Up @@ -439,7 +438,7 @@ public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
verify(renormalizer).isEnabled();
}

public void testAwaitCompletion() throws TimeoutException {
public void testAwaitCompletion() throws Exception {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator());

Expand Down Expand Up @@ -492,7 +491,7 @@ public void testParsingErrorSetsFailed() throws Exception {
verify(persister).bulkPersisterBuilder(eq(JOB_ID));
}

public void testKill() throws TimeoutException {
public void testKill() throws Exception {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator());

Expand Down

0 comments on commit d47642d

Please sign in to comment.