Warn on slow cluster-state metadata writes.

Summary of this patch (descriptive text only; everything before the first
"diff --git" line is not part of any hunk):

  * IncrementalClusterStateWriter gains the dynamic, node-scoped time setting
    "gateway.slow_write_logging_threshold" (default 10s, minimum 0), which is
    registered in ClusterSettings.
  * updateClusterState() reads a relative-time LongSupplier before and after
    the write. If the elapsed milliseconds are >= the threshold it logs at
    WARN, otherwise at DEBUG. Both messages report how many index metadata
    files were written and how many unchanged ones were skipped.
  * AtomicClusterStateWriter keeps the written/skipped counters; the
    KeepPreviousGeneration, WriteNewIndexMetaData and WriteChangedIndexMetaData
    actions increment them in execute().
  * GatewayMetaState passes the node settings, the cluster settings and the
    thread pool's relativeTimeInMillis to the writer's new constructor.
  * Tests: testResolveStatesToBeWritten verifies the counter calls and that all
    three action kinds were seen; testSlowLogging checks WARN logging at, above
    and below the threshold and after a dynamic settings update.
  * MockGatewayMetaState supplies Mockito mocks for TransportService and
    ClusterService instead of nulls.

diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 92332225ed5c8..a6f76a486a7fd 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -71,6 +71,7 @@
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.gateway.IncrementalClusterStateWriter;
 import org.elasticsearch.http.HttpTransportSettings;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.IndexSettings;
