From 1d95b8316a18097747be116a0276c56b894fb79c Mon Sep 17 00:00:00 2001 From: cstella Date: Wed, 7 Mar 2018 09:16:45 -0500 Subject: [PATCH] METRON-1460: Create a complementary non-split-join enrichment topology closes apache/metron#940 --- dependencies_with_url.csv | 4 +- .../docker/rpm-docker/SPECS/metron.spec | 1 + .../enrichment/handler/ConfigHandler.java | 4 + .../common/message/BytesFromPosition.java | 4 +- .../message/JSONFromFieldByReference.java | 37 ++ .../common/message/JSONFromPosition.java | 4 +- .../metron/common/message/MessageGetters.java | 1 + metron-platform/metron-enrichment/README.md | 45 ++ metron-platform/metron-enrichment/pom.xml | 6 + .../main/flux/enrichment/remote-unified.yaml | 378 ++++++++++++++++ .../adapters/stellar/StellarAdapter.java | 5 +- .../bolt/GenericEnrichmentBolt.java | 12 +- .../enrichment/bolt/ThreatIntelJoinBolt.java | 115 +---- .../bolt/UnifiedEnrichmentBolt.java | 412 ++++++++++++++++++ .../parallel/ConcurrencyContext.java | 96 ++++ .../parallel/EnrichmentCallable.java | 66 +++ .../parallel/EnrichmentContext.java | 43 ++ .../parallel/EnrichmentStrategies.java | 108 +++++ .../parallel/EnrichmentStrategy.java | 71 +++ .../enrichment/parallel/ParallelEnricher.java | 281 ++++++++++++ .../parallel/WorkerPoolStrategies.java | 45 ++ .../enrichment/utils/EnrichmentUtils.java | 16 + .../enrichment/utils/ThreatIntelUtils.java | 127 ++++++ .../EnrichmentIntegrationTest.java | 42 +- .../UnifiedEnrichmentIntegrationTest.java | 25 ++ .../parallel/ParallelEnricherTest.java | 157 +++++++ .../unified_enrichment_arch.svg | 14 + .../unified_enrichment_arch_diagram.xml | 14 + 28 files changed, 1983 insertions(+), 150 deletions(-) create mode 100644 metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java create mode 100644 metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml create mode 100644 metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java create mode 100644 metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ConcurrencyContext.java create mode 100644 metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentCallable.java create mode 100644 metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentContext.java create mode 100644 metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java create mode 100644 metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategy.java create mode 100644 metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java create mode 100644 metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java create mode 100644 metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java create mode 100644 metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java create mode 100644 metron-platform/metron-enrichment/unified_enrichment_arch.svg create mode 100644 metron-platform/metron-enrichment/unified_enrichment_arch_diagram.xml diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index a1f431b660..e2b947b505 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -341,7 +341,7 @@ org.eclipse.persistence:org.eclipse.persistence.asm:jar:2.6.4:compile,EPL 1.0,ht org.eclipse.persistence:org.eclipse.persistence.core:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink org.eclipse.persistence:org.eclipse.persistence.jpa.jpql:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink org.eclipse.persistence:org.eclipse.persistence.jpa:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink - +com.github.ben-manes.caffeine:caffeine:jar:2.6.2:compile,ASLv2,https://github.com/ben-manes/caffeine/blob/v2.6.2/LICENSE com.google.code.gson:gson:jar:2.2:compile org.codehaus.plexus:plexus-classworlds:jar:2.4:compile org.codehaus.plexus:plexus-component-annotations:jar:1.5.5:compile @@ -356,4 +356,4 @@ com.google.code.gson:gson:jar:2.2:compile org.sonatype.aether:aether-util:jar:1.12:compile org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile - org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile \ No newline at end of file + org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec index 0c2fff960a..265d5952e9 100644 --- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec +++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec @@ -258,6 +258,7 @@ This package installs the Metron Enrichment files %{metron_home}/config/zookeeper/enrichments/yaf.json %{metron_home}/config/zookeeper/enrichments/asa.json %{metron_home}/flux/enrichment/remote.yaml +%{metron_home}/flux/enrichment/remote-unified.yaml %attr(0644,root,root) %{metron_home}/lib/metron-enrichment-%{full_version}-uber.jar # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java index 11a4852444..369ba8c6ea 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/handler/ConfigHandler.java @@ -19,6 +19,10 @@ import java.util.*; +/** + * This is the core logic of how to configure enrichments. The default type of enrichment configuration is a simple list + * however more complex enrichment adapters require more complex configuration (e.g. stellar). + */ public class ConfigHandler { private Object config; private Configs type = Configs.LIST; diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java index b73228f247..56c6490cac 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/BytesFromPosition.java @@ -25,8 +25,8 @@ public class BytesFromPosition implements MessageGetStrategy { public BytesFromPosition() {}; - public BytesFromPosition(int position) { - this.position = position; + public BytesFromPosition(Integer position) { + this.position = position == null?0:position; } @Override diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java new file mode 100644 index 0000000000..a0d4b7dbad --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromFieldByReference.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.message; + +import org.apache.storm.tuple.Tuple; +import org.json.simple.JSONObject; + +/** + * This retrieves the JSONObject from the field name by reference. + * This is in contrast to JSONFromField, which clones the JSON object and passes by value. + */ +public class JSONFromFieldByReference implements MessageGetStrategy { + private String messageFieldName; + public JSONFromFieldByReference(String messageFieldName) { + this.messageFieldName = messageFieldName; + } + + @Override + public JSONObject get(Tuple tuple) { + return (JSONObject) tuple.getValueByField(messageFieldName); + } +} diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java index c91a262d1b..15f0447113 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/JSONFromPosition.java @@ -35,8 +35,8 @@ protected JSONParser initialValue() { public JSONFromPosition() {}; - public JSONFromPosition(int position) { - this.position = position; + public JSONFromPosition(Integer position) { + this.position = position == null?0:position; } @Override diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java index 7004d78ed2..46bb406f1c 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/message/MessageGetters.java @@ -41,6 +41,7 @@ public enum MessageGetters { BYTES_FROM_POSITION((String arg) -> new BytesFromPosition(ConversionUtils.convert(arg, Integer.class))), JSON_FROM_POSITION((String arg) -> new JSONFromPosition(ConversionUtils.convert(arg, Integer.class))), JSON_FROM_FIELD((String arg) -> new JSONFromField(arg)), + JSON_FROM_FIELD_BY_REFERENCE((String arg) -> new JSONFromFieldByReference(arg)), OBJECT_FROM_FIELD((String arg) -> new ObjectFromField(arg)), DEFAULT_BYTES_FROM_POSITION(new BytesFromPosition()), DEFAULT_JSON_FROM_POSITION(new JSONFromPosition()), diff --git a/metron-platform/metron-enrichment/README.md b/metron-platform/metron-enrichment/README.md index d742046ad1..aa6fc99e47 100644 --- a/metron-platform/metron-enrichment/README.md +++ b/metron-platform/metron-enrichment/README.md @@ -33,6 +33,49 @@ data format (e.g. a JSON Map structure with `original_message` and ![Architecture](enrichment_arch.png) +### Unified Enrichment Topology + +There is an experimental unified enrichment topology which is shipped. +Currently the architecture, as described above, has a split/join in +order to perform enrichments in parallel. This poses some issues in +terms of ease of tuning and reasoning about performance. + +In order to deal with these issues, there is an alternative enrichment topology which +uses data parallelism as opposed to the split/join task parallelism. +This architecture uses a worker pool to fully enrich any message within +a worker. This results in +* Fewer bolts in the topology +* Each bolt fully operates on a message. +* Fewer network hops + +![Unified Architecture](unified_enrichment_arch.svg) + +This architecture is fully backwards compatible; the only difference is +how the enrichment will operate on each message (in one bolt where the +split/join is done in a threadpool as opposed +to split across multiple bolts). + +#### Using It + +In order to use this, you will need to +* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use `remote-unified.yaml` instead of `remote.yaml` +* Restart the enrichment topology. + +#### Configuring It + +There are two parameters which you might want to tune in this topology. +Both of them are topology configuration adjustable in the flux file +`$METRON_HOME/config/flux/enrichment/remote-unified.yaml`: +* `metron.threadpool.size` : The size of the threadpool. This can take a number or a multiple of the number of cores (e.g. `5C` to 5 times the number of cores). The default is `2C`. +* `metron.threadpool.type` : The type of threadpool. (note: descriptions taken from [here](https://zeroturnaround.com/rebellabs/fixedthreadpool-cachedthreadpool-or-forkjoinpool-picking-correct-java-executors-for-background-tasks/)). + * `FIXED` is a fixed threadpool of size `n`. `n` threads will process tasks at the time, when the pool is saturated, new tasks will get added to a queue without a limit on size. Good for CPU intensive tasks. This is the default. + * `WORK_STEALING` is a work stealing threadpool. This will create and shut down threads dynamically to accommodate the required parallelism level. It also tries to reduce the contention on the task queue, so can be really good in heavily loaded environments. Also good when your tasks create more tasks for the executor, like recursive tasks. + +In order to configure the parallelism for the enrichment bolt and threat +intel bolt, the configurations will be taken from the respective join bolt +parallelism. When proper ambari support for this is added, we will add +its own property. + ## Enrichment Configuration The configuration for the `enrichment` topology, the topology primarily @@ -371,3 +414,5 @@ Now we need to start the topologies and send some data: * Ensure that the documents have new fields `foo`, `bar` and `ALL_CAPS` with values as described above. Note that we could have used any Stellar statements here, including calling out to HBase via `ENRICHMENT_GET` and `ENRICHMENT_EXISTS` or even calling a machine learning model via [Model as a Service](../../metron-analytics/metron-maas-service). + + diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml index e82b86bf2d..bcfb41b6ea 100644 --- a/metron-platform/metron-enrichment/pom.xml +++ b/metron-platform/metron-enrichment/pom.xml @@ -67,6 +67,12 @@ test test-jar + + com.github.ben-manes.caffeine + caffeine + 2.6.2 + + org.apache.metron metron-profiler-client diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml new file mode 100644 index 0000000000..ddc5ffcb4b --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml @@ -0,0 +1,378 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is a drop-in replacement for the existing split/join enrichment topology. +# Instead of a fan-out/fan-in architecture, this adopts a data-parallelism strategy +# whereby a message is fully enriched inside of a UnifiedEnrichmentBolt. This simplifies +# the architecture greatly and cuts down on network hops. It has unknown performance +# characteristics, so caveat emptor. + +name: "enrichment" +config: + topology.workers: ${enrichment.workers} + topology.acker.executors: ${enrichment.acker.executors} + topology.worker.childopts: ${topology.worker.childopts} + topology.auto-credentials: ${topology.auto-credentials} + topology.max.spout.pending: ${topology.max.spout.pending} + # Change this if you want to adjust the threadpool size + metron.threadpool.size: "2C" # Either a number (e.g. 5) or multiple of cores (e.g. 5C = 5 times the number of cores) + # Change this if you want to adjust the threadpool type + metron.threadpool.type: "FIXED" # FIXED or WORK_STEALING +components: + +# Enrichment + - id: "stellarEnrichmentAdapter" + className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter" + configMethods: + - name: "ofType" + args: + - "ENRICHMENT" + + # Any kafka props for the producer go here. + - id: "kafkaWriterProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + + - id: "stellarEnrichment" + className: "org.apache.metron.enrichment.configuration.Enrichment" + constructorArgs: + - "stellar" + - ref: "stellarEnrichmentAdapter" + + - id: "geoEnrichmentAdapter" + className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter" + - id: "geoEnrichment" + className: "org.apache.metron.enrichment.configuration.Enrichment" + constructorArgs: + - "geo" + - ref: "geoEnrichmentAdapter" + - id: "hostEnrichmentAdapter" + className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter" + constructorArgs: + - '${enrichment.host.known_hosts}' + - id: "hostEnrichment" + className: "org.apache.metron.enrichment.configuration.Enrichment" + constructorArgs: + - "host" + - ref: "hostEnrichmentAdapter" + + - id: "simpleHBaseEnrichmentConfig" + className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig" + configMethods: + - name: "withProviderImpl" + args: + - "${hbase.provider.impl}" + - name: "withHBaseTable" + args: + - "${enrichment.simple.hbase.table}" + - name: "withHBaseCF" + args: + - "${enrichment.simple.hbase.cf}" + - id: "simpleHBaseEnrichmentAdapter" + className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter" + configMethods: + - name: "withConfig" + args: + - ref: "simpleHBaseEnrichmentConfig" + - id: "simpleHBaseEnrichment" + className: "org.apache.metron.enrichment.configuration.Enrichment" + constructorArgs: + - "hbaseEnrichment" + - ref: "simpleHBaseEnrichmentAdapter" + - id: "enrichments" + className: "java.util.ArrayList" + configMethods: + - name: "add" + args: + - ref: "geoEnrichment" + - name: "add" + args: + - ref: "hostEnrichment" + - name: "add" + args: + - ref: "simpleHBaseEnrichment" + - name: "add" + args: + - ref: "stellarEnrichment" + + #enrichment error + - id: "enrichmentErrorKafkaWriter" + className: "org.apache.metron.writer.kafka.KafkaWriter" + configMethods: + - name: "withTopic" + args: + - "${enrichment.error.topic}" + - name: "withZkQuorum" + args: + - "${kafka.zk}" + - name: "withProducerConfigs" + args: + - ref: "kafkaWriterProps" + +# Threat Intel + - id: "stellarThreatIntelAdapter" + className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter" + configMethods: + - name: "ofType" + args: + - "THREAT_INTEL" + - id: "stellarThreatIntelEnrichment" + className: "org.apache.metron.enrichment.configuration.Enrichment" + constructorArgs: + - "stellar" + - ref: "stellarThreatIntelAdapter" + - id: "simpleHBaseThreatIntelConfig" + className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig" + configMethods: + - name: "withProviderImpl" + args: + - "${hbase.provider.impl}" + - name: "withTrackerHBaseTable" + args: + - "${threat.intel.tracker.table}" + - name: "withTrackerHBaseCF" + args: + - "${threat.intel.tracker.cf}" + - name: "withHBaseTable" + args: + - "${threat.intel.simple.hbase.table}" + - name: "withHBaseCF" + args: + - "${threat.intel.simple.hbase.cf}" + - id: "simpleHBaseThreatIntelAdapter" + className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter" + configMethods: + - name: "withConfig" + args: + - ref: "simpleHBaseThreatIntelConfig" + - id: "simpleHBaseThreatIntelEnrichment" + className: "org.apache.metron.enrichment.configuration.Enrichment" + constructorArgs: + - "hbaseThreatIntel" + - ref: "simpleHBaseThreatIntelAdapter" + + - id: "threatIntels" + className: "java.util.ArrayList" + configMethods: + - name: "add" + args: + - ref: "simpleHBaseThreatIntelEnrichment" + - name: "add" + args: + - ref: "stellarThreatIntelEnrichment" + + #threatintel error + - id: "threatIntelErrorKafkaWriter" + className: "org.apache.metron.writer.kafka.KafkaWriter" + configMethods: + - name: "withTopic" + args: + - "${threat.intel.error.topic}" + - name: "withZkQuorum" + args: + - "${kafka.zk}" + - name: "withProducerConfigs" + args: + - ref: "kafkaWriterProps" +#indexing + - id: "kafkaWriter" + className: "org.apache.metron.writer.kafka.KafkaWriter" + configMethods: + - name: "withTopic" + args: + - "${enrichment.output.topic}" + - name: "withZkQuorum" + args: + - "${kafka.zk}" + - name: "withProducerConfigs" + args: + - ref: "kafkaWriterProps" + +#kafka/zookeeper + # Any kafka props for the consumer go here. + - id: "kafkaProps" + className: "java.util.HashMap" + configMethods: + - name: "put" + args: + - "value.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "key.deserializer" + - "org.apache.kafka.common.serialization.ByteArrayDeserializer" + - name: "put" + args: + - "group.id" + - "enrichments" + - name: "put" + args: + - "security.protocol" + - "${kafka.security.protocol}" + + + # The fields to pull out of the kafka messages + - id: "fields" + className: "java.util.ArrayList" + configMethods: + - name: "add" + args: + - "value" + + - id: "kafkaConfig" + className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" + constructorArgs: + - ref: "kafkaProps" + # topic name + - "${enrichment.input.topic}" + - "${kafka.zk}" + - ref: "fields" + configMethods: + - name: "setFirstPollOffsetStrategy" + args: + - "${kafka.start}" + + +spouts: + - id: "kafkaSpout" + className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" + constructorArgs: + - ref: "kafkaConfig" + parallelism: ${kafka.spout.parallelism} + +bolts: +# Enrichment Bolts + - id: "enrichmentBolt" + className: "org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt" + constructorArgs: + - "${kafka.zk}" + configMethods: + - name: "withEnrichments" + args: + - ref: "enrichments" + - name: "withMaxCacheSize" + args: [${enrichment.join.cache.size}] + - name: "withMaxTimeRetain" + args: [10] + - name: "withCaptureCacheStats" + args: [true] + - name: "withStrategy" + args: + - "ENRICHMENT" + - name: "withMessageGetter" + args: ["JSON_FROM_POSITION"] + parallelism: ${enrichment.join.parallelism} + + - id: "enrichmentErrorOutputBolt" + className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" + constructorArgs: + - "${kafka.zk}" + configMethods: + - name: "withMessageWriter" + args: + - ref: "enrichmentErrorKafkaWriter" + + +# Threat Intel Bolts + - id: "threatIntelBolt" + className: "org.apache.metron.enrichment.bolt.UnifiedEnrichmentBolt" + constructorArgs: + - "${kafka.zk}" + configMethods: + - name: "withEnrichments" + args: + - ref: "threatIntels" + - name: "withMaxCacheSize" + args: [${enrichment.join.cache.size}] + - name: "withMaxTimeRetain" + args: [10] + - name: "withCaptureCacheStats" + args: [true] + - name: "withStrategy" + args: + - "THREAT_INTEL" + - name: "withMessageFieldName" + args: ["message"] + - name: "withMessageGetter" + args: ["JSON_FROM_FIELD_BY_REFERENCE"] + parallelism: ${threat.intel.join.parallelism} + + - id: "threatIntelErrorOutputBolt" + className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" + constructorArgs: + - "${kafka.zk}" + configMethods: + - name: "withMessageWriter" + args: + - ref: "threatIntelErrorKafkaWriter" + +# Indexing Bolts + - id: "outputBolt" + className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" + constructorArgs: + - "${kafka.zk}" + configMethods: + - name: "withMessageWriter" + args: + - ref: "kafkaWriter" + parallelism: ${kafka.writer.parallelism} + + +streams: +#parser + - name: "spout -> enrichmentBolt" + from: "kafkaSpout" + to: "enrichmentBolt" + grouping: + type: LOCAL_OR_SHUFFLE + + # Error output + - name: "enrichmentBolt -> enrichmentErrorOutputBolt" + from: "enrichmentBolt" + to: "enrichmentErrorOutputBolt" + grouping: + streamId: "error" + type: LOCAL_OR_SHUFFLE + +#threat intel + - name: "enrichmentBolt -> threatIntelBolt" + from: "enrichmentBolt" + to: "threatIntelBolt" + grouping: + streamId: "message" + type: LOCAL_OR_SHUFFLE + +#output + - name: "threatIntelBolt -> output" + from: "threatIntelBolt" + to: "outputBolt" + grouping: + streamId: "message" + type: LOCAL_OR_SHUFFLE + + # Error output + - name: "threatIntelBolt -> threatIntelErrorOutputBolt" + from: "threatIntelBolt" + to: "threatIntelErrorOutputBolt" + grouping: + streamId: "error" + type: LOCAL_OR_SHUFFLE + + diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java index d3f7820e69..0aabd38bf3 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/adapters/stellar/StellarAdapter.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -173,7 +174,9 @@ public JSONObject enrich(CacheKey value) { if(_PERF_LOG.isDebugEnabled()) { slowLogThreshold = ConversionUtils.convert(globalConfig.getOrDefault(STELLAR_SLOW_LOG, STELLAR_SLOW_LOG_DEFAULT), Long.class); } - Map message = value.getValue(Map.class); + //Ensure that you clone the message, because process will modify the message. If the message object is modified + //then cache misses will happen because the cache will be modified. + Map message = new HashMap<>(value.getValue(Map.class)); VariableResolver resolver = new MapVariableResolver(message, sensorConfig, globalConfig); StellarProcessor processor = new StellarProcessor(); JSONObject enriched = process(message diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java index 9b84193fe5..dbbb7b6b2b 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java @@ -34,6 +34,7 @@ import org.apache.metron.common.utils.ErrorUtils; import org.apache.metron.enrichment.configuration.Enrichment; import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; +import org.apache.metron.enrichment.utils.EnrichmentUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.storm.task.OutputCollector; @@ -245,16 +246,7 @@ public void execute(Tuple tuple) { continue; } } - if ( !enrichedField.isEmpty()) { - for (Object enrichedKey : enrichedField.keySet()) { - if(!StringUtils.isEmpty(prefix)) { - enrichedMessage.put(field + "." + enrichedKey, enrichedField.get(enrichedKey)); - } - else { - enrichedMessage.put(enrichedKey, enrichedField.get(enrichedKey)); - } - } - } + enrichedMessage = EnrichmentUtils.adjustKeys(enrichedMessage, enrichedField, field, prefix); } } diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java index 6e24d659de..1ce0b167b5 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBolt.java @@ -30,6 +30,7 @@ import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.utils.MessageUtils; import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; +import org.apache.metron.enrichment.utils.ThreatIntelUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; @@ -45,35 +46,6 @@ public class ThreatIntelJoinBolt extends EnrichmentJoinBolt { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - /** - * The message key under which the overall threat triage score is stored. - */ - public static final String THREAT_TRIAGE_SCORE_KEY = "threat.triage.score"; - - /** - * The prefix of the message keys that record the threat triage rules that fired. - */ - public static final String THREAT_TRIAGE_RULES_KEY = "threat.triage.rules"; - - /** - * The portion of the message key used to record the 'name' field of a rule. - */ - public static final String THREAT_TRIAGE_RULE_NAME = "name"; - - /** - * The portion of the message key used to record the 'comment' field of a rule. - */ - public static final String THREAT_TRIAGE_RULE_COMMENT = "comment"; - - /** - * The portion of the message key used to record the 'score' field of a rule. - */ - public static final String THREAT_TRIAGE_RULE_SCORE = "score"; - - /** - * The portion of the message key used to record the 'reason' field of a rule. - */ - public static final String THREAT_TRIAGE_RULE_REASON = "reason"; /** * The Stellar function resolver. @@ -133,70 +105,12 @@ public Map getFieldMap(String sourceType) { } } + @Override public JSONObject joinMessages(Map streamMessageMap, MessageGetStrategy messageGetStrategy) { JSONObject ret = super.joinMessages(streamMessageMap, messageGetStrategy); - LOG.trace("Received joined messages: {}", ret); - boolean isAlert = ret.containsKey("is_alert"); - if(!isAlert) { - for (Object key : ret.keySet()) { - if (key.toString().startsWith("threatintels") && !key.toString().endsWith(".ts")) { - isAlert = true; - break; - } - } - } - else { - Object isAlertObj = ret.get("is_alert"); - isAlert = ConversionUtils.convert(isAlertObj, Boolean.class); - if(!isAlert) { - ret.remove("is_alert"); - } - } - if(isAlert) { - ret.put("is_alert" , "true"); - String sourceType = MessageUtils.getSensorType(ret); - SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType); - ThreatTriageConfig triageConfig = null; - if(config != null) { - triageConfig = config.getThreatIntel().getTriageConfig(); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Found sensor enrichment config.", sourceType); - } - } - else { - LOG.debug("{}: Unable to find threat config.", sourceType ); - } - if(triageConfig != null) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Found threat triage config: {}", sourceType, triageConfig); - } - - if(LOG.isDebugEnabled() && (triageConfig.getRiskLevelRules() == null || triageConfig.getRiskLevelRules().isEmpty())) { - LOG.debug("{}: Empty rules!", sourceType); - } - - // triage the threat - ThreatTriageProcessor threatTriageProcessor = new ThreatTriageProcessor(config, functionResolver, stellarContext); - ThreatScore score = threatTriageProcessor.apply(ret); - - if(LOG.isDebugEnabled()) { - String rules = Joiner.on('\n').join(triageConfig.getRiskLevelRules()); - LOG.debug("Marked {} as triage level {} with rules {}", sourceType, score.getScore(), - rules); - } - - // attach the triage threat score to the message - if(score.getRuleScores().size() > 0) { - appendThreatScore(score, ret); - } - } - else { - LOG.debug("{}: Unable to find threat triage config!", sourceType); - } - } - - return ret; + String sourceType = MessageUtils.getSensorType(ret); + return ThreatIntelUtils.triage(ret, getConfigurations().getSensorEnrichmentConfig(sourceType), functionResolver, stellarContext); } @Override @@ -207,24 +121,5 @@ public void reloadCallback(String name, ConfigurationType type) { } } - /** - * Appends the threat score to the telemetry message. - * @param threatScore The threat triage score - * @param message The telemetry message being triaged. - */ - private void appendThreatScore(ThreatScore threatScore, JSONObject message) { - - // append the overall threat score - message.put(THREAT_TRIAGE_SCORE_KEY, threatScore.getScore()); - - // append each of the rules - each rule is 'flat' - Joiner joiner = Joiner.on("."); - int i = 0; - for(RuleScore score: threatScore.getRuleScores()) { - message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_NAME), score.getRule().getName()); - message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_COMMENT), score.getRule().getComment()); - message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_SCORE), score.getRule().getScore()); - message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i++, THREAT_TRIAGE_RULE_REASON), score.getReason()); - } - } + } diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java new file mode 100644 index 0000000000..c258fb08d5 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/UnifiedEnrichmentBolt.java @@ -0,0 +1,412 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.bolt; + +import org.apache.metron.common.Constants; +import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.message.MessageGetStrategy; +import org.apache.metron.common.message.MessageGetters; +import org.apache.metron.common.performance.PerformanceLogger; +import org.apache.metron.common.utils.ErrorUtils; +import org.apache.metron.common.utils.MessageUtils; +import org.apache.metron.enrichment.adapters.geo.GeoLiteDatabase; +import org.apache.metron.enrichment.configuration.Enrichment; +import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; +import org.apache.metron.enrichment.parallel.EnrichmentContext; +import org.apache.metron.enrichment.parallel.EnrichmentStrategies; +import org.apache.metron.enrichment.parallel.ParallelEnricher; +import org.apache.metron.enrichment.parallel.ConcurrencyContext; +import org.apache.metron.enrichment.parallel.WorkerPoolStrategies; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * This bolt is a unified enrichment/threat intel bolt. In contrast to the split/enrich/join + * bolts above, this handles the entire enrichment lifecycle in one bolt using a threadpool to + * enrich in parallel. + * + * From an architectural perspective, this is a divergence from the polymorphism based strategy we have + * used in the split/join bolts. Rather, this bolt is provided a strategy to use, either enrichment or threat intel, + * through composition. This allows us to move most of the implementation into components independent + * from Storm. This will greater facilitate reuse. + */ +public class UnifiedEnrichmentBolt extends ConfiguredEnrichmentBolt { + + public static class Perf {} // used for performance logging + private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker + + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String STELLAR_CONTEXT_CONF = "stellarContext"; + + /** + * The number of threads in the threadpool. One threadpool is created per process. + * This is a topology-level configuration + */ + public static final String THREADPOOL_NUM_THREADS_TOPOLOGY_CONF = "metron.threadpool.size"; + /** + * The type of threadpool to create. This is a topology-level configuration. + */ + public static final String THREADPOOL_TYPE_TOPOLOGY_CONF = "metron.threadpool.type"; + + /** + * The enricher implementation to use. This will do the parallel enrichment via a thread pool. + */ + protected ParallelEnricher enricher; + + /** + * The strategy to use for this enrichment bolt. Practically speaking this is either + * enrichment or threat intel. It is configured in the topology itself. + */ + protected EnrichmentStrategies strategy; + /** + * Determine the way to retrieve the message. This must be specified in the topology. + */ + protected MessageGetStrategy messageGetter; + protected MessageGetters getterStrategy; + protected OutputCollector collector; + private Context stellarContext; + /** + * An enrichment type to adapter map. This is configured externally. + */ + protected Map> enrichmentsByType = new HashMap<>(); + + /** + * The total number of elements in a LRU cache. This cache is used for the enrichments; if an + * element is in the cache, then the result is returned instead of computed. + */ + protected Long maxCacheSize; + /** + * The total amount of time in minutes since write to keep an element in the cache. + */ + protected Long maxTimeRetain; + /** + * If the bolt is reloaded, invalidate the cache? + */ + protected boolean invalidateCacheOnReload = false; + + /** + * The message field to use. If this is set, then this indicates the field to use to retrieve the message object. + * IF this is unset, then we presume that the message is coming in as a string version of a JSON blob on the first + * element of the tuple. + */ + protected String messageFieldName; + protected EnrichmentContext enrichmentContext; + protected boolean captureCacheStats = true; + + public UnifiedEnrichmentBolt(String zookeeperUrl) { + super(zookeeperUrl); + } + + /** + * Specify the enrichments to support. + * @param enrichments enrichment + * @return Instance of this class + */ + public UnifiedEnrichmentBolt withEnrichments(List enrichments) { + for(Enrichment e : enrichments) { + enrichmentsByType.put(e.getType(), e.getAdapter()); + } + return this; + } + + public UnifiedEnrichmentBolt withCaptureCacheStats(boolean captureCacheStats) { + this.captureCacheStats = captureCacheStats; + return this; + } + + /** + * Determine the message get strategy (One of the enums from MessageGetters). + * @param getter + * @return + */ + public UnifiedEnrichmentBolt withMessageGetter(String getter) { + this.getterStrategy = MessageGetters.valueOf(getter); + return this; + } + + /** + * Figure out how many threads to use in the thread pool. The user can pass an arbitrary object, so parse it + * according to some rules. If it's a number, then cast to an int. IF it's a string and ends with "C", then strip + * the C and treat it as an integral multiple of the number of cores. If it's a string and does not end with a C, then treat + * it as a number in string form. + * @param numThreads + * @return + */ + private static int getNumThreads(Object numThreads) { + if(numThreads instanceof Number) { + return ((Number)numThreads).intValue(); + } + else if(numThreads instanceof String) { + String numThreadsStr = ((String)numThreads).trim().toUpperCase(); + if(numThreadsStr.endsWith("C")) { + Integer factor = Integer.parseInt(numThreadsStr.replace("C", "")); + return factor*Runtime.getRuntime().availableProcessors(); + } + else { + return Integer.parseInt(numThreadsStr); + } + } + return 2*Runtime.getRuntime().availableProcessors(); + } + + /** + * The strategy to use. This indicates which part of the config that this bolt uses + * to enrich, threat intel or enrichment. This must conform to one of the EnrichmentStrategies + * enum. + * @param strategy + * @return + */ + public UnifiedEnrichmentBolt withStrategy(String strategy) { + this.strategy = EnrichmentStrategies.valueOf(strategy); + return this; + } + + /** + * @param maxCacheSize Maximum size of cache before flushing + * @return Instance of this class + */ + public UnifiedEnrichmentBolt withMaxCacheSize(long maxCacheSize) { + this.maxCacheSize = maxCacheSize; + return this; + } + + /** + * @param maxTimeRetain Maximum time to retain cached entry before expiring + * @return Instance of this class + */ + + public UnifiedEnrichmentBolt withMaxTimeRetain(long maxTimeRetain) { + this.maxTimeRetain = maxTimeRetain; + return this; + } + + /** + * Invalidate the cache on reload of bolt. By default, we do not. + * @param cacheInvalidationOnReload + * @return + */ + public UnifiedEnrichmentBolt withCacheInvalidationOnReload(boolean cacheInvalidationOnReload) { + this.invalidateCacheOnReload= cacheInvalidationOnReload; + return this; + } + + + @Override + public void reloadCallback(String name, ConfigurationType type) { + if(invalidateCacheOnReload) { + if(strategy != null && ConcurrencyContext.get(strategy).getCache() != null) { + ConcurrencyContext.get(strategy).getCache().invalidateAll(); + } + } + if(type == ConfigurationType.GLOBAL && enrichmentsByType != null) { + for(EnrichmentAdapter adapter : enrichmentsByType.values()) { + adapter.updateAdapter(getConfigurations().getGlobalConfig()); + } + } + } + + + /** + * Fully enrich a message based on the strategy which was used to configure the bolt. + * Each enrichment is done in parallel and the results are joined together. Each enrichment + * will use a cache so computation is avoided if the result has been computed before. + * + * Errors in the enrichment result in an error message being sent on the "error" stream. + * The successful enrichments will be joined with the original message and the message will + * be sent along the "message" stream. + * + * @param input The input tuple to be processed. + */ + @Override + public void execute(Tuple input) { + JSONObject message = generateMessage(input); + try { + String sourceType = MessageUtils.getSensorType(message); + SensorEnrichmentConfig config = getConfigurations().getSensorEnrichmentConfig(sourceType); + if(config == null) { + LOG.debug("Unable to find SensorEnrichmentConfig for sourceType: {}", sourceType); + config = new SensorEnrichmentConfig(); + } + //This is an existing kludge for the stellar adapter to pass information along. + //We should figure out if this can be rearchitected a bit. This smells. + config.getConfiguration().putIfAbsent(STELLAR_CONTEXT_CONF, stellarContext); + String guid = getGUID(input, message); + + // enrich the message + ParallelEnricher.EnrichmentResult result = enricher.apply(message, strategy, config, perfLog); + JSONObject enriched = result.getResult(); + enriched = strategy.postProcess(enriched, config, enrichmentContext); + + //we can emit the message now + collector.emit("message", + input, + new Values(guid, enriched)); + //and handle each of the errors in turn. If any adapter errored out, we will have one message per. + for(Map.Entry t : result.getEnrichmentErrors()) { + LOG.error("[Metron] Unable to enrich message: {}", message, t); + MetronError error = new MetronError() + .withErrorType(strategy.getErrorType()) + .withMessage(t.getValue().getMessage()) + .withThrowable(t.getValue()) + .addRawMessage(t.getKey()); + ErrorUtils.handleError(collector, error); + } + } catch (Exception e) { + //If something terrible and unexpected happens then we want to send an error along, but this + //really shouldn't be happening. + LOG.error("[Metron] Unable to enrich message: {}", message, e); + MetronError error = new MetronError() + .withErrorType(strategy.getErrorType()) + .withMessage(e.getMessage()) + .withThrowable(e) + .addRawMessage(message); + ErrorUtils.handleError(collector, error); + } + finally { + collector.ack(input); + } + } + + /** + * The message field name. If this is set, then use this field to retrieve the message. + * @param messageFieldName + * @return + */ + public UnifiedEnrichmentBolt withMessageFieldName(String messageFieldName) { + this.messageFieldName = messageFieldName; + return this; + } + + /** + * Take the tuple and construct the message. + * @param tuple + * @return + */ + public JSONObject generateMessage(Tuple tuple) { + return (JSONObject) messageGetter.get(tuple); + } + + @Override + public final void prepare(Map map, TopologyContext topologyContext, + OutputCollector outputCollector) { + super.prepare(map, topologyContext, outputCollector); + collector = outputCollector; + if (this.maxCacheSize == null) { + throw new IllegalStateException("MAX_CACHE_SIZE_OBJECTS_NUM must be specified"); + } + if (this.maxTimeRetain == null) { + throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified"); + } + if (this.enrichmentsByType.isEmpty()) { + throw new IllegalStateException("Adapter must be specified"); + } + + for(Map.Entry> adapterKv : enrichmentsByType.entrySet()) { + boolean success = adapterKv.getValue().initializeAdapter(getConfigurations().getGlobalConfig()); + if (!success) { + LOG.error("[Metron] Could not initialize adapter: " + adapterKv.getKey()); + throw new IllegalStateException("Could not initialize adapter: " + adapterKv.getKey()); + } + } + WorkerPoolStrategies workerPoolStrategy = WorkerPoolStrategies.FIXED; + if(map.containsKey(THREADPOOL_TYPE_TOPOLOGY_CONF)) { + workerPoolStrategy = WorkerPoolStrategies.valueOf(map.get(THREADPOOL_TYPE_TOPOLOGY_CONF) + ""); + } + if(map.containsKey(THREADPOOL_NUM_THREADS_TOPOLOGY_CONF)) { + int numThreads = getNumThreads(map.get(THREADPOOL_NUM_THREADS_TOPOLOGY_CONF)); + ConcurrencyContext.get(strategy).initialize(numThreads, maxCacheSize, maxTimeRetain, workerPoolStrategy, LOG, captureCacheStats); + } + else { + throw new IllegalStateException("You must pass " + THREADPOOL_NUM_THREADS_TOPOLOGY_CONF + " via storm config."); + } + messageGetter = this.getterStrategy.get(messageFieldName); + enricher = new ParallelEnricher(enrichmentsByType, ConcurrencyContext.get(strategy), captureCacheStats); + perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), Perf.class.getName()); + GeoLiteDatabase.INSTANCE.update((String)getConfigurations().getGlobalConfig().get(GeoLiteDatabase.GEO_HDFS_FILE)); + initializeStellar(); + enrichmentContext = new EnrichmentContext(StellarFunctions.FUNCTION_RESOLVER(), stellarContext); + } + + + protected void initializeStellar() { + stellarContext = new Context.Builder() + .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) + .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) + .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) + .build(); + StellarFunctions.initialize(stellarContext); + } + + /** + * Return the GUID from either the tuple or the message. + * + * @param tuple + * @param message + * @return + */ + public String getGUID(Tuple tuple, JSONObject message) { + String key = null, guid = null; + try { + key = tuple.getStringByField("key"); + guid = (String)message.get(Constants.GUID); + } + catch(Throwable t) { + //swallowing this just in case. + } + if(key != null) { + return key; + } + else if(guid != null) { + return guid; + } + else { + return UUID.randomUUID().toString(); + } + } + + /** + * Declare the output schema for all the streams of this topology. + * We declare two streams: error and message. + * + * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream + */ + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream("message", new Fields("key", "message")); + declarer.declareStream("error", new Fields("message")); + } +} diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ConcurrencyContext.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ConcurrencyContext.java new file mode 100644 index 0000000000..ed5985ac88 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ConcurrencyContext.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.metron.enrichment.bolt.CacheKey; +import org.json.simple.JSONObject; +import org.slf4j.Logger; + +import java.util.EnumMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * This provides the parallel infrastructure, the thread pool and the cache. + * The threadpool is static and the cache is instance specific. + */ +public class ConcurrencyContext { + private static Executor executor; + private Cache cache; + + private static EnumMap strategyToInfrastructure + = new EnumMap(EnrichmentStrategies.class) {{ + for(EnrichmentStrategies e : EnrichmentStrategies.values()) { + put(e, new ConcurrencyContext()); + } + }}; + + public static ConcurrencyContext get(EnrichmentStrategies strategy) { + return strategyToInfrastructure.get(strategy); + } + + protected ConcurrencyContext() { } + + /* + * Initialize the thread pool and cache. The threadpool is static and the cache is per strategy. + * + * @param numThreads The number of threads in the threadpool. + * @param maxCacheSize The maximum size of the cache, beyond which and keys are evicted. + * @param maxTimeRetain The maximum time to retain an element in the cache (in minutes) + * @param poolStrategy The strategy for creating a threadpool + * @param log The logger to use + * @param logStats Should we record stats in the cache? + */ + public synchronized void initialize( int numThreads + , long maxCacheSize + , long maxTimeRetain + , WorkerPoolStrategies poolStrategy + , Logger log + , boolean logStats + ) { + if(executor == null) { + if (log != null) { + log.info("Creating new threadpool of size {}", numThreads); + } + executor = (poolStrategy == null? WorkerPoolStrategies.FIXED:poolStrategy).create(numThreads); + } + if(cache == null) { + if (log != null) { + log.info("Creating new cache with maximum size {}, and expiration after write of {} minutes", maxCacheSize, maxTimeRetain); + } + Caffeine builder = Caffeine.newBuilder().maximumSize(maxCacheSize) + .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES) + .executor(executor) + ; + if(logStats) { + builder = builder.recordStats(); + } + cache = builder.build(); + } + } + + public static Executor getExecutor() { + return executor; + } + + public Cache getCache() { + return cache; + } +} diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentCallable.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentCallable.java new file mode 100644 index 0000000000..da4d57404d --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentCallable.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import org.apache.metron.enrichment.bolt.CacheKey; +import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; +import org.json.simple.JSONObject; + +import java.util.concurrent.Callable; +import java.util.function.Function; + +/** + * Enrich based on a key and enrichment adapter. The CacheKey contains all necessary input information for an enrichment. + */ +public class EnrichmentCallable implements Callable, Function { + CacheKey key; + EnrichmentAdapter adapter; + + public EnrichmentCallable( CacheKey key + , EnrichmentAdapter adapter + ) + { + this.key = key; + this.adapter = adapter; + } + + /** + * Computes a result, or throws an exception if unable to do so. + * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public JSONObject call() throws Exception { + //Log access for this key. + adapter.logAccess(key); + return adapter.enrich(key); + } + + /** + * Applies this function to the given argument. + * + * @param cacheKey the function argument + * @return the function result + */ + @Override + public JSONObject apply(CacheKey cacheKey) { + adapter.logAccess(key); + return adapter.enrich(cacheKey); + } +} diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentContext.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentContext.java new file mode 100644 index 0000000000..d2a9fe7381 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentContext.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; + +/** + * The full context needed for an enrichment. This is an abstraction to pass information from the underlying + * environment (e.g. a storm bolt) to the set of storm independent enrichment infrastructure. + */ +public class EnrichmentContext { + private FunctionResolver functionResolver; + private Context stellarContext; + + public EnrichmentContext(FunctionResolver functionResolver, Context stellarContext) { + this.functionResolver = functionResolver; + this.stellarContext = stellarContext; + } + + public FunctionResolver getFunctionResolver() { + return functionResolver; + } + + public Context getStellarContext() { + return stellarContext; + } +} diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java new file mode 100644 index 0000000000..3683407279 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategies.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.enrichment.utils.EnrichmentUtils; +import org.apache.metron.enrichment.utils.ThreatIntelUtils; +import org.json.simple.JSONObject; + +/** + * The specific strategies to interact with the sensor enrichment config. + * The approach presented here, in contrast to the inheritance-based approach + * in the bolts, allows for an abstraction through composition whereby we + * localize all the interactions with the sensor enrichment config in a strategy + * rather than bind the abstraction to Storm, our distributed processing engine. + */ +public enum EnrichmentStrategies implements EnrichmentStrategy { + /** + * Interact with the enrichment portion of the enrichment config + */ + ENRICHMENT(new EnrichmentStrategy() { + @Override + public EnrichmentConfig getUnderlyingConfig(SensorEnrichmentConfig config) { + return config.getEnrichment(); + } + + @Override + public Constants.ErrorType getErrorType() { + return Constants.ErrorType.ENRICHMENT_ERROR; + } + + @Override + public String fieldToEnrichmentKey(String type, String field) { + return EnrichmentUtils.getEnrichmentKey(type, field); + } + }), + /** + * Interact with the threat intel portion of the enrichment config. + */ + THREAT_INTEL(new EnrichmentStrategy() { + @Override + public EnrichmentConfig getUnderlyingConfig(SensorEnrichmentConfig config) { + return config.getThreatIntel(); + } + + @Override + public Constants.ErrorType getErrorType() { + return Constants.ErrorType.THREAT_INTEL_ERROR; + } + + @Override + public String fieldToEnrichmentKey(String type, String field) { + return ThreatIntelUtils.getThreatIntelKey(type, field); + } + + @Override + public JSONObject postProcess(JSONObject message, SensorEnrichmentConfig config, EnrichmentContext context) { + return ThreatIntelUtils.triage(message, config, context.getFunctionResolver(), context.getStellarContext()); + } + }) + ; + + EnrichmentStrategy enrichmentStrategy; + EnrichmentStrategies(EnrichmentStrategy enrichmentStrategy) { + this.enrichmentStrategy = enrichmentStrategy; + } + + /** + * Get the underlying enrichment config. If this is provided, then we need not retrieve + * @return + */ + @Override + public EnrichmentConfig getUnderlyingConfig(SensorEnrichmentConfig config) { + return enrichmentStrategy.getUnderlyingConfig(config); + } + + public String fieldToEnrichmentKey(String type, String field) { + return enrichmentStrategy.fieldToEnrichmentKey(type, field); + } + + + public JSONObject postProcess(JSONObject message, SensorEnrichmentConfig config, EnrichmentContext context) { + return enrichmentStrategy.postProcess(message, config, context); + } + + @Override + public Constants.ErrorType getErrorType() { + return enrichmentStrategy.getErrorType(); + } + +} diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategy.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategy.java new file mode 100644 index 0000000000..1d32749217 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/EnrichmentStrategy.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler; +import org.json.simple.JSONObject; +import org.slf4j.Logger; + +import java.util.Map; + +/** + * Enrichment strategy. This interface provides a mechanism to interface with the enrichment config and any + * post processing steps that are needed to be done after-the-fact. + * + * The reasoning behind this is that the key difference between enrichments and threat intel is that they pull + * their configurations from different parts of the SensorEnrichmentConfig object and as a post-join step, they differ + * slightly. + * + */ +public interface EnrichmentStrategy { + + /** + * Get the underlying configuration for this phase from the sensor enrichment config. + * @return + */ + EnrichmentConfig getUnderlyingConfig(SensorEnrichmentConfig config); + + /** + * Retrieves the error type, so that error messages can be constructed appropriately. + */ + Constants.ErrorType getErrorType(); + + /** + * Takes the enrichment type and the field and returns a unique key to prefix the output of the enrichment. For + * less adaptable enrichments than Stellar, this is important to allow for namespacing in the new fields created. + * @param type The enrichment type name + * @param field The input field + * @return + */ + String fieldToEnrichmentKey(String type, String field); + + + /** + * Post-process callback after messages are enriched and joined. By default, this is noop. + * @param message The input message. + * @param config The enrichment configuration + * @param context The enrichment context + * @return + */ + default JSONObject postProcess(JSONObject message, SensorEnrichmentConfig config, EnrichmentContext context) { + return message; + } +} diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java new file mode 100644 index 0000000000..2238c92382 --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler; +import org.apache.metron.common.performance.PerformanceLogger; +import org.apache.metron.common.utils.MessageUtils; +import org.apache.metron.enrichment.bolt.CacheKey; +import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; +import org.apache.metron.enrichment.utils.EnrichmentUtils; +import org.json.simple.JSONObject; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BinaryOperator; +import java.util.function.Supplier; + +/** + * This is an independent component which will accept a message and a set of enrichment adapters as well as a config which defines + * how those enrichments should be performed and fully enrich the message. The result will be the enriched message + * unified together and a list of errors which happened. + */ +public class ParallelEnricher { + + private Map> enrichmentsByType = new HashMap<>(); + private EnumMap cacheStats = new EnumMap<>(EnrichmentStrategies.class); + + /** + * The result of an enrichment. + */ + public static class EnrichmentResult { + private JSONObject result; + private List> enrichmentErrors; + + public EnrichmentResult(JSONObject result, List> enrichmentErrors) { + this.result = result; + this.enrichmentErrors = enrichmentErrors; + } + + /** + * The unified fully enriched result. + * @return + */ + public JSONObject getResult() { + return result; + } + + /** + * The errors that happened in the course of enriching. + * @return + */ + public List> getEnrichmentErrors() { + return enrichmentErrors; + } + } + + private ConcurrencyContext concurrencyContext; + + /** + * Construct a parallel enricher with a set of enrichment adapters associated with their enrichment types. + * @param enrichmentsByType + */ + public ParallelEnricher( Map> enrichmentsByType + , ConcurrencyContext concurrencyContext + , boolean logStats + ) + { + this.enrichmentsByType = enrichmentsByType; + this.concurrencyContext = concurrencyContext; + if(logStats) { + for(EnrichmentStrategies s : EnrichmentStrategies.values()) { + cacheStats.put(s, null); + } + } + } + + /** + * Fully enriches a message. Each enrichment is done in parallel via a threadpool. + * Each enrichment is fronted with a LRU cache. + * + * @param message the message to enrich + * @param strategy The enrichment strategy to use (e.g. enrichment or threat intel) + * @param config The sensor enrichment config + * @param perfLog The performance logger. We log the performance for this call, the split portion and the enrichment portion. + * @return the enrichment result + */ + public EnrichmentResult apply( JSONObject message + , EnrichmentStrategies strategy + , SensorEnrichmentConfig config + , PerformanceLogger perfLog + ) throws ExecutionException, InterruptedException { + if(message == null) { + return null; + } + if(perfLog != null) { + perfLog.mark("execute"); + if(perfLog.isDebugEnabled() && !cacheStats.isEmpty()) { + CacheStats before = cacheStats.get(strategy); + CacheStats after = concurrencyContext.getCache().stats(); + if(before != null && after != null) { + CacheStats delta = after.minus(before); + perfLog.log("cache", delta.toString()); + } + cacheStats.put(strategy, after); + } + } + String sensorType = MessageUtils.getSensorType(message); + message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis()); + // Split the message into individual tasks. + // + // A task will either correspond to an enrichment adapter or, + // in the case of Stellar, a stellar subgroup. The tasks will be grouped by enrichment type (the key of the + //tasks map). Each JSONObject will correspond to a unit of work. + Map> tasks = splitMessage( message + , strategy + , config + ); + message.put(getClass().getSimpleName().toLowerCase() + ".splitter.end.ts", "" + System.currentTimeMillis()); + message.put(getClass().getSimpleName().toLowerCase() + ".enrich.begin.ts", "" + System.currentTimeMillis()); + if(perfLog != null) { + perfLog.mark("enrich"); + } + List> taskList = new ArrayList<>(); + List> errors = Collections.synchronizedList(new ArrayList<>()); + for(Map.Entry> task : tasks.entrySet()) { + //task is the list of enrichment tasks for the task.getKey() adapter + EnrichmentAdapter adapter = enrichmentsByType.get(task.getKey()); + for(JSONObject m : task.getValue()) { + /* now for each unit of work (each of these only has one element in them) + * the key is the field name and the value is value associated with that field. + * + * In the case of stellar enrichment, the field name is the subgroup name or empty string. + * The value is the subset of the message needed for the enrichment. + * + * In the case of another enrichment (e.g. hbase), the field name is the field name being enriched. + * The value is the corresponding value. + */ + for(Object o : m.keySet()) { + String field = (String) o; + Object value = m.get(o); + CacheKey cacheKey = new CacheKey(field, value, config); + String prefix = adapter.getOutputPrefix(cacheKey); + Supplier supplier = () -> { + try { + JSONObject ret = concurrencyContext.getCache().get(cacheKey, new EnrichmentCallable(cacheKey, adapter)); + if(ret == null) { + ret = new JSONObject(); + } + //each enrichment has their own unique prefix to use to adjust the keys for the enriched fields. + return EnrichmentUtils.adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix); + } catch (Throwable e) { + JSONObject errorMessage = new JSONObject(); + errorMessage.putAll(m); + errorMessage.put(Constants.SENSOR_TYPE, sensorType ); + errors.add(new AbstractMap.SimpleEntry<>(errorMessage, new IllegalStateException(strategy + " error with " + task.getKey() + " failed: " + e.getMessage(), e))); + return new JSONObject(); + } + }; + //add the Future to the task list + taskList.add(CompletableFuture.supplyAsync( supplier, ConcurrencyContext.getExecutor())); + } + } + } + if(taskList.isEmpty()) { + return new EnrichmentResult(message, errors); + } + + EnrichmentResult ret = new EnrichmentResult(all(taskList, message, (left, right) -> join(left, right)).get(), errors); + message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); + if(perfLog != null) { + String key = message.get(Constants.GUID) + ""; + perfLog.log("enrich", "key={}, elapsed time to enrich", key); + perfLog.log("execute", "key={}, elapsed time to run execute", key); + } + return ret; + } + + private static JSONObject join(JSONObject left, JSONObject right) { + JSONObject message = new JSONObject(); + message.putAll(left); + message.putAll(right); + List emptyKeys = new ArrayList<>(); + for(Object key : message.keySet()) { + Object value = message.get(key); + if(value == null || value.toString().length() == 0) { + emptyKeys.add(key); + } + } + for(Object o : emptyKeys) { + message.remove(o); + } + return message; + } + + + /** + * Wait until all the futures complete and join the resulting JSONObjects using the supplied binary operator + * and identity object. + * + * @param futures + * @param identity + * @param reduceOp + * @return + */ + public static CompletableFuture all( + List> futures + , JSONObject identity + , BinaryOperator reduceOp + ) { + CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]); + CompletableFuture future = CompletableFuture.allOf(cfs); + return future.thenApply(aVoid -> futures.stream().map(CompletableFuture::join).reduce(identity, reduceOp)); + } + + /** + * Take a message and a config and return a list of tasks indexed by adapter enrichment types. + * @param message + * @param enrichmentStrategy + * @param config + * @return + */ + public Map> splitMessage( JSONObject message + , EnrichmentStrategy enrichmentStrategy + , SensorEnrichmentConfig config + ) { + Map> streamMessageMap = new HashMap<>(); + Map enrichmentFieldMap = enrichmentStrategy.getUnderlyingConfig(config).getFieldMap(); + + Map fieldToHandler = enrichmentStrategy.getUnderlyingConfig(config).getEnrichmentConfigs(); + + Set enrichmentTypes = new HashSet<>(enrichmentFieldMap.keySet()); + + //the set of enrichments configured + enrichmentTypes.addAll(fieldToHandler.keySet()); + + //For each of these enrichment types, we're going to construct JSONObjects + //which represent the individual enrichment tasks. + for (String enrichmentType : enrichmentTypes) { + Object fields = enrichmentFieldMap.get(enrichmentType); + ConfigHandler retriever = fieldToHandler.get(enrichmentType); + + //How this is split depends on the ConfigHandler + List enrichmentObject = retriever.getType() + .splitByFields( message + , fields + , field -> enrichmentStrategy.fieldToEnrichmentKey(enrichmentType, field) + , retriever + ); + streamMessageMap.put(enrichmentType, enrichmentObject); + } + return streamMessageMap; + } + +} diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java new file mode 100644 index 0000000000..5f82b1c41d --- /dev/null +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/WorkerPoolStrategies.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; + +/** + * The strategy to use to construct the thread pool. + */ +public enum WorkerPoolStrategies { + /** + * Fixed thread pool + */ + FIXED(numThreads -> Executors.newFixedThreadPool(numThreads)), + /** + * Work stealing thread pool. + */ + WORK_STEALING(numThreads -> Executors.newWorkStealingPool(numThreads)) + ; + Function creator; + WorkerPoolStrategies(Function creator) { + this.creator = creator; + } + + public ExecutorService create(int numThreads) { + return creator.apply(numThreads); + } +} diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java index ab3d462d3b..63d39c5407 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java @@ -21,6 +21,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; @@ -28,6 +29,7 @@ import org.apache.metron.enrichment.lookup.handler.KeyWithContext; import org.apache.metron.hbase.TableProvider; import org.apache.metron.enrichment.converter.EnrichmentKey; +import org.json.simple.JSONObject; import sun.management.Sensor; import javax.annotation.Nullable; @@ -118,4 +120,18 @@ public static TableProvider getTableProvider(String connectorImpl, TableProvider } } + public static JSONObject adjustKeys(JSONObject enrichedMessage, JSONObject enrichedField, String field, String prefix) { + if ( !enrichedField.isEmpty()) { + for (Object enrichedKey : enrichedField.keySet()) { + if(!StringUtils.isEmpty(prefix)) { + enrichedMessage.put(field + "." + enrichedKey, enrichedField.get(enrichedKey)); + } + else { + enrichedMessage.put(enrichedKey, enrichedField.get(enrichedKey)); + } + } + } + return enrichedMessage; + } + } diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java index 7898ccd1b9..870d709021 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/ThreatIntelUtils.java @@ -18,15 +18,142 @@ package org.apache.metron.enrichment.utils; import com.google.common.base.Joiner; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.enrichment.threatintel.RuleScore; +import org.apache.metron.common.configuration.enrichment.threatintel.ThreatScore; +import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig; +import org.apache.metron.common.utils.MessageUtils; +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; +import org.apache.metron.threatintel.triage.ThreatTriageProcessor; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; public class ThreatIntelUtils { + public static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final String KEY_PREFIX = "threatintels"; + /** + * The message key under which the overall threat triage score is stored. + */ + public static final String THREAT_TRIAGE_SCORE_KEY = "threat.triage.score"; + + /** + * The prefix of the message keys that record the threat triage rules that fired. + */ + public static final String THREAT_TRIAGE_RULES_KEY = "threat.triage.rules"; + + /** + * The portion of the message key used to record the 'name' field of a rule. + */ + public static final String THREAT_TRIAGE_RULE_NAME = "name"; + + /** + * The portion of the message key used to record the 'comment' field of a rule. + */ + public static final String THREAT_TRIAGE_RULE_COMMENT = "comment"; + + /** + * The portion of the message key used to record the 'score' field of a rule. + */ + public static final String THREAT_TRIAGE_RULE_SCORE = "score"; + + /** + * The portion of the message key used to record the 'reason' field of a rule. + */ + public static final String THREAT_TRIAGE_RULE_REASON = "reason"; + public static String getThreatIntelKey(String threatIntelName, String field) { return Joiner.on(".").join(new String[]{KEY_PREFIX, threatIntelName, field}); } +public static JSONObject triage(JSONObject ret, SensorEnrichmentConfig config, FunctionResolver functionResolver, Context stellarContext) { + LOG.trace("Received joined messages: {}", ret); + boolean isAlert = ret.containsKey("is_alert"); + if(!isAlert) { + for (Object key : ret.keySet()) { + if (key.toString().startsWith("threatintels") && !key.toString().endsWith(".ts")) { + isAlert = true; + break; + } + } + } + else { + Object isAlertObj = ret.get("is_alert"); + isAlert = ConversionUtils.convert(isAlertObj, Boolean.class); + if(!isAlert) { + ret.remove("is_alert"); + } + } + if(isAlert) { + ret.put("is_alert" , "true"); + String sourceType = MessageUtils.getSensorType(ret); + ThreatTriageConfig triageConfig = null; + if(config != null) { + triageConfig = config.getThreatIntel().getTriageConfig(); + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found sensor enrichment config.", sourceType); + } + } + else { + LOG.debug("{}: Unable to find threat config.", sourceType ); + } + if(triageConfig != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("{}: Found threat triage config: {}", sourceType, triageConfig); + } + + if(LOG.isDebugEnabled() && (triageConfig.getRiskLevelRules() == null || triageConfig.getRiskLevelRules().isEmpty())) { + LOG.debug("{}: Empty rules!", sourceType); + } + + // triage the threat + ThreatTriageProcessor threatTriageProcessor = new ThreatTriageProcessor(config, functionResolver, stellarContext); + ThreatScore score = threatTriageProcessor.apply(ret); + + if(LOG.isDebugEnabled()) { + String rules = Joiner.on('\n').join(triageConfig.getRiskLevelRules()); + LOG.debug("Marked {} as triage level {} with rules {}", sourceType, score.getScore(), + rules); + } + + // attach the triage threat score to the message + if(score.getRuleScores().size() > 0) { + appendThreatScore(score, ret); + } + } + else { + LOG.debug("{}: Unable to find threat triage config!", sourceType); + } + } + + return ret; + } + + /** + * Appends the threat score to the telemetry message. + * @param threatScore The threat triage score + * @param message The telemetry message being triaged. + */ + private static void appendThreatScore(ThreatScore threatScore, JSONObject message) { + // append the overall threat score + message.put(THREAT_TRIAGE_SCORE_KEY, threatScore.getScore()); + + // append each of the rules - each rule is 'flat' + Joiner joiner = Joiner.on("."); + int i = 0; + for(RuleScore score: threatScore.getRuleScores()) { + message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_NAME), score.getRule().getName()); + message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_COMMENT), score.getRule().getComment()); + message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i, THREAT_TRIAGE_RULE_SCORE), score.getRule().getScore()); + message.put(joiner.join(THREAT_TRIAGE_RULES_KEY, i++, THREAT_TRIAGE_RULE_REASON), score.getReason()); + } + } } diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java index 828f4e3825..267ca6203d 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java @@ -17,13 +17,6 @@ */ package org.apache.metron.enrichment.integration; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULES_KEY; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_COMMENT; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_NAME; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_REASON; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_RULE_SCORE; -import static org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt.THREAT_TRIAGE_SCORE_KEY; - import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; @@ -54,6 +47,7 @@ import org.apache.metron.enrichment.lookup.LookupKV; import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator; import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions; +import org.apache.metron.enrichment.utils.ThreatIntelUtils; import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.integration.BaseIntegrationTest; @@ -89,13 +83,15 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { public static final String DEFAULT_DMACODE= "test dmaCode"; public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE); - protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/remote.yaml"; protected String templatePath = "../metron-enrichment/src/main/config/enrichment.properties.j2"; protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed"; private final List inputMessages = getInputMessages(sampleParsedPath); private static File geoHdfsFile; + protected String fluxPath() { + return "../metron-enrichment/src/main/flux/enrichment/remote.yaml"; + } private static List getInputMessages(String path){ try{ @@ -190,7 +186,7 @@ public void test() throws Exception { }}); FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() - .withTopologyLocation(new File(fluxPath)) + .withTopologyLocation(new File(fluxPath())) .withTopologyName("test") .withTemplateLocation(new File(templatePath)) .withTopologyProperties(topologyProperties) @@ -247,8 +243,8 @@ public static void validateAll(List> docs) { protected void validateErrors(List> errors) { for(Map error : errors) { - Assert.assertEquals("java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.MESSAGE.getName())); - Assert.assertEquals("com.google.common.util.concurrent.UncheckedExecutionException: java.lang.ArithmeticException: / by zero", error.get(Constants.ErrorFields.EXCEPTION.getName())); + Assert.assertTrue(error.get(Constants.ErrorFields.MESSAGE.getName()).toString(), error.get(Constants.ErrorFields.MESSAGE.getName()).toString().contains("/ by zero") ); + Assert.assertTrue(error.get(Constants.ErrorFields.EXCEPTION.getName()).toString().contains("/ by zero")); Assert.assertEquals(Constants.ErrorType.ENRICHMENT_ERROR.getType(), error.get(Constants.ErrorFields.ERROR_TYPE.getName())); Assert.assertEquals("{\"error_test\":{},\"source.type\":\"test\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName())); } @@ -399,17 +395,17 @@ private static void threatIntelValidation(Map indexedDoc) { Assert.assertEquals(indexedDoc.getOrDefault("is_alert",""), "true"); // validate threat triage score - Assert.assertTrue(indexedDoc.containsKey(THREAT_TRIAGE_SCORE_KEY)); - Double score = (Double) indexedDoc.get(THREAT_TRIAGE_SCORE_KEY); + Assert.assertTrue(indexedDoc.containsKey(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY)); + Double score = (Double) indexedDoc.get(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY); Assert.assertEquals(score, 10d, 1e-7); // validate threat triage rules Joiner joiner = Joiner.on("."); Stream.of( - joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_NAME), - joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_COMMENT), - joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_REASON), - joiner.join(THREAT_TRIAGE_RULES_KEY, 0, THREAT_TRIAGE_RULE_SCORE)) + joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_NAME), + joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_COMMENT), + joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_REASON), + joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_SCORE)) .forEach(key -> Assert.assertTrue(String.format("Missing expected key: '%s'", key), indexedDoc.containsKey(key))); } @@ -471,11 +467,11 @@ private static void hostEnrichmentValidation(Map indexedDoc) { enriched = true; } if (ips.contains(indexedDoc.get(DST_IP))) { - Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION + boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION ,HostEnrichments.IMPORTANT ,HostEnrichments.PRINTER_TYPE - ).apply(new EvaluationPayload(indexedDoc, DST_IP)) - ); + ).apply(new EvaluationPayload(indexedDoc, DST_IP)); + Assert.assertTrue(isEnriched); enriched = true; } } @@ -492,11 +488,11 @@ private static void hostEnrichmentValidation(Map indexedDoc) { enriched = true; } if (ips.contains(indexedDoc.get(DST_IP))) { - Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION + boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION ,HostEnrichments.IMPORTANT ,HostEnrichments.WEBSERVER_TYPE - ).apply(new EvaluationPayload(indexedDoc, DST_IP)) - ); + ).apply(new EvaluationPayload(indexedDoc, DST_IP)); + Assert.assertTrue(isEnriched); enriched = true; } } diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java new file mode 100644 index 0000000000..1f06733883 --- /dev/null +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.integration; + +public class UnifiedEnrichmentIntegrationTest extends EnrichmentIntegrationTest { + @Override + public String fluxPath() { + return "../metron-enrichment/src/main/flux/enrichment/remote-unified.yaml"; + } +} diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java new file mode 100644 index 0000000000..c3a3109a0a --- /dev/null +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.enrichment.parallel; + +import com.google.common.collect.ImmutableMap; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.enrichment.adapters.stellar.StellarAdapter; +import org.apache.metron.enrichment.bolt.CacheKey; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.json.simple.JSONObject; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class ParallelEnricherTest { + /** + * { + "enrichment": { + "fieldMap": { + "stellar" : { + "config" : { + "numeric" : { + "map" : "{ 'blah' : 1}" + ,"one" : "MAP_GET('blah', map)" + ,"foo": "1 + 1" + } + ,"ALL_CAPS" : "TO_UPPER(source.type)" + } + } + } + ,"fieldToTypeMap": { } + }, + "threatIntel": { } +} + */ + @Multiline + public static String goodConfig; + + private static ParallelEnricher enricher; + private static Context stellarContext; + private static AtomicInteger numAccesses = new AtomicInteger(0); + @BeforeClass + public static void setup() { + ConcurrencyContext infrastructure = new ConcurrencyContext(); + infrastructure.initialize(5, 100, 10, null, null, false); + stellarContext = new Context.Builder() + .build(); + StellarFunctions.initialize(stellarContext); + StellarAdapter adapter = new StellarAdapter(){ + @Override + public void logAccess(CacheKey value) { + numAccesses.incrementAndGet(); + } + }.ofType("ENRICHMENT"); + adapter.initializeAdapter(new HashMap<>()); + enricher = new ParallelEnricher(ImmutableMap.of("stellar", adapter), infrastructure, false); + } + + @Test + public void testCacheHit() throws Exception { + numAccesses.set(0); + JSONObject message = new JSONObject() {{ + put(Constants.SENSOR_TYPE, "test"); + }}; + for(int i = 0;i < 10;++i) { + SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(goodConfig, SensorEnrichmentConfig.class); + config.getConfiguration().putIfAbsent("stellarContext", stellarContext); + ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); + } + //we only want 2 actual instances of the adapter.enrich being run due to the cache. + Assert.assertTrue(2 >= numAccesses.get()); + } + + @Test + public void testGoodConfig() throws Exception { + SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(goodConfig, SensorEnrichmentConfig.class); + config.getConfiguration().putIfAbsent("stellarContext", stellarContext); + JSONObject message = new JSONObject() {{ + put(Constants.SENSOR_TYPE, "test"); + }}; + ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); + JSONObject ret = result.getResult(); + Assert.assertEquals("Got the wrong result count: " + ret, 8, ret.size()); + Assert.assertEquals(1, ret.get("map.blah")); + Assert.assertEquals("test", ret.get("source.type")); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("foo")); + Assert.assertEquals("TEST", ret.get("ALL_CAPS")); + Assert.assertEquals(0, result.getEnrichmentErrors().size()); + } + + /** + * { + "enrichment": { + "fieldMap": { + "stellar" : { + "config" : { + "numeric" : [ + "map := { 'blah' : 1}" + ,"one := MAP_GET('blah', map)" + ,"foo := 1 + 1" + ] + ,"ALL_CAPS" : "TO_UPPER(source.type)" + ,"errors" : [ + "error := 1/0" + ] + } + } + } + ,"fieldToTypeMap": { } + }, + "threatIntel": { } +} + */ + @Multiline + public static String badConfig; + + @Test + public void testBadConfig() throws Exception { + SensorEnrichmentConfig config = JSONUtils.INSTANCE.load(badConfig, SensorEnrichmentConfig.class); + config.getConfiguration().putIfAbsent("stellarContext", stellarContext); + JSONObject message = new JSONObject() {{ + put(Constants.SENSOR_TYPE, "test"); + }}; + ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); + JSONObject ret = result.getResult(); + Assert.assertEquals(ret + " is not what I expected", 8, ret.size()); + Assert.assertEquals(1, ret.get("map.blah")); + Assert.assertEquals("test", ret.get("source.type")); + Assert.assertEquals(1, ret.get("one")); + Assert.assertEquals(2, ret.get("foo")); + Assert.assertEquals("TEST", ret.get("ALL_CAPS")); + Assert.assertEquals(1, result.getEnrichmentErrors().size()); + } +} diff --git a/metron-platform/metron-enrichment/unified_enrichment_arch.svg b/metron-platform/metron-enrichment/unified_enrichment_arch.svg new file mode 100644 index 0000000000..e42d394fe1 --- /dev/null +++ b/metron-platform/metron-enrichment/unified_enrichment_arch.svg @@ -0,0 +1,14 @@ + + +EnrichmentThreat Intelligence
Indexing Queue in Kafka
[Not supported by viewer]
Enrichment Error Output
Enrichment Error Output
Threat Intel Error Output
[Not supported by viewer]
Enrichment Queue in KafkaOutputError Queue in Kafka
diff --git a/metron-platform/metron-enrichment/unified_enrichment_arch_diagram.xml b/metron-platform/metron-enrichment/unified_enrichment_arch_diagram.xml new file mode 100644 index 0000000000..269a0d2745 --- /dev/null +++ b/metron-platform/metron-enrichment/unified_enrichment_arch_diagram.xml @@ -0,0 +1,14 @@ + + +7b3Z0qvIkiX8NHn5tzEKuGSeQWKGO+Z5nnn6hm9nnqqsk1XVbXXqtP1l2pa5t4QQBOER7ss9llb8BtPtwU/hUKh9kja/QUBy/AYzv0EQDoH338+B89cBBEd/HcinMvl1CPyXA2Z5pb8fBH4/upZJOv/pxKXvm6Uc/nww7rsujZc/HQunqd//fFrWN3++6xDm6d8dMOOw+fujbpksxR+P9fqX40Ja5sUfdwZfxK9PojCu86lfu9/v9xsEZz9/fn3chn9c6/cHnYsw6fd/dQhmf4Ppqe+XX6/ag06bp2v/6LZf3+P+nU//1u4p7Zb/ky/8bpYtbNb0jxb/tGs5/+iLYmmb+xX4G0z9rbXA/SYJ5yJNfn8TNmXe3a/j+77pdB/Y0mkp7w4lf/+gLZPkueJzjeG5cnvkz9D5X7/MBf3697nsM2SA//XCntfPeIKe63f9Ehe/3+une3/u/NOmZerrv1np7kAqK5uG7pt++mk/jKZEnMDP8b5bfh9qP9f89c2/OjMpp3tUlf3T8LlfnytTv3fU/Vjp8e92Nvg3E94zI+3bdJnO+5Tfv0Cg/+v3/v59WoAI8uv9/i+D7I8pUPzr8QX8fjD8fVznf7v2v9j2fvG7ef/a1K+/MzXbTWVctE/r/63R/7BS2f7MlL+ZUwmjtHn3c/l770T9svTtfULzfED9bfT/0atd36V/MRqWfngGzTz8mrxZeTz2pH7uRv5xFPjjyP06CZfwN5j89Rbihi7/DaJLh9KNHZD5vCfvP5ppF6yd3684/P6LOmjSv/+lr1f3G0Rd98ugARr24xgImYIJjRqRAbfpmiWyDDXwEm1neJ8I+9e8SDErxmQODJIkeexK26ReD7WmfXSWiWJQzt/GJe84+TpdqQtDEmdWiOoUS6NdOMTzYqgR8T27fOOZ9UbTfn3kr6OzyqMy4C093cXpCSyAp/t+dfaaZALEiKR0Fh1zieNaYGZ93y6Dwx6jPhZ+/qNw/P43bp2Gvm7Dch1rle9dqNmr1HahFy5aPmKKqTpOQCYHV656gC2aPhW0g+9vHyeuvgXulIbCdTJlndK1+txmv0c2Fwkf1E7lWL/fhukmLJeFZyF7f/J5R/PTKbeFGs9wqLdLtbLs4QbdE7U6TMkl6acuHBSdshUvhfrTMON1DdcuC1qngfe4pCb7KiQays3HdIxiHUEpBwHcaqwPHHErQuR9Erk9T+vJJ3ikpiCbOBY6DRZSFToRpZ1mzGmc7pTnkulJEYd4qwYkqubD6v2tBLC1dzudxnZfyDHbzpk+2kYZI5/okq8PHoOStMHnMYAxz11Oz9sjUuD5NOkdXzrur3FVgJtL8cog0LnfuTglTI8JanjWKYtDNDZUYBw8S0m4h+EHgkyIninMfuySld59pnl6wkrrWHG2i7nMz4TM+4sndGmWVXxngC22EeDTh+iu3lemtNf1AYmnOVoBXdq6qsL9GlogQV7uOUjZHgOMoNhqQI+93PsAtvCiG1zpB8gVqCde1X3Mo6V7VFBowwYto1Xku4TWDb1Y/CC54UwHOktWcn21pYL26j3PuZSBu+3daQbm9rc9KMlkhbHMaly35ACsJ2OKFka9KgK/Nlxh3gRTKyhLWIXgNkmVhLa79/fX6K3ljh1DGQTpipm/j3AApJvxjKDQ5+6f5deQJbN3iWN371Bq4M8YBrg6rKonR58kxAJvOP0ZE3yd3Gc09uv1/swbaQA8TxSAJAjHkjWFo0V4goufj1l2BW8DysY844VCaABj5ftVsAWEXpPnjLi6V0Tt1DogVHArnxU+kvCDIxTje71P7AbFn+gI3KtNyIdnMqXRBEMDjrB0ksqCdym+TSLnLLsg3Jd1Epqu4Whb3MW8UlKahHZvgckdRWymMV0J/+lPoaWHZ4qIWr6HXiNFmPDxDxOLF9J4uiVTCTg7IP/EmDJUAntMgKS/H4DbDjgfnzk20f7tyblXH6q820Uod6zbVSDaXB80weB29syhLsD4umnMVUrJynjOf0usrqrGpYG5XgIC1rf8WxiQ8BlQW3AC+P6WZ0uFa5wtpV7rI0KRadcEkHheLmfWt7xH5dszcHg6k/5pisQc0HJXVXFENRblazVXfJ5Ouu+eRR3WjQCS0LCmGSGuwJVDewr64tS36IPvtd3H+zTANnF1mj9948Qm4Joju7qR77SFpHloEdLPhOL2GoTfHitWhnhOJEvJsggYDpzWsptDpmwOCLkMsm5nt6enzM8bdYwcjkS9MAZQTyBmczAd8UXE4RVuJ3mv29d+D9K2Ofx4TZXope66td8eTleAushSpuRTY2IjoAva3X+G4jPdKeyA91+uVWEJhjxCDNhV/7RuB0J01Vwt4K9Ph4DACaXfUquPfcDeLToQg+BSldZgrkS6r0dkJsXqgu0LIqm/2lkJQjboda7dxWcKdI+tztttnFE3x1zQZGObX4VLx4lYAKiwPc6g0FSpt5OIuEzw6cgPGJSHWWf+roqYEwf0ceBRxq/3/Tj5cQbv5rlucAxcSxj2SfXsO6CJMXLfgoTr1ek5Hlu7uP/pydGJku1zBweroGZ00d8yxD5+KyGM4KcveKqgWj4Mi/72muDbYRHpjSV5dCJIKjAe+CmtTJoXdRuibJ/GLrdeyeU2mDVb2vg209gG3HaOF3e9u8elshdM8Z04ChCP2xxfMQt75URLuJ3woWz/FYTEExToVCo6dsxtWwl8oZfbT0MrTaQAUpKqUO6GraFfTesBxCfh+tnqYtuBisdfePqV2eg99bks4525C+NK7jfUiiAxm1VUTPTAe1ObUNU11aCvzwDACuOuceKV9Mk7pW6dYH2j2rsLWpdhXiehbEcgFldUuTm5Hn1HzD+hpDu3smqPgwxJHctWhZsS5f1+hx9D74PhMBjbhq9D3xisTfmnOXqJrvWDIkbqczEwjnze8oV8xrCwE+meChRX0fvA1hr3pl3qGX9Ihby6fJCC+KqrQvDR28u6TgBh7+lN3DN1gF5iwIe6XTUYUTmCThtAz8y0jNa8TweY+liQOsasK1I8SOHaHsAEz+/opNTuNIImZTwBoipEBaTHdc8PRX+9ZSSczbBTnOlis4rK/JiZTvc85tdsUQUCacxSadgP/Jjn/JOg9nsrsnkZp9cF84VHf/Kn9y7+QTAc/qonxkVbikBNBFGp+vmQP5UdG5yMZdnG9evhGtGGd0lKR2ZegGz0feMpyVL3wDxx6gYyn5ruFNXeMB3q7+g7gVPDy3R8fGqzC2tDu0SuuYOwfsViv2Sh0155LDXnBXY9jYA7ChRAOq6zNjZghFy1S1noCM7A+34G9Zk1gfDMxpDhVDYmr6vVvOltq9l6eyjL5g4V8bwHSjhejIiWkAe1lcrNfJ64sI0xAwbIYLR9Zr9kuznvmMJVD/T2rPid39flyB25yvTKwa6VwDZO5c8QCBt8+rzDy4F/9/2WLuDrJa3FsU9rb/FaQTIKjiKIZtR2FxOVZ/6MuVZ4r3nXcB/QgswA3XgDRiLquKdkF+xa8DgT1cIqCRZz0Qgi8UUEEOKTcgnamp3dkYh7/r/hiRymV18JYviGXjtAapjEnkaQNUjtkgMer2dGogCusdBj5svJ7aYwk6lvfP4UXSZDAX3Ugwc5C5T6KVghEm0IG9fzMvOxSdsSBhLAeMEiiNlZubchvZqcsvkoFi0GWLWJxDQccjJyeWT0yib1ENWY5PRKJq9iRhkzRBOovb8EzKc6nCbDBoQNyBJVmR6b626SnRUF4iVwpn3yZFkEPJeENZ5bYyIOPHDqB19N1415dh6UWkv8SHo0uMpZgGxcYhlJLn64F3iju9LPVKflRfO6osaUIJd7BkjM7sr7xm1Jw3WnwylJ+Lo8J+OUZxigVMu2HNWTobI0uhc5UI1G0HrNOMS7evuekLQAzx0p9WhVN32RsGgcePiyFaoo0Nmae7HZ1JAZE3o8TLjHKJxbfbTj6PzdPHHQLDTcTtQCpvTnrQPHLyMCs6Nr8rb3OBc0VGfoN1mwBGnx9sflt5T5RKRcFOdzEo7hzTj6eCGtt+y04NrLhiorDN0xm+pfLSDemEBNCTeMqhsuL1vyMdEIdLuSz9oozSqX1OITNQilZovmJVDsOsVbh+BvTMrZMbUe9MKczilGpu76aC8qKP+K6+PERiwiQdoOwio5snXu0U9eEOHkCNk4PobDhOXQPDsSS5dbZfd2NsO+eiDu80m70pWzwnF6Fbs2AMpY+QkflsOEL4bWLkM9ZRStWc/TWknCeYlEiK8lxEA4e3OLA5DCxBKS+CBlqQ5Z8gxTOpTQ0RltJISCIfHDVxaCDPLMEyrhXlen3E46z7s5YdptLB9giiWsGWBowDN1a/aAOt7ZRSG3EzD2febu8tQoGrQ4p+IDYzi1PRhRKC4cDhPWarvBTOh7aQXKgZbqQHMQJlavbP1E8brVYAouWqkxYXBWJww2wRx3x8T5cAIwpx1bCjGyc/S6STi9fkzKXQdTMSDNmVZ0WV/Y/WCU27JrbEqinlGd0IJRM9qm6EjD20KAvn2Cv1TbdBbOr1clYscZWO0YHepiGsqR92vxQeOFwELgPjH1g6YtlMnFbnB3sk8euIwVSLicKBmNvSG7JsOMrJbXSWb9ginzaCYlXeEydWYtDKO75OJgvPexWQG6TmTpqwEeyOon8JODKpXjgvUQEvXVXk+YgUASi9VeVt5i8dzf8JZLb4u+bGO7LnviacGTI340GItyHwSrA4qpVFueHKGUssfhGEIGR0udJ7bL4YRoDny8PMGYxDprw+QYDg+6gMXFIqwjx/kHgj2+OAd2nH2pFqEdkRq9IIfhHUK4W+quWKme6yiHrhqr/lU8mDd6UoiZzM8SfXCm4IBKTIAf4wdYQwdf3o7elbsPgiZQ/kCNLR/2wG7R1tWKqQ1mLlTnBxrbDLD7IvDxP5+BmyW3afREu4JQKJ9wMpj+Enos3mJyGvUP2Iq64n3YA8JoQ52RjDu5sKkJZL6gEJs0W09BC6jrTo3uradDKh1awFRpq/zW3ZpXnQ3va+M4oOnxaQCz7AFxp7vstteZwDw+UaNa6MlgVbV2IGLKIHOEhabeizD3VeRBXrrK2JS9cSrlntVOXoDYZ2BeZu9WO9HdZbbuTsc4FBZqwdA0hNd56Hi+996YEWXv5EfmfHsM8Vf+8ZyFHm3qgcpqnIMtaEtTOALZHUYWZDPGp6AACXurTeexFieIKkEt7VdEmAsRbDfIZ9H7qwJAUN44j/Lj3XS1zPkIsirLJ7vX6vWUxSvomjw2cKJYb5yXo2TF0LjN6Lp4jVIGG9hD5RKh/wxJCKT3GkpOaXaFBI0qnwrFqKxud4I/KQrUteQzyOQi0H8yQDo0r0sL07M337D2IJ6P4wT7K0hH+HOcg/yMTJvtR/qD+v0Ax4R3WEeHnG/rgFyj/yQqPSjUqwfOC5EFBshQYnzlPoAQxPkkhkAlg2VPXgrfK24KBk9eTGBYhwXhneZzR5B3+FinLNoEbcu+KKnzB1bbXESJV9BsSVboQjfk03SKbgfluOrteSJAGGHlqQkAXhhFT3mtKZk2bqfh/niQyN17ci9/2LBmzlKVKJFJC/zGYcqFAGGyK3bFACL4mvwtfU9OZKetorLeaZssI2Uz7zmy7eTBhxv7q7FiSN+71RKvnJEuzYmbhImhYu2kzklYGYgtrTGAmBPETwYLn/qzt3BrXsVhmsEUsCk2ZjxamlrRNPi+Rqg2udZxiJYbA0pZr08wbFefHy9Bw4JDC1idJ5FO1puVs1cpmEoHenm8pr7hfMPFqWrEOQLzOxSEtrw88OXOjvHtdty9MUzrGlTD0qhWdwQyqxOtXpynbCkI9oFMu8xL86lWVBGz1m2HnlP8k6VX84Q0MVdXUjW/jMXAk9FYEJm36TcK3infAYe+u39Kr9SoZFhL2qG47my8+HOsNMvypELI8vEUGk3ynb1sWcdoC/bNzzCfHyre7aSRJA1vGEMFWgNAQvGUEp9p1mca94XBSCB3OjbjbbwfVR1JSPb+4Hbd5hPYwZuG9gdFIe3w3APlIivasz85zIlkHgSxj6MZDbGOWjdlwfs+GvfgIXaf07QEnRk/le4PiVVU5UxwxLyGET8d5AMT7vcz+YxHgxZ4iPUuihAFcO5kKnsqg+9BAeUceJcs02O+O3H90UY//W2Ng7oxb8YmJ1Ra/CEcjEw26zFOt1lYsWwL56omWfYJ5CS96u9AGoVxsGYqRn0T+FV1vBhk6Xhl5/ZOIx6XVY0uxG99tXqdFG9JmTmW/KHeqX2tGa6uMLatkVEgo9qz2f1A4kRma2Qmy/wxBJWPzrDRyJBil+iFnUVHPQF5dOJaqBBDUyxDTr3XdGfZxknYtpHKVmLy+SkWr04cytkWDoq9fOSDZPgDyNgxvHbQuu0bvt4ld0/dYzSf2iD1TOrM6Ai9bMP9cVlNbH8Q2bwkpes9vWwc/zQXtxrodMCalW79QVVCXr796OPsaOpdKt1RcoZlmuOZmQ4e23kBvH+KWcxTSiRnXXQHEmnktJ9DGtGEOp7aD0bH1FuNADSbhC4/nlIrJq7ZOXkwLygWoS7Oiw24hbR2sus4SSrt9sm0nRv0PbjMdFxU1Z7wd+qlQzuRVuZMV47B7qbvcIbIWOGP0H0S2XEqduRObHNQAK+PpnNLPEfN81WM2DUTYqwHBoi+iRfkRfM7q1Hqot1IKp2Kc/ejYPRj1rTRRoyeJtTYT51O8Z7hfh4K7j1FRCtAtaCfuf6EycdaCzN0w7BceLsV46Ate9WemD7kccvBEqhC1HaQvgTpBpitkDSe59p+OgoI+J7WanuL+5dov5rOJk4Uy0HjU7wucSE+HY9ClWyv+DT0BbAvC0bR6WlcblG61VNb8162kPvRm7doP+PIhWfS5A6do4PKEGzXEHCnFDvP0u0Btw9+dHrHSCkjUWSzv94rzgNcB2OSmxOsEj91gENPde4dUfSWqeMbowm+VCkUId7hr5G/NgHt5tWHkEtPaoUgXWray6esgefXqCD1BRqDUnNTRK2jQstkATevV9NfLwGOutpDCclhkahWM82qLyTeqhc8YmKOoMqNDA3sU+B2PxzKgFAwN59Pz9JKPDKTnVaFHl0xVfMwZa9TjzWU4B8+hpplx+l3tCzBDRS1eSUtwbHdlOUTsNzjBcfax/bNOeslxE4eix2MPsMEhgPq5xQcXq9Ap+v6SS16hTaxlULSpnIIRsUoG+5jKwNXdYlItBKuLZTSG3tNGcaDMyfa6TLc8WOvUoTKpwNsZCg4Po93IvNOIZ+wH0GMJpbLvE9Gn7eOR1gYcurQjuckwk5ON4mV6L/SB/OrhWNqqQ9jVPa5vcB+h2iTDhcreHwOF66ofODX66BRfFtXmdBA/CmjwZFDTxYgAU1P5GQKvV7iioKyw2NBHzFjGrvaU0c44nS5WGoqB3ZoKQpD8hHqnws3MRrxBvWBG0mLuqYFOw5sVrBG7JXoh9oAjiF8Bs5pvQjn47oINV/epBrMdtXx7SpdlkkcpoLfI8A/Fdn9XT0V3pGm0coLPJhkJpx8f4j3Nbu0dsEgB7479Ep87HHezp0WSv718SA+MLt+TqL+HLf+Et5OdBzy51Vbsw3BlPE7UL4zn2VOvASruWMe9AfzpKlNafK4rNkY4dFTK1fzd1dsr+dlYej0bRVust1udscxJoUfCIOLD35BjETH0Da9dsAQN1/+0EICUp9+p+uOK1507e0vxxH49obUAqq/nniX1mCysuhu4K0gzmosnbVoA/OH7YeMh+FtYQMepbB3yKCqNHlzKZItJKiEr1dmNtx5FIq85/RNkpq07Ke/v33a4QP0Z3JRlbFnv6YZmb2pDL579XFBmgTo4TEI/DNMZt+pEIJVB/eBukkeHbnb1b3qIMjdXfg9NAZZmm3P86JfCxnXswzHCT/LDS++Gs+h5zMslDe0Md0VIsiE8Pw+JXNO4pbxUwdNTJ4P3Ce84JKQn1IlQwJ68nsb54O/Dcp6OQltOeMPOY/8NCWeqwd7y/O1Pl/SLw1HZ/TpZ7t5MRaS00+gLKobsewfsi2iMY4zfOW2O6H9OYu0PKF2dj2tVisZBdw1tDtgOt04g+zHBfueVJ5EbcKRlXmWqOonY0cUZqSqjHnzzlQn4rG8PMGoIiTFj9WVMhoFcKP6FOKe3y2sDCzNrHAtIPtI9cu1kIh4l533pHYq6AaKVjB98RlO5ek4bCZy+KlIhRXq0177mCYTBAxU3m9Sa0+8iCSnNY681fQTfSDP6xkiQggQ/p1X4yLHcLzyTIrxHQlwdyw+F3s5YIolRzqdyTpN0MsDTdu7Ieh0HaPALBVuHbjC8hTLqDhsvMJ/5+tHprkKNsB71j+VwCejKRkBLquQo6umVN4zMxOPDUL0bR7guIyxlXOIPNv2E/XxFUYySHOBOjzvyW89U4myTdx2lqdspyKPtZl3cGmQzRKnw3TtzOdCOZt82PgZrVrUBMto4h+/BqfXkzXNJUCZP+t4l/Z6sEEjdswCcnFZU6pugis5DNf4wsBur9diU9zVmNzGSrSCGA0ZoI8BQx6c5h3gC17LLUTGLQpOOgfoDcmZ/PMkZrmFltXhg8XnFbBDumH+5OYwMtcyStHJuJTniNTs3Ixs8raNIIua9XVup/aGDmU11cUWGYRa9ynBsVcj1lk1Hqx0yCguRGR2ZwQgCkWIlLxvF1TU5VtvDVkwxDuAMlPZvDDA2qAdEleHHCrxzkROyO6R1H9zqo/33GgOT3xaE+bJwinZjK0NfDO++h44wUc6Yhc2kMfeVf8zAiqS0AYtKCqoC3oS12GS/LzXSGS2Ss5Gsr/BMp13cVpUMjzGtQ8nXqrC29xBuJ/KjsR4A40EcWWCYWj0d3zvJPAIMgjMCQDOJqxZChT1KAtxqYruW4jFn1koJOuz9K53U05wR68PbysoEvuqSiFH0llfJLkXgdYMaMG0lN6mnIBfuhgBebp5nU8E4+Hc7wQ9rmCIv8d9b5Qoe/HYGX1oXLRYNHtNdyoHL9vDbjhoIMhkNeibT/y5k2njtsQTpPqfBQvxdg9DeCrS4+7kjHFI2SoCRNNuNzJaHP3yK5b3PJFZpi0JpCdJvC5TevwXpXiQzr6Vyq815iTMBrNtzTb07tNc2u3/UCplCou/A86JZdYDgn2tB7a5vQEKokqVb/Cq8ixd5m/ZqT7c8aRucValtkRDLY5zRCXTiVmcBod1lxB4Y8WnC582tDAbaNW10z3sVmcYiVVrUwcNeNCIMrPTqrClnSkrHaz5kEUqVe+xZvPJErO7L9xYPpRCqYE5WZs7GL9jOfEICBnTZ/26Au6ku7jbfyTmm+RFUSOpHnlWVkpVA3xlfCDXWYlzg0VnwhQq3jEcC2buprKA5HBSD3pvK8nwtMi5XN8kEN3xWZXsZzrfMHESL6Ob4Qe/ThEeuBAET/WwumF1uKCKmDBd9D645p+haRB9OD9wfU/E6lfUwCaRYuDbBbzpo088gCPpd5ej/LP6psxltWBPYavKAoYv4JSXqWc5ZTfyeKl8GkrJQfx1HepBA4tKR7rpMB46zlv+Vj5TWS71W6zvifnbHxyReRifiHVk+bOyXJxONPaYYPGNCnvPvYwqldunS1bC7fRo6lLQh4w+UaqIXJOAfXJpf858bnpCFuVeWFfubHK4nt9GXFmJh8CxgY5z6H7deBefmmtHgZdribcPwdtLWQYYx5KlQZ28x+zLU7pcAU6JyEYW5/hhfjz5nbNEoOLMNDAjvA5lE9nN7+sO4Y/3XTthKopIl7NpVa1fD3a160nqw/XWpTTwhhvCRrSiOs/YF/FhK6FX92QtN26IlqB5GYiixXmnPnPXcUyxq0IbAnpoIKrAlzMzblnzKZUpXee9OFVGh837gPTMZ29rG544BuFAaD43f91AvBAyJSbS+qUGOpyOhs1sIxachgrvGeEkMpasM6G/BLXJZpesEPqM2NFYiccQl7KJ+EF/rGqjg6z1gXqidP6NBTrB7atnw0jgOlPOQhuukCPSOtftlOn54TORpu3ohozSvig+rKw/MdjA1z+Gk4bBf2KkQSjwd4w0FP17RtoL/a8T0rAv9/A/4R6m4bz8Y8wMQvif7Yxhf3AR/8Q9/Evy4X/d1PjfmdoqpvRJiQDxNlpzGyq9o/aXhvilIX5piF8a4peG+KUhfmmIXxril4b4pSF+aYhfGuKXhvilIX5piF8a4peG+KUhfmmIXxril4b4pSF+aYhfGuKXhvilIX5piF8a4peG+KUhfmmIXxril4b4pSF+aYhfGuKXhvj/NxoiBIP/z3iIxN+R08QuSY/yIXcBnzW9j94n3/gckMPsWVX9H0VS4+2HpIb9IqlRS88i9f3Cwep/oail/h6B8E+CMmzF7BznkpqrM983Ki2lGqCqHLdQdmkL1t3XUvBYmDDu+HHrXJwhWbxIVVTsJDEXgfZuiDWZfVmdT4n71btE0t+zBarLkY+ckn/gEH0qrthFJRjgo4C9Nz9P4DpTH/DjwCj9fqq5H/1M+UPG7ZCcLXUTui0lfvvFVEswdHNVcnoK8lwuumTh0//uvx9SZU3xDQvepumPQxLLg6H1M8CLj+G8kOtpJh31nhHrlmQYe23mxpONcBvh4u0iSJVEQIkURbSzTxNN0ag8+S8CF+etQllOsypS5GrqtNdutwqJ3muRK4pn4ahYhskLB8CKLiS41HgrRno/qZy0tVoCNU5AE4yaF2QaPI1rElFi36Z8NFjIOx8zQO4Hd1lSyC1dN4xKIkmAkUzVRpL3shDA9bKEDVpkAQsCL+mlEcEnYp68vLwg3GAUDvEpl6JIUJH79e2/7VfSARO6HqjVbW1Gd/7gTYAmmAYrH91P2QTgGjz5fGS9KRDm8MuGvhM68x17+jzEQluxVTo9xDvqWbgkJkPNP906JID5K9cA0XhHnGjx6lUvn0n5ANdVo26kJ7L3IwSY616lfRQkDRrhB+SBgGB+RRNADsDpFXIwduSD2AkGd2DH+Oou/8U8gIIlx3monwpKx24lLeb6KZsP6tb7p+WP6wYPWRI2I2921Wcl/uHf8K/PGDSY7KewegfzJ9C6a16dqghHeelTJ3KDx1638MM/yyI8uStQdIMcrORXqx7X7YfYDlbmC/e8O+aErBVXJDQ3kwWJQla0H6a7MQyUozPwEVexlwYpL9NWNt9QJmw2QejqqcaytZJjufC367I6lm5rrqV6+up9UiQPlh/jOxPnugmyttXjaIdhy+xtO2j8pPwC0TNrasJklpuIZqfNhT+VO7s/+xSjnulgEaf9TF2aOt0UgvUhVLXNUxqALPdPpbO9EnGVKCyWKUzO6y0co7RlvG6zvOQr5qdN8ux6kj10MyUYi6VyotyzM/SI518HlNeHFMcVbzIBGljlkz2cuBp8arI/H7CKNLFntXwkoeTVgoIh7IlD2dWbUZM03UWM1DR6Khj3XNyZ7JyGBpz2x9PQAM74CdD3iylHOfSpDsb+LCNEJMFhUhAf2+IZxzPiP0wcjsb0XaxQfKkdjrcLwuMdIG8oCkvAaoStfk2v0A74hpDN3a54r1stCUvFj6fWnBmw9jZAr9B8UgAlvlP4U0GjJ5vaiGfB+/GiSkTuLFa6HRYoZUPRI5o9pZiZECe/UNdC+9Do3im7d43mWs4hoJ5FTZctmeesnytTc1DV6TF0fxUYa0yoWb09ljx68zN4JP9u7wcxcr0jBUUZ8yddYqwFvhNysfbd7RBppmchs68/hkUbu9aYVZFsZROzfV6T+ofy0qhRnXiihzOolZ+a+uqSqRwaAvs8CBG9essJCldLTHGsui5TRLdUYcnNxGWXf81QZav3DIyXh21HzF4/HhZqcXo8Cy7Tp+oWmR2/CabKP35yejBiaOvtNTMBb3Y+q21Jy6tP1hzrP7PP2LQPUrkB0MNmybBqDVnpnR46oQMg2STQ7brldZ4pg32n8hZ0jzcyN4QK7fHtIfRwy/r8jWbiCwiQvB3L9rK6lfZekwjC5VIsL0E4nbRlrBl/4oZv7MiOP40DDVyQC65MELeZNvQQJcWtQtdPHNOZbZl/SudXTOsMMnClrEpoKDmf6vYNDaizson/PERAy3hlNTp/p1geC74tvay8LHwZckVPUhRWngGmQiiNDt0roJCv5w1R5wksvC28fpLyPG6Qt8G93n2yYjCcjUSJ0+ULXnhvu8ZGGXaffGEv83zv/lLIkO2W9EONPNYjiooqecoQxEWYHwIv7/hBncrPbwIe/JzThoBpAIOg9ri87wTKkeMcTrUAIDSG1KyChCWGYekbmpqKqYBwZY3ywdPnPW2MtyQfrFG+f6FaC44nfDaF/NVRA/cUovGSn32agjovg2myASDoqH4yN7hLz4C6/NqEh0t3RFT00kV4f345RH/3hs/AudNqs8sh+cLrxji//TA3uVeTVgeehtr6Kj6eki+NkH2e53NJw4CUUUMAFopwMEcEVw7STyBzPWX2JLINERxJ0/GkgWL6KuqGYuY7AzxwTZYzOkVYMAwo9jP1ggEDmA7NA1bkAy2JuZG/p3Nmi6bvhhgTe9YbVTVDdog5LHyldTMliPf1Ad8vVnBoQn0WmzjeQjydxVYa+jxpJd9L86ahQI6UdK4CRp34a2DKFUN/cA3eFpnuXoYuEZjkCXl7HRDFKIoUiXKmmW2dHAZtbcuNveneP9orp9JRZVvU7lD6nguUp17Y4yUPSvISqpFf7O2VaVCFP7pFzDw2VCe1XbxT+sY8E6leI6JB4yQrMnnCvFuOpdea1ylAVWwGcwYmyLKQfrx9HppgwbMYor10vJR1w8Nuh2RdxWPzqK0BTjQATb0sKi8rP2GUYJWn5SVdZ4EMu/YDUKhngbI5rugQBtsp9ZdjPzUEOaRdjQ8u7clP6mYz4CUD/amPUdZQ3xMMurzWNSxfLGy7atIlRiTKKy5GpsfaK6JCalTYAqaifCiFRN3XmDxVhRKLPh//A9bjAj9xbbFNKCUEGPatBdrMt+fIqQfEudmrz6h5bi6295h9k7E/Qkl+CK96qc63XHPEK5fs27pvCG60dWk+gT3B+kQcwDVj5phLzZ4PTE/vPwDUILN7DiKLVTt+KnVynrbc+3FtbYAeke2bV4bPGJh1VY83TLUbH1ZW7wkTCE5kXZfzLkuyViOk6+mw8neDZNjdkECODRvYYPDsMH6S9kSZrRtj8OflaGWFejLR9CY9F5ULpulGu2MbNy6KBKC6OSmGuthIBHZu7K5u1/s2029GV5A9OtxdilbGfI1Laxwi3y10r26fCa18KamyzJPozsl1r2Ny6EJlz2V8H1nDo37KJS1X4RvrzdvyxEwVjXr3YgqxTOuKC0LcK3qhid+eDHCKBoa9mUiSTNRdj88NgKWQBT9rBxFIBCbkhGBIWWtbShwrylWYejQE6HnwdqCRDQY1W1MSJOm5BMo0CbdTkawZmWy8S7Salkf53AvASlNCXj24aJnPNL4a0J6YGHopR3vG5FUJRQVi2cF58QLMBw88EvvUhyM5c8XliH4QHHtoP3XSh/+SZFCs3oHDEWuzX9iPqizWumE2vlKf1KDssWuyK6wrOSz7fBwvhH9WDl80xlf+eRxmvPd90U/TszphfnAmZeEnTDx+DTa6/CmYuwxnSJbShYUOKBp/KfCB0uQ4+CZVD06shR5nWUWxF54KOug+KSdGu6/Io1vfrtZgEKuNqRcVZiZ/jTwewbR+Rz414QVI7wIQfeaBVLtqoV64aq7oEJkmjle10wzVnScY5dWAFJUfLHucACIMpRHVPsfejjpBlKvCPYHM3sbSrSsGlgQauAWX7BcUBo0in/UIIIszALene/BAZstwywyuCtyggpBVB10qT0RPm3NtUfzoexdAUpdjb70wk+ZiT1QB7eGtG+87G7H6rKqHc3cy5fYFIaacyn1p0Ai2kNjmuB4+p4fb3QY9JYwzTF5YMKHybqs+KI5OcKIujy6t8hkC4kLYnpzzjcJEBxYXgaiO6cm3qfYpAd2eV1GXRC+EtwRwfG1SSwxqoYuxFpwRzo8vUw06zW2kfVJb3XS4U6dfhd2+u+Mlb+8FvROVw2xLlctM9Sl560l/I9CO6VHOg6G5xz9+zSs5qFwlCNzQXxKET9whYNDBByRvWQRWKjq7z089wE5AXlgfeP2cycBHmcsG5WjZi0RDc4Lgbtl6PQFVnUUvj0WQXxpWVh4MA/io7eOLmGcP1LoYadSl8cU/ljjJcsF91ISCUZyXkoiZMFdQ3DeRCFwc6K0cZXOn5JAViuWJ5zIXT9xgLNoTjk3xTFmSc2AZGQ0VUR7HOWjnOv5QOW1rrJj+Hhb00ZFgQH0qanH24UnlCCkLG9TIXOuk6MvYcdBStksc9kJ51mFCPbPotH1+CUVF5Gem+HzxMCD+sFIQ+PSN+rc3gAaCuZqYlvMaD2Ei7q+bSSyFBR/+85DGYQCdGs/5qj48p4RPabTYeBf38kOlgjAOrxlu49tdJ9ZTUWaWYh0KFLGfgqFpe8IdkJAzEQPlKLLx6hU1p+CSbwXi/aTk0WLBgIiP19v2Px/JQ+4QnKBVx/W+2E0vQCOJRIaQogVpLCUexpUUVRFLXBbehFhct4MxZVZPGM9HmZuauMH6JU5s3BK8+RTQ7ZLsqTubkzdw2opUEOQL65HBepWIu5+cIzswVGtiqodoL+FTG4F1GKRe7Z4EM+zD6tM0ipig7XuOxDr2g4d7/8mjngzZII8wUPWjgQNdTtuA1a2SLhhSn5YUNBURCOeIKuW3fVlMRJzrlH2ElLOrvSwTXslEXnm/CzMsavK06X41IgHAITqnTSADoxrOwNNsQrtOmJGG5cBT0WCS3u9IBl/B0xBeFlFXVF0KIfGPllM6STkiy2ws2w9RNTjcBmBOoWARAFtC9VQ7IG7Y16FDHakwyEoG1HghoFdit2LG4uTCuo+NBbpSAsyphnaStPPdarj8PhUIbFhQrxCgHmDiE0PN+Kyh0CDKIRoBRBzuTj3IUDqlVGGNlw6RoE5xCVdEW2+LKYvqeIY66L1cKcTCJ2h7OEz3BRiU4AekQUXfULnk8ib4jIhyTzX19TDqUgvOgf7F5zLdQsyTFLC1Lm8rZHkPCYV4/EtT1UHlGv6g9zsrJr1RTLzl71SskMPMmXEMHnRe9cUFEE9K9sKezJR6+Xyjzlpd9c2EJZFCC/gQ3AP5meh7LiKfghSoKHUyRp2fUkh3nfXaXjt9mGbVvDW9V0KAmJF6go6xV0vTbRqPMBjxVTfCpwoBEjX7G0IXZlkQHyz9idVi3XRnXoAYGQlPAf3pb66sX53ygVBzyWy79SaQPEekhCfHSPrWyrHVFhNvUAWCamtTut1K4+GSkL4eZ8ZckS2UjKlxrUPfmX5X3/16Y4DEzOt0BrfjKSNI3sJGVW9NKxG+0zdcpQ6tv/FgXu1qSiZ0id5vPMXn+oOckxWPOOT4aCczVP6zPoMUn23JmeIGVcIhbRVB85PHNB1pnKQuT9sP3sResMibmMcYbNmHSsE9gHjxpPV2qa5NtTlMQyWoazw1CiHcNbFImubsJZJrKt7kvg7u3DxtQ52Xuuf4swjKfban8Wi3J5M13gmxK9THMXa8LJg7C9quQaTmB0jWwHruNSJwNTLFXF0aTFPITsSu/AGKcTPJgqffr/XJA3TK8C8qws/oeIGYEEx8U/kvG0DOVOF4LeanMS3TkOdi5PnRJMU/ObjzKZaHh4OH2qY4tlvfFqNpQ/Nse4IKH+eVOb7EjCO6Z0xS7Dvv26ZFnSxuE2Vbt+zyXrGxOyWQ7TuKNjE/hHQEPFiEtwSzXpy6PtrYYeWkxrLK2PWhl3uzYfvcfMM1W3HPsBVR8pkQ21gmG3SYfgtuXNmCs8XhKmyf2rwjzHWj9Qfd3QBZhInONyZQ5RuPRbTgnsSZF1IQ5HH9bD+PlU+CbQ+p9/w6gs0IdSxoM1fJ13K8fy8kvCckEJ5HpwTdikDf68pXT4b6JVth1Kwr5LpTE1HxZiYUzQVRdEAK1rjBq8uiIEMj/rpqbAYJUecpVLLI3p6lVarEvd7H4uU08wC8h6ZuDtPKDCgxb886wzX+rEVycKJ1Sse4S97FsR4SgA5pUZc/5cHtAYo6wOH4qAj0fAUv37D7xK4/w+5F5cVLq8uzsaCI7AMslxh/d5TQUSKn72Zug9N6Z42VBX9eGDVE8dakNAKkIwPG/sKZBWge9DJLkBxkJCmi0IphqZkqFuYjQtBvPz9zZd80Gt3/Exa4QwAJZ3WrSvsgI/oNLtVlKYF78CnYKNe91DZaW0/1h9yRdN3PzEtw5gNu2E4j28dNK4oJsM6igPjBNUgUyNDmIW17I0piU6im8xUFaFd7Hl4KLe2ocKMOTvuQcPATn8Os3D9Q+hLeQUj4sEhm4LocUMtgRfFC8N3fZru1g+MD4ioXh+3xpgnXv50lhRV6g6q/igckAulhXmDO/k6SpTziD4xjPZwCPX2oJalLz/gfMV8UohUUgI1RqOtMnzh4CijaZzY31g4vRXj2cDQ9XkZ6ego/b84iodcpGNulrJpBkeiNCEIZ+0wzr+/UAqg0RhWxO5dCV00fWGRMcXTpYpVjWppaAnHe1CIFkv9ZEiye95QOlRXYQmiVx1x2hLIuZjUY8mMl7rESSDHMdqBzY5tXoTG2EQxc+UPJw7PzCRR7XrMYozQt4SOnG/Nry10RGGswsOlCH6sQkWJ+xAi0/Qo+9T1RmVcatcO22Z7MDoiTvGIPKMdVB9xPBiWWAl3Taw+7yeJiiN4iTIf466k+9LnPrQ/FzecIsJB5t+OlxUeafHDeRP3ue0EQ7uBjHxWlxlUYVQseDEAEdnU8FPI7stf3h1Bo5u0jJbsPB4HdPdvOL3NSxdkmX9v7sMhcpLgDKp8ytgkDAERYY/Ejx0EBOKW3uxFtNOZjgktdkOdpU8TicPdUD7kyZ4eG0OUbvadBqO01ligjLu8ntEHFaauPsU8tPK4PCMGWyXYNJFaBadEKR+o1QrR4MoqmthiT54W7iRnlzOefQRWljdjambv78urI29UwBop2lqKqt6lm3KUNP6fUoWE+2tpjssimxX1PfQHz5yd+dFi/TofLO1fnbVxl58uho8vSj2A0HDXGElGO2Yrf2p9g/IDNjsvyrDITnT9NcH2bW1P4HB41CsHKiCJEOw9Zi3lnqIK47i/AlDrknpZ+Thvi/vyeRCdSJtrdOZVypM9IwpPNiCdbWXsGC0ptQtR5gDnHh6s9eDuNPsvcrLeJSc4GzuOkQ2fk8mkJhw+5aIrGAWSoDJse6j0MeZH8gn3JtemyVBgYTrxkMkyxxck5rUy7ajnnNMOVsSCyJYZOHTuxaRBux4udl2w0bEFsxSwdxBY2P3bBs/g+5c1L7vZu9pCOAcxCdGdkpCxI7yyW5yootOJ3IZMlsgSTvc5hqlQdMQXyOmQ+fj95iEQb/zoa6NQ4MK+kI7neVNY1K9lj6mBKtf8rUxboTdMPRjmIrOBub5+gRsuTNcRMno1ZPxxFhaOAoW9g9AHejPsyXl70kac+dlVyI7D3jLzvc8nBy904n6x3vc40+MA3x2oFlw0p5m4trmOo/lBK4pOvJzxaZk2U2OYCoFXhxPHDfUJ5ge1BzOsmceJSdosBT7b5iVhoEnuIDl3lwbzlo0cp36E+qRtKRhV1CNE95TOmTI+sWB++VYICWVcPlOPwshfqRttBCfBcB4LbcXzZFHwtvRqeHW52seuj0Y22nii0dSL0Rg2z7bJSHornB49Qfud/EOnjEI90CVobbp6oimsVuO7zmg4mu3nFd7i50y9+IrVsKq8CVqzOXAr0ji85CFAMj3kbvvNMe6nK+VE+x3H1qokke2tHTNF+Un/cO2OZ8Z9fxNZ3iNJmEX8phbcm+PJJcgyTBBgeiDGCAW6tQZrkuoD1t8cHPyQQ5af1QoBqFQ02XEYctvG45oA8aE9hndVb5yDzhI4YHqO/+PYOoakdtnceY+1QIjzzS4X3pnFi94m3g+g0PJC/jMbsq0j0M8wTMckyMA9ktGxj7UnwlQ/VsHMKQcac72a8v7Af+uneqq+rZvv+hcpk2sB6GeF6pHH6dmhOdzVS8/NTGtZmSCNyXCW80hV6ElgieC11I767T7QRupGvxJt8UJwX3sksvbPKk9TSvmY/1Z5nLTYTXgApngPsPGviJCUZNspOtZTfDubhPPwV7WEvyiU1hzB+juxTOPxjqBAoivyZCoH8lVQP+voLMgT+XydD/KH281Vl+ieoMkH46/+lKhMI/odbQgLsbYDbjQD6ugzrd4/IrzjTV5zpK870FWf6ijN9xZm+4kxfcaavONNXnOkrzvQVZ/qKM33Fmb7iTF9xpt++4kxfcaavONNXnOkrzvQVZ/qKM33Fmb7iTF9xpq8401ec6SvO9BVn+oozfcWZvuJMX3GmrzjTV5zpnyrO9N/EUsT+zFxDwH+eXhMIfymK/wlFce7X58r/CEMT6B+UxN9NDSPQX1AU/97U2D+CoYj8JxtHflmKX5bil6X4ZSl+WYpfluJvX5bil6X4ZSl+WYpfluKXpfhlKX5Zil+W4pel+GUpflmKX5bil6X4ZSl+WYpfluKXpfhlKX5Zil+W4pel+GUp/vZlKX5Zil+W4pel+GUpflmKX5bil6X4P5ClCEH4n2mKAPFXWor/XURF9EtU/OcRFf+VTubfmIrgX8op/jdxFV//sZridyfR706i351EvzuJfncS/e4k+t1J9LuT6Hcn0e9Oot+dRL87iX53Ev3uJPrdSfS7k+h3J9HvTqLfnUS/O4l+dxL97iT63Un0u5PodyfR706i351EvzuJfncS/e4k+t1J9H/0TqL/CD0u6E/UB/Qv5Lj+u3YMxf6O9/CVYvpKMX2lmL5STF8ppq8U01eK6SvF9JVi+koxfaWYvlJMXymmrxTTb18ppq8U01eK6SvF9JVi+koxfaWYvlJMXymmrxTTV4rpK8X0lWL6SjF9pZi+UkxfKaavFNNXiukrxfSVYvpKMX2lmP75Ukz/ADIaTAB/YqP9JR3tv0t0ifiKLv0noktpOC//GEMj6L8xNIb9peLSX0gugf8AySUI+HvJpd93g/yqLX3Vlr5qS1+1pa/a0ldt6au29FVb+qotfdWWvmpLX7Wlr9rSV23pq7b0VVv6qi191Za+aktftaWv2tJXbemrtvRVW/qqLX3Vlr5qS1+1pa/a0ldt6au29FVb+o95DyAM/on38IJe/zS9pT9a+yW4/Ke7ik39Ev5+6P+DgX+Q6f/NlnIo/Pem/6stxoh/BN8F+pr+/970MPoPsjyB/hvLQ/9nlv+/Z7Xdb6e+X/7VZ/zT+2qfpM8Z/xs=