Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Warn on slow metadata persistence #47005

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.IncrementalClusterStateWriter;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -226,6 +227,7 @@ public void apply(Settings value, Settings current, Settings previous) {
GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_NODES_SETTING,
GatewayService.RECOVER_AFTER_TIME_SETTING,
IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD,
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,
NetworkModule.HTTP_TYPE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ public void start(Settings settings, TransportService transportService, ClusterS
}

final IncrementalClusterStateWriter incrementalClusterStateWriter
= new IncrementalClusterStateWriter(metaStateService, manifestClusterStateTuple.v1(),
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()));
= new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService,
manifestClusterStateTuple.v1(),
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()),
transportService.getThreadPool()::relativeTimeInMillis);
if (DiscoveryNode.isMasterNode(settings) == false) {
if (DiscoveryNode.isDataNode(settings)) {
// Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
*/
package org.elasticsearch.gateway;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;

import java.util.ArrayList;
Expand All @@ -33,11 +39,17 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

/**
* Tracks the metadata written to disk, allowing updated metadata to be written incrementally (i.e. only writing out the changed metadata).
*/
class IncrementalClusterStateWriter {
public class IncrementalClusterStateWriter {

private static final Logger logger = LogManager.getLogger(IncrementalClusterStateWriter.class);

public static final Setting<TimeValue> SLOW_WRITE_LOGGING_THRESHOLD = Setting.timeSetting("gateway.slow_write_logging_threshold",
TimeValue.timeValueSeconds(10), TimeValue.ZERO, Setting.Property.NodeScope, Setting.Property.Dynamic);

private final MetaStateService metaStateService;

Expand All @@ -46,13 +58,24 @@ class IncrementalClusterStateWriter {
// no need to synchronize access to these fields.
private Manifest previousManifest;
private ClusterState previousClusterState;
private final LongSupplier relativeTimeMillisSupplier;
private boolean incrementalWrite;

IncrementalClusterStateWriter(MetaStateService metaStateService, Manifest manifest, ClusterState clusterState) {
private volatile TimeValue slowWriteLoggingThreshold;

IncrementalClusterStateWriter(Settings settings, ClusterSettings clusterSettings, MetaStateService metaStateService, Manifest manifest,
ClusterState clusterState, LongSupplier relativeTimeMillisSupplier) {
this.metaStateService = metaStateService;
this.previousManifest = manifest;
this.previousClusterState = clusterState;
this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
this.incrementalWrite = false;
this.slowWriteLoggingThreshold = SLOW_WRITE_LOGGING_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
}

private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
}

void setCurrentTerm(long currentTerm) throws WriteStateException {
Expand Down Expand Up @@ -85,14 +108,26 @@ void setIncrementalWrite(boolean incrementalWrite) {
void updateClusterState(ClusterState newState, ClusterState previousState) throws WriteStateException {
MetaData newMetaData = newState.metaData();

final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();

final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest);
long globalStateGeneration = writeGlobalState(writer, newMetaData);
Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState, previousState);
Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations);
writeManifest(writer, manifest);

previousManifest = manifest;
previousClusterState = newState;

final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
final TimeValue finalSlowWriteLoggingThreshold = this.slowWriteLoggingThreshold;
if (durationMillis >= finalSlowWriteLoggingThreshold.getMillis()) {
logger.warn("writing cluster state took [{}ms] which is above the warn threshold of [{}]; " +
"wrote metadata for [{}] indices and skipped [{}] unchanged indices",
durationMillis, finalSlowWriteLoggingThreshold, writer.getIndicesWritten(), writer.getIndicesSkipped());
} else {
logger.debug("writing cluster state took [{}ms]; wrote metadata for [{}] indices and skipped [{}] unchanged indices",
durationMillis, writer.getIndicesWritten(), writer.getIndicesSkipped());
}
}

private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException {
Expand Down Expand Up @@ -256,6 +291,9 @@ static class AtomicClusterStateWriter {
private final MetaStateService metaStateService;
private boolean finished;

private int indicesWritten;
private int indicesSkipped;

AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) {
this.metaStateService = metaStateService;
assert previousManifest != null;
Expand Down Expand Up @@ -320,6 +358,22 @@ void rollback() {
rollbackCleanupActions.forEach(Runnable::run);
finished = true;
}

void incrementIndicesWritten() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps we can just make the fields package-visible and operate directly on them, removing getter and setters here, given that their are only used within IncrementalClusterStateWriter. Reduces the amount of clutter and avoids the extra test provisions for the interaction mocking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asserting on the interactions with the increment* methods is deliberate; it would add a good deal more noise to check that they're being called in a different way.

indicesWritten++;
}

void incrementIndicesSkipped() {
indicesSkipped++;
}

int getIndicesWritten() {
return indicesWritten;
}

int getIndicesSkipped() {
return indicesSkipped;
}
}

static class KeepPreviousGeneration implements IndexMetaDataAction {
Expand All @@ -338,6 +392,7 @@ public Index getIndex() {

@Override
public long execute(AtomicClusterStateWriter writer) {
writer.incrementIndicesSkipped();
return generation;
}
}
Expand All @@ -356,6 +411,7 @@ public Index getIndex() {

@Override
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
writer.incrementIndicesWritten();
return writer.writeIndex("freshly created", indexMetaData);
}
}
Expand All @@ -376,6 +432,7 @@ public Index getIndex() {

@Override
public long execute(AtomicClusterStateWriter writer) throws WriteStateException {
writer.incrementIndicesWritten();
return writer.writeIndex(
"version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]",
newIndexMetaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,35 @@
*/
package org.elasticsearch.gateway;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
Expand All @@ -48,15 +57,18 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class IncrementalClusterStateWriterTests extends ESAllocationTestCase {
Expand Down Expand Up @@ -250,20 +262,28 @@ public void testResolveStatesToBeWritten() throws WriteStateException {

assertThat(actions, hasSize(3));

boolean keptPreviousGeneration = false;
boolean wroteNewIndex = false;
boolean wroteChangedIndex = false;

for (IncrementalClusterStateWriter.IndexMetaDataAction action : actions) {
if (action instanceof IncrementalClusterStateWriter.KeepPreviousGeneration) {
assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex()));
IncrementalClusterStateWriter.AtomicClusterStateWriter writer
= mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
assertThat(action.execute(writer), equalTo(3L));
verifyZeroInteractions(writer);
verify(writer, times(1)).incrementIndicesSkipped();
verifyNoMoreInteractions(writer);
keptPreviousGeneration = true;
}
if (action instanceof IncrementalClusterStateWriter.WriteNewIndexMetaData) {
assertThat(action.getIndex(), equalTo(newIndex.getIndex()));
IncrementalClusterStateWriter.AtomicClusterStateWriter writer
= mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class);
when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L);
assertThat(action.execute(writer), equalTo(0L));
verify(writer, times(1)).incrementIndicesWritten();
wroteNewIndex = true;
}
if (action instanceof IncrementalClusterStateWriter.WriteChangedIndexMetaData) {
assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex()));
Expand All @@ -273,10 +293,16 @@ public void testResolveStatesToBeWritten() throws WriteStateException {
assertThat(action.execute(writer), equalTo(3L));
ArgumentCaptor<String> reason = ArgumentCaptor.forClass(String.class);
verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex));
verify(writer, times(1)).incrementIndicesWritten();
assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion())));
assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion())));
wroteChangedIndex = true;
}
}

