diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index a1f431b660..e2b947b505 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -341,7 +341,7 @@ org.eclipse.persistence:org.eclipse.persistence.asm:jar:2.6.4:compile,EPL 1.0,ht
org.eclipse.persistence:org.eclipse.persistence.core:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
org.eclipse.persistence:org.eclipse.persistence.jpa.jpql:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
org.eclipse.persistence:org.eclipse.persistence.jpa:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
-
+com.github.ben-manes.caffeine:caffeine:jar:2.6.2:compile,ASLv2,https://github.com/ben-manes/caffeine/blob/v2.6.2/LICENSE
com.google.code.gson:gson:jar:2.2:compile
org.codehaus.plexus:plexus-classworlds:jar:2.4:compile
org.codehaus.plexus:plexus-component-annotations:jar:1.5.5:compile
@@ -356,4 +356,4 @@ com.google.code.gson:gson:jar:2.2:compile
org.sonatype.aether:aether-util:jar:1.12:compile
org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile
org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile
- org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
\ No newline at end of file
+ org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 0c2fff960a..265d5952e9 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -258,6 +258,7 @@ This package installs the Metron Enrichment files
%{metron_home}/config/zookeeper/enrichments/yaf.json
%{metron_home}/config/zookeeper/enrichments/asa.json
%{metron_home}/flux/enrichment/remote.yaml
+%{metron_home}/flux/enrichment/remote-unified.yaml
%attr(0644,root,root) %{metron_home}/lib/metron-enrichment-%{full_version}-uber.jar
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
index 11a4852444..369ba8c6ea 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java
@@ -19,6 +19,10 @@
import java.util.*;
+/**
+ * This is the core logic of how to configure enrichments. The default type of enrichment configuration is a simple list;
+ * however, more complex enrichment adapters require more complex configuration (e.g. stellar).
+ */
public class ConfigHandler {
private Object config;
private Configs type = Configs.LIST;
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
index b73228f247..56c6490cac 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java
@@ -25,8 +25,8 @@ public class BytesFromPosition implements MessageGetStrategy {
public BytesFromPosition() {};
- public BytesFromPosition(int position) {
- this.position = position;
+ public BytesFromPosition(Integer position) {
+ this.position = position == null?0:position;
}
@Override
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
new file mode 100644
index 0000000000..a0d4b7dbad
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.message;
+
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+
+/**
+ * This retrieves the JSONObject from the field name by reference.
+ * This is in contrast to JSONFromField, which clones the JSON object and passes by value.
+ */
+public class JSONFromFieldByReference implements MessageGetStrategy {
+ private String messageFieldName;
+ public JSONFromFieldByReference(String messageFieldName) {
+ this.messageFieldName = messageFieldName;
+ }
+
+ @Override
+ public JSONObject get(Tuple tuple) {
+ return (JSONObject) tuple.getValueByField(messageFieldName);
+ }
+}
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
index c91a262d1b..15f0447113 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java
@@ -35,8 +35,8 @@ protected JSONParser initialValue() {
public JSONFromPosition() {};
- public JSONFromPosition(int position) {
- this.position = position;
+ public JSONFromPosition(Integer position) {
+ this.position = position == null?0:position;
}
@Override
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
index 7004d78ed2..46bb406f1c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java
@@ -41,6 +41,7 @@ public enum MessageGetters {
BYTES_FROM_POSITION((String arg) -> new BytesFromPosition(ConversionUtils.convert(arg, Integer.class))),
JSON_FROM_POSITION((String arg) -> new JSONFromPosition(ConversionUtils.convert(arg, Integer.class))),
JSON_FROM_FIELD((String arg) -> new JSONFromField(arg)),
+ JSON_FROM_FIELD_BY_REFERENCE((String arg) -> new JSONFromFieldByReference(arg)),
OBJECT_FROM_FIELD((String arg) -> new ObjectFromField(arg)),
DEFAULT_BYTES_FROM_POSITION(new BytesFromPosition()),
DEFAULT_JSON_FROM_POSITION(new JSONFromPosition()),
diff --git a/metron-platform/metron-enrichment/README.md b/metron-platform/metron-enrichment/README.md
index d742046ad1..aa6fc99e47 100644
--- a/metron-platform/metron-enrichment/README.md
+++ b/metron-platform/metron-enrichment/README.md
@@ -33,6 +33,49 @@ data format (e.g. a JSON Map structure with `original_message` and
![Architecture](enrichment_arch.png)
+### Unified Enrichment Topology
+
+An experimental unified enrichment topology is also shipped.
+Currently the architecture, as described above, has a split/join in
+order to perform enrichments in parallel. This poses some issues in
+terms of ease of tuning and reasoning about performance.
+
+In order to deal with these issues, there is an alternative enrichment topology which
+uses data parallelism as opposed to the split/join task parallelism.
+This architecture uses a worker pool to fully enrich any message within
+a worker. This results in
+* Fewer bolts in the topology
+* Each bolt fully operates on a message.
+* Fewer network hops
+
+![Unified Architecture](unified_enrichment_arch.svg)
+
+This architecture is fully backwards compatible; the only difference is
+how the enrichment will operate on each message (in one bolt where the
+split/join is done in a threadpool as opposed
+to split across multiple bolts).
+
+#### Using It
+
+In order to use this, you will need to
+* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use `remote-unified.yaml` instead of `remote.yaml`
+* Restart the enrichment topology.
+
+#### Configuring It
+
+There are two parameters which you might want to tune in this topology.
+Both of them are topology configuration settings adjustable in the flux file
+`$METRON_HOME/flux/enrichment/remote-unified.yaml`:
+* `metron.threadpool.size` : The size of the threadpool. This can take a number or a multiple of the number of cores (e.g. `5C` for 5 times the number of cores). The default is `2C`.
+* `metron.threadpool.type` : The type of threadpool. (note: descriptions taken from [here](https://zeroturnaround.com/rebellabs/fixedthreadpool-cachedthreadpool-or-forkjoinpool-picking-correct-java-executors-for-background-tasks/)).
+ * `FIXED` is a fixed threadpool of size `n`. `n` threads will process tasks at a time; when the pool is saturated, new tasks will get added to a queue without a limit on size. Good for CPU intensive tasks. This is the default.
+ * `WORK_STEALING` is a work stealing threadpool. This will create and shut down threads dynamically to accommodate the required parallelism level. It also tries to reduce the contention on the task queue, so can be really good in heavily loaded environments. Also good when your tasks create more tasks for the executor, like recursive tasks.
+
+In order to configure the parallelism for the enrichment bolt and threat
+intel bolt, the configurations will be taken from the respective join bolt
+parallelism. When proper Ambari support for this is added, each bolt will
+get its own parallelism property.
+
## Enrichment Configuration
The configuration for the `enrichment` topology, the topology primarily
@@ -371,3 +414,5 @@ Now we need to start the topologies and send some data:
* Ensure that the documents have new fields `foo`, `bar` and `ALL_CAPS` with values as described above.
Note that we could have used any Stellar statements here, including calling out to HBase via `ENRICHMENT_GET` and `ENRICHMENT_EXISTS` or even calling a machine learning model via [Model as a Service](../../metron-analytics/metron-maas-service).
+
+
diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml
index e82b86bf2d..bcfb41b6ea 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -67,6 +67,12 @@
testtest-jar
+
+ com.github.ben-manes.caffeine
+ caffeine
+ 2.6.2
+
+
org.apache.metronmetron-profiler-client
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
new file mode 100644
index 0000000000..ddc5ffcb4b
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
@@ -0,0 +1,378 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is a drop-in replacement for the existing split/join enrichment topology.
+# Instead of a fan-out/fan-in architecture, this adopts a data-parallelism strategy
+# whereby a message is fully enriched inside of a UnifiedEnrichmentBolt. This simplifies
+# the architecture greatly and cuts down on network hops. It has unknown performance
+# characteristics, so caveat emptor.
+
+name: "enrichment"
+config:
+ topology.workers: ${enrichment.workers}
+ topology.acker.executors: ${enrichment.acker.executors}
+ topology.worker.childopts: ${topology.worker.childopts}
+ topology.auto-credentials: ${topology.auto-credentials}
+ topology.max.spout.pending: ${topology.max.spout.pending}
+ # Change this if you want to adjust the threadpool size
+ metron.threadpool.size: "2C" # Either a number (e.g. 5) or multiple of cores (e.g. 5C = 5 times the number of cores)
+ # Change this if you want to adjust the threadpool type
+ metron.threadpool.type: "FIXED" # FIXED or WORK_STEALING
+components:
+
+# Enrichment
+ - id: "stellarEnrichmentAdapter"
+ className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
+ configMethods:
+ - name: "ofType"
+ args:
+ - "ENRICHMENT"
+
+ # Any kafka props for the producer go here.
+ - id: "kafkaWriterProps"
+ className: "java.util.HashMap"
+ configMethods:
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
+
+ - id: "stellarEnrichment"
+ className: "org.apache.metron.enrichment.configuration.Enrichment"
+ constructorArgs:
+ - "stellar"
+ - ref: "stellarEnrichmentAdapter"
+
+ - id: "geoEnrichmentAdapter"
+ className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
+ - id: "geoEnrichment"
+ className: "org.apache.metron.enrichment.configuration.Enrichment"
+ constructorArgs:
+ - "geo"
+ - ref: "geoEnrichmentAdapter"
+ - id: "hostEnrichmentAdapter"
+ className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
+ constructorArgs:
+ - '${enrichment.host.known_hosts}'
+ - id: "hostEnrichment"
+ className: "org.apache.metron.enrichment.configuration.Enrichment"
+ constructorArgs:
+ - "host"
+ - ref: "hostEnrichmentAdapter"
+
+ - id: "simpleHBaseEnrichmentConfig"
+ className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
+ configMethods:
+ - name: "withProviderImpl"
+ args:
+ - "${hbase.provider.impl}"
+ - name: "withHBaseTable"
+ args:
+ - "${enrichment.simple.hbase.table}"
+ - name: "withHBaseCF"
+ args:
+ - "${enrichment.simple.hbase.cf}"
+ - id: "simpleHBaseEnrichmentAdapter"
+ className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
+ configMethods:
+ - name: "withConfig"
+ args:
+ - ref: "simpleHBaseEnrichmentConfig"
+ - id: "simpleHBaseEnrichment"
+ className: "org.apache.metron.enrichment.configuration.Enrichment"
+ constructorArgs:
+ - "hbaseEnrichment"
+ - ref: "simpleHBaseEnrichmentAdapter"
+ - id: "enrichments"
+ className: "java.util.ArrayList"
+ configMethods:
+ - name: "add"
+ args:
+ - ref: "geoEnrichment"
+ - name: "add"
+ args:
+ - ref: "hostEnrichment"
+ - name: "add"
+ args:
+ - ref: "simpleHBaseEnrichment"
+ - name: "add"
+ args:
+ - ref: "stellarEnrichment"
+
+ #enrichment error
+ - id: "enrichmentErrorKafkaWriter"
+ className: "org.apache.metron.writer.kafka.KafkaWriter"
+ configMethods:
+ - name: "withTopic"
+ args:
+ - "${enrichment.error.topic}"
+ - name: "withZkQuorum"
+ args:
+ - "${kafka.zk}"
+ - name: "withProducerConfigs"
+ args:
+ - ref: "kafkaWriterProps"
+
+# Threat Intel
+ - id: "stellarThreatIntelAdapter"
+ className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
+ configMethods:
+ - name: "ofType"
+ args:
+ - "THREAT_INTEL"
+ - id: "stellarThreatIntelEnrichment"
+ className: "org.apache.metron.enrichment.configuration.Enrichment"
+ constructorArgs:
+ - "stellar"
+ - ref: "stellarThreatIntelAdapter"
+ - id: "simpleHBaseThreatIntelConfig"
+ className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
+ configMethods:
+ - name: "withProviderImpl"
+ args:
+ - "${hbase.provider.impl}"
+ - name: "withTrackerHBaseTable"
+ args:
+ - "${threat.intel.tracker.table}"
+ - name: "withTrackerHBaseCF"
+ args:
+ - "${threat.intel.tracker.cf}"
+ - name: "withHBaseTable"
+ args:
+ - "${threat.intel.simple.hbase.table}"
+ - name: "withHBaseCF"
+ args:
+ - "${threat.intel.simple.hbase.cf}"
+ - id: "simpleHBaseThreatIntelAdapter"
+ className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
+ configMethods:
+ - name: "withConfig"
+ args:
+ - ref: "simpleHBaseThreatIntelConfig"
+ - id: "simpleHBaseThreatIntelEnrichment"
+ className: "org.apache.metron.enrichment.configuration.Enrichment"
+ constructorArgs:
+ - "hbaseThreatIntel"
+ - ref: "simpleHBaseThreatIntelAdapter"
+
+ - id: "threatIntels"
+ className: "java.util.ArrayList"
+ configMethods:
+ - name: "add"
+ args:
+ - ref: "simpleHBaseThreatIntelEnrichment"
+ - name: "add"
+ args:
+ - ref: "stellarThreatIntelEnrichment"
+
+ #threatintel error
+ - id: "threatIntelErrorKafkaWriter"
+ className: "org.apache.metron.writer.kafka.KafkaWriter"
+ configMethods:
+ - name: "withTopic"
+ args:
+ - "${threat.intel.error.topic}"
+ - name: "withZkQuorum"
+ args:
+ - "${kafka.zk}"
+ - name: "withProducerConfigs"
+ args:
+ - ref: "kafkaWriterProps"
+#indexing
+ - id: "kafkaWriter"
+ className: "org.apache.metron.writer.kafka.KafkaWriter"
+ configMethods:
+ - name: "withTopic"
+ args:
+ - "${enrichment.output.topic}"
+ - name: "withZkQuorum"
+ args:
+ - "${kafka.zk}"
+ - name: "withProducerConfigs"
+ args:
+ - ref: "kafkaWriterProps"
+
+#kafka/zookeeper
+ # Any kafka props for the consumer go here.
+ - id: "kafkaProps"
+ className: "java.util.HashMap"
+ configMethods:
+ - name: "put"
+ args:
+ - "value.deserializer"
+ - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+ - name: "put"
+ args:
+ - "key.deserializer"
+ - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+ - name: "put"
+ args:
+ - "group.id"
+ - "enrichments"
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
+
+
+ # The fields to pull out of the kafka messages
+ - id: "fields"
+ className: "java.util.ArrayList"
+ configMethods:
+ - name: "add"
+ args:
+ - "value"
+
+ - id: "kafkaConfig"
+ className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
+ constructorArgs:
+ - ref: "kafkaProps"
+ # topic name
+ - "${enrichment.input.topic}"
+ - "${kafka.zk}"
+ - ref: "fields"
+ configMethods:
+ - name: "setFirstPollOffsetStrategy"
+ args:
+ - "${kafka.start}"
+
+
+spouts:
+ - id: "kafkaSpout"
+ className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
+ constructorArgs:
+ - ref: "kafkaConfig"
+ parallelism: ${kafka.spout.parallelism}
+
+bolts:
+# Enrichment Bolts
+ - id: "enrichmentBolt"
+ className: "org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "enrichments"
+ - name: "withMaxCacheSize"
+ args: [${enrichment.join.cache.size}]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - name: "withCaptureCacheStats"
+ args: [true]
+ - name: "withStrategy"
+ args:
+ - "ENRICHMENT"
+ - name: "withMessageGetter"
+ args: ["JSON_FROM_POSITION"]
+ parallelism: ${enrichment.join.parallelism}
+
+ - id: "enrichmentErrorOutputBolt"
+ className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withMessageWriter"
+ args:
+ - ref: "enrichmentErrorKafkaWriter"
+
+
+# Threat Intel Bolts
+ - id: "threatIntelBolt"
+ className: "org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withEnrichments"
+ args:
+ - ref: "threatIntels"
+ - name: "withMaxCacheSize"
+ args: [${enrichment.join.cache.size}]
+ - name: "withMaxTimeRetain"
+ args: [10]
+ - name: "withCaptureCacheStats"
+ args: [true]
+ - name: "withStrategy"
+ args:
+ - "THREAT_INTEL"
+ - name: "withMessageFieldName"
+ args: ["message"]
+ - name: "withMessageGetter"
+ args: ["JSON_FROM_FIELD_BY_REFERENCE"]
+ parallelism: ${threat.intel.join.parallelism}
+
+ - id: "threatIntelErrorOutputBolt"
+ className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withMessageWriter"
+ args:
+ - ref: "threatIntelErrorKafkaWriter"
+
+# Indexing Bolts
+ - id: "outputBolt"
+ className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withMessageWriter"
+ args:
+ - ref: "kafkaWriter"
+ parallelism: ${kafka.writer.parallelism}
+
+
+streams:
+#parser
+ - name: "spout -> enrichmentBolt"
+ from: "kafkaSpout"
+ to: "enrichmentBolt"
+ grouping:
+ type: LOCAL_OR_SHUFFLE
+
+ # Error output
+ - name: "enrichmentBolt -> enrichmentErrorOutputBolt"
+ from: "enrichmentBolt"
+ to: "enrichmentErrorOutputBolt"
+ grouping:
+ streamId: "error"
+ type: LOCAL_OR_SHUFFLE
+
+#threat intel
+ - name: "enrichmentBolt -> threatIntelBolt"
+ from: "enrichmentBolt"
+ to: "threatIntelBolt"
+ grouping:
+ streamId: "message"
+ type: LOCAL_OR_SHUFFLE
+
+#output
+ - name: "threatIntelBolt -> output"
+ from: "threatIntelBolt"
+ to: "outputBolt"
+ grouping:
+ streamId: "message"
+ type: LOCAL_OR_SHUFFLE
+
+ # Error output
+ - name: "threatIntelBolt -> threatIntelErrorOutputBolt"
+ from: "threatIntelBolt"
+ to: "threatIntelErrorOutputBolt"
+ grouping:
+ streamId: "error"
+ type: LOCAL_OR_SHUFFLE
+
+
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
index d3f7820e69..0aabd38bf3 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java
@@ -22,6 +22,7 @@
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -173,7 +174,9 @@ public JSONObject enrich(CacheKey value) {
if(_PERF_LOG.isDebugEnabled()) {
slowLogThreshold = ConversionUtils.convert(globalConfig.getOrDefault(STELLAR_SLOW_LOG, STELLAR_SLOW_LOG_DEFAULT), Long.class);
}
- Map message = value.getValue(Map.class);
+ //Ensure that you clone the message, because process will modify the message. If the message object is modified
+ //then cache misses will happen because the message held by the cache key will have been modified.
+ Map message = new HashMap<>(value.getValue(Map.class));
VariableResolver resolver = new MapVariableResolver(message, sensorConfig, globalConfig);
StellarProcessor processor = new StellarProcessor();
JSONObject enriched = process(message
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 9b84193fe5..dbbb7b6b2b 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -34,6 +34,7 @@
import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.enrichment.configuration.Enrichment;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.utils.EnrichmentUtils;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.StellarFunctions;
import org.apache.storm.task.OutputCollector;
@@ -245,16 +246,7 @@ public void execute(Tuple tuple) {
continue;
}
}
- if ( !enrichedField.isEmpty()) {
- for (Object enrichedKey : enrichedField.keySet()) {
- if(!StringUtils.isEmpty(prefix)) {
- enrichedMessage.put(field + "." + enrichedKey, enrichedField.get(enrichedKey));
- }
- else {
- enrichedMessage.put(enrichedKey, enrichedField.get(enrichedKey));
- }
- }
- }
+ enrichedMessage = EnrichmentUtils.adjustKeys(enrichedMessage, enrichedField, field, prefix);
}
}
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
index 6e24d659de..1ce0b167b5 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java
@@ -30,6 +30,7 @@
import org.apache.metron.common.message.MessageGetStrategy;
import org.apache.metron.common.utils.MessageUtils;
import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.utils.ThreatIntelUtils;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.StellarFunctions;
import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
@@ -45,35 +46,6 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- /**
- * The message key under which the overall threat triage score is stored.
- */
- public static final String THREAT_TRIAGE_SCORE_KEY = "threat.triage.score";
-
- /**
- * The prefix of the message keys that record the threat triage rules that fired.
- */
- public static final String THREAT_TRIAGE_RULES_KEY = "threat.triage.rules";
-
- /**
- * The portion of the message key used to record the 'name' field of a rule.
- */
- public static final String THREAT_TRIAGE_RULE_NAME = "name";
-
- /**
- * The portion of the message key used to record the 'comment' field of a rule.
- */
- public static final String THREAT_TRIAGE_RULE_COMMENT = "comment";
-
- /**
- * The portion of the message key used to record the 'score' field of a rule.
- */
- public static final String THREAT_TRIAGE_RULE_SCORE = "score";
-
- /**
- * The portion of the message key used to record the 'reason' field of a rule.
- */
- public static final String THREAT_TRIAGE_RULE_REASON = "reason";
/**
* The Stellar function resolver.
@@ -133,70 +105,12 @@ public Map getFieldMap(String sourceType) {
}
}
+
@Override
public JSONObject joinMessages(Map streamMessageMap, MessageGetStrategy messageGetStrategy) {
JSONObject ret = super.joinMessages(streamMessageMap, messageGetStrategy);
- LOG.trace("Received joined messages: {}", ret);
- boolean isAlert = ret.containsKey("is_alert");
- if(!isAlert) {
- for (Object key : ret.keySet()) {
- if (key.toString().startsWith("threatintels") && !key.toString().endsWith(".ts")) {
- isAlert = true;
- break;
- }
- }
- }
- else {
- Object isAlertObj = ret.get("is_alert");
- isAlert = ConversionUtils.convert(isAlertObj, Boolean.class);
- if(!isAlert) {
- ret.remove("is_alert");
- }
- }
- if(isAlert) {
- ret.put("is_alert" , "true");
- String sourceType = MessageUtils.getSensorType(ret);
- SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
- ThreatTriageConfig triageConfig = null;
- if(config != null) {
- triageConfig = config.getThreatIntel().getTriageConfig();
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found sensor enrichment config.", sourceType);
- }
- }
- else {
- LOG.debug("{}: Unable to find threat config.", sourceType );
- }
- if(triageConfig != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found threat triage config: {}", sourceType, triageConfig);
- }
-
- if(LOG.isDebugEnabled() && (triageConfig.getRiskLevelRules() == null || triageConfig.getRiskLevelRules().isEmpty())) {
- LOG.debug("{}: Empty rules!", sourceType);
- }
-
- // triage the threat
- ThreatTriageProcessor threatTriageProcessor = new ThreatTriageProcessor(config, functionResolver, stellarContext);
- ThreatScore score = threatTriageProcessor.apply(ret);
-
- if(LOG.isDebugEnabled()) {
- String rules = Joiner.on('\n').join(triageConfig.getRiskLevelRules());
- LOG.debug("Marked {} as triage level {} with rules {}", sourceType, score.getScore(),
- rules);
- }
-
- // attach the triage threat score to the message
- if(score.getRuleScores().size() > 0) {
- appendThreatScore(score, ret);
- }
- }
- else {
- LOG.debug("{}: Unable to find threat triage config!", sourceType);
- }
- }
-
- return ret;
+ String sourceType = MessageUtils.getSensorType(ret);
+ return ThreatIntelUtils.triage(ret, getConfigurations().getSensorEnrichmentConfig(sourceType), functionResolver, stellarContext);
}
@Override
@@ -207,24 +121,5 @@ public void reloadCallback(String name, ConfigurationType type) {
}
}
- /**
- * Appends the threat score to the telemetry message.
- * @param threatScore The threat triage score
- * @param message The telemetry message being triaged.
- */
- private void appendThreatScore(ThreatScore threatScore, JSONObject message) {
-
- // append the overall threat score
- message.put(THREAT_TRIAGE_SCORE_KEY, threatScore.getScore());
-
- // append each of the rules - each rule is 'flat'
- Joiner joiner = Joiner.on(".");
- int i = 0;
- for(RuleScore score: threatScore.getRuleScores()) {
- message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_NAME), score.getRule().getName());
- message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_COMMENT), score.getRule().getComment());
- message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_SCORE), score.getRule().getScore());
- message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i++, THREAT_TRIAGE_RULE_REASON), score.getReason());
- }
- }
+
}
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
new file mode 100644
index 0000000000..c258fb08d5
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment.bolt;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.performance.PerformanceLogger;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase;
+import org.apache.metron.enrichment.configuration.Enrichment;
+import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.enrichment.parallel.EnrichmentContext;
+import org.apache.metron.enrichment.parallel.EnrichmentStrategies;
+import org.apache.metron.enrichment.parallel.ParallelEnricher;
+import org.apache.metron.enrichment.parallel.ConcurrencyContext;
+import org.apache.metron.enrichment.parallel.WorkerPoolStrategies;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * This bolt is a unified enrichment/threat intel bolt. In contrast to the split/enrich/join
+ * bolts above, this handles the entire enrichment lifecycle in one bolt using a threadpool to
+ * enrich in parallel.
+ *
+ * From an architectural perspective, this is a divergence from the polymorphism based strategy we have
+ * used in the split/join bolts. Rather, this bolt is provided a strategy to use, either enrichment or threat intel,
+ * through composition. This allows us to move most of the implementation into components independent
+ * from Storm. This will greatly facilitate reuse.
+ */
+public class UnifiedEnrichmentBolt extends ConfiguredEnrichmentBolt {
+
+ public static class Perf {} // used for performance logging
+ private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker
+
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final String STELLAR_CONTEXT_CONF = "stellarContext";
+
+ /**
+ * The number of threads in the threadpool. One threadpool is created per process.
+ * This is a topology-level configuration
+ */
+ public static final String THREADPOOL_NUM_THREADS_TOPOLOGY_CONF = "metron.threadpool.size";
+ /**
+ * The type of threadpool to create. This is a topology-level configuration.
+ */
+ public static final String THREADPOOL_TYPE_TOPOLOGY_CONF = "metron.threadpool.type";
+
+ /**
+ * The enricher implementation to use. This will do the parallel enrichment via a thread pool.
+ */
+ protected ParallelEnricher enricher;
+
+ /**
+ * The strategy to use for this enrichment bolt. Practically speaking this is either
+ * enrichment or threat intel. It is configured in the topology itself.
+ */
+ protected EnrichmentStrategies strategy;
+ /**
+ * Determine the way to retrieve the message. This must be specified in the topology.
+ */
+ protected MessageGetStrategy messageGetter;
+ protected MessageGetters getterStrategy;
+ protected OutputCollector collector;
+ private Context stellarContext;
+ /**
+  * An enrichment type to adapter map. This is configured externally.
+  */
+ protected Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType = new HashMap<>();
+
+ /**
+ * The total number of elements in an LRU cache. This cache is used for the enrichments; if an
+ * element is in the cache, then the result is returned instead of computed.
+ */
+ protected Long maxCacheSize;
+ /**
+ * The total amount of time in minutes since write to keep an element in the cache.
+ */
+ protected Long maxTimeRetain;
+ /**
+ * If the bolt is reloaded, invalidate the cache?
+ */
+ protected boolean invalidateCacheOnReload = false;
+
+ /**
+ * The message field to use. If this is set, then this indicates the field to use to retrieve the message object.
+ * If this is unset, then we presume that the message is coming in as a string version of a JSON blob on the first
+ * element of the tuple.
+ */
+ protected String messageFieldName;
+ protected EnrichmentContext enrichmentContext;
+ protected boolean captureCacheStats = true;
+
+ public UnifiedEnrichmentBolt(String zookeeperUrl) { // zookeeperUrl is handed unchanged to the superclass constructor
+ super(zookeeperUrl);
+ }
+
+ /**
+  * Specify the enrichments to support; each adapter is stored under its enrichment type.
+  * @param enrichments The enrichments to register, one adapter per type (a repeated type overwrites).
+  * @return Instance of this class
+  */
+ public UnifiedEnrichmentBolt withEnrichments(List<Enrichment> enrichments) {
+   for (Enrichment e : enrichments) {
+     enrichmentsByType.put(e.getType(), e.getAdapter());
+   }
+   return this;
+ }
+
+ public UnifiedEnrichmentBolt withCaptureCacheStats(boolean captureCacheStats) { // fluent setter
+ this.captureCacheStats = captureCacheStats; // overrides the field's default of true
+ return this;
+ }
+
+ /**
+ * Determine the message get strategy (One of the enums from MessageGetters).
+ * @param getter The name of a MessageGetters constant; it is resolved with MessageGetters.valueOf().
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withMessageGetter(String getter) {
+ this.getterStrategy = MessageGetters.valueOf(getter);
+ return this;
+ }
+
+ /**
+  * Resolves the thread pool size from a loosely typed setting. A Number is narrowed to an int.
+  * A String is trimmed and upper-cased: if it then ends with "C", every "C" is removed and the rest
+  * is parsed as an integral multiple of the number of cores (e.g. "2C"); otherwise the whole string
+  * is parsed as an integer. Any other value, null included, yields twice the number of cores.
+  * @param numThreads The raw setting: a Number, a String or any other object.
+  * @return The number of threads to use.
+  * @throws NumberFormatException if a String setting does not hold a parsable integer.
+  */
+ private static int getNumThreads(Object numThreads) {
+   if (numThreads instanceof Number) {
+     return ((Number) numThreads).intValue();
+   }
+   if (!(numThreads instanceof String)) {
+     // neither a number nor a string (null included): fall back to twice the core count
+     return 2 * Runtime.getRuntime().availableProcessors();
+   }
+   String spec = ((String) numThreads).trim().toUpperCase();
+   if (!spec.endsWith("C")) {
+     return Integer.parseInt(spec);
+   }
+   int multiplier = Integer.parseInt(spec.replace("C", ""));
+   return multiplier * Runtime.getRuntime().availableProcessors();
+ }
+
+ /**
+ * The strategy to use. This indicates which part of the config that this bolt uses
+ * to enrich, threat intel or enrichment. This must conform to one of the EnrichmentStrategies
+ * enum.
+ * @param strategy The name of an EnrichmentStrategies constant; it is resolved with EnrichmentStrategies.valueOf().
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withStrategy(String strategy) {
+ this.strategy = EnrichmentStrategies.valueOf(strategy);
+ return this;
+ }
+
+ /**
+ * @param maxCacheSize Maximum size of the enrichment cache, as a total number of elements
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withMaxCacheSize(long maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ return this;
+ }
+
+ /**
+ * @param maxTimeRetain Maximum time, in minutes since write, to retain a cached entry before expiring
+ * @return Instance of this class
+ */
+
+ public UnifiedEnrichmentBolt withMaxTimeRetain(long maxTimeRetain) {
+ this.maxTimeRetain = maxTimeRetain;
+ return this;
+ }
+
+ /**
+ * Invalidate the cache on reload of bolt. By default, we do not.
+ * @param cacheInvalidationOnReload true to invalidate the cache in reloadCallback(); false to keep it.
+ * @return Instance of this class
+ */
+ public UnifiedEnrichmentBolt withCacheInvalidationOnReload(boolean cacheInvalidationOnReload) {
+ this.invalidateCacheOnReload= cacheInvalidationOnReload;
+ return this;
+ }
+
+
+ @Override
+ public void reloadCallback(String name, ConfigurationType type) {
+   if (invalidateCacheOnReload && strategy != null
+       && ConcurrencyContext.get(strategy).getCache() != null) {
+     ConcurrencyContext.get(strategy).getCache().invalidateAll();
+   }
+   if (type != ConfigurationType.GLOBAL || enrichmentsByType == null) {
+     return; // only a global configuration change is pushed to the adapters
+   }
+   for (EnrichmentAdapter each : enrichmentsByType.values()) {
+     each.updateAdapter(getConfigurations().getGlobalConfig());
+   }
+ }
+
+
+ /**
+ * Fully enrich a message based on the strategy which was used to configure the bolt.
+ * Each enrichment is done in parallel and the results are joined together. Each enrichment
+ * will use a cache so computation is avoided if the result has been computed before.
+ *
+ * Errors in the enrichment result in an error message being sent on the "error" stream.
+ * The successful enrichments will be joined with the original message and the message will
+ * be sent along the "message" stream.
+ *
+ * @param input The input tuple to be processed.
+ */
+ @Override
+ public void execute(Tuple input) {
+ JSONObject message = generateMessage(input);
+ try {
+ String sourceType = MessageUtils.getSensorType(message);
+ SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType);
+ if(config == null) {
+ LOG.debug("Unable to find SensorEnrichmentConfig for sourceType: {}", sourceType);
+ config = new SensorEnrichmentConfig();
+ }
+ //This is an existing kludge for the stellar adapter to pass information along.
+ //We should figure out if this can be rearchitected a bit. This smells.
+ config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, stellarContext);
+ String guid = getGUID(input, message);
+
+ // enrich the message
+ ParallelEnricher.EnrichmentResult result = enricher.apply(message, strategy, config, perfLog);
+ JSONObject enriched = result.getResult();
+ enriched = strategy.postProcess(enriched, config, enrichmentContext);
+
+ //we can emit the message now
+ collector.emit("message",
+ input,
+ new Values(guid, enriched));
+ //and handle each of the errors in turn. If any adapter errored out, we will have one message per.
+ for(Map.Entry