Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rivers removal. #11568

Merged
merged 1 commit into from Jun 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -64,7 +63,6 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;

Expand Down Expand Up @@ -1068,34 +1066,18 @@ private final Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardR
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
final ShardId shardId = indexShard.shardId();
if (update != null) {
final String indexName = shardId.getIndex();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexShard.indexService().mapperService();
mapperService.merge(request.type(), new CompressedXContent(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
} else {
created = operation.execute(indexShard);
}
final boolean created = operation.execute(indexShard);

// update the version on request so it will happen on the replicas
final long version = operation.version();
Expand Down
Expand Up @@ -53,7 +53,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
Expand All @@ -62,7 +61,6 @@
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.*;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -93,15 +91,14 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final AllocationService allocationService;
private final MetaDataService metaDataService;
private final Version version;
private final String riverIndexName;
private final AliasValidator aliasValidator;
private final IndexTemplateFilter indexTemplateFilter;
private final NodeEnvironment nodeEnv;

@Inject
public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService,
Version version, @RiverIndexName String riverIndexName, AliasValidator aliasValidator,
Version version, AliasValidator aliasValidator,
Set<IndexTemplateFilter> indexTemplateFilters, NodeEnvironment nodeEnv) {
super(settings);
this.threadPool = threadPool;
Expand All @@ -110,7 +107,6 @@ public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, Clus
this.allocationService = allocationService;
this.metaDataService = metaDataService;
this.version = version;
this.riverIndexName = riverIndexName;
this.aliasValidator = aliasValidator;
this.nodeEnv = nodeEnv;

Expand Down Expand Up @@ -163,7 +159,7 @@ public void validateIndexName(String index, ClusterState state) {
if (index.contains("#")) {
throw new InvalidIndexNameException(new Index(index), index, "must not contain '#'");
}
if (!index.equals(riverIndexName) && index.charAt(0) == '_') {
if (index.charAt(0) == '_') {
throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'");
}
if (!index.toLowerCase(Locale.ROOT).equals(index)) {
Expand Down Expand Up @@ -306,23 +302,15 @@ public ClusterState execute(ClusterState currentState) throws Exception {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
}
if (request.index().equals(ScriptService.SCRIPT_INDEX)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 0));
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all");
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
}

Expand Down
10 changes: 0 additions & 10 deletions core/src/main/java/org/elasticsearch/common/logging/Loggers.java
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.river.RiverName;

import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -71,15 +70,6 @@ public static ESLogger getLogger(Class clazz, Settings settings, Index index, St
return getLogger(clazz, settings, Lists.asList(SPACE, index.name(), prefixes).toArray(new String[0]));
}

public static ESLogger getLogger(Class clazz, Settings settings, RiverName riverName, String... prefixes) {
List<String> l = Lists.newArrayList();
l.add(SPACE);
l.add(riverName.type());
l.add(riverName.name());
l.addAll(Lists.newArrayList(prefixes));
return getLogger(clazz, settings, l.toArray(new String[l.size()]));
}

public static ESLogger getLogger(Class clazz, Settings settings, String... prefixes) {
return getLogger(buildClassLoggerName(clazz), settings, prefixes);
}
Expand Down
10 changes: 0 additions & 10 deletions core/src/main/java/org/elasticsearch/node/Node.java
Expand Up @@ -81,8 +81,6 @@
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestModule;
import org.elasticsearch.river.RiversManager;
import org.elasticsearch.river.RiversModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
Expand Down Expand Up @@ -182,7 +180,6 @@ public Node(Settings preparedSettings, boolean loadConfigSettings) {
if (settings.getAsBoolean(HTTP_ENABLED, true)) {
modules.add(new HttpServerModule(settings));
}
modules.add(new RiversModule(settings));
modules.add(new IndicesModule(settings));
modules.add(new SearchModule(settings));
modules.add(new ActionModule(false));
Expand Down Expand Up @@ -247,7 +244,6 @@ public Node start() {
injector.getInstance(IndexingMemoryController.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(RiversManager.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(TransportService.class).start();
injector.getInstance(ClusterService.class).start();
Expand Down Expand Up @@ -289,8 +285,6 @@ private Node stop() {
injector.getInstance(HttpServer.class).stop();
}

injector.getInstance(RiversManager.class).stop();

injector.getInstance(SnapshotsService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
Expand Down Expand Up @@ -339,10 +333,6 @@ public synchronized void close() {
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).close();
}

stopWatch.stop().start("rivers");
injector.getInstance(RiversManager.class).close();

stopWatch.stop().start("snapshot_service");
injector.getInstance(SnapshotsService.class).close();
stopWatch.stop().start("client");
Expand Down

This file was deleted.

43 changes: 0 additions & 43 deletions core/src/main/java/org/elasticsearch/river/River.java

This file was deleted.

29 changes: 0 additions & 29 deletions core/src/main/java/org/elasticsearch/river/RiverComponent.java

This file was deleted.

47 changes: 0 additions & 47 deletions core/src/main/java/org/elasticsearch/river/RiverException.java

This file was deleted.