Skip to content

Commit

Permalink
Fix Shared Snapshot Cache not Created in Data Path (#70362) (#70369)
Browse files Browse the repository at this point in the history
We have to use the node environment instead of the plain environment here to get the
actual data paths.
  • Loading branch information
original-brownbear committed Mar 15, 2021
1 parent 40d53d1 commit 4ed68f8
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public Collection<Object> createComponents(
if (DiscoveryNode.isDataNode(settings)) {
final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment));
this.cacheService.set(cacheService);
final FrozenCacheService frozenCacheService = new FrozenCacheService(environment, threadPool);
final FrozenCacheService frozenCacheService = new FrozenCacheService(nodeEnvironment, settings, threadPool);
this.frozenCacheService.set(frozenCacheService);
components.add(cacheService);
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.cache.CacheKey;
import org.elasticsearch.index.store.cache.SparseFileTracker;
Expand Down Expand Up @@ -175,9 +175,8 @@ public Iterator<Setting<?>> settings() {
private final CacheDecayTask decayTask;

@SuppressWarnings({ "unchecked", "rawtypes" })
public FrozenCacheService(Environment environment, ThreadPool threadPool) {
public FrozenCacheService(NodeEnvironment environment, Settings settings, ThreadPool threadPool) {
this.currentTimeSupplier = threadPool::relativeTimeInMillis;
final Settings settings = environment.settings();
long cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings).getBytes();
if (cacheSize == Long.MAX_VALUE) {
cacheSize = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.xpack.searchablesnapshots.preallocate.Preallocate;

import java.io.IOException;
Expand Down Expand Up @@ -46,7 +47,7 @@ public class SharedBytes extends AbstractRefCounted {

private final Path path;

SharedBytes(int numRegions, long regionSize, Environment environment) throws IOException {
SharedBytes(int numRegions, long regionSize, NodeEnvironment environment) throws IOException {
super("shared-bytes");
this.numRegions = numRegions;
this.regionSize = regionSize;
Expand Down Expand Up @@ -81,7 +82,7 @@ public class SharedBytes extends AbstractRefCounted {
}
} else {
this.fileChannel = null;
for (Path path : environment.dataFiles()) {
for (Path path : environment.nodeDataPaths()) {
Files.deleteIfExists(path.resolve(CACHE_FILE_NAME));
}
}
Expand All @@ -94,9 +95,9 @@ public class SharedBytes extends AbstractRefCounted {
* @return path for the cache file or {@code null} if none could be found
*/
@Nullable
public static Path findCacheSnapshotCacheFilePath(Environment environment, long fileSize) throws IOException {
public static Path findCacheSnapshotCacheFilePath(NodeEnvironment environment, long fileSize) throws IOException {
Path cacheFile = null;
for (Path path : environment.dataFiles()) {
for (Path path : environment.nodeDataPaths()) {
Files.createDirectories(path);
// TODO: be resilient to this check failing and try next path?
long usableSpace = Environment.getUsableSpace(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -95,9 +96,9 @@ public void testRandomReads() throws IOException {
for (Path path : environment.dataFiles()) {
Files.createDirectories(path);
}

try (
FrozenCacheService cacheService = new FrozenCacheService(environment, threadPool);
NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, environment);
FrozenCacheService cacheService = new FrozenCacheService(nodeEnvironment, settings, threadPool);
TestSearchableSnapshotDirectory directory = new TestSearchableSnapshotDirectory(cacheService, tempDir, fileInfo, fileData)
) {
directory.loadSnapshot(createRecoveryState(true), ActionListener.wrap(() -> {}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ public void setUpTest() throws Exception {
threadPool = new TestThreadPool(getTestName(), SearchableSnapshots.executorBuilders());
clusterService = ClusterServiceUtils.createClusterService(threadPool, node, CLUSTER_SETTINGS);
nodeEnvironment = newNodeEnvironment();
environment = newEnvironment();
}

@After
Expand Down Expand Up @@ -146,7 +145,7 @@ protected CacheService randomCacheService() {
* @return a new {@link FrozenCacheService} instance configured with default settings
*/
protected FrozenCacheService defaultFrozenCacheService() {
return new FrozenCacheService(environment, threadPool);
return new FrozenCacheService(nodeEnvironment, Settings.EMPTY, threadPool);
}

protected FrozenCacheService randomFrozenCacheService() {
Expand All @@ -163,7 +162,7 @@ protected FrozenCacheService randomFrozenCacheService() {
if (randomBoolean()) {
cacheSettings.put(FrozenCacheService.FROZEN_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize());
}
return new FrozenCacheService(newEnvironment(cacheSettings.build()), threadPool);
return new FrozenCacheService(nodeEnvironment, cacheSettings.build(), threadPool);
}

/**
Expand All @@ -183,12 +182,11 @@ protected CacheService createCacheService(final ByteSizeValue cacheSize, final B

protected FrozenCacheService createFrozenCacheService(final ByteSizeValue cacheSize, final ByteSizeValue cacheRangeSize) {
return new FrozenCacheService(
newEnvironment(
Settings.builder()
.put(FrozenCacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), cacheSize)
.put(FrozenCacheService.SHARED_CACHE_RANGE_SIZE_SETTING.getKey(), cacheRangeSize)
.build()
),
nodeEnvironment,
Settings.builder()
.put(FrozenCacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), cacheSize)
.put(FrozenCacheService.SHARED_CACHE_RANGE_SIZE_SETTING.getKey(), cacheRangeSize)
.build(),
threadPool
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.cache.CacheKey;
Expand All @@ -21,8 +21,6 @@
import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService.CacheFileRegion;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;

Expand All @@ -38,11 +36,10 @@ public void testBasicEviction() throws IOException {
.put("path.home", createTempDir())
.build();
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(settings, random());
final Environment environment = TestEnvironment.newEnvironment(settings);
for (Path path : environment.dataFiles()) {
Files.createDirectories(path);
}
try (FrozenCacheService cacheService = new FrozenCacheService(environment, taskQueue.getThreadPool())) {
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
FrozenCacheService cacheService = new FrozenCacheService(environment, settings, taskQueue.getThreadPool())
) {
final CacheKey cacheKey = generateCacheKey();
assertEquals(5, cacheService.freeRegionCount());
final CacheFileRegion region0 = cacheService.get(cacheKey, 250, 0);
Expand Down Expand Up @@ -84,11 +81,10 @@ public void testAutoEviction() throws IOException {
.put("path.home", createTempDir())
.build();
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(settings, random());
final Environment environment = TestEnvironment.newEnvironment(settings);
for (Path path : environment.dataFiles()) {
Files.createDirectories(path);
}
try (FrozenCacheService cacheService = new FrozenCacheService(environment, taskQueue.getThreadPool())) {
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
FrozenCacheService cacheService = new FrozenCacheService(environment, settings, taskQueue.getThreadPool())
) {
final CacheKey cacheKey = generateCacheKey();
assertEquals(2, cacheService.freeRegionCount());
final CacheFileRegion region0 = cacheService.get(cacheKey, 250, 0);
Expand Down Expand Up @@ -121,11 +117,10 @@ public void testForceEviction() throws IOException {
.put("path.home", createTempDir())
.build();
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(settings, random());
final Environment environment = TestEnvironment.newEnvironment(settings);
for (Path path : environment.dataFiles()) {
Files.createDirectories(path);
}
try (FrozenCacheService cacheService = new FrozenCacheService(environment, taskQueue.getThreadPool())) {
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
FrozenCacheService cacheService = new FrozenCacheService(environment, settings, taskQueue.getThreadPool())
) {
final CacheKey cacheKey1 = generateCacheKey();
final CacheKey cacheKey2 = generateCacheKey();
assertEquals(5, cacheService.freeRegionCount());
Expand All @@ -150,11 +145,10 @@ public void testDecay() throws IOException {
.put("path.home", createTempDir())
.build();
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(settings, random());
final Environment environment = TestEnvironment.newEnvironment(settings);
for (Path path : environment.dataFiles()) {
Files.createDirectories(path);
}
try (FrozenCacheService cacheService = new FrozenCacheService(environment, taskQueue.getThreadPool())) {
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
FrozenCacheService cacheService = new FrozenCacheService(environment, settings, taskQueue.getThreadPool())
) {
final CacheKey cacheKey1 = generateCacheKey();
final CacheKey cacheKey2 = generateCacheKey();
assertEquals(5, cacheService.freeRegionCount());
Expand Down

0 comments on commit 4ed68f8

Please sign in to comment.