Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136929.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136929
summary: Handle indices with zero/missing uptime correctly in write-load calculation
area: Allocation
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ static OptionalDouble forecastIndexWriteLoad(List<IndexWriteLoad> indicesWriteLo
maxShardUptimeInMillis = Math.max(maxShardUptimeInMillis, shardUptimeInMillis);
}
}
// An index only contributes to the weighted average proportionally to its uptime
if (totalShardUptimeInMillis == 0) {
continue;
}
double weightedAverageShardWriteLoad = totalShardWriteLoad / totalShardUptimeInMillis;
double totalIndexWriteLoad = weightedAverageShardWriteLoad * writeLoad.numberOfShards();
// We need to weight the contribution from each index somehow, but we only know
Expand All @@ -171,6 +175,8 @@ static OptionalDouble forecastIndexWriteLoad(List<IndexWriteLoad> indicesWriteLo
// that index. It should be safe to extrapolate our weighted average out to the
// maximum uptime observed, based on the assumption that write-load is roughly
// evenly distributed across shards of a datastream index.
assert Double.isFinite(totalIndexWriteLoad) : "Invalid total index write load: " + totalIndexWriteLoad;
assert maxShardUptimeInMillis > 0 : "Invalid max shard uptime in millis: " + maxShardUptimeInMillis;
allIndicesWriteLoad += totalIndexWriteLoad * maxShardUptimeInMillis;
allIndicesUptime += maxShardUptimeInMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@
import org.junit.Before;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.xpack.writeloadforecaster.LicensedWriteLoadForecaster.forecastIndexWriteLoad;
import static org.hamcrest.Matchers.closeTo;
Expand All @@ -47,6 +51,8 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notANumber;

public class LicensedWriteLoadForecasterTests extends ESTestCase {
ThreadPool threadPool;
Expand Down Expand Up @@ -294,14 +300,18 @@ public void testWriteLoadForecast() {
}

private IndexWriteLoad randomIndexWriteLoad(int numberOfShards) {
return randomIndexWriteLoad(numberOfShards, () -> randomLongBetween(1, 10));
}

private IndexWriteLoad randomIndexWriteLoad(int numberOfShards, LongSupplier uptimeSupplier) {
IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards);
for (int shardId = 0; shardId < numberOfShards; shardId++) {
builder.withShardWriteLoad(
shardId,
randomDoubleBetween(0, 64, true),
randomDoubleBetween(0, 64, true),
randomDoubleBetween(0, 64, true),
randomLongBetween(1, 10)
uptimeSupplier.getAsLong()
);
}
return builder.build();
Expand Down Expand Up @@ -445,6 +455,71 @@ private void testShardChangeDoesNotChangeTotalForecastLoad(ShardCountChange shar
);
}

public void testCanHandleIndicesWithMissingShardWriteLoadsOrZeroUptime() {
final TimeValue maxIndexAge = TimeValue.timeValueDays(7);
final WriteLoadForecaster writeLoadForecaster = new LicensedWriteLoadForecaster(() -> true, threadPool, maxIndexAge);
writeLoadForecaster.refreshLicense();

final ProjectMetadata.Builder metadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault());
final String dataStreamName = "logs-es";
final int numberOfBackingIndices = 10;
final int numberOfShards = randomIntBetween(1, 5);
final int numberOfIndicesWithZeroUptime = randomBoolean()
? numberOfBackingIndices
: randomIntBetween(1, numberOfBackingIndices - 1);
final Set<Integer> indicesWithZeroUptime = new HashSet<>(
randomSubsetOf(numberOfIndicesWithZeroUptime, IntStream.range(0, numberOfBackingIndices).boxed().collect(Collectors.toSet()))
);
logger.info(
"--> indices with zero uptime: {}/{} ({})",
numberOfIndicesWithZeroUptime,
numberOfBackingIndices,
indicesWithZeroUptime
);
final List<Index> backingIndices = new ArrayList<>();
for (int i = 0; i < numberOfBackingIndices; i++) {
final IndexMetadata indexMetadata = createIndexMetadata(
DataStream.getDefaultBackingIndexName(dataStreamName, i),
numberOfShards,
indicesWithZeroUptime.contains(i)
? indexWriteLoadWithMissingOrZeroUptime(numberOfShards)
: randomIndexWriteLoad(numberOfShards),
System.currentTimeMillis() - (maxIndexAge.millis() / 2)
);
backingIndices.add(indexMetadata.getIndex());
metadataBuilder.put(indexMetadata, false);
}

final IndexMetadata writeIndexMetadata = createIndexMetadata(
DataStream.getDefaultBackingIndexName(dataStreamName, numberOfBackingIndices),
numberOfShards,
null,
System.currentTimeMillis()
);
backingIndices.add(writeIndexMetadata.getIndex());
metadataBuilder.put(writeIndexMetadata, false);

final DataStream dataStream = createDataStream(dataStreamName, backingIndices);
metadataBuilder.put(dataStream);

final ProjectMetadata.Builder updatedMetadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(
dataStream.getName(),
metadataBuilder
);
final IndexMetadata writeIndex = updatedMetadataBuilder.getSafe(dataStream.getWriteIndex());
final OptionalDouble forecastedWriteLoad = writeLoadForecaster.getForecastedWriteLoad(writeIndex);

final boolean someIndicesHadUptime = numberOfIndicesWithZeroUptime != numberOfBackingIndices;
assertThat(forecastedWriteLoad.isPresent(), is(someIndicesHadUptime));
if (someIndicesHadUptime) {
assertThat(forecastedWriteLoad.getAsDouble(), not(notANumber()));
}
}

private IndexWriteLoad indexWriteLoadWithMissingOrZeroUptime(int numShards) {
return randomBoolean() ? IndexWriteLoad.builder(numShards).build() : randomIndexWriteLoad(numShards, () -> 0);
}

public enum ShardCountChange implements IntToIntFunction {
INCREASE(1, 15) {
@Override
Expand Down