From 28f75bdf848018c16b32b16adea184ac85d3fd05 Mon Sep 17 00:00:00 2001 From: cstella Date: Wed, 27 Jan 2016 10:22:01 -0500 Subject: [PATCH 01/11] Updating gitignore to not include intellij cruft. --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index db744ea2af..96b0982069 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ opensoc-streaming/OpenSOC-Topologies/target/ *~ target *dependency-reduced-pom.xml +.idea +*.iml From eaa3514a317723fa0b9fd40e155796d07afc3121 Mon Sep 17 00:00:00 2001 From: cstella Date: Thu, 28 Jan 2016 12:43:22 -0500 Subject: [PATCH 02/11] First cut at integration testing framework with in memory Flux and elastic search components. Also, an integration test for the PCap topology. --- .../test/converters/BinaryConverters.java | 23 +++ .../test/converters/HexStringConverter.java | 16 ++ .../metron/test/converters/IConverter.java | 8 + .../metron/test/filereaders/FileReader.java | 17 +- .../test/spouts/GenericInternalTestSpout.java | 19 +- .../Metron_Configs/topologies/pcap/local.yaml | 7 +- .../resources/SampleInput/PCAPExampleOutput | 15 ++ .../integration/pcap/PcapIntegrationTest.java | 156 ++++++++++++++++ .../integration/util/UnitTestHelper.java | 51 ++++++ .../util/integration/ComponentRunner.java | 113 ++++++++++++ .../util/integration/InMemoryComponent.java | 9 + .../util/integration/Processor.java | 9 + .../util/integration/ReadinessState.java | 8 + .../integration/UnableToStartException.java | 13 ++ .../components/ElasticSearchComponent.java | 166 ++++++++++++++++++ .../components/FluxTopologyComponent.java | 114 ++++++++++++ 16 files changed, 731 insertions(+), 13 deletions(-) create mode 100644 metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/BinaryConverters.java create mode 100644 metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/HexStringConverter.java create mode 100644 metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/IConverter.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/BinaryConverters.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/BinaryConverters.java new file mode 100644 index 0000000000..605f6c9a01 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/BinaryConverters.java @@ -0,0 +1,23 @@ +package org.apache.metron.test.converters; + +/** + * Created by cstella on 1/27/16. + */ +public enum BinaryConverters implements IConverter { + DEFAULT(new IConverter() { + + public byte[] convert(String s) { + return s.getBytes(); + } + }) + , FROM_HEX_STRING(new HexStringConverter()); + IConverter _underlying; + BinaryConverters(IConverter i) { + _underlying = i; + } + + public byte[] convert(String s) { + return _underlying.convert(s); + } + +} diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/HexStringConverter.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/HexStringConverter.java new file mode 100644 index 0000000000..cfc001a305 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/HexStringConverter.java @@ -0,0 +1,16 @@ +package org.apache.metron.test.converters; + +/** + * Created by cstella on 1/27/16. + */ +public class HexStringConverter implements IConverter { + public byte[] convert(String s) { + int len = s.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + + Character.digit(s.charAt(i+1), 16)); + } + return data; + } +} diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/IConverter.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/IConverter.java new file mode 100644 index 0000000000..502a886f66 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/IConverter.java @@ -0,0 +1,8 @@ +package org.apache.metron.test.converters; + +/** + * Created by cstella on 1/27/16. + */ +public interface IConverter { + public byte[] convert(String s); +} diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java index 998e8c9fd2..96f09696b1 100644 --- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java @@ -17,11 +17,7 @@ package org.apache.metron.test.filereaders; -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.*; import java.util.LinkedList; import java.util.List; @@ -33,9 +29,14 @@ public List readFromFile(String filename) throws IOException List lines = new LinkedList(); - InputStream stream = Thread.currentThread().getContextClassLoader() - .getResourceAsStream(filename); - + InputStream stream = null; + if(new File(filename).exists()) { + stream = new FileInputStream(filename); + } + else { + stream = Thread.currentThread().getContextClassLoader() + .getResourceAsStream(filename); + } DataInputStream in = new DataInputStream(stream); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String strLine; diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java index 19e4f37b87..53f2a508f7 100644 --- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java @@ -17,10 +17,13 @@ package org.apache.metron.test.spouts; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.metron.test.converters.BinaryConverters; +import org.apache.metron.test.converters.IConverter; import org.apache.metron.test.filereaders.FileReader; import backtype.storm.spout.SpoutOutputCollector; @@ -47,6 +50,7 @@ public class GenericInternalTestSpout extends BaseRichSpout { private boolean _repeating = true; private SpoutOutputCollector _collector; + private IConverter _converter; private FileReader Reader; private int cnt = 0; @@ -67,6 +71,16 @@ public GenericInternalTestSpout withRepeating(Boolean repeating) return this; } + public GenericInternalTestSpout withBinaryConverter(String converter) { + if(converter == null) { + _converter = BinaryConverters.DEFAULT; + } + else { + _converter = BinaryConverters.valueOf(converter); + } + return this; + } + @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, @@ -77,8 +91,7 @@ public void open(Map conf, TopologyContext context, Reader = new FileReader(); jsons = Reader.readFromFile(_filename); - - } catch (IOException e) + } catch (Throwable e) { System.out.println("Could not read sample JSONs"); e.printStackTrace(); @@ -91,7 +104,7 @@ public void nextTuple() { if(cnt < jsons.size()) { - _collector.emit(new Values(jsons.get(cnt).getBytes())); + _collector.emit(new Values(_converter.convert(jsons.get(cnt)))); } cnt ++; diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml index 49c4bf268b..003bd2e21f 100644 --- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml +++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml @@ -70,12 +70,15 @@ spouts: className: "org.apache.metron.test.spouts.GenericInternalTestSpout" parallelism: 1 configMethods: + - name: "withBinaryConverter" + args: + - "${pcap.binary.converter}" - name: "withFilename" args: - - "SampleInput/PCAPExampleOutput" + - "${input.path}SampleInput/PCAPExampleOutput" - name: "withRepeating" args: - - true + - ${testing.repeating} bolts: - id: "parserBolt" diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput index e69de29bb2..2de0edc4ee 100644 --- a/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput +++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput @@ -0,0 +1,15 @@ +d4c3b2a1020004000000000000000000ffff000001000000c637a956600004004a0000004a0000005254001235030800277f932d08004500003cf4c7400040112dd80a00020f0a0002038f7000350028184b78fc010000010000000000000377777706676f6f676c6503636f6d0000010001 +d4c3b2a1020004000000000000000000ffff000001000000c637a9564d5304009a0000009a0000000800277f932d52540012350208004500008ca6f800004011bb570a0002030a00020f00358f7000782c3478fc818000010005000000000377777706676f6f676c6503636f6d0000010001c00c000100010000009f00044a7de232c00c000100010000009f00044a7de231c00c000100010000009f00044a7de234c00c000100010000009f00044a7de230c00c000100010000009f00044a7de233 +d4c3b2a1020004000000000000000000ffff000001000000c637a9566462040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800653e380a0001c637a956000000002855040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c637a95658ce040062000000620000000800277f932d525400123502080045000054a6f940003f015bf14a7de2320a00020f00006d3e380a0001c637a956000000002855040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c637a9566dd4040056000000560000005254001235030800277f932d080045000048f4ff400040112d940a00020f0a000203ebbd0035003418579ef201000001000000000000023530033232360331323502373407696e2d61646472046172706100000c0001 +d4c3b2a1020004000000000000000000ffff000001000000c637a956966b05007d0000007d0000000800277f932d52540012350208004500006fa6fa00004011bb720a0002030a00020f0035ebbd005b5ff09ef281800001000100000000023530033232360331323502373407696e2d61646472046172706100000c0001c00c000c000100005591001b0f6c676131357334332d696e2d663138053165313030036e657400 +d4c3b2a1020004000000000000000000ffff000001000000c737a9569562040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800e837380a0002c737a95600000000a45a040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c737a95639e2040062000000620000000800277f932d525400123502080045000054a6fb40003f015bef4a7de2320a00020f0000f037380a0002c737a95600000000a45a040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c837a9565360040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800d538380a0003c837a95600000000b658040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c837a956a6da040062000000620000000800277f932d525400123502080045000054a6fc40003f015bee4a7de2320a00020f0000dd38380a0003c837a95600000000b658040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c937a9563d64040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800a331380a0004c937a95600000000e75e040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c937a9560fd5040062000000620000000800277f932d525400123502080045000054a6fd40003f015bed4a7de2320a00020f0000ab31380a0004c937a95600000000e75e040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000ca37a956fb66040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800132d380a0005ca37a956000000007662040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000ca37a95613e8040062000000620000000800277f932d525400123502080045000054a6fe40003f015bec4a7de2320a00020f00001b2d380a0005ca37a956000000007662040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000cb37a956c4f703002a0000002a0000005254001235030800277f932d08060001080006040001 \ No newline at end of file diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java new file mode 100644 index 0000000000..171c30626e --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java @@ -0,0 +1,156 @@ +package org.apache.metron.integration.pcap; + +import org.apache.metron.integration.util.integration.Processor; +import org.apache.metron.integration.util.integration.ComponentRunner; +import org.apache.metron.integration.util.UnitTestHelper; +import org.apache.metron.integration.util.integration.ReadinessState; +import org.apache.metron.integration.util.integration.components.ElasticSearchComponent; +import org.apache.metron.integration.util.integration.components.FluxTopologyComponent; +import org.apache.metron.parsing.parsers.PcapParser; +import org.apache.metron.pcap.PacketInfo; +import org.apache.metron.test.converters.HexStringConverter; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.*; + +/** + * Created by cstella on 1/27/16. + */ +public class PcapIntegrationTest { + + private String topologiesDir = "src/main/resources/Metron_Configs/topologies"; + private String targetDir = "target"; + + + @Test + public void testTopology() throws Exception { + if(!new File(topologiesDir).exists()) { + topologiesDir = UnitTestHelper.findDir("topologies"); + } + if(!new File(targetDir).exists()) { + targetDir = UnitTestHelper.findDir("target"); + } + Assert.assertNotNull(topologiesDir); + Assert.assertNotNull(targetDir); + final Set expectedPcapIds= getExpectedPcap(new File(topologiesDir + "/../../SampleInput/PCAPExampleOutput")); + Assert.assertTrue("Expected non-zero number of PCAP Ids from the sample data", expectedPcapIds.size() > 0); + System.out.println("Using topologies directory: " + topologiesDir); + + + ElasticSearchComponent esComponent = new ElasticSearchComponent.Builder() + .withHttpPort(9211) + .withIndexDir(new File(targetDir + "/elasticsearch")) + .build(); + Properties topologyProperties = new Properties() {{ + setProperty("input.path", "src/main/resources/"); + setProperty("es.port", "9300"); + setProperty("es.ip", "localhost"); + setProperty("es.clustername", "metron"); + setProperty("pcap.binary.converter", "FROM_HEX_STRING"); + setProperty("testing.repeating", "false"); + setProperty("org.apache.metron.metrics.reporter.graphite", "false"); + setProperty("org.apache.metron.metrics.reporter.console", "false"); + setProperty("org.apache.metron.metrics.reporter.jmx", "false"); + setProperty("org.apache.metron.metrics.TelemetryParserBolt.acks","true"); + setProperty("org.apache.metron.metrics.TelemetryParserBolt.emits", "true"); + setProperty("org.apache.metron.metrics.TelemetryParserBolt.fails","true"); + setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.acks","true"); + setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.emits","true"); + setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.fails","true"); + setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.acks", "true"); + setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.emits","true"); + setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.fails","true"); + }}; + FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() + .withTopologyLocation(new File(topologiesDir + "/pcap/local.yaml")) + .withTopologyName("pcap") + .withTopologyProperties(topologyProperties) + .build(); + ComponentRunner runner = new ComponentRunner.Builder() + .withComponent("elasticsearch", esComponent) + .withComponent("storm", fluxComponent) + .build(); + + final String index = getIndex(); + runner.start(); + List> docs = + runner.process(new Processor>> () { + List> docs = null; + public ReadinessState process(ComponentRunner runner){ + ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class); + if(elasticSearchComponent.hasIndex(index)) { + try { + docs = elasticSearchComponent.getAllIndexedDocs(index); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve indexed documents.", e); + } + if(docs.size() < expectedPcapIds.size()) { + return ReadinessState.NOT_READY; + } + else { + return ReadinessState.READY; + } + } + else { + return ReadinessState.NOT_READY; + } + } + + public List> getResult() { + return docs; + } + }); + checkDocuments(expectedPcapIds, docs); + runner.stop(); + } + + private static void checkDocuments(Set expectedPcapIds, List> documents) { + + boolean mismatch = false; + Set indexedPcapIds = new HashSet(); + for(Map doc : documents) { + String indexedId = (String)doc.get("pcap_id"); + indexedPcapIds.add(indexedId); + if(!expectedPcapIds.contains(indexedId)) { + mismatch = true; + System.out.println("Indexed PCAP ID that I did not expect: " + indexedId); + } + } + for(String expectedId : expectedPcapIds) { + if(!indexedPcapIds.contains(expectedId)) { + mismatch = true; + System.out.println("Expected PCAP ID that I did not index: " + expectedId); + } + } + Assert.assertFalse(mismatch); + } + + private static Set getExpectedPcap(File rawFile) throws IOException { + Set ret = new HashSet(); + BufferedReader br = new BufferedReader(new FileReader(rawFile)); + for(String line = null; (line = br.readLine()) != null;) { + byte[] pcapBytes = new HexStringConverter().convert(line); + List list = PcapParser.parse(pcapBytes); + for(PacketInfo pi : list) { + String string_pcap = pi.getJsonIndexDoc(); + Object obj= JSONValue.parse(string_pcap); + JSONObject header=(JSONObject)obj; + ret.add((String)header.get("pcap_id")); + } + } + return ret; + } + + + private static String getIndex() { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd.hh"); + Date d = new Date(); + return "pcap_index_" + sdf.format(d); + } + +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java new file mode 100644 index 0000000000..64c5466fbb --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java @@ -0,0 +1,51 @@ +package org.apache.metron.integration.util; + +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; + +import java.io.File; +import java.util.Stack; + +/** + * Created by cstella on 1/28/16. + */ +public class UnitTestHelper { + public static String findDir(String name) { + return findDir(new File("."), name); + } + + public static String findDir(File startDir, String name) { + Stack s = new Stack(); + s.push(startDir); + while(!s.empty()) { + File parent = s.pop(); + if(parent.getName().equalsIgnoreCase(name)) { + return parent.getAbsolutePath(); + } + else { + File[] children = parent.listFiles(); + if(children != null) { + for (File child : children) { + s.push(child); + } + } + } + } + return null; + } + + public static void verboseLogging() { + verboseLogging("%d [%p|%c|%C{1}] %m%n", Level.ALL); + } + public static void verboseLogging(String pattern, Level level) { + ConsoleAppender console = new ConsoleAppender(); //create appender + //configure the appender + console.setLayout(new PatternLayout(pattern)); + console.setThreshold(level); + console.activateOptions(); + //add appender to any Logger (here is root) + Logger.getRootLogger().addAppender(console); + } +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java new file mode 100644 index 0000000000..da3344f541 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java @@ -0,0 +1,113 @@ +package org.apache.metron.integration.util.integration; + +import backtype.storm.utils.Utils; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Created by cstella on 1/27/16. + */ +public class ComponentRunner { + public static class Builder { + LinkedHashMap components; + String[] startupOrder; + String[] shutdownOrder; + public Builder() { + components = new LinkedHashMap(); + } + + public Builder withComponent(String name, InMemoryComponent component) { + components.put(name, component); + return this; + } + + public Builder withCustomStartupOrder(String[] startupOrder) { + this.startupOrder = startupOrder; + return this; + } + public Builder withCustomShutdownOrder(String[] shutdownOrder) { + this.shutdownOrder = shutdownOrder; + return this; + } + private static String[] toOrderedList(Map components) { + String[] ret = new String[components.size()]; + int i = 0; + for(String component : components.keySet()) { + ret[i++] = component; + } + return ret; + } + public ComponentRunner build() { + if(shutdownOrder == null) { + shutdownOrder = toOrderedList(components); + } + if(startupOrder == null) { + startupOrder = toOrderedList(components); + } + return new ComponentRunner(components, startupOrder, shutdownOrder); + } + + } + + LinkedHashMap components; + String[] startupOrder; + String[] shutdownOrder; + public ComponentRunner( LinkedHashMap components + , String[] startupOrder + , String[] shutdownOrder + ) + { + this.components = components; + this.startupOrder = startupOrder; + this.shutdownOrder = shutdownOrder; + + } + + public T getComponent(String name, Class clazz) { + return clazz.cast(getComponents().get(name)); + } + + public LinkedHashMap getComponents() { + return components; + } + + public void start() throws UnableToStartException { + for(String componentName : startupOrder) { + components.get(componentName).start(); + } + } + public void stop() { + for(String componentName : shutdownOrder) { + components.get(componentName).stop(); + } + } + + public T process(Processor successState) { + return process(successState, 3, 5000, 120000); + } + + public T process(Processor successState, int numRetries, long timeBetweenAttempts, long maxTimeMs) { + int retryCount = 0; + long start = System.currentTimeMillis(); + while(true) { + long duration = System.currentTimeMillis() - start; + if(duration > maxTimeMs) { + throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMs); + } + ReadinessState state = successState.process(this); + if(state == ReadinessState.READY) { + return successState.getResult(); + } + else if(state == ReadinessState.NOT_READY) { + retryCount++; + if(numRetries > 0 && retryCount > numRetries) { + throw new RuntimeException("Too many retries: " + retryCount); + } + } + Utils.sleep(timeBetweenAttempts); + } + } + + +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java new file mode 100644 index 0000000000..f0fc0e0889 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java @@ -0,0 +1,9 @@ +package org.apache.metron.integration.util.integration; + +/** + * Created by cstella on 1/28/16. + */ +public interface InMemoryComponent { + public void start() throws UnableToStartException; + public void stop(); +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java new file mode 100644 index 0000000000..39d4ad666b --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java @@ -0,0 +1,9 @@ +package org.apache.metron.integration.util.integration; + +/** + * Created by cstella on 1/28/16. + */ +public interface Processor { + ReadinessState process(ComponentRunner runner); + T getResult(); +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java new file mode 100644 index 0000000000..d7bd0ab5b2 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java @@ -0,0 +1,8 @@ +package org.apache.metron.integration.util.integration; + +/** + * Created by cstella on 1/28/16. + */ +public enum ReadinessState { + READY, NOT_READY; +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java new file mode 100644 index 0000000000..ba287ca0c7 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java @@ -0,0 +1,13 @@ +package org.apache.metron.integration.util.integration; + +/** + * Created by cstella on 1/28/16. + */ +public class UnableToStartException extends Exception { + public UnableToStartException(String message) { + super(message); + } + public UnableToStartException(String message, Throwable t) { + super(message, t); + } +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java new file mode 100644 index 0000000000..ecf2ed3416 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java @@ -0,0 +1,166 @@ +package org.apache.metron.integration.util.integration.components; + +import org.apache.commons.io.FileUtils; +import org.apache.metron.integration.util.integration.InMemoryComponent; +import org.apache.metron.integration.util.integration.UnableToStartException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.search.SearchHit; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Created by cstella on 1/28/16. + */ +public class ElasticSearchComponent implements InMemoryComponent { + + public static class Builder{ + private int httpPort; + private File indexDir; + private Map extraElasticSearchSettings = null; + public Builder withHttpPort(int httpPort) { + this.httpPort = httpPort; + return this; + } + public Builder withIndexDir(File indexDir) { + this.indexDir = indexDir; + return this; + } + public Builder withExtraElasticSearchSettings(Map extraElasticSearchSettings) { + this.extraElasticSearchSettings = extraElasticSearchSettings; + return this; + } + public ElasticSearchComponent build() { + return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings); + } + } + + private Client client; + private Node node; + private int httpPort; + private File indexDir; + private Map extraElasticSearchSettings; + + public ElasticSearchComponent(int httpPort, File indexDir) { + this(httpPort, indexDir, null); + } + public ElasticSearchComponent(int httpPort, File indexDir, Map extraElasticSearchSettings) { + this.httpPort = httpPort; + this.indexDir = indexDir; + this.extraElasticSearchSettings = extraElasticSearchSettings; + } + public Client getClient() { + return client; + } + + private void cleanDir(File dir) throws IOException { + if(dir.exists()) { + FileUtils.deleteDirectory(dir); + } + dir.mkdirs(); + } + public void start() throws UnableToStartException { + File logDir= new File(indexDir, "/logs"); + File dataDir= new File(indexDir, "/data"); + try { + cleanDir(logDir); + cleanDir(dataDir); + + } catch (IOException e) { + throw new UnableToStartException("Unable to clean log or data directories", e); + } + ImmutableSettings.Builder immutableSettings = ImmutableSettings.settingsBuilder() + .put("node.http.enabled", true) + .put("http.port", httpPort) + .put("cluster.name", "metron") + .put("path.logs",logDir.getAbsolutePath()) + .put("path.data",dataDir.getAbsolutePath()) + .put("gateway.type", "none") + .put("index.store.type", "memory") + .put("index.number_of_shards", 1) + .put("node.mode", "network") + .put("index.number_of_replicas", 1); + if(extraElasticSearchSettings != null) { + immutableSettings = immutableSettings.put(extraElasticSearchSettings); + } + Settings settings = immutableSettings.build(); + + node = NodeBuilder.nodeBuilder().settings(settings).node(); + node.start(); + settings = ImmutableSettings.settingsBuilder() + .put("cluster.name", "metron").build(); + client = new TransportClient(settings) + .addTransportAddress(new InetSocketTransportAddress("localhost", + 9300)); + + waitForCluster(client, ClusterHealthStatus.YELLOW, new TimeValue(60000)); + } + + public static void waitForCluster(ElasticsearchClient client, ClusterHealthStatus status, TimeValue timeout) throws UnableToStartException { + try { + ClusterHealthResponse healthResponse = + (ClusterHealthResponse)client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(status).timeout(timeout)).actionGet(); + if (healthResponse != null && healthResponse.isTimedOut()) { + throw new UnableToStartException("cluster state is " + healthResponse.getStatus().name() + + " and not " + status.name() + + ", from here on, everything will fail!"); + } + } catch (ElasticsearchTimeoutException e) { + throw new UnableToStartException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations"); + } + } + + public List> getAllIndexedDocs(String index) throws IOException { + System.out.println(index); + getClient().admin().indices().refresh(new RefreshRequest()); + SearchResponse response = getClient().prepareSearch(index) + .setTypes("pcap_doc") + .setSource("message") + .setFrom(0) + .setSize(1000) + .execute().actionGet(); + List> ret = new ArrayList>(); + for (SearchHit hit : response.getHits()) { + Object o = hit.getSource().get("message"); + ret.add((Map)(o)); + } + return ret; + } + public boolean hasIndex(String indexName) { + Set indices = getClient().admin() + .indices() + .stats(new IndicesStatsRequest()) + .actionGet() + .getIndices() + .keySet(); + return indices.contains(indexName); + + } + + public void stop() { + node.stop(); + node = null; + client = null; + } +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java new file mode 100644 index 0000000000..ebb123096d --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java @@ -0,0 +1,114 @@ +package org.apache.metron.integration.util.integration.components; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.generated.StormTopology; +import org.apache.metron.integration.util.integration.InMemoryComponent; +import org.apache.metron.integration.util.integration.UnableToStartException; +import org.apache.storm.flux.FluxBuilder; +import org.apache.storm.flux.model.ExecutionContext; +import org.apache.storm.flux.model.TopologyDef; +import org.apache.storm.flux.parser.FluxParser; +import org.apache.thrift7.TException; +import org.junit.Assert; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Properties; + +/** + * Created by cstella on 1/28/16. + */ +public class FluxTopologyComponent implements InMemoryComponent { + LocalCluster stormCluster; + String topologyName; + File topologyLocation; + Properties topologyProperties; + + public static class Builder { + String topologyName; + File topologyLocation; + Properties topologyProperties; + public Builder withTopologyName(String name) { + this.topologyName = name; + return this; + } + public Builder withTopologyLocation(File location) { + this.topologyLocation = location; + return this; + } + public Builder withTopologyProperties(Properties properties) { + this.topologyProperties = properties; + return this; + } + + public FluxTopologyComponent build() { + return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties); + } + } + + public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) { + this.topologyName = topologyName; + this.topologyLocation = topologyLocation; + this.topologyProperties = topologyProperties; + } + + public LocalCluster getStormCluster() { + return stormCluster; + } + + public String getTopologyName() { + return topologyName; + } + + public File getTopologyLocation() { + return topologyLocation; + } + + public Properties getTopologyProperties() { + return topologyProperties; + } + + public void start() throws UnableToStartException{ + try { + startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties()); + } catch (Exception e) { + throw new UnableToStartException("Unable to start flux topology: " + getTopologyLocation(), e); + } + } + + public void stop() { + stormCluster.shutdown(); + } + private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException { + TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + StormTopology topology = FluxBuilder.buildTopology(context); + Assert.assertNotNull(topology); + topology.validate(); + stormCluster = new LocalCluster(); + stormCluster.submitTopology(topologyName, conf, topology); + } + + private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException { + File tmpFile = File.createTempFile(topologyName, "props"); + FileWriter propWriter = null; + try { + propWriter = new FileWriter(tmpFile); + properties.store(propWriter, topologyName + " properties"); + } + finally { + if(propWriter != null) { + propWriter.close(); + return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false); + } + + return null; + } + } + + +} From f3c66bd206fcf03e839c5bbec89fbae62f5ffe7d Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 29 Jan 2016 10:40:05 -0500 Subject: [PATCH 03/11] Updating with HBase bolt added back to Flux PCap topology. --- .../org/apache/metron/hbase/Connector.java | 25 +++++++ .../org/apache/metron/hbase/HBaseBolt.java | 53 ++++++++++++-- .../apache/metron/hbase/HTableConnector.java | 12 +++- .../apache/metron/hbase/TupleTableConfig.java | 42 ++++++++++- metron-streaming/Metron-Topologies/pom.xml | 2 +- .../Metron_Configs/topologies/pcap/local.yaml | 36 ++++++++++ .../integration/pcap/PcapIntegrationTest.java | 71 ++++++++++++++----- .../integration/util/UnitTestHelper.java | 19 +++++ .../components/ElasticSearchComponent.java | 1 - .../components/FluxTopologyComponent.java | 8 ++- .../util/mock/MockHBaseConnector.java | 36 ++++++++++ metron-streaming/pom.xml | 8 ++- 12 files changed, 279 insertions(+), 34 deletions(-) create mode 100644 metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHBaseConnector.java diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java new file mode 100644 index 0000000000..33b4eb8ae9 --- /dev/null +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java @@ -0,0 +1,25 @@ +package org.apache.metron.hbase; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.Serializable; + +/** + * Created by cstella on 1/29/16. + */ +public abstract class Connector { + protected TupleTableConfig tableConf; + protected String _quorum; + protected String _port; + + public Connector(final TupleTableConfig conf, String _quorum, String _port) throws IOException { + this.tableConf = conf; + this._quorum = _quorum; + this._port = _port; + } + public abstract void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException; + public abstract void close(); +} diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java index 62e7d48db1..3ccbd60b09 100644 --- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java @@ -3,8 +3,10 @@ import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.json.simple.JSONObject; @@ -34,10 +36,10 @@ public class HBaseBolt implements IRichBolt { private static final Logger LOG = Logger.getLogger(HBaseBolt.class); protected OutputCollector collector; - protected HTableConnector connector; protected TupleTableConfig conf; protected boolean autoAck = true; - + protected Connector connector; + private String connectorImpl; private String _quorum; private String _port; @@ -45,7 +47,50 @@ public HBaseBolt(TupleTableConfig conf, String quorum, String port) { this.conf = conf; _quorum = quorum; _port = port; + } + public HBaseBolt withConnector(String connectorImpl) { + this.connectorImpl = connectorImpl; + return this; + } + + public Connector createConnector() throws IOException{ + initialize(); + if(connectorImpl == null) { + return new HTableConnector(conf, _quorum, _port); + } + else { + try { + Class clazz = (Class) Class.forName(connectorImpl); + return clazz.getConstructor(TupleTableConfig.class, String.class, String.class).newInstance(conf, _quorum, _port); + } catch (InstantiationException e) { + throw new IOException("Unable to instantiate connector.", e); + } catch (IllegalAccessException e) { + throw new IOException("Unable to instantiate connector: illegal access", e); + } catch (InvocationTargetException e) { + throw new IOException("Unable to instantiate connector", e); + } catch (NoSuchMethodException e) { + throw new IOException("Unable to instantiate connector: no such method", e); + } catch (ClassNotFoundException e) { + throw new IOException("Unable to instantiate connector: class not found", e); + } + } + } + + public void initialize() { + TupleTableConfig hbaseBoltConfig = conf; + String allColumnFamiliesColumnQualifiers = conf.getFields(); + String[] tokenizedColumnFamiliesWithColumnQualifiers = StringUtils + .split(allColumnFamiliesColumnQualifiers, "\\|"); + for (String tokenizedColumnFamilyWithColumnQualifiers : tokenizedColumnFamiliesWithColumnQualifiers) { + String[] cfCqTokens = StringUtils.split( tokenizedColumnFamilyWithColumnQualifiers, ":"); + String columnFamily = cfCqTokens[0]; + String[] columnQualifiers = StringUtils.split(cfCqTokens[1], ","); + for (String columnQualifier : columnQualifiers) { + hbaseBoltConfig.addColumn(columnFamily, columnQualifier); + } + setAutoAck(true); + } } /** {@inheritDoc} */ @@ -55,7 +100,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll this.collector = collector; try { - this.connector = new HTableConnector(conf, _quorum, _port); + this.connector = createConnector(); } catch (IOException e) { @@ -69,7 +114,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll public void execute(Tuple input) { try { - this.connector.getTable().put(conf.getPutFromTuple(input)); + this.connector.put(conf.getPutFromTuple(input)); } catch (IOException ex) { JSONObject error = ErrorGenerator.generateErrorMessage( diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java index 296d0c27f2..b0ff4a8ec6 100644 --- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java @@ -1,11 +1,14 @@ package org.apache.metron.hbase; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.Serializable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; @@ -18,7 +21,7 @@ * classpath */ @SuppressWarnings("serial") -public class HTableConnector implements Serializable { +public class HTableConnector extends Connector implements Serializable{ private static final Logger LOG = Logger.getLogger(HTableConnector.class); private Configuration conf; @@ -31,6 +34,7 @@ public class HTableConnector implements Serializable { * @throws IOException */ public HTableConnector(final TupleTableConfig conf, String _quorum, String _port) throws IOException { + super(conf, _quorum, _port); this.tableName = conf.getTableName(); this.conf = HBaseConfiguration.create(); @@ -93,9 +97,15 @@ public HTable getTable() { return table; } + @Override + public void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException { + table.put(put); + } + /** * Close the table */ + @Override public void close() { try { this.table.close(); diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java index 035e9c39ab..d2c789a264 100644 --- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java @@ -30,7 +30,8 @@ public class TupleTableConfig implements Serializable { private boolean batch = true; protected Durability durability = Durability.USE_DEFAULT; private long writeBufferSize = 0L; - + private String fields; + /** * Initialize configuration * @@ -62,7 +63,42 @@ public TupleTableConfig(final String table, final String rowKeyField, final Stri this.tupleTimestampField = timestampField; this.columnFamilies = new HashMap>(); } - + + public TupleTableConfig() { + this.columnFamilies = new HashMap>(); + } + + public TupleTableConfig withTable(String table) { + this.tableName = table; + return this; + } + + public TupleTableConfig withRowKeyField(String rowKeyField) { + this.tupleRowKeyField = rowKeyField; + return this; + } + + public TupleTableConfig withTimestampField(String timestampField) { + this.tupleTimestampField = timestampField; + return this; + } + + public TupleTableConfig withFields(String fields) { + this.fields = fields; + return this; + } + + public TupleTableConfig withBatch(Boolean isBatch) { + this.batch = isBatch; + return this; + } + + public String getFields() { + return fields; + } + + + /** * Add column family and column qualifier to be extracted from tuple * @@ -221,7 +257,7 @@ public void setBatch(boolean batch) { } /** - * @param setDurability + * @param durability * Sets whether to write to HBase's edit log. *

* Setting to false will mean fewer operations to perform when diff --git a/metron-streaming/Metron-Topologies/pom.xml b/metron-streaming/Metron-Topologies/pom.xml index 524813ac39..4c5c4ab4d2 100644 --- a/metron-streaming/Metron-Topologies/pom.xml +++ b/metron-streaming/Metron-Topologies/pom.xml @@ -82,7 +82,7 @@ org.apache.kafka - kafka_2.8.2 + kafka_2.9.2 ${global_kafka_version} + + org.apache.kafka + kafka-clients + ${global_kafka_version} + org.apache.kafka kafka_2.9.2 - 0.8.1.1 + ${global_kafka_version} javax.jms diff --git a/metron-streaming/Metron-EnrichmentAdapters/pom.xml b/metron-streaming/Metron-EnrichmentAdapters/pom.xml index 2a40f517db..a37a032bdd 100644 --- a/metron-streaming/Metron-EnrichmentAdapters/pom.xml +++ b/metron-streaming/Metron-EnrichmentAdapters/pom.xml @@ -26,7 +26,7 @@ 1.7.7 0.96.1-hadoop2 0.1.2 - 17.0 + ${global_guava_version} diff --git a/metron-streaming/Metron-Pcap_Service/pom.xml b/metron-streaming/Metron-Pcap_Service/pom.xml index 40a9deacfc..79cfb4707f 100644 --- a/metron-streaming/Metron-Pcap_Service/pom.xml +++ b/metron-streaming/Metron-Pcap_Service/pom.xml @@ -10,13 +10,11 @@ Metron Pcap_Service UTF-8 - 1.4.0.2.0.6.0-76 - 2.2.0.2.0.6.0-76 + ${global_flume_version} + ${global_hadoop_version} 1.7 1.7 - 0.9.2-incubating - 0.8.0 - 1.7.5 + ${global_slf4j_version} 3.4.5.2.0.6.0-76 1.2.15 @@ -35,7 +33,7 @@ 2.1.1 - 4.11 + ${global_junit_version} 1.3 1.9.5 1.3.0 diff --git a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java index f995e3f565..d058ccc6bb 100644 --- a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java +++ b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java @@ -1,10 +1,10 @@ package org.apache.metron.pcapservice; -import org.eclipse.jdt.internal.core.Assert; import org.junit.Test; import org.apache.metron.pcapservice.ConfigurationUtil; import org.apache.metron.pcapservice.ConfigurationUtil.SizeUnit; +import org.springframework.util.Assert; /** * The Class ConfigurationUtilTest. diff --git a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/HBaseIntegrationTest.java b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/HBaseIntegrationTest.java index 6a25599bbb..bb72133876 100644 --- a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/HBaseIntegrationTest.java +++ b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/HBaseIntegrationTest.java @@ -46,7 +46,7 @@ void initCluster() throws Exception { * Signals that an I/O exception has occurred. */ private void createTable() throws IOException { - testTable = testUtil.createTable("test_pcaps_local", "cf"); + testTable = testUtil.createTable(Bytes.toBytes("test_pcaps_local"), Bytes.toBytes("cf")); System.out.println("after 'test_pcaps_local' table creation "); // create put Put put = new Put(Bytes.toBytes("1111")); // row key =1111 diff --git a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/PcapHelperTest.java b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/PcapHelperTest.java index 354a3a7b26..2b766742ed 100644 --- a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/PcapHelperTest.java +++ b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/PcapHelperTest.java @@ -6,7 +6,6 @@ import java.util.Arrays; import java.util.List; -import org.eclipse.jdt.internal.core.Assert; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -17,6 +16,7 @@ import org.apache.metron.pcapservice.PcapHelper; import org.apache.metron.pcapservice.PcapHelper.TimeUnit; +import org.springframework.util.Assert; // TODO: Auto-generated Javadoc /** diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java index 53f2a508f7..13f4c99b54 100644 --- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; import org.apache.metron.test.converters.BinaryConverters; import org.apache.metron.test.converters.IConverter; import org.apache.metron.test.filereaders.FileReader; @@ -56,6 +58,9 @@ public class GenericInternalTestSpout extends BaseRichSpout { public GenericInternalTestSpout withFilename(String filename) { + if(filename != null && filename.length() > 0 && filename.charAt(0) == '$') { + filename = Iterables.getLast(Splitter.on("}").split(filename)); + } _filename = filename; return this; } diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml index e3a130cbb7..b37f2a0dfa 100644 --- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml +++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml @@ -84,7 +84,24 @@ components: value: true - name: "startOffsetTime" value: -1 - + - id: "hbaseConfig" + className: "org.apache.metron.hbase.TupleTableConfig" + configMethods: + - name: "withFields" + args: + - "${bolt.hbase.table.fields}" + - name: "withTable" + args: + - "${bolt.hbase.table.name}" + - name: "withRowKeyField" + args: + - "${bolt.hbase.table.key.tuple.field.name}" + - name: "withTimestampField" + args: + - "${bolt.hbase.table.timestamp.tuple.field.name}" + - name: "withBatch" + args: + - ${bolt.hbase.enable.batching} spouts: - id: "kafkaSpout" className: "storm.kafka.KafkaSpout" @@ -92,6 +109,12 @@ spouts: - ref: "kafkaConfig" bolts: + - id: "hbaseBolt" + className: "org.apache.metron.hbase.HBaseBolt" + constructorArgs: + - ref: "hbaseConfig" + - "${kafka.zk.list}" + - "${kafka.zk.port}" - id: "parserBolt" className: "org.apache.metron.parsing.PcapParserBolt" configMethods: @@ -164,6 +187,13 @@ streams: to: "parserBolt" grouping: type: SHUFFLE + - name: "parser -> hbase" + from: "parserBolt" + to: "hbaseBolt" + grouping: + streamId: "pcap_data_stream" + type: FIELDS + args: ["pcap_id"] - name: "parser -> indexing" from: "parserBolt" to: "indexingBolt" diff --git a/metron-streaming/pom.xml b/metron-streaming/pom.xml index 4df6cc165b..d04862dab2 100644 --- a/metron-streaming/pom.xml +++ b/metron-streaming/pom.xml @@ -23,6 +23,7 @@ @ApacheMetron 0.10.0 0.10.0 + 1.7.1 0.8.2.2 2.7.1 1.1.1 @@ -31,7 +32,7 @@ 1.1.1 3.0.2 4.4 - 18.0 + 17.0 2.2.5 1.7.7 From c6d5e1c27a0ae480ff9c7abac73ca94afd260abb Mon Sep 17 00:00:00 2001 From: cstella Date: Wed, 27 Jan 2016 10:22:01 -0500 Subject: [PATCH 06/11] Updating gitignore to not include intellij cruft. --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index f6c6cc590e..7da5300c2a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ target *.project *.classpath *.settings + From eb144f78fa06e8d5a536495665ae8e3e4972c963 Mon Sep 17 00:00:00 2001 From: cstella Date: Thu, 28 Jan 2016 12:43:22 -0500 Subject: [PATCH 07/11] First cut at integration testing framework with in memory Flux and elastic search components. Also, an integration test for the PCap topology. --- .../test/converters/BinaryConverters.java | 23 +++ .../test/converters/HexStringConverter.java | 16 ++ .../metron/test/converters/IConverter.java | 8 + .../metron/test/filereaders/FileReader.java | 17 +- .../test/spouts/GenericInternalTestSpout.java | 19 +- .../Metron_Configs/topologies/pcap/local.yaml | 7 +- .../resources/SampleInput/PCAPExampleOutput | 15 ++ .../integration/pcap/PcapIntegrationTest.java | 156 ++++++++++++++++ .../integration/util/UnitTestHelper.java | 51 ++++++ .../util/integration/ComponentRunner.java | 113 ++++++++++++ .../util/integration/InMemoryComponent.java | 9 + .../util/integration/Processor.java | 9 + .../util/integration/ReadinessState.java | 8 + .../integration/UnableToStartException.java | 13 ++ .../components/ElasticSearchComponent.java | 166 ++++++++++++++++++ .../components/FluxTopologyComponent.java | 114 ++++++++++++ 16 files changed, 731 insertions(+), 13 deletions(-) create mode 100644 metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/BinaryConverters.java create mode 100644 metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/HexStringConverter.java create mode 100644 metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/IConverter.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/BinaryConverters.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/BinaryConverters.java new file mode 100644 index 0000000000..605f6c9a01 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/BinaryConverters.java @@ -0,0 +1,23 @@ +package org.apache.metron.test.converters; + +/** + * Created by cstella on 1/27/16. + */ +public enum BinaryConverters implements IConverter { + DEFAULT(new IConverter() { + + public byte[] convert(String s) { + return s.getBytes(); + } + }) + , FROM_HEX_STRING(new HexStringConverter()); + IConverter _underlying; + BinaryConverters(IConverter i) { + _underlying = i; + } + + public byte[] convert(String s) { + return _underlying.convert(s); + } + +} diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/HexStringConverter.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/HexStringConverter.java new file mode 100644 index 0000000000..cfc001a305 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/HexStringConverter.java @@ -0,0 +1,16 @@ +package org.apache.metron.test.converters; + +/** + * Created by cstella on 1/27/16. + */ +public class HexStringConverter implements IConverter { + public byte[] convert(String s) { + int len = s.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) { + data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + + Character.digit(s.charAt(i+1), 16)); + } + return data; + } +} diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/IConverter.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/IConverter.java new file mode 100644 index 0000000000..502a886f66 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/converters/IConverter.java @@ -0,0 +1,8 @@ +package org.apache.metron.test.converters; + +/** + * Created by cstella on 1/27/16. + */ +public interface IConverter { + public byte[] convert(String s); +} diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java index 998e8c9fd2..96f09696b1 100644 --- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/filereaders/FileReader.java @@ -17,11 +17,7 @@ package org.apache.metron.test.filereaders; -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.*; import java.util.LinkedList; import java.util.List; @@ -33,9 +29,14 @@ public List readFromFile(String filename) throws IOException List lines = new LinkedList(); - InputStream stream = Thread.currentThread().getContextClassLoader() - .getResourceAsStream(filename); - + InputStream stream = null; + if(new File(filename).exists()) { + stream = new FileInputStream(filename); + } + else { + stream = Thread.currentThread().getContextClassLoader() + .getResourceAsStream(filename); + } DataInputStream in = new DataInputStream(stream); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String strLine; diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java index 19e4f37b87..53f2a508f7 100644 --- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java @@ -17,10 +17,13 @@ package org.apache.metron.test.spouts; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.metron.test.converters.BinaryConverters; +import org.apache.metron.test.converters.IConverter; import org.apache.metron.test.filereaders.FileReader; import backtype.storm.spout.SpoutOutputCollector; @@ -47,6 +50,7 @@ public class GenericInternalTestSpout extends BaseRichSpout { private boolean _repeating = true; private SpoutOutputCollector _collector; + private IConverter _converter; private FileReader Reader; private int cnt = 0; @@ -67,6 +71,16 @@ public GenericInternalTestSpout withRepeating(Boolean repeating) return this; } + public GenericInternalTestSpout withBinaryConverter(String converter) { + if(converter == null) { + _converter = BinaryConverters.DEFAULT; + } + else { + _converter = BinaryConverters.valueOf(converter); + } + return this; + } + @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, @@ -77,8 +91,7 @@ public void open(Map conf, TopologyContext context, Reader = new FileReader(); jsons = Reader.readFromFile(_filename); - - } catch (IOException e) + } catch (Throwable e) { System.out.println("Could not read sample JSONs"); e.printStackTrace(); @@ -91,7 +104,7 @@ public void nextTuple() { if(cnt < jsons.size()) { - _collector.emit(new Values(jsons.get(cnt).getBytes())); + _collector.emit(new Values(_converter.convert(jsons.get(cnt)))); } cnt ++; diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml index 49c4bf268b..003bd2e21f 100644 --- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml +++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml @@ -70,12 +70,15 @@ spouts: className: "org.apache.metron.test.spouts.GenericInternalTestSpout" parallelism: 1 configMethods: + - name: "withBinaryConverter" + args: + - "${pcap.binary.converter}" - name: "withFilename" args: - - "SampleInput/PCAPExampleOutput" + - "${input.path}SampleInput/PCAPExampleOutput" - name: "withRepeating" args: - - true + - ${testing.repeating} bolts: - id: "parserBolt" diff --git a/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput index e69de29bb2..2de0edc4ee 100644 --- a/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput +++ b/metron-streaming/Metron-Topologies/src/main/resources/SampleInput/PCAPExampleOutput @@ -0,0 +1,15 @@ +d4c3b2a1020004000000000000000000ffff000001000000c637a956600004004a0000004a0000005254001235030800277f932d08004500003cf4c7400040112dd80a00020f0a0002038f7000350028184b78fc010000010000000000000377777706676f6f676c6503636f6d0000010001 +d4c3b2a1020004000000000000000000ffff000001000000c637a9564d5304009a0000009a0000000800277f932d52540012350208004500008ca6f800004011bb570a0002030a00020f00358f7000782c3478fc818000010005000000000377777706676f6f676c6503636f6d0000010001c00c000100010000009f00044a7de232c00c000100010000009f00044a7de231c00c000100010000009f00044a7de234c00c000100010000009f00044a7de230c00c000100010000009f00044a7de233 +d4c3b2a1020004000000000000000000ffff000001000000c637a9566462040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800653e380a0001c637a956000000002855040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c637a95658ce040062000000620000000800277f932d525400123502080045000054a6f940003f015bf14a7de2320a00020f00006d3e380a0001c637a956000000002855040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c637a9566dd4040056000000560000005254001235030800277f932d080045000048f4ff400040112d940a00020f0a000203ebbd0035003418579ef201000001000000000000023530033232360331323502373407696e2d61646472046172706100000c0001 +d4c3b2a1020004000000000000000000ffff000001000000c637a956966b05007d0000007d0000000800277f932d52540012350208004500006fa6fa00004011bb720a0002030a00020f0035ebbd005b5ff09ef281800001000100000000023530033232360331323502373407696e2d61646472046172706100000c0001c00c000c000100005591001b0f6c676131357334332d696e2d663138053165313030036e657400 +d4c3b2a1020004000000000000000000ffff000001000000c737a9569562040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800e837380a0002c737a95600000000a45a040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c737a95639e2040062000000620000000800277f932d525400123502080045000054a6fb40003f015bef4a7de2320a00020f0000f037380a0002c737a95600000000a45a040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c837a9565360040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800d538380a0003c837a95600000000b658040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c837a956a6da040062000000620000000800277f932d525400123502080045000054a6fc40003f015bee4a7de2320a00020f0000dd38380a0003c837a95600000000b658040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c937a9563d64040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800a331380a0004c937a95600000000e75e040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000c937a9560fd5040062000000620000000800277f932d525400123502080045000054a6fd40003f015bed4a7de2320a00020f0000ab31380a0004c937a95600000000e75e040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000ca37a956fb66040062000000620000005254001235020800277f932d08004500005400004000400101eb0a00020f4a7de2320800132d380a0005ca37a956000000007662040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000ca37a95613e8040062000000620000000800277f932d525400123502080045000054a6fe40003f015bec4a7de2320a00020f00001b2d380a0005ca37a956000000007662040000000000101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f3031323334353637 +d4c3b2a1020004000000000000000000ffff000001000000cb37a956c4f703002a0000002a0000005254001235030800277f932d08060001080006040001 \ No newline at end of file diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java new file mode 100644 index 0000000000..171c30626e --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java @@ -0,0 +1,156 @@ +package org.apache.metron.integration.pcap; + +import org.apache.metron.integration.util.integration.Processor; +import org.apache.metron.integration.util.integration.ComponentRunner; +import org.apache.metron.integration.util.UnitTestHelper; +import org.apache.metron.integration.util.integration.ReadinessState; +import org.apache.metron.integration.util.integration.components.ElasticSearchComponent; +import org.apache.metron.integration.util.integration.components.FluxTopologyComponent; +import org.apache.metron.parsing.parsers.PcapParser; +import org.apache.metron.pcap.PacketInfo; +import org.apache.metron.test.converters.HexStringConverter; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; +import org.junit.Assert; +import org.junit.Test; + +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.*; + +/** + * Created by cstella on 1/27/16. + */ +public class PcapIntegrationTest { + + private String topologiesDir = "src/main/resources/Metron_Configs/topologies"; + private String targetDir = "target"; + + + @Test + public void testTopology() throws Exception { + if(!new File(topologiesDir).exists()) { + topologiesDir = UnitTestHelper.findDir("topologies"); + } + if(!new File(targetDir).exists()) { + targetDir = UnitTestHelper.findDir("target"); + } + Assert.assertNotNull(topologiesDir); + Assert.assertNotNull(targetDir); + final Set expectedPcapIds= getExpectedPcap(new File(topologiesDir + "/../../SampleInput/PCAPExampleOutput")); + Assert.assertTrue("Expected non-zero number of PCAP Ids from the sample data", expectedPcapIds.size() > 0); + System.out.println("Using topologies directory: " + topologiesDir); + + + ElasticSearchComponent esComponent = new ElasticSearchComponent.Builder() + .withHttpPort(9211) + .withIndexDir(new File(targetDir + "/elasticsearch")) + .build(); + Properties topologyProperties = new Properties() {{ + setProperty("input.path", "src/main/resources/"); + setProperty("es.port", "9300"); + setProperty("es.ip", "localhost"); + setProperty("es.clustername", "metron"); + setProperty("pcap.binary.converter", "FROM_HEX_STRING"); + setProperty("testing.repeating", "false"); + setProperty("org.apache.metron.metrics.reporter.graphite", "false"); + setProperty("org.apache.metron.metrics.reporter.console", "false"); + setProperty("org.apache.metron.metrics.reporter.jmx", "false"); + setProperty("org.apache.metron.metrics.TelemetryParserBolt.acks","true"); + setProperty("org.apache.metron.metrics.TelemetryParserBolt.emits", "true"); + setProperty("org.apache.metron.metrics.TelemetryParserBolt.fails","true"); + setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.acks","true"); + setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.emits","true"); + setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.fails","true"); + setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.acks", "true"); + setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.emits","true"); + setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.fails","true"); + }}; + FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() + .withTopologyLocation(new File(topologiesDir + "/pcap/local.yaml")) + .withTopologyName("pcap") + .withTopologyProperties(topologyProperties) + .build(); + ComponentRunner runner = new ComponentRunner.Builder() + .withComponent("elasticsearch", esComponent) + .withComponent("storm", fluxComponent) + .build(); + + final String index = getIndex(); + runner.start(); + List> docs = + runner.process(new Processor>> () { + List> docs = null; + public ReadinessState process(ComponentRunner runner){ + ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class); + if(elasticSearchComponent.hasIndex(index)) { + try { + docs = elasticSearchComponent.getAllIndexedDocs(index); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve indexed documents.", e); + } + if(docs.size() < expectedPcapIds.size()) { + return ReadinessState.NOT_READY; + } + else { + return ReadinessState.READY; + } + } + else { + return ReadinessState.NOT_READY; + } + } + + public List> getResult() { + return docs; + } + }); + checkDocuments(expectedPcapIds, docs); + runner.stop(); + } + + private static void checkDocuments(Set expectedPcapIds, List> documents) { + + boolean mismatch = false; + Set indexedPcapIds = new HashSet(); + for(Map doc : documents) { + String indexedId = (String)doc.get("pcap_id"); + indexedPcapIds.add(indexedId); + if(!expectedPcapIds.contains(indexedId)) { + mismatch = true; + System.out.println("Indexed PCAP ID that I did not expect: " + indexedId); + } + } + for(String expectedId : expectedPcapIds) { + if(!indexedPcapIds.contains(expectedId)) { + mismatch = true; + System.out.println("Expected PCAP ID that I did not index: " + expectedId); + } + } + Assert.assertFalse(mismatch); + } + + private static Set getExpectedPcap(File rawFile) throws IOException { + Set ret = new HashSet(); + BufferedReader br = new BufferedReader(new FileReader(rawFile)); + for(String line = null; (line = br.readLine()) != null;) { + byte[] pcapBytes = new HexStringConverter().convert(line); + List list = PcapParser.parse(pcapBytes); + for(PacketInfo pi : list) { + String string_pcap = pi.getJsonIndexDoc(); + Object obj= JSONValue.parse(string_pcap); + JSONObject header=(JSONObject)obj; + ret.add((String)header.get("pcap_id")); + } + } + return ret; + } + + + private static String getIndex() { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd.hh"); + Date d = new Date(); + return "pcap_index_" + sdf.format(d); + } + +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java new file mode 100644 index 0000000000..64c5466fbb --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java @@ -0,0 +1,51 @@ +package org.apache.metron.integration.util; + +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; + +import java.io.File; +import java.util.Stack; + +/** + * Created by cstella on 1/28/16. + */ +public class UnitTestHelper { + public static String findDir(String name) { + return findDir(new File("."), name); + } + + public static String findDir(File startDir, String name) { + Stack s = new Stack(); + s.push(startDir); + while(!s.empty()) { + File parent = s.pop(); + if(parent.getName().equalsIgnoreCase(name)) { + return parent.getAbsolutePath(); + } + else { + File[] children = parent.listFiles(); + if(children != null) { + for (File child : children) { + s.push(child); + } + } + } + } + return null; + } + + public static void verboseLogging() { + verboseLogging("%d [%p|%c|%C{1}] %m%n", Level.ALL); + } + public static void verboseLogging(String pattern, Level level) { + ConsoleAppender console = new ConsoleAppender(); //create appender + //configure the appender + console.setLayout(new PatternLayout(pattern)); + console.setThreshold(level); + console.activateOptions(); + //add appender to any Logger (here is root) + Logger.getRootLogger().addAppender(console); + } +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java new file mode 100644 index 0000000000..da3344f541 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java @@ -0,0 +1,113 @@ +package org.apache.metron.integration.util.integration; + +import backtype.storm.utils.Utils; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Created by cstella on 1/27/16. + */ +public class ComponentRunner { + public static class Builder { + LinkedHashMap components; + String[] startupOrder; + String[] shutdownOrder; + public Builder() { + components = new LinkedHashMap(); + } + + public Builder withComponent(String name, InMemoryComponent component) { + components.put(name, component); + return this; + } + + public Builder withCustomStartupOrder(String[] startupOrder) { + this.startupOrder = startupOrder; + return this; + } + public Builder withCustomShutdownOrder(String[] shutdownOrder) { + this.shutdownOrder = shutdownOrder; + return this; + } + private static String[] toOrderedList(Map components) { + String[] ret = new String[components.size()]; + int i = 0; + for(String component : components.keySet()) { + ret[i++] = component; + } + return ret; + } + public ComponentRunner build() { + if(shutdownOrder == null) { + shutdownOrder = toOrderedList(components); + } + if(startupOrder == null) { + startupOrder = toOrderedList(components); + } + return new ComponentRunner(components, startupOrder, shutdownOrder); + } + + } + + LinkedHashMap components; + String[] startupOrder; + String[] shutdownOrder; + public ComponentRunner( LinkedHashMap components + , String[] startupOrder + , String[] shutdownOrder + ) + { + this.components = components; + this.startupOrder = startupOrder; + this.shutdownOrder = shutdownOrder; + + } + + public T getComponent(String name, Class clazz) { + return clazz.cast(getComponents().get(name)); + } + + public LinkedHashMap getComponents() { + return components; + } + + public void start() throws UnableToStartException { + for(String componentName : startupOrder) { + components.get(componentName).start(); + } + } + public void stop() { + for(String componentName : shutdownOrder) { + components.get(componentName).stop(); + } + } + + public T process(Processor successState) { + return process(successState, 3, 5000, 120000); + } + + public T process(Processor successState, int numRetries, long timeBetweenAttempts, long maxTimeMs) { + int retryCount = 0; + long start = System.currentTimeMillis(); + while(true) { + long duration = System.currentTimeMillis() - start; + if(duration > maxTimeMs) { + throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMs); + } + ReadinessState state = successState.process(this); + if(state == ReadinessState.READY) { + return successState.getResult(); + } + else if(state == ReadinessState.NOT_READY) { + retryCount++; + if(numRetries > 0 && retryCount > numRetries) { + throw new RuntimeException("Too many retries: " + retryCount); + } + } + Utils.sleep(timeBetweenAttempts); + } + } + + +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java new file mode 100644 index 0000000000..f0fc0e0889 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java @@ -0,0 +1,9 @@ +package org.apache.metron.integration.util.integration; + +/** + * Created by cstella on 1/28/16. + */ +public interface InMemoryComponent { + public void start() throws UnableToStartException; + public void stop(); +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java new file mode 100644 index 0000000000..39d4ad666b --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java @@ -0,0 +1,9 @@ +package org.apache.metron.integration.util.integration; + +/** + * Created by cstella on 1/28/16. + */ +public interface Processor { + ReadinessState process(ComponentRunner runner); + T getResult(); +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java new file mode 100644 index 0000000000..d7bd0ab5b2 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java @@ -0,0 +1,8 @@ +package org.apache.metron.integration.util.integration; + +/** + * Created by cstella on 1/28/16. + */ +public enum ReadinessState { + READY, NOT_READY; +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java new file mode 100644 index 0000000000..ba287ca0c7 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java @@ -0,0 +1,13 @@ +package org.apache.metron.integration.util.integration; + +/** + * Created by cstella on 1/28/16. + */ +public class UnableToStartException extends Exception { + public UnableToStartException(String message) { + super(message); + } + public UnableToStartException(String message, Throwable t) { + super(message, t); + } +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java new file mode 100644 index 0000000000..ecf2ed3416 --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java @@ -0,0 +1,166 @@ +package org.apache.metron.integration.util.integration.components; + +import org.apache.commons.io.FileUtils; +import org.apache.metron.integration.util.integration.InMemoryComponent; +import org.apache.metron.integration.util.integration.UnableToStartException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.search.SearchHit; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Created by cstella on 1/28/16. + */ +public class ElasticSearchComponent implements InMemoryComponent { + + public static class Builder{ + private int httpPort; + private File indexDir; + private Map extraElasticSearchSettings = null; + public Builder withHttpPort(int httpPort) { + this.httpPort = httpPort; + return this; + } + public Builder withIndexDir(File indexDir) { + this.indexDir = indexDir; + return this; + } + public Builder withExtraElasticSearchSettings(Map extraElasticSearchSettings) { + this.extraElasticSearchSettings = extraElasticSearchSettings; + return this; + } + public ElasticSearchComponent build() { + return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings); + } + } + + private Client client; + private Node node; + private int httpPort; + private File indexDir; + private Map extraElasticSearchSettings; + + public ElasticSearchComponent(int httpPort, File indexDir) { + this(httpPort, indexDir, null); + } + public ElasticSearchComponent(int httpPort, File indexDir, Map extraElasticSearchSettings) { + this.httpPort = httpPort; + this.indexDir = indexDir; + this.extraElasticSearchSettings = extraElasticSearchSettings; + } + public Client getClient() { + return client; + } + + private void cleanDir(File dir) throws IOException { + if(dir.exists()) { + FileUtils.deleteDirectory(dir); + } + dir.mkdirs(); + } + public void start() throws UnableToStartException { + File logDir= new File(indexDir, "/logs"); + File dataDir= new File(indexDir, "/data"); + try { + cleanDir(logDir); + cleanDir(dataDir); + + } catch (IOException e) { + throw new UnableToStartException("Unable to clean log or data directories", e); + } + ImmutableSettings.Builder immutableSettings = ImmutableSettings.settingsBuilder() + .put("node.http.enabled", true) + .put("http.port", httpPort) + .put("cluster.name", "metron") + .put("path.logs",logDir.getAbsolutePath()) + .put("path.data",dataDir.getAbsolutePath()) + .put("gateway.type", "none") + .put("index.store.type", "memory") + .put("index.number_of_shards", 1) + .put("node.mode", "network") + .put("index.number_of_replicas", 1); + if(extraElasticSearchSettings != null) { + immutableSettings = immutableSettings.put(extraElasticSearchSettings); + } + Settings settings = immutableSettings.build(); + + node = NodeBuilder.nodeBuilder().settings(settings).node(); + node.start(); + settings = ImmutableSettings.settingsBuilder() + .put("cluster.name", "metron").build(); + client = new TransportClient(settings) + .addTransportAddress(new InetSocketTransportAddress("localhost", + 9300)); + + waitForCluster(client, ClusterHealthStatus.YELLOW, new TimeValue(60000)); + } + + public static void waitForCluster(ElasticsearchClient client, ClusterHealthStatus status, TimeValue timeout) throws UnableToStartException { + try { + ClusterHealthResponse healthResponse = + (ClusterHealthResponse)client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(status).timeout(timeout)).actionGet(); + if (healthResponse != null && healthResponse.isTimedOut()) { + throw new UnableToStartException("cluster state is " + healthResponse.getStatus().name() + + " and not " + status.name() + + ", from here on, everything will fail!"); + } + } catch (ElasticsearchTimeoutException e) { + throw new UnableToStartException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations"); + } + } + + public List> getAllIndexedDocs(String index) throws IOException { + System.out.println(index); + getClient().admin().indices().refresh(new RefreshRequest()); + SearchResponse response = getClient().prepareSearch(index) + .setTypes("pcap_doc") + .setSource("message") + .setFrom(0) + .setSize(1000) + .execute().actionGet(); + List> ret = new ArrayList>(); + for (SearchHit hit : response.getHits()) { + Object o = hit.getSource().get("message"); + ret.add((Map)(o)); + } + return ret; + } + public boolean hasIndex(String indexName) { + Set indices = getClient().admin() + .indices() + .stats(new IndicesStatsRequest()) + .actionGet() + .getIndices() + .keySet(); + return indices.contains(indexName); + + } + + public void stop() { + node.stop(); + node = null; + client = null; + } +} diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java new file mode 100644 index 0000000000..ebb123096d --- /dev/null +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java @@ -0,0 +1,114 @@ +package org.apache.metron.integration.util.integration.components; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.generated.StormTopology; +import org.apache.metron.integration.util.integration.InMemoryComponent; +import org.apache.metron.integration.util.integration.UnableToStartException; +import org.apache.storm.flux.FluxBuilder; +import org.apache.storm.flux.model.ExecutionContext; +import org.apache.storm.flux.model.TopologyDef; +import org.apache.storm.flux.parser.FluxParser; +import org.apache.thrift7.TException; +import org.junit.Assert; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Properties; + +/** + * Created by cstella on 1/28/16. + */ +public class FluxTopologyComponent implements InMemoryComponent { + LocalCluster stormCluster; + String topologyName; + File topologyLocation; + Properties topologyProperties; + + public static class Builder { + String topologyName; + File topologyLocation; + Properties topologyProperties; + public Builder withTopologyName(String name) { + this.topologyName = name; + return this; + } + public Builder withTopologyLocation(File location) { + this.topologyLocation = location; + return this; + } + public Builder withTopologyProperties(Properties properties) { + this.topologyProperties = properties; + return this; + } + + public FluxTopologyComponent build() { + return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties); + } + } + + public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) { + this.topologyName = topologyName; + this.topologyLocation = topologyLocation; + this.topologyProperties = topologyProperties; + } + + public LocalCluster getStormCluster() { + return stormCluster; + } + + public String getTopologyName() { + return topologyName; + } + + public File getTopologyLocation() { + return topologyLocation; + } + + public Properties getTopologyProperties() { + return topologyProperties; + } + + public void start() throws UnableToStartException{ + try { + startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties()); + } catch (Exception e) { + throw new UnableToStartException("Unable to start flux topology: " + getTopologyLocation(), e); + } + } + + public void stop() { + stormCluster.shutdown(); + } + private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException { + TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties); + Config conf = FluxBuilder.buildConfig(topologyDef); + ExecutionContext context = new ExecutionContext(topologyDef, conf); + StormTopology topology = FluxBuilder.buildTopology(context); + Assert.assertNotNull(topology); + topology.validate(); + stormCluster = new LocalCluster(); + stormCluster.submitTopology(topologyName, conf, topology); + } + + private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException { + File tmpFile = File.createTempFile(topologyName, "props"); + FileWriter propWriter = null; + try { + propWriter = new FileWriter(tmpFile); + properties.store(propWriter, topologyName + " properties"); + } + finally { + if(propWriter != null) { + propWriter.close(); + return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false); + } + + return null; + } + } + + +} From f92e8205a21ea735e002f43bafb1f64273df71a5 Mon Sep 17 00:00:00 2001 From: cstella Date: Fri, 29 Jan 2016 10:40:05 -0500 Subject: [PATCH 08/11] Updating with HBase bolt added back to Flux PCap topology. --- .../org/apache/metron/hbase/Connector.java | 25 +++++++ .../org/apache/metron/hbase/HBaseBolt.java | 53 ++++++++++++-- .../apache/metron/hbase/HTableConnector.java | 12 +++- .../apache/metron/hbase/TupleTableConfig.java | 42 ++++++++++- metron-streaming/Metron-Topologies/pom.xml | 2 +- .../Metron_Configs/topologies/pcap/local.yaml | 36 ++++++++++ .../integration/pcap/PcapIntegrationTest.java | 71 ++++++++++++++----- .../integration/util/UnitTestHelper.java | 19 +++++ .../components/ElasticSearchComponent.java | 1 - .../components/FluxTopologyComponent.java | 8 ++- .../util/mock/MockHBaseConnector.java | 36 ++++++++++ metron-streaming/pom.xml | 8 ++- 12 files changed, 279 insertions(+), 34 deletions(-) create mode 100644 metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java create mode 100644 metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHBaseConnector.java diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java new file mode 100644 index 0000000000..33b4eb8ae9 --- /dev/null +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/Connector.java @@ -0,0 +1,25 @@ +package org.apache.metron.hbase; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.Serializable; + +/** + * Created by cstella on 1/29/16. + */ +public abstract class Connector { + protected TupleTableConfig tableConf; + protected String _quorum; + protected String _port; + + public Connector(final TupleTableConfig conf, String _quorum, String _port) throws IOException { + this.tableConf = conf; + this._quorum = _quorum; + this._port = _port; + } + public abstract void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException; + public abstract void close(); +} diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java index 62e7d48db1..3ccbd60b09 100644 --- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java @@ -3,8 +3,10 @@ import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.json.simple.JSONObject; @@ -34,10 +36,10 @@ public class HBaseBolt implements IRichBolt { private static final Logger LOG = Logger.getLogger(HBaseBolt.class); protected OutputCollector collector; - protected HTableConnector connector; protected TupleTableConfig conf; protected boolean autoAck = true; - + protected Connector connector; + private String connectorImpl; private String _quorum; private String _port; @@ -45,7 +47,50 @@ public HBaseBolt(TupleTableConfig conf, String quorum, String port) { this.conf = conf; _quorum = quorum; _port = port; + } + public HBaseBolt withConnector(String connectorImpl) { + this.connectorImpl = connectorImpl; + return this; + } + + public Connector createConnector() throws IOException{ + initialize(); + if(connectorImpl == null) { + return new HTableConnector(conf, _quorum, _port); + } + else { + try { + Class clazz = (Class) Class.forName(connectorImpl); + return clazz.getConstructor(TupleTableConfig.class, String.class, String.class).newInstance(conf, _quorum, _port); + } catch (InstantiationException e) { + throw new IOException("Unable to instantiate connector.", e); + } catch (IllegalAccessException e) { + throw new IOException("Unable to instantiate connector: illegal access", e); + } catch (InvocationTargetException e) { + throw new IOException("Unable to instantiate connector", e); + } catch (NoSuchMethodException e) { + throw new IOException("Unable to instantiate connector: no such method", e); + } catch (ClassNotFoundException e) { + throw new IOException("Unable to instantiate connector: class not found", e); + } + } + } + + public void initialize() { + TupleTableConfig hbaseBoltConfig = conf; + String allColumnFamiliesColumnQualifiers = conf.getFields(); + String[] tokenizedColumnFamiliesWithColumnQualifiers = StringUtils + .split(allColumnFamiliesColumnQualifiers, "\\|"); + for (String tokenizedColumnFamilyWithColumnQualifiers : tokenizedColumnFamiliesWithColumnQualifiers) { + String[] cfCqTokens = StringUtils.split( tokenizedColumnFamilyWithColumnQualifiers, ":"); + String columnFamily = cfCqTokens[0]; + String[] columnQualifiers = StringUtils.split(cfCqTokens[1], ","); + for (String columnQualifier : columnQualifiers) { + hbaseBoltConfig.addColumn(columnFamily, columnQualifier); + } + setAutoAck(true); + } } /** {@inheritDoc} */ @@ -55,7 +100,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll this.collector = collector; try { - this.connector = new HTableConnector(conf, _quorum, _port); + this.connector = createConnector(); } catch (IOException e) { @@ -69,7 +114,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll public void execute(Tuple input) { try { - this.connector.getTable().put(conf.getPutFromTuple(input)); + this.connector.put(conf.getPutFromTuple(input)); } catch (IOException ex) { JSONObject error = ErrorGenerator.generateErrorMessage( diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java index 296d0c27f2..b0ff4a8ec6 100644 --- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java @@ -1,11 +1,14 @@ package org.apache.metron.hbase; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.Serializable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; @@ -18,7 +21,7 @@ * classpath */ @SuppressWarnings("serial") -public class HTableConnector implements Serializable { +public class HTableConnector extends Connector implements Serializable{ private static final Logger LOG = Logger.getLogger(HTableConnector.class); private Configuration conf; @@ -31,6 +34,7 @@ public class HTableConnector implements Serializable { * @throws IOException */ public HTableConnector(final TupleTableConfig conf, String _quorum, String _port) throws IOException { + super(conf, _quorum, _port); this.tableName = conf.getTableName(); this.conf = HBaseConfiguration.create(); @@ -93,9 +97,15 @@ public HTable getTable() { return table; } + @Override + public void put(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException { + table.put(put); + } + /** * Close the table */ + @Override public void close() { try { this.table.close(); diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java index 035e9c39ab..d2c789a264 100644 --- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/TupleTableConfig.java @@ -30,7 +30,8 @@ public class TupleTableConfig implements Serializable { private boolean batch = true; protected Durability durability = Durability.USE_DEFAULT; private long writeBufferSize = 0L; - + private String fields; + /** * Initialize configuration * @@ -62,7 +63,42 @@ public TupleTableConfig(final String table, final String rowKeyField, final Stri this.tupleTimestampField = timestampField; this.columnFamilies = new HashMap>(); } - + + public TupleTableConfig() { + this.columnFamilies = new HashMap>(); + } + + public TupleTableConfig withTable(String table) { + this.tableName = table; + return this; + } + + public TupleTableConfig withRowKeyField(String rowKeyField) { + this.tupleRowKeyField = rowKeyField; + return this; + } + + public TupleTableConfig withTimestampField(String timestampField) { + this.tupleTimestampField = timestampField; + return this; + } + + public TupleTableConfig withFields(String fields) { + this.fields = fields; + return this; + } + + public TupleTableConfig withBatch(Boolean isBatch) { + this.batch = isBatch; + return this; + } + + public String getFields() { + return fields; + } + + + /** * Add column family and column qualifier to be extracted from tuple * @@ -221,7 +257,7 @@ public void setBatch(boolean batch) { } /** - * @param setDurability + * @param durability * Sets whether to write to HBase's edit log. *

* Setting to false will mean fewer operations to perform when diff --git a/metron-streaming/Metron-Topologies/pom.xml b/metron-streaming/Metron-Topologies/pom.xml index 524813ac39..4c5c4ab4d2 100644 --- a/metron-streaming/Metron-Topologies/pom.xml +++ b/metron-streaming/Metron-Topologies/pom.xml @@ -82,7 +82,7 @@ org.apache.kafka - kafka_2.8.2 + kafka_2.9.2 ${global_kafka_version} + + org.apache.kafka + kafka-clients + ${global_kafka_version} + org.apache.kafka kafka_2.9.2 - 0.8.1.1 + ${global_kafka_version} javax.jms diff --git a/metron-streaming/Metron-EnrichmentAdapters/pom.xml b/metron-streaming/Metron-EnrichmentAdapters/pom.xml index 2a40f517db..a37a032bdd 100644 --- a/metron-streaming/Metron-EnrichmentAdapters/pom.xml +++ b/metron-streaming/Metron-EnrichmentAdapters/pom.xml @@ -26,7 +26,7 @@ 1.7.7 0.96.1-hadoop2 0.1.2 - 17.0 + ${global_guava_version} diff --git a/metron-streaming/Metron-Pcap_Service/pom.xml b/metron-streaming/Metron-Pcap_Service/pom.xml index 40a9deacfc..79cfb4707f 100644 --- a/metron-streaming/Metron-Pcap_Service/pom.xml +++ b/metron-streaming/Metron-Pcap_Service/pom.xml @@ -10,13 +10,11 @@ Metron Pcap_Service UTF-8 - 1.4.0.2.0.6.0-76 - 2.2.0.2.0.6.0-76 + ${global_flume_version} + ${global_hadoop_version} 1.7 1.7 - 0.9.2-incubating - 0.8.0 - 1.7.5 + ${global_slf4j_version} 3.4.5.2.0.6.0-76 1.2.15 @@ -35,7 +33,7 @@ 2.1.1 - 4.11 + ${global_junit_version} 1.3 1.9.5 1.3.0 diff --git a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java index f995e3f565..d058ccc6bb 100644 --- a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java +++ b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/ConfigurationUtilTest.java @@ -1,10 +1,10 @@ package org.apache.metron.pcapservice; -import org.eclipse.jdt.internal.core.Assert; import org.junit.Test; import org.apache.metron.pcapservice.ConfigurationUtil; import org.apache.metron.pcapservice.ConfigurationUtil.SizeUnit; +import org.springframework.util.Assert; /** * The Class ConfigurationUtilTest. diff --git a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/HBaseIntegrationTest.java b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/HBaseIntegrationTest.java index 6a25599bbb..bb72133876 100644 --- a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/HBaseIntegrationTest.java +++ b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/HBaseIntegrationTest.java @@ -46,7 +46,7 @@ void initCluster() throws Exception { * Signals that an I/O exception has occurred. */ private void createTable() throws IOException { - testTable = testUtil.createTable("test_pcaps_local", "cf"); + testTable = testUtil.createTable(Bytes.toBytes("test_pcaps_local"), Bytes.toBytes("cf")); System.out.println("after 'test_pcaps_local' table creation "); // create put Put put = new Put(Bytes.toBytes("1111")); // row key =1111 diff --git a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/PcapHelperTest.java b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/PcapHelperTest.java index 354a3a7b26..2b766742ed 100644 --- a/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/PcapHelperTest.java +++ b/metron-streaming/Metron-Pcap_Service/src/test/java/org/apache/metron/pcapservice/PcapHelperTest.java @@ -6,7 +6,6 @@ import java.util.Arrays; import java.util.List; -import org.eclipse.jdt.internal.core.Assert; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -17,6 +16,7 @@ import org.apache.metron.pcapservice.PcapHelper; import org.apache.metron.pcapservice.PcapHelper.TimeUnit; +import org.springframework.util.Assert; // TODO: Auto-generated Javadoc /** diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java index 53f2a508f7..13f4c99b54 100644 --- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java +++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/test/spouts/GenericInternalTestSpout.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; import org.apache.metron.test.converters.BinaryConverters; import org.apache.metron.test.converters.IConverter; import org.apache.metron.test.filereaders.FileReader; @@ -56,6 +58,9 @@ public class GenericInternalTestSpout extends BaseRichSpout { public GenericInternalTestSpout withFilename(String filename) { + if(filename != null && filename.length() > 0 && filename.charAt(0) == '$') { + filename = Iterables.getLast(Splitter.on("}").split(filename)); + } _filename = filename; return this; } diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml index 3ea364dfb8..bd4867d914 100644 --- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml +++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml @@ -84,7 +84,24 @@ components: value: true - name: "startOffsetTime" value: -1 - + - id: "hbaseConfig" + className: "org.apache.metron.hbase.TupleTableConfig" + configMethods: + - name: "withFields" + args: + - "${bolt.hbase.table.fields}" + - name: "withTable" + args: + - "${bolt.hbase.table.name}" + - name: "withRowKeyField" + args: + - "${bolt.hbase.table.key.tuple.field.name}" + - name: "withTimestampField" + args: + - "${bolt.hbase.table.timestamp.tuple.field.name}" + - name: "withBatch" + args: + - ${bolt.hbase.enable.batching} spouts: - id: "kafkaSpout" className: "storm.kafka.KafkaSpout" @@ -92,6 +109,12 @@ spouts: - ref: "kafkaConfig" bolts: + - id: "hbaseBolt" + className: "org.apache.metron.hbase.HBaseBolt" + constructorArgs: + - ref: "hbaseConfig" + - "${kafka.zk.list}" + - "${kafka.zk.port}" - id: "parserBolt" className: "org.apache.metron.parsing.PcapParserBolt" configMethods: @@ -164,6 +187,13 @@ streams: to: "parserBolt" grouping: type: SHUFFLE + - name: "parser -> hbase" + from: "parserBolt" + to: "hbaseBolt" + grouping: + streamId: "pcap_data_stream" + type: FIELDS + args: ["pcap_id"] - name: "parser -> indexing" from: "parserBolt" to: "indexingBolt" diff --git a/metron-streaming/pom.xml b/metron-streaming/pom.xml index 4df6cc165b..d04862dab2 100644 --- a/metron-streaming/pom.xml +++ b/metron-streaming/pom.xml @@ -23,6 +23,7 @@ @ApacheMetron 0.10.0 0.10.0 + 1.7.1 0.8.2.2 2.7.1 1.1.1 @@ -31,7 +32,7 @@ 1.1.1 3.0.2 4.4 - 18.0 + 17.0 2.2.5 1.7.7 From 776209f55e75bbce0a1f9cb864b41c8956659e14 Mon Sep 17 00:00:00 2001 From: cstella Date: Mon, 1 Feb 2016 15:41:59 -0500 Subject: [PATCH 11/11] Updated ansible scripts to properly create config for pcap topology, updated local and remote topologies to use the kafka zookeeper list format to specify the zookeeper quorum for hbase, and updated the HBaseBolt to accept a kafka-style list instead of a host and port separated. --- .../roles/metron_streaming/tasks/main.yml | 1 + .../org/apache/metron/hbase/HBaseBolt.java | 21 +++++++++++++++++++ .../apache/metron/hbase/HTableConnector.java | 10 ++++++++- .../Metron_Configs/etc/env/config.properties | 12 +++++++++++ .../Metron_Configs/topologies/pcap/local.yaml | 3 +-- .../topologies/pcap/remote.yaml | 3 +-- .../integration/pcap/PcapIntegrationTest.java | 3 +-- 7 files changed, 46 insertions(+), 7 deletions(-) diff --git a/deployment/roles/metron_streaming/tasks/main.yml b/deployment/roles/metron_streaming/tasks/main.yml index d42e2126f5..de387cf7db 100644 --- a/deployment/roles/metron_streaming/tasks/main.yml +++ b/deployment/roles/metron_streaming/tasks/main.yml @@ -49,6 +49,7 @@ - { regexp: "bolt.hdfs.file.system.url=", line: "bolt.hdfs.file.system.url={{ hdfs_url }}" } - { regexp: "spout.kafka.topic.pcap=", line: "spout.kafka.topic.pcap={{ pycapa_topic }}" } - { regexp: "spout.kafka.topic.bro=", line: "spout.kafka.topic.bro={{ bro_topic }}" } + - { regexp: "bolt.hbase.table.name=", line: "bolt.hbase.table.name={{ pcap_hbase_table }}" } - name: Add Elasticsearch templates for topologies uri: diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java index 7a8c5fafe3..64f3531a5e 100644 --- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java @@ -6,6 +6,10 @@ import java.lang.reflect.InvocationTargetException; import java.util.Map; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.json.simple.JSONObject; @@ -34,6 +38,7 @@ @SuppressWarnings("serial") public class HBaseBolt implements IRichBolt { private static final Logger LOG = Logger.getLogger(HBaseBolt.class); + private static final String DEFAULT_ZK_PORT = "2181"; protected OutputCollector collector; protected TupleTableConfig conf; @@ -48,7 +53,23 @@ public HBaseBolt(TupleTableConfig conf, String quorum, String port) { _quorum = quorum; _port = port; } + public HBaseBolt(final TupleTableConfig conf, String zkConnectString) throws IOException { + this(conf, zkConnectStringToHosts(zkConnectString), zkConnectStringToPort(zkConnectString)); + } + public static String zkConnectStringToHosts(String connString) { + Iterable hostPortPairs = Splitter.on(',').split(connString); + return Joiner.on(',').join(Iterables.transform(hostPortPairs, new Function() { + @Override + public String apply(String hostPortPair) { + return Iterables.getFirst(Splitter.on(':').split(hostPortPair), ""); + } + })); + } + public static String zkConnectStringToPort(String connString) { + String hostPortPair = Iterables.getFirst(Splitter.on(",").split(connString), ""); + return Iterables.getLast(Splitter.on(":").split(hostPortPair),DEFAULT_ZK_PORT); + } public HBaseBolt withConnector(String connectorImpl) { this.connectorImpl = connectorImpl; return this; diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java index b0ff4a8ec6..5302882eb8 100644 --- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java +++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableConnector.java @@ -4,6 +4,10 @@ import java.io.InterruptedIOException; import java.io.Serializable; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; @@ -14,6 +18,8 @@ import backtype.storm.generated.Bolt; +import javax.annotation.Nullable; + /** * HTable connector for Storm {@link Bolt} *

@@ -23,11 +29,13 @@ @SuppressWarnings("serial") public class HTableConnector extends Connector implements Serializable{ private static final Logger LOG = Logger.getLogger(HTableConnector.class); - private Configuration conf; protected HTable table; private String tableName; + + + /** * Initialize HTable connection * @param conf The {@link TupleTableConfig} diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties index 3c8c1a0881..142f8f83eb 100644 --- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties +++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties @@ -68,3 +68,15 @@ bolt.hdfs.file.system.url=hdfs://iot01.cloud.hortonworks.com:8020 bolt.hdfs.wip.file.path=/paloalto/wip bolt.hdfs.finished.file.path=/paloalto/rotated bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec + +##### HBase ##### +bolt.hbase.table.name=pcap_test +bolt.hbase.table.fields=t:pcap +bolt.hbase.table.key.tuple.field.name=pcap_id +bolt.hbase.table.timestamp.tuple.field.name=timestamp +bolt.hbase.enable.batching=false +bolt.hbase.write.buffer.size.in.bytes=2000000 +bolt.hbase.durability=SKIP_WAL +bolt.hbase.partitioner.region.info.refresh.interval.mins=60 + + diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml index 53299e9bfc..20f2b34d0e 100644 --- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml +++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/local.yaml @@ -104,8 +104,7 @@ bolts: className: "org.apache.metron.hbase.HBaseBolt" constructorArgs: - ref: "hbaseConfig" - - "${kafka.zk.list}" - - "${kafka.zk.port}" + - "${kafka.zk}" configMethods: - name: "withConnector" args: diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml index bd4867d914..c003c13e40 100644 --- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml +++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/pcap/remote.yaml @@ -113,8 +113,7 @@ bolts: className: "org.apache.metron.hbase.HBaseBolt" constructorArgs: - ref: "hbaseConfig" - - "${kafka.zk.list}" - - "${kafka.zk.port}" + - "${kafka.zk}" - id: "parserBolt" className: "org.apache.metron.parsing.PcapParserBolt" configMethods: diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java index cb81948456..edcbbef97f 100644 --- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java +++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java @@ -74,8 +74,7 @@ public void testTopology() throws Exception { setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.acks", "true"); setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.emits","true"); setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.fails","true"); - setProperty("kafka.zk.list", "localhost"); - setProperty("kafka.zk.port", "2000"); + setProperty("kafka.zk", "localhost:2000,localhost:2000"); setProperty("bolt.hbase.table.name", "pcap_test"); setProperty("bolt.hbase.table.fields", "t:pcap"); setProperty("bolt.hbase.table.key.tuple.field.name", "pcap_id");