Skip to content

Commit

Permalink
Add ShardRouting to DirectoryFactory interface (#93511)
Browse files Browse the repository at this point in the history
Some Directory factories would benefit from knowing if a 
Directory is created for a promotable and/or searchable 
shard, as well as knowing the shard id. This change adds 
the ShardRouting to the DirectoryFactory interface and 
adjusts the DirectoryWrapper accordingly (until we have a 
better way to override the default directory factory).
  • Loading branch information
tlrx committed Feb 7, 2023
1 parent d5e95a2 commit 052a4f1
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 7 deletions.
28 changes: 26 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.IndexAnalyzers;
Expand All @@ -48,6 +50,7 @@
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.indices.IndicesQueryCache;
Expand Down Expand Up @@ -148,7 +151,17 @@ public final class IndexModule {
* created by {@link org.elasticsearch.plugins.IndexStorePlugin.DirectoryFactory}.
*/
@FunctionalInterface
public interface DirectoryWrapper extends CheckedFunction<Directory, Directory, IOException> {}
public interface DirectoryWrapper {
/**
* Wrap a given {@link Directory}
*
* @param directory the {@link Directory} to wrap
* @param shardRouting the {@link ShardRouting} associated with the {@link Directory} or {@code null} is unknown
* @return a {@link Directory}
* @throws IOException
*/
Directory wrap(Directory directory, @Nullable ShardRouting shardRouting) throws IOException;
}

private final IndexSettings indexSettings;
private final AnalysisRegistry analysisRegistry;
Expand Down Expand Up @@ -575,7 +588,18 @@ private IndexStorePlugin.DirectoryFactory getDirectoryFactory(
final DirectoryWrapper directoryWrapper = this.indexDirectoryWrapper.get();
assert frozen.get() : "IndexModule configuration not frozen";
if (directoryWrapper != null) {
return (idxSettings, shardPath) -> directoryWrapper.apply(factory.newDirectory(idxSettings, shardPath));
return new IndexStorePlugin.DirectoryFactory() {
@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
return newDirectory(indexSettings, shardPath, null);
}

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, ShardRouting shardRouting)
throws IOException {
return directoryWrapper.wrap(factory.newDirectory(indexSettings, shardPath, shardRouting), shardRouting);
}
};
}
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ public synchronized IndexShard createShard(
warmer.warm(reader, shard, IndexService.this.indexSettings);
}
};
Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
final Directory directory = directoryFactory.newDirectory(this.indexSettings, path, routing);
store = new Store(
shardId,
this.indexSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ interface DirectoryFactory {
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;

/**
* Creates a new directory per shard. This method is called once per shard on shard creation.
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @param shardRouting the {@link ShardRouting}
* @return a new lucene directory instance
* @throws IOException if an IOException occurs while opening the directory
*/
default Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, ShardRouting shardRouting) throws IOException {
return newDirectory(indexSettings, shardPath);
}
}

/**
Expand Down
27 changes: 23 additions & 4 deletions server/src/test/java/org/elasticsearch/index/IndexModuleTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
Expand Down Expand Up @@ -131,6 +133,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;

public class IndexModuleTests extends ESTestCase {
Expand Down Expand Up @@ -301,8 +304,20 @@ public void testDirectoryWrapper() throws IOException {

final ShardId shardId = new ShardId(indexSettings.getIndex(), randomIntBetween(0, 5));
final Path dataPath = new NodeEnvironment.DataPath(homeDir).resolve(shardId);
final Directory directory = directoryFactory.newDirectory(indexSettings, new ShardPath(false, dataPath, dataPath, shardId));
Directory directory = directoryFactory.newDirectory(indexSettings, new ShardPath(false, dataPath, dataPath, shardId));
assertThat(directory, instanceOf(WrappedDirectory.class));
assertThat(((WrappedDirectory) directory).shardRouting, nullValue());
assertThat(directory, instanceOf(FilterDirectory.class));

final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomIdentifier(),
randomBoolean(),
ShardRoutingState.INITIALIZING
);
directory = directoryFactory.newDirectory(indexSettings, new ShardPath(false, dataPath, dataPath, shardId), shardRouting);
assertThat(directory, instanceOf(WrappedDirectory.class));
assertThat(((WrappedDirectory) directory).shardRouting, sameInstance(shardRouting));
assertThat(directory, instanceOf(FilterDirectory.class));

indexService.close("test done", false);
Expand Down Expand Up @@ -901,16 +916,20 @@ public DirectoryReader apply(DirectoryReader reader) {
}

private static final class TestDirectoryWrapper implements IndexModule.DirectoryWrapper {

@Override
public Directory apply(Directory directory) {
return new WrappedDirectory(directory);
public Directory wrap(Directory delegate, ShardRouting shardRouting) {
return new WrappedDirectory(delegate, shardRouting);
}
}

private static final class WrappedDirectory extends FilterDirectory {

protected WrappedDirectory(Directory in) {
final ShardRouting shardRouting;

protected WrappedDirectory(Directory in, ShardRouting shardRouting) {
super(in);
this.shardRouting = shardRouting;
}
}
}

0 comments on commit 052a4f1

Please sign in to comment.