Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed every single index operation to not replace the index within the original request #7223

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -85,28 +86,32 @@ protected AnalyzeResponse newResponse() {
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, AnalyzeRequest request) {
if (request.index() != null) {
request.index(state.metaData().concreteSingleIndex(request.index(), request.indicesOptions()));
protected boolean resolveIndex(AnalyzeRequest request) {
return request.index() != null;
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
if (request.concreteIndex() != null) {
return super.checkRequestBlock(state, request);
}
return null;
}

@Override
protected ShardsIterator shards(ClusterState state, AnalyzeRequest request) {
if (request.index() == null) {
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
if (request.concreteIndex() == null) {
// just execute locally....
return null;
}
return state.routingTable().index(request.index()).randomAllActiveShardsIt();
return state.routingTable().index(request.concreteIndex()).randomAllActiveShardsIt();
}

@Override
protected AnalyzeResponse shardOperation(AnalyzeRequest request, int shardId) throws ElasticsearchException {
protected AnalyzeResponse shardOperation(AnalyzeRequest request, ShardId shardId) throws ElasticsearchException {
IndexService indexService = null;
if (request.index() != null) {
indexService = indicesService.indexServiceSafe(request.index());
if (shardId != null) {
indexService = indicesService.indexServiceSafe(shardId.getIndex());
}
Analyzer analyzer = null;
boolean closeAnalyzer = false;
Expand Down
Expand Up @@ -37,10 +37,10 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.TypeMissingException;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -76,14 +76,21 @@ protected String executor() {
}

@Override
protected ShardsIterator shards(ClusterState state, GetFieldMappingsIndexRequest request) {
protected boolean resolveIndex(GetFieldMappingsIndexRequest request) {
//internal action, index already resolved
return false;
}

@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
// Will balance requests between shards
return state.routingTable().index(request.index()).randomAllActiveShardsIt();
return state.routingTable().index(request.concreteIndex()).randomAllActiveShardsIt();
}

@Override
protected GetFieldMappingsResponse shardOperation(final GetFieldMappingsIndexRequest request, int shardId) throws ElasticsearchException {
IndexService indexService = indicesService.indexServiceSafe(request.index());
protected GetFieldMappingsResponse shardOperation(final GetFieldMappingsIndexRequest request, ShardId shardId) throws ElasticsearchException {
assert shardId != null;
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
Collection<String> typeIntersection;
if (request.types().length == 0) {
typeIntersection = indexService.mapperService().types();
Expand All @@ -98,7 +105,7 @@ public boolean apply(String type) {

});
if (typeIntersection.isEmpty()) {
throw new TypeMissingException(new Index(request.index()), request.types());
throw new TypeMissingException(shardId.index(), request.types());
}
}

Expand All @@ -111,7 +118,7 @@ public boolean apply(String type) {
}
}

return new GetFieldMappingsResponse(ImmutableMap.of(request.index(), typeMappings.immutableMap()));
return new GetFieldMappingsResponse(ImmutableMap.of(shardId.getIndex(), typeMappings.immutableMap()));
}

@Override
Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -51,14 +51,9 @@
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -201,39 +196,38 @@ public void executeBulk(final BulkRequest bulkRequest, final ActionListener<Bulk
}

private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
ClusterState clusterState = clusterService.state();
final ClusterState clusterState = clusterService.state();
// TODO use timeout to wait here if its blocked...
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);

final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState.metaData());
MetaData metaData = clusterState.metaData();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
String aliasOrIndex = indexRequest.index();
indexRequest.index(clusterState.metaData().concreteSingleIndex(indexRequest.index(), indexRequest.indicesOptions()));

String concreteIndex = concreteIndices.resolveIfAbsent(indexRequest.index(), indexRequest.indicesOptions());
MappingMetaData mappingMd = null;
if (metaData.hasIndex(indexRequest.index())) {
mappingMd = metaData.index(indexRequest.index()).mappingOrDefault(indexRequest.type());
if (metaData.hasIndex(concreteIndex)) {
mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
}
try {
indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration);
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
} catch (ElasticsearchParseException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(i, null);
}
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
concreteIndices.resolveIfAbsent(deleteRequest.index(), deleteRequest.indicesOptions());
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index()));
deleteRequest.index(clusterState.metaData().concreteSingleIndex(deleteRequest.index(), deleteRequest.indicesOptions()));
} else if (request instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request;
concreteIndices.resolveIfAbsent(updateRequest.index(), updateRequest.indicesOptions());
updateRequest.routing(clusterState.metaData().resolveIndexRouting(updateRequest.routing(), updateRequest.index()));
updateRequest.index(clusterState.metaData().concreteSingleIndex(updateRequest.index(), updateRequest.indicesOptions()));
}
}

