Add safeguard limits for file cache during node level allocation (opensearch-project#8208)

Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
Signed-off-by: sahil buddharaju <sahilbud@amazon.com>
kotwanikunal authored and sahil buddharaju committed Jul 18, 2023
1 parent 1e78dfa commit 837f348
Showing 14 changed files with 367 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -158,6 +158,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Search Pipelines] Pass pipeline creation context to processor factories ([#8164](https://github.com/opensearch-project/OpenSearch/pull/8164))
- Enabling compression levels for zstd and zstd_no_dict ([#8312](https://github.com/opensearch-project/OpenSearch/pull/8312))
- Optimize Metadata build() to skip redundant computations as part of ClusterState build ([#7853](https://github.com/opensearch-project/OpenSearch/pull/7853))
- Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208))

### Deprecated

@@ -50,6 +50,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndexDescriptor;
@@ -192,6 +193,11 @@ public void testClusterInfoServiceCollectsInformation() {
logger.info("--> shard size: {}", size);
assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
}

final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is empty on non search nodes", nodeFileCacheStats.size(), Matchers.equalTo(0));

ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getClusterManagerName());
ClusterState state = clusterService.state();
for (ShardRouting shard : state.routingTable().allShards()) {
@@ -209,6 +215,28 @@ public void testClusterInfoServiceCollectsInformation() {
}
}

public void testClusterInfoServiceCollectsFileCacheInformation() {
internalCluster().startNodes(1);
internalCluster().ensureAtLeastNumSearchAndDataNodes(2);

InternalTestCluster internalTestCluster = internalCluster();
// Get the cluster info service on the cluster-manager node
final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(
ClusterInfoService.class,
internalTestCluster.getClusterManagerName()
);
infoService.setUpdateFrequency(TimeValue.timeValueMillis(200));
ClusterInfo info = infoService.refresh();
assertNotNull("info should not be null", info);
final Map<String, FileCacheStats> nodeFileCacheStats = info.nodeFileCacheStats;
assertNotNull(nodeFileCacheStats);
assertThat("file cache is enabled on both search nodes", nodeFileCacheStats.size(), Matchers.equalTo(2));

for (FileCacheStats fileCacheStats : nodeFileCacheStats.values()) {
assertThat("file cache is non empty", fileCacheStats.getTotal().getBytes(), greaterThan(0L));
}
}

public void testClusterInfoServiceInformationClearOnError() {
internalCluster().startNodes(
2,
24 changes: 22 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
@@ -34,6 +34,7 @@

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.opensearch.Version;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
@@ -42,6 +43,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;

import java.io.IOException;
import java.util.Collections;
@@ -63,9 +65,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
public static final ClusterInfo EMPTY = new ClusterInfo();
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
@@ -83,13 +86,15 @@ public ClusterInfo(
final Map<String, DiskUsage> mostAvailableSpaceUsage,
final Map<String, Long> shardSizes,
final Map<ShardRouting, String> routingToDataPath,
final Map<NodeAndPath, ReservedSpace> reservedSpace
final Map<NodeAndPath, ReservedSpace> reservedSpace,
final Map<String, FileCacheStats> nodeFileCacheStats
) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
}

public ClusterInfo(StreamInput in) throws IOException {
@@ -105,6 +110,11 @@ public ClusterInfo(StreamInput in) throws IOException {
this.shardSizes = Collections.unmodifiableMap(sizeMap);
this.routingToDataPath = Collections.unmodifiableMap(routingMap);
this.reservedSpace = Collections.unmodifiableMap(reservedSpaceMap);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.nodeFileCacheStats = in.readMap(StreamInput::readString, FileCacheStats::new);
} else {
this.nodeFileCacheStats = Map.of();
}
}

@Override
@@ -114,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.shardSizes, StreamOutput::writeString, (o, v) -> out.writeLong(v == null ? -1 : v));
out.writeMap(this.routingToDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString);
out.writeMap(this.reservedSpace, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeMap(this.nodeFileCacheStats, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@@ -187,6 +200,13 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
return Collections.unmodifiableMap(this.mostAvailableSpaceUsage);
}

/**
* Returns a node id to file cache stats mapping for the nodes that have search roles assigned to them.
*/
public Map<String, FileCacheStats> getNodeFileCacheStats() {
return Collections.unmodifiableMap(this.nodeFileCacheStats);
}

/**
* Returns the shard size for the given shard routing or <code>null</code> if that metric is not available.
*/
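The two hunks above follow OpenSearch's usual version-gated wire pattern: nodeFileCacheStats is written and read only when the peer stream is on Version.V_3_0_0 or later, so older nodes in a mixed-version cluster simply see an empty map. A minimal standalone sketch of the same pattern (the VersionGatedStats class and its Map<String, Long> payload are illustrative, not part of this commit):

import java.io.IOException;
import java.util.Map;

import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;

// Illustrative Writeable that adds a map field behind a version gate,
// mirroring how ClusterInfo handles nodeFileCacheStats above.
final class VersionGatedStats implements Writeable {
    private final Map<String, Long> bytesPerNode;

    VersionGatedStats(Map<String, Long> bytesPerNode) {
        this.bytesPerNode = bytesPerNode;
    }

    VersionGatedStats(StreamInput in) throws IOException {
        // Older peers never send the field, so fall back to an empty map.
        if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
            this.bytesPerNode = in.readMap(StreamInput::readString, StreamInput::readLong);
        } else {
            this.bytesPerNode = Map.of();
        }
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Never write a field that an older peer cannot parse.
        if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
            out.writeMap(bytesPerNode, StreamOutput::writeString, StreamOutput::writeLong);
        }
    }
}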
@@ -59,6 +59,7 @@
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ReceiveTimeoutTransportException;
@@ -72,6 +73,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
@@ -110,6 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt

private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, FileCacheStats> nodeFileCacheStats;
private volatile IndicesStatsSummary indicesStatsSummary;
// null if this node is not currently the cluster-manager
private final AtomicReference<RefreshAndRescheduleRunnable> refreshAndRescheduleRunnable = new AtomicReference<>();
@@ -122,6 +125,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
this.leastAvailableSpaceUsages = Map.of();
this.mostAvailableSpaceUsages = Map.of();
this.nodeFileCacheStats = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool;
this.client = client;
@@ -208,7 +212,8 @@ public ClusterInfo getClusterInfo() {
mostAvailableSpaceUsages,
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardRoutingToDataPath,
indicesStatsSummary.reservedSpace
indicesStatsSummary.reservedSpace,
nodeFileCacheStats
);
}

@@ -221,6 +226,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FS.metricName());
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.FILE_CACHE_STATS.metricName());
nodesStatsRequest.timeout(fetchTimeout);
client.admin().cluster().nodesStats(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
return latch;
@@ -264,6 +270,13 @@ public void onResponse(NodesStatsResponse nodesStatsResponse) {
);
leastAvailableSpaceUsages = Collections.unmodifiableMap(leastAvailableUsagesBuilder);
mostAvailableSpaceUsages = Collections.unmodifiableMap(mostAvailableUsagesBuilder);

nodeFileCacheStats = Collections.unmodifiableMap(
nodesStatsResponse.getNodes()
.stream()
.filter(nodeStats -> nodeStats.getNode().isSearchNode())
.collect(Collectors.toMap(nodeStats -> nodeStats.getNode().getId(), NodeStats::getFileCacheStats))
);
}

@Override
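With these changes the periodic stats poll requests the FILE_CACHE_STATS metric alongside FS, then retains only entries reported by search-role nodes, keyed by node id. As a hedged sketch (not part of the commit), a consumer holding a ClusterInfo snapshot could aggregate total cache capacity like this:

import org.opensearch.cluster.ClusterInfo;

// Illustrative helper: sums the total file cache capacity across all search
// nodes in a ClusterInfo snapshot, e.g. one obtained from
// ClusterInfoService#getClusterInfo on the cluster-manager node.
final class FileCacheCapacity {
    static long totalCacheBytes(ClusterInfo info) {
        return info.getNodeFileCacheStats()
            .values()
            .stream()
            .mapToLong(stats -> stats.getTotal().getBytes())
            .sum();
    }
}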
@@ -54,14 +54,21 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
import static org.opensearch.cluster.routing.RoutingPool.getNodePool;
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
@@ -167,6 +174,42 @@ public static long sizeOfRelocatingShards(
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();

/*
The following block enables allocation for remote shards within safeguard limits of the filecache.
*/
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
.collect(Collectors.toList());
final long currentNodeRemoteShardSize = remoteShardsOnNode.stream()
.map(ShardRouting::getExpectedShardSize)
.mapToLong(Long::longValue)
.sum();

final long shardSize = getExpectedShardSize(
shardRouting,
0L,
allocation.clusterInfo(),
allocation.snapshotShardSizeInfo(),
allocation.metadata(),
allocation.routingTable()
);

final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;

if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
return allocation.decision(
Decision.NO,
NAME,
"file cache limit reached - remote shard size will exceed configured safeguard ratio"
);
}
return Decision.YES;
}

Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
if (decision != null) {
@@ -422,6 +465,15 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
}

/*
The following block prevents movement for remote shards since they do not use the local storage as
the primary source of data storage.
*/
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
return Decision.ALWAYS;
}

final ClusterInfo clusterInfo = allocation.clusterInfo();
final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
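Taken together, the two new blocks give remote shards their own placement rule: canAllocate sums the expected sizes of the primary remote shards already on a search node plus the incoming shard, and returns NO once that total would exceed DATA_TO_FILE_CACHE_SIZE_RATIO times the node's file cache capacity, while canRemain always lets remote shards stay because local disk is not their primary data store. A standalone sketch of the safeguard arithmetic (all names here are illustrative; the real decider operates on ShardRouting and RoutingNode objects):

// Illustrative reduction of the canAllocate safeguard above to plain numbers.
final class FileCacheSafeguard {
    // Mirrors FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO introduced in this commit.
    static final long DATA_TO_FILE_CACHE_SIZE_RATIO = 5;

    /**
     * @param remoteShardBytesOnNode sum of expected sizes of primary remote shards already on the node
     * @param incomingShardBytes     expected size of the shard being placed
     * @param nodeFileCacheBytes     total file cache capacity on the node (0 when stats are missing)
     * @return true if the shard fits within the safeguard limit
     */
    static boolean withinSafeguard(long remoteShardBytesOnNode, long incomingShardBytes, long nodeFileCacheBytes) {
        final long totalAfterAllocation = remoteShardBytesOnNode + incomingShardBytes;
        // The decider returns NO when this total exceeds ratio * cache size.
        return totalAfterAllocation <= DATA_TO_FILE_CACHE_SIZE_RATIO * nodeFileCacheBytes;
    }
}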
@@ -49,6 +49,9 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

// TODO: Convert the constant into an integer setting
public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
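With the ratio fixed at 5, a search node whose file cache totals 100 GiB can, for example, hold remote shards with expected sizes summing to at most 500 GiB before the new allocation decider starts returning NO; the TODO above signals that this multiplier is expected to become a node setting rather than a hard-coded constant.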
@@ -36,6 +36,7 @@
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashMap;
@@ -49,7 +50,8 @@ public void testSerialization() throws Exception {
randomDiskUsage(),
randomShardSizes(),
randomRoutingToDataPath(),
randomReservedSpace()
randomReservedSpace(),
randomFileCacheStats()
);
BytesStreamOutput output = new BytesStreamOutput();
clusterInfo.writeTo(output);
@@ -60,6 +62,7 @@
assertEquals(clusterInfo.shardSizes, result.shardSizes);
assertEquals(clusterInfo.routingToDataPath, result.routingToDataPath);
assertEquals(clusterInfo.reservedSpace, result.reservedSpace);
assertEquals(clusterInfo.getNodeFileCacheStats().size(), result.getNodeFileCacheStats().size());
}

private static Map<String, DiskUsage> randomDiskUsage() {
@@ -79,6 +82,25 @@ private static Map<String, DiskUsage> randomDiskUsage() {
return builder;
}

private static Map<String, FileCacheStats> randomFileCacheStats() {
int numEntries = randomIntBetween(0, 16);
final Map<String, FileCacheStats> builder = new HashMap<>(numEntries);
for (int i = 0; i < numEntries; i++) {
String key = randomAlphaOfLength(16);
FileCacheStats fileCacheStats = new FileCacheStats(
randomLong(),
randomLong(),
randomLong(),
randomLong(),
randomLong(),
randomLong(),
randomLong()
);
builder.put(key, fileCacheStats);
}
return builder;
}

private static Map<String, Long> randomShardSizes() {
int numEntries = randomIntBetween(0, 128);
final Map<String, Long> builder = new HashMap<>(numEntries);
@@ -798,7 +798,7 @@ private static ClusterInfo clusterInfo(
final Map<String, DiskUsage> diskUsages,
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace
) {
return new ClusterInfo(diskUsages, null, null, null, reservedSpace);
return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of());
}

}
@@ -176,7 +176,7 @@ public DevNullClusterInfo(
final Map<String, Long> shardSizes,
final Map<NodeAndPath, ReservedSpace> reservedSpace
) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace);
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace, Map.of());
}

@Override
@@ -239,7 +239,7 @@ public DevNullClusterInfo(
final Map<String, DiskUsage> mostAvailableSpaceUsage,
final Map<String, Long> shardSizes
) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of());
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, Map.of(), Map.of());
}

@Override
