Skip to content

Commit

Permalink
Refactored create index api to make use of the new recently introduce…
Browse files Browse the repository at this point in the history
…d generic ack mechanism

Closes elastic#4421
  • Loading branch information
javanna authored and brusic committed Jan 19, 2014
1 parent 2971c33 commit b48899a
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 492 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.create;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

import java.util.Map;
import java.util.Set;

import static com.google.common.collect.Maps.newHashMap;

/**
* Cluster state update request that allows to create an index
*/
public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest<CreateIndexClusterStateUpdateRequest> {

final String cause;
final String index;

private IndexMetaData.State state = IndexMetaData.State.OPEN;

private Settings settings = ImmutableSettings.Builder.EMPTY_SETTINGS;

private Map<String, String> mappings = Maps.newHashMap();

private Map<String, IndexMetaData.Custom> customs = newHashMap();

private Set<ClusterBlock> blocks = Sets.newHashSet();


CreateIndexClusterStateUpdateRequest(String cause, String index) {
this.cause = cause;
this.index = index;
}

public CreateIndexClusterStateUpdateRequest settings(Settings settings) {
this.settings = settings;
return this;
}

public CreateIndexClusterStateUpdateRequest mappings(Map<String, String> mappings) {
this.mappings.putAll(mappings);
return this;
}

public CreateIndexClusterStateUpdateRequest customs(Map<String, IndexMetaData.Custom> customs) {
this.customs.putAll(customs);
return this;
}

public CreateIndexClusterStateUpdateRequest blocks(Set<ClusterBlock> blocks) {
this.blocks.addAll(blocks);
return this;
}

public CreateIndexClusterStateUpdateRequest state(IndexMetaData.State state) {
this.state = state;
return this;
}

public String cause() {
return cause;
}

public String index() {
return index;
}

public IndexMetaData.State state() {
return state;
}

public Settings settings() {
return settings;
}

public Map<String, String> mappings() {
return mappings;
}

public Map<String, IndexMetaData.Custom> customs() {
return customs;
}

public Set<ClusterBlock> blocks() {
return blocks;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -35,7 +34,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
Expand All @@ -48,7 +46,6 @@
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream;
import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
import static org.elasticsearch.common.unit.TimeValue.readTimeValue;

/**
* A request to create an index. Best created with {@link org.elasticsearch.client.Requests#createIndexRequest(String)}.
Expand All @@ -59,7 +56,7 @@
* @see org.elasticsearch.client.Requests#createIndexRequest(String)
* @see CreateIndexResponse
*/
public class CreateIndexRequest extends MasterNodeOperationRequest<CreateIndexRequest> {
public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest> {

private String cause = "";

Expand All @@ -71,8 +68,6 @@ public class CreateIndexRequest extends MasterNodeOperationRequest<CreateIndexRe

private Map<String, IndexMetaData.Custom> customs = newHashMap();

private TimeValue timeout = AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;

CreateIndexRequest() {
}

Expand Down Expand Up @@ -173,6 +168,7 @@ public CreateIndexRequest settings(XContentBuilder builder) {
/**
* The settings to crete the index with (either json/yaml/properties format)
*/
@SuppressWarnings("unchecked")
public CreateIndexRequest settings(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
Expand Down Expand Up @@ -224,6 +220,7 @@ public CreateIndexRequest mapping(String type, XContentBuilder source) {
* @param type The mapping type
* @param source The mapping source
*/
@SuppressWarnings("unchecked")
public CreateIndexRequest mapping(String type, Map source) {
// wrap it in a type map if its not
if (source.size() != 1 || !source.containsKey(type)) {
Expand Down Expand Up @@ -292,6 +289,7 @@ public CreateIndexRequest source(BytesReference source) {
/**
* Sets the settings and mappings as a single source.
*/
@SuppressWarnings("unchecked")
public CreateIndexRequest source(Map<String, Object> source) {
boolean found = false;
for (Map.Entry<String, Object> entry : source.entrySet()) {
Expand Down Expand Up @@ -338,38 +336,13 @@ Map<String, IndexMetaData.Custom> customs() {
return this.customs;
}

/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public TimeValue timeout() {
return timeout;
}

/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CreateIndexRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}

/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CreateIndexRequest timeout(String timeout) {
return timeout(TimeValue.parseTimeValue(timeout, null));
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
cause = in.readString();
index = in.readString();
settings = readSettingsFromStream(in);
timeout = readTimeValue(in);
readTimeout(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
mappings.put(in.readString(), in.readString());
Expand All @@ -388,7 +361,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(cause);
out.writeString(index);
writeSettingsToStream(settings, out);
timeout.writeTo(out);
writeTimeout(out);
out.writeVInt(mappings.size());
for (Map.Entry<String, String> entry : mappings.entrySet()) {
out.writeString(entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalIndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.util.Map;

/**
*
* Builder for a create index request
*/
public class CreateIndexRequestBuilder extends MasterNodeOperationRequestBuilder<CreateIndexRequest, CreateIndexResponse, CreateIndexRequestBuilder> {
public class CreateIndexRequestBuilder extends AcknowledgedRequestBuilder<CreateIndexRequest, CreateIndexResponse, CreateIndexRequestBuilder> {

public CreateIndexRequestBuilder(IndicesAdminClient indicesClient) {
super((InternalIndicesAdminClient) indicesClient, new CreateIndexRequest());
Expand Down Expand Up @@ -200,24 +199,6 @@ public CreateIndexRequestBuilder setSource(XContentBuilder source) {
return this;
}

/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CreateIndexRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}

/**
* Timeout to wait for the index creation to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public CreateIndexRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}

@Override
protected void doExecute(ActionListener<CreateIndexResponse> listener) {
((IndicesAdminClient) client).create(request, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand All @@ -28,34 +28,24 @@
/**
* A response for a create index action.
*/
public class CreateIndexResponse extends ActionResponse {

private boolean acknowledged;
public class CreateIndexResponse extends AcknowledgedResponse {

CreateIndexResponse() {
}

CreateIndexResponse(boolean acknowledged) {
this.acknowledged = acknowledged;
}

/**
* Has the index creation been acknowledged by all current cluster nodes within the
* provided {@link CreateIndexRequest#timeout(org.elasticsearch.common.unit.TimeValue)}.
*/
public boolean isAcknowledged() {
return acknowledged;
super(acknowledged);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
acknowledged = in.readBoolean();
readAcknowledged(in, null);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
writeAcknowledged(out, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
Expand Down Expand Up @@ -80,26 +82,27 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt
cause = "api";
}

createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings())
.mappings(request.mappings())
.customs(request.customs())
.timeout(request.timeout())
.masterTimeout(request.masterNodeTimeout()),
new MetaDataCreateIndexService.Listener() {
@Override
public void onResponse(MetaDataCreateIndexService.Response response) {
listener.onResponse(new CreateIndexResponse(response.acknowledged()));
}
CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(cause, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.customs(request.customs());

@Override
public void onFailure(Throwable t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
} else {
logger.debug("[{}] failed to create", t, request.index());
}
listener.onFailure(t);
}
});
createIndexService.createIndex(updateRequest, new ClusterStateUpdateListener() {

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Throwable t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
} else {
logger.debug("[{}] failed to create", t, request.index());
}
listener.onFailure(t);
}
});
}
}
1 change: 0 additions & 1 deletion src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ protected void configure() {
bind(RoutingService.class).asEagerSingleton();

bind(ShardStateAction.class).asEagerSingleton();
bind(NodeIndexCreatedAction.class).asEagerSingleton();
bind(NodeIndexDeletedAction.class).asEagerSingleton();
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
Expand Down
Loading

0 comments on commit b48899a

Please sign in to comment.