Skip to content

Commit

Permalink
Add the possibility to inject a custom RecoveryState factory to Index…
Browse files Browse the repository at this point in the history
…StorePlugin implementations (#59124)

Add a custom factory for recovery state into IndexStorePlugin that
allows different implementors to provide its own RecoveryState
implementation.

Backport of #59038
  • Loading branch information
fcofdez committed Jul 15, 2020
1 parent bc11503 commit 66ef1cd
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
IndexModule.INDEX_STORE_TYPE_SETTING,
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
IndexModule.INDEX_RECOVERY_TYPE_SETTING,
IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING,
FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING,
Store.FORCE_RAM_TERM_DICT,
Expand Down
30 changes: 28 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -100,9 +101,14 @@ public final class IndexModule {

private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();

private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;

public static final Setting<String> INDEX_STORE_TYPE_SETTING =
new Setting<>("index.store.type", "", Function.identity(), Property.IndexScope, Property.NodeScope);

public static final Setting<String> INDEX_RECOVERY_TYPE_SETTING =
new Setting<>("index.recovery.type", "", Function.identity(), Property.IndexScope, Property.NodeScope);

/** On which extensions to load data into the file-system cache upon opening of files.
* This only works with the mmap directory, and even in that case is still
* best-effort only. */
Expand Down Expand Up @@ -134,6 +140,7 @@ public final class IndexModule {
private final IndexNameExpressionResolver expressionResolver;
private final AtomicBoolean frozen = new AtomicBoolean(false);
private final BooleanSupplier allowExpensiveQueries;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -150,7 +157,8 @@ public IndexModule(
final EngineFactory engineFactory,
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver) {
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
Expand All @@ -159,6 +167,7 @@ public IndexModule(
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
}

/**
Expand Down Expand Up @@ -410,6 +419,7 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
QueryCache queryCache = null;
IndexAnalyzers indexAnalyzers = null;
boolean success = false;
Expand All @@ -432,7 +442,7 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver,
valuesSourceRegistry);
valuesSourceRegistry, recoveryStateFactory);
success = true;
return indexService;
} finally {
Expand Down Expand Up @@ -471,6 +481,22 @@ private static IndexStorePlugin.DirectoryFactory getDirectoryFactory(
return factory;
}

private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
final IndexSettings indexSettings, final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
final String recoveryType = indexSettings.getValue(INDEX_RECOVERY_TYPE_SETTING);

if (recoveryType.isEmpty()) {
return DEFAULT_RECOVERY_STATE_FACTORY;
}

IndexStorePlugin.RecoveryStateFactory factory = recoveryStateFactories.get(recoveryType);
if (factory == null) {
throw new IllegalArgumentException("Unknown recovery type [" + recoveryType + "]");
}

return factory;
}

/**
* creates a new mapper service to do administrative work like mapping updates. This *should not* be used for document parsing.
* doing so will result in an exception.
Expand Down
11 changes: 10 additions & 1 deletion server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
Expand Down Expand Up @@ -79,6 +80,7 @@
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
Expand Down Expand Up @@ -115,6 +117,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
private final MapperService mapperService;
Expand Down Expand Up @@ -175,7 +178,8 @@ public IndexService(
BooleanSupplier idFieldDataEnabled,
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry) {
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -224,6 +228,7 @@ public IndexService(
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.directoryFactory = directoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.readerWrapper = wrapperFactory.apply(this);
Expand Down Expand Up @@ -564,6 +569,10 @@ private void onShardClose(ShardLock lock) {
}
}

public RecoveryState createRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, DiscoveryNode sourceNode) {
return recoveryStateFactory.newRecoveryState(shardRouting, targetNode, sourceNode);
}

@Override
public IndexSettings getIndexSettings() {
return indexSettings;
Expand Down
16 changes: 11 additions & 5 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final MetaStateService metaStateService;
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);
private volatile boolean idFieldDataEnabled;
Expand All @@ -246,7 +247,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ScriptService scriptService, ClusterService clusterService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry) {
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
this.settings = settings;
this.threadPool = threadPool;
this.pluginsService = pluginsService;
Expand Down Expand Up @@ -292,6 +294,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
}

this.directoryFactories = directoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
Expand Down Expand Up @@ -642,7 +645,7 @@ private synchronized IndexService createIndexService(IndexService.IndexCreationC
indexCreationContext);

final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver);
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, recoveryStateFactories);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
Expand Down Expand Up @@ -713,7 +716,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
public synchronized MapperService createIndexMapperService(IndexMetadata indexMetadata) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetadata, this.settings, indexScopedSettings);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver);
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, recoveryStateFactories);
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
}
Expand Down Expand Up @@ -748,16 +751,19 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
@Override
public IndexShard createShard(
final ShardRouting shardRouting,
final RecoveryState recoveryState,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
final RetentionLeaseSyncer retentionLeaseSyncer,
final DiscoveryNode targetNode,
final DiscoveryNode sourceNode) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,16 +590,16 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
try {
final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard(
shardRouting,
recoveryState,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer,
retentionLeaseSyncer);
retentionLeaseSyncer,
nodes.getLocalNode(),
sourceNode);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
}
Expand Down Expand Up @@ -889,25 +889,27 @@ U createIndex(IndexMetadata indexMetadata,
* Creates a shard for the specified shard routing and starts recovery.
*
* @param shardRouting the shard routing
* @param recoveryState the recovery state
* @param recoveryTargetService recovery service for the target
* @param recoveryListener a callback when recovery changes state (finishes or fails)
* @param repositoriesService service responsible for snapshot/restore
* @param onShardFailure a callback when this shard fails
* @param globalCheckpointSyncer a callback when this shard syncs the global checkpoint
* @param retentionLeaseSyncer a callback when this shard syncs retention leases
* @param targetNode the node where this shard will be recovered
* @param sourceNode the source node to recover this shard from (it might be null)
* @return a new shard
* @throws IOException if an I/O exception occurs when creating the shard
*/
T createShard(
ShardRouting shardRouting,
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer,
RetentionLeaseSyncer retentionLeaseSyncer) throws IOException;
RetentionLeaseSyncer retentionLeaseSyncer,
DiscoveryNode targetNode,
@Nullable DiscoveryNode sourceNode) throws IOException;

/**
* Returns shard for the specified id if it exists otherwise returns <code>null</code>.
Expand Down
9 changes: 8 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,13 @@ protected Node(final Environment initialEnvironment,
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getRecoveryStateFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = Collections.unmodifiableMap(pluginsService
.filterPlugins(SystemIndexPlugin.class)
.stream()
Expand All @@ -487,7 +494,7 @@ protected Node(final Environment initialEnvironment,
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService,
clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories,
searchModule.getValuesSourceRegistry());
searchModule.getValuesSourceRegistry(), recoveryStateFactories);

final AliasValidator aliasValidator = new AliasValidator();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@
package org.elasticsearch.plugins;

import org.apache.lucene.store.Directory;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.recovery.RecoveryState;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

/**
Expand Down Expand Up @@ -55,4 +60,26 @@ interface DirectoryFactory {
*/
Map<String, DirectoryFactory> getDirectoryFactories();

/**
* An interface that allows to create a new {@link RecoveryState} per shard.
*/
@FunctionalInterface
interface RecoveryStateFactory {
/**
* Creates a new {@link RecoveryState} per shard. This method is called once per shard on shard creation.
* @return a new RecoveryState instance
*/
RecoveryState newRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode);
}

/**
* The {@link RecoveryStateFactory} mappings for this plugin. When an index is created the recovery type setting
* {@link org.elasticsearch.index.IndexModule#INDEX_RECOVERY_TYPE_SETTING} on the index will be examined and either use the default
* or looked up among all the recovery state factories from {@link IndexStorePlugin} plugins.
*
* @return a map from recovery type to an recovery state factory
*/
default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
return Collections.emptyMap();
}
}

0 comments on commit 66ef1cd

Please sign in to comment.