diff --git a/mr/src/itest/java/org/elasticsearch/hadoop/EsEmbeddedServer.java b/mr/src/itest/java/org/elasticsearch/hadoop/EsEmbeddedServer.java index cf02723e9..58bf7d5ea 100644 --- a/mr/src/itest/java/org/elasticsearch/hadoop/EsEmbeddedServer.java +++ b/mr/src/itest/java/org/elasticsearch/hadoop/EsEmbeddedServer.java @@ -37,9 +37,12 @@ public EsEmbeddedServer(String clusterName, String homePath, String dataPath, St props.setProperty("path.data", dataPath); props.setProperty("http.port", httpRange); props.setProperty("transport.tcp.port", transportRange); + props.setProperty("cluster.name", "es.hadoop.test"); + props.setProperty("node.local", "true"); //props.setProperty("es.index.store.type", "memory"); // props.setProperty("gateway.type", "none"); if (!hasSlave) { + props.setProperty("discovery.zen.ping.multicast", "false"); props.setProperty("discovery.zen.ping.multicast.enabled", "false"); } //props.setProperty("script.disable_dynamic", "false"); diff --git a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormBoltTests.java b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormBoltTests.java index f06e8e03e..272203660 100644 --- a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormBoltTests.java +++ b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormBoltTests.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; +import org.elasticsearch.hadoop.util.TestSettings; import org.elasticsearch.storm.cfg.StormConfigurationOptions; import org.junit.After; import org.junit.Before; @@ -35,7 +36,7 @@ import com.google.common.collect.ImmutableMap; -import static org.elasticsearch.integration.storm.AbstractStormSuite.*; +import static org.elasticsearch.integration.storm.AbstractStormSuite.COMPONENT_HAS_COMPLETED; @FixMethodOrder(MethodSorters.NAME_ASCENDING) @RunWith(Parameterized.class) @@ -47,6 +48,7 @@ public abstract class AbstractStormBoltTests { public AbstractStormBoltTests(Map conf, String index) { this.conf = conf; this.index = index; + conf.putAll(TestSettings.TESTING_PROPS); } @Before diff --git a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormJsonSimpleBoltTests.java b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormJsonSimpleBoltTests.java index d8dafa055..3cfb11413 100644 --- a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormJsonSimpleBoltTests.java +++ b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormJsonSimpleBoltTests.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; -import org.elasticsearch.hadoop.cfg.ConfigurationOptions; import org.elasticsearch.hadoop.mr.RestUtils; import org.elasticsearch.hadoop.util.unit.TimeValue; import org.elasticsearch.storm.EsBolt; @@ -33,16 +32,16 @@ import com.google.common.collect.ImmutableList; -import static org.junit.Assert.*; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; -import static org.elasticsearch.integration.storm.AbstractStormSuite.*; -import static org.hamcrest.CoreMatchers.*; +import static org.elasticsearch.integration.storm.AbstractStormSuite.COMPONENT_HAS_COMPLETED; +import static org.hamcrest.CoreMatchers.containsString; public class AbstractStormJsonSimpleBoltTests extends AbstractStormBoltTests { public AbstractStormJsonSimpleBoltTests(Map conf, String index) { super(conf, index); - conf.put(ConfigurationOptions.ES_INPUT_JSON, "true"); } @Test diff --git a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSimpleBoltTests.java b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSimpleBoltTests.java index b7d13f1ab..7b1ed4333 100644 --- a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSimpleBoltTests.java +++ b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSimpleBoltTests.java @@ -23,6 +23,7 @@ import java.util.Map; import org.elasticsearch.hadoop.mr.RestUtils; +import org.elasticsearch.hadoop.util.TestSettings; import org.elasticsearch.hadoop.util.unit.TimeValue; import org.elasticsearch.storm.EsBolt; import org.junit.Test; @@ -33,15 +34,17 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import static org.junit.Assert.*; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; -import static org.elasticsearch.integration.storm.MultiIndexSpoutStormSuite.*; -import static org.hamcrest.CoreMatchers.*; +import static org.elasticsearch.integration.storm.AbstractStormSuite.COMPONENT_HAS_COMPLETED; +import static org.hamcrest.CoreMatchers.containsString; public class AbstractStormSimpleBoltTests extends AbstractStormBoltTests { public AbstractStormSimpleBoltTests(Map conf, String index) { super(conf, index); + conf.putAll(TestSettings.TESTING_PROPS); } @Test diff --git a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSpoutTests.java b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSpoutTests.java index 04d79ae1e..27adeee56 100644 --- a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSpoutTests.java +++ b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSpoutTests.java @@ -21,8 +21,10 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.Map; +import org.elasticsearch.hadoop.util.TestSettings; import org.elasticsearch.storm.cfg.StormConfigurationOptions; import org.junit.After; import org.junit.Before; @@ -32,7 +34,7 @@ import com.google.common.collect.ImmutableMap; -import static org.elasticsearch.integration.storm.AbstractStormSuite.*; +import static org.elasticsearch.integration.storm.AbstractStormSuite.COMPONENT_HAS_COMPLETED; @RunWith(Parameterized.class) public abstract class AbstractStormSpoutTests { @@ -43,6 +45,8 @@ public abstract class AbstractStormSpoutTests { public AbstractStormSpoutTests(Map conf, String index) { this.conf = conf; this.index = index; + new TestSettings(); + conf.putAll(TestSettings.TESTING_PROPS); } @Before @@ -60,19 +64,19 @@ public void destroy() { @Parameters public static Collection configs() throws IOException { // no ack - Map noAck = ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.FALSE.toString()); + Map noAck = new LinkedHashMap(ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.FALSE.toString())); // read ack - Map ack = ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString()); + Map ack = new LinkedHashMap(ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString())); // read ack bounded queue - Map ackWithSize = ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString(), StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE_QUEUE_SIZE, "1"); + Map ackWithSize = new LinkedHashMap(ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString(), StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE_QUEUE_SIZE, "1")); // read ack bounded queue with no retries - Map ackWithSizeNoRetries = ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString(), + Map ackWithSizeNoRetries = new LinkedHashMap(ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString(), StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE_QUEUE_SIZE, "1", StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE_RETRIES_PER_TUPLE, "1", - StormConfigurationOptions.ES_STORM_SPOUT_FIELDS, "message"); + StormConfigurationOptions.ES_STORM_SPOUT_FIELDS, "message")); return Arrays.asList(new Object[][] { { noAck, "storm-spout" }, diff --git a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSuite.java b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSuite.java index b711d5153..390792294 100644 --- a/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSuite.java +++ b/storm/src/itest/java/org/elasticsearch/integration/storm/AbstractStormSuite.java @@ -64,17 +64,13 @@ protected void after() { @Override protected void before() throws Throwable { - Properties props = TestSettings.TESTING_PROPS; + copyPropertiesIntoCfg(cfg); - for (String property : TestSettings.TESTING_PROPS.stringPropertyNames()) { - cfg.put(property, props.get(property)); - } - - String stormMode = props.getProperty("storm", "local"); + String stormMode = TestSettings.TESTING_PROPS.getProperty("storm", "local"); isLocal = "local".equals(stormMode); //cfg.setDebug(true); - cfg.setNumWorkers(Integer.parseInt(props.getProperty("storm.numworkers", "2"))); + cfg.setNumWorkers(Integer.parseInt(TestSettings.TESTING_PROPS.getProperty("storm.numworkers", "2"))); stormCluster = new LocalCluster(); } @@ -94,6 +90,14 @@ protected void after() { } }; + private static void copyPropertiesIntoCfg(Config cfg) { + Properties props = TestSettings.TESTING_PROPS; + + for (String property : props.stringPropertyNames()) { + cfg.put(property, props.get(property)); + } + } + public static void run(final String name, final StormTopology topo, final Counter hasCompleted) throws Exception { Thread th = new Thread(new Runnable() { @Override @@ -108,6 +112,8 @@ public void run() { } }, "test-storm-runner"); th.setDaemon(true); + + copyPropertiesIntoCfg(cfg); th.start(); }