Skip to content

Commit

Permalink
[STORM] Tests should pick up embedded ES port
Browse files Browse the repository at this point in the history
  • Loading branch information
costin committed Oct 9, 2015
1 parent 65e6586 commit e7639de
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 22 deletions.
Expand Up @@ -37,9 +37,12 @@ public EsEmbeddedServer(String clusterName, String homePath, String dataPath, St
props.setProperty("path.data", dataPath);
props.setProperty("http.port", httpRange);
props.setProperty("transport.tcp.port", transportRange);
props.setProperty("cluster.name", "es.hadoop.test");
props.setProperty("node.local", "true");
//props.setProperty("es.index.store.type", "memory");
// props.setProperty("gateway.type", "none");
if (!hasSlave) {
props.setProperty("discovery.zen.ping.multicast", "false");
props.setProperty("discovery.zen.ping.multicast.enabled", "false");
}
//props.setProperty("script.disable_dynamic", "false");
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.hadoop.util.TestSettings;
import org.elasticsearch.storm.cfg.StormConfigurationOptions;
import org.junit.After;
import org.junit.Before;
Expand All @@ -35,7 +36,7 @@

import com.google.common.collect.ImmutableMap;

import static org.elasticsearch.integration.storm.AbstractStormSuite.*;
import static org.elasticsearch.integration.storm.AbstractStormSuite.COMPONENT_HAS_COMPLETED;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(Parameterized.class)
Expand All @@ -47,6 +48,7 @@ public abstract class AbstractStormBoltTests {
public AbstractStormBoltTests(Map conf, String index) {
this.conf = conf;
this.index = index;
conf.putAll(TestSettings.TESTING_PROPS);
}

@Before
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;

import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.mr.RestUtils;
import org.elasticsearch.hadoop.util.unit.TimeValue;
import org.elasticsearch.storm.EsBolt;
Expand All @@ -33,16 +32,16 @@

import com.google.common.collect.ImmutableList;

import static org.junit.Assert.*;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import static org.elasticsearch.integration.storm.AbstractStormSuite.*;
import static org.hamcrest.CoreMatchers.*;
import static org.elasticsearch.integration.storm.AbstractStormSuite.COMPONENT_HAS_COMPLETED;
import static org.hamcrest.CoreMatchers.containsString;

public class AbstractStormJsonSimpleBoltTests extends AbstractStormBoltTests {

public AbstractStormJsonSimpleBoltTests(Map conf, String index) {
super(conf, index);
conf.put(ConfigurationOptions.ES_INPUT_JSON, "true");
}

@Test
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;

import org.elasticsearch.hadoop.mr.RestUtils;
import org.elasticsearch.hadoop.util.TestSettings;
import org.elasticsearch.hadoop.util.unit.TimeValue;
import org.elasticsearch.storm.EsBolt;
import org.junit.Test;
Expand All @@ -33,15 +34,17 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import static org.junit.Assert.*;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import static org.elasticsearch.integration.storm.MultiIndexSpoutStormSuite.*;
import static org.hamcrest.CoreMatchers.*;
import static org.elasticsearch.integration.storm.AbstractStormSuite.COMPONENT_HAS_COMPLETED;
import static org.hamcrest.CoreMatchers.containsString;

public class AbstractStormSimpleBoltTests extends AbstractStormBoltTests {

public AbstractStormSimpleBoltTests(Map conf, String index) {
super(conf, index);
conf.putAll(TestSettings.TESTING_PROPS);
}

@Test
Expand Down
Expand Up @@ -21,8 +21,10 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

import org.elasticsearch.hadoop.util.TestSettings;
import org.elasticsearch.storm.cfg.StormConfigurationOptions;
import org.junit.After;
import org.junit.Before;
Expand All @@ -32,7 +34,7 @@

import com.google.common.collect.ImmutableMap;

import static org.elasticsearch.integration.storm.AbstractStormSuite.*;
import static org.elasticsearch.integration.storm.AbstractStormSuite.COMPONENT_HAS_COMPLETED;

@RunWith(Parameterized.class)
public abstract class AbstractStormSpoutTests {
Expand All @@ -43,6 +45,8 @@ public abstract class AbstractStormSpoutTests {
public AbstractStormSpoutTests(Map conf, String index) {
this.conf = conf;
this.index = index;
new TestSettings();
conf.putAll(TestSettings.TESTING_PROPS);
}

@Before
Expand All @@ -60,19 +64,19 @@ public void destroy() {
@Parameters
public static Collection<Object[]> configs() throws IOException {
// no ack
Map noAck = ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.FALSE.toString());
Map noAck = new LinkedHashMap(ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.FALSE.toString()));

// read ack
Map ack = ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString());
Map ack = new LinkedHashMap(ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString()));

// read ack bounded queue
Map ackWithSize = ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString(), StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE_QUEUE_SIZE, "1");
Map ackWithSize = new LinkedHashMap(ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString(), StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE_QUEUE_SIZE, "1"));

// read ack bounded queue with no retries
Map ackWithSizeNoRetries = ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString(),
Map ackWithSizeNoRetries = new LinkedHashMap(ImmutableMap.of(StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE, Boolean.TRUE.toString(),
StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE_QUEUE_SIZE, "1",
StormConfigurationOptions.ES_STORM_SPOUT_RELIABLE_RETRIES_PER_TUPLE, "1",
StormConfigurationOptions.ES_STORM_SPOUT_FIELDS, "message");
StormConfigurationOptions.ES_STORM_SPOUT_FIELDS, "message"));

return Arrays.asList(new Object[][] {
{ noAck, "storm-spout" },
Expand Down
Expand Up @@ -64,17 +64,13 @@ protected void after() {
@Override
protected void before() throws Throwable {

Properties props = TestSettings.TESTING_PROPS;
copyPropertiesIntoCfg(cfg);

for (String property : TestSettings.TESTING_PROPS.stringPropertyNames()) {
cfg.put(property, props.get(property));
}

String stormMode = props.getProperty("storm", "local");
String stormMode = TestSettings.TESTING_PROPS.getProperty("storm", "local");

isLocal = "local".equals(stormMode);
//cfg.setDebug(true);
cfg.setNumWorkers(Integer.parseInt(props.getProperty("storm.numworkers", "2")));
cfg.setNumWorkers(Integer.parseInt(TestSettings.TESTING_PROPS.getProperty("storm.numworkers", "2")));

stormCluster = new LocalCluster();
}
Expand All @@ -94,6 +90,14 @@ protected void after() {
}
};

private static void copyPropertiesIntoCfg(Config cfg) {
Properties props = TestSettings.TESTING_PROPS;

for (String property : props.stringPropertyNames()) {
cfg.put(property, props.get(property));
}
}

public static void run(final String name, final StormTopology topo, final Counter hasCompleted) throws Exception {
Thread th = new Thread(new Runnable() {
@Override
Expand All @@ -108,6 +112,8 @@ public void run() {
}
}, "test-storm-runner");
th.setDaemon(true);

copyPropertiesIntoCfg(cfg);
th.start();
}

Expand Down

0 comments on commit e7639de

Please sign in to comment.