Skip to content

Commit

Permalink
Fix autoscaling of follower data streams (#83302)
Browse files Browse the repository at this point in the history
The presence of follower data streams would cause autoscaling capacity
calculation to fail.

Closes #82857
  • Loading branch information
henningandersen committed Jan 31, 2022
1 parent 9c992d7 commit c731fef
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 7 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/83302.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 83302
summary: Fix autoscaling of follower data streams
area: Autoscaling
type: bug
issues:
- 82857
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,13 @@ public IndexMode getIndexMode() {
/**
 * Rolls this data stream over onto the supplied write index.
 * <p>
 * First ensures this data stream is not a replicated (follower) data stream —
 * {@link #ensureNotReplicated()} is invoked before any mutation — and then
 * delegates to {@link #unsafeRollover(Index, long)} to build the rolled-over copy.
 *
 * @param writeIndex the index that becomes the new write index
 * @param generation the generation of the rolled-over data stream
 * @return a new {@link DataStream} instance reflecting the rollover
 */
public DataStream rollover(Index writeIndex, long generation) {
    // Validation gate: replicated data streams must not be rolled over here.
    ensureNotReplicated();
    final DataStream rolledOver = unsafeRollover(writeIndex, generation);
    return rolledOver;
}

/**
* Like {@link #rollover(Index, long)}, but does no validation, use with care only.
*/
public DataStream unsafeRollover(Index writeIndex, long generation) {
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(writeIndex);
return new DataStream(
Expand All @@ -299,6 +306,13 @@ public DataStream rollover(Index writeIndex, long generation) {
*/
/**
 * Computes the name and generation the next write index would receive on rollover.
 * <p>
 * Calls {@link #ensureNotReplicated()} first, so replicated (follower) data streams
 * are rejected, then delegates to {@link #unsafeNextWriteIndexAndGeneration(Metadata)}
 * for the actual computation.
 *
 * @param clusterMetadata the current cluster metadata
 * @return a tuple of the next write index name and its generation
 */
public Tuple<String, Long> nextWriteIndexAndGeneration(Metadata clusterMetadata) {
    // Guard clause: only non-replicated data streams may compute a local rollover target.
    ensureNotReplicated();
    final Tuple<String, Long> nameAndGeneration = unsafeNextWriteIndexAndGeneration(clusterMetadata);
    return nameAndGeneration;
}

/**
* Like {@link #nextWriteIndexAndGeneration(Metadata)}, but does no validation, use with care only.
*/
public Tuple<String, Long> unsafeNextWriteIndexAndGeneration(Metadata clusterMetadata) {
String newWriteIndexName;
long generation = this.generation;
long currentTimeMillis = timeProvider.getAsLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,18 @@ public static DataStream newInstance(
long generation,
Map<String, Object> metadata
) {
return new DataStream(name, timeStampField, indices, generation, metadata, false, false, false, false, null);
return newInstance(name, timeStampField, indices, generation, metadata, false);
}

/**
 * Builds a {@link DataStream} test instance with an explicit {@code replicated} flag.
 * <p>
 * All remaining constructor flags are left disabled ({@code false}) and the index
 * mode is left unset ({@code null}).
 * NOTE(review): the meaning of the other boolean constructor positions is not visible
 * here — confirm against the {@code DataStream} constructor parameter order.
 *
 * @param name           the data stream name
 * @param timeStampField the timestamp field definition
 * @param indices        the backing indices
 * @param generation     the data stream generation
 * @param metadata       arbitrary custom metadata, may be {@code null}
 * @param replicated     whether the data stream is a replicated (follower) data stream
 * @return a newly constructed {@link DataStream}
 */
public static DataStream newInstance(
    String name,
    DataStream.TimestampField timeStampField,
    List<Index> indices,
    long generation,
    Map<String, Object> metadata,
    boolean replicated
) {
    final DataStream dataStream = new DataStream(name, timeStampField, indices, generation, metadata, false, replicated, false, false, null);
    return dataStream;
}

public static String getLegacyDefaultBackingIndexName(
Expand Down Expand Up @@ -265,6 +276,17 @@ public static ClusterState getClusterStateWithDataStreams(
long currentTime,
Settings settings,
int replicas
) {
return getClusterStateWithDataStreams(dataStreams, indexNames, currentTime, settings, replicas, false);
}

public static ClusterState getClusterStateWithDataStreams(
List<Tuple<String, Integer>> dataStreams,
List<String> indexNames,
long currentTime,
Settings settings,
int replicas,
boolean replicated
) {
Metadata.Builder builder = Metadata.builder();

Expand All @@ -283,7 +305,8 @@ public static ClusterState getClusterStateWithDataStreams(
createTimestampField("@timestamp"),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
dsTuple.v2(),
null
null,
replicated
);
builder.put(ds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,8 @@ private SingleForecast forecast(Metadata metadata, IndexAbstraction.DataStream s
DataStream dataStream = stream.getDataStream();
for (int i = 0; i < numberNewIndices; ++i) {
final String uuid = UUIDs.randomBase64UUID();
final Tuple<String, Long> dummyRolledDatastream = dataStream.nextWriteIndexAndGeneration(state.metadata());
dataStream = dataStream.rollover(new Index(dummyRolledDatastream.v1(), uuid), dummyRolledDatastream.v2());
final Tuple<String, Long> rolledDataStreamInfo = dataStream.unsafeNextWriteIndexAndGeneration(state.metadata());
dataStream = dataStream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2());

// this unintentionally copies the in-sync allocation ids too. This has the fortunate effect of these indices
// not being regarded new by the disk threshold decider, thereby respecting the low watermark threshold even for primaries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public void testScale() {
ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
List.of(Tuple.tuple("test", between(1, 10))),
List.of(),
0
System.currentTimeMillis(),
Settings.EMPTY,
0,
randomBoolean()
);
ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
IntStream.range(0, between(1, 10)).forEach(i -> ReactiveStorageDeciderServiceTests.addNode(stateBuilder));
Expand Down Expand Up @@ -161,7 +164,10 @@ public void testForecastNoDates() {
ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
List.of(Tuple.tuple("test", between(1, 10))),
List.of(),
between(0, 4)
System.currentTimeMillis(),
Settings.EMPTY,
between(0, 4),
randomBoolean()
);
ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build());
Expand Down Expand Up @@ -220,7 +226,10 @@ public void testForecast() {
ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams(
List.of(Tuple.tuple("test", indices)),
List.of(),
shardCopies - 1
System.currentTimeMillis(),
Settings.EMPTY,
shardCopies - 1,
randomBoolean()
);
ClusterState.Builder stateBuilder = ClusterState.builder(originalState);
stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build());
Expand Down

0 comments on commit c731fef

Please sign in to comment.