Skip to content

Commit

Permalink
add support for write index resolution when creating/updating documents
Browse files Browse the repository at this point in the history
This commit introduces a new option to IndicesOptions that requires
a write index to exist and then uses this option for index/update requests.

This commit also fixes a subtle issue with how write-indices are resolved
in Aliases. Before, all aliases pointing to one-and-only-one index had a
write index even if is_write_index=false. This should not be the case.
  • Loading branch information
talevy committed Jun 22, 2018
1 parent 7313a98 commit d2da8ec
Show file tree
Hide file tree
Showing 21 changed files with 282 additions and 74 deletions.
Expand Up @@ -80,10 +80,7 @@ public void testAliasesContainTarget() {

public void testTargetIsAlias() {
Exception e = expectThrows(IllegalArgumentException.class, () -> succeeds("target_multi", "foo"));
assertThat(e.getMessage(), containsString("Alias [target_multi] has more than one indices associated with it [["));
// The index names can come in either order
assertThat(e.getMessage(), containsString("target"));
assertThat(e.getMessage(), containsString("target2"));
assertThat(e.getMessage(), containsString("Alias [target_multi] points to multiple indices"));
}

public void testRemoteInfoSkipsValidation() {
Expand Down
Expand Up @@ -148,8 +148,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
final Set<String> indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
// exists), unless an external versioning is used
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.map(DocWriteRequest::index)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -300,7 +300,7 @@ protected void doRun() throws Exception {
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
break;
case DELETE:
docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.routing(), docWriteRequest.index(), true));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
Expand Down
Expand Up @@ -69,7 +69,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
@Override
protected void resolveRequest(ClusterState state, InternalRequest request) {
// update the routing (request#index here is possibly an alias)
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index()));
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index(), false));
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetaData().routingRequired(request.concreteIndex(), request.request().type())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
Expand Down
Expand Up @@ -67,7 +67,7 @@ protected void doExecute(final MultiGetRequest request, final ActionListener<Mul
try {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, item).getName();

item.routing(clusterState.metaData().resolveIndexRouting(item.routing(), item.index()));
item.routing(clusterState.metaData().resolveIndexRouting(item.routing(), item.index(), false));
if ((item.routing() == null) && (clusterState.getMetaData().routingRequired(concreteSingleIndex, item.type()))) {
String message = "routing is required for [" + concreteSingleIndex + "]/[" + item.type() + "]/[" + item.id() + "]";
responses.set(i, newItemFailure(concreteSingleIndex, item.type(), item.id(), new IllegalArgumentException(message)));
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.Requests;
Expand Down Expand Up @@ -188,6 +189,11 @@ public ActionRequestValidationException validate() {
return validationException;
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictAliasToWriteIndexNoExpandForbidClosed();
}

/**
* The content type. This will be used when generating a document from user provided objects like Maps and when parsing the
* source at index time
Expand Down Expand Up @@ -496,7 +502,7 @@ public void process(Version indexCreatedVersion, @Nullable MappingMetaData mappi

/* resolve the routing if needed */
public void resolveRouting(MetaData metaData) {
routing(metaData.resolveIndexRouting(routing, index));
routing(metaData.resolveIndexRouting(routing, index, true));
}

@Override
Expand Down Expand Up @@ -603,5 +609,4 @@ public long getAutoGeneratedTimestamp() {
public IndexRequest setShardId(ShardId shardId) {
throw new UnsupportedOperationException("shard id should never be set on IndexRequest");
}

}
Expand Up @@ -77,7 +77,8 @@ public enum Option {
IGNORE_ALIASES,
ALLOW_NO_INDICES,
FORBID_ALIASES_TO_MULTIPLE_INDICES,
FORBID_CLOSED_INDICES;
FORBID_CLOSED_INDICES,
REQUIRE_ALIASES_TO_WRITE_INDEX;

public static final EnumSet<Option> NONE = EnumSet.noneOf(Option.class);
}
Expand All @@ -93,6 +94,9 @@ public enum Option {
public static final IndicesOptions STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED =
new IndicesOptions(EnumSet.of(Option.FORBID_ALIASES_TO_MULTIPLE_INDICES, Option.FORBID_CLOSED_INDICES),
EnumSet.noneOf(WildcardStates.class));
public static final IndicesOptions STRICT_ALIAS_TO_WRITE_INDEX_NO_EXPAND_FORBID_CLOSED =
new IndicesOptions(EnumSet.of(Option.REQUIRE_ALIASES_TO_WRITE_INDEX, Option.FORBID_CLOSED_INDICES),
EnumSet.noneOf(WildcardStates.class));

private final EnumSet<Option> options;
private final EnumSet<WildcardStates> expandWildcards;
Expand Down Expand Up @@ -231,6 +235,13 @@ public boolean allowAliasesToMultipleIndices() {
return options.contains(Option.FORBID_ALIASES_TO_MULTIPLE_INDICES) == false;
}

/**
* @return whether aliases pointing to a write index should resolve to that index
*/
public boolean requireAliasesToWriteIndex() {
return options.contains(Option.REQUIRE_ALIASES_TO_WRITE_INDEX);
}

/**
* @return whether aliases should be ignored (when resolving a wildcard)
*/
Expand Down Expand Up @@ -375,6 +386,14 @@ public static IndicesOptions strictSingleIndexNoExpandForbidClosed() {
return STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED;
}

/**
* @return indices option that requires each specified index or alias to exist, doesn't expand wildcards and
* throws error if any of the aliases resolves to multiple indices with none specified as a write-index
*/
public static IndicesOptions strictAliasToWriteIndexNoExpandForbidClosed() {
return STRICT_ALIAS_TO_WRITE_INDEX_NO_EXPAND_FORBID_CLOSED;
}

/**
* @return indices options that ignores unavailable indices, expands wildcards only to open indices and
* allows that no indices are resolved from wildcard expressions (not returning an error).
Expand Down Expand Up @@ -413,6 +432,7 @@ public String toString() {
", allow_aliases_to_multiple_indices=" + allowAliasesToMultipleIndices() +
", forbid_closed_indices=" + forbidClosedIndices() +
", ignore_aliases=" + ignoreAliases() +
", require_aliases_to_write_index=" + requireAliasesToWriteIndex() +
']';
}
}
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -58,6 +59,11 @@ public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictAliasToWriteIndexNoExpandForbidClosed();
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand Down
Expand Up @@ -64,7 +64,7 @@ protected void doExecute(final MultiTermVectorsRequest request, final ActionList
Map<ShardId, MultiTermVectorsShardRequest> shardRequests = new HashMap<>();
for (int i = 0; i < request.requests.size(); i++) {
TermVectorsRequest termVectorsRequest = request.requests.get(i);
termVectorsRequest.routing(clusterState.metaData().resolveIndexRouting(termVectorsRequest.routing(), termVectorsRequest.index()));
termVectorsRequest.routing(clusterState.metaData().resolveIndexRouting(termVectorsRequest.routing(), termVectorsRequest.index(), false));
if (!clusterState.metaData().hasConcreteIndex(termVectorsRequest.index())) {
responses.set(i, new MultiTermVectorsItemResponse(null, new MultiTermVectorsResponse.Failure(termVectorsRequest.index(),
termVectorsRequest.type(), termVectorsRequest.id(), new IndexNotFoundException(termVectorsRequest.index()))));
Expand Down
Expand Up @@ -78,7 +78,7 @@ protected boolean resolveIndex(TermVectorsRequest request) {
@Override
protected void resolveRequest(ClusterState state, InternalRequest request) {
// update the routing (request#index here is possibly an alias or a parent)
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index()));
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index(), false));
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetaData().routingRequired(request.concreteIndex(), request.request().type())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
Expand Down
Expand Up @@ -103,7 +103,7 @@ protected void resolveRequest(ClusterState state, UpdateRequest request) {
}

public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) {
request.routing((metaData.resolveIndexRouting(request.routing(), request.index())));
request.routing((metaData.resolveIndexRouting(request.routing(), request.index(), true)));
// Fail fast on the node that received the request, rather than failing when translating on the index or delete request.
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
Expand Down Expand Up @@ -169,6 +170,11 @@ public ActionRequestValidationException validate() {
return validationException;
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictAliasToWriteIndexNoExpandForbidClosed();
}

/**
* The type of the indexed document.
*/
Expand Down
Expand Up @@ -154,12 +154,18 @@ void addIndex(IndexMetaData indexMetaData) {
}

public void computeAndValidateWriteIndex() {
List<IndexMetaData> writeIndices = referenceIndexMetaDatas.stream()
.filter(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).writeIndex()))
.collect(Collectors.toList());
if (referenceIndexMetaDatas.size() == 1) {
writeIndex.set(referenceIndexMetaDatas.get(0));
} else if (writeIndices.size() == 1) {
final List<IndexMetaData> writeIndices;
if (referenceIndexMetaDatas.size() > 1) {
writeIndices = referenceIndexMetaDatas.stream()
.filter(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).writeIndex()))
.collect(Collectors.toList());
} else if(Boolean.FALSE.equals(referenceIndexMetaDatas.get(0).getAliases().get(aliasName).writeIndex()) == false) {
writeIndices = Collections.singletonList(referenceIndexMetaDatas.get(0));
} else {
writeIndices = Collections.emptyList();
}

if (writeIndices.size() == 1) {
writeIndex.set(writeIndices.get(0));
} else if (writeIndices.size() > 1) {
List<String> writeIndicesStrings = writeIndices.stream()
Expand Down
Expand Up @@ -194,29 +194,43 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
}

Collection<IndexMetaData> resolvedIndices = aliasOrIndex.getIndices();
if (resolvedIndices.size() > 1 && !options.allowAliasesToMultipleIndices()) {

if (aliasOrIndex.isAlias() && options.requireAliasesToWriteIndex()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) aliasOrIndex;
IndexMetaData writeIndex = alias.getWriteIndex();
if (writeIndex == null) {
if (alias.getIndices().size() > 1) {
throw new IllegalArgumentException("Alias [" + alias.getAliasName() +
"] points to multiple indices with none set as a write-index [is_write_index=true]");
} else {
throw new IllegalArgumentException("Alias [" + alias.getAliasName() + "] points to an index ["
+ alias.getIndices().get(0).getIndex().getName() + "] with [is_write_index=false]");
}
}
concreteIndices.add(writeIndex.getIndex());
} else if (resolvedIndices.size() > 1 && options.allowAliasesToMultipleIndices() == false) {
String[] indexNames = new String[resolvedIndices.size()];
int i = 0;
for (IndexMetaData indexMetaData : resolvedIndices) {
indexNames[i++] = indexMetaData.getIndex().getName();
}
throw new IllegalArgumentException("Alias [" + expression + "] has more than one indices associated with it [" +
Arrays.toString(indexNames) + "], can't execute a single index op");
}

for (IndexMetaData index : resolvedIndices) {
if (index.getState() == IndexMetaData.State.CLOSE) {
if (failClosed) {
throw new IndexClosedException(index.getIndex());
} else {
if (options.forbidClosedIndices() == false) {
concreteIndices.add(index.getIndex());
Arrays.toString(indexNames) + "], can't execute a single index op");
} else {
for (IndexMetaData index : resolvedIndices) {
if (index.getState() == IndexMetaData.State.CLOSE) {
if (failClosed) {
throw new IndexClosedException(index.getIndex());
} else {
if (options.forbidClosedIndices() == false) {
concreteIndices.add(index.getIndex());
}
}
} else if (index.getState() == IndexMetaData.State.OPEN) {
concreteIndices.add(index.getIndex());
} else {
throw new IllegalStateException("index state [" + index.getState() + "] not supported");
}
} else if (index.getState() == IndexMetaData.State.OPEN) {
concreteIndices.add(index.getIndex());
} else {
throw new IllegalStateException("index state [" + index.getState() + "] not supported");
}
}
}
Expand Down
Expand Up @@ -477,7 +477,7 @@ public String[] getConcreteAllClosedIndices() {
*/
// TODO: This can be moved to IndexNameExpressionResolver too, but this means that we will support wildcards and other expressions
// in the index,bulk,update and delete apis.
public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex) {
public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex, boolean isWriteOperation) {
if (aliasOrIndex == null) {
return routing;
}
Expand All @@ -487,10 +487,15 @@ public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex)
return routing;
}
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) result;
if (result.getIndices().size() > 1) {
if ((isWriteOperation && alias.getWriteIndex() == null) || (isWriteOperation == false && result.getIndices().size() > 1)) {
rejectSingleIndexOperation(aliasOrIndex, result);
}
AliasMetaData aliasMd = alias.getFirstAliasMetaData();
final AliasMetaData aliasMd;
if (isWriteOperation) {
aliasMd = alias.getWriteIndex().getAliases().get(alias.getAliasName());
} else {
aliasMd = alias.getFirstAliasMetaData();
}
if (aliasMd.indexRouting() != null) {
if (aliasMd.indexRouting().indexOf(',') != -1) {
throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value [" + aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation");
Expand Down

0 comments on commit d2da8ec

Please sign in to comment.