Expand All @@ -244,7 +238,8 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
String concreteIndex = concreteIndices.getConcreteIndex(indexRequest.index());
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
Expand All @@ -253,10 +248,11 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
list.add(new BulkItemRequest(i, request));
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
MappingMetaData mappingMd = clusterState.metaData().index(deleteRequest.index()).mappingOrDefault(deleteRequest.type());
String concreteIndex = concreteIndices.getConcreteIndex(deleteRequest.index());
MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(deleteRequest.type());
if (mappingMd != null && mappingMd.routing().required() && deleteRequest.routing() == null) {
// if routing is required, and no routing on the delete request, we need to broadcast it....
GroupShardsIterator groupShards = clusterService.operationRouting().broadcastDeleteShards(clusterState, deleteRequest.index());
GroupShardsIterator groupShards = clusterService.operationRouting().broadcastDeleteShards(clusterState, concreteIndex);
for (ShardIterator shardIt : groupShards) {
List<BulkItemRequest> list = requestsByShard.get(shardIt.shardId());
if (list == null) {
Expand All @@ -266,7 +262,7 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
list.add(new BulkItemRequest(i, new DeleteRequest(deleteRequest)));
}
} else {
ShardId shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
ShardId shardId = clusterService.operationRouting().deleteShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
Expand All @@ -276,11 +272,12 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
}
} else if (request instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request;
MappingMetaData mappingMd = clusterState.metaData().index(updateRequest.index()).mappingOrDefault(updateRequest.type());
String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index());
MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(updateRequest.type());
if (mappingMd != null && mappingMd.routing().required() && updateRequest.routing() == null) {
continue; // What to do?
}
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, updateRequest.index(), updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId();
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
Expand Down Expand Up @@ -323,15 +320,15 @@ public void onFailure(Throwable e) {
if (request.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), indexRequest.opType().toString().toLowerCase(Locale.ENGLISH),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), message, status)));
new BulkItemResponse.Failure(concreteIndices.getConcreteIndex(indexRequest.index()), indexRequest.type(), indexRequest.id(), message, status)));
} else if (request.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), message, status)));
new BulkItemResponse.Failure(concreteIndices.getConcreteIndex(deleteRequest.index()), deleteRequest.type(), deleteRequest.id(), message, status)));
} else if (request.request() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), "update",
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), message, status)));
new BulkItemResponse.Failure(concreteIndices.getConcreteIndex(updateRequest.index()), updateRequest.type(), updateRequest.id(), message, status)));
}
}
if (counter.decrementAndGet() == 0) {
Expand All @@ -345,4 +342,27 @@ private void finishHim() {
});
}
}

private static class ConcreteIndices {

private final Map<String, String> indices = new HashMap<>();
private final MetaData metaData;

ConcreteIndices(MetaData metaData) {
this.metaData = metaData;
}

String getConcreteIndex(String indexOrAlias) {
return indices.get(indexOrAlias);
}

String resolveIfAbsent(String indexOrAlias, IndicesOptions indicesOptions) {
String concreteIndex = indices.get(indexOrAlias);
if (concreteIndex == null) {
concreteIndex = metaData.concreteSingleIndex(indexOrAlias, indicesOptions);
indices.put(indexOrAlias, concreteIndex);
}
return concreteIndex;
}
}
}