assertTrue(keptPreviousGeneration);
assertTrue(wroteNewIndex);
assertTrue(wroteChangedIndex);
}

private static class MetaStateServiceWithFailures extends MetaStateService {
Expand Down Expand Up @@ -426,4 +452,84 @@ public void testAtomicityWithFailures() throws IOException {
assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData)));
}
}

@TestLogging(value = "org.elasticsearch.gateway:WARN", reason = "to ensure that we log gateway events on WARN level")
public void testSlowLogging() throws WriteStateException, IllegalAccessException {
final long slowWriteLoggingThresholdMillis;
final Settings settings;
if (randomBoolean()) {
slowWriteLoggingThresholdMillis = IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.get(Settings.EMPTY).millis();
settings = Settings.EMPTY;
} else {
slowWriteLoggingThresholdMillis = randomLongBetween(2, 100000);
settings = Settings.builder()
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), slowWriteLoggingThresholdMillis + "ms")
.build();
}

final DiscoveryNode localNode = newNode("node");
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId())).build();

final long startTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - slowWriteLoggingThresholdMillis * 10);
final AtomicLong currentTime = new AtomicLong(startTimeMillis);
final AtomicLong writeDurationMillis = new AtomicLong(slowWriteLoggingThresholdMillis);

final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final IncrementalClusterStateWriter incrementalClusterStateWriter
= new IncrementalClusterStateWriter(settings, clusterSettings, mock(MetaStateService.class),
new Manifest(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), Collections.emptyMap()),
clusterState, () -> currentTime.getAndAdd(writeDurationMillis.get()));

assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning at threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote metadata for [0] indices and skipped [0] unchanged indices"));

writeDurationMillis.set(randomLongBetween(slowWriteLoggingThresholdMillis, slowWriteLoggingThresholdMillis * 2));
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning above threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote metadata for [0] indices and skipped [0] unchanged indices"));

writeDurationMillis.set(randomLongBetween(1, slowWriteLoggingThresholdMillis - 1));
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.UnseenEventExpectation(
"should not see warning below threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"*"));

clusterSettings.applySettings(Settings.builder()
.put(IncrementalClusterStateWriter.SLOW_WRITE_LOGGING_THRESHOLD.getKey(), writeDurationMillis.get() + "ms")
.build());
assertExpectedLogs(clusterState, incrementalClusterStateWriter, new MockLogAppender.SeenEventExpectation(
"should see warning at reduced threshold",
IncrementalClusterStateWriter.class.getCanonicalName(),
Level.WARN,
"writing cluster state took [*] which is above the warn threshold of [*]; " +
"wrote metadata for [0] indices and skipped [0] unchanged indices"));

assertThat(currentTime.get(), lessThan(startTimeMillis + 10 * slowWriteLoggingThresholdMillis)); // ensure no overflow
}

private void assertExpectedLogs(ClusterState clusterState, IncrementalClusterStateWriter incrementalClusterStateWriter,
MockLogAppender.LoggingExpectation expectation) throws IllegalAccessException, WriteStateException {
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(expectation);
Logger classLogger = LogManager.getLogger(IncrementalClusterStateWriter.class);
Loggers.addAppender(classLogger, mockAppender);

try {
incrementalClusterStateWriter.updateClusterState(clusterState, clusterState);
} finally {
Loggers.removeAppender(classLogger, mockAppender);
mockAppender.stop();
}
mockAppender.assertAllExpectationsMatched();
}
}
Loading