Skip to content

Commit

Permalink
Merge pull request #11568 from jpountz/remove/rivers
Browse files Browse the repository at this point in the history
Rivers removal.
  • Loading branch information
jpountz committed Jun 17, 2015
2 parents 6e15e08 + ac7ce2b commit 17fac6d
Show file tree
Hide file tree
Showing 33 changed files with 21 additions and 2,345 deletions.
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -64,7 +63,6 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;

Expand Down Expand Up @@ -1068,34 +1066,18 @@ private final Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardR
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
final ShardId shardId = indexShard.shardId();
if (update != null) {
final String indexName = shardId.getIndex();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexShard.indexService().mapperService();
mapperService.merge(request.type(), new CompressedXContent(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
} else {
created = operation.execute(indexShard);
}
final boolean created = operation.execute(indexShard);

// update the version on request so it will happen on the replicas
final long version = operation.version();
Expand Down
Expand Up @@ -53,7 +53,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentMapper;
Expand All @@ -62,7 +61,6 @@
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.*;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -93,15 +91,14 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final AllocationService allocationService;
private final MetaDataService metaDataService;
private final Version version;
private final String riverIndexName;
private final AliasValidator aliasValidator;
private final IndexTemplateFilter indexTemplateFilter;
private final NodeEnvironment nodeEnv;

@Inject
public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService,
Version version, @RiverIndexName String riverIndexName, AliasValidator aliasValidator,
Version version, AliasValidator aliasValidator,
Set<IndexTemplateFilter> indexTemplateFilters, NodeEnvironment nodeEnv) {
super(settings);
this.threadPool = threadPool;
Expand All @@ -110,7 +107,6 @@ public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, Clus
this.allocationService = allocationService;
this.metaDataService = metaDataService;
this.version = version;
this.riverIndexName = riverIndexName;
this.aliasValidator = aliasValidator;
this.nodeEnv = nodeEnv;

Expand Down Expand Up @@ -163,7 +159,7 @@ public void validateIndexName(String index, ClusterState state) {
if (index.contains("#")) {
throw new InvalidIndexNameException(new Index(index), index, "must not contain '#'");
}
if (!index.equals(riverIndexName) && index.charAt(0) == '_') {
if (index.charAt(0) == '_') {
throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'");
}
if (!index.toLowerCase(Locale.ROOT).equals(index)) {
Expand Down Expand Up @@ -306,23 +302,15 @@ public ClusterState execute(ClusterState currentState) throws Exception {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {
if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));
}
}
if (request.index().equals(ScriptService.SCRIPT_INDEX)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 0));
indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all");
} else {
if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {
if (request.index().equals(riverIndexName)) {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
} else {
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));
}
}

Expand Down
10 changes: 0 additions & 10 deletions core/src/main/java/org/elasticsearch/common/logging/Loggers.java
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.river.RiverName;

import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -71,15 +70,6 @@ public static ESLogger getLogger(Class clazz, Settings settings, Index index, St
return getLogger(clazz, settings, Lists.asList(SPACE, index.name(), prefixes).toArray(new String[0]));
}

public static ESLogger getLogger(Class clazz, Settings settings, RiverName riverName, String... prefixes) {
List<String> l = Lists.newArrayList();
l.add(SPACE);
l.add(riverName.type());
l.add(riverName.name());
l.addAll(Lists.newArrayList(prefixes));
return getLogger(clazz, settings, l.toArray(new String[l.size()]));
}

public static ESLogger getLogger(Class clazz, Settings settings, String... prefixes) {
return getLogger(buildClassLoggerName(clazz), settings, prefixes);
}
Expand Down
10 changes: 0 additions & 10 deletions core/src/main/java/org/elasticsearch/node/Node.java
Expand Up @@ -81,8 +81,6 @@
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestModule;
import org.elasticsearch.river.RiversManager;
import org.elasticsearch.river.RiversModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
Expand Down Expand Up @@ -182,7 +180,6 @@ public Node(Settings preparedSettings, boolean loadConfigSettings) {
if (settings.getAsBoolean(HTTP_ENABLED, true)) {
modules.add(new HttpServerModule(settings));
}
modules.add(new RiversModule(settings));
modules.add(new IndicesModule(settings));
modules.add(new SearchModule(settings));
modules.add(new ActionModule(false));
Expand Down Expand Up @@ -247,7 +244,6 @@ public Node start() {
injector.getInstance(IndexingMemoryController.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(IndicesTTLService.class).start();
injector.getInstance(RiversManager.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(TransportService.class).start();
injector.getInstance(ClusterService.class).start();
Expand Down Expand Up @@ -289,8 +285,6 @@ private Node stop() {
injector.getInstance(HttpServer.class).stop();
}

injector.getInstance(RiversManager.class).stop();

injector.getInstance(SnapshotsService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
Expand Down Expand Up @@ -339,10 +333,6 @@ public synchronized void close() {
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).close();
}

stopWatch.stop().start("rivers");
injector.getInstance(RiversManager.class).close();

stopWatch.stop().start("snapshot_service");
injector.getInstance(SnapshotsService.class).close();
stopWatch.stop().start("client");
Expand Down

This file was deleted.

43 changes: 0 additions & 43 deletions core/src/main/java/org/elasticsearch/river/River.java

This file was deleted.

29 changes: 0 additions & 29 deletions core/src/main/java/org/elasticsearch/river/RiverComponent.java

This file was deleted.

47 changes: 0 additions & 47 deletions core/src/main/java/org/elasticsearch/river/RiverException.java

This file was deleted.

0 comments on commit 17fac6d

Please sign in to comment.