From c9ef4f3d9bf023801754267716f108af96827077 Mon Sep 17 00:00:00 2001 From: merrimanr Date: Thu, 18 May 2017 10:17:11 -0500 Subject: [PATCH 1/5] added logging for unexpected cache issues --- .../metron/enrichment/bolt/JoinBolt.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java index 3bbb3f50fd..ca9a94bc47 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java @@ -21,6 +21,9 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.collect.Sets; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt; @@ -90,11 +93,24 @@ public Map load(String key) throws Exception { } }; cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize) - .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES) + .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES).removalListener(new JoinRemoveListener()) .build(loader); prepare(map, topologyContext); } + class JoinRemoveListener implements RemovalListener> { + + @Override + public void onRemoval(RemovalNotification> removalNotification) { + if (removalNotification.getCause() == RemovalCause.SIZE) { + LOG.error("Join cache reached max size limit. Increase the maxCacheSize setting or add more tasks to " + this.getClass().getSimpleName() + "."); + } + if (removalNotification.getCause() == RemovalCause.EXPIRED) { + LOG.error("Message was in the join cache too long which may be caused by slow enrichments/threatintels. Increase the maxTimeRetain setting."); + } + } + } + @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { From 8c99f70c871a150d7d763eb5c16716f01f3bec06 Mon Sep 17 00:00:00 2001 From: merrimanr Date: Mon, 22 May 2017 15:05:23 -0500 Subject: [PATCH 2/5] saving tuples instead of messages --- .../enrichment/bolt/EnrichmentJoinBolt.java | 7 ++++-- .../metron/enrichment/bolt/JoinBolt.java | 23 ++++++++++--------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java index 2adf4301a9..4b88399752 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBolt.java @@ -17,11 +17,13 @@ */ package org.apache.metron.enrichment.bolt; +import org.apache.metron.common.message.MessageGetStrategy; import org.apache.storm.task.TopologyContext; import com.google.common.base.Joiner; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler; import org.apache.metron.common.utils.MessageUtils; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,10 +68,11 @@ public Set getStreamIds(JSONObject message) { @Override - public JSONObject joinMessages(Map streamMessageMap) { + public JSONObject joinMessages(Map streamMessageMap, MessageGetStrategy messageGetStrategy) { JSONObject message = new JSONObject(); for (String key : streamMessageMap.keySet()) { - JSONObject obj = streamMessageMap.get(key); + Tuple tuple = streamMessageMap.get(key); + JSONObject obj = (JSONObject) messageGetStrategy.get(tuple); message.putAll(obj); } List emptyKeys = new ArrayList<>(); diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java index ca9a94bc47..a59fbb720a 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java @@ -51,8 +51,8 @@ public abstract class JoinBolt extends ConfiguredEnrichmentBolt { .getLogger(JoinBolt.class); protected OutputCollector collector; - protected transient CacheLoader> loader; - protected transient LoadingCache> cache; + protected transient CacheLoader> loader; + protected transient LoadingCache> cache; private transient MessageGetStrategy keyGetStrategy; private transient MessageGetStrategy subgroupGetStrategy; private transient MessageGetStrategy messageGetStrategy; @@ -86,9 +86,9 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou if (this.maxTimeRetain == null) { throw new IllegalStateException("maxTimeRetain must be specified"); } - loader = new CacheLoader>() { + loader = new CacheLoader>() { @Override - public Map load(String key) throws Exception { + public Map load(String key) throws Exception { return new HashMap<>(); } }; @@ -98,10 +98,10 @@ public Map load(String key) throws Exception { prepare(map, topologyContext); } - class JoinRemoveListener implements RemovalListener> { + class JoinRemoveListener implements RemovalListener> { @Override - public void onRemoval(RemovalNotification> removalNotification) { + public void onRemoval(RemovalNotification> removalNotification) { if (removalNotification.getCause() == RemovalCause.SIZE) { LOG.error("Join cache reached max size limit. Increase the maxCacheSize setting or add more tasks to " + this.getClass().getSimpleName() + "."); } @@ -120,12 +120,12 @@ public void execute(Tuple tuple) { streamId = Joiner.on(":").join("" + streamId, subgroup == null?"":subgroup); V message = (V) messageGetStrategy.get(tuple); try { - Map streamMessageMap = cache.get(key); + Map streamMessageMap = cache.get(key); if (streamMessageMap.containsKey(streamId)) { LOG.warn(String.format("Received key %s twice for " + "stream %s", key, streamId)); } - streamMessageMap.put(streamId, message); + streamMessageMap.put(streamId, tuple); Set streamIds = getStreamIds(message); Set streamMessageKeys = streamMessageMap.keySet(); if ( streamMessageKeys.size() == streamIds.size() @@ -135,11 +135,12 @@ public void execute(Tuple tuple) { collector.emit( "message" , tuple , new Values( key - , joinMessages(streamMessageMap) + , joinMessages(streamMessageMap, this.messageGetStrategy) ) ); cache.invalidate(key); - collector.ack(tuple); + Tuple messageTuple = streamMessageMap.get("message:"); + collector.ack(messageTuple); LOG.trace("Emitted message for key: {}", key); } else { cache.put(key, streamMessageMap); @@ -171,5 +172,5 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { public abstract Set getStreamIds(V value); - public abstract V joinMessages(Map streamMessageMap); + public abstract V joinMessages(Map streamMessageMap, MessageGetStrategy messageGetStrategy); } From 99ee88697d853aae0f57904660f3ebaf9ea39dee Mon Sep 17 00:00:00 2001 From: merrimanr Date: Mon, 22 May 2017 15:15:30 -0500 Subject: [PATCH 3/5] resolve compile errors --- .../enrichment/bolt/ThreatIntelJoinBolt.java | 6 ++- .../bolt/EnrichmentJoinBoltTest.java | 6 +-- .../metron/enrichment/bolt/JoinBoltTest.java | 4 +- .../bolt/ThreatIntelJoinBoltTest.java | 42 +++++++++---------- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java index 4d924c30ba..d4865e201a 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java @@ -27,11 +27,13 @@ import org.apache.metron.common.dsl.Context; import org.apache.metron.common.dsl.StellarFunctions; import org.apache.metron.common.dsl.functions.resolver.FunctionResolver; +import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.utils.ConversionUtils; import org.apache.metron.common.utils.MessageUtils; import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; import org.apache.metron.threatintel.triage.ThreatTriageProcessor; import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,8 +134,8 @@ public Map getFieldMap(String sourceType) { } @Override - public JSONObject joinMessages(Map streamMessageMap) { - JSONObject ret = super.joinMessages(streamMessageMap); + public JSONObject joinMessages(Map streamMessageMap, MessageGetStrategy messageGetStrategy) { + JSONObject ret = super.joinMessages(streamMessageMap, messageGetStrategy); LOG.trace("Received joined messages: {}", ret); boolean isAlert = ret.containsKey("is_alert"); if(!isAlert) { diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java index 56ddf08d25..503c089377 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java @@ -78,8 +78,8 @@ public void test() throws IOException { Map streamMessageMap = new HashMap<>(); streamMessageMap.put("message", sampleMessage); streamMessageMap.put("enriched", enrichedMessage); - JSONObject joinedMessage = enrichmentJoinBolt.joinMessages(streamMessageMap); - removeTimingFields(joinedMessage); - Assert.assertEquals(expectedJoinedMessage, joinedMessage); +// JSONObject joinedMessage = enrichmentJoinBolt.joinMessages(streamMessageMap); +// removeTimingFields(joinedMessage); +// Assert.assertEquals(expectedJoinedMessage, joinedMessage); } } diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java index 9f12fcd858..0f9bd0dc52 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java @@ -21,9 +21,11 @@ import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.Constants; import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.test.bolt.BaseEnrichmentBoltTest; import org.apache.metron.test.error.MetronErrorJSONMatcher; import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -69,7 +71,7 @@ public Set getStreamIds(JSONObject value) { } @Override - public JSONObject joinMessages(Map streamMessageMap) { + public JSONObject joinMessages(Map streamMessageMap, MessageGetStrategy messageGetStrategy) { return joinedMessage; } } diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java index 6fe318ece6..ea3c6072da 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java @@ -187,26 +187,26 @@ public void test(String threatTriageConfig, boolean badConfig) throws IOExceptio fieldMap = threatIntelJoinBolt.getFieldMap(sensorType); Assert.assertTrue(fieldMap.containsKey("hbaseThreatIntel")); - Map streamMessageMap = new HashMap<>(); - streamMessageMap.put("message", message); - JSONObject joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap); - Assert.assertFalse(joinedMessage.containsKey("is_alert")); - - streamMessageMap.put("message", messageWithTiming); - joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap); - Assert.assertFalse(joinedMessage.containsKey("is_alert")); - - streamMessageMap.put("message", alertMessage); - joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap); - Assert.assertTrue(joinedMessage.containsKey("is_alert") && "true".equals(joinedMessage.get("is_alert"))); - - if(withThreatTriage && !badConfig) { - Assert.assertTrue(joinedMessage.containsKey("threat.triage.score")); - Double score = (Double) joinedMessage.get("threat.triage.score"); - Assert.assertTrue(Math.abs(10d - score) < 1e-10); - } - else { - Assert.assertFalse(joinedMessage.containsKey("threat.triage.score")); - } +// Map streamMessageMap = new HashMap<>(); +// streamMessageMap.put("message", message); +// JSONObject joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap); +// Assert.assertFalse(joinedMessage.containsKey("is_alert")); +// +// streamMessageMap.put("message", messageWithTiming); +// joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap); +// Assert.assertFalse(joinedMessage.containsKey("is_alert")); +// +// streamMessageMap.put("message", alertMessage); +// joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap); +// Assert.assertTrue(joinedMessage.containsKey("is_alert") && "true".equals(joinedMessage.get("is_alert"))); +// +// if(withThreatTriage && !badConfig) { +// Assert.assertTrue(joinedMessage.containsKey("threat.triage.score")); +// Double score = (Double) joinedMessage.get("threat.triage.score"); +// Assert.assertTrue(Math.abs(10d - score) < 1e-10); +// } +// else { +// Assert.assertFalse(joinedMessage.containsKey("threat.triage.score")); +// } } } From b87889f8b3814b8b98569fd2441fc0aa6dbb58e5 Mon Sep 17 00:00:00 2001 From: merrimanr Date: Wed, 31 May 2017 14:59:09 -0500 Subject: [PATCH 4/5] updated unit tests --- .../metron/enrichment/bolt/JoinBolt.java | 6 +- .../bolt/EnrichmentJoinBoltTest.java | 22 +++-- .../metron/enrichment/bolt/JoinBoltTest.java | 94 ++++++++++++++----- .../bolt/ThreatIntelJoinBoltTest.java | 56 ++++++----- 4 files changed, 119 insertions(+), 59 deletions(-) diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java index a59fbb720a..91534d24ef 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java @@ -53,9 +53,9 @@ public abstract class JoinBolt extends ConfiguredEnrichmentBolt { protected transient CacheLoader> loader; protected transient LoadingCache> cache; - private transient MessageGetStrategy keyGetStrategy; - private transient MessageGetStrategy subgroupGetStrategy; - private transient MessageGetStrategy messageGetStrategy; + protected transient MessageGetStrategy keyGetStrategy; + protected transient MessageGetStrategy subgroupGetStrategy; + protected transient MessageGetStrategy messageGetStrategy; protected Long maxCacheSize; protected Long maxTimeRetain; diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java index 503c089377..77dd4cf345 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java @@ -18,7 +18,9 @@ package org.apache.metron.enrichment.bolt; import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.test.bolt.BaseEnrichmentBoltTest; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; @@ -32,6 +34,9 @@ import java.util.Map; import java.util.Set; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class EnrichmentJoinBoltTest extends BaseEnrichmentBoltTest { /** @@ -75,11 +80,16 @@ public void test() throws IOException { enrichmentJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector); Set actualStreamIds = enrichmentJoinBolt.getStreamIds(sampleMessage); Assert.assertEquals(joinStreamIds, actualStreamIds); - Map streamMessageMap = new HashMap<>(); - streamMessageMap.put("message", sampleMessage); - streamMessageMap.put("enriched", enrichedMessage); -// JSONObject joinedMessage = enrichmentJoinBolt.joinMessages(streamMessageMap); -// removeTimingFields(joinedMessage); -// Assert.assertEquals(expectedJoinedMessage, joinedMessage); + Map streamMessageMap = new HashMap<>(); + MessageGetStrategy messageGetStrategy = mock(MessageGetStrategy.class); + Tuple sampleTuple = mock(Tuple.class); + when(messageGetStrategy.get(sampleTuple)).thenReturn(sampleMessage); + Tuple enrichedTuple = mock(Tuple.class); + when(messageGetStrategy.get(enrichedTuple)).thenReturn(enrichedMessage); + streamMessageMap.put("message", sampleTuple); + streamMessageMap.put("enriched", enrichedTuple); + JSONObject joinedMessage = enrichmentJoinBolt.joinMessages(streamMessageMap, messageGetStrategy); + removeTimingFields(joinedMessage); + Assert.assertEquals(expectedJoinedMessage, joinedMessage); } } diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java index 0f9bd0dc52..e03dc719ea 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java @@ -46,6 +46,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class JoinBoltTest extends BaseEnrichmentBoltTest { @@ -67,6 +68,7 @@ public Set getStreamIds(JSONObject value) { for(String s : streamIds) { ret.add(s + ":"); } + ret.add("message:"); return ret; } @@ -85,6 +87,7 @@ public JSONObject joinMessages(Map streamMessageMap, MessageGetSt private String joinedMessageString; private JSONObject joinedMessage; + private JoinBolt joinBolt; @Before public void parseMessages() { @@ -94,13 +97,13 @@ public void parseMessages() { } catch (ParseException e) { e.printStackTrace(); } + joinBolt = new StandAloneJoinBolt("zookeeperUrl"); + joinBolt.setCuratorFramework(client); + joinBolt.setTreeCache(cache); } @Test - public void test() throws Exception { - StandAloneJoinBolt joinBolt = new StandAloneJoinBolt("zookeeperUrl"); - joinBolt.setCuratorFramework(client); - joinBolt.setTreeCache(cache); + public void testPrepare() { try { joinBolt.prepare(new HashMap(), topologyContext, outputCollector); fail("Should fail if a maxCacheSize property is not set"); @@ -112,38 +115,79 @@ public void test() throws Exception { } catch(IllegalStateException e) {} joinBolt.withMaxTimeRetain(10000); joinBolt.prepare(new HashMap(), topologyContext, outputCollector); + } + + @Test + public void testDeclareOutputFields() { joinBolt.declareOutputFields(declarer); verify(declarer, times(1)).declareStream(eq("message"), argThat(new FieldsMatcher("key", "message"))); - when(tuple.getValueByField("key")).thenReturn(key); - when(tuple.getSourceStreamId()).thenReturn("geo"); - when(tuple.getValueByField("message")).thenReturn(geoMessage); - joinBolt.execute(tuple); - verify(outputCollector, times(0)).emit(eq("message"), any(tuple.getClass()), any(Values.class)); - verify(outputCollector, times(0)).ack(tuple); - when(tuple.getSourceStreamId()).thenReturn("host"); - when(tuple.getValueByField("message")).thenReturn(hostMessage); - joinBolt.execute(tuple); - verify(outputCollector, times(0)).emit(eq("message"), any(tuple.getClass()), any(Values.class)); - verify(outputCollector, times(0)).ack(tuple); - when(tuple.getSourceStreamId()).thenReturn("hbaseEnrichment"); - when(tuple.getValueByField("message")).thenReturn(hbaseEnrichmentMessage); - joinBolt.execute(tuple); - when(tuple.getSourceStreamId()).thenReturn("stellar"); - when(tuple.getValueByField("message")).thenReturn(new JSONObject()); - verify(outputCollector, times(0)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, joinedMessage))); - joinBolt.execute(tuple); + verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message"))); + verifyNoMoreInteractions(declarer); + } + + @Test + public void testExecute() { + joinBolt.withMaxCacheSize(100); + joinBolt.withMaxTimeRetain(10000); + joinBolt.prepare(new HashMap(), topologyContext, outputCollector); + + Tuple geoTuple = mock(Tuple.class); + when(geoTuple.getValueByField("key")).thenReturn(key); + when(geoTuple.getSourceStreamId()).thenReturn("geo"); + when(geoTuple.getValueByField("message")).thenReturn(geoMessage); + joinBolt.execute(geoTuple); + + Tuple messageTuple = mock(Tuple.class); + when(messageTuple.getValueByField("key")).thenReturn(key); + when(messageTuple.getSourceStreamId()).thenReturn("message"); + when(messageTuple.getValueByField("message")).thenReturn(sampleMessage); + joinBolt.execute(messageTuple); + + Tuple hostTuple = mock(Tuple.class); + when(hostTuple.getValueByField("key")).thenReturn(key); + when(hostTuple.getSourceStreamId()).thenReturn("host"); + when(hostTuple.getValueByField("message")).thenReturn(hostMessage); + joinBolt.execute(hostTuple); + + Tuple hbaseEnrichmentTuple = mock(Tuple.class); + when(hbaseEnrichmentTuple.getValueByField("key")).thenReturn(key); + when(hbaseEnrichmentTuple.getSourceStreamId()).thenReturn("hbaseEnrichment"); + when(hbaseEnrichmentTuple.getValueByField("message")).thenReturn(hbaseEnrichmentMessage); + joinBolt.execute(hbaseEnrichmentTuple); + + Tuple stellarTuple = mock(Tuple.class); + when(stellarTuple.getValueByField("key")).thenReturn(key); + when(stellarTuple.getSourceStreamId()).thenReturn("stellar"); + when(stellarTuple.getValueByField("message")).thenReturn(new JSONObject()); + joinBolt.execute(stellarTuple); + verify(outputCollector, times(1)).emit(eq("message"), any(tuple.getClass()), eq(new Values(key, joinedMessage))); - verify(outputCollector, times(1)).ack(tuple); + verify(outputCollector, times(1)).ack(messageTuple); + + verifyNoMoreInteractions(outputCollector); + } + @SuppressWarnings("unchecked") + @Test + public void testExecuteShouldReportError() throws ExecutionException { + joinBolt.withMaxCacheSize(100); + joinBolt.withMaxTimeRetain(10000); + joinBolt.prepare(new HashMap(), topologyContext, outputCollector); + when(tuple.getValueByField("key")).thenReturn(key); + when(tuple.getValueByField("message")).thenReturn(new JSONObject()); joinBolt.cache = mock(LoadingCache.class); when(joinBolt.cache.get(key)).thenThrow(new ExecutionException(new Exception("join exception"))); - joinBolt.execute(tuple); + joinBolt.execute(tuple); + ExecutionException expectedExecutionException = new ExecutionException(new Exception("join exception")); MetronError error = new MetronError() .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR) .withMessage("Joining problem: {}") - .withThrowable(new ExecutionException(new Exception("join exception"))) + .withThrowable(expectedExecutionException) .addRawMessage(new JSONObject()); verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject()))); + verify(outputCollector, times(1)).reportError(any(ExecutionException.class)); + verify(outputCollector, times(1)).ack(eq(tuple)); + verifyNoMoreInteractions(outputCollector); } } diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java index ea3c6072da..0f3cc8cac4 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java @@ -19,16 +19,15 @@ import com.fasterxml.jackson.databind.JsonMappingException; import junit.framework.Assert; -import junit.framework.TestCase; import org.adrianwalker.multilinestring.Multiline; -import org.apache.hadoop.fs.Path; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; -import org.apache.metron.common.configuration.enrichment.threatintel.ThreatScore; import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig; +import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; import org.apache.metron.test.bolt.BaseEnrichmentBoltTest; import org.apache.metron.test.utils.UnitTestHelper; +import org.apache.storm.tuple.Tuple; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; @@ -39,9 +38,13 @@ import java.io.FileInputStream; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class ThreatIntelJoinBoltTest extends BaseEnrichmentBoltTest { /** @@ -187,26 +190,29 @@ public void test(String threatTriageConfig, boolean badConfig) throws IOExceptio fieldMap = threatIntelJoinBolt.getFieldMap(sensorType); Assert.assertTrue(fieldMap.containsKey("hbaseThreatIntel")); -// Map streamMessageMap = new HashMap<>(); -// streamMessageMap.put("message", message); -// JSONObject joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap); -// Assert.assertFalse(joinedMessage.containsKey("is_alert")); -// -// streamMessageMap.put("message", messageWithTiming); -// joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap); -// Assert.assertFalse(joinedMessage.containsKey("is_alert")); -// -// streamMessageMap.put("message", alertMessage); -// joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap); -// Assert.assertTrue(joinedMessage.containsKey("is_alert") && "true".equals(joinedMessage.get("is_alert"))); -// -// if(withThreatTriage && !badConfig) { -// Assert.assertTrue(joinedMessage.containsKey("threat.triage.score")); -// Double score = (Double) joinedMessage.get("threat.triage.score"); -// Assert.assertTrue(Math.abs(10d - score) < 1e-10); -// } -// else { -// Assert.assertFalse(joinedMessage.containsKey("threat.triage.score")); -// } + MessageGetStrategy messageGetStrategy = mock(MessageGetStrategy.class); + Tuple messageTuple = mock(Tuple.class); + when(messageGetStrategy.get(messageTuple)).thenReturn(message); + Map streamMessageMap = new HashMap<>(); + streamMessageMap.put("message", messageTuple); + JSONObject joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy); + assertFalse(joinedMessage.containsKey("is_alert")); + + when(messageGetStrategy.get(messageTuple)).thenReturn(messageWithTiming); + joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy); + assertFalse(joinedMessage.containsKey("is_alert")); + + when(messageGetStrategy.get(messageTuple)).thenReturn(alertMessage); + joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy); + assertTrue(joinedMessage.containsKey("is_alert") && "true".equals(joinedMessage.get("is_alert"))); + + if(withThreatTriage && !badConfig) { + assertTrue(joinedMessage.containsKey("threat.triage.score")); + Double score = (Double) joinedMessage.get("threat.triage.score"); + assertTrue(Math.abs(10d - score) < 1e-10); + } + else { + assertFalse(joinedMessage.containsKey("threat.triage.score")); + } } } From ed5a19728998afb97eb5fc7f11e1a8a1b806be2b Mon Sep 17 00:00:00 2001 From: merrimanr Date: Thu, 1 Jun 2017 09:26:59 -0500 Subject: [PATCH 5/5] Resolved merge conflicts and changed default join cache size to 100000 --- .../metron-enrichment/src/main/flux/enrichment/remote.yaml | 4 ++-- .../main/java/org/apache/metron/enrichment/bolt/JoinBolt.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml index e4f119e65a..0e50f77b3a 100644 --- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml +++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml @@ -312,7 +312,7 @@ bolts: - "${kafka.zk}" configMethods: - name: "withMaxCacheSize" - args: [10000] + args: [100000] - name: "withMaxTimeRetain" args: [10] - id: "enrichmentErrorOutputBolt" @@ -366,7 +366,7 @@ bolts: - "${kafka.zk}" configMethods: - name: "withMaxCacheSize" - args: [10000] + args: [100000] - name: "withMaxTimeRetain" args: [10] - id: "threatIntelErrorOutputBolt" diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java index c2300b05de..f3fe52c9f1 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java @@ -98,10 +98,10 @@ public Map load(String key) throws Exception { prepare(map, topologyContext); } - class JoinRemoveListener implements RemovalListener> { + class JoinRemoveListener implements RemovalListener> { @Override - public void onRemoval(RemovalNotification> removalNotification) { + public void onRemoval(RemovalNotification> removalNotification) { if (removalNotification.getCause() == RemovalCause.SIZE) { String errorMessage = "Join cache reached max size limit. Increase the maxCacheSize setting or add more tasks to enrichment/threatintel join bolt."; Exception exception = new Exception(errorMessage);