Skip to content

Commit

Permalink
Internal: changed every single index operation to not replace the ind…
Browse files Browse the repository at this point in the history
…ex within the original request

An anti-pattern that we have in our code, noticeable for java API users, is that we modify incoming requests by replacing the index or alias with the concrete index. This way not only the request has changed, but all following communications that use that request will lose the information on whether the original request was performed against an alias or an index.

Refactored the following base classes: `TransportShardReplicationOperationAction`, `TransportShardSingleOperationAction`, `TransportSingleCustomOperationAction`, `TransportInstanceSingleOperationAction` and all subclasses by introduced an InternalRequest object that contains the original request plus additional info (e.g. the concrete index). This internal request doesn't get sent over the transport but rebuilt on each node on demand (not different to what currently happens anyway, as concrete index gets set on each node). When the request becomes a shard level request, instead of using the only int shardId we serialize the ShardId that contains both concrete index name (which might then differ ffrom the original one within the request) and shard id.

Using this pattern we can move get, multi_get, explain, analyze, term_vector, multi_term_vector, index, delete, update, bulk to not replace the index name with the concrete one within the request. The index name within the original request will stay the same.

Made it also clearer within the different transport actions when the index needs to be resolved and when that's not needed (e.g. shard level request), by exposing `resolveIndex` method. Moved check block methods to parent classes as their content was always the same on every subclass.

Improved existing tests by randomly introducing the use of an alias, and verifying that the responses always contain the concrete index name and not the original one, as that's the expected behaviour.

Added backwards compatibility tests to make sure that the change is applied in a backwards compatible manner.

Closes #7223
  • Loading branch information
