From 56c35d4cb2d97c4c34715f4b717f218cc9c8b650 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 10 Jul 2018 15:42:19 -0400 Subject: [PATCH] Moving message timestamp determination to the MessageRouter. Logic needed by both Storm and Spark --- .../profiler/DefaultMessageDistributor.java | 6 +- .../metron/profiler/DefaultMessageRouter.java | 34 ++++++++-- .../metron/profiler/MessageDistributor.java | 4 +- .../apache/metron/profiler/MessageRoute.java | 64 ++++++++++++++++++- .../metron/profiler/StandAloneProfiler.java | 26 ++------ .../DefaultMessageDistributorTest.java | 32 +++++----- .../profiler/DefaultMessageRouterTest.java | 55 ++++++++++++++++ .../profiler/bolt/ProfileBuilderBolt.java | 4 +- .../profiler/bolt/ProfileSplitterBolt.java | 37 ++++------- .../profiler/bolt/ProfileBuilderBoltTest.java | 2 +- .../bolt/ProfileSplitterBoltTest.java | 5 +- 11 files changed, 189 insertions(+), 80 deletions(-) diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index 82f7174fbe..037fece2a4 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -146,16 +146,14 @@ public DefaultMessageDistributor( /** * Distribute a message along a MessageRoute. * - * @param message The message that needs distributed. - * @param timestamp The timestamp of the message. * @param route The message route. * @param context The Stellar execution context. */ @Override - public void distribute(JSONObject message, long timestamp, MessageRoute route, Context context) { + public void distribute(MessageRoute route, Context context) { try { ProfileBuilder builder = getBuilder(route, context); - builder.apply(message, timestamp); + builder.apply(route.getMessage(), route.getTimestamp()); } catch(ExecutionException e) { LOG.error("Unexpected error", e); diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java index d1a1a3b597..c68929e8da 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java @@ -22,6 +22,9 @@ import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.profiler.clock.Clock; +import org.apache.metron.profiler.clock.ClockFactory; +import org.apache.metron.profiler.clock.DefaultClockFactory; import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; import org.apache.metron.stellar.common.StellarStatefulExecutor; import org.apache.metron.stellar.dsl.Context; @@ -53,10 +56,16 @@ public class DefaultMessageRouter implements MessageRouter { */ private StellarStatefulExecutor executor; + /** + * Responsible for creating the {@link Clock}. + */ + private ClockFactory clockFactory; + public DefaultMessageRouter(Context context) { this.executor = new DefaultStellarStatefulExecutor(); StellarFunctions.initialize(context); executor.setContext(context); + clockFactory = new DefaultClockFactory(); } /** @@ -73,7 +82,8 @@ public List route(JSONObject message, ProfilerConfig config, Conte // attempt to route the message to each of the profiles for (ProfileConfig profile: config.getProfiles()) { - Optional route = routeToProfile(message, profile); + Clock clock = clockFactory.createClock(config); + Optional route = routeToProfile(message, profile, clock); route.ifPresent(routes::add); } @@ -86,20 +96,24 @@ public List route(JSONObject message, ProfilerConfig config, Conte * @param profile The profile that may need the message. * @return A MessageRoute if the message is needed by the profile. */ - private Optional routeToProfile(JSONObject message, ProfileConfig profile) { + private Optional routeToProfile(JSONObject message, ProfileConfig profile, Clock clock) { Optional route = Optional.empty(); // allow the profile to access the fields defined within the message @SuppressWarnings("unchecked") final Map state = (Map) message; - try { // is this message needed by this profile? if (executor.execute(profile.getOnlyif(), state, Boolean.class)) { - // what is the name of the entity in this message? - String entity = executor.execute(profile.getForeach(), state, String.class); - route = Optional.of(new MessageRoute(profile, entity)); + // what time is is? could be either system or event time + Optional timestamp = clock.currentTimeMillis(message); + if(timestamp.isPresent()) { + + // what is the name of the entity in this message? + String entity = executor.execute(profile.getForeach(), state, String.class); + route = Optional.of(new MessageRoute(profile, entity, message, timestamp.get())); + } } } catch(Throwable e) { @@ -110,4 +124,12 @@ private Optional routeToProfile(JSONObject message, ProfileConfig return route; } + + public void setExecutor(StellarStatefulExecutor executor) { + this.executor = executor; + } + + public void setClockFactory(ClockFactory clockFactory) { + this.clockFactory = clockFactory; + } } diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java index ea5be0f9c6..b164207f87 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java @@ -43,12 +43,10 @@ public interface MessageDistributor { /** * Distribute a message along a {@link MessageRoute}. * - * @param message The message that needs distributed. - * @param timestamp The timestamp of the message. * @param route The message route. * @param context The Stellar execution context. */ - void distribute(JSONObject message, long timestamp, MessageRoute route, Context context); + void distribute(MessageRoute route, Context context); /** * Flush all active profiles. diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java index 7288f03468..89d411c032 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java @@ -20,7 +20,11 @@ package org.apache.metron.profiler; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.json.simple.JSONObject; /** * Defines the 'route' a message must take through the Profiler. @@ -45,15 +49,27 @@ public class MessageRoute { */ private String entity; + /** + * The message taking this route. + */ + private JSONObject message; + + /** + * The timestamp of the message. + */ + private Long timestamp; + /** * Create a {@link MessageRoute}. * * @param profileDefinition The profile definition. - * @param entity The entity. + * @param entity The entity. */ - public MessageRoute(ProfileConfig profileDefinition, String entity) { + public MessageRoute(ProfileConfig profileDefinition, String entity, JSONObject message, Long timestamp) { this.entity = entity; this.profileDefinition = profileDefinition; + this.message = message; + this.timestamp = timestamp; } public String getEntity() { @@ -71,4 +87,48 @@ public ProfileConfig getProfileDefinition() { public void setProfileDefinition(ProfileConfig profileDefinition) { this.profileDefinition = profileDefinition; } + + public JSONObject getMessage() { + return message; + } + + public void setMessage(JSONObject message) { + this.message = message; + } + + public Long getTimestamp() { + return timestamp; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MessageRoute that = (MessageRoute) o; + return new EqualsBuilder() + .append(profileDefinition, that.profileDefinition) + .append(entity, that.entity) + .append(message, that.message) + .append(timestamp, that.timestamp) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(profileDefinition) + .append(entity) + .append(message) + .append(timestamp) + .toHashCode(); + } } diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java index f79efe6568..befa29685e 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java @@ -112,26 +112,14 @@ public StandAloneProfiler(ProfilerConfig config, * @param message The message to apply. */ public void apply(JSONObject message) { - - // what time is it? - Clock clock = clockFactory.createClock(config); - Optional timestamp = clock.currentTimeMillis(message); - - // can only route the message, if we have a timestamp - if(timestamp.isPresent()) { - - // route the message to the correct profile builders - List routes = router.route(message, config, context); - for (MessageRoute route : routes) { - distributor.distribute(message, timestamp.get(), route, context); - } - - routeCount += routes.size(); - messageCount += 1; - - } else { - LOG.warn("No timestamp available for the message. The message will be ignored."); + // route the message to the correct profile builders + List routes = router.route(message, config, context); + for (MessageRoute route : routes) { + distributor.distribute(route, context); } + + routeCount += routes.size(); + messageCount += 1; } /** diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java index ea9c5c66f9..48161e25a1 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java @@ -123,10 +123,10 @@ public void testDistribute() throws Exception { long timestamp = 100; ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, messageOne, timestamp); // distribute one message and flush - distributor.distribute(messageOne, timestamp, route, context); + distributor.distribute(route, context); List measurements = distributor.flush(); // expect one measurement coming from one profile @@ -144,12 +144,12 @@ public void testDistributeWithTwoProfiles() throws Exception { String entity = (String) messageOne.get("ip_src_addr"); // distribute one message to the first profile - MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity); - distributor.distribute(messageOne, timestamp, routeOne, context); + MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity, messageOne, timestamp); + distributor.distribute(routeOne, context); // distribute another message to the second profile, but same entity - MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity); - distributor.distribute(messageOne, timestamp, routeTwo, context); + MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity, messageOne, timestamp); + distributor.distribute(routeTwo, context); // expect 2 measurements; 1 for each profile List measurements = distributor.flush(); @@ -164,13 +164,13 @@ public void testDistributeWithTwoEntities() throws Exception { // distribute one message String entityOne = (String) messageOne.get("ip_src_addr"); - MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne); - distributor.distribute(messageOne, timestamp, routeOne, context); + MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne, messageOne, timestamp); + distributor.distribute(routeOne, context); // distribute another message with a different entity String entityTwo = (String) messageTwo.get("ip_src_addr"); - MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entityTwo); - distributor.distribute(messageTwo, timestamp, routeTwo, context); + MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entityTwo, messageTwo, timestamp); + distributor.distribute(routeTwo, context); // expect 2 measurements; 1 for each entity List measurements = distributor.flush(); @@ -190,7 +190,7 @@ public void testNotYetTimeToExpireProfiles() throws Exception { // setup ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis()); distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, @@ -198,7 +198,7 @@ public void testNotYetTimeToExpireProfiles() throws Exception { ticker); // distribute one message - distributor.distribute(messageOne, 1000000, route, context); + distributor.distribute(route, context); // advance time to just shy of the profile TTL ticker.advanceTime(profileTimeToLiveMillis - 1000, MILLISECONDS); @@ -220,7 +220,7 @@ public void testProfilesShouldExpire() throws Exception { // setup ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis()); distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, @@ -228,7 +228,7 @@ public void testProfilesShouldExpire() throws Exception { ticker); // distribute one message - distributor.distribute(messageOne, 100000, route, context); + distributor.distribute(route, context); // advance time to just beyond the period duration ticker.advanceTime(profileTimeToLiveMillis + 1000, MILLISECONDS); @@ -251,7 +251,7 @@ public void testExpiredProfilesShouldBeRemoved() throws Exception { // setup ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis()); distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, @@ -259,7 +259,7 @@ public void testExpiredProfilesShouldBeRemoved() throws Exception { ticker); // distribute one message - distributor.distribute(messageOne, 1000000, route, context); + distributor.distribute(route, context); // advance time a couple of hours ticker.advanceTime(2, HOURS); diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java index 534f155f1d..f583c30ab7 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java @@ -55,6 +55,17 @@ public class DefaultMessageRouterTest { private String inputTwo; private JSONObject messageTwo; + /** + * { + * "ip_src_addr": "10.0.0.1", + * "value": "22", + * "timestamp": 1531250226659 + * } + */ + @Multiline + private String inputWithTimestamp; + private JSONObject messageWithTimestamp; + /** * { * "profiles": [ ] @@ -175,6 +186,23 @@ public class DefaultMessageRouterTest { @Multiline private String goodAndBad; + /** + * { + * "profiles": [ + * { + * "profile": "profile-one", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + * ], + * "timestampField": "timestamp" + * } + */ + @Multiline + private String profileWithEventTime; + private DefaultMessageRouter router; private Context context; @@ -193,6 +221,7 @@ public void setup() throws Exception { JSONParser parser = new JSONParser(); this.messageOne = (JSONObject) parser.parse(inputOne); this.messageTwo = (JSONObject) parser.parse(inputTwo); + this.messageWithTimestamp = (JSONObject) parser.parse(inputWithTimestamp); } @Test @@ -268,4 +297,30 @@ public void testWithGoodAndBad() throws Exception { assertEquals("good-profile", route1.getProfileDefinition().getProfile()); assertEquals(messageOne.get("ip_src_addr"), route1.getEntity()); } + + /** + * + */ + @Test + public void testMessageWithTimestamp() throws Exception { + List routes = router.route(messageWithTimestamp, createConfig(profileWithEventTime), context);; + + assertEquals(1, routes.size()); + MessageRoute route1 = routes.get(0); + assertEquals("profile-one", route1.getProfileDefinition().getProfile()); + assertEquals(messageWithTimestamp.get("ip_src_addr"), route1.getEntity()); + assertEquals(messageWithTimestamp.get("timestamp"), route1.getTimestamp()); + } + + /** + * If the timestamp of a message cannot be determined, it should not be routed. + * + *

This might happen when using event time and the message is missing the timestamp field. + */ + @Test + public void testMessageWithMissingTimestamp() throws Exception { + // messageOne does not contain a timestamp + List routes = router.route(messageOne, createConfig(profileWithEventTime), context); + assertEquals(0, routes.size()); + } } diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java index 0d1f27ea72..f9c0edd72d 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java @@ -363,9 +363,9 @@ private void handleMessage(Tuple input) { activeFlushSignal.update(timestamp); // distribute the message - MessageRoute route = new MessageRoute(definition, entity); + MessageRoute route = new MessageRoute(definition, entity, message, timestamp); synchronized (messageDistributor) { - messageDistributor.distribute(message, timestamp, route, getStellarContext()); + messageDistributor.distribute(route, getStellarContext()); } LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", definition.getProfile(), entity, timestamp); diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java index f28411f647..f57deb7eea 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java @@ -96,11 +96,6 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { */ private transient MessageRouter router; - /** - * Responsible for creating the {@link Clock}. - */ - private transient ClockFactory clockFactory; - /** * @param zookeeperUrl The Zookeeper URL that contains the configuration for this bolt. */ @@ -114,10 +109,9 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll this.collector = collector; this.parser = new JSONParser(); this.router = new DefaultMessageRouter(getStellarContext()); - this.clockFactory = new DefaultClockFactory(); } - private Context getStellarContext() { + public Context getStellarContext() { Map global = getConfigurations().getGlobalConfig(); return new Context.Builder() .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) @@ -162,15 +156,9 @@ private void doExecute(Tuple input) throws ParseException, UnsupportedEncodingEx // ensure there is a valid profiler configuration ProfilerConfig config = getProfilerConfig(); if(config != null && config.getProfiles().size() > 0) { + routeMessage(input, message, config); - // what time is it? - Clock clock = clockFactory.createClock(config); - Optional timestamp = clock.currentTimeMillis(message); - - // route the message. if a message does not contain the timestamp field, it cannot be routed. - timestamp.ifPresent(ts -> routeMessage(input, message, config, ts)); - - } else { + } else if(LOG.isDebugEnabled()) { LOG.debug("No Profiler configuration found. Nothing to do."); } } @@ -180,24 +168,23 @@ private void doExecute(Tuple input) throws ParseException, UnsupportedEncodingEx * @param input The input tuple on which to anchor. * @param message The telemetry message. * @param config The Profiler configuration. - * @param timestamp The timestamp of the telemetry message. */ - private void routeMessage(Tuple input, JSONObject message, ProfilerConfig config, Long timestamp) { + private void routeMessage(Tuple input, JSONObject message, ProfilerConfig config) { // emit a tuple for each 'route' List routes = router.route(message, config, getStellarContext()); for (MessageRoute route : routes) { - Values values = createValues(message, timestamp, route); + Values values = createValues(route); collector.emit(input, values); LOG.debug("Found route for message; profile={}, entity={}, timestamp={}", route.getProfileDefinition().getProfile(), route.getEntity(), - timestamp); + route.getTimestamp()); } - LOG.debug("Found {} route(s) for message with timestamp={}", routes.size(), timestamp); + LOG.debug("Found {} route(s) for message", routes.size()); } /** @@ -222,22 +209,20 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { /** * Creates the {@link Values} attached to the outgoing tuple. * - * @param message The telemetry message. - * @param timestamp The timestamp of the message. * @param route The route the message must take. * @return */ - private Values createValues(JSONObject message, Long timestamp, MessageRoute route) { + private Values createValues(MessageRoute route) { // the order here must match `declareOutputFields` - return new Values(message, timestamp, route.getEntity(), route.getProfileDefinition()); + return new Values(route.getMessage(), route.getTimestamp(), route.getEntity(), route.getProfileDefinition()); } protected MessageRouter getMessageRouter() { return router; } - public void setClockFactory(ClockFactory clockFactory) { - this.clockFactory = clockFactory; + public void setRouter(MessageRouter router) { + this.router = router; } } diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java index 3d009fb46b..3132ae62df 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java @@ -127,7 +127,7 @@ public void testExtractMessage() throws Exception { bolt.execute(tupleWindow); // the message should have been extracted from the tuple and passed to the MessageDistributor - verify(distributor).distribute(eq(message1), eq(timestamp1), any(MessageRoute.class), any()); + verify(distributor).distribute(any(MessageRoute.class), any()); } diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java index bf819239ef..d57e8257b9 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java @@ -23,6 +23,7 @@ import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.profiler.DefaultMessageRouter; import org.apache.metron.profiler.clock.FixedClockFactory; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.test.bolt.BaseBoltTest; @@ -428,7 +429,9 @@ private ProfileSplitterBolt createBolt(ProfilerConfig config) throws Exception { bolt.prepare(new HashMap<>(), topologyContext, outputCollector); // set the clock factory AFTER calling prepare to use the fixed clock factory - bolt.setClockFactory(new FixedClockFactory(timestamp)); + DefaultMessageRouter router = new DefaultMessageRouter(bolt.getStellarContext()); + router.setClockFactory(new FixedClockFactory(timestamp)); + bolt.setRouter(router); return bolt; }