Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a java level freeze/unfreeze API #35353

Merged
merged 12 commits into from
Nov 10, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -59,6 +63,8 @@
* stats in order to obtain the number of reopens.
*/
public final class FrozenEngine extends ReadOnlyEngine {
public static final Setting<Boolean> INDEX_FROZEN = Setting.boolSetting("index.frozen", false, Setting.Property.IndexScope,
Setting.Property.PrivateIndex);
private volatile DirectoryReader lastOpenedReader;

public FrozenEngine(EngineConfig config) {
Expand Down Expand Up @@ -232,6 +238,45 @@ static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) {
return null;
}

/*
* We register this listener for a frozen index that will
* 1. reset the reader every time the search context is validated which happens when the context is looked up ie. on a fetch phase
* etc.
* 2. register a releasable resource that is cleaned after each phase that releases the reader for this searcher
*/
public static class ReacquireEngineSearcherListener implements SearchOperationListener {

@Override
public void validateSearchContext(SearchContext context, TransportRequest transportRequest) {
Searcher engineSearcher = context.searcher().getEngineSearcher();
LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(engineSearcher.getDirectoryReader());
if (lazyDirectoryReader != null) {
try {
lazyDirectoryReader.reset();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
// also register a release resource in this case if we have multiple roundtrips like in DFS
onNewContext(context);
s1monw marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onNewContext(SearchContext context) {
Searcher engineSearcher = context.searcher().getEngineSearcher();
context.addReleasable(() -> {
LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(engineSearcher.getDirectoryReader());
if (lazyDirectoryReader != null) {
try {
lazyDirectoryReader.release();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}, SearchContext.Lifetime.PHASE);
}
}

/**
* This class allows us to use the same high level reader across multiple search phases but replace the underpinnings
* on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
package org.elasticsearch.xpack.core;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.license.LicensingClient;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackInfoRequestBuilder;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
Expand Down Expand Up @@ -103,4 +106,8 @@ public XPackInfoRequestBuilder prepareInfo() {
public void info(XPackInfoRequest request, ActionListener<XPackInfoResponse> listener) {
client.execute(XPackInfoAction.INSTANCE, request, listener);
}

public void freeze(TransportFreezeIndexAction.FreezeRequest request, ActionListener<AcknowledgedResponse> listener) {
client.execute(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE, request, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.FrozenEngine;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.LicensesMetaData;
import org.elasticsearch.license.Licensing;
Expand All @@ -55,6 +57,7 @@
import org.elasticsearch.snapshots.SourceOnlySnapshotRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
Expand Down Expand Up @@ -266,6 +269,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
actions.add(new ActionHandler<>(XPackInfoAction.INSTANCE, TransportXPackInfoAction.class));
actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, TransportXPackUsageAction.class));
actions.add(new ActionHandler<>(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE,
TransportFreezeIndexAction.class));
actions.addAll(licensing.getActions());
return actions;
}
Expand Down Expand Up @@ -359,14 +364,26 @@ public Map<String, Repository.Factory> getRepositories(Environment env, NamedXCo
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (indexSettings.getValue(SourceOnlySnapshotRepository.SOURCE_ONLY)) {
return Optional.of(SourceOnlySnapshotRepository.getEngineFactory());
} else if (indexSettings.getValue(FrozenEngine.INDEX_FROZEN)) {
return Optional.of(FrozenEngine::new);
}

return Optional.empty();
}

@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = super.getSettings();
settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY);
settings.add(FrozenEngine.INDEX_FROZEN);
return settings;
}

@Override
public void onIndexModule(IndexModule indexModule) {
if (FrozenEngine.INDEX_FROZEN.get(indexModule.getSettings())) {
indexModule.addSearchOperationListener(new FrozenEngine.ReacquireEngineSearcherListener());
}
super.onIndexModule(indexModule);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.action;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.FrozenEngine;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

public final class TransportFreezeIndexAction extends
TransportMasterNodeAction<TransportFreezeIndexAction.FreezeRequest, AcknowledgedResponse> {

private final DestructiveOperations destructiveOperations;

@Inject
public TransportFreezeIndexAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
DestructiveOperations destructiveOperations) {
super(FreezeIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
FreezeRequest::new);
this.destructiveOperations = destructiveOperations;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected void doExecute(Task task, FreezeRequest request, ActionListener<AcknowledgedResponse> listener) {
destructiveOperations.failDestructive(request.indices());
super.doExecute(task, request, listener);
}

@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}

@Override
protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
throw new ResourceNotFoundException("index not found");
}

clusterService.submitStateUpdateTask("toggle-frozen-settings",
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.URGENT, request, listener) {
@Override
public ClusterState execute(final ClusterState currentState) {
final MetaData.Builder builder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
for (Index index : concreteIndices) {
IndexMetaData meta = currentState.metaData().index(index);
s1monw marked this conversation as resolved.
Show resolved Hide resolved
if (meta.getState() != IndexMetaData.State.CLOSE) {
throw new IllegalStateException("index [" + index.getName() + "] is not closed");
}
final IndexMetaData.Builder imdBuilder = IndexMetaData.builder(meta);
final Settings.Builder settingsBuilder =
Settings.builder()
.put(currentState.metaData().index(index).getSettings())
.put("index.blocks.write", request.freeze())
.put(FrozenEngine.INDEX_FROZEN.getKey(), request.freeze())
.put(IndexSettings.INDEX_SEARCH_THROTTLED.getKey(), request.freeze());
if (request.freeze()) {
blocks.addIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK);
// we never remove this block when unfreeze for now. we don't know if it was read-only
s1monw marked this conversation as resolved.
Show resolved Hide resolved
}
imdBuilder.settings(settingsBuilder);
builder.put(imdBuilder.build(), true);
}
return ClusterState.builder(currentState).blocks(blocks).metaData(builder).build();
}

@Override
protected AcknowledgedResponse newResponse(boolean acknowledged) {
return new AcknowledgedResponse(acknowledged);
}
});
}

@Override
protected ClusterBlockException checkBlock(FreezeRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
indexNameExpressionResolver.concreteIndexNames(state, request));
}

public static class FreezeIndexAction extends Action<AcknowledgedResponse> {

public static final FreezeIndexAction INSTANCE = new FreezeIndexAction();
public static final String NAME = "indices:admin/freeze";

private FreezeIndexAction() {
super(NAME);
}

@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
}

public static class FreezeRequest extends AcknowledgedRequest<FreezeRequest>
implements IndicesRequest.Replaceable {
private OpenIndexRequest openIndexRequest;
s1monw marked this conversation as resolved.
Show resolved Hide resolved
private boolean freeze = true;

public FreezeRequest() {
openIndexRequest = new OpenIndexRequest();
}

s1monw marked this conversation as resolved.
Show resolved Hide resolved
public FreezeRequest(String... indices) {
openIndexRequest = new OpenIndexRequest(indices);
}

@Override
public ActionRequestValidationException validate() {
return openIndexRequest.validate();
}

public void setFreeze(boolean freeze) {
this.freeze = freeze;
}

public boolean freeze() {
return freeze;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
openIndexRequest = new OpenIndexRequest();
openIndexRequest.readFrom(in);
freeze = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
openIndexRequest.writeTo(out);
out.writeBoolean(freeze);
}

@Override
public String[] indices() {
return openIndexRequest.indices();
}

@Override
public IndicesOptions indicesOptions() {
return openIndexRequest.indicesOptions();
}

@Override
public IndicesRequest indices(String... indices) {
openIndexRequest.indices(indices);
return this;
}

public FreezeRequest indicesOptions(IndicesOptions indicesOptions) {
openIndexRequest.indicesOptions(indicesOptions);
return this;
}
}
}