From bdba5d394d878d12c961b9756bf107d28ba626ae Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 14 Jul 2022 08:33:33 +0200 Subject: [PATCH 1/2] reduce amount of log and audits if the same failure happens in a row, change the mininimum wait time for retrying to 5s --- .../notifications/TransformAuditMessage.java | 6 + .../transforms/TransformContext.java | 10 +- .../transforms/TransformIndexer.java | 29 ++- .../scheduling/TransformScheduledTask.java | 2 +- .../notifications/MockTransformAuditor.java | 57 +++++- .../transforms/TransformContextTests.java | 5 +- .../TransformIndexerFailureHandlingTests.java | 192 ++++++++++++++++-- .../TransformScheduledTaskTests.java | 8 +- .../scheduling/TransformSchedulerTests.java | 7 +- 9 files changed, 266 insertions(+), 50 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/notifications/TransformAuditMessage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/notifications/TransformAuditMessage.java index c05f7c3ff031f..e30ba3d1fa9a8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/notifications/TransformAuditMessage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/notifications/TransformAuditMessage.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.transform.notifications; +import org.elasticsearch.common.Strings; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; @@ -37,4 +38,9 @@ public final String getJobType() { protected Optional getResourceField() { return Optional.of(TRANSFORM_ID.getPreferredName()); } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index 3b2a3c0be2b19..c16a310659896 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -31,6 +31,8 @@ public interface Listener { private final Listener taskListener; private volatile int numFailureRetries = Transform.DEFAULT_FAILURE_RETRIES; private final AtomicInteger failureCount; + // Keeps track of the last failure that occured, used for throttling logs and audit + private final AtomicReference lastFailure = new AtomicReference<>(); private volatile Instant changesLastDetectedAt; private volatile Instant lastSearchTime; private volatile boolean shouldStopAtCheckpoint = false; @@ -68,6 +70,7 @@ void setTaskStateToFailed(String reason) { void resetReasonAndFailureCounter() { stateReason.set(null); failureCount.set(0); + lastFailure.set(null); taskListener.failureCountChanged(); } @@ -99,12 +102,17 @@ int getFailureCount() { return failureCount.get(); } - int incrementAndGetFailureCount() { + int incrementAndGetFailureCount(String failure) { int newFailureCount = failureCount.incrementAndGet(); + lastFailure.set(failure); taskListener.failureCountChanged(); return newFailureCount; } + String getLastFailure() { + return lastFailure.get(); + } + void setChangesLastDetectedAt(Instant time) { changesLastDetectedAt = time; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 212b235dc8448..43500a70a4a46 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -120,8 +120,6 @@ private enum RunState { private volatile TransformCheckpoint lastCheckpoint; private volatile TransformCheckpoint nextCheckpoint; - // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index - private volatile String lastAuditedExceptionMessage = null; private volatile RunState runState; private volatile long lastCheckpointCleanup = 0L; @@ -924,7 +922,8 @@ void stopAndMaybeSaveState() { * (Note: originally this method was synchronized, which is not necessary) */ void handleFailure(Exception e) { - logger.warn(() -> "[" + getJobId() + "] transform encountered an exception: ", e); + // more detailed reporting in the handlers and below + logger.debug(() -> "[" + getJobId() + "] transform encountered an exception: ", e); Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e); if (unwrappedException instanceof CircuitBreakingException) { @@ -957,7 +956,13 @@ void handleFailure(Exception e) { int numFailureRetries = Optional.ofNullable(transformConfig.getSettings().getNumFailureRetries()) .orElse(context.getNumFailureRetries()); - if (numFailureRetries != -1 && context.incrementAndGetFailureCount() > numFailureRetries) { + + // group failures to decide whether to report it below + final String thisFailureClass = unwrappedException.getClass().toString(); + final String lastFailureClass = context.getLastFailure(); + final int failureCount = context.incrementAndGetFailureCount(thisFailureClass); + + if (numFailureRetries != -1 && failureCount > numFailureRetries) { failIndexer( "task encountered more than " + numFailureRetries @@ -969,14 +974,16 @@ void handleFailure(Exception e) { // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one - if (e.getMessage().equals(lastAuditedExceptionMessage) == false) { - String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException); - - auditor.warning( - getJobId(), - "Transform encountered an exception: " + message + "; Will attempt again at next scheduled trigger." + // and if the number of retries is about to exceed + if (thisFailureClass.equals(lastFailureClass) == false || failureCount == numFailureRetries) { + String message = format( + "Transform encountered an exception: [%s]; Will automatically retry [%d/%d]", + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException), + failureCount, + numFailureRetries ); - lastAuditedExceptionMessage = message; + logger.warn(() -> "[" + getJobId() + "] " + message); + auditor.warning(getJobId(), message); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java index b91f9bd611fe5..2c32092155f3c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTask.java @@ -23,7 +23,7 @@ final class TransformScheduledTask { /** * Minimum delay that can be applied after a failure. */ - private static final long MIN_DELAY_MILLIS = Duration.ofSeconds(1).toMillis(); + private static final long MIN_DELAY_MILLIS = Duration.ofSeconds(5).toMillis(); /** * Maximum delay that can be applied after a failure. */ diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java index 1e3cc46ad50a0..75cf3e162c855 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.transform.notifications; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; @@ -14,7 +16,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.xpack.core.common.notifications.Level; +import org.elasticsearch.xpack.core.transform.notifications.TransformAuditMessage; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -35,6 +39,7 @@ public class MockTransformAuditor extends TransformAuditor { private static final String MOCK_NODE_NAME = "mock_node_name"; + private static final Logger logger = LogManager.getLogger(MockTransformAuditor.class); @SuppressWarnings("unchecked") public static MockTransformAuditor createMockAuditor() { @@ -101,14 +106,22 @@ public abstract static class AbstractAuditExpectation implements AuditExpectatio protected final Level expectedLevel; protected final String expectedResourceId; protected final String expectedMessage; - volatile boolean saw; - - public AbstractAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) { + protected final int expectedCount; + volatile int count; + + public AbstractAuditExpectation( + String expectedName, + Level expectedLevel, + String expectedResourceId, + String expectedMessage, + int expectedCount + ) { this.expectedName = expectedName; this.expectedLevel = expectedLevel; this.expectedResourceId = expectedResourceId; this.expectedMessage = expectedMessage; - this.saw = false; + this.expectedCount = expectedCount; + this.count = 0; } @Override @@ -116,11 +129,11 @@ public void match(final Level level, final String resourceId, final String messa if (level.equals(expectedLevel) && resourceId.equals(expectedResourceId) && innerMatch(level, resourceId, message)) { if (Regex.isSimpleMatchPattern(expectedMessage)) { if (Regex.simpleMatch(expectedMessage, message)) { - saw = true; + ++count; } } else { if (message.contains(expectedMessage)) { - saw = true; + ++count; } } } @@ -134,28 +147,52 @@ public boolean innerMatch(final Level level, final String resourceId, final Stri public static class SeenAuditExpectation extends AbstractAuditExpectation { public SeenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) { - super(expectedName, expectedLevel, expectedResourceId, expectedMessage); + super(expectedName, expectedLevel, expectedResourceId, expectedMessage, 1); } @Override public void assertMatched() { - assertThat("expected to see " + expectedName + " but did not", saw, equalTo(true)); + assertThat("expected to see " + expectedName + " but did not", count, equalTo(expectedCount)); } } public static class UnseenAuditExpectation extends AbstractAuditExpectation { public UnseenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) { - super(expectedName, expectedLevel, expectedResourceId, expectedMessage); + super(expectedName, expectedLevel, expectedResourceId, expectedMessage, 0); + } + + @Override + public void assertMatched() { + assertThat("expected not to see " + expectedName + " but did", count, equalTo(expectedCount)); + } + } + + public static class MultipleSeenAuditExpectation extends AbstractAuditExpectation { + + public MultipleSeenAuditExpectation( + String expectedName, + Level expectedLevel, + String expectedResourceId, + String expectedMessage, + int expectedCount + ) { + super(expectedName, expectedLevel, expectedResourceId, expectedMessage, expectedCount); } @Override public void assertMatched() { - assertThat("expected not to see " + expectedName + " but did", saw, equalTo(false)); + assertThat( + "expected to see " + expectedName + " " + expectedCount + " times but saw it " + count + " times ", + count, + equalTo(expectedCount) + ); } } private void audit(Level level, String resourceId, String message) { + logger.info("AUDIT: {}", new TransformAuditMessage(resourceId, message, level, new Date(), MOCK_NODE_NAME)); + for (AuditExpectation expectation : expectations) { expectation.match(level, resourceId, message); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java index 89c3064226a26..19bd1f1c1bea9 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformContextTests.java @@ -36,12 +36,13 @@ public void verifyNoMoreInteractionsOnMocks() { public void testFailureCount() { TransformContext context = new TransformContext(null, null, 0, listener); - assertThat(context.incrementAndGetFailureCount(), is(equalTo(1))); + assertThat(context.incrementAndGetFailureCount("some_exception"), is(equalTo(1))); assertThat(context.getFailureCount(), is(equalTo(1))); - assertThat(context.incrementAndGetFailureCount(), is(equalTo(2))); + assertThat(context.incrementAndGetFailureCount("some_other_exception"), is(equalTo(2))); assertThat(context.getFailureCount(), is(equalTo(2))); context.resetReasonAndFailureCounter(); assertThat(context.getFailureCount(), is(equalTo(0))); + assertThat(context.getLastFailure(), is(nullValue())); // Verify that the listener is notified every time the failure count is incremented or reset verify(listener, times(3)).failureCountChanged(); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index 3acb33985817f..933d8ae553a92 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker.Durability; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -689,8 +690,8 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce "timed out during dbq", Level.WARNING, transformId, - "Transform encountered an exception: org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq;" - + " Will attempt again at next scheduled trigger." + "Transform encountered an exception: [org.elasticsearch.ElasticsearchTimeoutException: timed out during dbq];" + + " Will automatically retry [1/10]" ) ); TransformContext.Listener contextListener = mock(TransformContext.Listener.class); @@ -835,28 +836,170 @@ public SearchResponse apply(SearchRequest searchRequest) { assertEquals(0, context.getFailureCount()); } + // tests throttling of audits on logs based on repeated exception types + public void testHandleFailureAuditing() { + String transformId = randomAlphaOfLength(10); + TransformConfig config = new TransformConfig.Builder().setId(transformId) + .setSource(randomSourceConfig()) + .setDest(randomDestConfig()) + .setSyncConfig(new TimeSyncConfig("time", TimeSyncConfig.DEFAULT_DELAY)) + .setPivotConfig(randomPivotConfig()) + .build(); + + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + Function searchFunction = request -> mock(SearchResponse.class); + Function bulkFunction = request -> mock(BulkResponse.class); + + final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); + final AtomicReference failureMessage = new AtomicReference<>(); + Consumer failureConsumer = message -> { + failIndexerCalled.compareAndSet(false, true); + failureMessage.compareAndSet(null, message); + }; + + MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + TransformContext.Listener contextListener = mock(TransformContext.Listener.class); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); + + auditor.addExpectation( + new MockTransformAuditor.SeenAuditExpectation( + "timeout_exception_1", + Level.WARNING, + transformId, + "Transform encountered an exception: [*ElasticsearchTimeoutException: timeout_1]; Will automatically retry [1/" + + Transform.DEFAULT_FAILURE_RETRIES + + "]" + ) + ); + + auditor.addExpectation( + new MockTransformAuditor.SeenAuditExpectation( + "bulk_exception_1", + Level.WARNING, + transformId, + "Transform encountered an exception: [*BulkIndexingException: bulk_exception_1*]; Will automatically retry [2/" + + Transform.DEFAULT_FAILURE_RETRIES + + "]" + ) + ); + + auditor.addExpectation( + new MockTransformAuditor.SeenAuditExpectation( + "timeout_exception_2", + Level.WARNING, + transformId, + "Transform encountered an exception: [*ElasticsearchTimeoutException: timeout_2]; Will automatically retry [3/" + + Transform.DEFAULT_FAILURE_RETRIES + + "]" + ) + ); + + auditor.addExpectation( + new MockTransformAuditor.SeenAuditExpectation( + "bulk_exception_2", + Level.WARNING, + transformId, + "Transform encountered an exception: [*BulkIndexingException: bulk_exception_2*]; Will automatically retry [6/" + + Transform.DEFAULT_FAILURE_RETRIES + + "]" + ) + ); + + MockedTransformIndexer indexer = createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + null, + failureConsumer, + threadPool, + ThreadPool.Names.GENERIC, + auditor, + context + ); + + indexer.handleFailure( + new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timeout_1")) } + ) + ); + + indexer.handleFailure( + new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { + new ShardSearchFailure( + new BulkIndexingException("bulk_exception_1", new EsRejectedExecutionException("full queue"), false) + ) } + ) + ); + + indexer.handleFailure( + new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timeout_2")) } + ) + ); + + // not logged + indexer.handleFailure( + new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timeout_2")) } + ) + ); + + // not logged + indexer.handleFailure( + new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timeout_2")) } + ) + ); + + indexer.handleFailure( + new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { + new ShardSearchFailure( + new BulkIndexingException("bulk_exception_2", new EsRejectedExecutionException("full queue"), false) + ) } + ) + ); + + auditor.assertAllExpectationsMatched(); + } + public void testHandleFailure() { - testHandleFailure(0, 5, 0); - testHandleFailure(5, 0, 5); - testHandleFailure(3, 5, 3); - testHandleFailure(5, 3, 5); - testHandleFailure(0, null, 0); - testHandleFailure(3, null, 3); - testHandleFailure(5, null, 5); - testHandleFailure(7, null, 7); - testHandleFailure(Transform.DEFAULT_FAILURE_RETRIES, null, Transform.DEFAULT_FAILURE_RETRIES); - testHandleFailure(null, 0, 0); - testHandleFailure(null, 3, 3); - testHandleFailure(null, 5, 5); - testHandleFailure(null, 7, 7); - testHandleFailure(null, Transform.DEFAULT_FAILURE_RETRIES, Transform.DEFAULT_FAILURE_RETRIES); - testHandleFailure(null, null, Transform.DEFAULT_FAILURE_RETRIES); + testHandleFailure(0, 5, 0, 0); + testHandleFailure(5, 0, 5, 2); + testHandleFailure(3, 5, 3, 2); + testHandleFailure(5, 3, 5, 2); + testHandleFailure(0, null, 0, 0); + testHandleFailure(3, null, 3, 2); + testHandleFailure(5, null, 5, 2); + testHandleFailure(7, null, 7, 2); + testHandleFailure(Transform.DEFAULT_FAILURE_RETRIES, null, Transform.DEFAULT_FAILURE_RETRIES, 2); + testHandleFailure(null, 0, 0, 0); + testHandleFailure(null, 3, 3, 2); + testHandleFailure(null, 5, 5, 2); + testHandleFailure(null, 7, 7, 2); + testHandleFailure(null, Transform.DEFAULT_FAILURE_RETRIES, Transform.DEFAULT_FAILURE_RETRIES, 2); + testHandleFailure(null, null, Transform.DEFAULT_FAILURE_RETRIES, 2); } private void testHandleFailure( Integer configNumFailureRetries, Integer contextNumFailureRetries, - int expectedEffectiveNumFailureRetries + int expectedEffectiveNumFailureRetries, + int expecedNumberOfRetryAudits ) { String transformId = randomAlphaOfLength(10); TransformConfig config = new TransformConfig.Builder().setId(transformId) @@ -885,6 +1028,19 @@ private void testHandleFailure( context.setNumFailureRetries(contextNumFailureRetries); } + int indexerRetries = configNumFailureRetries != null ? configNumFailureRetries + : contextNumFailureRetries != null ? contextNumFailureRetries + : Transform.DEFAULT_FAILURE_RETRIES; + auditor.addExpectation( + new MockTransformAuditor.MultipleSeenAuditExpectation( + getTestName(), + Level.WARNING, + transformId, + "Transform encountered an exception: [*]; Will automatically retry [*/" + indexerRetries + "]", + expecedNumberOfRetryAudits + ) + ); + MockedTransformIndexer indexer = createMockIndexer( config, state, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java index 2af5cb87e5179..1bca54361744a 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduledTaskTests.java @@ -58,16 +58,16 @@ public void testNextScheduledTimeMillis() { { TransformScheduledTask task = new TransformScheduledTask(TRANSFORM_ID, FREQUENCY, LAST_TRIGGERED_TIME_MILLIS, 1, LISTENER); // Verify that the next scheduled time is calculated properly when failure count is greater than 0 - assertThat(task.getNextScheduledTimeMillis(), is(equalTo(102000L))); + assertThat(task.getNextScheduledTimeMillis(), is(equalTo(105000L))); } } public void testCalculateNextScheduledTimeAfterFailure() { long lastTriggeredTimeMillis = Instant.now().toEpochMilli(); long[] expectedDelayMillis = { - 1000, // 1s - 2000, // 2s - 4000, // 4s + 5000, // 5s + 5000, // 5s + 5000, // 5s 8000, // 8s 16000, // 16s 32000, // 32s diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java index de5bc05834041..96f497f04dc29 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java @@ -149,20 +149,20 @@ public void testSchedulingWithFailures() { transformScheduler.handleTransformFailureCountChanged(transformId, 1); assertThat( transformScheduler.getTransformScheduledTasks(), - contains(new TransformScheduledTask(transformId, frequency, 0L, 1, 2 * 1000, listener)) + contains(new TransformScheduledTask(transformId, frequency, 0L, 1, 5 * 1000, listener)) ); assertThat(events, hasSize(1)); transformScheduler.processScheduledTasks(); assertThat( transformScheduler.getTransformScheduledTasks(), - contains(new TransformScheduledTask(transformId, frequency, 60 * 1000L, 1, 62 * 1000, listener)) + contains(new TransformScheduledTask(transformId, frequency, 60 * 1000L, 1, 65 * 1000, listener)) ); assertThat(events, hasSize(2)); assertThat( events, - contains(new TransformScheduler.Event(transformId, 0, 0), new TransformScheduler.Event(transformId, 2 * 1000, 60 * 1000)) + contains(new TransformScheduler.Event(transformId, 0, 0), new TransformScheduler.Event(transformId, 5 * 1000, 60 * 1000)) ); transformScheduler.deregisterTransform(transformId); @@ -396,6 +396,7 @@ public void advanceTimeBy(Duration duration) { setCurrentTime(currentTime.plus(duration)); } + @Override public Instant instant() { return currentTime; } From af4f7dc774856d3932ae80e3521d54efc0d80cf2 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 14 Jul 2022 11:30:33 +0200 Subject: [PATCH 2/2] Merge SeenAuditExpectation and MultipleSeenAuditExpectation into one. --- .../notifications/MockTransformAuditor.java | 73 +++++++++++-------- .../TransformIndexerFailureHandlingTests.java | 2 +- 2 files changed, 45 insertions(+), 30 deletions(-) diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java index 75cf3e162c855..36c700629d6f7 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/notifications/MockTransformAuditor.java @@ -102,7 +102,7 @@ public interface AuditExpectation { } public abstract static class AbstractAuditExpectation implements AuditExpectation { - protected final String expectedName; + protected final String name; protected final Level expectedLevel; protected final String expectedResourceId; protected final String expectedMessage; @@ -110,13 +110,13 @@ public abstract static class AbstractAuditExpectation implements AuditExpectatio volatile int count; public AbstractAuditExpectation( - String expectedName, + String name, Level expectedLevel, String expectedResourceId, String expectedMessage, int expectedCount ) { - this.expectedName = expectedName; + this.name = name; this.expectedLevel = expectedLevel; this.expectedResourceId = expectedResourceId; this.expectedMessage = expectedMessage; @@ -144,52 +144,67 @@ public boolean innerMatch(final Level level, final String resourceId, final Stri } } + /** + * Expectation to assert a certain audit message has been issued once or multiple times. + */ public static class SeenAuditExpectation extends AbstractAuditExpectation { - public SeenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) { - super(expectedName, expectedLevel, expectedResourceId, expectedMessage, 1); - } - - @Override - public void assertMatched() { - assertThat("expected to see " + expectedName + " but did not", count, equalTo(expectedCount)); - } - } - - public static class UnseenAuditExpectation extends AbstractAuditExpectation { - - public UnseenAuditExpectation(String expectedName, Level expectedLevel, String expectedResourceId, String expectedMessage) { - super(expectedName, expectedLevel, expectedResourceId, expectedMessage, 0); - } - - @Override - public void assertMatched() { - assertThat("expected not to see " + expectedName + " but did", count, equalTo(expectedCount)); + /** + * Expectation to match an audit exactly once. + * + * @param name name of the expected audit, free of choice, used for the assert message + * @param expectedLevel The expected level of the audit + * @param expectedResourceId The expected resource id + * @param expectedMessage Expected message of the audit, supports simple wildcard matching + */ + public SeenAuditExpectation(String name, Level expectedLevel, String expectedResourceId, String expectedMessage) { + super(name, expectedLevel, expectedResourceId, expectedMessage, 1); } - } - public static class MultipleSeenAuditExpectation extends AbstractAuditExpectation { - - public MultipleSeenAuditExpectation( - String expectedName, + /** + * Expectation to match an audit a certain number of times. + * + * @param name name of the expected audit, free of choice, used for the assert message + * @param expectedLevel The expected level of the audit + * @param expectedResourceId The expected resource id + * @param expectedMessage Expected message of the audit, supports simple wildcard matching + * @param expectedCount Expected number of times the audit should be matched + */ + public SeenAuditExpectation( + String name, Level expectedLevel, String expectedResourceId, String expectedMessage, int expectedCount ) { - super(expectedName, expectedLevel, expectedResourceId, expectedMessage, expectedCount); + super(name, expectedLevel, expectedResourceId, expectedMessage, expectedCount); } @Override public void assertMatched() { assertThat( - "expected to see " + expectedName + " " + expectedCount + " times but saw it " + count + " times ", + "expected to see " + name + " " + expectedCount + " times but saw it " + count + " times ", count, equalTo(expectedCount) ); } } + /** + * Expectation to assert a certain audit message is not issued. + */ + public static class UnseenAuditExpectation extends AbstractAuditExpectation { + + public UnseenAuditExpectation(String name, Level expectedLevel, String expectedResourceId, String expectedMessage) { + super(name, expectedLevel, expectedResourceId, expectedMessage, 0); + } + + @Override + public void assertMatched() { + assertThat("expected not to see " + name + " but did", count, equalTo(expectedCount)); + } + } + private void audit(Level level, String resourceId, String message) { logger.info("AUDIT: {}", new TransformAuditMessage(resourceId, message, level, new Date(), MOCK_NODE_NAME)); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index 933d8ae553a92..53fe8fbc1489a 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -1032,7 +1032,7 @@ private void testHandleFailure( : contextNumFailureRetries != null ? contextNumFailureRetries : Transform.DEFAULT_FAILURE_RETRIES; auditor.addExpectation( - new MockTransformAuditor.MultipleSeenAuditExpectation( + new MockTransformAuditor.SeenAuditExpectation( getTestName(), Level.WARNING, transformId,