From 93cc3bc80a01ec2b82a88d2812c17de09d122c6c Mon Sep 17 00:00:00 2001 From: cstella Date: Mon, 9 Apr 2018 17:45:27 -0400 Subject: [PATCH 01/10] Updating stellar transformations to use caching. --- .../transformation/StellarTransformation.java | 3 +- .../metron/parsers/bolt/ParserBolt.java | 15 +- .../common/CachingStellarProcessor.java | 129 ++++++++++++++++++ .../apache/metron/stellar/dsl/Context.java | 1 + 4 files changed, 145 insertions(+), 3 deletions(-) create mode 100644 metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java index 2a22e2168b..bb7501df62 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java @@ -18,6 +18,7 @@ package org.apache.metron.common.field.transformation; +import org.apache.metron.stellar.common.CachingStellarProcessor; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.MapVariableResolver; import org.apache.metron.stellar.dsl.StellarFunctions; @@ -40,7 +41,7 @@ public Map map( Map input Set outputs = new HashSet<>(outputField); MapVariableResolver resolver = new MapVariableResolver(ret, intermediateVariables, input); resolver.add(sensorConfig); - StellarProcessor processor = new StellarProcessor(); + StellarProcessor processor = new CachingStellarProcessor(); for(Map.Entry kv : fieldMappingConfig.entrySet()) { String oField = kv.getKey(); Object transformObj = kv.getValue(); diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index e996f14597..ee67036055 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -31,6 +31,8 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; + +import com.github.benmanes.caffeine.cache.Cache; import org.apache.commons.lang3.StringUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredParserBolt; @@ -45,6 +47,7 @@ import org.apache.metron.parsers.filters.Filters; import org.apache.metron.parsers.interfaces.MessageFilter; import org.apache.metron.parsers.interfaces.MessageParser; +import org.apache.metron.stellar.common.CachingStellarProcessor; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.storm.task.OutputCollector; @@ -67,6 +70,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { private WriterHandler writer; private Context stellarContext; private transient MessageGetStrategy messageGetStrategy; + private transient Cache cache; public ParserBolt( String zookeeperUrl , String sensorType , MessageParser parser @@ -94,6 +98,9 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll super.prepare(stormConf, context, collector); messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get(); this.collector = collector; + if(getSensorParserConfig() != null) { + cache = CachingStellarProcessor.createCache(getSensorParserConfig().getParserConfig()); + } initializeStellar(); if(getSensorParserConfig() != null && filter == null) { getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext); @@ -119,11 +126,15 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll } protected void initializeStellar() { - this.stellarContext = new Context.Builder() + Context.Builder builder = new Context.Builder() .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) - .build(); + ; + if(cache != null) { + builder = builder.with(Context.Capabilities.CACHE, () -> cache); + } + this.stellarContext = builder.build(); StellarFunctions.initialize(stellarContext); } diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java new file mode 100644 index 0000000000..50dc8bef9c --- /dev/null +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.stellar.common; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.VariableResolver; +import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * The Caching Stellar Processor is a stellar processor that optionally fronts stellar with an expression-by-expression + * LFU cache. + */ +public class CachingStellarProcessor extends StellarProcessor { + private static ThreadLocal> > variableCache = ThreadLocal.withInitial(() -> new HashMap<>()); + public static String MAX_CACHE_SIZE_PARAM = "stellar.cache.maxSize"; + public static long MAX_CACHE_SIZE_DEFAULT = 10000; + public static String MAX_TIME_RETAIN_PARAM = "stellar.cache.maxTimeRetain"; + public static int MAX_TIME_RETAIN_DEFAULT = 10; + + public static class Key { + private String expression; + private Map input; + + public Key(String expression, Map input) { + this.expression = expression; + this.input = input; + } + + public String getExpression() { + return expression; + } + + public Map getInput() { + return input; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Key key = (Key) o; + + if (getExpression() != null ? !getExpression().equals(key.getExpression()) : key.getExpression() != null) + return false; + return getInput() != null ? getInput().equals(key.getInput()) : key.getInput() == null; + + } + + @Override + public int hashCode() { + int result = getExpression() != null ? getExpression().hashCode() : 0; + result = 31 * result + (getInput() != null ? getInput().hashCode() : 0); + return result; + } + } + + + /** + * Parses and evaluates the given Stellar expression, {@code expression}. Results will be taken from a cache if possible. + * + * @param expression The Stellar expression to parse and evaluate. + * @param variableResolver The {@link VariableResolver} to determine values of variables used in the Stellar expression, {@code expression}. + * @param functionResolver The {@link FunctionResolver} to determine values of functions used in the Stellar expression, {@code expression}. + * @param context The context used during validation. + * @return The value of the evaluated Stellar expression, {@code expression}. + */ + @Override + public Object parse(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) { + Optional cacheOpt = context.getCapability(Context.Capabilities.CACHE); + if(cacheOpt.isPresent()) { + Cache cache = (Cache) cacheOpt.get(); + Key k = toKey(expression, variableResolver); + return cache.get(k, x -> super.parse(x.expression, variableResolver, functionResolver, context)); + } + else { + return super.parse(expression, variableResolver, functionResolver, context); + } + } + + private Key toKey(String expression, VariableResolver resolver) { + Set variablesUsed = variableCache.get().computeIfAbsent(expression, this::variablesUsed); + Map input = new HashMap<>(); + for(String v : variablesUsed) { + input.computeIfAbsent(v, resolver::resolve); + } + return new Key(expression, input); + } + + /** + * Create a cache given a config. Note that if the cache size is <= 0, then no cache will be returned. + * @param config + * @return A cache. + */ + public static Cache createCache(Map config) { + long maxSize = config == null?MAX_CACHE_SIZE_DEFAULT: (long) config.getOrDefault(MAX_CACHE_SIZE_PARAM, MAX_CACHE_SIZE_DEFAULT); + int maxTimeRetain = config == null?MAX_TIME_RETAIN_DEFAULT: (int) config.getOrDefault(MAX_TIME_RETAIN_PARAM, MAX_TIME_RETAIN_DEFAULT); + if(maxSize <= 0) { + return null; + } + return Caffeine.newBuilder() + .maximumSize(maxSize) + .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES) + .build(); + } +} diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java index 9568a05e3f..2cdf4ff809 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java @@ -36,6 +36,7 @@ public enum Capabilities { , STELLAR_CONFIG , CONSOLE , SHELL_VARIABLES + , CACHE } public enum ActivityType { From c38d5a2a91f9eebf428b40db3c3165152f27a786 Mon Sep 17 00:00:00 2001 From: cstella Date: Tue, 10 Apr 2018 09:46:19 -0400 Subject: [PATCH 02/10] Updating test. --- .../StellarTransformationTest.java | 30 +++++++++++++++++++ .../common/CachingStellarProcessor.java | 16 ++++++++-- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java index 0a3cbb0224..fc9184469a 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java @@ -18,19 +18,49 @@ package org.apache.metron.common.field.transformation; +import com.github.benmanes.caffeine.cache.Cache; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.adrianwalker.multilinestring.Multiline; import org.apache.hadoop.hbase.util.Bytes; import org.apache.metron.common.configuration.FieldTransformer; import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.stellar.common.CachingStellarProcessor; import org.apache.metron.stellar.dsl.Context; import org.json.simple.JSONObject; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +@RunWith(Parameterized.class) public class StellarTransformationTest { + Context context; + public StellarTransformationTest(Cache cache) { + if(cache == null) { + context = Context.EMPTY_CONTEXT(); + } + else { + context = new Context.Builder().with(Context.Capabilities.CACHE, () -> cache).build(); + } + } + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList( + new Object[][] { + { CachingStellarProcessor.createCache(ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 10)) } + , { CachingStellarProcessor.createCache(ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 1)) } + , { CachingStellarProcessor.createCache(ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 0)) } + , { null } + } + ); + } + /** { "fieldTransformations" : [ diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java index 50dc8bef9c..32efdc7e20 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java @@ -19,6 +19,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.VariableResolver; import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; @@ -90,7 +91,7 @@ public int hashCode() { */ @Override public Object parse(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) { - Optional cacheOpt = context.getCapability(Context.Capabilities.CACHE); + Optional cacheOpt = context.getCapability(Context.Capabilities.CACHE, false); if(cacheOpt.isPresent()) { Cache cache = (Cache) cacheOpt.get(); Key k = toKey(expression, variableResolver); @@ -116,8 +117,8 @@ private Key toKey(String expression, VariableResolver resolver) { * @return A cache. */ public static Cache createCache(Map config) { - long maxSize = config == null?MAX_CACHE_SIZE_DEFAULT: (long) config.getOrDefault(MAX_CACHE_SIZE_PARAM, MAX_CACHE_SIZE_DEFAULT); - int maxTimeRetain = config == null?MAX_TIME_RETAIN_DEFAULT: (int) config.getOrDefault(MAX_TIME_RETAIN_PARAM, MAX_TIME_RETAIN_DEFAULT); + Long maxSize = getParam(config, MAX_CACHE_SIZE_PARAM, MAX_CACHE_SIZE_DEFAULT, Long.class); + Integer maxTimeRetain = getParam(config, MAX_TIME_RETAIN_PARAM, MAX_TIME_RETAIN_DEFAULT, Integer.class); if(maxSize <= 0) { return null; } @@ -126,4 +127,13 @@ public static Cache createCache(Map config) { .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES) .build(); } + + private static T getParam(Map config, String key, T defaultVal, Class clazz) { + Object o = config.get(key); + if(o == null) { + return defaultVal; + } + T ret = ConversionUtils.convert(o, clazz); + return ret == null?defaultVal:ret; + } } From 1502e739a833dbe5de76e8a36b19727f846ebbd7 Mon Sep 17 00:00:00 2001 From: cstella Date: Tue, 10 Apr 2018 15:30:41 -0400 Subject: [PATCH 03/10] Updating readme. --- .../common/configuration/SensorParserConfig.java | 15 +++++++++++++++ metron-platform/metron-parsers/README.md | 13 +++++++++++++ .../apache/metron/parsers/bolt/ParserBolt.java | 2 +- 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java index 2d0ccd8027..d347481015 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java @@ -45,10 +45,25 @@ public class SensorParserConfig implements Serializable { private Integer parserNumTasks = 1; private Integer errorWriterParallelism = 1; private Integer errorWriterNumTasks = 1; + private Map cacheConfig = new HashMap<>(); private Map spoutConfig = new HashMap<>(); private String securityProtocol = null; private Map stormConfig = new HashMap<>(); + /** + * Cache config for stellar field transformations. + * * stellar.cache.maxSize - The maximum number of elements in the cache. + * * stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes). + * @return + */ + public Map getCacheConfig() { + return cacheConfig; + } + + public void setCacheConfig(Map cacheConfig) { + this.cacheConfig = cacheConfig; + } + /** * Return the number of workers for the topology. This property will be used for the parser unless overridden on the CLI. * @return diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 6b9d62e399..8cfca9ba9a 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -174,6 +174,19 @@ then it is assumed to be a regex and will match any topic matching the pattern ( * `spoutConfig` : A map representing a custom spout config (this is a map). This can be overridden on the command line. * `securityProtocol` : The security protocol to use for reading from kafka (this is a string). This can be overridden on the command line and also specified in the spout config via the `security.protocol` key. If both are specified, then they are merged and the CLI will take precedence. * `stormConfig` : The storm config to use (this is a map). This can be overridden on the command line. If both are specified, they are merged with CLI properties taking precedence. +* `cacheConfig` : Cache config for stellar field transformations. This configures a least frequently used cache. This is a map with the following keys. + * `stellar.cache.maxSize` - The maximum number of elements in the cache. Default `10000`. + * `stellar.cache.maxTimeRetain` - The maximum amount of time an element is kept in the cache (in minutes). Default `10` minutes. + + Example of a cache config to contain at max `20000` stellar expressions for at most `20` minutes.: +``` +{ + "cacheConfig" : { + "stellar.cache.maxSize" : 20000, + "stellar.cache.maxTimeRetain" : 20 + } +} +``` The `fieldTransformations` is a complex object which defines a transformation which can be done to a message. This transformation can diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index ee67036055..dd593559b5 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -99,7 +99,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get(); this.collector = collector; if(getSensorParserConfig() != null) { - cache = CachingStellarProcessor.createCache(getSensorParserConfig().getParserConfig()); + cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig()); } initializeStellar(); if(getSensorParserConfig() != null && filter == null) { From f0a029d80b95f8d34141623f6ffcf83bdb8bb181 Mon Sep 17 00:00:00 2001 From: cstella Date: Tue, 10 Apr 2018 16:17:05 -0400 Subject: [PATCH 04/10] updating test --- .../apache/metron/management/ConfigurationFunctionsTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java index 19200317bb..1dc4a608a3 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java @@ -88,7 +88,8 @@ private static void pushConfigs(String inputPath) throws Exception { "errorWriterNumTasks":1, "spoutConfig":{}, "parserNumTasks":1, - "spoutParallelism":1 + "spoutParallelism":1, + "cacheConfig" : {} } */ @Multiline From 936c501e758f06e8bfd0d390cf6c6f755f1f56e8 Mon Sep 17 00:00:00 2001 From: cstella Date: Wed, 11 Apr 2018 15:49:21 -0400 Subject: [PATCH 05/10] Fixing bug --- .../java/org/apache/metron/parsers/json/JSONMapParser.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/json/JSONMapParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/json/JSONMapParser.java index bddf35dfd5..f5d67f99c7 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/json/JSONMapParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/json/JSONMapParser.java @@ -89,8 +89,7 @@ public JSONObject handle(String key, Map value, JSONObject obj) { public static final String JSONP_QUERY = "jsonpQuery"; private MapStrategy mapStrategy = MapStrategy.DROP; - private TypeRef>> typeRef = new TypeRef>>() { - }; + private transient TypeRef>> typeRef = null; private String jsonpQuery = null; @@ -99,6 +98,7 @@ public void configure(Map config) { String strategyStr = (String) config.getOrDefault(MAP_STRATEGY_CONFIG, MapStrategy.DROP.name()); mapStrategy = MapStrategy.valueOf(strategyStr); if (config.containsKey(JSONP_QUERY)) { + typeRef = new TypeRef>>() { }; jsonpQuery = (String) config.get(JSONP_QUERY); Configuration.setDefaults(new Configuration.Defaults() { From 75c533376faf02a10bd47d3741b9b402687ae5d2 Mon Sep 17 00:00:00 2001 From: cstella Date: Wed, 11 Apr 2018 16:21:51 -0400 Subject: [PATCH 06/10] Adding the bug from #991 back in. --- .../java/org/apache/metron/parsers/json/JSONMapParser.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/json/JSONMapParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/json/JSONMapParser.java index f5d67f99c7..7a5968484b 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/json/JSONMapParser.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/json/JSONMapParser.java @@ -89,7 +89,7 @@ public JSONObject handle(String key, Map value, JSONObject obj) { public static final String JSONP_QUERY = "jsonpQuery"; private MapStrategy mapStrategy = MapStrategy.DROP; - private transient TypeRef>> typeRef = null; + private TypeRef>> typeRef = new TypeRef>>() { }; private String jsonpQuery = null; @@ -98,7 +98,6 @@ public void configure(Map config) { String strategyStr = (String) config.getOrDefault(MAP_STRATEGY_CONFIG, MapStrategy.DROP.name()); mapStrategy = MapStrategy.valueOf(strategyStr); if (config.containsKey(JSONP_QUERY)) { - typeRef = new TypeRef>>() { }; jsonpQuery = (String) config.get(JSONP_QUERY); Configuration.setDefaults(new Configuration.Defaults() { From b34e3e084e46e8551d02aad119a051eab9c09405 Mon Sep 17 00:00:00 2001 From: cstella Date: Thu, 12 Apr 2018 11:30:39 -0400 Subject: [PATCH 07/10] Updating docs and turning off by default. --- metron-platform/Performance-tuning-guide.md | 10 ++++++++++ metron-platform/metron-parsers/README.md | 6 +++--- .../stellar/common/CachingStellarProcessor.java | 11 ++++++----- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/metron-platform/Performance-tuning-guide.md b/metron-platform/Performance-tuning-guide.md index 7d79ace374..5a74cf0627 100644 --- a/metron-platform/Performance-tuning-guide.md +++ b/metron-platform/Performance-tuning-guide.md @@ -43,6 +43,16 @@ parallelism will leave you with idle consumers since Kafka limits the max number important because Kafka has certain ordering guarantees for message delivery per partition that would not be possible if more than one consumer in a given consumer group were able to read from that partition. +## Parser Tuning Suggestions + +If you are using stellar field transformations in your parsers, by default, stellar expressions +are not cached. Turning on caching via setting the `cacheConfig` [property](metron-parsers#parser_configuration) +in your parser configuration can have performance impact if your stellar expressions are +complex (e.g. `ENRICHMENT_GET` calls or other high latency calls). The tradeoff, though, is +that non-deterministic stellar expressions will yield cached results which may be wrong, +for the period of time in which the data exists in the cache (the max time in the cache is +configurable). + ## Component Tuning Levers - Kafka diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 8cfca9ba9a..21d4c74745 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -174,9 +174,9 @@ then it is assumed to be a regex and will match any topic matching the pattern ( * `spoutConfig` : A map representing a custom spout config (this is a map). This can be overridden on the command line. * `securityProtocol` : The security protocol to use for reading from kafka (this is a string). This can be overridden on the command line and also specified in the spout config via the `security.protocol` key. If both are specified, then they are merged and the CLI will take precedence. * `stormConfig` : The storm config to use (this is a map). This can be overridden on the command line. If both are specified, they are merged with CLI properties taking precedence. -* `cacheConfig` : Cache config for stellar field transformations. This configures a least frequently used cache. This is a map with the following keys. - * `stellar.cache.maxSize` - The maximum number of elements in the cache. Default `10000`. - * `stellar.cache.maxTimeRetain` - The maximum amount of time an element is kept in the cache (in minutes). Default `10` minutes. +* `cacheConfig` : Cache config for stellar field transformations. This configures a least frequently used cache. This is a map with the following keys. If unconfigured, then no cache will be used. + * `stellar.cache.maxSize` - The maximum number of elements in the cache. Default is to not use a cache. + * `stellar.cache.maxTimeRetain` - The maximum amount of time an element is kept in the cache (in minutes). Default is to not use a cache. Example of a cache config to contain at max `20000` stellar expressions for at most `20` minutes.: ``` diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java index 32efdc7e20..14f4663eab 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java @@ -37,9 +37,7 @@ public class CachingStellarProcessor extends StellarProcessor { private static ThreadLocal> > variableCache = ThreadLocal.withInitial(() -> new HashMap<>()); public static String MAX_CACHE_SIZE_PARAM = "stellar.cache.maxSize"; - public static long MAX_CACHE_SIZE_DEFAULT = 10000; public static String MAX_TIME_RETAIN_PARAM = "stellar.cache.maxTimeRetain"; - public static int MAX_TIME_RETAIN_DEFAULT = 10; public static class Key { private String expression; @@ -117,9 +115,12 @@ private Key toKey(String expression, VariableResolver resolver) { * @return A cache. */ public static Cache createCache(Map config) { - Long maxSize = getParam(config, MAX_CACHE_SIZE_PARAM, MAX_CACHE_SIZE_DEFAULT, Long.class); - Integer maxTimeRetain = getParam(config, MAX_TIME_RETAIN_PARAM, MAX_TIME_RETAIN_DEFAULT, Integer.class); - if(maxSize <= 0) { + if(config == null) { + return null; + } + Long maxSize = getParam(config, MAX_CACHE_SIZE_PARAM, null, Long.class); + Integer maxTimeRetain = getParam(config, MAX_TIME_RETAIN_PARAM, null, Integer.class); + if(maxSize == null || maxTimeRetain == null || maxSize <= 0 || maxTimeRetain <= 0) { return null; } return Caffeine.newBuilder() From 5415b37f7d39b500309b08c188274ade1776fb50 Mon Sep 17 00:00:00 2001 From: cstella Date: Mon, 16 Apr 2018 15:10:31 -0400 Subject: [PATCH 08/10] Updating docs --- metron-platform/Performance-tuning-guide.md | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/metron-platform/Performance-tuning-guide.md b/metron-platform/Performance-tuning-guide.md index 5a74cf0627..ac4d22fcc2 100644 --- a/metron-platform/Performance-tuning-guide.md +++ b/metron-platform/Performance-tuning-guide.md @@ -43,15 +43,18 @@ parallelism will leave you with idle consumers since Kafka limits the max number important because Kafka has certain ordering guarantees for message delivery per partition that would not be possible if more than one consumer in a given consumer group were able to read from that partition. -## Parser Tuning Suggestions - -If you are using stellar field transformations in your parsers, by default, stellar expressions -are not cached. Turning on caching via setting the `cacheConfig` [property](metron-parsers#parser_configuration) -in your parser configuration can have performance impact if your stellar expressions are -complex (e.g. `ENRICHMENT_GET` calls or other high latency calls). The tradeoff, though, is -that non-deterministic stellar expressions will yield cached results which may be wrong, -for the period of time in which the data exists in the cache (the max time in the cache is -configurable). +## Sensor Topology Tuning Suggestions + +If you are using stellar field transformations in your sensors, by default, stellar expressions +are not cached. Sensors that use stellar field transformations by see a performance +boost by turning on caching via setting the `cacheConfig` +[property](metron-parsers#parser_configuration). +This is beneficial if your transformations: + +* Are complex (e.g. `ENRICHMENT_GET` calls or other high latency calls) +* All Yield the same results for the same inputs ( caching is either off or applied to all transformations) + * If any of your transformations are non-deterministic, caching should not be used as it will result in the likelihood of incorrect results being returned. + ## Component Tuning Levers From ddb9d2d6957486756c4dd3004772dee920d2f2d5 Mon Sep 17 00:00:00 2001 From: cstella Date: Mon, 23 Apr 2018 14:58:26 -0400 Subject: [PATCH 09/10] Reacting to @nickwallen's comments --- metron-platform/metron-parsers/README.md | 2 +- .../common/CachingStellarProcessor.java | 8 +- .../apache/metron/stellar/dsl/Context.java | 44 ++++++++-- .../common/CachingStellarProcessorTest.java | 87 +++++++++++++++++++ 4 files changed, 131 insertions(+), 10 deletions(-) create mode 100644 metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index 21d4c74745..1d2d834346 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -174,7 +174,7 @@ then it is assumed to be a regex and will match any topic matching the pattern ( * `spoutConfig` : A map representing a custom spout config (this is a map). This can be overridden on the command line. * `securityProtocol` : The security protocol to use for reading from kafka (this is a string). This can be overridden on the command line and also specified in the spout config via the `security.protocol` key. If both are specified, then they are merged and the CLI will take precedence. * `stormConfig` : The storm config to use (this is a map). This can be overridden on the command line. If both are specified, they are merged with CLI properties taking precedence. -* `cacheConfig` : Cache config for stellar field transformations. This configures a least frequently used cache. This is a map with the following keys. If unconfigured, then no cache will be used. +* `cacheConfig` : Cache config for stellar field transformations. This configures a least frequently used cache. This is a map with the following keys. If not explicitly configured (the default), then no cache will be used. * `stellar.cache.maxSize` - The maximum number of elements in the cache. Default is to not use a cache. * `stellar.cache.maxTimeRetain` - The maximum amount of time an element is kept in the cache (in minutes). Default is to not use a cache. diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java index 14f4663eab..36e6579463 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java @@ -93,13 +93,17 @@ public Object parse(String expression, VariableResolver variableResolver, Functi if(cacheOpt.isPresent()) { Cache cache = (Cache) cacheOpt.get(); Key k = toKey(expression, variableResolver); - return cache.get(k, x -> super.parse(x.expression, variableResolver, functionResolver, context)); + return cache.get(k, x -> parseUncached(x.expression, variableResolver, functionResolver, context)); } else { - return super.parse(expression, variableResolver, functionResolver, context); + return parseUncached(expression, variableResolver, functionResolver, context); } } + protected Object parseUncached(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) { + return super.parse(expression, variableResolver, functionResolver, context); + } + private Key toKey(String expression, VariableResolver resolver) { Set variablesUsed = variableCache.get().computeIfAbsent(expression, this::variablesUsed); Map input = new HashMap<>(); diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java index 2cdf4ff809..8a477c43d8 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java @@ -30,13 +30,43 @@ public interface Capability { public enum Capabilities { HBASE_PROVIDER - , GLOBAL_CONFIG - , ZOOKEEPER_CLIENT - , SERVICE_DISCOVERER - , STELLAR_CONFIG - , CONSOLE - , SHELL_VARIABLES - , CACHE + , + /** + * This capability indicates that the global config is available. + */ + GLOBAL_CONFIG + , + /** + * This capability indicates that a zookeeper client (i.e. a Curator client, specifically) is available. + */ + ZOOKEEPER_CLIENT + , + /** + * This capability indicates that a MaaS service discoverer is available. + */ + SERVICE_DISCOVERER + , + /** + * This capability indicates that a map configuring stellar is available. Generally this is done within the global config + * inside of storm, but may be sourced elsewhere (e.g. the CLI when running the REPL). + */ + STELLAR_CONFIG + , + /** + * This capability indicates that the Console object is available. This is available when run via the CLI (e.g. from the REPL). + */ + CONSOLE + , + /** + * This capability indicates that shell variables are available. This is available when run via the CLI (e.g. from the REPL). + */ + SHELL_VARIABLES + , + /** + * This capability indicates that the StellarProcessor should use a Caffeine cache to cache expression -> results. If an expression + * is in the cache, then the cached result will be returned instead of recomputing. + */ + CACHE } public enum ActivityType { diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java new file mode 100644 index 0000000000..dda2b9bb3f --- /dev/null +++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java @@ -0,0 +1,87 @@ +package org.apache.metron.stellar.common; + +import com.github.benmanes.caffeine.cache.Cache; +import com.google.common.collect.ImmutableMap; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.MapVariableResolver; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.apache.metron.stellar.dsl.VariableResolver; +import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class CachingStellarProcessorTest { + + private static Map fields = new HashMap() {{ + put("name", "blah"); + }}; + + @Test + public void testNoCaching() throws Exception { + //no caching, so every expression is a cache miss. + Assert.assertEquals(2, countMisses(2, Context.EMPTY_CONTEXT(), "TO_UPPER(name)")); + //Ensure the correct result is returned. + Assert.assertEquals("BLAH", evaluateExpression(Context.EMPTY_CONTEXT(), "TO_UPPER(name)")); + } + + @Test + public void testCaching() throws Exception { + Cache cache = CachingStellarProcessor.createCache( + ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 2 + ,CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, 10 + ) + ); + Context context = new Context.Builder() + .with( Context.Capabilities.CACHE , () -> cache ) + .build(); + //running the same expression twice should hit the cache on the 2nd time and only yield one miss + Assert.assertEquals(1, countMisses(2, context, "TO_UPPER(name)")); + + //Ensure the correct result is returned. + Assert.assertEquals("BLAH", evaluateExpression(context, "TO_UPPER(name)")); + + //running the same expression 20 more times should pull from the cache + Assert.assertEquals(0, countMisses(20, context, "TO_UPPER(name)")); + + //Now we are running 4 distinct operations with a cache size of 2. The cache has 1 element in it before we start: + // TO_LOWER(name) - miss (brand new), cache is full + // TO_UPPER(name) - hit, cache is full + // TO_UPPER('foo') - miss (brand new), cache is still full, but TO_LOWER is evicted as the least frequently used + // JOIN... - miss (brand new), cache is still full, but TO_UPPER('foo') is evicted as the least frequently used + //this pattern repeats a 2nd time to add another 3 cache misses, totalling 6. + Assert.assertEquals(6, countMisses(2, context, "TO_LOWER(name)", "TO_UPPER(name)", "TO_UPPER('foo')", "JOIN([name, 'blah'], ',')")); + } + + private Object evaluateExpression(Context context, String expression) { + StellarProcessor processor = new CachingStellarProcessor(); + return processor.parse(expression + , new MapVariableResolver(fields) + , StellarFunctions.FUNCTION_RESOLVER() + , context); + } + + private int countMisses(int numRepetition, Context context, String... expressions) { + AtomicInteger numExpressions = new AtomicInteger(0); + StellarProcessor processor = new CachingStellarProcessor() { + @Override + protected Object parseUncached(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) { + numExpressions.incrementAndGet(); + return super.parseUncached(expression, variableResolver, functionResolver, context); + } + }; + + for(int i = 0;i < numRepetition;++i) { + for(String expression : expressions) { + processor.parse(expression + , new MapVariableResolver(fields) + , StellarFunctions.FUNCTION_RESOLVER() + , context); + } + } + return numExpressions.get(); + } +} From f6947d203dbf7720b61b13384a9b9aa38bd34d13 Mon Sep 17 00:00:00 2001 From: cstella Date: Mon, 23 Apr 2018 15:04:29 -0400 Subject: [PATCH 10/10] Updating license --- .../common/CachingStellarProcessorTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java index dda2b9bb3f..94421dec6a 100644 --- a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java +++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.metron.stellar.common; import com.github.benmanes.caffeine.cache.Cache;