@@ -226,6 +227,7 @@ public void apply(Settings value, Settings current, Settings previous) {
             GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING,
             GatewayService.RECOVER_AFTER_NODES_SETTING,
             GatewayService.RECOVER_AFTER_TIME_SETTING,
+            IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD,
             NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
             NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
             NetworkModule.HTTP_TYPE_SETTING,
diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
index 292af96b11a67..644a1eae644ac 100644
--- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
+++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -92,8 +92,10 @@ public void start(Settings settings, TransportService transportService, ClusterS
         }
 
         final IncrementalClusterStateWriter incrementalClusterStateWriter
-            = new IncrementalClusterStateWriter(metaStateService, manifestClusterStateTuple.v1(),
-            prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()));
+            = new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService,
+            manifestClusterStateTuple.v1(),
+            prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()),
+            transportService.getThreadPool()::relativeTimeInMillis);
         if (DiscoveryNode.isMasterNode(settings) == false) {
             if (DiscoveryNode.isDataNode(settings)) {
                 // Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's
diff --git a/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java b/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java
index 5facb826a2443..d015bcc5b6c22 100644
--- a/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java
+++ b/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java
@@ -18,12 +18,18 @@
  */
 package org.elasticsearch.gateway;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
 
 import java.util.ArrayList;
@@ -33,11 +39,17 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.LongSupplier;
 
 /**
  * Tracks the metadata written to disk, allowing updated metadata to be written incrementally (i.e. only writing out the changed metadata).
  */
-class IncrementalClusterStateWriter {
+public class IncrementalClusterStateWriter {
+
+    private static final Logger logger = LogManager.getLogger(IncrementalClusterStateWriter.class);
+
+    public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
+        TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic);
 
     private final MetaStateService metaStateService;
 
@@ -46,13 +58,24 @@ class IncrementalClusterStateWriter {
     // no need to synchronize access to these fields.
     private Manifest previousManifest;
     private ClusterState previousClusterState;
+    private final LongSupplier relativeTimeMillisSupplier;
     private boolean incrementalWrite;
 
-    IncrementalClusterStateWriter(MetaStateService metaStateService, Manifest manifest, ClusterState clusterState) {
+    private volatile TimeValue slowWriteLoggingThreshold;
+
+    IncrementalClusterStateWriter(Settings settings, ClusterSettings clusterSettings, MetaStateService metaStateService, Manifest manifest,
+                                  ClusterState clusterState, LongSupplier relativeTimeMillisSupplier) {
         this.metaStateService = metaStateService;
         this.previousManifest = manifest;
         this.previousClusterState = clusterState;
+        this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
         this.incrementalWrite = false;
+        this.slowWriteLoggingThreshold = SLOW_WRITE_LOGGING_THRESHOLD.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
+    }
+
+    private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
+        this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
     }
 
     void setCurrentTerm(long currentTerm) throws WriteStateException {
@@ -85,14 +108,26 @@ void setIncrementalWrite(boolean incrementalWrite) {
     void updateClusterState(ClusterState newState, ClusterState previousState) throws WriteStateException {
         MetaData newMetaData = newState.metaData();
 
+        final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+
         final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
         long globalStateGeneration = writeGlobalState(writer, newMetaData);
         Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState, previousState);
         Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);
         writeManifest(writer, manifest);
-
         previousManifest = manifest;
         previousClusterState = newState;
+
+        final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
+        final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold;
+        if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
+            logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
+                    "wrote metadata for [{}] indices and skipped [{}] unchanged indices",
+                durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());
+        } else {
+            logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices",
+                durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());
+        }
     }
 
     private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException {
@@ -256,6 +291,9 @@ static class AtomicClusterStateWriter {
         private final MetaStateService metaStateService;
         private boolean finished;
 
+        private int indicesWritten;
+        private int indicesSkipped;
+
         AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) {
             this.metaStateService = metaStateService;
             assert previousManifest != null;
@@ -320,6 +358,22 @@ void rollback() {
             rollbackCleanupActions.forEach(Runnable::run);
             finished = true;
         }
+
+        void incrementIndicesWritten() {
+            indicesWritten++;
+        }
+
+        void incrementIndicesSkipped() {
+            indicesSkipped++;
+        }
+
+        int getIndicesWritten() {
+            return indicesWritten;
+        }
+
+        int getIndicesSkipped() {
+            return indicesSkipped;
+        }
     }
 
     static class KeepPreviousGeneration implements IndexMetaDataAction {
@@ -338,6 +392,7 @@ public Index getIndex() {
 
         @Override
         public long execute(AtomicClusterStateWriter writer) {
+            writer.incrementIndicesSkipped();
             return generation;
         }
     }
@@ -356,6 +411,7 @@ public Index getIndex() {
 
         @Override
         public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
+            writer.incrementIndicesWritten();
             return writer.writeIndex("freshly created", indexMetaData);
         }
     }
@@ -376,6 +432,7 @@ public Index getIndex() {
 
         @Override
         public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
+            writer.incrementIndicesWritten();
             return writer.writeIndex(
                     "version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]",
                     newIndexMetaData);
diff --git a/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java b/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java
index b41a24bb820d8..d5a03dee70e1c 100644
--- a/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java
+++ b/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java
@@ -18,26 +18,35 @@
  */
 package org.elasticsearch.gateway;
 
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.test.MockLogAppender;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
@@ -48,15 +57,18 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
@@ -250,13 +262,19 @@ public void testResolveStatesToBeWritten() throws WriteStateException {
 
         assertThat(actions, hasSize(3));
 
+        boolean keptPreviousGeneration = false;
+        boolean wroteNewIndex = false;
+        boolean wroteChangedIndex = false;
+
         for (IncrementalClusterStateWriter.IndexMetaDataAction action : actions) {
             if (action instanceof IncrementalClusterStateWriter.KeepPreviousGeneration) {
                 assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex()));
                 IncrementalClusterStateWriter.AtomicClusterStateWriter writer
                     = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
                 assertThat(action.execute(writer), equalTo(3L));
-                verifyZeroInteractions(writer);
+                verify(writer, times(1)).incrementIndicesSkipped();
+                verifyNoMoreInteractions(writer);
+                keptPreviousGeneration = true;
             }
             if (action instanceof IncrementalClusterStateWriter.WriteNewIndexMetaData) {
                 assertThat(action.getIndex(), equalTo(newIndex.getIndex()));
@@ -264,6 +282,8 @@ public void testResolveStatesToBeWritten() throws WriteStateException {
                     = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
                 when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L);
                 assertThat(action.execute(writer), equalTo(0L));
+                verify(writer, times(1)).incrementIndicesWritten();
+                wroteNewIndex = true;
             }
             if (action instanceof IncrementalClusterStateWriter.WriteChangedIndexMetaData) {
                 assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex()));
@@ -273,10 +293,16 @@ public void testResolveStatesToBeWritten() throws WriteStateException {
                 assertThat(action.execute(writer), equalTo(3L));
                 ArgumentCaptor<String> reason = ArgumentCaptor.forClass(String.class);
                 verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex));
+                verify(writer, times(1)).incrementIndicesWritten();
                 assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion())));
                 assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion())));
+                wroteChangedIndex = true;
             }
         }
+
+        assertTrue(keptPreviousGeneration);
+        assertTrue(wroteNewIndex);
+        assertTrue(wroteChangedIndex);
     }
 
     private static class MetaStateServiceWithFailures extends MetaStateService {
@@ -426,4 +452,84 @@ public void testAtomicityWithFailures() throws IOException {
             assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData)));
         }
     }
