From 9617e099d79920b1ffd40555c036f3826ea96e6b Mon Sep 17 00:00:00 2001 From: cstella Date: Wed, 25 Jan 2017 19:01:21 -0500 Subject: [PATCH 1/3] Fixing integration test. --- .../components/ConfigUploadComponent.java | 46 +++++++++++++++++++ .../integration/IndexingIntegrationTest.java | 43 ++++++++++++++++- 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java index a819105016..3338770058 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.function.Function; import static org.apache.metron.common.configuration.ConfigurationsUtils.*; @@ -38,6 +39,7 @@ public class ConfigUploadComponent implements InMemoryComponent { private String enrichmentConfigsPath; private String indexingConfigsPath; private String profilerConfigPath; + private Optional> postStartCallback = Optional.empty(); private Optional globalConfig = Optional.empty(); private Map parserSensorConfigs = new HashMap<>(); public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) { @@ -78,6 +80,47 @@ public ConfigUploadComponent withGlobalConfig(String globalConfig) { return this; } + public ConfigUploadComponent withPostStartCallback(Function f) { + this.postStartCallback = Optional.ofNullable(f); + return this; + } + + public Properties getTopologyProperties() { + return topologyProperties; + } + + public String getGlobalConfigPath() { + return globalConfigPath; + } + + public String getParserConfigsPath() { + return parserConfigsPath; + } + + public String getEnrichmentConfigsPath() { + return enrichmentConfigsPath; + } + + public String getIndexingConfigsPath() { + return indexingConfigsPath; + } + + public String getProfilerConfigPath() { + return profilerConfigPath; + } + + public Optional> getPostStartCallback() { + return postStartCallback; + } + + public Optional getGlobalConfig() { + return globalConfig; + } + + public Map getParserSensorConfigs() { + return parserSensorConfigs; + } + @Override public void start() throws UnableToStartException { try { @@ -99,6 +142,9 @@ public void start() throws UnableToStartException { if(globalConfig.isPresent()) { writeGlobalConfigToZookeeper(globalConfig.get().getBytes(), zookeeperUrl); } + if(postStartCallback.isPresent()) { + postStartCallback.get().apply(this); + } } catch (Exception e) { throw new UnableToStartException(e.getMessage(), e); diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index f54d7914c5..54c9f75b4b 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -21,8 +21,10 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import org.apache.curator.framework.CuratorFramework; import org.apache.metron.TestConstants; import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.interfaces.FieldNameConverter; import org.apache.metron.common.spout.kafka.SpoutConfig; import org.apache.metron.common.utils.JSONUtils; @@ -37,13 +39,18 @@ import org.apache.metron.integration.components.ZKServerComponent; import org.apache.metron.integration.utils.TestUtils; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; +import org.apache.zookeeper.KeeperException; import org.junit.Assert; import org.junit.Test; import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient; public abstract class IndexingIntegrationTest extends BaseIntegrationTest { protected String hdfsDir = "target/indexingIntegrationTest/hdfs"; @@ -139,11 +146,22 @@ public void test() throws Exception { inputDocs.add(m); } + final AtomicBoolean isLoaded = new AtomicBoolean(false); ConfigUploadComponent configUploadComponent = new ConfigUploadComponent() .withTopologyProperties(topologyProperties) .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH) .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH) .withIndexingConfigsPath(TestConstants.SAMPLE_CONFIG_PATH) + .withPostStartCallback(component -> { + try { + waitForIndex(component.getTopologyProperties().getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)); + } catch (Exception e) { + e.printStackTrace(); + } + isLoaded.set(true); + return null; + } + ); ; FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() .withTopologyLocation(new File(fluxPath)) @@ -166,10 +184,11 @@ public void test() throws Exception { runner.start(); try { + while(!isLoaded.get()) { + Thread.sleep(100); + } fluxComponent.submitTopology(); - kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, inputMessages); - StringBuffer buffer = new StringBuffer(); List> docs = cleanDocs(runner.process(getProcessor(inputMessages))); Assert.assertEquals(docs.size(), inputMessages.size()); //assert that our input docs are equivalent to the output docs, converting the input docs keys based @@ -184,6 +203,26 @@ public void test() throws Exception { } } + private void waitForIndex(String zookeeperQuorum) throws Exception { + try(CuratorFramework client = getClient(zookeeperQuorum)) { + client.start(); + byte[] bytes = null; + do { + try { + bytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(testSensorType, client); + Thread.sleep(1000); + } + catch(KeeperException.NoNodeException nne) { + //kindly ignore because the path might not exist just yet. + } + } + while(bytes == null || bytes.length == 0); + return; + } + + + } + public List> cleanDocs(ProcessorResult>> result) { List> docs = result.getResult(); StringBuffer buffer = new StringBuffer(); From b2103fdb735854a375a70809e625044f0f200a29 Mon Sep 17 00:00:00 2001 From: cstella Date: Wed, 25 Jan 2017 20:12:21 -0500 Subject: [PATCH 2/3] Updating to react to comments. --- .../components/ConfigUploadComponent.java | 9 +-- .../integration/IndexingIntegrationTest.java | 6 +- .../integration/BaseIntegrationTest.java | 10 +-- .../components/ZKServerComponent.java | 69 ++++++++++--------- 4 files changed, 44 insertions(+), 50 deletions(-) diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java index 3338770058..28965121fb 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.function.Consumer; import java.util.function.Function; import static org.apache.metron.common.configuration.ConfigurationsUtils.*; @@ -39,7 +40,7 @@ public class ConfigUploadComponent implements InMemoryComponent { private String enrichmentConfigsPath; private String indexingConfigsPath; private String profilerConfigPath; - private Optional> postStartCallback = Optional.empty(); + private Optional> postStartCallback = Optional.empty(); private Optional globalConfig = Optional.empty(); private Map parserSensorConfigs = new HashMap<>(); public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) { @@ -80,7 +81,7 @@ public ConfigUploadComponent withGlobalConfig(String globalConfig) { return this; } - public ConfigUploadComponent withPostStartCallback(Function f) { + public ConfigUploadComponent withPostStartCallback(Consumer f) { this.postStartCallback = Optional.ofNullable(f); return this; } @@ -109,7 +110,7 @@ public String getProfilerConfigPath() { return profilerConfigPath; } - public Optional> getPostStartCallback() { + public Optional> getPostStartCallback() { return postStartCallback; } @@ -143,7 +144,7 @@ public void start() throws UnableToStartException { writeGlobalConfigToZookeeper(globalConfig.get().getBytes(), zookeeperUrl); } if(postStartCallback.isPresent()) { - postStartCallback.get().apply(this); + postStartCallback.get().accept(this); } } catch (Exception e) { diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index 54c9f75b4b..03ae9ffd8f 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -159,10 +159,9 @@ public void test() throws Exception { e.printStackTrace(); } isLoaded.set(true); - return null; } ); - ; + FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() .withTopologyLocation(new File(fluxPath)) .withTopologyName("test") @@ -217,10 +216,7 @@ private void waitForIndex(String zookeeperQuorum) throws Exception { } } while(bytes == null || bytes.length == 0); - return; } - - } public List> cleanDocs(ProcessorResult>> result) { diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java index 7207d7ab88..7ae373b4d3 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java @@ -33,13 +33,7 @@ protected static KafkaComponent getKafkaComponent(final Properties topologyPrope protected static ZKServerComponent getZKServerComponent(final Properties topologyProperties) { return new ZKServerComponent() - .withPostStartCallback(new Function() { - @Nullable - @Override - public Void apply(@Nullable ZKServerComponent zkComponent) { - topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()); - return null; - } - }); + .withPostStartCallback((zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()) + ); } } diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java index a52ad42a66..57d814bfd5 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ZKServerComponent.java @@ -23,40 +23,43 @@ import org.apache.metron.integration.UnableToStartException; import org.apache.curator.test.TestingServer; import java.util.Map; -public class ZKServerComponent implements InMemoryComponent{ - public static final String ZOOKEEPER_PROPERTY = "kafka.zk"; - private TestingServer testZkServer; - private String zookeeperUrl = null; - private Map properties = null; - private Function postStartCallback; - public String getConnectionString() - { - return this.zookeeperUrl; - } - public ZKServerComponent withPostStartCallback(Function f) { - postStartCallback = f; - return this; - } +import java.util.Optional; +import java.util.function.Consumer; - @Override - public void start() throws UnableToStartException { - try { - testZkServer = new TestingServer(true); - zookeeperUrl = testZkServer.getConnectString(); - if(postStartCallback != null) { - postStartCallback.apply(this); - } - }catch(Exception e){ - throw new UnableToStartException("Unable to start TestingServer",e); - } - } +public class ZKServerComponent implements InMemoryComponent { + public static final String ZOOKEEPER_PROPERTY = "kafka.zk"; + private TestingServer testZkServer; + private String zookeeperUrl = null; + private Map properties = null; + private Optional> postStartCallback = Optional.empty(); + public String getConnectionString() + { + return this.zookeeperUrl; + } + public ZKServerComponent withPostStartCallback(Consumer f) { + postStartCallback = Optional.ofNullable(f); + return this; + } - @Override - public void stop() { - try { - if (testZkServer != null) { - testZkServer.close(); - } - }catch(Exception e){} + @Override + public void start() throws UnableToStartException { + try { + testZkServer = new TestingServer(true); + zookeeperUrl = testZkServer.getConnectString(); + if(postStartCallback.isPresent()) { + postStartCallback.get().accept(this); + } + }catch(Exception e){ + throw new UnableToStartException("Unable to start TestingServer",e); } + } + + @Override + public void stop() { + try { + if (testZkServer != null) { + testZkServer.close(); + } + }catch(Exception e){} + } } From 0f6602ccf86c0cb68d851ab5337693346bb284e4 Mon Sep 17 00:00:00 2001 From: cstella Date: Wed, 25 Jan 2017 20:25:09 -0500 Subject: [PATCH 3/3] fixed commit. --- .../pcap/integration/PcapTopologyIntegrationTest.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index cc7f82775d..00cc62d6fb 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -215,14 +215,9 @@ public void testTopology(Function updatePropertiesCallback }}; updatePropertiesCallback.apply(topologyProperties); - final ZKServerComponent zkServerComponent = new ZKServerComponent().withPostStartCallback(new Function() { - @Nullable - @Override - public Void apply(@Nullable ZKServerComponent zkComponent) { - topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()); - return null; - } - }); + final ZKServerComponent zkServerComponent = new ZKServerComponent().withPostStartCallback( + (zkComponent) -> topologyProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString()) + ); final KafkaComponent kafkaComponent = new KafkaComponent().withTopics(new ArrayList() {{ add(new KafkaComponent.Topic(KAFKA_TOPIC, 1)); }}).withTopologyProperties(topologyProperties);