Skip to content

Commit

Permalink
Fix testDataOnlyNodePersistence (#56893)
Browse files Browse the repository at this point in the history
This test failed if all 1000 top-level `rarely()` calls in the loop returned
`false`, because then we would never set the term of the persisted state. This
commit fixes this by adding an earlier call to `persistedState#setCurrentTerm`.
It also changes the test to clean up the threadpools it starts whether it
passes or fails.
  • Loading branch information
DaveCTurner committed May 18, 2020
1 parent 6d9c2c2 commit db64d2c
Showing 1 changed file with 92 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOError;
import java.io.IOException;
import java.nio.file.Path;
Expand Down Expand Up @@ -328,95 +329,103 @@ public void testStatePersistedOnLoad() throws IOException {
}

public void testDataOnlyNodePersistence() throws Exception {
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
Settings settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).put(
Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_NAME_SETTING.getKey(), "test").build();
final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
final TransportService transportService = mock(TransportService.class);
TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode");
when(transportService.getThreadPool()).thenReturn(threadPool);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
final PersistedClusterStateService persistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
gateway.start(settings, transportService, clusterService,
new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));
final List<Closeable> cleanup = new ArrayList<>(2);

try {
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
Settings settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).put(
Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_NAME_SETTING.getKey(), "test").build();
final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
cleanup.add(gateway);
final TransportService transportService = mock(TransportService.class);
TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode");
cleanup.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
when(transportService.getThreadPool()).thenReturn(threadPool);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
final PersistedClusterStateService persistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
gateway.start(settings, transportService, clusterService,
new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));

//generate random coordinationMetadata with different lastAcceptedConfiguration and lastCommittedConfiguration
CoordinationMetadata coordinationMetadata;
do {
coordinationMetadata = createCoordinationMetadata(randomNonNegativeLong());
} while (coordinationMetadata.getLastAcceptedConfiguration().equals(coordinationMetadata.getLastCommittedConfiguration()));

//generate random coordinationMetadata with different lastAcceptedConfiguration and lastCommittedConfiguration
CoordinationMetadata coordinationMetadata;
do {
coordinationMetadata = createCoordinationMetadata(randomNonNegativeLong());
} while (coordinationMetadata.getLastAcceptedConfiguration().equals(coordinationMetadata.getLastCommittedConfiguration()));
ClusterState state = createClusterState(randomNonNegativeLong(),
Metadata.builder().coordinationMetadata(coordinationMetadata)
.clusterUUID(randomAlphaOfLength(10)).build());
persistedState.setCurrentTerm(state.term());
persistedState.setLastAcceptedState(state);
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
CoordinationMetadata persistedCoordinationMetadata =
persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));

persistedState.markLastAcceptedStateAsCommitted();
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

ClusterState state = createClusterState(randomNonNegativeLong(),
Metadata.builder().coordinationMetadata(coordinationMetadata)
.clusterUUID(randomAlphaOfLength(10)).build());
persistedState.setLastAcceptedState(state);
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
CoordinationMetadata persistedCoordinationMetadata =
persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));

persistedState.markLastAcceptedStateAsCommitted();
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

CoordinationMetadata expectedCoordinationMetadata = CoordinationMetadata.builder(coordinationMetadata)
.lastCommittedConfiguration(coordinationMetadata.getLastAcceptedConfiguration()).build();
ClusterState expectedClusterState =
ClusterState.builder(state).metadata(Metadata.builder().coordinationMetadata(expectedCoordinationMetadata)
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();

assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted());

// generate a series of updates and check if batching works
final String indexName = randomAlphaOfLength(10);
long currentTerm = state.term();
for (int i = 0; i < 1000; i++) {
if (rarely()) {
// bump term
currentTerm = currentTerm + (rarely() ? randomIntBetween(1, 5) : 0L);
persistedState.setCurrentTerm(currentTerm);
} else {
// update cluster state
final int numberOfShards = randomIntBetween(1, 5);
final long term = Math.min(state.term() + (rarely() ? randomIntBetween(1, 5) : 0L), currentTerm);
final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, i);
state = createClusterState(state.version() + 1,
Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build());
persistedState.setLastAcceptedState(state);
CoordinationMetadata expectedCoordinationMetadata = CoordinationMetadata.builder(coordinationMetadata)
.lastCommittedConfiguration(coordinationMetadata.getLastAcceptedConfiguration()).build();
ClusterState expectedClusterState =
ClusterState.builder(state).metadata(Metadata.builder().coordinationMetadata(expectedCoordinationMetadata)
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();

assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted());

// generate a series of updates and check if batching works
final String indexName = randomAlphaOfLength(10);
long currentTerm = state.term();
for (int i = 0; i < 1000; i++) {
if (rarely()) {
// bump term
currentTerm = currentTerm + (rarely() ? randomIntBetween(1, 5) : 0L);
persistedState.setCurrentTerm(currentTerm);
} else {
// update cluster state
final int numberOfShards = randomIntBetween(1, 5);
final long term = Math.min(state.term() + (rarely() ? randomIntBetween(1, 5) : 0L), currentTerm);
final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, i);
state = createClusterState(state.version() + 1,
Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build());
persistedState.setLastAcceptedState(state);
}
}
}
assertEquals(currentTerm, persistedState.getCurrentTerm());
assertClusterStateEqual(state, persistedState.getLastAcceptedState());
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));
assertEquals(currentTerm, persistedState.getCurrentTerm());
assertClusterStateEqual(state, persistedState.getLastAcceptedState());
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

gateway.close();
gateway.close();
assertTrue(cleanup.remove(gateway));

try (CoordinationState.PersistedState reloadedPersistedState = newGatewayPersistedState()) {
assertEquals(currentTerm, reloadedPersistedState.getCurrentTerm());
assertClusterStateEqual(GatewayMetaState.AsyncLucenePersistedState.resetVotingConfiguration(state),
reloadedPersistedState.getLastAcceptedState());
assertNotNull(reloadedPersistedState.getLastAcceptedState().metadata().index(indexName));
try (CoordinationState.PersistedState reloadedPersistedState = newGatewayPersistedState()) {
assertEquals(currentTerm, reloadedPersistedState.getCurrentTerm());
assertClusterStateEqual(GatewayMetaState.AsyncLucenePersistedState.resetVotingConfiguration(state),
reloadedPersistedState.getLastAcceptedState());
assertNotNull(reloadedPersistedState.getLastAcceptedState().metadata().index(indexName));
}
} finally {
IOUtils.close(cleanup);
}

ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}

public void testStatePersistenceWithIOIssues() throws IOException {
Expand Down

0 comments on commit db64d2c

Please sign in to comment.