javanna committed Aug 12, 2014
1 parent 371d602 commit 5d987ad
Show file tree
Hide file tree
Showing 38 changed files with 1,277 additions and 1,019 deletions.
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -85,28 +86,32 @@ protected AnalyzeResponse newResponse() {
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, AnalyzeRequest request) {
if (request.index() != null) {
request.index(state.metaData().concreteSingleIndex(request.index(), request.indicesOptions()));
protected boolean resolveIndex(AnalyzeRequest request) {
return request.index() != null;
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
if (request.concreteIndex() != null) {
return super.checkRequestBlock(state, request);
}
return null;
}

@Override
protected ShardsIterator shards(ClusterState state, AnalyzeRequest request) {
if (request.index() == null) {
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
if (request.concreteIndex() == null) {
// just execute locally....
return null;
}
return state.routingTable().index(request.index()).randomAllActiveShardsIt();
return state.routingTable().index(request.concreteIndex()).randomAllActiveShardsIt();
}

@Override
protected AnalyzeResponse shardOperation(AnalyzeRequest request, int shardId) throws ElasticsearchException {
protected AnalyzeResponse shardOperation(AnalyzeRequest request, ShardId shardId) throws ElasticsearchException {
IndexService indexService = null;
if (request.index() != null) {
indexService = indicesService.indexServiceSafe(request.index());
if (shardId != null) {
indexService = indicesService.indexServiceSafe(shardId.getIndex());
}
Analyzer analyzer = null;
boolean closeAnalyzer = false;
Expand Down
Expand Up @@ -37,10 +37,10 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.TypeMissingException;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -76,14 +76,21 @@ protected String executor() {
}

@Override
protected ShardsIterator shards(ClusterState state, GetFieldMappingsIndexRequest request) {
protected boolean resolveIndex(GetFieldMappingsIndexRequest request) {
//internal action, index already resolved
return false;
}

@Override
protected ShardsIterator shards(ClusterState state, InternalRequest request) {
// Will balance requests between shards
return state.routingTable().index(request.index()).randomAllActiveShardsIt();
return state.routingTable().index(request.concreteIndex()).randomAllActiveShardsIt();
}

@Override
protected GetFieldMappingsResponse shardOperation(final GetFieldMappingsIndexRequest request, int shardId) throws ElasticsearchException {
IndexService indexService = indicesService.indexServiceSafe(request.index());
protected GetFieldMappingsResponse shardOperation(final GetFieldMappingsIndexRequest request, ShardId shardId) throws ElasticsearchException {
assert shardId != null;
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
Collection<String> typeIntersection;
if (request.types().length == 0) {
typeIntersection = indexService.mapperService().types();
Expand All @@ -98,7 +105,7 @@ public boolean apply(String type) {

});
if (typeIntersection.isEmpty()) {
throw new TypeMissingException(new Index(request.index()), request.types());
throw new TypeMissingException(shardId.index(), request.types());
}
}

Expand All @@ -111,7 +118,7 @@ public boolean apply(String type) {
}
}

return new GetFieldMappingsResponse(ImmutableMap.of(request.index(), typeMappings.immutableMap()));
return new GetFieldMappingsResponse(ImmutableMap.of(shardId.getIndex(), typeMappings.immutableMap()));
}

@Override
Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -51,14 +51,9 @@
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -201,39 +196,38 @@ public void executeBulk(final BulkRequest bulkRequest, final ActionListener<Bulk
}

private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
ClusterState clusterState = clusterService.state();
final ClusterState clusterState = clusterService.state();
// TODO use timeout to wait here if its blocked...
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);

final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState.metaData());
MetaData metaData = clusterState.metaData();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
String aliasOrIndex = indexRequest.index();
indexRequest.index(clusterState.metaData().concreteSingleIndex(indexRequest.index(), indexRequest.indicesOptions()));

String concreteIndex = concreteIndices.resolveIfAbsent(indexRequest.index(), indexRequest.indicesOptions());
MappingMetaData mappingMd = null;
if (metaData.hasIndex(indexRequest.index())) {
mappingMd = metaData.index(indexRequest.index()).mappingOrDefault(indexRequest.type());
if (metaData.hasIndex(concreteIndex)) {
mappingMd = metaData.index(concreteIndex).mappingOrDefault(indexRequest.type());
}
try {
indexRequest.process(metaData, aliasOrIndex, mappingMd, allowIdGeneration);
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
} catch (ElasticsearchParseException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex, indexRequest.type(), indexRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(i, null);
}
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
concreteIndices.resolveIfAbsent(deleteRequest.index(), deleteRequest.indicesOptions());
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index()));
deleteRequest.index(clusterState.metaData().concreteSingleIndex(deleteRequest.index(), deleteRequest.indicesOptions()));
} else if (request instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request;
concreteIndices.resolveIfAbsent(updateRequest.index(), updateRequest.indicesOptions());
updateRequest.routing(clusterState.metaData().resolveIndexRouting(updateRequest.routing(), updateRequest.index()));
updateRequest.index(clusterState.metaData().concreteSingleIndex(updateRequest.index(), updateRequest.indicesOptions()));
}
}

Expand All @@ -244,7 +238,8 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
ActionRequest request = bulkRequest.requests.get(i);
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
String concreteIndex = concreteIndices.getConcreteIndex(indexRequest.index());
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
Expand All @@ -253,10 +248,11 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
list.add(new BulkItemRequest(i, request));
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
MappingMetaData mappingMd = clusterState.metaData().index(deleteRequest.index()).mappingOrDefault(deleteRequest.type());
String concreteIndex = concreteIndices.getConcreteIndex(deleteRequest.index());
MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(deleteRequest.type());
if (mappingMd != null && mappingMd.routing().required() && deleteRequest.routing() == null) {
// if routing is required, and no routing on the delete request, we need to broadcast it....
GroupShardsIterator groupShards = clusterService.operationRouting().broadcastDeleteShards(clusterState, deleteRequest.index());
GroupShardsIterator groupShards = clusterService.operationRouting().broadcastDeleteShards(clusterState, concreteIndex);
for (ShardIterator shardIt : groupShards) {
List<BulkItemRequest> list = requestsByShard.get(shardIt.shardId());
if (list == null) {
Expand All @@ -266,7 +262,7 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
list.add(new BulkItemRequest(i, new DeleteRequest(deleteRequest)));
}
} else {
ShardId shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
ShardId shardId = clusterService.operationRouting().deleteShards(clusterState, concreteIndex, deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
Expand All @@ -276,11 +272,12 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
}
} else if (request instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request;
MappingMetaData mappingMd = clusterState.metaData().index(updateRequest.index()).mappingOrDefault(updateRequest.type());
String concreteIndex = concreteIndices.getConcreteIndex(updateRequest.index());
MappingMetaData mappingMd = clusterState.metaData().index(concreteIndex).mappingOrDefault(updateRequest.type());
if (mappingMd != null && mappingMd.routing().required() && updateRequest.routing() == null) {
continue; // What to do?
}
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, updateRequest.index(), updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId();
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
Expand Down Expand Up @@ -323,15 +320,15 @@ public void onFailure(Throwable e) {
if (request.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), indexRequest.opType().toString().toLowerCase(Locale.ENGLISH),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), message, status)));
new BulkItemResponse.Failure(concreteIndices.getConcreteIndex(indexRequest.index()), indexRequest.type(), indexRequest.id(), message, status)));
} else if (request.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), message, status)));
new BulkItemResponse.Failure(concreteIndices.getConcreteIndex(deleteRequest.index()), deleteRequest.type(), deleteRequest.id(), message, status)));
} else if (request.request() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), "update",
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), message, status)));
new BulkItemResponse.Failure(concreteIndices.getConcreteIndex(updateRequest.index()), updateRequest.type(), updateRequest.id(), message, status)));
}
}
if (counter.decrementAndGet() == 0) {
Expand All @@ -345,4 +342,27 @@ private void finishHim() {
});
}
}

private static class ConcreteIndices {

private final Map<String, String> indices = new HashMap<>();
private final MetaData metaData;

ConcreteIndices(MetaData metaData) {
this.metaData = metaData;
}

String getConcreteIndex(String indexOrAlias) {
return indices.get(indexOrAlias);
}

String resolveIfAbsent(String indexOrAlias, IndicesOptions indicesOptions) {
String concreteIndex = indices.get(indexOrAlias);
if (concreteIndex == null) {
concreteIndex = metaData.concreteSingleIndex(indexOrAlias, indicesOptions);
indices.put(indexOrAlias, concreteIndex);
}
return concreteIndex;
}
}
}

0 comments on commit 5d987ad

Please sign in to comment.