From 4e9fc6307356e9dcb71ec1d9ed78644b1e7f3eb3 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 25 Nov 2016 18:24:22 -0500 Subject: [PATCH 1/7] METRON-606 Extracted ProfileBuilder logic from the ProfileBuilderBolt. --- .../metron/profiler/ProfileBuilder.java | 331 +++++++++++++++ .../apache/metron/profiler/ProfilePeriod.java | 7 + .../stellar/DefaultStellarExecutor.java | 5 + .../profiler/stellar/StellarExecutor.java | 7 + .../metron/profiler/ProfileBuilderTest.java | 348 ++++++++++++++++ .../metron/profiler/ProfilePeriodTest.java | 7 + .../profiler/bolt/ProfileBuilderBolt.java | 318 +++------------ .../metron/profiler/bolt/ProfileState.java | 101 ----- .../profiler/bolt/ProfileBuilderBoltTest.java | 385 +++++++----------- metron-deployment/amazon-ec2/run.sh | 6 +- 10 files changed, 909 insertions(+), 606 deletions(-) create mode 100644 metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java create mode 100644 metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java delete mode 100644 metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java new file mode 100644 index 0000000000..9696a60134 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java @@ -0,0 +1,331 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import org.apache.commons.beanutils.BeanMap; +import org.apache.commons.collections4.ListUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.dsl.Context; +import org.apache.metron.common.dsl.ParseException; +import org.apache.metron.common.dsl.StellarFunctions; +import org.apache.metron.profiler.stellar.DefaultStellarExecutor; +import org.apache.metron.profiler.stellar.StellarExecutor; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; + +/** + * Responsible for building and maintaining a Profile. + * + * One or more messages are applied to the Profile with `apply` and a profile measurement is + * produced by calling `flush`. + * + * Any one instance is responsible only for building the profile for a specific [profile, entity] + * pairing. There will exist many instances, one for each [profile, entity] pair that exists + * within the incoming telemetry data applied to the profile. + */ +public class ProfileBuilder { + + protected static final Logger LOG = LoggerFactory.getLogger(ProfileBuilder.class); + + /** + * A ProfileMeasurement is created and emitted each window period. A Profile + * itself is composed of many ProfileMeasurements. + */ + private ProfileMeasurement measurement; + + /** + * The definition of the Profile that the bolt is building. + */ + private ProfileConfig definition; + + /** + * Executes Stellar code and maintains state across multiple invocations. + */ + private StellarExecutor executor; + + /** + * Has the profile been initialized? + */ + private boolean isInitialized; + + /** + * Use the ProfileBuilder.Builder to create a new ProfileBuilder. + */ + private ProfileBuilder(ProfileConfig definition, + String entity, + long whenMillis, + long periodDurationMillis, + CuratorFramework client, + Map global) { + + this.isInitialized = false; + this.definition = definition; + this.measurement = new ProfileMeasurement( + definition.getProfile(), + entity, + whenMillis, + periodDurationMillis, + TimeUnit.MILLISECONDS); + this.executor = new DefaultStellarExecutor(); + Context context = new Context.Builder() + .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) + .with(Context.Capabilities.GLOBAL_CONFIG, () -> global) + .build(); + StellarFunctions.initialize(context); + this.executor.setContext(context); + } + + /** + * Apply a message to the profile. + * @param message The message to apply. + */ + public void apply(JSONObject message) { + + if(!isInitialized()) { + assign(definition.getInit(), message, "init"); + isInitialized = true; + } + + assign(definition.getUpdate(), message, "update"); + } + + /** + * Flush the Profile. + * + * Completes and emits the ProfileMeasurement. Clears all state in preparation for + * the next window period. + * @return Returns the completed profile measurement. + */ + public ProfileMeasurement flush() { + LOG.debug("Flushing profile: profile={}, entity={}", measurement.getProfileName(), measurement.getEntity()); + + // execute the 'result' expression + Object value = execute(definition.getResult(), new JSONObject(), "result"); + measurement.setValue(value); + + // execute the 'groupBy' expression(s) - allow each expression to refer to the fields of the ProfileMeasurement + List groups = execute(definition.getGroupBy(), new BeanMap(measurement), "groupBy"); + measurement.setGroups(groups); + + // execute the 'tickUpdate' expression(s) + assign(definition.getTickUpdate(), Collections.singletonMap("result", value),"tickUpdate"); + + // save a copy of current state then clear it to prepare for the next window + Map state = executor.getState(); + executor.clearState(); + + // the 'tickUpdate' state is not flushed - make sure to bring that state along to the next period + definition.getTickUpdate().forEach((var, expr) -> { + Object val = state.get(var); + executor.assign(var, val); + }); + + isInitialized = false; + return measurement; + } + + /** + * Executes an expression contained within the profile definition. + * @param expression The expression to execute. + * @param transientState Additional transient state provided to the expression. + * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. + * @return The result of executing the expression. + */ + private Object execute(String expression, Map transientState, String expressionType) { + Object result = null; + + List allResults = execute(Collections.singletonList(expression), transientState, expressionType); + if(allResults.size() > 0) { + result = allResults.get(0); + } + + return result; + } + + /** + * Executes a set of expressions whose results need to be assigned to a variable. + * @param expressions Maps the name of a variable to the expression whose result should be assigned to it. + * @param transientState Additional transient state provided to the expression. + * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. + */ + private void assign(Map expressions, Map transientState, String expressionType) { + try { + + // execute each of the 'update' expressions + MapUtils.emptyIfNull(expressions) + .forEach((var, expr) -> executor.assign(var, expr, transientState)); + + } catch(ParseException e) { + + // make it brilliantly clear that one of the 'update' expressions is bad + String msg = format( + "Bad '%s' expression: %s, profile=%s, entity=%s", + expressionType, + e.getMessage(), + measurement.getProfileName(), + measurement.getEntity()); + throw new ParseException(msg, e); + } + } + + /** + * Executes the expressions contained within the profile definition. + * @param expressions A list of expressions to execute. + * @param transientState Additional transient state provided to the expressions. + * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. + * @return The result of executing each expression. + */ + private List execute(List expressions, Map transientState, String expressionType) { + List results = new ArrayList<>(); + + try { + ListUtils.emptyIfNull(expressions) + .forEach((expr) -> results.add(executor.execute(expr, transientState, Object.class))); + + } catch (Throwable e) { + String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", + expressionType, + e.getMessage(), + measurement.getProfileName(), + measurement.getEntity()); + throw new ParseException(msg, e); + } + + return results; + } + + /** + * Returns the current value of a variable. + * @param variable The name of the variable. + */ + public Object valueOf(String variable) { + return executor.getState().get(variable); + } + + public boolean isInitialized() { + return isInitialized; + } + + public ProfileConfig getDefinition() { + return definition; + } + + /** + * A builder used to construct a new ProfileBuilder. + */ + public static class Builder { + + private ProfileConfig definition; + private String entity; + private long periodAt; + private long periodDurationMillis; + private CuratorFramework zookeeperClient; + private Map global; + + /** + * @param definition The profiler definition. + */ + public Builder withDefinition(ProfileConfig definition) { + this.definition = definition; + return this; + } + + /** + * @param entity The name of the entity + */ + public Builder withEntity(String entity) { + this.entity = entity; + return this; + } + + /** + * @param periodAtMillis A time within the profile period when the measurement was taken in epoch milliseconds. + */ + public Builder withPeriodAt(long periodAtMillis) { + this.periodAt = periodAtMillis; + return this; + } + + /** + * @param duration The duration of each profile period. + * @param units The units used to specify the duration of the profile period. + */ + public Builder withPeriodDuration(long duration, TimeUnit units) { + this.periodDurationMillis = units.toMillis(duration); + return this; + } + + /** + * @param millis The duration of each profile period in milliseconds. + */ + public Builder withPeriodDurationMillis(long millis) { + this.periodDurationMillis = millis; + return this; + } + + /** + * @param zookeeperClient The zookeeper client. + */ + public Builder withZookeeperClient(CuratorFramework zookeeperClient) { + this.zookeeperClient = zookeeperClient; + return this; + } + + /** + * @param global The global configuration. + */ + public Builder withGlobalConfiguration(Map global) { + + // TODO how does the profile builder ever seen a global that has been update in zookeeper? + + this.global = global; + return this; + } + + /** + * Construct a ProfileBuilder. + */ + public ProfileBuilder build() { + + if(definition == null) { + throw new IllegalArgumentException("missing profiler definition; got null"); + } + + if(StringUtils.isEmpty(entity)) { + throw new IllegalArgumentException(format("missing entity name; got '%s'", entity)); + } + + return new ProfileBuilder(definition, entity, periodAt, periodDurationMillis, zookeeperClient, global); + } + } +} diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java index 1b8efc84e1..c4669196a6 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java @@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit; +import static java.lang.String.format; + /** * The Profiler captures a ProfileMeasurement once every ProfilePeriod. There can be * multiple ProfilePeriods every hour. @@ -45,6 +47,11 @@ public class ProfilePeriod { * @param units The units of the duration; hours, minutes, etc. */ public ProfilePeriod(long epochMillis, long duration, TimeUnit units) { + if(duration <= 0) { + throw new IllegalArgumentException(format( + "period duration must be greater than 0; got '%d %s'", duration, units)); + } + this.durationMillis = units.toMillis(duration); this.period = epochMillis / durationMillis; } diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java index 18997a6185..ad748f6a02 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java @@ -95,6 +95,11 @@ public void assign(String variable, String expression, Map trans state.put(variable, result); } + @Override + public void assign(String variable, Object value) { + state.put(variable, value); + } + /** * Execute a Stellar expression and return the result. The internal state of the executor * is not modified. diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java index 869db4210b..2342eb79e9 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java @@ -29,6 +29,13 @@ */ public interface StellarExecutor { + /** + * Assign a variable a specific value. + * @param variable The variable name. + * @param value The value to assign to the variable. + */ + void assign(String variable, Object value); + /** * Execute an expression and assign the result to a variable. The variable is maintained * in the context of this executor and is available to all subsequent expressions. diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java new file mode 100644 index 0000000000..eda6b6bd8f --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java @@ -0,0 +1,348 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.apache.metron.common.utils.ConversionUtils.convert; +import static org.junit.Assert.assertEquals; + +/** + * Tests the ProfileBuilder class. + */ +public class ProfileBuilderTest { + + /** + * { + * "ip_src_addr": "10.0.0.1", + * "ip_dst_addr": "10.0.0.20", + * "value": 100 + * } + */ + @Multiline + private String input; + private JSONObject message; + private ProfileBuilder builder; + private ProfileConfig definition; + + @Before + public void setup() throws Exception { + message = (JSONObject) new JSONParser().parse(input); + } + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { + * "x": "100", + * "y": "200" + * }, + * "result": "x + y" + * } + */ + @Multiline + private String testInitProfile; + + /** + * Ensure that the 'init' block is executed correctly. + */ + @Test + public void testInit() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class); + builder = new ProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodAt(System.currentTimeMillis()) + .withPeriodDuration(10, TimeUnit.MINUTES) + .build(); + + // execute + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate that x = 100, y = 200 + assertEquals(100 + 200, (int) convert(m.getValue(), Integer.class)); + } + + /** + * The 'init' block is executed only when the first message is received. If no message + * has been received, the 'init' block will not be executed. + */ + @Test + public void testInitWithNoMessage() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class); + builder = new ProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodAt(System.currentTimeMillis()) + .withPeriodDuration(10, TimeUnit.MINUTES) + .build(); + + // execute + ProfileMeasurement m = builder.flush(); + + // validate that x = 0 and y = 0 as no initialization occurred + assertEquals(0, (int) convert(m.getValue(), Integer.class)); + } + + /** + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": { + * "x": "0", + * "y": "0" + * }, + * "update": { + * "x": "x + 1", + * "y": "y + 2" + * }, + * "result": "x + y" + * } + */ + @Multiline + private String testUpdateProfile; + + /** + * Ensure that the 'update' expressions are executed for each message applied to the profile. + */ + @Test + public void testUpdate() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class); + builder = new ProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodAt(System.currentTimeMillis()) + .withPeriodDuration(10, TimeUnit.MINUTES) + .build(); + + // execute + int count = 10; + for(int i=0; i profileCache; + private transient Cache profileCache; /** * Parses JSON messages. @@ -115,11 +107,11 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll super.prepare(stormConf, context, collector); if(timeToLiveMillis < periodDurationMillis) { - String msg = String.format("invalid configuration: expect profile TTL (%d) greater than period duration (%d)", - timeToLiveMillis, periodDurationMillis); - throw new IllegalStateException(msg); + throw new IllegalStateException(format( + "invalid configuration: expect profile TTL (%d) to be greater than period duration (%d)", + timeToLiveMillis, + periodDurationMillis)); } - this.collector = collector; this.parser = new JSONParser(); this.profileCache = CacheBuilder @@ -160,207 +152,23 @@ public void execute(Tuple input) { */ private void doExecute(Tuple input) throws ExecutionException { - if(isTickTuple(input)) { - flush(input); - - } else { - if (!isInitialized(input)) { - init(input); - } - update(input); - } - } - - /** - * Initialize the bolt. Occurs when the first tuple is received at the start - * of each window period. - * @param input The input tuple - */ - private void init(Tuple input) throws ExecutionException { - - ProfileState state = getProfileState(input); - try { - - // the original telemetry message is provided as additional context for the 'update' expressions - JSONObject message = getMessage(input); - - // execute the 'init' expression - Map expressions = state.getDefinition().getInit(); - expressions.forEach((var, expr) -> state.getExecutor().assign(var, expr, message)); - - } catch(ParseException e) { - - // make it brilliantly clear that one of the 'init' expressions is bad - ProfileMeasurement measurement = state.getMeasurement(); - String msg = format("Bad 'init' expression: %s, profile=%s, entity=%s", - e.getMessage(), measurement.getProfileName(), measurement.getEntity()); - throw new ParseException(msg, e); - } - } - - /** - * Update the Profile based on data contained in a new message. - * @param input The tuple containing a new message. - */ - private void update(Tuple input) throws ExecutionException { - - ProfileState state = getProfileState(input); - try { - - // the original telemetry message is provided as additional context for the 'update' expressions - JSONObject message = getMessage(input); + if(TupleUtils.isTick(input)) { - // execute each of the 'update' expressions - Map expressions = state.getDefinition().getUpdate(); - expressions.forEach((var, expr) -> state.getExecutor().assign(var, expr, message)); - - } catch(ParseException e) { - - // make it brilliantly clear that one of the 'update' expressions is bad - ProfileMeasurement measurement = state.getMeasurement(); - String msg = format("Bad 'update' expression: %s, profile=%s, entity=%s", - e.getMessage(), measurement.getProfileName(), measurement.getEntity()); - throw new ParseException(msg, e); - } - } - - /** - * Flush the Profiles. - * - * Executed on a fixed time period when a tick tuple is received. Completes - * and emits the ProfileMeasurement. Clears all state in preparation for - * the next window period. - */ - private void flush(Tuple tickTuple) { - - // flush each of the profiles maintain by this bolt - profileCache.asMap().forEach((key, profileState) -> { - - ProfileMeasurement measurement = profileState.getMeasurement(); - StellarExecutor executor = profileState.getExecutor(); - ProfileConfig definition = profileState.getDefinition(); - LOG.info(String.format("Flushing profile: profile=%s, entity=%s", measurement.getProfileName(), measurement.getEntity())); - - // execute the 'result' and 'group by' expressions - Object value = executeResult(definition.getResult(), executor, measurement); - measurement.setValue(value); - - List groups = executeGroupBy(definition.getGroupBy(), executor, measurement); - measurement.setGroups(groups); - - // emit the completed profile measurement - emit(measurement, definition); - - // execute the update with the old state - Map tickUpdate = definition.getTickUpdate(); - Map state = executor.getState(); - if(tickUpdate != null) { - tickUpdate.forEach((var, expr) -> executor.assign(var, expr, Collections.singletonMap("result", value))); - } - - // clear the execution state to prepare for the next window - executor.clearState(); - - //make sure that we bring along the update state - if(tickUpdate != null) { - tickUpdate.forEach((var, expr) -> executor.getState().put(var, state.get(var))); - } + // when a 'tick' is received, flush the profile and emit the completed profile measurement + profileCache.asMap().forEach((key, profileBuilder) -> { + ProfileMeasurement measurement = profileBuilder.flush(); + collector.emit(new Values(measurement, profileBuilder.getDefinition())); + }); // cache maintenance profileCache.cleanUp(); - }); - } - - /** - * Create the state necessary to build a Profile. - * @param tuple The tuple that needs applied to a profile. - */ - private ProfileState createProfileState(Tuple tuple) { - - // extract the profile definition - ProfileConfig profileDefinition = getProfileDefinition(tuple); - - // create the profile measurement which will be emitted at the end of the window period - ProfileMeasurement measurement = new ProfileMeasurement( - profileDefinition.getProfile(), - getEntity(tuple), - getTimestamp(), - periodDurationMillis, - TimeUnit.MILLISECONDS); - - // create the executor - StellarExecutor executor = new DefaultStellarExecutor(); - Context context = new Context.Builder() - .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) - .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) - .build(); - StellarFunctions.initialize(context); - executor.setContext(context); - // create the profile state which is maintained within a cache for a fixed period of time - ProfileState state = new ProfileState(); - state.setExecutor(executor); - state.setDefinition(profileDefinition); - state.setMeasurement(measurement); - - return state; - } - - /** - * Executes the 'result' expression of a Profile. - * @return The result of evaluating the 'result' expression. - */ - private Object executeResult(String expression, StellarExecutor executor, ProfileMeasurement measurement) { - Object result; - try { - result = executor.execute(expression, new JSONObject(), Object.class); + } else { - } catch(ParseException e) { - String msg = format("Bad 'result' expression: %s, profile=%s, entity=%s", - e.getMessage(), measurement.getProfileName(), measurement.getEntity()); - throw new ParseException(msg, e); + // telemetry message provides additional context for 'init' and 'update' expressions + JSONObject message = getField("message", input, JSONObject.class); + getBuilder(input).apply(message); } - return result; - } - - - /** - * Executes each of the 'groupBy' expressions. The result of each - * expression are the groups used to sort the data as part of the - * row key. - * @param expressions The 'groupBy' expressions to execute. - * @return The result of executing the 'groupBy' expressions. - */ - private List executeGroupBy(List expressions, StellarExecutor executor, ProfileMeasurement measurement) { - List groups = new ArrayList<>(); - - if(!isEmpty(expressions)) { - try { - // allows each 'groupBy' expression to refer to the fields of the ProfileMeasurement - BeanMap measureAsMap = new BeanMap(measurement); - - for (String expr : expressions) { - Object result = executor.execute(expr, measureAsMap, Object.class); - groups.add(result); - } - - } catch(Throwable e) { - String msg = format("Bad 'groupBy' expression: %s, profile=%s, entity=%s", - e.getMessage(), measurement.getProfileName(), measurement.getEntity()); - throw new ParseException(msg, e); - } - } - - return groups; - } - - /** - * Emits a message containing a ProfileMeasurement and the Profile configuration. - * @param measurement The completed ProfileMeasurement. - * @param definition The profile definition. - */ - private void emit(ProfileMeasurement measurement, ProfileConfig definition) { - collector.emit(new Values(measurement, definition)); } /** @@ -368,55 +176,44 @@ private void emit(ProfileMeasurement measurement, ProfileConfig definition) { * @param tuple A tuple. */ private String cacheKey(Tuple tuple) { - return String.format("%s:%s", getProfileDefinition(tuple).getProfile(), getEntity(tuple)); + return format("%s:%s", + getField("profile", tuple, ProfileConfig.class), + getField("entity", tuple, String.class)); } /** - * Retrieves the state associated with a Profile. If none exists, the state will - * be initialized. + * Retrieves the cached ProfileBuilder that is used to build and maintain the Profile. If none exists, + * one will be created and returned. * @param tuple The tuple. */ - protected ProfileState getProfileState(Tuple tuple) throws ExecutionException { - return profileCache.get(cacheKey(tuple), () -> createProfileState(tuple)); - } - - /** - * Extracts the profile definition from a tuple. - * @param tuple The tuple sent by the splitter bolt. - */ - private ProfileConfig getProfileDefinition(Tuple tuple) { - ProfileConfig definition = (ProfileConfig) tuple.getValueByField("profile"); - if(definition == null) { - throw new IllegalStateException("invalid tuple received: missing profile definition"); + protected ProfileBuilder getBuilder(Tuple tuple) throws ExecutionException { + return profileCache.get( + cacheKey(tuple), + () -> new ProfileBuilder.Builder() + .withDefinition(getField("profile", tuple, ProfileConfig.class)) + .withEntity(getField("entity", tuple, String.class)) + .withPeriodAt(getTimestamp()) + .withPeriodDurationMillis(periodDurationMillis) + .withGlobalConfiguration(getConfigurations().getGlobalConfig()) + .withZookeeperClient(client) + .build()); + } + + /** + * Retrieves an expected field from a Tuple. If the field is missing an exception is thrown to + * indicate a fatal error. + * @param fieldName The name of the field. + * @param tuple The tuple from which to retrieve the field. + * @param clazz The type of the field value. + * @param The type of the field value. + */ + private T getField(String fieldName, Tuple tuple, Class clazz) { + T value = ConversionUtils.convert(tuple.getValueByField(fieldName), clazz); + if(value == null) { + throw new IllegalStateException(format("invalid tuple received: missing field '%s'", fieldName)); } - return definition; - } - - /** - * Extracts the name of the entity from a tuple. - * @param tuple The tuple sent by the splitter bolt. - */ - private String getEntity(Tuple tuple) { - String entity = tuple.getStringByField("entity"); - if(entity == null) { - throw new IllegalStateException("invalid tuple received: missing entity name"); - } - - return entity; - } - - /** - * Extracts the original telemetry message from a tuple. - * @param input The tuple sent by the splitter bolt. - */ - private JSONObject getMessage(Tuple input) { - JSONObject message = (JSONObject) input.getValueByField("message"); - if(message == null) { - throw new IllegalStateException("invalid tuple received: missing message"); - } - - return message; + return value; } /** @@ -427,23 +224,6 @@ private long getTimestamp() { return System.currentTimeMillis(); } - /** - * Has the Stellar execution environment already been initialized - * @return True, it it has been initialized. - */ - private boolean isInitialized(Tuple tuple) { - return profileCache.getIfPresent(cacheKey(tuple)) != null; - } - - /** - * Is this a tick tuple? - * @param tuple The tuple - */ - protected static boolean isTickTuple(Tuple tuple) { - return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) && - Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); - } - public ProfileBuilderBolt withPeriodDurationMillis(long periodDurationMillis) { this.periodDurationMillis = periodDurationMillis; return this; diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java deleted file mode 100644 index f6c48b44de..0000000000 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.metron.profiler.bolt; - -import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.profiler.ProfileMeasurement; -import org.apache.metron.profiler.stellar.StellarExecutor; - -/** - * The state that must be maintained for each [profile, entity] pair when building a Profile. - */ -public class ProfileState { - - /** - * A ProfileMeasurement is created and emitted each window period. A Profile - * itself is composed of many ProfileMeasurements. - */ - private ProfileMeasurement measurement; - - /** - * The definition of the Profile that the bolt is building. - */ - private ProfileConfig definition; - - /** - * Executes Stellar code and maintains state across multiple invocations. - */ - private StellarExecutor executor; - - public ProfileMeasurement getMeasurement() { - return measurement; - } - - public void setMeasurement(ProfileMeasurement measurement) { - this.measurement = measurement; - } - - public ProfileConfig getDefinition() { - return definition; - } - - public void setDefinition(ProfileConfig definition) { - this.definition = definition; - } - - public StellarExecutor getExecutor() { - return executor; - } - - public void setExecutor(StellarExecutor executor) { - this.executor = executor; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ProfileState that = (ProfileState) o; - - if (measurement != null ? !measurement.equals(that.measurement) : that.measurement != null) return false; - if (definition != null ? !definition.equals(that.definition) : that.definition != null) return false; - return executor != null ? executor.equals(that.executor) : that.executor == null; - - } - - @Override - public int hashCode() { - int result = measurement != null ? measurement.hashCode() : 0; - result = 31 * result + (definition != null ? definition.hashCode() : 0); - result = 31 * result + (executor != null ? executor.hashCode() : 0); - return result; - } - - @Override - public String toString() { - return "ProfileState{" + - "measurement=" + measurement + - ", definition=" + definition + - ", executor=" + executor + - '}'; - } -} diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java index 43149e2ce6..8d610bd9f8 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java @@ -21,17 +21,18 @@ package org.apache.metron.profiler.bolt; import org.adrianwalker.multilinestring.Multiline; -import org.apache.log4j.Level; +import org.apache.commons.lang3.StringUtils; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.profiler.ProfileBuilder; import org.apache.metron.profiler.ProfileMeasurement; import org.apache.metron.test.bolt.BaseBoltTest; -import org.apache.metron.test.utils.UnitTestHelper; import org.apache.storm.Constants; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; +import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -39,13 +40,13 @@ import java.util.HashMap; import java.util.concurrent.TimeUnit; -import static org.hamcrest.CoreMatchers.equalTo; +import static org.apache.metron.common.utils.ConversionUtils.convert; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.refEq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -58,40 +59,57 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { /** * { * "ip_src_addr": "10.0.0.1", - * "ip_dst_addr": "10.0.0.20" + * "value": "22" * } */ @Multiline - private String input; + private String inputOne; + private JSONObject messageOne; - private JSONObject message; + /** + * { + * "ip_src_addr": "10.0.0.2", + * "value": "22" + * } + */ + @Multiline + private String inputTwo; + private JSONObject messageTwo; public static Tuple mockTickTuple() { - return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID); - } - - public static Tuple mockTuple(String componentId, String streamId) { Tuple tuple = mock(Tuple.class); - when(tuple.getSourceComponent()).thenReturn(componentId); - when(tuple.getSourceStreamId()).thenReturn(streamId); + when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID); + when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID); return tuple; } - public void setup(String profile) throws Exception { - - // parse the input message + @Before + public void setup() throws Exception { JSONParser parser = new JSONParser(); - message = (JSONObject) parser.parse(input); - - // the tuple will contain the original message - when(tuple.getValueByField(eq("message"))).thenReturn(message); + messageOne = (JSONObject) parser.parse(inputOne); + messageTwo = (JSONObject) parser.parse(inputTwo); + } - // the tuple will contain the 'fully resolved' name of the entity - when(tuple.getStringByField(eq("entity"))).thenReturn("10.0.0.1"); + /** + * Creates a profile definition based on a string of JSON. + * @param json The string of JSON. + */ + private ProfileConfig createDefinition(String json) throws IOException { + return JSONUtils.INSTANCE.load(json, ProfileConfig.class); + } - // the tuple will contain the profile definition - ProfileConfig profileConfig = JSONUtils.INSTANCE.load(profile, ProfileConfig.class); - when(tuple.getValueByField(eq("profile"))).thenReturn(profileConfig); + /** + * Create a tuple that will contain the message, the entity name, and profile definition. + * @param entity The entity name + * @param message The telemetry message. + * @param profile The profile definition. + */ + private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile) { + Tuple tuple = mock(Tuple.class); + when(tuple.getValueByField(eq("message"))).thenReturn(message); + when(tuple.getValueByField(eq("entity"))).thenReturn(entity); + when(tuple.getValueByField(eq("profile"))).thenReturn(profile); + return tuple; } /** @@ -111,270 +129,171 @@ private ProfileBuilderBolt createBolt() throws IOException { /** * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "onlyif": "true", - * "init": { - * "x": "10", - * "y": "20" - * }, - * "update": { - * "x": "x + 10", - * "y": "y + 20" - * }, - * "result": "x + y" - * } - */ - @Multiline - private String basicProfile; - - /** - * Ensure that the bolt can update a profile based on new messages that it receives. - */ - @Test - public void testProfileUpdate() throws Exception { - - setup(basicProfile); - ProfileBuilderBolt bolt = createBolt(); - bolt.execute(tuple); - bolt.execute(tuple); - - // validate that x=10+10+10 y=20+20+20 - ProfileState state = bolt.getProfileState(tuple); - assertEquals(10+10+10.0, state.getExecutor().getState().get("x")); - assertEquals(20+20+20.0, state.getExecutor().getState().get("y")); - } - - /** - * { - * "profile": "test", + * "profile": "profileOne", * "foreach": "ip_src_addr", - * "update": { "x": "2" }, + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, * "result": "x" * } */ @Multiline - private String profileWithNoInit; - - /** - * If the 'init' field is not defined, then the profile should - * behave as normal, but with no variable initialization. - */ - @Test - public void testProfileWithNoInit() throws Exception { - - setup(profileWithNoInit); - ProfileBuilderBolt bolt = createBolt(); - bolt.execute(tuple); - bolt.execute(tuple); + private String profileOne; - // validate - ProfileState state = bolt.getProfileState(tuple); - assertEquals(2, state.getExecutor().getState().get("x")); - } /** * { - * "profile": "test", + * "profile": "profileTwo", * "foreach": "ip_src_addr", - * "init": { "x": "2" }, + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, * "result": "x" * } */ @Multiline - private String profileWithNoUpdate; + private String profileTwo; /** - * If the 'update' field is not defined, then no updates should occur as messages - * are received. + * The bolt should create a ProfileBuilder to manage a profile. */ @Test - public void testProfileWithNoUpdate() throws Exception { + public void testCreateProfileBuilder() throws Exception { - setup(profileWithNoUpdate); ProfileBuilderBolt bolt = createBolt(); - bolt.execute(tuple); - bolt.execute(tuple); - bolt.execute(tuple); + ProfileConfig definition = createDefinition(profileOne); + String entity = (String) messageOne.get("ip_src_addr"); + Tuple tupleOne = createTuple(entity, messageOne, definition); - // validate - ProfileState state = bolt.getProfileState(tuple); - assertEquals(2, state.getExecutor().getState().get("x")); + // execute - send two tuples with different entities + bolt.execute(tupleOne); + + // validate - 1 messages applied + ProfileBuilder builderOne = bolt.getBuilder(tupleOne); + assertEquals(1, (int) convert(builderOne.valueOf("x"), Integer.class)); } /** - * Ensure that the bolt can flush the profile when a tick tuple is received. + * This test creates two different messages, with different entities that are applied to + * the same profile. The bolt should create separate ProfileBuilder objects to handle each + * profile/entity pair. */ @Test - public void testProfileFlush() throws Exception { + public void testCreateProfileBuilderForEachEntity() throws Exception { // setup - setup(basicProfile); ProfileBuilderBolt bolt = createBolt(); - bolt.execute(tuple); - bolt.execute(tuple); - - // execute - the tick tuple triggers a flush of the profile - bolt.execute(mockTickTuple()); - - // capture the ProfileMeasurement that should be emitted - ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); - verify(outputCollector, times(1)).emit(arg.capture()); - - Values actual = arg.getValue(); - ProfileMeasurement measurement = (ProfileMeasurement) actual.get(0); - - // verify - assertThat(measurement.getValue(), equalTo(90.0)); - assertThat(measurement.getEntity(), equalTo("10.0.0.1")); - assertThat(measurement.getProfileName(), equalTo("test")); + ProfileConfig definition = createDefinition(profileOne); + + // apply a message to the profile + String entityOne = (String) messageOne.get("ip_src_addr"); + Tuple tupleOne = createTuple(entityOne, messageOne, definition); + bolt.execute(tupleOne); + bolt.execute(tupleOne); + + // apply a different message (with different entity) to the same profile + String entityTwo = (String) messageTwo.get("ip_src_addr"); + Tuple tupleTwo = createTuple(entityTwo, messageTwo, definition); + bolt.execute(tupleTwo); + + // validate - 2 messages applied + ProfileBuilder builderOne = bolt.getBuilder(tupleOne); + assertTrue(builderOne.isInitialized()); + assertEquals(2, (int) convert(builderOne.valueOf("x"), Integer.class)); + + // validate - 1 message applied + ProfileBuilder builderTwo = bolt.getBuilder(tupleTwo); + assertTrue(builderTwo.isInitialized()); + assertEquals(1, (int) convert(builderTwo.valueOf("x"), Integer.class)); + + assertNotSame(builderOne, builderTwo); } /** - * What happens if we try to flush, but have yet to receive any messages to - * apply to the profile? - * - * The ProfileBuilderBolt will not have received the data necessary from the - * ProfileSplitterBolt, like the entity and profile name, that is required - * to perform the flush. The flush has to be skipped until this information - * is received from the Splitter. + * The bolt should create separate ProfileBuilder objects to handle each + * profile/entity pair. */ @Test - public void testProfileFlushWithNoMessages() throws Exception { + public void testCreateProfileBuilderForEachProfile() throws Exception { - setup(basicProfile); + // setup - apply one message to different profile definitions ProfileBuilderBolt bolt = createBolt(); + String entity = (String) messageOne.get("ip_src_addr"); - // no messages have been received before a flush occurs - bolt.execute(mockTickTuple()); - bolt.execute(mockTickTuple()); - bolt.execute(mockTickTuple()); + // apply a message to the first profile + ProfileConfig definitionOne = createDefinition(profileOne); + Tuple tupleOne = createTuple(entity, messageOne, definitionOne); + bolt.execute(tupleOne); - // no ProfileMeasurement should be written to the ProfileStore - verify(outputCollector, times(0)).emit(any(Values.class)); - } - - /** - * The executor's state should be cleared after a flush. - */ - @Test - public void testStateClearedAfterFlush() throws Exception { + // apply the same message to the second profile + ProfileConfig definitionTwo = createDefinition(profileTwo); + Tuple tupleTwo = createTuple(entity, messageOne, definitionTwo); + bolt.execute(tupleTwo); - setup(basicProfile); - ProfileBuilderBolt bolt = createBolt(); - bolt.execute(tuple); - bolt.execute(tuple); + // validate - 1 message applied + ProfileBuilder builderOne = bolt.getBuilder(tupleOne); + assertTrue(builderOne.isInitialized()); + assertEquals(1, (int) convert(builderOne.valueOf("x"), Integer.class)); - // execute - should clear state from previous tuples - bolt.execute(mockTickTuple()); + // validate - 1 message applied + ProfileBuilder builderTwo = bolt.getBuilder(tupleTwo); + assertTrue(builderTwo.isInitialized()); + assertEquals(1, (int) convert(builderTwo.valueOf("x"), Integer.class)); - ProfileState state = bolt.getProfileState(tuple); - assertThat(state.getExecutor().getState().size(), equalTo(0)); + assertNotSame(builderOne, builderTwo); } /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "onlyif": "true", - * "init": { "x": 10 }, - * "update": { "x": "x + 'string'" }, - * "result": "x" - * } - */ - @Multiline - private String profileWithBadUpdate; - - /** - * What happens when the profile contains a bad Stellar expression? + * A ProfileMeasurement should be emitted for each profile/entity currently being tracked + * by the bolt. */ @Test - public void testProfileWithBadUpdate() throws Exception { + public void testEmitMeasurementsOnFlush() throws Exception { - // setup - ensure the bad profile is used - setup(profileWithBadUpdate); - UnitTestHelper.setLog4jLevel(ProfileBuilderBolt.class, Level.FATAL); - // execute + // setup ProfileBuilderBolt bolt = createBolt(); - bolt.execute(tuple); - UnitTestHelper.setLog4jLevel(ProfileBuilderBolt.class, Level.ERROR); - - // verify - expect the tuple to be acked and an error reported - verify(outputCollector, times(1)).ack(eq(tuple)); - verify(outputCollector, times(1)).reportError(any()); - } + final String entity = (String) messageOne.get("ip_src_addr"); - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "onlyif": "true", - * "init": { "x": "10 + 'string'" }, - * "update": { "x": "x + 2" }, - * "result": "x" - * } - */ - @Multiline - private String profileWithBadInit; + // apply the message to the first profile + ProfileConfig definitionOne = createDefinition(profileOne); + Tuple tupleOne = createTuple(entity, messageOne, definitionOne); + bolt.execute(tupleOne); - /** - * What happens when the profile contains a bad Stellar expression? - */ - @Test - public void testProfileWithBadInit() throws Exception { + // apply the same message to the second profile + ProfileConfig definitionTwo = createDefinition(profileTwo); + Tuple tupleTwo = createTuple(entity, messageOne, definitionTwo); + bolt.execute(tupleTwo); - // setup - ensure the bad profile is used - setup(profileWithBadInit); + // execute - the tick tuple triggers a flush of the profile + bolt.execute(mockTickTuple()); - // execute - ProfileBuilderBolt bolt = createBolt(); - UnitTestHelper.setLog4jLevel(ProfileBuilderBolt.class, Level.FATAL); - bolt.execute(tuple); - UnitTestHelper.setLog4jLevel(ProfileBuilderBolt.class, Level.ERROR); - // verify - expect the tuple to be acked and an error reported - verify(outputCollector, times(1)).ack(eq(tuple)); - verify(outputCollector, times(1)).reportError(any()); - } + // capture the ProfileMeasurement that should be emitted + ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); + verify(outputCollector, times(2)).emit(arg.capture()); - /** - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "onlyif": "true", - * "groupBy": ["2 + 2", "4 + 4"], - * "init": { "x": "0" }, - * "update": { "x": "x + 1" }, - * "result": "x" - * } - */ - @Multiline - private String profileWithGroupBy; + // validate + for(Values value : arg.getAllValues()) { - /** - * Ensure that the Profile's 'groupBy' are handled correctly. - */ - @Test - public void testProfileWithGroupBy() throws Exception { + ProfileMeasurement measurement = (ProfileMeasurement) value.get(0); + ProfileConfig definition = (ProfileConfig) value.get(1); - // setup - setup(profileWithGroupBy); - ProfileBuilderBolt bolt = createBolt(); - bolt.execute(tuple); - bolt.execute(tuple); + if(StringUtils.equals(definitionTwo.getProfile(), definition.getProfile())) { - // execute - the tick tuple triggers a flush of the profile - bolt.execute(mockTickTuple()); + // validate measurement emitted for profile two + assertEquals(definitionTwo, definition); + assertEquals(entity, measurement.getEntity()); + assertEquals(definitionTwo.getProfile(), measurement.getProfileName()); + assertEquals(1, (int) convert(measurement.getValue(), Integer.class)); - // capture the ProfileMeasurement that should be emitted - ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); - verify(outputCollector, times(1)).emit(arg.capture()); + } else if(StringUtils.equals(definitionOne.getProfile(), definition.getProfile())) { - Values actual = arg.getValue(); - ProfileMeasurement measurement = (ProfileMeasurement) actual.get(0); + // validate measurement emitted for profile one + assertEquals(definitionOne, definition); + assertEquals(entity, measurement.getEntity()); + assertEquals(definitionOne.getProfile(), measurement.getProfileName()); + assertEquals(1, (int) convert(measurement.getValue(), Integer.class)); - // verify the groups - assertThat(measurement.getGroups().size(), equalTo(2)); - assertThat(measurement.getGroups().get(0), equalTo(4.0)); - assertThat(measurement.getGroups().get(1), equalTo(8.0)); + } else { + fail(); + } + } } } diff --git a/metron-deployment/amazon-ec2/run.sh b/metron-deployment/amazon-ec2/run.sh index fac3f680cc..a1e260c7ed 100755 --- a/metron-deployment/amazon-ec2/run.sh +++ b/metron-deployment/amazon-ec2/run.sh @@ -59,9 +59,9 @@ echo "Launching Metron[$ENV] @ $NOW"... >> $LOGFILE $DEPLOYDIR/../scripts/platform-info.sh >> $LOGFILE # build metron -cd ../.. -mvn package -DskipTests -PHDP-2.5.0.0 -RC=$?; if [[ $RC != 0 ]]; then exit $RC; fi +#cd ../.. +#mvn package -DskipTests -PHDP-2.5.0.0 +#RC=$?; if [[ $RC != 0 ]]; then exit $RC; fi # deploy metron cd $DEPLOYDIR From deaaf4cb5066e1c03f20b45bb62a577724cdf22f Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 2 Dec 2016 18:07:43 -0500 Subject: [PATCH 2/7] METRON-606 Ensure that a new profile measurement is created on each flush --- .../profiler/client/GetProfileTest.java | 21 ++++- .../client/HBaseProfilerClientTest.java | 31 +++++-- .../metron/profiler/client/ProfileWriter.java | 26 +++--- .../metron/profiler/ProfileBuilder.java | 83 +++++++++---------- .../metron/profiler/ProfileMeasurement.java | 41 +++++---- .../metron/profiler/ProfileBuilderTest.java | 8 -- .../hbase/SaltyRowKeyBuilderTest.java | 34 ++++---- .../profiler/bolt/ProfileBuilderBolt.java | 1 - .../profiler/bolt/ProfileHBaseMapper.java | 10 +-- .../profiler/bolt/ProfileHBaseMapperTest.java | 7 +- 10 files changed, 144 insertions(+), 118 deletions(-) diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java index ce9965fb7e..95582e7c6f 100644 --- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java +++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java @@ -129,7 +129,11 @@ public void testWithNoGroups() { // setup - write some measurements to be read later final int count = hours * periodsPerHour; - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(startTime, periodDuration, periodUnits); + profileWriter.write(m, count, group, val -> expectedValue); // execute - read the profile values - no groups @@ -153,7 +157,10 @@ public void testWithOneGroup() { // setup - write some measurements to be read later final int count = hours * periodsPerHour; - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(startTime, periodDuration, periodUnits); profileWriter.write(m, count, group, val -> expectedValue); // create a variable that contains the groups to use @@ -180,7 +187,10 @@ public void testWithTwoGroups() { // setup - write some measurements to be read later final int count = hours * periodsPerHour; - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(startTime, periodDuration, periodUnits); profileWriter.write(m, count, group, val -> expectedValue); // create a variable that contains the groups to use @@ -224,7 +234,10 @@ public void testOutsideTimeHorizon() { final List group = Collections.emptyList(); // setup - write a single value from 2 hours ago - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(startTime, periodDuration, periodUnits); profileWriter.write(m, 1, group, val -> expectedValue); // create a variable that contains the groups to use diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java index 0076396163..ed75a65537 100644 --- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java +++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java @@ -113,7 +113,11 @@ public void testFetchWithDurationAgoAndOneGroup() throws Exception { final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours); // setup - write two groups of measurements - 'weekends' and 'weekdays' - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(startTime, periodDuration, periodUnits); + profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue); profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0); @@ -137,7 +141,10 @@ public void testFetchWithDurationAgoAndNoGroup() { final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours); // create two groups of measurements - one on weekdays and one on weekends - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(startTime, periodDuration, periodUnits); profileWriter.write(m, hours * periodsPerHour, Arrays.asList("weekdays"), val -> expectedValue); profileWriter.write(m, hours * periodsPerHour, Arrays.asList("weekends"), val -> 0); @@ -160,7 +167,10 @@ public void testFetchWithDurationAgoAndOutsideTimeWindow() throws Exception { final long startTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1); // setup - write some values to read later - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(startTime, periodDuration, periodUnits); profileWriter.write(m, hours * periodsPerHour, group, val -> 1000); // execute @@ -183,7 +193,10 @@ public void testFetchWithStartEndAndOneGroup() throws Exception { final long startTime = endTime - TimeUnit.HOURS.toMillis(hours); // setup - write two groups of measurements - 'weekends' and 'weekdays' - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(startTime, periodDuration, periodUnits); profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue); profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0); @@ -210,7 +223,10 @@ public void testFetchWithStartEndAndNoGroup() { final long startTime = endTime - TimeUnit.HOURS.toMillis(hours); // create two groups of measurements - one on weekdays and one on weekends - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(startTime, periodDuration, periodUnits); profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue); profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0); @@ -235,7 +251,10 @@ public void testFetchWithStartEndAndOutsideTimeWindow() throws Exception { final long measurementTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1); // setup - write some values to read later - ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", measurementTime, periodDuration, periodUnits); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile1") + .withEntity("entity1") + .withPeriod(measurementTime, periodDuration, periodUnits); profileWriter.write(m, numberToWrite, group, val -> 1000); // execute diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java index a4ea8af872..6e2b11e173 100644 --- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java +++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java @@ -70,21 +70,18 @@ public void write(ProfileMeasurement prototype, int count, List group, F ProfileMeasurement m = prototype; for(int i=0; i new Random().nextInt(10)); diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java index 9696a60134..24426ca6fb 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java @@ -20,7 +20,7 @@ package org.apache.metron.profiler; -import org.apache.commons.beanutils.BeanMap; +import com.google.common.collect.ImmutableMap; import org.apache.commons.collections4.ListUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -58,10 +58,14 @@ public class ProfileBuilder { protected static final Logger LOG = LoggerFactory.getLogger(ProfileBuilder.class); /** - * A ProfileMeasurement is created and emitted each window period. A Profile - * itself is composed of many ProfileMeasurements. + * The name of the profile. */ - private ProfileMeasurement measurement; + private String profileName; + + /** + * The name of the entity. + */ + private String entity; /** * The definition of the Profile that the bolt is building. @@ -78,24 +82,25 @@ public class ProfileBuilder { */ private boolean isInitialized; + /** + * The duration of each period in milliseconds. + */ + private long periodDurationMillis; + /** * Use the ProfileBuilder.Builder to create a new ProfileBuilder. */ private ProfileBuilder(ProfileConfig definition, String entity, - long whenMillis, long periodDurationMillis, CuratorFramework client, Map global) { this.isInitialized = false; this.definition = definition; - this.measurement = new ProfileMeasurement( - definition.getProfile(), - entity, - whenMillis, - periodDurationMillis, - TimeUnit.MILLISECONDS); + this.profileName = definition.getProfile(); + this.entity = entity; + this.periodDurationMillis = periodDurationMillis; this.executor = new DefaultStellarExecutor(); Context context = new Context.Builder() .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) @@ -119,6 +124,15 @@ public void apply(JSONObject message) { assign(definition.getUpdate(), message, "update"); } + /** + * Used to determine when the profile measurement occurred. + * + * Ultimately, this needs refactored to handle wall clock versus event time. + */ + public long getTimestamp() { + return System.currentTimeMillis(); + } + /** * Flush the Profile. * @@ -127,18 +141,16 @@ public void apply(JSONObject message) { * @return Returns the completed profile measurement. */ public ProfileMeasurement flush() { - LOG.debug("Flushing profile: profile={}, entity={}", measurement.getProfileName(), measurement.getEntity()); + LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity); // execute the 'result' expression Object value = execute(definition.getResult(), new JSONObject(), "result"); - measurement.setValue(value); - // execute the 'groupBy' expression(s) - allow each expression to refer to the fields of the ProfileMeasurement - List groups = execute(definition.getGroupBy(), new BeanMap(measurement), "groupBy"); - measurement.setGroups(groups); + // execute the 'groupBy' expression(s) - can refer to value of 'result' expression + List groups = execute(definition.getGroupBy(), ImmutableMap.of("result", value), "groupBy"); - // execute the 'tickUpdate' expression(s) - assign(definition.getTickUpdate(), Collections.singletonMap("result", value),"tickUpdate"); + // execute the 'tickUpdate' expression(s) - can refer to value of 'result' expression + assign(definition.getTickUpdate(), ImmutableMap.of("result", value),"tickUpdate"); // save a copy of current state then clear it to prepare for the next window Map state = executor.getState(); @@ -151,7 +163,13 @@ public ProfileMeasurement flush() { }); isInitialized = false; - return measurement; + + return new ProfileMeasurement() + .withProfileName(profileName) + .withEntity(entity) + .withGroups(groups) + .withPeriod(getTimestamp(), periodDurationMillis, TimeUnit.MILLISECONDS) + .withValue(value); } /** @@ -188,12 +206,7 @@ private void assign(Map expressions, Map transie } catch(ParseException e) { // make it brilliantly clear that one of the 'update' expressions is bad - String msg = format( - "Bad '%s' expression: %s, profile=%s, entity=%s", - expressionType, - e.getMessage(), - measurement.getProfileName(), - measurement.getEntity()); + String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); throw new ParseException(msg, e); } } @@ -213,11 +226,7 @@ private List execute(List expressions, Map trans .forEach((expr) -> results.add(executor.execute(expr, transientState, Object.class))); } catch (Throwable e) { - String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", - expressionType, - e.getMessage(), - measurement.getProfileName(), - measurement.getEntity()); + String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); throw new ParseException(msg, e); } @@ -247,7 +256,6 @@ public static class Builder { private ProfileConfig definition; private String entity; - private long periodAt; private long periodDurationMillis; private CuratorFramework zookeeperClient; private Map global; @@ -268,14 +276,6 @@ public Builder withEntity(String entity) { return this; } - /** - * @param periodAtMillis A time within the profile period when the measurement was taken in epoch milliseconds. - */ - public Builder withPeriodAt(long periodAtMillis) { - this.periodAt = periodAtMillis; - return this; - } - /** * @param duration The duration of each profile period. * @param units The units used to specify the duration of the profile period. @@ -305,9 +305,7 @@ public Builder withZookeeperClient(CuratorFramework zookeeperClient) { * @param global The global configuration. */ public Builder withGlobalConfiguration(Map global) { - // TODO how does the profile builder ever seen a global that has been update in zookeeper? - this.global = global; return this; } @@ -320,12 +318,11 @@ public ProfileBuilder build() { if(definition == null) { throw new IllegalArgumentException("missing profiler definition; got null"); } - if(StringUtils.isEmpty(entity)) { throw new IllegalArgumentException(format("missing entity name; got '%s'", entity)); } - return new ProfileBuilder(definition, entity, periodAt, periodDurationMillis, zookeeperClient, global); + return new ProfileBuilder(definition, entity, periodDurationMillis, zookeeperClient, global); } } } diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java index b20dccf21b..bbd17a5b1c 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java @@ -58,18 +58,33 @@ public class ProfileMeasurement { */ private ProfilePeriod period; - /** - * @param profileName The name of the profile. - * @param entity The name of the entity being profiled. - * @param whenMillis When the measurement was taken in epoch milliseconds. - * @param periodDuration The duration of each profile period. - * @param periodUnits The units of the duration of each profile period. - */ - public ProfileMeasurement(String profileName, String entity, long whenMillis, long periodDuration, TimeUnit periodUnits) { + public ProfileMeasurement() { + this.groups = Collections.emptyList(); + } + + public ProfileMeasurement withProfileName(String profileName) { this.profileName = profileName; + return this; + } + + public ProfileMeasurement withEntity(String entity) { this.entity = entity; + return this; + } + + public ProfileMeasurement withValue(Object value) { + this.value = value; + return this; + } + + public ProfileMeasurement withGroups(List groups) { + this.groups = groups; + return this; + } + + public ProfileMeasurement withPeriod(long whenMillis, long periodDuration, TimeUnit periodUnits) { this.period = new ProfilePeriod(whenMillis, periodDuration, periodUnits); - this.groups = Collections.emptyList(); + return this; } public String getProfileName() { @@ -88,18 +103,10 @@ public ProfilePeriod getPeriod() { return period; } - public void setValue(Object value) { - this.value = value; - } - public List getGroups() { return groups; } - public void setGroups(List groups) { - this.groups = groups; - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java index eda6b6bd8f..b97b0e7b9d 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java @@ -80,7 +80,6 @@ public void testInit() throws Exception { builder = new ProfileBuilder.Builder() .withDefinition(definition) .withEntity("10.0.0.1") - .withPeriodAt(System.currentTimeMillis()) .withPeriodDuration(10, TimeUnit.MINUTES) .build(); @@ -103,7 +102,6 @@ public void testInitWithNoMessage() throws Exception { builder = new ProfileBuilder.Builder() .withDefinition(definition) .withEntity("10.0.0.1") - .withPeriodAt(System.currentTimeMillis()) .withPeriodDuration(10, TimeUnit.MINUTES) .build(); @@ -142,7 +140,6 @@ public void testUpdate() throws Exception { builder = new ProfileBuilder.Builder() .withDefinition(definition) .withEntity("10.0.0.1") - .withPeriodAt(System.currentTimeMillis()) .withPeriodDuration(10, TimeUnit.MINUTES) .build(); @@ -178,7 +175,6 @@ public void testResult() throws Exception { builder = new ProfileBuilder.Builder() .withDefinition(definition) .withEntity("10.0.0.1") - .withPeriodAt(System.currentTimeMillis()) .withPeriodDuration(10, TimeUnit.MINUTES) .build(); @@ -212,7 +208,6 @@ public void testGroupBy() throws Exception { builder = new ProfileBuilder.Builder() .withDefinition(definition) .withEntity("10.0.0.1") - .withPeriodAt(System.currentTimeMillis()) .withPeriodDuration(10, TimeUnit.MINUTES) .build(); @@ -251,7 +246,6 @@ public void testFlushClearsState() throws Exception { builder = new ProfileBuilder.Builder() .withDefinition(definition) .withEntity("10.0.0.1") - .withPeriodAt(System.currentTimeMillis()) .withPeriodDuration(10, TimeUnit.MINUTES) .build(); @@ -291,7 +285,6 @@ public void testEntity() throws Exception { builder = new ProfileBuilder.Builder() .withDefinition(definition) .withEntity(entity) - .withPeriodAt(System.currentTimeMillis()) .withPeriodDuration(10, TimeUnit.MINUTES) .build(); @@ -323,7 +316,6 @@ public void testTickUpdate() throws Exception { builder = new ProfileBuilder.Builder() .withDefinition(definition) .withEntity("10.0.0.1") - .withPeriodAt(System.currentTimeMillis()) .withPeriodDuration(10, TimeUnit.MINUTES) .build(); diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java index c83f998176..5d7d121824 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java @@ -20,6 +20,7 @@ package org.apache.metron.profiler.hbase; +import org.apache.commons.collections.CollectionUtils; import org.apache.storm.tuple.Tuple; import org.apache.metron.profiler.ProfileMeasurement; import org.apache.metron.profiler.ProfilePeriod; @@ -64,8 +65,10 @@ public class SaltyRowKeyBuilderTest { public void setup() throws Exception { // a profile measurement - measurement = new ProfileMeasurement("profile", "entity", AUG2016, periodDuration, periodUnits); - measurement.setValue(22); + measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(AUG2016, periodDuration, periodUnits); // the tuple will contain the original message tuple = mock(Tuple.class); @@ -80,8 +83,7 @@ public void setup() throws Exception { @Test public void testRowKeyWithOneGroup() throws Exception { // setup - List groups = Arrays.asList("group1"); - measurement.setGroups(groups); + measurement.withGroups(Arrays.asList("group1")); // the expected row key ByteBuffer buffer = ByteBuffer @@ -107,8 +109,7 @@ public void testRowKeyWithOneGroup() throws Exception { @Test public void testRowKeyWithTwoGroups() throws Exception { // setup - List groups = Arrays.asList("group1","group2"); - measurement.setGroups(groups); + measurement.withGroups(Arrays.asList("group1","group2")); // the expected row key ByteBuffer buffer = ByteBuffer @@ -135,8 +136,7 @@ public void testRowKeyWithTwoGroups() throws Exception { @Test public void testRowKeyWithOneIntegerGroup() throws Exception { // setup - List groups = Arrays.asList(200); - measurement.setGroups(groups); + measurement.withGroups(Arrays.asList(200)); // the expected row key ByteBuffer buffer = ByteBuffer @@ -162,8 +162,7 @@ public void testRowKeyWithOneIntegerGroup() throws Exception { @Test public void testRowKeyWithMixedGroups() throws Exception { // setup - List groups = Arrays.asList(200, "group1"); - measurement.setGroups(groups); + measurement.withGroups(Arrays.asList(200, "group1")); // the expected row key ByteBuffer buffer = ByteBuffer @@ -190,8 +189,7 @@ public void testRowKeyWithMixedGroups() throws Exception { @Test public void testRowKeyWithNoGroup() throws Exception { // setup - List groups = Collections.emptyList(); - measurement.setGroups(groups); + measurement.withGroups(Collections.emptyList()); // the expected row key ByteBuffer buffer = ByteBuffer @@ -224,8 +222,11 @@ public void testRowKeys() throws Exception { // a dummy profile measurement long now = System.currentTimeMillis(); long oldest = now - TimeUnit.HOURS.toMillis(hoursAgo); - ProfileMeasurement m = new ProfileMeasurement("profile", "entity", oldest, periodDuration, periodUnits); - m.setValue(22); + ProfileMeasurement m = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(oldest, periodDuration, periodUnits) + .withValue(22); // generate a list of expected keys List expectedKeys = new ArrayList<>(); @@ -237,7 +238,10 @@ public void testRowKeys() throws Exception { // advance to the next period ProfilePeriod next = m.getPeriod().next(); - m = new ProfileMeasurement("profile", "entity", next.getStartTimeMillis(), periodDuration, periodUnits); + m = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(next.getStartTimeMillis(), periodDuration, periodUnits); } // execute diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java index d3bd164bf3..30499085d9 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java @@ -192,7 +192,6 @@ protected ProfileBuilder getBuilder(Tuple tuple) throws ExecutionException { () -> new ProfileBuilder.Builder() .withDefinition(getField("profile", tuple, ProfileConfig.class)) .withEntity(getField("entity", tuple, String.class)) - .withPeriodAt(getTimestamp()) .withPeriodDurationMillis(periodDurationMillis) .withGlobalConfiguration(getConfigurations().getGlobalConfig()) .withZookeeperClient(client) diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java index fdea3e904b..3e7b4d43a3 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java @@ -20,10 +20,7 @@ package org.apache.metron.profiler.bolt; -import org.apache.storm.tuple.Tuple; -import org.apache.commons.beanutils.BeanMap; import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.common.dsl.ParseException; import org.apache.metron.hbase.bolt.mapper.ColumnList; import org.apache.metron.hbase.bolt.mapper.HBaseMapper; import org.apache.metron.profiler.ProfileMeasurement; @@ -31,15 +28,10 @@ import org.apache.metron.profiler.hbase.RowKeyBuilder; import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder; import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder; -import org.apache.metron.profiler.stellar.StellarExecutor; +import org.apache.storm.tuple.Tuple; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; -import static java.lang.String.format; -import static org.apache.commons.collections.CollectionUtils.isEmpty; - /** * An HbaseMapper that defines how a ProfileMeasurement is persisted within an HBase table. */ diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java index f468bbf0a3..7e0606e56f 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java @@ -59,8 +59,11 @@ public void setup() { mapper = new ProfileHBaseMapper(); mapper.setRowKeyBuilder(rowKeyBuilder); - measurement = new ProfileMeasurement("profile", "entity", 20000, 15, TimeUnit.MINUTES); - measurement.setValue(22); + measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withValue(22); profile = new ProfileConfig(); From cad4674df2eaf67bcfbc8de3cc8fa74a8f1a2b77 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 2 Dec 2016 18:34:46 -0500 Subject: [PATCH 3/7] METRON-606 Added a unit test specifically for METRON-606 --- .../metron/profiler/ProfileBuilder.java | 28 +++++++----- .../apache/metron/profiler/clock/Clock.java | 15 +++++++ .../metron/profiler/clock/FixedClock.java | 18 ++++++++ .../metron/profiler/clock/WallClock.java | 12 ++++++ .../metron/profiler/ProfileBuilderTest.java | 43 +++++++++++++++++++ .../profiler/bolt/ProfileBuilderBolt.java | 10 +---- metron-deployment/amazon-ec2/run.sh | 6 +-- 7 files changed, 110 insertions(+), 22 deletions(-) create mode 100644 metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java create mode 100644 metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java create mode 100644 metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java index 24426ca6fb..d62f12f261 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java @@ -29,6 +29,8 @@ import org.apache.metron.common.dsl.Context; import org.apache.metron.common.dsl.ParseException; import org.apache.metron.common.dsl.StellarFunctions; +import org.apache.metron.profiler.clock.Clock; +import org.apache.metron.profiler.clock.WallClock; import org.apache.metron.profiler.stellar.DefaultStellarExecutor; import org.apache.metron.profiler.stellar.StellarExecutor; import org.json.simple.JSONObject; @@ -87,11 +89,17 @@ public class ProfileBuilder { */ private long periodDurationMillis; + /** + * A clock is used to tell time; imagine that. + */ + private Clock clock; + /** * Use the ProfileBuilder.Builder to create a new ProfileBuilder. */ private ProfileBuilder(ProfileConfig definition, String entity, + Clock clock, long periodDurationMillis, CuratorFramework client, Map global) { @@ -100,6 +108,7 @@ private ProfileBuilder(ProfileConfig definition, this.definition = definition; this.profileName = definition.getProfile(); this.entity = entity; + this.clock = clock; this.periodDurationMillis = periodDurationMillis; this.executor = new DefaultStellarExecutor(); Context context = new Context.Builder() @@ -124,15 +133,6 @@ public void apply(JSONObject message) { assign(definition.getUpdate(), message, "update"); } - /** - * Used to determine when the profile measurement occurred. - * - * Ultimately, this needs refactored to handle wall clock versus event time. - */ - public long getTimestamp() { - return System.currentTimeMillis(); - } - /** * Flush the Profile. * @@ -168,7 +168,7 @@ public ProfileMeasurement flush() { .withProfileName(profileName) .withEntity(entity) .withGroups(groups) - .withPeriod(getTimestamp(), periodDurationMillis, TimeUnit.MILLISECONDS) + .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS) .withValue(value); } @@ -259,6 +259,12 @@ public static class Builder { private long periodDurationMillis; private CuratorFramework zookeeperClient; private Map global; + private Clock clock = new WallClock(); + + public Builder withClock(Clock clock) { + this.clock = clock; + return this; + } /** * @param definition The profiler definition. @@ -322,7 +328,7 @@ public ProfileBuilder build() { throw new IllegalArgumentException(format("missing entity name; got '%s'", entity)); } - return new ProfileBuilder(definition, entity, periodDurationMillis, zookeeperClient, global); + return new ProfileBuilder(definition, entity, clock, periodDurationMillis, zookeeperClient, global); } } } diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java new file mode 100644 index 0000000000..54ce4c6923 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java @@ -0,0 +1,15 @@ +package org.apache.metron.profiler.clock; + +/** + * A clock can tell time; imagine that. + * + * This allows the Profiler to support different treatments of time like wall clock versus event time. + */ +public interface Clock { + + /** + * The current time in epoch milliseconds. + */ + long currentTimeMillis(); + +} diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java new file mode 100644 index 0000000000..75e2a7e706 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java @@ -0,0 +1,18 @@ +package org.apache.metron.profiler.clock; + +/** + * A clock that reports whatever time you tell it to. Most useful for testing. + */ +public class FixedClock implements Clock { + + private long epochMillis; + + public void setTime(long epochMillis) { + this.epochMillis = epochMillis; + } + + @Override + public long currentTimeMillis() { + return this.epochMillis; + } +} diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java new file mode 100644 index 0000000000..a969412eba --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java @@ -0,0 +1,12 @@ +package org.apache.metron.profiler.clock; + +/** + * A clock that uses the system clock to provide wall clock time. + */ +public class WallClock implements Clock { + + @Override + public long currentTimeMillis() { + return System.currentTimeMillis(); + } +} diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java index b97b0e7b9d..c9873afc0e 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java @@ -23,6 +23,8 @@ import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.profiler.clock.Clock; +import org.apache.metron.profiler.clock.FixedClock; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.junit.Before; @@ -186,6 +188,47 @@ public void testResult() throws Exception { assertEquals(100, (int) convert(m.getValue(), Integer.class)); } + /** + * Ensure that time advances properly on each flush. + */ + @Test + public void testProfilePeriodOnFlush() throws Exception { + // setup + FixedClock clock = new FixedClock(); + clock.setTime(100); + + definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class); + builder = new ProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withClock(clock) + .build(); + + { + // apply a message and flush + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate the profile period + ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); + assertEquals(expected, m.getPeriod()); + } + { + // advance time by at least one period - 10 minutes + clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10)); + + // apply a message and flush again + builder.apply(message); + ProfileMeasurement m = builder.flush(); + + // validate the profile period + ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); + assertEquals(expected, m.getPeriod()); + } + } + + /** * { * "profile": "test", diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java index 30499085d9..1fcba3087d 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java @@ -27,6 +27,7 @@ import org.apache.metron.common.utils.ConversionUtils; import org.apache.metron.profiler.ProfileBuilder; import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.profiler.clock.WallClock; import org.apache.storm.Config; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -195,6 +196,7 @@ protected ProfileBuilder getBuilder(Tuple tuple) throws ExecutionException { .withPeriodDurationMillis(periodDurationMillis) .withGlobalConfiguration(getConfigurations().getGlobalConfig()) .withZookeeperClient(client) + .withClock(new WallClock()) .build()); } @@ -215,14 +217,6 @@ private T getField(String fieldName, Tuple tuple, Class clazz) { return value; } - /** - * Returns a value that can be used as the current timestamp. Allows subclasses - * to override, if necessary. - */ - private long getTimestamp() { - return System.currentTimeMillis(); - } - public ProfileBuilderBolt withPeriodDurationMillis(long periodDurationMillis) { this.periodDurationMillis = periodDurationMillis; return this; diff --git a/metron-deployment/amazon-ec2/run.sh b/metron-deployment/amazon-ec2/run.sh index a1e260c7ed..fac3f680cc 100755 --- a/metron-deployment/amazon-ec2/run.sh +++ b/metron-deployment/amazon-ec2/run.sh @@ -59,9 +59,9 @@ echo "Launching Metron[$ENV] @ $NOW"... >> $LOGFILE $DEPLOYDIR/../scripts/platform-info.sh >> $LOGFILE # build metron -#cd ../.. -#mvn package -DskipTests -PHDP-2.5.0.0 -#RC=$?; if [[ $RC != 0 ]]; then exit $RC; fi +cd ../.. +mvn package -DskipTests -PHDP-2.5.0.0 +RC=$?; if [[ $RC != 0 ]]; then exit $RC; fi # deploy metron cd $DEPLOYDIR From d5bfa09248018ad009ac20ccf4032082f4c06d74 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 2 Dec 2016 18:53:04 -0500 Subject: [PATCH 4/7] METRON-606 Added missing ASF license headers --- .../apache/metron/profiler/clock/Clock.java | 20 +++++++++++++++++++ .../metron/profiler/clock/FixedClock.java | 20 +++++++++++++++++++ .../metron/profiler/clock/WallClock.java | 20 +++++++++++++++++++ 3 files changed, 60 insertions(+) diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java index 54ce4c6923..6730e49079 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.apache.metron.profiler.clock; /** diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java index 75e2a7e706..682d2b785b 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.apache.metron.profiler.clock; /** diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java index a969412eba..eb4b33394a 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.apache.metron.profiler.clock; /** From 8b0d84336503898300cf17ddc8ac78c8c25932d3 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 2 Dec 2016 20:15:57 -0500 Subject: [PATCH 5/7] METRON-606 Exception message can cause an NPE --- .../apache/metron/profiler/stellar/DefaultStellarExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java index ad748f6a02..737867cdbd 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java @@ -20,6 +20,7 @@ package org.apache.metron.profiler.stellar; +import org.apache.commons.lang.ClassUtils; import org.apache.metron.common.dsl.Context; import org.apache.metron.common.dsl.functions.resolver.FunctionResolver; import org.apache.metron.common.dsl.MapVariableResolver; @@ -119,7 +120,7 @@ public T execute(String expression, Map state, Class claz T result = ConversionUtils.convert(resultObject, clazz); if (result == null) { throw new IllegalArgumentException(String.format("Unexpected type: expected=%s, actual=%s, expression=%s", - clazz.getSimpleName(), resultObject.getClass().getSimpleName(), expression)); + clazz.getSimpleName(), ClassUtils.getShortClassName(resultObject,"null"), expression)); } return result; From 7ffb603ed8aa3e90a73a36166914560b6ffdcd86 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Sat, 3 Dec 2016 10:12:26 -0500 Subject: [PATCH 6/7] METRON-606 Resolving NotSerializableException issues --- .../java/org/apache/metron/profiler/ProfileBuilder.java | 3 ++- .../java/org/apache/metron/profiler/clock/FixedClock.java | 4 +++- .../java/org/apache/metron/profiler/clock/WallClock.java | 4 +++- .../profiler/stellar/DefaultStellarExecutorTest.java | 8 ++++++++ 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java index d62f12f261..2f1bc93d45 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -55,7 +56,7 @@ * pairing. There will exist many instances, one for each [profile, entity] pair that exists * within the incoming telemetry data applied to the profile. */ -public class ProfileBuilder { +public class ProfileBuilder implements Serializable { protected static final Logger LOG = LoggerFactory.getLogger(ProfileBuilder.class); diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java index 682d2b785b..c6e93cde16 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java @@ -20,10 +20,12 @@ package org.apache.metron.profiler.clock; +import java.io.Serializable; + /** * A clock that reports whatever time you tell it to. Most useful for testing. */ -public class FixedClock implements Clock { +public class FixedClock implements Clock, Serializable { private long epochMillis; diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java index eb4b33394a..1a20c94112 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java @@ -20,10 +20,12 @@ package org.apache.metron.profiler.clock; +import java.io.Serializable; + /** * A clock that uses the system clock to provide wall clock time. */ -public class WallClock implements Clock { +public class WallClock implements Clock, Serializable { @Override public long currentTimeMillis() { diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java index d90c6996f4..3110329a8a 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java @@ -22,6 +22,11 @@ import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.dsl.Context; +import org.apache.metron.common.dsl.functions.StringFunctions; +import org.apache.metron.common.dsl.functions.resolver.ClasspathFunctionResolver; +import org.apache.metron.common.dsl.functions.resolver.FunctionResolver; +import org.apache.metron.common.dsl.functions.resolver.SimpleFunctionResolver; +import org.apache.metron.common.field.validation.primitive.IntegerValidation; import org.apache.metron.profiler.stellar.DefaultStellarExecutor; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -66,6 +71,9 @@ public void setup() throws ParseException { // create the executor to test executor = new DefaultStellarExecutor(); executor.setContext(Context.EMPTY_CONTEXT()); + + ClasspathFunctionResolver resolver = new ClasspathFunctionResolver(); + executor.setFunctionResolver(resolver); } /** From 77d69e2c74ed4a59debba099cdd41a513bc8bf40 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Sat, 3 Dec 2016 10:26:12 -0500 Subject: [PATCH 7/7] METRON-606 Let's prevent future abuse --- .../apache/metron/profiler/stellar/DefaultStellarExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java index 737867cdbd..27d23e2fe7 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java @@ -20,6 +20,7 @@ package org.apache.metron.profiler.stellar; +import com.google.common.collect.ImmutableMap; import org.apache.commons.lang.ClassUtils; import org.apache.metron.common.dsl.Context; import org.apache.metron.common.dsl.functions.resolver.FunctionResolver; @@ -77,7 +78,7 @@ public DefaultStellarExecutor(Map initialState) { */ @Override public Map getState() { - return new HashMap<>(state); + return ImmutableMap.copyOf(state); } /**