+
+    @TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level")
+    public void testSlowLogging() throws WriteStateException, IllegalAccessException {
+        final long slowWriteLoggingThresholdMillis;
+        final Settings settings;
+        if (randomBoolean()) {
+            slowWriteLoggingThresholdMillis = IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis();
+            settings = Settings.EMPTY;
+        } else {
+            slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000);
+            settings = Settings.builder()
+                .put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms")
+                .build();
+        }
+
+        final DiscoveryNode localNode = newNode("node");
+        final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();
+
+        final long startTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - slowWriteLoggingThresholdMillis * 10);
+        final AtomicLong currentTime = new AtomicLong(startTimeMillis);
+        final AtomicLong writeDurationMillis = new AtomicLong(slowWriteLoggingThresholdMillis);
+
+        final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        final IncrementalClusterStateWriter incrementalClusterStateWriter
+            = new IncrementalClusterStateWriter(settings, clusterSettings, mock(MetaStateService.class),
+            new Manifest(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), Collections.emptyMap()),
+            clusterState, () -> currentTime.getAndAdd(writeDurationMillis.get()));
+
+        assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
+            "should see warning at threshold",
+            IncrementalClusterStateWriter.class.getCanonicalName(),
+            Level.WARN,
+            "writing cluster state took [*] which is above the warn threshold of [*]; " +
+                "wrote metadata for [0] indices and skipped [0] unchanged indices"));
+
+        writeDurationMillis.set(randomLongBetween(slowWriteLoggingThresholdMillis, slowWriteLoggingThresholdMillis * 2));
+        assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
+            "should see warning above threshold",
+            IncrementalClusterStateWriter.class.getCanonicalName(),
+            Level.WARN,
+            "writing cluster state took [*] which is above the warn threshold of [*]; " +
+                "wrote metadata for [0] indices and skipped [0] unchanged indices"));
+
+        writeDurationMillis.set(randomLongBetween(1, slowWriteLoggingThresholdMillis - 1));
+        assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.UnseenEventExpectation(
+            "should not see warning below threshold",
+            IncrementalClusterStateWriter.class.getCanonicalName(),
+            Level.WARN,
+            "*"));
+
+        clusterSettings.applySettings(Settings.builder()
+            .put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms")
+            .build());
+        assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
+            "should see warning at reduced threshold",
+            IncrementalClusterStateWriter.class.getCanonicalName(),
+            Level.WARN,
+            "writing cluster state took [*] which is above the warn threshold of [*]; " +
+                "wrote metadata for [0] indices and skipped [0] unchanged indices"));
+
+        assertThat(currentTime.get(), lessThan(startTimeMillis + 10 * slowWriteLoggingThresholdMillis)); // ensure no overflow
+    }
+
+    private void assertExpectedLogs(ClusterState clusterState, IncrementalClusterStateWriter incrementalClusterStateWriter,
+                                    MockLogAppender.LoggingExpectation expectation) throws IllegalAccessException, WriteStateException {
+        MockLogAppender mockAppender = new MockLogAppender();
+        mockAppender.start();
+        mockAppender.addExpectation(expectation);
+        Logger classLogger = LogManager.getLogger(IncrementalClusterStateWriter.class);
+        Loggers.addAppender(classLogger, mockAppender);
+
+        try {
+            incrementalClusterStateWriter.updateClusterState(clusterState, clusterState);
+        } finally {
+            Loggers.removeAppender(classLogger, mockAppender);
+            mockAppender.stop();
+        }
+        mockAppender.assertAllExpectationsMatched();
+    }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java
index b66b5ea3ee2b3..b73a90b428485 100644
--- a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java
+++ b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java
@@ -23,12 +23,17 @@
 import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.plugins.MetaDataUpgrader;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * {@link GatewayMetaState} constructor accepts a lot of arguments.
  * It's not always easy / convenient to construct these dependencies.
@@ -55,6 +60,12 @@ ClusterState prepareInitialClusterState(TransportService transportService, Clust
     }
 
     public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXContentRegistry xContentRegistry) {
-        start(settings, null, null, new MetaStateService(nodeEnvironment, xContentRegistry), null, null);
+        final TransportService transportService = mock(TransportService.class);
+        when(transportService.getThreadPool()).thenReturn(mock(ThreadPool.class));
+        final ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings())
+            .thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
+        start(settings, transportService, clusterService, new MetaStateService(nodeEnvironment, xContentRegistry),
+            null, null);
     }
 }