From ac7ce2b899567460663c05b677bffea4bdcc76c5 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Wed, 10 Jun 2015 09:05:39 +0200 Subject: [PATCH] Rivers removal. While we had initially planned to keep rivers around in 2.0 to ease migration, keeping support for rivers is challenging as it conflicts with other important changes that we want to bring to 2.0 like synchronous dynamic mappings updates. Nothing impossible to fix, but it would increase the complexity of how we deal with dynamic mappings updates and manage rivers, while handling dynamic mappings updates correctly is important for resiliency and rivers are on the go. So removing rivers in 2.0 may well be a better trade-off. --- .../TransportReplicationAction.java | 32 +- .../metadata/MetaDataCreateIndexService.java | 20 +- .../elasticsearch/common/logging/Loggers.java | 10 - .../java/org/elasticsearch/node/Node.java | 10 - .../river/AbstractRiverComponent.java | 52 ---- .../java/org/elasticsearch/river/River.java | 43 --- .../elasticsearch/river/RiverComponent.java | 29 -- .../elasticsearch/river/RiverException.java | 47 --- .../elasticsearch/river/RiverIndexName.java | 50 ---- .../org/elasticsearch/river/RiverModule.java | 93 ------ .../org/elasticsearch/river/RiverName.java | 74 ----- .../elasticsearch/river/RiverNameModule.java | 39 --- .../elasticsearch/river/RiverSettings.java | 48 --- .../elasticsearch/river/RiversManager.java | 67 ----- .../org/elasticsearch/river/RiversModule.java | 64 ---- .../river/RiversPluginsModule.java | 51 ---- .../elasticsearch/river/RiversService.java | 279 ------------------ .../river/RiversTypesRegistry.java | 39 --- .../PublishRiverClusterStateAction.java | 121 -------- .../cluster/RiverClusterChangedEvent.java | 53 ---- .../river/cluster/RiverClusterService.java | 170 ----------- .../river/cluster/RiverClusterState.java | 96 ------ .../cluster/RiverClusterStateListener.java | 28 -- .../cluster/RiverClusterStateUpdateTask.java | 28 -- .../river/cluster/RiverNodeHelper.java | 55 ---- .../elasticsearch/river/dummy/DummyRiver.java | 48 --- .../river/dummy/DummyRiverModule.java | 34 --- .../river/routing/RiverRouting.java | 87 ------ .../river/routing/RiversRouter.java | 255 ---------------- .../river/routing/RiversRouting.java | 123 -------- .../org/elasticsearch/river/RiverTests.java | 168 ----------- docs/reference/migration/migrate_2_0.asciidoc | 10 + docs/reference/modules/plugins.asciidoc | 43 --- 33 files changed, 21 insertions(+), 2345 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java delete mode 100644 core/src/main/java/org/elasticsearch/river/River.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiverComponent.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiverException.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiverIndexName.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiverModule.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiverName.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiverNameModule.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiverSettings.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiversManager.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiversModule.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiversPluginsModule.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiversService.java delete mode 100644 core/src/main/java/org/elasticsearch/river/RiversTypesRegistry.java delete mode 100644 core/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java delete mode 100644 core/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java delete mode 100644 core/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java delete mode 100644 core/src/main/java/org/elasticsearch/river/cluster/RiverClusterState.java delete mode 100644 core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateListener.java delete mode 100644 core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateUpdateTask.java delete mode 100644 core/src/main/java/org/elasticsearch/river/cluster/RiverNodeHelper.java delete mode 100644 core/src/main/java/org/elasticsearch/river/dummy/DummyRiver.java delete mode 100644 core/src/main/java/org/elasticsearch/river/dummy/DummyRiverModule.java delete mode 100644 core/src/main/java/org/elasticsearch/river/routing/RiverRouting.java delete mode 100644 core/src/main/java/org/elasticsearch/river/routing/RiversRouter.java delete mode 100644 core/src/main/java/org/elasticsearch/river/routing/RiversRouting.java delete mode 100644 core/src/test/java/org/elasticsearch/river/RiverTests.java diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d3f7a5b935627..77386088709fb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -54,7 +54,6 @@ import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.shard.IndexShard; @@ -64,7 +63,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; @@ -1068,34 +1066,18 @@ private final Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardR protected final WriteResult executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable { Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); - final boolean created; final ShardId shardId = indexShard.shardId(); if (update != null) { final String indexName = shardId.getIndex(); - if (indexName.equals(RiverIndexName.Conf.indexName(settings))) { - // With rivers, we have a chicken and egg problem if indexing - // the _meta document triggers a mapping update. Because we would - // like to validate the mapping update first, but on the other - // hand putting the mapping would start the river, which expects - // to find a _meta document - // So we have no choice but to index first and send mappings afterwards - MapperService mapperService = indexShard.indexService().mapperService(); - mapperService.merge(request.type(), new CompressedXContent(update.toBytes()), true); - created = operation.execute(indexShard); - mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update); - } else { - mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); - operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard); - update = operation.parsedDoc().dynamicMappingsUpdate(); - if (update != null) { - throw new RetryOnPrimaryException(shardId, - "Dynamics mappings are not available on the node that holds the primary yet"); - } - created = operation.execute(indexShard); + mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update); + operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard); + update = operation.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnPrimaryException(shardId, + "Dynamics mappings are not available on the node that holds the primary yet"); } - } else { - created = operation.execute(indexShard); } + final boolean created = operation.execute(indexShard); // update the version on request so it will happen on the replicas final long version = operation.version(); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index b4ca88f046b75..f8e809e9a55bc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -53,7 +53,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DocumentMapper; @@ -62,7 +61,6 @@ import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.IndexService; import org.elasticsearch.indices.*; -import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -93,7 +91,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final AllocationService allocationService; private final MetaDataService metaDataService; private final Version version; - private final String riverIndexName; private final AliasValidator aliasValidator; private final IndexTemplateFilter indexTemplateFilter; private final NodeEnvironment nodeEnv; @@ -101,7 +98,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { @Inject public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService, AllocationService allocationService, MetaDataService metaDataService, - Version version, @RiverIndexName String riverIndexName, AliasValidator aliasValidator, + Version version, AliasValidator aliasValidator, Set indexTemplateFilters, NodeEnvironment nodeEnv) { super(settings); this.threadPool = threadPool; @@ -110,7 +107,6 @@ public MetaDataCreateIndexService(Settings settings, ThreadPool threadPool, Clus this.allocationService = allocationService; this.metaDataService = metaDataService; this.version = version; - this.riverIndexName = riverIndexName; this.aliasValidator = aliasValidator; this.nodeEnv = nodeEnv; @@ -163,7 +159,7 @@ public void validateIndexName(String index, ClusterState state) { if (index.contains("#")) { throw new InvalidIndexNameException(new Index(index), index, "must not contain '#'"); } - if (!index.equals(riverIndexName) && index.charAt(0) == '_') { + if (index.charAt(0) == '_') { throw new InvalidIndexNameException(new Index(index), index, "must not start with '_'"); } if (!index.toLowerCase(Locale.ROOT).equals(index)) { @@ -306,11 +302,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1)); } else { if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) { - if (request.index().equals(riverIndexName)) { - indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1)); - } else { - indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); - } + indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5)); } } if (request.index().equals(ScriptService.SCRIPT_INDEX)) { @@ -318,11 +310,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all"); } else { if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) { - if (request.index().equals(riverIndexName)) { - indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); - } else { - indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); - } + indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1)); } } diff --git a/core/src/main/java/org/elasticsearch/common/logging/Loggers.java b/core/src/main/java/org/elasticsearch/common/logging/Loggers.java index 512b8a40fe6ee..8f546c93e89c6 100644 --- a/core/src/main/java/org/elasticsearch/common/logging/Loggers.java +++ b/core/src/main/java/org/elasticsearch/common/logging/Loggers.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.river.RiverName; import java.net.InetAddress; import java.net.UnknownHostException; @@ -71,15 +70,6 @@ public static ESLogger getLogger(Class clazz, Settings settings, Index index, St return getLogger(clazz, settings, Lists.asList(SPACE, index.name(), prefixes).toArray(new String[0])); } - public static ESLogger getLogger(Class clazz, Settings settings, RiverName riverName, String... prefixes) { - List l = Lists.newArrayList(); - l.add(SPACE); - l.add(riverName.type()); - l.add(riverName.name()); - l.addAll(Lists.newArrayList(prefixes)); - return getLogger(clazz, settings, l.toArray(new String[l.size()])); - } - public static ESLogger getLogger(Class clazz, Settings settings, String... prefixes) { return getLogger(buildClassLoggerName(clazz), settings, prefixes); } diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 355bea50643f8..e562bff50e57f 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -81,8 +81,6 @@ import org.elasticsearch.repositories.RepositoriesModule; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestModule; -import org.elasticsearch.river.RiversManager; -import org.elasticsearch.river.RiversModule; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchModule; @@ -182,7 +180,6 @@ public Node(Settings preparedSettings, boolean loadConfigSettings) { if (settings.getAsBoolean(HTTP_ENABLED, true)) { modules.add(new HttpServerModule(settings)); } - modules.add(new RiversModule(settings)); modules.add(new IndicesModule(settings)); modules.add(new SearchModule(settings)); modules.add(new ActionModule(false)); @@ -247,7 +244,6 @@ public Node start() { injector.getInstance(IndexingMemoryController.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); injector.getInstance(IndicesTTLService.class).start(); - injector.getInstance(RiversManager.class).start(); injector.getInstance(SnapshotsService.class).start(); injector.getInstance(TransportService.class).start(); injector.getInstance(ClusterService.class).start(); @@ -289,8 +285,6 @@ private Node stop() { injector.getInstance(HttpServer.class).stop(); } - injector.getInstance(RiversManager.class).stop(); - injector.getInstance(SnapshotsService.class).stop(); // stop any changes happening as a result of cluster state changes injector.getInstance(IndicesClusterStateService.class).stop(); @@ -339,10 +333,6 @@ public synchronized void close() { if (settings.getAsBoolean("http.enabled", true)) { injector.getInstance(HttpServer.class).close(); } - - stopWatch.stop().start("rivers"); - injector.getInstance(RiversManager.class).close(); - stopWatch.stop().start("snapshot_service"); injector.getInstance(SnapshotsService.class).close(); stopWatch.stop().start("client"); diff --git a/core/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java b/core/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java deleted file mode 100644 index b8ce3985d5f86..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/AbstractRiverComponent.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.Loggers; - -/** - * @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers - */ -@Deprecated -public class AbstractRiverComponent implements RiverComponent { - - protected final ESLogger logger; - - protected final RiverName riverName; - - protected final RiverSettings settings; - - protected AbstractRiverComponent(RiverName riverName, RiverSettings settings) { - this.riverName = riverName; - this.settings = settings; - - this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName); - } - - @Override - public RiverName riverName() { - return riverName; - } - - public String nodeName() { - return settings.globalSettings().get("name", ""); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/River.java b/core/src/main/java/org/elasticsearch/river/River.java deleted file mode 100644 index 574b5fd4f0b71..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/River.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -/** - * Allows to import data into elasticsearch via plugin - * Gets allocated on a node and eventually automatically re-allocated if needed - * @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers - */ -@Deprecated -public interface River extends RiverComponent { - - /** - * Called whenever the river is registered on a node, which can happen when: - * 1) the river _meta document gets indexed - * 2) an already registered river gets started on a node - */ - void start(); - - /** - * Called when the river is closed on a node, which can happen when: - * 1) the river is deleted by deleting its type through the delete mapping api - * 2) the node where the river is allocated is shut down or the river gets rerouted to another node - */ - void close(); -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverComponent.java b/core/src/main/java/org/elasticsearch/river/RiverComponent.java deleted file mode 100644 index a8cfb898798c8..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverComponent.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -/** - * @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers - */ -@Deprecated -public interface RiverComponent { - - RiverName riverName(); -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverException.java b/core/src/main/java/org/elasticsearch/river/RiverException.java deleted file mode 100644 index 4f9243949f68f..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverException.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.ElasticsearchException; - -/** - * - */ -public class RiverException extends ElasticsearchException { - - private final RiverName river; - - public RiverException(RiverName river, String msg) { - this(river, msg, null); - } - - public RiverException(RiverName river, String msg, Throwable cause) { - this(river, true, msg, cause); - } - - protected RiverException(RiverName river, boolean withSpace, String msg, Throwable cause) { - super("[" + river.type() + "][" + river.name() + "]" + (withSpace ? " " : "") + msg, cause); - this.river = river; - } - - public RiverName riverName() { - return river; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverIndexName.java b/core/src/main/java/org/elasticsearch/river/RiverIndexName.java deleted file mode 100644 index 66d2d03bab88e..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverIndexName.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.inject.BindingAnnotation; -import org.elasticsearch.common.settings.Settings; - -import java.lang.annotation.Documented; -import java.lang.annotation.Retention; -import java.lang.annotation.Target; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -/** - * - */ - -@BindingAnnotation -@Target({FIELD, PARAMETER}) -@Retention(RUNTIME) -@Documented -public @interface RiverIndexName { - - static class Conf { - public static final String DEFAULT_INDEX_NAME = "_river"; - - public static String indexName(Settings settings) { - return settings.get("river.index_name", DEFAULT_INDEX_NAME); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverModule.java b/core/src/main/java/org/elasticsearch/river/RiverModule.java deleted file mode 100644 index 83a686903b213..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverModule.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.collect.ImmutableList; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.Modules; -import org.elasticsearch.common.inject.SpawnModules; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.settings.NoClassSettingsException; - -import java.util.Locale; -import java.util.Map; - -import static org.elasticsearch.common.Strings.toCamelCase; - -/** - * - */ -public class RiverModule extends AbstractModule implements SpawnModules { - - private RiverName riverName; - - private final Settings globalSettings; - - private final Map settings; - - private final RiversTypesRegistry typesRegistry; - - public RiverModule(RiverName riverName, Map settings, Settings globalSettings, RiversTypesRegistry typesRegistry) { - this.riverName = riverName; - this.globalSettings = globalSettings; - this.settings = settings; - this.typesRegistry = typesRegistry; - } - - @Override - public Iterable spawnModules() { - return ImmutableList.of(Modules.createModule(loadTypeModule(riverName.type(), "org.elasticsearch.river.", "RiverModule"), globalSettings)); - } - - @Override - protected void configure() { - bind(RiverSettings.class).toInstance(new RiverSettings(globalSettings, settings)); - } - - private Class loadTypeModule(String type, String prefixPackage, String suffixClassName) { - Class registered = typesRegistry.type(type); - if (registered != null) { - return registered; - } - String fullClassName = type; - try { - return (Class) globalSettings.getClassLoader().loadClass(fullClassName); - } catch (ClassNotFoundException e) { - fullClassName = prefixPackage + Strings.capitalize(toCamelCase(type)) + suffixClassName; - try { - return (Class) globalSettings.getClassLoader().loadClass(fullClassName); - } catch (ClassNotFoundException e1) { - fullClassName = prefixPackage + toCamelCase(type) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName; - try { - return (Class) globalSettings.getClassLoader().loadClass(fullClassName); - } catch (ClassNotFoundException e2) { - fullClassName = prefixPackage + toCamelCase(type).toLowerCase(Locale.ROOT) + "." + Strings.capitalize(toCamelCase(type)) + suffixClassName; - try { - return (Class) globalSettings.getClassLoader().loadClass(fullClassName); - } catch (ClassNotFoundException e3) { - throw new NoClassSettingsException("Failed to load class with value [" + type + "]", e); - } - } - } - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverName.java b/core/src/main/java/org/elasticsearch/river/RiverName.java deleted file mode 100644 index 7078574c1b4ef..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverName.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import java.io.Serializable; - -/** - * @deprecated See blog post https://www.elastic.co/blog/deprecating_rivers - */ -@Deprecated -public class RiverName implements Serializable { - - private final String type; - - private final String name; - - public RiverName(String type, String name) { - this.type = type; - this.name = name; - } - - public String type() { - return this.type; - } - - public String getType() { - return type(); - } - - public String name() { - return this.name; - } - - public String getName() { - return name(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - RiverName that = (RiverName) o; - - if (name != null ? !name.equals(that.name) : that.name != null) return false; - if (type != null ? !type.equals(that.type) : that.type != null) return false; - - return true; - } - - @Override - public int hashCode() { - int result = type != null ? type.hashCode() : 0; - result = 31 * result + (name != null ? name.hashCode() : 0); - return result; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverNameModule.java b/core/src/main/java/org/elasticsearch/river/RiverNameModule.java deleted file mode 100644 index cc908c0fd6e09..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverNameModule.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.inject.AbstractModule; - -/** - * - */ -public class RiverNameModule extends AbstractModule { - - private final RiverName riverName; - - public RiverNameModule(RiverName riverName) { - this.riverName = riverName; - } - - @Override - protected void configure() { - bind(RiverName.class).toInstance(riverName); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiverSettings.java b/core/src/main/java/org/elasticsearch/river/RiverSettings.java deleted file mode 100644 index a06214861a799..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiverSettings.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.settings.Settings; - -import java.util.Map; - -/** - * (shayy.banon) - */ - -public class RiverSettings { - - private final Settings globalSettings; - - private final Map settings; - - public RiverSettings(Settings globalSettings, Map settings) { - this.globalSettings = globalSettings; - this.settings = settings; - } - - public Settings globalSettings() { - return globalSettings; - } - - public Map settings() { - return settings; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiversManager.java b/core/src/main/java/org/elasticsearch/river/RiversManager.java deleted file mode 100644 index c1fc6da82b2b5..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversManager.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.river.cluster.RiverClusterService; -import org.elasticsearch.river.routing.RiversRouter; - -/** - * - */ -public class RiversManager extends AbstractLifecycleComponent { - - private final RiversService riversService; - - private final RiverClusterService clusterService; - - private final RiversRouter riversRouter; - - @Inject - public RiversManager(Settings settings, RiversService riversService, RiverClusterService clusterService, RiversRouter riversRouter) { - super(settings); - this.riversService = riversService; - this.clusterService = clusterService; - this.riversRouter = riversRouter; - } - - @Override - protected void doStart() { - riversRouter.start(); - riversService.start(); - clusterService.start(); - } - - @Override - protected void doStop() { - riversRouter.stop(); - clusterService.stop(); - riversService.stop(); - } - - @Override - protected void doClose() { - riversRouter.close(); - clusterService.close(); - riversService.close(); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiversModule.java b/core/src/main/java/org/elasticsearch/river/RiversModule.java deleted file mode 100644 index 912874fce725a..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversModule.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.river.cluster.RiverClusterService; -import org.elasticsearch.river.routing.RiversRouter; - -import java.util.Map; - -/** - * - */ -public class RiversModule extends AbstractModule { - - private final Settings settings; - - private Map> riverTypes = Maps.newHashMap(); - - public RiversModule(Settings settings) { - this.settings = settings; - } - - /** - * Registers a custom river type name against a module. - * - * @param type The type - * @param module The module - */ - public void registerRiver(String type, Class module) { - riverTypes.put(type, module); - } - - @Override - protected void configure() { - bind(String.class).annotatedWith(RiverIndexName.class).toInstance(RiverIndexName.Conf.indexName(settings)); - bind(RiversService.class).asEagerSingleton(); - bind(RiverClusterService.class).asEagerSingleton(); - bind(RiversRouter.class).asEagerSingleton(); - bind(RiversManager.class).asEagerSingleton(); - bind(RiversTypesRegistry.class).toInstance(new RiversTypesRegistry(ImmutableMap.copyOf(riverTypes))); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiversPluginsModule.java b/core/src/main/java/org/elasticsearch/river/RiversPluginsModule.java deleted file mode 100644 index f2059fd3c42ef..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversPluginsModule.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.PreProcessModule; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.plugins.PluginsService; - -/** - * A module that simply calls the {@link PluginsService#processModule(org.elasticsearch.common.inject.Module)} - * in order to allow plugins to pre process specific river modules. - */ -public class RiversPluginsModule extends AbstractModule implements PreProcessModule { - - private final Settings settings; - - private final PluginsService pluginsService; - - public RiversPluginsModule(Settings settings, PluginsService pluginsService) { - this.settings = settings; - this.pluginsService = pluginsService; - } - - @Override - public void processModule(Module module) { - pluginsService.processModule(module); - } - - @Override - protected void configure() { - } -} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/river/RiversService.java b/core/src/main/java/org/elasticsearch/river/RiversService.java deleted file mode 100644 index 268956eecf824..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversService.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.get.GetRequestBuilder; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.support.ThreadedActionListener; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.common.inject.ModulesBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.river.cluster.RiverClusterChangedEvent; -import org.elasticsearch.river.cluster.RiverClusterService; -import org.elasticsearch.river.cluster.RiverClusterState; -import org.elasticsearch.river.cluster.RiverClusterStateListener; -import org.elasticsearch.river.routing.RiverRouting; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; - -/** - * - */ -public class RiversService extends AbstractLifecycleComponent { - - private final String riverIndexName; - - private Client client; - - private final ThreadPool threadPool; - - private final ClusterService clusterService; - - private final RiversTypesRegistry typesRegistry; - - private final Injector injector; - - private final Map riversInjectors = Maps.newHashMap(); - - private volatile ImmutableMap rivers = ImmutableMap.of(); - - @Inject - public RiversService(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, RiversTypesRegistry typesRegistry, RiverClusterService riverClusterService, Injector injector) { - super(settings); - this.riverIndexName = RiverIndexName.Conf.indexName(settings); - this.client = client; - this.threadPool = threadPool; - this.clusterService = clusterService; - this.typesRegistry = typesRegistry; - this.injector = injector; - riverClusterService.add(new ApplyRivers()); - } - - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - ImmutableSet indices = ImmutableSet.copyOf(this.rivers.keySet()); - final CountDownLatch latch = new CountDownLatch(indices.size()); - for (final RiverName riverName : indices) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - try { - closeRiver(riverName); - } catch (Exception e) { - logger.warn("failed to delete river on stop [{}]/[{}]", e, riverName.type(), riverName.name()); - } finally { - latch.countDown(); - } - } - }); - } - try { - latch.await(); - } catch (InterruptedException e) { - // ignore - } - } - - @Override - protected void doClose() { - } - - public synchronized void createRiver(RiverName riverName, Map settings) { - if (riversInjectors.containsKey(riverName)) { - logger.warn("ignoring river [{}][{}] creation, already exists", riverName.type(), riverName.name()); - return; - } - - logger.info("rivers have been deprecated. Read https://www.elastic.co/blog/deprecating_rivers"); - logger.debug("creating river [{}][{}]", riverName.type(), riverName.name()); - - try { - ModulesBuilder modules = new ModulesBuilder(); - modules.add(new RiverNameModule(riverName)); - modules.add(new RiverModule(riverName, settings, this.settings, typesRegistry)); - modules.add(new RiversPluginsModule(this.settings, injector.getInstance(PluginsService.class))); - - Injector indexInjector = modules.createChildInjector(injector); - riversInjectors.put(riverName, indexInjector); - River river = indexInjector.getInstance(River.class); - rivers = MapBuilder.newMapBuilder(rivers).put(riverName, river).immutableMap(); - - - // we need this start so there can be operations done (like creating an index) which can't be - // done on create since Guice can't create two concurrent child injectors - river.start(); - - XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); - - builder.startObject("node"); - builder.field("id", clusterService.localNode().id()); - builder.field("name", clusterService.localNode().name()); - builder.field("transport_address", clusterService.localNode().address().toString()); - builder.endObject(); - - builder.endObject(); - - - client.prepareIndex(riverIndexName, riverName.name(), "_status") - .setConsistencyLevel(WriteConsistencyLevel.ONE) - .setSource(builder).execute().actionGet(); - } catch (Exception e) { - logger.warn("failed to create river [{}][{}]", e, riverName.type(), riverName.name()); - - try { - XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); - builder.field("error", ExceptionsHelper.detailedMessage(e)); - - builder.startObject("node"); - builder.field("id", clusterService.localNode().id()); - builder.field("name", clusterService.localNode().name()); - builder.field("transport_address", clusterService.localNode().address().toString()); - builder.endObject(); - builder.endObject(); - - client.prepareIndex(riverIndexName, riverName.name(), "_status") - .setConsistencyLevel(WriteConsistencyLevel.ONE) - .setSource(builder).execute().actionGet(); - } catch (Exception e1) { - logger.warn("failed to write failed status for river creation", e); - } - } - } - - public synchronized void closeRiver(RiverName riverName) { - Injector riverInjector; - River river; - synchronized (this) { - riverInjector = riversInjectors.remove(riverName); - if (riverInjector == null) { - throw new RiverException(riverName, "missing"); - } - logger.debug("closing river [{}][{}]", riverName.type(), riverName.name()); - - Map tmpMap = Maps.newHashMap(rivers); - river = tmpMap.remove(riverName); - rivers = ImmutableMap.copyOf(tmpMap); - } - - river.close(); - } - - private class ApplyRivers implements RiverClusterStateListener { - @Override - public void riverClusterChanged(RiverClusterChangedEvent event) { - DiscoveryNode localNode = clusterService.localNode(); - RiverClusterState state = event.state(); - - // first, go over and delete ones that either don't exists or are not allocated - for (final RiverName riverName : rivers.keySet()) { - RiverRouting routing = state.routing().routing(riverName); - if (routing == null || !localNode.equals(routing.node())) { - // not routed at all, and not allocated here, clean it (we delete the relevant ones before) - closeRiver(riverName); - } - } - - for (final RiverRouting routing : state.routing()) { - // not allocated - if (routing.node() == null) { - logger.trace("river {} has no routing node", routing.riverName().getName()); - continue; - } - // only apply changes to the local node - if (!routing.node().equals(localNode)) { - logger.trace("river {} belongs to node {}", routing.riverName().getName(), routing.node()); - continue; - } - // if its already created, ignore it - if (rivers.containsKey(routing.riverName())) { - logger.trace("river {} is already allocated", routing.riverName().getName()); - continue; - } - prepareGetMetaDocument(routing.riverName().name()).execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, new ActionListener() { - @Override - public void onResponse(GetResponse getResponse) { - if (!rivers.containsKey(routing.riverName())) { - if (getResponse.isExists()) { - // only create the river if it exists, otherwise, the indexing meta data has not been visible yet... - createRiver(routing.riverName(), getResponse.getSourceAsMap()); - } else { - //this should never happen as we've just found the _meta document in RiversRouter - logger.warn("{}/{}/_meta document not found", riverIndexName, routing.riverName().getName()); - } - } - } - - @Override - public void onFailure(Throwable e) { - // if its this is a failure that need to be retried, then do it - // this might happen if the state of the river index has not been propagated yet to this node, which - // should happen pretty fast since we managed to get the _meta in the RiversRouter - Throwable failure = ExceptionsHelper.unwrapCause(e); - if (isShardNotAvailableException(failure)) { - logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name()); - final ActionListener listener = this; - try { - threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.LISTENER, new Runnable() { - @Override - public void run() { - prepareGetMetaDocument(routing.riverName().name()).execute(listener); - } - }); - } catch (EsRejectedExecutionException ex) { - logger.debug("Couldn't schedule river start retry, node might be shutting down", ex); - } - } else { - logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name()); - } - } - })); - } - } - - private GetRequestBuilder prepareGetMetaDocument(String riverName) { - return client.prepareGet(riverIndexName, riverName, "_meta").setPreference("_primary"); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/RiversTypesRegistry.java b/core/src/main/java/org/elasticsearch/river/RiversTypesRegistry.java deleted file mode 100644 index 3400dc40ec7b4..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/RiversTypesRegistry.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.collect.ImmutableMap; -import org.elasticsearch.common.inject.Module; - -/** - * A type registry for rivers - */ -public class RiversTypesRegistry { - - private final ImmutableMap> riverTypes; - - public RiversTypesRegistry(ImmutableMap> riverTypes) { - this.riverTypes = riverTypes; - } - - public Class type(String type) { - return riverTypes.get(type); - } -} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java b/core/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java deleted file mode 100644 index d783adee752fd..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/PublishRiverClusterStateAction.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; - -import java.io.IOException; - -/** - * - */ -public class PublishRiverClusterStateAction extends AbstractComponent { - - public static final String ACTION_NAME = "internal:river/state/publish"; - - public interface NewClusterStateListener { - void onNewClusterState(RiverClusterState clusterState); - } - - private final TransportService transportService; - - private final ClusterService clusterService; - - private final NewClusterStateListener listener; - - public PublishRiverClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, - NewClusterStateListener listener) { - super(settings); - this.transportService = transportService; - this.clusterService = clusterService; - this.listener = listener; - transportService.registerRequestHandler(ACTION_NAME, PublishClusterStateRequest.class, ThreadPool.Names.SAME, new PublishClusterStateRequestHandler()); - } - - public void close() { - transportService.removeHandler(ACTION_NAME); - } - - public void publish(RiverClusterState clusterState) { - final DiscoveryNodes discoNodes = clusterService.state().nodes(); - final DiscoveryNode localNode = discoNodes.localNode(); - for (final DiscoveryNode node : discoNodes) { - if (node.equals(localNode)) { - // no need to send to our self - continue; - } - - // we only want to send nodes that are either possible master nodes or river nodes - // master nodes because they will handle the state and the allocation of rivers - // and river nodes since they will end up creating indexes - - if (!node.masterNode() && !RiverNodeHelper.isRiverNode(node)) { - continue; - } - - transportService.sendRequest(node, ACTION_NAME, new PublishClusterStateRequest(clusterState), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleException(TransportException exp) { - logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node); - } - }); - } - } - - static class PublishClusterStateRequest extends TransportRequest { - - private RiverClusterState clusterState; - - PublishClusterStateRequest() { - } - - private PublishClusterStateRequest(RiverClusterState clusterState) { - this.clusterState = clusterState; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - clusterState = RiverClusterState.Builder.readFrom(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - RiverClusterState.Builder.writeTo(clusterState, out); - } - } - - private class PublishClusterStateRequestHandler implements TransportRequestHandler { - @Override - public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception { - listener.onNewClusterState(request.clusterState); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java deleted file mode 100644 index 2863a401bf66b..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterChangedEvent.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -/** - * - */ -public class RiverClusterChangedEvent { - - private final String source; - - private final RiverClusterState previousState; - - private final RiverClusterState state; - - public RiverClusterChangedEvent(String source, RiverClusterState state, RiverClusterState previousState) { - this.source = source; - this.state = state; - this.previousState = previousState; - } - - /** - * The source that caused this cluster event to be raised. - */ - public String source() { - return this.source; - } - - public RiverClusterState state() { - return this.state; - } - - public RiverClusterState previousState() { - return this.previousState; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java deleted file mode 100644 index 3a5208da45755..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterService.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.transport.TransportService; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import static java.util.concurrent.Executors.newSingleThreadExecutor; -import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; - -/** - * - */ -public class RiverClusterService extends AbstractLifecycleComponent { - - private final ClusterService clusterService; - - private final PublishRiverClusterStateAction publishAction; - - private final List clusterStateListeners = new CopyOnWriteArrayList<>(); - - private volatile ExecutorService updateTasksExecutor; - - private volatile RiverClusterState clusterState = RiverClusterState.builder().build(); - - @Inject - public RiverClusterService(Settings settings, TransportService transportService, ClusterService clusterService) { - super(settings); - this.clusterService = clusterService; - - this.publishAction = new PublishRiverClusterStateAction(settings, transportService, clusterService, new UpdateClusterStateListener()); - } - - @Override - protected void doStart() { - this.updateTasksExecutor = newSingleThreadExecutor(daemonThreadFactory(settings, "riverClusterService#updateTask")); - } - - @Override - protected void doStop() { - updateTasksExecutor.shutdown(); - try { - updateTasksExecutor.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // ignore - } - } - - @Override - protected void doClose() { - } - - public void add(RiverClusterStateListener listener) { - clusterStateListeners.add(listener); - } - - public void remove(RiverClusterStateListener listener) { - clusterStateListeners.remove(listener); - } - - /** - * The current state. - */ - public ClusterState state() { - return clusterService.state(); - } - - public void submitStateUpdateTask(final String source, final RiverClusterStateUpdateTask updateTask) { - if (!lifecycle.started()) { - return; - } - updateTasksExecutor.execute(new Runnable() { - @Override - public void run() { - if (!lifecycle.started()) { - logger.debug("processing [{}]: ignoring, cluster_service not started", source); - return; - } - logger.debug("processing [{}]: execute", source); - - RiverClusterState previousClusterState = clusterState; - try { - clusterState = updateTask.execute(previousClusterState); - } catch (Exception e) { - StringBuilder sb = new StringBuilder("failed to execute cluster state update, state:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); - logger.warn(sb.toString(), e); - return; - } - if (previousClusterState != clusterState) { - if (clusterService.state().nodes().localNodeMaster()) { - // only the master controls the version numbers - clusterState = new RiverClusterState(clusterState.version() + 1, clusterState); - } else { - // we got this cluster state from the master, filter out based on versions (don't call listeners) - if (clusterState.version() < previousClusterState.version()) { - logger.debug("got old cluster state [" + clusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring"); - return; - } - } - - if (logger.isTraceEnabled()) { - StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(clusterState.version()).append("], source [").append(source).append("]\n"); - logger.trace(sb.toString()); - } else if (logger.isDebugEnabled()) { - logger.debug("cluster state updated, version [{}], source [{}]", clusterState.version(), source); - } - - RiverClusterChangedEvent clusterChangedEvent = new RiverClusterChangedEvent(source, clusterState, previousClusterState); - - for (RiverClusterStateListener listener : clusterStateListeners) { - listener.riverClusterChanged(clusterChangedEvent); - } - - // if we are the master, publish the new state to all nodes - if (clusterService.state().nodes().localNodeMaster()) { - publishAction.publish(clusterState); - } - - logger.debug("processing [{}]: done applying updated cluster_state", source); - } else { - logger.debug("processing [{}]: no change in cluster_state", source); - } - } - }); - } - - private class UpdateClusterStateListener implements PublishRiverClusterStateAction.NewClusterStateListener { - @Override - public void onNewClusterState(final RiverClusterState clusterState) { - ClusterState state = clusterService.state(); - if (state.nodes().localNodeMaster()) { - logger.warn("master should not receive new cluster state from [{}]", state.nodes().masterNode()); - return; - } - - submitStateUpdateTask("received_state", new RiverClusterStateUpdateTask() { - @Override - public RiverClusterState execute(RiverClusterState currentState) { - return clusterState; - } - }); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterState.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterState.java deleted file mode 100644 index 7be85a6d51b7a..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterState.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.river.routing.RiversRouting; - -import java.io.IOException; - -/** - * - */ -public class RiverClusterState { - - private final long version; - - private final RiversRouting routing; - - public RiverClusterState(long version, RiverClusterState state) { - this.version = version; - this.routing = state.routing(); - } - - RiverClusterState(long version, RiversRouting routing) { - this.version = version; - this.routing = routing; - } - - public long version() { - return this.version; - } - - public RiversRouting routing() { - return routing; - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private long version = 0; - - private RiversRouting routing = RiversRouting.EMPTY; - - public Builder state(RiverClusterState state) { - this.version = state.version(); - this.routing = state.routing(); - return this; - } - - public Builder routing(RiversRouting.Builder builder) { - return routing(builder.build()); - } - - public Builder routing(RiversRouting routing) { - this.routing = routing; - return this; - } - - public RiverClusterState build() { - return new RiverClusterState(version, routing); - } - - public static RiverClusterState readFrom(StreamInput in) throws IOException { - Builder builder = new Builder(); - builder.version = in.readVLong(); - builder.routing = RiversRouting.Builder.readFrom(in); - return builder.build(); - } - - public static void writeTo(RiverClusterState clusterState, StreamOutput out) throws IOException { - out.writeVLong(clusterState.version); - RiversRouting.Builder.writeTo(clusterState.routing, out); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateListener.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateListener.java deleted file mode 100644 index c8cac757155f9..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -/** - * - */ -public interface RiverClusterStateListener { - - void riverClusterChanged(RiverClusterChangedEvent event); -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateUpdateTask.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateUpdateTask.java deleted file mode 100644 index eb7d3fe275bf7..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverClusterStateUpdateTask.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -/** - * - */ -public interface RiverClusterStateUpdateTask { - - RiverClusterState execute(RiverClusterState currentState); -} diff --git a/core/src/main/java/org/elasticsearch/river/cluster/RiverNodeHelper.java b/core/src/main/java/org/elasticsearch/river/cluster/RiverNodeHelper.java deleted file mode 100644 index 630075144705b..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/cluster/RiverNodeHelper.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.cluster; - -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.river.RiverName; - -/** - * - */ -public class RiverNodeHelper { - - public static boolean isRiverNode(DiscoveryNode node) { - // we don't allocate rivers on client nodes - if (node.clientNode()) { - return false; - } - String river = node.attributes().get("river"); - // by default, if not set, it's a river node (better OOB exp) - if (river == null) { - return true; - } - if ("_none_".equals(river)) { - return false; - } - // there is at least one river settings, we need it - return true; - } - - public static boolean isRiverNode(DiscoveryNode node, RiverName riverName) { - if (!isRiverNode(node)) { - return false; - } - String river = node.attributes().get("river"); - // by default, if not set, its an river node (better OOB exp) - return river == null || river.contains(riverName.type()) || river.contains(riverName.name()); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/dummy/DummyRiver.java b/core/src/main/java/org/elasticsearch/river/dummy/DummyRiver.java deleted file mode 100644 index f90a928bc5e74..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/dummy/DummyRiver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.dummy; - -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.river.AbstractRiverComponent; -import org.elasticsearch.river.River; -import org.elasticsearch.river.RiverName; -import org.elasticsearch.river.RiverSettings; - -/** - * - */ -public class DummyRiver extends AbstractRiverComponent implements River { - - @Inject - public DummyRiver(RiverName riverName, RiverSettings settings) { - super(riverName, settings); - logger.info("create"); - } - - @Override - public void start() { - logger.info("start"); - } - - @Override - public void close() { - logger.info("close"); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/dummy/DummyRiverModule.java b/core/src/main/java/org/elasticsearch/river/dummy/DummyRiverModule.java deleted file mode 100644 index a0e3057955ecb..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/dummy/DummyRiverModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.dummy; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.river.River; - -/** - * - */ -public class DummyRiverModule extends AbstractModule { - - @Override - protected void configure() { - bind(River.class).to(DummyRiver.class).asEagerSingleton(); - } -} diff --git a/core/src/main/java/org/elasticsearch/river/routing/RiverRouting.java b/core/src/main/java/org/elasticsearch/river/routing/RiverRouting.java deleted file mode 100644 index 0fe41d6f24f92..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/routing/RiverRouting.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.routing; - -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.river.RiverName; - -import java.io.IOException; - -/** - * - */ -public class RiverRouting implements Streamable { - - private RiverName riverName; - - private DiscoveryNode node; - - private RiverRouting() { - } - - RiverRouting(RiverName riverName, DiscoveryNode node) { - this.riverName = riverName; - this.node = node; - } - - public RiverName riverName() { - return riverName; - } - - /** - * The node the river is allocated to, null if its not allocated. - */ - public DiscoveryNode node() { - return node; - } - - void node(DiscoveryNode node) { - this.node = node; - } - - public static RiverRouting readRiverRouting(StreamInput in) throws IOException { - RiverRouting routing = new RiverRouting(); - routing.readFrom(in); - return routing; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - riverName = new RiverName(in.readString(), in.readString()); - if (in.readBoolean()) { - node = DiscoveryNode.readNode(in); - } - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(riverName.type()); - out.writeString(riverName.name()); - if (node == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - node.writeTo(out); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/river/routing/RiversRouter.java b/core/src/main/java/org/elasticsearch/river/routing/RiversRouter.java deleted file mode 100644 index 065d71d077cb8..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/routing/RiversRouter.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.routing; - -import com.carrotsearch.hppc.cursors.ObjectCursor; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.elasticsearch.action.NoShardAvailableActionException; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.shard.IllegalIndexShardStateException; -import org.elasticsearch.indices.IndexMissingException; -import org.elasticsearch.river.RiverIndexName; -import org.elasticsearch.river.RiverName; -import org.elasticsearch.river.cluster.RiverClusterService; -import org.elasticsearch.river.cluster.RiverClusterState; -import org.elasticsearch.river.cluster.RiverClusterStateUpdateTask; -import org.elasticsearch.river.cluster.RiverNodeHelper; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * - */ -public class RiversRouter extends AbstractLifecycleComponent implements ClusterStateListener { - - private static final TimeValue RIVER_START_RETRY_INTERVAL = TimeValue.timeValueMillis(1000); - private static final int RIVER_START_MAX_RETRIES = 5; - - private final String riverIndexName; - - private final Client client; - - private final RiverClusterService riverClusterService; - - private final ThreadPool threadPool; - - @Inject - public RiversRouter(Settings settings, Client client, ClusterService clusterService, RiverClusterService riverClusterService, ThreadPool threadPool) { - super(settings); - this.riverIndexName = RiverIndexName.Conf.indexName(settings); - this.riverClusterService = riverClusterService; - this.client = client; - this.threadPool = threadPool; - clusterService.add(this); - } - - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - } - - @Override - public void clusterChanged(final ClusterChangedEvent event) { - if (!event.localNodeMaster()) { - return; - } - final String source = "reroute_rivers_node_changed"; - //we'll try again a few times if we don't find the river _meta document while the type is there - final CountDown countDown = new CountDown(RIVER_START_MAX_RETRIES); - riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() { - @Override - public RiverClusterState execute(RiverClusterState currentState) { - return updateRiverClusterState(source, currentState, event.state(), countDown); - } - }); - } - - protected RiverClusterState updateRiverClusterState(final String source, final RiverClusterState currentState, - ClusterState newClusterState, final CountDown countDown) { - if (!newClusterState.metaData().hasIndex(riverIndexName)) { - // if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state - if (!currentState.routing().isEmpty()) { - return RiverClusterState.builder().state(currentState).routing(RiversRouting.builder()).build(); - } - return currentState; - } - - RiversRouting.Builder routingBuilder = RiversRouting.builder().routing(currentState.routing()); - boolean dirty = false; - IndexMetaData indexMetaData = newClusterState.metaData().index(riverIndexName); - - boolean metaFound = true; - // go over and create new river routing (with no node) for new types (rivers names) - for (ObjectCursor cursor : indexMetaData.mappings().values()) { - String mappingType = cursor.value.type(); // mapping type is the name of the river - if (MapperService.DEFAULT_MAPPING.equals(mappingType)) { - continue; - } - if (!currentState.routing().hasRiverByName(mappingType)) { - // no river, we need to add it to the routing with no node allocation - try { - GetResponse getResponse = client.prepareGet(riverIndexName, mappingType, "_meta").setPreference("_primary").get(); - if (getResponse.isExists()) { - - logger.debug("{}/{}/_meta document found.", riverIndexName, mappingType); - - String riverType = XContentMapValues.nodeStringValue(getResponse.getSourceAsMap().get("type"), null); - if (riverType == null) { - logger.warn("no river type provided for [{}], ignoring...", riverIndexName); - } else { - routingBuilder.put(new RiverRouting(new RiverName(riverType, mappingType), null)); - dirty = true; - } - } else { - // At least one type does not have _meta - metaFound = false; - } - } catch (NoShardAvailableActionException e) { - // ignore, we will get it next time... - } catch (ClusterBlockException e) { - // ignore, we will get it next time - } catch (IndexMissingException e) { - // ignore, we will get it next time - } catch (IllegalIndexShardStateException e) { - // ignore, we will get it next time - } catch (Exception e) { - logger.warn("failed to get/parse _meta for [{}]", e, mappingType); - } - } - } - - // At least one type does not have _meta, so we are - // going to reschedule some checks - if (!metaFound) { - if (countDown.countDown()) { - logger.warn("no river _meta document found after {} attempts", RIVER_START_MAX_RETRIES); - } else { - logger.debug("no river _meta document found retrying in {} ms", RIVER_START_RETRY_INTERVAL.millis()); - try { - threadPool.schedule(RIVER_START_RETRY_INTERVAL, ThreadPool.Names.GENERIC, new Runnable() { - @Override - public void run() { - riverClusterService.submitStateUpdateTask(source, new RiverClusterStateUpdateTask() { - @Override - public RiverClusterState execute(RiverClusterState currentState) { - return updateRiverClusterState(source, currentState, riverClusterService.state(), countDown); - } - }); - } - }); - } catch (EsRejectedExecutionException ex) { - logger.debug("Couldn't schedule river start retry, node might be shutting down", ex); - } - } - } - - // now, remove routings that were deleted - // also, apply nodes that were removed and rivers were running on - for (RiverRouting routing : currentState.routing()) { - if (!indexMetaData.mappings().containsKey(routing.riverName().name())) { - routingBuilder.remove(routing); - dirty = true; - } else if (routing.node() != null && !newClusterState.nodes().nodeExists(routing.node().id())) { - routingBuilder.remove(routing); - routingBuilder.put(new RiverRouting(routing.riverName(), null)); - dirty = true; - } - } - - // build a list from nodes to rivers - Map> nodesToRivers = Maps.newHashMap(); - - for (DiscoveryNode node : newClusterState.nodes()) { - if (RiverNodeHelper.isRiverNode(node)) { - nodesToRivers.put(node, Lists.newArrayList()); - } - } - - List unassigned = Lists.newArrayList(); - for (RiverRouting routing : routingBuilder.build()) { - if (routing.node() == null) { - unassigned.add(routing); - } else { - List l = nodesToRivers.get(routing.node()); - if (l == null) { - l = Lists.newArrayList(); - nodesToRivers.put(routing.node(), l); - } - l.add(routing); - } - } - for (Iterator it = unassigned.iterator(); it.hasNext(); ) { - RiverRouting routing = it.next(); - DiscoveryNode smallest = null; - int smallestSize = Integer.MAX_VALUE; - for (Map.Entry> entry : nodesToRivers.entrySet()) { - if (RiverNodeHelper.isRiverNode(entry.getKey(), routing.riverName())) { - if (entry.getValue().size() < smallestSize) { - smallestSize = entry.getValue().size(); - smallest = entry.getKey(); - } - } - } - if (smallest != null) { - dirty = true; - it.remove(); - routing.node(smallest); - nodesToRivers.get(smallest).add(routing); - logger.debug("going to allocate river [{}] on node {}", routing.riverName().getName(), smallest); - } - } - - - // add relocation logic... - - if (dirty) { - return RiverClusterState.builder().state(currentState).routing(routingBuilder).build(); - } - return currentState; - } -} diff --git a/core/src/main/java/org/elasticsearch/river/routing/RiversRouting.java b/core/src/main/java/org/elasticsearch/river/routing/RiversRouting.java deleted file mode 100644 index 837e3dd709ed7..0000000000000 --- a/core/src/main/java/org/elasticsearch/river/routing/RiversRouting.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river.routing; - -import com.google.common.collect.ImmutableMap; -import org.elasticsearch.common.collect.MapBuilder; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.river.RiverName; - -import java.io.IOException; -import java.util.Iterator; - -/** - * - */ -public class RiversRouting implements Iterable { - - public static final RiversRouting EMPTY = RiversRouting.builder().build(); - - private final ImmutableMap rivers; - - private RiversRouting(ImmutableMap rivers) { - this.rivers = rivers; - } - - public boolean isEmpty() { - return rivers.isEmpty(); - } - - public RiverRouting routing(RiverName riverName) { - return rivers.get(riverName); - } - - public boolean hasRiverByName(String name) { - for (RiverName riverName : rivers.keySet()) { - if (riverName.name().equals(name)) { - return true; - } - } - return false; - } - - @Override - public Iterator iterator() { - return rivers.values().iterator(); - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private MapBuilder rivers = MapBuilder.newMapBuilder(); - - public Builder routing(RiversRouting routing) { - rivers.putAll(routing.rivers); - return this; - } - - public Builder put(RiverRouting routing) { - rivers.put(routing.riverName(), routing); - return this; - } - - public Builder remove(RiverRouting routing) { - rivers.remove(routing.riverName()); - return this; - } - - public Builder remove(RiverName riverName) { - rivers.remove(riverName); - return this; - } - - public Builder remote(String riverName) { - for (RiverName name : rivers.map().keySet()) { - if (name.name().equals(riverName)) { - rivers.remove(name); - } - } - return this; - } - - public RiversRouting build() { - return new RiversRouting(rivers.immutableMap()); - } - - public static RiversRouting readFrom(StreamInput in) throws IOException { - Builder builder = new Builder(); - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - builder.put(RiverRouting.readRiverRouting(in)); - } - return builder.build(); - } - - public static void writeTo(RiversRouting routing, StreamOutput out) throws IOException { - out.writeVInt(routing.rivers.size()); - for (RiverRouting riverRouting : routing) { - riverRouting.writeTo(out); - } - } - } -} diff --git a/core/src/test/java/org/elasticsearch/river/RiverTests.java b/core/src/test/java/org/elasticsearch/river/RiverTests.java deleted file mode 100644 index 1a3e0e7016863..0000000000000 --- a/core/src/test/java/org/elasticsearch/river/RiverTests.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.river; - -import com.google.common.base.Predicate; - -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.get.MultiGetItemResponse; -import org.elasticsearch.action.get.MultiGetRequestBuilder; -import org.elasticsearch.action.get.MultiGetResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.river.dummy.DummyRiverModule; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.junit.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.Matchers.equalTo; - -@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE) -@AwaitsFix(bugUrl="occasionally fails apparently due to synchronous mappings updates") -public class RiverTests extends ElasticsearchIntegrationTest { - - @Override - protected void beforeIndexDeletion() { - } - - @Test - public void testRiverStart() throws Exception { - startAndCheckRiverIsStarted("dummy-river-test"); - } - - @Test - public void testMultipleRiversStart() throws Exception { - int nbRivers = between(2,10); - logger.info("--> testing with {} rivers...", nbRivers); - Thread[] riverCreators = new Thread[nbRivers]; - final CountDownLatch latch = new CountDownLatch(nbRivers); - final MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); - for (int i = 0; i < nbRivers; i++) { - final String riverName = "dummy-river-test-" + i; - riverCreators[i] = new Thread() { - @Override - public void run() { - try { - startRiver(riverName); - } catch (Throwable t) { - logger.warn("failed to register river {}", t, riverName); - } finally { - latch.countDown(); - } - } - }; - riverCreators[i].start(); - multiGetRequestBuilder.add(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_status"); - } - - latch.await(); - - logger.info("--> checking that all rivers were created"); - assertThat(awaitBusy(new Predicate() { - @Override - public boolean apply(Object obj) { - MultiGetResponse multiGetItemResponse = multiGetRequestBuilder.get(); - for (MultiGetItemResponse getItemResponse : multiGetItemResponse) { - if (getItemResponse.isFailed() || !getItemResponse.getResponse().isExists()) { - return false; - } - } - return true; - } - }, 5, TimeUnit.SECONDS), equalTo(true)); - } - - /** - * Test case for https://github.com/elasticsearch/elasticsearch/issues/4577 - * River does not start when using config/templates files - */ - @Test - public void startDummyRiverWithDefaultTemplate() throws Exception { - logger.info("--> create empty template"); - client().admin().indices().preparePutTemplate("template_1") - .setTemplate("*") - .setOrder(0) - .addMapping(MapperService.DEFAULT_MAPPING, - JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING) - .endObject().endObject()) - .get(); - - startAndCheckRiverIsStarted("dummy-river-default-template-test"); - } - - /** - * Test case for https://github.com/elasticsearch/elasticsearch/issues/4577 - * River does not start when using config/templates files - */ - @Test - public void startDummyRiverWithSomeTemplates() throws Exception { - logger.info("--> create some templates"); - client().admin().indices().preparePutTemplate("template_1") - .setTemplate("*") - .setOrder(0) - .addMapping(MapperService.DEFAULT_MAPPING, - JsonXContent.contentBuilder().startObject().startObject(MapperService.DEFAULT_MAPPING) - .endObject().endObject()) - .get(); - client().admin().indices().preparePutTemplate("template_2") - .setTemplate("*") - .setOrder(0) - .addMapping("atype", - JsonXContent.contentBuilder().startObject().startObject("atype") - .endObject().endObject()) - .get(); - - startAndCheckRiverIsStarted("dummy-river-template-test"); - } - - /** - * Create a Dummy river then check it has been started. We will fail after 5 seconds. - * @param riverName Dummy river needed to be started - */ - private void startAndCheckRiverIsStarted(final String riverName) throws InterruptedException { - startRiver(riverName); - checkRiverIsStarted(riverName); - } - - private void startRiver(final String riverName) { - logger.info("--> starting river [{}]", riverName); - IndexResponse indexResponse = client().prepareIndex(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_meta") - .setSource("type", DummyRiverModule.class.getCanonicalName()).get(); - assertTrue(indexResponse.isCreated()); - ensureGreen(); - } - - private void checkRiverIsStarted(final String riverName) throws InterruptedException { - logger.info("--> checking that river [{}] was created", riverName); - assertThat(awaitBusy(new Predicate() { - @Override - public boolean apply(Object obj) { - GetResponse response = client().prepareGet(RiverIndexName.Conf.DEFAULT_INDEX_NAME, riverName, "_status").get(); - return response.isExists(); - } - }, 5, TimeUnit.SECONDS), equalTo(true)); - } - -} diff --git a/docs/reference/migration/migrate_2_0.asciidoc b/docs/reference/migration/migrate_2_0.asciidoc index 0965e53ba9d8b..d010ca4628637 100644 --- a/docs/reference/migration/migrate_2_0.asciidoc +++ b/docs/reference/migration/migrate_2_0.asciidoc @@ -9,6 +9,16 @@ your application to Elasticsearch 2.0. Elasticsearch now binds to the loopback interface by default (usually 127.0.0.1 or ::1), the setting `network.host` can be specified to change this behavior. +=== Rivers removal + +Elasticsearch does not support rivers anymore. While we had first planned to +keep them around to ease migration, keeping support for rivers proved to be +challenging as it conflicted with other important changes that we wanted to +bring to 2.0 like synchronous dynamic mappings updates, so we eventually +decided to remove them entirely. See +https://www.elastic.co/blog/deprecating_rivers for more background about why +we are moving away from rivers. + === Indices API The <> will, by default produce an error response diff --git a/docs/reference/modules/plugins.asciidoc b/docs/reference/modules/plugins.asciidoc index 40c288280cc6f..ecc0a9ab98c63 100644 --- a/docs/reference/modules/plugins.asciidoc +++ b/docs/reference/modules/plugins.asciidoc @@ -214,49 +214,6 @@ You can disable that check using `plugins.check_lucene: false`. * https://github.com/shikhar/eskka[eskka Discovery Plugin] (by Shikhar Bhushan) * https://github.com/grantr/elasticsearch-srv-discovery[DNS SRV Discovery Plugin] (by Grant Rodgers) -[float] -[[river]] -==== River Plugins - -deprecated[1.5.0,Rivers have been deprecated. See https://www.elastic.co/blog/deprecating_rivers for more details] - -.Supported by Elasticsearch -* https://github.com/elasticsearch/elasticsearch-river-couchdb[CouchDB River Plugin] -* https://github.com/elasticsearch/elasticsearch-river-rabbitmq[RabbitMQ River Plugin] -* https://github.com/elasticsearch/elasticsearch-river-twitter[Twitter River Plugin] -* https://github.com/elasticsearch/elasticsearch-river-wikipedia[Wikipedia River Plugin] - -.Supported by the community -* https://github.com/domdorn/elasticsearch-river-activemq/[ActiveMQ River Plugin] (by Dominik Dorn) -* https://github.com/albogdano/elasticsearch-river-amazonsqs[Amazon SQS River Plugin] (by Alex Bogdanovski) -* https://github.com/xxBedy/elasticsearch-river-csv[CSV River Plugin] (by Martin Bednar) -* http://www.pilato.fr/dropbox/[Dropbox River Plugin] (by David Pilato) -* http://www.pilato.fr/fsriver/[FileSystem River Plugin] (by David Pilato) -* https://github.com/obazoud/elasticsearch-river-git[Git River Plugin] (by Olivier Bazoud) -* https://github.com/uberVU/elasticsearch-river-github[GitHub River Plugin] (by uberVU) -* https://github.com/sksamuel/elasticsearch-river-hazelcast[Hazelcast River Plugin] (by Steve Samuel) -* https://github.com/jprante/elasticsearch-river-jdbc[JDBC River Plugin] (by Jörg Prante) -* https://github.com/qotho/elasticsearch-river-jms[JMS River Plugin] (by Steve Sarandos) -* https://github.com/endgameinc/elasticsearch-river-kafka[Kafka River Plugin] (by Endgame Inc.) -* https://github.com/mariamhakobyan/elasticsearch-river-kafka[Kafka River Plugin 2] (by Mariam Hakobyan) -* https://github.com/tlrx/elasticsearch-river-ldap[LDAP River Plugin] (by Tanguy Leroux) -* https://github.com/richardwilly98/elasticsearch-river-mongodb/[MongoDB River Plugin] (by Richard Louapre) -* https://github.com/sksamuel/elasticsearch-river-neo4j[Neo4j River Plugin] (by Steve Samuel) -* https://github.com/jprante/elasticsearch-river-oai/[Open Archives Initiative (OAI) River Plugin] (by Jörg Prante) -* https://github.com/sksamuel/elasticsearch-river-redis[Redis River Plugin] (by Steve Samuel) -* https://github.com/rethinkdb/elasticsearch-river-rethinkdb[RethinkDB River Plugin] (by RethinkDB) -* http://dadoonet.github.com/rssriver/[RSS River Plugin] (by David Pilato) -* https://github.com/adamlofts/elasticsearch-river-sofa[Sofa River Plugin] (by adamlofts) -* https://github.com/javanna/elasticsearch-river-solr/[Solr River Plugin] (by Luca Cavanna) -* https://github.com/sunnygleason/elasticsearch-river-st9[St9 River Plugin] (by Sunny Gleason) -* https://github.com/plombard/SubversionRiver[Subversion River Plugin] (by Pascal Lombard) -* https://github.com/kzwang/elasticsearch-river-dynamodb[DynamoDB River Plugin] (by Kevin Wang) -* https://github.com/salyh/elasticsearch-river-imap[IMAP/POP3 Email River Plugin] (by Hendrik Saly) -* https://github.com/codelibs/elasticsearch-river-web[Web River Plugin] (by CodeLibs Project) -* https://github.com/eea/eea.elasticsearch.river.rdf[EEA ElasticSearch RDF River Plugin] (by the European Environment Agency) -* https://github.com/lbroudoux/es-amazon-s3-river[Amazon S3 River Plugin] (by Laurent Broudoux) -* https://github.com/lbroudoux/es-google-drive-river[Google Drive River Plugin] (by Laurent Broudoux) - [float] [[transport]] ==== Transport Plugins