Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
METRON-1460: Create a complementary non-split-join enrichment topology
Browse files Browse the repository at this point in the history
…closes #940
  • Loading branch information
cestella committed Mar 7, 2018
1 parent 486be49 commit 1d95b83
Show file tree
Hide file tree
Showing 28 changed files with 1,983 additions and 150 deletions.
4 changes: 2 additions & 2 deletions dependencies_with_url.csv
Expand Up @@ -341,7 +341,7 @@ org.eclipse.persistence:org.eclipse.persistence.asm:jar:2.6.4:compile,EPL 1.0,ht
org.eclipse.persistence:org.eclipse.persistence.core:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
org.eclipse.persistence:org.eclipse.persistence.jpa.jpql:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
org.eclipse.persistence:org.eclipse.persistence.jpa:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink

com.github.ben-manes.caffeine:caffeine:jar:2.6.2:compile,ASLv2,https://github.com/ben-manes/caffeine/blob/v2.6.2/LICENSE
com.google.code.gson:gson:jar:2.2:compile
org.codehaus.plexus:plexus-classworlds:jar:2.4:compile
org.codehaus.plexus:plexus-component-annotations:jar:1.5.5:compile
Expand All @@ -356,4 +356,4 @@ com.google.code.gson:gson:jar:2.2:compile
org.sonatype.aether:aether-util:jar:1.12:compile
org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile
org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile
org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
Expand Up @@ -258,6 +258,7 @@ This package installs the Metron Enrichment files
%{metron_home}/config/zookeeper/enrichments/yaf.json
%{metron_home}/config/zookeeper/enrichments/asa.json
%{metron_home}/flux/enrichment/remote.yaml
%{metron_home}/flux/enrichment/remote-unified.yaml
%attr(0644,root,root) %{metron_home}/lib/metron-enrichment-%{full_version}-uber.jar

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
Expand Up @@ -19,6 +19,10 @@

import java.util.*;

/**
* This is the core logic of how to configure enrichments. The default type of enrichment configuration is a simple list
* however more complex enrichment adapters require more complex configuration (e.g. stellar).
*/
public class ConfigHandler {
private Object config;
private Configs type = Configs.LIST;
Expand Down
Expand Up @@ -25,8 +25,8 @@ public class BytesFromPosition implements MessageGetStrategy {

public BytesFromPosition() {};

public BytesFromPosition(int position) {
this.position = position;
public BytesFromPosition(Integer position) {
this.position = position == null?0:position;
}

@Override
Expand Down
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.common.message;

import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;

/**
* This retrieves the JSONObject from the field name by reference.
* This is in contrast to JSONFromField, which clones the JSON object and passes by value.
*/
public class JSONFromFieldByReference implements MessageGetStrategy {
private String messageFieldName;
public JSONFromFieldByReference(String messageFieldName) {
this.messageFieldName = messageFieldName;
}

@Override
public JSONObject get(Tuple tuple) {
return (JSONObject) tuple.getValueByField(messageFieldName);
}
}
Expand Up @@ -35,8 +35,8 @@ protected JSONParser initialValue() {

public JSONFromPosition() {};

public JSONFromPosition(int position) {
this.position = position;
public JSONFromPosition(Integer position) {
this.position = position == null?0:position;
}

@Override
Expand Down
Expand Up @@ -41,6 +41,7 @@ public enum MessageGetters {
BYTES_FROM_POSITION((String arg) -> new BytesFromPosition(ConversionUtils.convert(arg, Integer.class))),
JSON_FROM_POSITION((String arg) -> new JSONFromPosition(ConversionUtils.convert(arg, Integer.class))),
JSON_FROM_FIELD((String arg) -> new JSONFromField(arg)),
JSON_FROM_FIELD_BY_REFERENCE((String arg) -> new JSONFromFieldByReference(arg)),
OBJECT_FROM_FIELD((String arg) -> new ObjectFromField(arg)),
DEFAULT_BYTES_FROM_POSITION(new BytesFromPosition()),
DEFAULT_JSON_FROM_POSITION(new JSONFromPosition()),
Expand Down
45 changes: 45 additions & 0 deletions metron-platform/metron-enrichment/README.md
Expand Up @@ -33,6 +33,49 @@ data format (e.g. a JSON Map structure with `original_message` and

![Architecture](enrichment_arch.png)

### Unified Enrichment Topology

There is an experimental unified enrichment topology which is shipped.
Currently the architecture, as described above, has a split/join in
order to perform enrichments in parallel. This poses some issues in
terms of ease of tuning and reasoning about performance.

In order to deal with these issues, there is an alternative enrichment topology which
uses data parallelism as opposed to the split/join task parallelism.
This architecture uses a worker pool to fully enrich any message within
a worker. This results in
* Fewer bolts in the topology
* Each bolt fully operates on a message.
* Fewer network hops

![Unified Architecture](unified_enrichment_arch.svg)

This architecture is fully backwards compatible; the only difference is
how the enrichment will operate on each message (in one bolt where the
split/join is done in a threadpool as opposed
to split across multiple bolts).

#### Using It

In order to use this, you will need to
* Edit `$METRON_HOME/bin/start_enrichment_topology.sh` and adjust it to use `remote-unified.yaml` instead of `remote.yaml`
* Restart the enrichment topology.

#### Configuring It

There are two parameters which you might want to tune in this topology.
Both of them are topology configuration adjustable in the flux file
`$METRON_HOME/config/flux/enrichment/remote-unified.yaml`:
* `metron.threadpool.size` : The size of the threadpool. This can take a number or a multiple of the number of cores (e.g. `5C` to 5 times the number of cores). The default is `2C`.
* `metron.threadpool.type` : The type of threadpool. (note: descriptions taken from [here](https://zeroturnaround.com/rebellabs/fixedthreadpool-cachedthreadpool-or-forkjoinpool-picking-correct-java-executors-for-background-tasks/)).
* `FIXED` is a fixed threadpool of size `n`. `n` threads will process tasks at the time, when the pool is saturated, new tasks will get added to a queue without a limit on size. Good for CPU intensive tasks. This is the default.
* `WORK_STEALING` is a work stealing threadpool. This will create and shut down threads dynamically to accommodate the required parallelism level. It also tries to reduce the contention on the task queue, so can be really good in heavily loaded environments. Also good when your tasks create more tasks for the executor, like recursive tasks.

In order to configure the parallelism for the enrichment bolt and threat
intel bolt, the configurations will be taken from the respective join bolt
parallelism. When proper ambari support for this is added, we will add
its own property.

## Enrichment Configuration

The configuration for the `enrichment` topology, the topology primarily
Expand Down Expand Up @@ -371,3 +414,5 @@ Now we need to start the topologies and send some data:
* Ensure that the documents have new fields `foo`, `bar` and `ALL_CAPS` with values as described above.

Note that we could have used any Stellar statements here, including calling out to HBase via `ENRICHMENT_GET` and `ENRICHMENT_EXISTS` or even calling a machine learning model via [Model as a Service](../../metron-analytics/metron-maas-service).


6 changes: 6 additions & 0 deletions metron-platform/metron-enrichment/pom.xml
Expand Up @@ -67,6 +67,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.6.2</version>
</dependency>

<dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-profiler-client</artifactId>
Expand Down

0 comments on commit 1d95b83

Please sign in to comment.