Skip to content

Commit

Permalink
Fix autoscaling of follower data streams (#83302) (#83349)
Browse files Browse the repository at this point in the history
The presence of follower (replicated) data streams caused the autoscaling
capacity calculation to fail.

Closes #82857
  • Loading branch information
henningandersen committed Feb 1, 2022
1 parent 18c5b25 commit 6fa10c2
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 7 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/83302.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 83302
summary: Fix autoscaling of follower data streams
area: Autoscaling
type: bug
issues:
- 82857
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ public boolean isSystem() {
/**
 * Rolls this data stream over onto the given new write index, producing a new
 * {@code DataStream} instance with the updated backing indices and generation.
 * Performs validation first via {@code ensureNotReplicated()} — replicated
 * (follower) data streams are rejected here; callers that must roll over a
 * follower use the unsafe variant instead.
 *
 * @param writeIndex the index that becomes the new write index
 * @param generation the new generation for the rolled-over data stream
 * @return a new DataStream reflecting the rollover
 * NOTE(review): ensureNotReplicated presumably throws on a replicated stream —
 * exact exception type not visible here, confirm before documenting @throws.
 */
public DataStream rollover(Index writeIndex, long generation) {
ensureNotReplicated();

return unsafeRollover(writeIndex, generation);
}

/**
* Like {@link #rollover(Index, long)}, but does no validation, use with care only.
*/
public DataStream unsafeRollover(Index writeIndex, long generation) {
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(writeIndex);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, false, system);
Expand All @@ -189,6 +196,13 @@ public DataStream rollover(Index writeIndex, long generation) {
*/
public Tuple<String, Long> nextWriteIndexAndGeneration(Metadata clusterMetadata, Version minNodeVersion) {
// Validation lives in this public entry point: replicated (follower) data
// streams are rejected before computing the next write index. The unsafe
// variant below performs the same computation without this check.
ensureNotReplicated();
return unsafeNextWriteIndexAndGeneration(clusterMetadata, minNodeVersion);
}

/**
* Like {@link #nextWriteIndexAndGeneration(Metadata, Version)}, but does no validation, use with care only.
*/
public Tuple<String, Long> unsafeNextWriteIndexAndGeneration(Metadata clusterMetadata, Version minNodeVersion) {
String newWriteIndexName;
long generation = this.generation;
long currentTimeMillis = timeProvider.getAsLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,18 @@ public static DataStream newInstance(
long generation,
Map<String, Object> metadata
) {
return new DataStream(name, timeStampField, indices, generation, metadata, false, false, false);
return newInstance(name, timeStampField, indices, generation, metadata, false);
}

/**
 * Builds a {@link DataStream} for tests with the given core attributes and an
 * explicit replicated flag. The hidden and system flags are always
 * {@code false} for instances created through this helper.
 *
 * @param name           the data stream name
 * @param timeStampField the timestamp field definition
 * @param indices        the backing indices
 * @param generation     the data stream generation
 * @param metadata       custom metadata, may be {@code null}
 * @param replicated     whether the data stream is a follower (replicated) one
 * @return the newly constructed data stream
 */
public static DataStream newInstance(
    String name,
    DataStream.TimestampField timeStampField,
    List<Index> indices,
    long generation,
    Map<String, Object> metadata,
    boolean replicated
) {
    // Name the fixed flags so the constructor call is self-describing.
    final boolean hidden = false;
    final boolean system = false;
    return new DataStream(name, timeStampField, indices, generation, metadata, hidden, replicated, system);
}

public static IndexMetadata.Builder createFirstBackingIndex(String dataStreamName) {
Expand Down Expand Up @@ -194,6 +205,15 @@ public static ClusterState getClusterStateWithDataStreams(
List<Tuple<String, Integer>> dataStreams,
List<String> indexNames,
int replicas
) {
return getClusterStateWithDataStreams(dataStreams, indexNames, replicas, false);
}

public static ClusterState getClusterStateWithDataStreams(
List<Tuple<String, Integer>> dataStreams,
List<String> indexNames,
int replicas,
boolean replicated
) {
Metadata.Builder builder = Metadata.builder();

Expand All @@ -210,7 +230,8 @@ public static ClusterState getClusterStateWithDataStreams(
createTimestampField("@timestamp"),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
dsTuple.v2(),
null
null,
replicated
);
builder.put(ds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,11 @@ private SingleForecast forecast(Metadata metadata, IndexAbstraction.DataStream s
DataStream dataStream = stream.getDataStream();
for (int i = 0; i < numberNewIndices; ++i) {
final String uuid = UUIDs.randomBase64UUID();
final Tuple<String, Long> dummyRolledDatastream = dataStream.nextWriteIndexAndGeneration(state.metadata(), Version.CURRENT);
dataStream = dataStream.rollover(new Index(dummyRolledDatastream.v1(), uuid), dummyRolledDatastream.v2());
final Tuple<String, Long> rolledDataStreamInfo = dataStream.unsafeNextWriteIndexAndGeneration(
state.metadata(),
Version.CURRENT
);
dataStream = dataStream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2());

// this unintentionally copies the in-sync allocation ids too. This has the fortunate effect of these indices
// not being regarded new by the disk threshold decider, thereby respecting the low watermark threshold even for primaries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public void testScale() {
ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
org.elasticsearch.core.List.of(Tuple.tuple("test", between(1, 10))),
org.elasticsearch.core.List.of(),
0
0,
randomBoolean()
);
ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
IntStream.range(0, between(1, 10)).forEach(i -> ReactiveStorageDeciderServiceTests.addNode(stateBuilder));
Expand Down Expand Up @@ -162,7 +163,8 @@ public void testForecastNoDates() {
ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
org.elasticsearch.core.List.of(Tuple.tuple("test", between(1, 10))),
org.elasticsearch.core.List.of(),
between(0, 4)
between(0, 4),
randomBoolean()
);
ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build());
Expand Down Expand Up @@ -221,7 +223,8 @@ public void testForecast() {
ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
org.elasticsearch.core.List.of(Tuple.tuple("test", indices)),
org.elasticsearch.core.List.of(),
shardCopies - 1
shardCopies - 1,
randomBoolean()
);
ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build());
Expand Down

0 comments on commit 6fa10c2

Please sign in to comment.