
METRON-590 Enable Use of Event Time in Profiler #395

Closed
wants to merge 14 commits

Conversation

nickwallen
Contributor

@nickwallen nickwallen commented Dec 14, 2016

METRON-590

Usage

Creating Profiles with Live Data

The most common Profiler use case is creating profiles on live, streaming data. In this case the processing time and event time will normally remain close, but could differ under abnormal conditions. Event time processing has some significant advantages in this scenario.

  • Profiles are not skewed by high demand that might delay the Profiler's processing.
  • Allows the Profiler to take planned/unplanned outages and pick up where it left off.
  • Produces more accurate behavioral profiles when there is a time difference between when a behavior occurs and when the telemetry produced to tell us about that behavior is received. For example, think of a sensor that collects data in batches or mini-batches where we get data on regular intervals; every 10 minutes, hourly, etc.

Creating Profiles with Replayed Data

The other use case that this positions us for is creating profiles from replayed or reprocessed archival data.

  • For example, I am creating a model based on a new feature that the Profiler is generating for me. When I move that model into Production, I need a historical view of that feature to train my model. I can replay archived telemetry through the Profiler, generating that history of my new feature. This allows the model to be effective in Production on day 1.

It is important to note that this PR doesn't actually deliver all we need to handle replaying data. This just provides one critical component. I don't want to give anyone the impression that this PR allows us to replay data at this point in time. METRON-594 is tracking the greater effort of replaying telemetry data through the Profiler.

Changes

  • Added event time processing support to the Profiler. Previously, the Profiler only supported processing time, aka wall clock time. Event time processing is advantageous because it is not susceptible to skew caused by heavy processing load, it allows the reprocessing/replay of archived telemetry data, and under certain circumstances it can produce a more accurate profile of entity behavior.

  • By default, the Profiler will use event time processing. The Flux topology definition file must be edited to switch the Profiler back to wall clock, aka processing time.

  • The Profiler now leverages Storm's windowing functionality, introduced in Storm 1.x. This provides the core engine for event time processing. It also provides a means to use different window types, like sliding windows, in the Profiler. This is not currently exposed to users of the Profiler, as the Flux topology definition file must be edited to use different window types. (A sketch of the windowing mechanics follows this list.)

  • Enhanced the Profiler integration tests, which the use of event time processing made possible. The integration tests now generate 24 hours of telemetry data at roughly 3 messages per minute, and then flush profile values every 15 minutes. The entire stream of values generated by the Profiler is then validated for correctness.

  • Created a ConfigurationManager that can be used to read the latest configuration changes from a remote data store like Zookeeper. The default implementation, ZkConfigurationManager, replicates the functionality that is embedded in the ConfiguredBolt base class. The Profiler bolts can no longer subclass ConfiguredBolt because ConfiguredBolt subclasses Storm's BaseRichBolt, which will not work for the Profiler's windowed bolts.

  • The usability of the Profiler was enhanced to better support active profiles that are subsequently edited by the user. Changes should be handled seamlessly by the Profiler. This is especially helpful when a mistake is made when creating a profile, which then needs to be fixed and updated. The Profiler was also made more resilient to failures specific to a single Profile or Tuple. Individual failures should not impact other Profiles or Tuples.
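
For a concrete picture of the windowing mechanics mentioned above, here is a minimal sketch, not the actual Metron implementation, of how a Storm 1.x bolt is wired for event time with a tumbling window. The class name, timestamp field, and durations are illustrative assumptions.

    // Sketch only: a Storm 1.x windowed bolt configured for event time.
    public class SketchProfileBolt extends BaseWindowedBolt {
      @Override
      public void execute(TupleWindow window) {
        for (Tuple tuple : window.get()) {
          // apply each telemetry message to the profiles being built
        }
      }
    }

    // When building the topology (setBolt accepts any IWindowedBolt):
    TopologyBuilder builder = new TopologyBuilder();
    builder.setBolt("profileBuilder",
        new SketchProfileBolt()
            .withTumblingWindow(new BaseWindowedBolt.Duration(15, TimeUnit.MINUTES)) // flush interval
            .withTimestampField("timestamp")  // read event time from each tuple
            .withLag(new BaseWindowedBolt.Duration(1, TimeUnit.MINUTES)));           // tolerated out-of-orderness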

Testing

Tested on a multi-node AWS cluster and the Quick Dev environment. Created, edited, and deleted multiple profile definitions as the Profiler was running and responding to the changes.

Here are manual steps that can be followed to test this PR. At a high level, we set up a Metron environment and run it for a period of time to create an 'enrichments' topic containing test data. We then wait a period of time so that the data becomes stale and the timestamps attached to that data have passed. We then start the Profiler, but tell it to consume from the START of the topic so that it reads all of the stale data. We then query the Profiler to ensure that it is using the 'old' event timestamp.

  • Setup the Environment
    • Start with a clean Metron environment.
    • Stop all sensors and topologies.
    • Start consuming data from one sensor, like Snort, by starting Pcap Replay, Snort, and the Snort parser topology.
    • Ensure that data is landing in the 'enrichments' topic.
    • Allow the environment to accumulate data in the 'enrichments' topic for 15 minutes.
    • Stop all topologies and wait for an additional 15 - 60 minutes.
  • Setup the Profiler
    • Create the Profiler table in HBase.
      • create 'profiler', 'P'
    • Configure the Profiler topology by editing config/profiler.properties.
      • profiler.period.duration=1
      • profiler.input.topic=enrichments
      • kafka.start=START
    • Configure the Profiler Client by adding the following to the global properties.
      • "profiler.client.period.duration": "1",
      • "profiler.client.period.duration.units": "MINUTES"
    • Define a profile that simply counts messages.
       {
         "profiles" : [
            {
              "profile": "counter",
              "foreach": "'counter'",
              "update": { "s" : "STATS_ADD(s, 1)" },
              "result": "s"
            }
         ]
       }
    
    • Start the Profiler topology.
  • Validate
    • Query the Profiler for data with a current timestamp. There should be none.
    PROFILE_GET_FROM('counter', 'counter', MILLIS(5, 'MINUTES'))
    
    • Query the Profiler for data from 15 - 60 minutes ago, depending on how long you waited. Data should be returned.
    PROFILE_GET_FROM('counter', 'counter', MILLIS(5, 'MINUTES'), NOW() - MILLIS(60, 'MINUTES'))
    

@cestella
Member

This looks cool, @nickwallen ! The only thing I don't see is changes to GetProfile to ensure that the message is calculating the keys based on event time, rather than wall time. Is that somewhere and I missed it?

Member

@cestella cestella left a comment

While I'm commenting on the config and the client: could we adjust PROFILE_GET to take an optional argument, the timestamp from which to calculate the profile? This would allow you to do things like ask for the profile of this time period last week.

profiler.event.timestamp.field=timestamp

# event time processing - how long time lags behind the last seen timestamp
profiler.event.time.lag=1
Member

Could we update the docs to describe what this means?

Member

That wasn't a clear comment on my part. Can we please update the README so we understand exactly the impact of that parameter? As it stands, I'm not sure about the tradeoff of making it 0 minutes, for instance.

Contributor Author

Yep, good catch

@cestella
Member

Oh, also, I'm ok with the PROFILE_GET change I asked for earlier being a separate Stellar function (maybe PROFILE_GET_FROM)

@nickwallen
Contributor Author

PROFILE_GET("profile", "entity", 15, "MINUTES") calculates 15 minutes back from current wall time to get a start and end time stamp. It then uses those times to find profile measurements in that range. It doesn't really matter if the timestamp used to store the measurement is the event or current timestamp.

Right, or am I missing something? Or are you looking for something like PROFILE_GET('profile', 'entity', <start>, <end>)?

@cestella
Member

Not necessarily looking for <start>, <end>, but I'd accept that as well. What I'm looking for is PROFILE_GET("profile", "entity", 15, "MINUTES", timestamp - 600000) to get the 15 minute chunk starting 10 minutes back.

Also, independent of the ask above, if the profiler is persisting in event time, reading should happen, by default, in event time. They should be in sync.

@cestella
Member

sorry, imagine that subtraction put us 24 hours back, not 10 minutes back. That would make more sense as a request. ;)

@nickwallen
Contributor Author

I want to provide some feedback on @cestella 's comments on changes to the Profiler Client API. Before I do that, I want to make sure that we're all on the same page about usage scenarios for this functionality.

"Live" Data

The most common use case is creating profiles on live, streaming data. In this case the processing time and event time will normally remain close, but could differ under abnormal conditions.

Note that it is still very valuable to use event time processing in this scenario. Using event time here has the following advantages.

  • Profiles are not skewed by high demand that might delay processing.
  • Allows the Profiler to take planned/unplanned outages and pick up where it left off.
  • Produces more accurate behavioral profiles when there is a time difference between when a behavior occurs and when the telemetry produced to tell us about that behavior is received. For example, think of a sensor that collects data in batches or mini-batches where we get data on regular intervals; every 10 minutes, hourly, etc.

Replayed Data

The other use case that this positions us for is creating profiles from replayed or reprocessed archival data. I am creating a model based on a new feature that the Profiler is generating for me. When I move that model into Production, I need a historical view of that feature to train my model. I can replay archived telemetry through the Profiler, generating that history of my new feature. I think I put more examples of this in the original JIRA too.

This PR doesn't actually deliver all we need to handle replaying data. This just provides one critical component. I don't want to give anyone the impression that this PR allows us to replay data at this point in time.

@nickwallen
Contributor Author

So now I'm thinking about changes to the Profiler Client API based on those usage scenarios.

For the Live Data use case, the existing PROFILE_GET(profile, entity, look-back) function should work just fine. I think this is the simplest function signature to use in this case.

For the Replayed Data use case, the existing PROFILE_GET signature is not going to work. To query the Profiler for replayed data, we need one or both of the following functions. Or maybe there is a better function signature that I'm not thinking of.

PROFILE_GET_RANGE

Return the measurements taken for a profile and entity within a range of time defined by a start and an end.

The only problem here is how do I make it easy for the user to specify a timestamp? Do I have to accept a number of different formats and try to coerce to a timestamp? What formats should I accept?

PROFILE_GET_RANGE(profile, entity, start timestamp, end timestamp)

PROFILE_GET_FROM

Return the measurements taken for a profile and entity starting from a point in time, the offset, and looking back from that offset. This might initially be hard to grok, but it may prove easier to use once you have your mind wrapped around it. It closely follows the existing PROFILE_GET function signature.

PROFILE_GET_FROM(profile, entity, look-back, offset)

For example, I replayed data that I know is 2 months old and now I need to look at that data. I am interested in a 15 minute window from a profile that I know is offset by 2 months.

PROFILE_GET_FROM('replayed-profile', '10.0.0.1', 15, "MINUTES", 2, "MONTHS")

@nickwallen
Contributor Author

So now, having laid my thoughts out, my opinion is that PROFILE_GET works just fine for Live Data, which is the usage scenario supported by this PR.

The other client functions are only needed for Replaying Data, and since this PR doesn't deliver support for replaying data, I'd rather put off those enhancements to a separate PR. I also need to do a fair bit more opinion gathering on what those functions should look like exactly. And this PR is already pretty huge. :)

That being said, I'd be happy to implement the new client functions in this PR, if we can agree on their signature and it gets me a +1.

@cestella
Member

One question. I like your signature for PROFILE_GET_FROM: PROFILE_GET_FROM(profile, entity, look-back, offset) but your example usage, PROFILE_GET_FROM('replayed-profile', '10.0.0.1', 15, "MINUTES", 2, "MONTHS") doesn't appear to fit (the last 2 args aren't an offset). I'd expect offset to be a timestamp and if you want to do things like figure out which timestamp to use, you'd do it with other stellar functions. So, for instance, I'd imagine your example as:
PROFILE_GET_FROM('replayed-profile', '10.0.0.1', 15, "MINUTES", DATE_SUBTRACT(timestamp, 2, "MONTHS")) where DATE_SUBTRACT subtracts 2 months from the timestamp of the data and returns that timestamp.

Does that make sense at all?

@nickwallen
Contributor Author

nickwallen commented Dec 15, 2016

Yes, your example makes sense. I was not sure what the easiest way is to specify a timestamp. In my example the offset was saying 2 months before now. But I like that in your example, you make that explicit (and more flexible) with DATE_SUBTRACT. I'll crank on this and come up with something.

@ottobackwards
Contributor

ottobackwards commented Dec 15, 2016

Could we have a way of marking parts of STELLAR as preview or beta? Just get them out there, and have a review/feedback period. Explicitly even if that is implicit at this point?

We also may need to think about deprecation and other language like features.

The attributes could do this

@nickwallen
Contributor Author

Not a bad idea @ottobackwards . We are definitely going to have to think about change and dealing with deprecation as we move forward. I think it's worthy of a separate discuss thread or JIRA, if you're motivated to drive that discussion.

@cestella
Member

@ottobackwards Yeah, it's a very good point. Backwards compatibility is going to be an issue going forward if we aren't quite careful.

@ottobackwards
Contributor

OK - I'll create a jira for that - and tag you guys for comments/editing?

@ottobackwards
Contributor

@nickwallen @cestella METRON-628
Please update or comment

@@ -0,0 +1,84 @@
package org.apache.metron.common.utils;
Contributor

needs lic.

@@ -0,0 +1,84 @@
package org.apache.metron.common.utils;
Contributor

I have also taken the executor functions from StellarTest and made them into a util class. There is some overlap here.

https://github.com/apache/incubator-metron/blob/master/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/StellarProcessorUtils.java

Contributor Author

Ah, yes, good catch. We should bring these two together. Hmm.

The immediate problem for me with StellarProcessorUtils as-is is that I can't set a function resolver. If I simply add another 'run' method so the user can specify a function resolver, then we're going to have quite a few different run methods, which gets a little confusing in my opinion.

The other thing I prefer about the approach of StellarExecutor is that each time I call run I don't need to pass in all of the extraneous data structures, like the function resolver, the context and the variable resolver. I'd prefer to have a way to set all of these, in the setup method of my test. I think this makes each individual test case easier to read.

Which do you prefer? If we can find one way of doing it, I'd be willing to go through and transition everything to one way or the other.

Contributor

I think having what you have added in core, with a util class adding testing focused helper functions in 'test' is fine. Could we make the StellarProcessorUtils call into the StellarExecutor?
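
For instance, a hypothetical sketch of that delegation; the executor's constructor and method names here are assumptions, not the actual API:

    // StellarProcessorUtils.run delegating to the executor (hypothetical API)
    public static Object run(String rule, Map<String, Object> variables, Context context) {
      StellarExecutor executor = new StellarExecutor(variables, context);
      return executor.execute(rule, Object.class);
    }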

Contributor Author

Pushed the latest, which I think reflects your ideas @ottobackwards . Take a look when you get a chance.

@@ -0,0 +1,61 @@
package org.apache.metron.common.dsl.functions;
Contributor

needs lic.

@@ -0,0 +1,48 @@
package org.apache.metron.common.dsl.functions;
Contributor

needs lic.

@ottobackwards
Contributor

I like the change to the StellarProcessorUtils; should we rename it to StellarExecutorUtils?

* Responsible for managing configuration values that are created, persisted, and updated
* in Zookeeper.
*/
public class ZkConfigurationManager implements ConfigurationManager {
Contributor Author

This implementation differs a bit from the original ConfiguredBolt.prepare method. In the original, we would add a listener and manually update based on change notifications. I thought that is exactly what the Curator caches do for us, so I did not see the need for all that. Please correct me if I am mistaken.

Contributor

Let me attempt to give you some context as to why the ConfiguredBolt.prepare method was written that way. First a TreeCache was necessary to allow configs to be added at runtime since the Zookeeper node for that config may or may not exist at startup. If that feature is not important then I think a collection of NodeCaches is fine. Second, the configs are stored in Zookeeper as JSON so there is a deserialization step that needs to happen before they can be read in a bolt. A listener allows a bolt to react to configuration changes asynchronously, hence deserialization only happens when a config changes and not every time a tuple is processed. A bolt can also do other expensive tasks (not just deserializing JSON) like reinitializing objects or clearing caches.

That being said, it makes me nervous that deserialization happens in a synchronized method every time a tuple is executed. If this is going to be used in a low-volume topology it's probably fine, but you might want to consider using a listener if this should be supported in high-velocity streams.
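
For example, a minimal sketch of the listener approach using Curator's NodeCache; the path, config type, and deserialize step are illustrative:

    // Deserialize once per change notification, not once per tuple.
    NodeCache cache = new NodeCache(client, "/metron/profiler/config");
    cache.getListenable().addListener(() -> {
      byte[] raw = cache.getCurrentData().getData();  // null-check omitted for brevity
      ProfilerConfig fresh = deserialize(raw);        // hypothetical JSON -> object step
      configRef.set(fresh);                           // e.g. an AtomicReference read in execute()
    });
    cache.start(true);  // 'true' primes the cache before returning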

It's a bummer you couldn't reuse ConfiguredBolt. Is there a way we can make it more flexible so that you can reuse it?

Contributor Author

Thanks for the background @merrimanr !

deserialization happens in a synchronized method every time a tuple is executed.

Good point about serialization. Let me see if I can optimize that.

It's a bummer you couldn't reuse ConfiguredBolt. Is there a way we can make it more flexible so that you can reuse it?

ConfiguredBolt inherits from BaseRichBolt whereas I am using Storm's windowing functionality and need to inherit from BaseWindowedBolt.

Having said that, I kind of prefer to delegate the functionality to a separate class like ZkConfigurationMgr as it can be easier to test and reuse. ConfiguredBolt could delegate the logic to a ZkConfigurationMgr. But that's just my $.02 after digging into this a bit. I can see advantages in both ways of doing it.

Contributor

I completely agree with you on deferring to a separate class. I think it makes sense here because this feature is used across all topologies by different kinds of bolts.

Maybe we could move the logic from ConfiguredBolt to ZkConfigurationMgr and change the ConfiguredBolt into an adapter like you suggest? That way we would then have a single robust configuration mechanism that could be used everywhere. If you decide you don't want to take all that on, I would suggest following the pattern in the ConfiguredBolt and use a listener. That way even though we have 2 separate implementations for the same exact thing, at least the code and logic is familiar.

Contributor Author

Sure, I'll add a listener as you suggest to ZkConfigurationManager. That makes sense.

I'll leave changing ConfiguredBolt for a separate PR.

Contributor Author

I'm just trying to work iteratively here. I'm not trying to replace all the functionality of the ConfiguredBolt. I just need enough for the Profiler.

In a separate PR we can enhance it to be the replacement for what is in ConfiguredBolt today. I don't see any reason to do that work in this PR, but I don't see it as being very far away.

Member

I totally understand, and if this were the only implementation providing configuration from Zookeeper to a Storm bolt, I would agree that we should not add features we do not know we need (i.e. work iteratively). But this is one of the core abstractions in the system, and it has grown iteratively to have a set of features. If you wish to create a subset of that functionality via a different abstraction, I feel the burden is on you to complete the circle and replace the old abstraction; not doing so would leave the code base in a point of confusion. I would like to hear other voices though; the last thing I want is to be too strict or wrong-headed.

Member

Please give me overnight to compare and contrast ConfiguredBolt, ZkConfigurationManager, and BaseWindowedBolt. This overlaps with my PR-345 commit 8329948 in which I enhanced GetProfile.java to respond to runtime config changes per @cestella 's suggestion, so I had previously dug into ConfiguredBolt.

Contributor

Just catching up with this thread. Personally, I'd like to see the architectural change that is being discussed about the bolts and composition be made first to the existing ConfiguredBolt. This PR should then leverage those improvements. I do not think we should introduce a separate scheme, even with the thought that it will later be merged. We're in effect forking the architecture in the hopes of stitching it back up at some later point in time. If this was completely greenfield development, then I would agree with Nick about iterating. But this is splitting us off into two similar, yet distinct, approaches.

Member

This thread has gotten unmanageably long. I'm going to start a new thread at the bottom of the page. Please forgive.

@Override
public synchronized void close() {
  for (NodeCache cache : valuesCache.values()) {
    CloseableUtils.closeQuietly(cache);
Contributor

the curator client should be closed as well

Contributor Author

The ZkConfigurationManager does not open its own client. The client is passed into the constructor. I expect the client to be managed externally. It's mentioned in the constructor javadoc.

Also, I thought it appropriate per the usage guideline in the Curator docs...

IMPORTANT: CuratorFramework instances are fully thread-safe. You should share one CuratorFramework per ZooKeeper cluster in your application.

Contributor

my apologies

* lookBack - How long to look back in milliseconds
* offset - Optional - When to start the look back from in epoch milliseconds. Defaults to current time.
* groups - Optional - The groups used to sort the profile.
Member

How do you have offset and groups both optional? How would you go about specifying groups and not offset?

Contributor Author

You can't. If you want to specify groups then you have to specify the offset. My thought is that the 80% case is no groups. I'm not sure how else it could be done. Thoughts on an alternative?

Member

Can we just not have offset optional? We already have PROFILE_GET which assumes current time.

Contributor Author

We could, but I don't get what that buys us. What am I missing?

Member

It violates the principle of least surprise. I don't know of any example of a function with 2 optional args in any other language.

Contributor Author

I'd be good with that @mattf-horton . The groups as varargs were there before we had the literal list (or maybe before I knew that it existed?). Anyone object to changing this both in PROFILE_GET and PROFILE_GET_FROM?

Contributor Author

Proposal 2: What does everyone think of just having PROFILE_GET_FROM accept milliseconds and then using another function to do the conversion? Personally, I like it better than what we have in PROFILE_GET. What do people think about changing the signature of PROFILE_GET to match?

PROFILE_GET('e1','p1', 15, 'MINUTES')

would become...

PROFILE_GET('e1','p1', MILLIS(15, 'minutes'))
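
For reference, MILLIS could be a thin wrapper along these lines; a sketch assuming the unit strings map onto java.util.concurrent.TimeUnit:

    // e.g. millis(15, "MINUTES") -> 900000
    public static long millis(long duration, String units) {
      return TimeUnit.valueOf(units.toUpperCase()).toMillis(duration);
    }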

Member

I like that very much, @nickwallen!

Member

Agree.

Contributor

@nickwallen I like the list instead of varargs approach (I'm doing the same with the HyperLogLogPlus implementation currently) as well as your suggestion for modifying the PROFILE_GET signature.

profiler.hbase.table=profiler
profiler.hbase.column.family=P

# how profiles are written to hbase
profiler.hbase.salt.divisor=1000
Member

Perhaps a stupid question, but wouldn't we want the salt divisor in zookeeper so the reader and the writer are both sync'd? Profiles written with salt divisors of x can only be read by clients that use salt divisors of x, right?
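
To illustrate the coupling, here is a sketch of salted row-key construction; this is not Metron's exact scheme:

    // The salt prefix is a deterministic function of the divisor, so a reader
    // configured with a different divisor computes a different row key.
    String rowKey(String profile, String entity, long periodId, int saltDivisor) {
      int salt = Math.abs((profile + entity).hashCode()) % saltDivisor;
      return salt + ":" + profile + ":" + entity + ":" + periodId;
    }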

Contributor Author

We handle that the same way as the period duration: there is a client-side configuration, profiler.client.salt.divisor.

Member

Can we please keep it in zookeeper and have the client salt divisor default to it if unspecified? This way if it is changed it needs to be changed in one place in the majority case.

Contributor Author

Yes, we could probably do that. There might be some other things we could move. I can look at moving configs to Zk as a follow-on PR.

Member

Yep, I think the same argument can be made for some others. No need to retrofit in this PR, but this is a new param added to Flux. I think we should try to do it the better way in this PR to set the stage for the follow-on.

Contributor Author

Which parameter is new? There is no new parameter as part of this PR, unless I am completely forgetting something.

Member

Ah, I misunderstood and thought the salt divisor was externalized as a Flux param as part of this PR. OK, a follow-on JIRA it is.

}

public static boolean runPredicate(String rule, VariableResolver resolver, Context context) {
  StellarPredicateProcessor processor = new StellarPredicateProcessor();
Member

If you intend on making this a replacement to StellarProcessorUtils then we need to ensure that the same set of validations are done (e.g. validate is called, serialization and deserialization are ensured):

   /**
    * This ensures the basic contract of a stellar expression is adhered to:
    * 1. Validate works on the expression
    * 2. The output can be serialized and deserialized properly
    *
    * @param rule
    * @param variables
    * @param context
    * @return
    */
   public static Object run(String rule, Map<String, Object> variables, Context context) {
       StellarProcessor processor = new StellarProcessor();
       Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
       Object ret = processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context);
       byte[] raw = SerDeUtils.toBytes(ret);
       Object actual = SerDeUtils.fromBytes(raw, Object.class);
       Assert.assertEquals(ret, actual);
       return ret;
   }

Member

RETRACTED BUT KEPT FOR POSTERITY DUE TO MY ILLITERACY.

Sorry, this appears to be doing the right thing. My bad; I don't read so gud. ;)

@mattf-apache
Member

Regarding ZkConfigurationManager and the above discussion thread:
This is a complex situation, but my best advice is as follows:

  1. It is definitely worthwhile to refactor a CuratorZkConfigManager class out of ConfiguredBolt. In fact, when we do, we really should propose the Storm team adopt it; it would be tremendously valuable for lots of Storm applications. HOWEVER, this is a significant task in its own right and more than a couple days' work. I believe there is a much simpler solution for the short term, that either avoids or mostly avoids the split implementation problem (depending on a Storm detail that I wasn't able to determine), so I don't recommend we make Nick do it as part of this task.

  2. I agree with @merrimanr and @cestella that we should not lose the established functionality of ConfiguredBolt for managing ProfilerConfigurations, nor create a new config management and caching mechanism, even with good intentions about putting it all back together in the future.

  3. After analyzing what goes into the class hierarchies, I see that:
    a) The way BaseWindowedBolt manages its windowConfiguration is qualitatively different from the Curator-based ZK management used by ConfiguredBolt. It would be hard to merge them, and that's what is causing the current problem.
    b) But it isn't necessary, because the config management built into BaseWindowedBolt is just a distraction.

What's necessary to use Storm Windowing is that:

  • the Bolt implements IWindowedBolt interface,
  • a certain set of 7 parameters (defined by example in BaseWindowedBolt) is presented to Storm when it queries the Bolt's getComponentConfiguration() method,
  • and the Storm topology correctly identifies the Bolt as being a Windowed Bolt.

The only question in my mind is the last bullet. Does Storm identify the Bolt as Windowed just because it presents the windowing parameters in getComponentConfiguration() ? Or does it do class reflection on the Bolt? If it uses reflection, is it looking for IWindowedBolt or for BaseWindowedBolt ?

It is likely that one of the first two is sufficient to identify the Bolt as Windowed. If so, there's a very easy solution to all this. Define:

public abstract class ConfiguredWindowedBolt<CONFIG_T extends Configurations> 
	extends ConfiguredBolt<CONFIG_T> implements IWindowedBolt
		{with only a few lines of content needed}
and
public abstract class ConfiguredProfilerBolt 
	extends ConfiguredWindowedBolt<ProfilerConfigurations> 

This compiles on the master codeline; I can share my straw man if desired.
Then change getComponentConfiguration() in ProfileBuilderBolt, to return the 7 windowing-related config parameters instead of TOPOLOGY_TICK_TUPLE_FREQ_SECS. Other than changes to add the windowing params to the ProfilerConfigurations on a per-bolt basis, the current config management code in ConfiguredProfilerBolt and ConfiguredBolt can be used just as is.
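
Concretely, that method could look something like the sketch below; four of the seven windowing parameters are shown, and the values are illustrative:

    @Override
    public Map<String, Object> getComponentConfiguration() {
      Map<String, Object> conf = new HashMap<>();
      // tumbling window: window length equals the sliding interval
      conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 900000);
      conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 900000);
      // event time: read timestamps from the tuple, tolerate 60s of lag
      conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_FIELD_NAME, "timestamp");
      conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 60000);
      return conf;
    }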

If Storm actually requires the Bolt to be of type BaseWindowedBolt, then it is a little uglier, but still doable: Copy the methods of ConfiguredBolt into a new class that extends BaseWindowedBolt, simply overriding any conflicting methods already in BaseWindowedBolt. We have to duplicate the code, but it can be copied verbatim, thereby giving much higher reliability than new code.
Again, there's no need to try to use the configuration management code built into BaseWindowedBolt; only what gets returned in getComponentConfiguration() is important.

I don't expect either of these solutions to give great joy, but it is workable and can be done reliably. We don't need to refactor ConfiguredBolt, and its curated config management can be used whole.

@cestella
Member

@mattf-horton @mmiklavc First off, thanks for the perspective, both of you. While I think we should investigate your suggestion, @mattf-horton , it appears on the face of it quite dependent upon implementation decisions and details inside of Storm. That immediately gives me pause. That being said, I'll repeat that I think we should consider it strongly.

I do agree with @mattf-horton that pulling out the ZkConfigurationManager here would be substantially more work if you consider the impact to our tests and the increased amount of testing that we would have to add to this PR. I think, as @mmiklavc suggested, it's a separate PR. I would prefer to get this really good work by @nickwallen in, so maybe I can offer another compromise position to go with @mattf-horton 's one earlier.

I'd like to propose another compromise. The main concern that I have is leaving a second abstraction in place for managing configuration from zookeeper. Given that, what do you think of the following:

  • Remove the ConfigurationManager abstraction from this PR
  • Since it is used only in one class now, directly use the org.apache.curator.framework.recipes.cache.NodeCache in ProfilerSplitterBolt
  • Create a follow-on JIRA and make a comment with a TODO suggesting we replace this with the appropriate abstraction (referencing the JIRA).

This would remove the possibility of a fork and be substantially less work/impact.

Thoughts?

@cestella
Member

I also wanted to mention one of the reasons I am particularly sensitive to having multiple components that do very nearly the same thing. I made this mistake early on in the project (and will probably make it again, because... you know, I don't learn ;).

Stellar started as a predicate language only for threat triage rules. As such, when the task of creating Field Transformations came to me, I needed something like Stellar except I needed it to return arbitrary objects, rather than just booleans. In my infinite wisdom, I chose to fork the language, create a second, more specific DSL for field transformations, thereby creating "Metron Query Language" and "Metron Transformation Language."

I felt a nagging feeling at the time that I should just expand the query language, but I convinced myself that it would require too much testing and it would be a change that was too broad in scope. It took 3 months for me to get around to unifying those languages and if we had more people using it, it would have been an absolute nightmare. This may be a "once bitten, twice shy" thing, but I think it's a good policy in general. Pardon the interlude; just wanted to give some context.

@nickwallen
Contributor Author

As to @mattf-horton 's suggestion, I had looked at the possibility of not using the BaseWindowedBolt and implementing that myself. But that seems like worse technical debt to me. I'd be worried about how Storm evolves the functionality in future versions, especially since the functionality is relatively new in Storm. I'm sure you have the same concerns; you're just trying to help find some compromise solution here.

@cestella I see what you're going for, but I don't see the logic in reimplementing what I already have working in ZkConfigurationManager. What if I moved ZkConfigurationManager and the related classes to the metron-profiler-common project? This would ensure that it could only be used by the Profiler. Then, as a follow-on PR, I would reach feature parity with ConfiguredBolt, move ZkConfigurationManager back to metron-common, update ConfiguredBolt to use ZkConfigurationManager, and all else that is needed there.

I would suggest another approach as an alternative. This satisfies the concerns for multiple implementations, and results in smaller, more concise PRs.

  1. Retract this PR
  2. Enhance ZkConfigurationManager to reach feature parity with ConfiguredBolt, update ConfiguredBolt to use ZkConfigurationManager, then submit that work as a separate PR.
  3. Create a second PR for the new client-side Profiler functions as this has had its own lengthy discussion back-and-forth.
  4. Create a third PR for the event time processing additions in the Profiler.

@cestella
Member

@nickwallen Your suggestion is fine by me. I hate to put the onus on you like this, but I think I like your suggestion best of all because it will result in smaller and more targeted review of your work. Thanks for bearing up like this; your contribution is really great and we very much want it incorporated. :)

@mattf-apache
Member

@nickwallen and @cestella, I'm fine with this conclusion, but from a design perspective, the ZkConfigurationManager effort should not be viewed as bringing the current file to parity with ConfiguredBolt. Rather, the starting point should be a refactoring of ConfiguredBolt. ConfiguredBolt is only 60 lines of code that does this thing quite well, with real-time event capture. You want to add caching of the deserialized configs as well as the raw ZK info, and that's good. But that can be added on; it is not clear why starting from scratch is required. Clearly deriving from ConfiguredBolt will also make it much easier to code review and give confidence in the refactoring.

If you feel it is really necessary to start from scratch, please do a brief design document stating why. Thanks.

@nickwallen
Contributor Author

I don't understand what your proposal is @mattf-horton . Please clarify.

@merrimanr
Contributor

I agree with @mattf-horton. ConfiguredBolt should be improved/refactored instead of completely starting over. Keep in mind this was developed over many iterations with input from different people and contains a lot of lessons learned and improvements gained from experience. If we start over we're likely to make the same mistakes all over again, especially if this task is taken on by someone that was not heavily involved in developing ConfiguredBolt.

What are the benefits of starting over?

@nickwallen nickwallen closed this Dec 21, 2016
@mattf-apache
Member

Thanks, @merrimanr for expressing that better than I did.
@nickwallen , one of the principles of reliable refactoring[1] is to methodically do only and precisely the changes needed to achieve each design goal, so that implementors and reviewers may reason about the correctness of the changes, and avoid introducing new bugs or side effects.

Thus, by starting from a refactoring of ConfiguredBolt, I would expect to see many lines in ZkConfigurationManager that look exactly like the lines in ConfiguredBolt, in the same order and organization, with only the changes necessary to accommodate their new packaging. That should be the starting point. Then new features can be added, such as deserialized caching and perhaps utility APIs to assist the sorts of things ConfiguredProfilerBolt does as a client of the configuration manager.

[1] Martin Fowler and Kent Beck, "Refactoring: Improving the Design of Existing Code"

@nickwallen
Contributor Author

Ok, gotcha @mattf-horton. Understand now.

To be fair, in this PR I wasn't trying to replace ConfiguredBolt, so what I have in ZkConfigurationManager was the easiest way I found to get the functionality that I needed for the Profiler. For a separate PR as described in step (2), whose scope is much greater, your guidelines on refactoring sound logical to me.

And I don't want to presume that I should even be the one to tackle this. @merrimanr I'm open to any volunteers who you think are better qualified. I would have preferred not to touch this code at all.

@mattf-apache
Member

@nickwallen , totally understand, and I respect your efforts to keep things as simple as possible in the original scope. But I'm much happier with this scoping :-)

@mattf-apache
Member

mattf-apache commented Dec 21, 2016

Recording here for posterity, in case anyone is interested: I asked on the storm user list:

I’ve been unable to find this in the Storm documentation:
How does the Storm topology loader identify if a Bolt is a “windowing” bolt and therefore 
needs to be wrapped with WindowedBoltExecutor?  Does it look at:
1.      Whether the Bolt’s getComponentConfiguration() method presents some or all of the 
windowing-related configuration parameters, as BaseWindowedBolt does;
2.      Or does it use reflection to determine if the Bolt implements IWindowedBolt interface;
3.      Or does the Bolt actually have to extend BaseWindowedBolt?
There are indications in the docs that it is #2, but I wasn’t able to become certain.  
Please clarify.
Thanks,
--Matt

and got confirmation that it is #2 only:

From: Arthur Maciejewicz <arthur@signafire.com>
You must satisfy the IWindowedBolt and IComponent interfaces. BaseWindowedBolt is there for 
your convenience. When constructing the topology, there is a setBolt method on TopologyBuilder 
specifically for bolts satisfying the IWindowedBolt interface. It will be wrapped with a 
WindowedBoltExecutor by the TopologyBuilder for you. You can implement windows yourself by 
returning a HashMap from getComponentConfiguration in your custom bolt (as long as they also 
implement the IWindowedBolt interface).

Worth noting, though, that if the Bolt implements BOTH IBolt and IWindowedBolt (as I contemplated), then you might have to cast it to (IWindowedBolt) in the call to setBolt().
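
That is, something like the following sketch; the bolt variable and parallelism hint are illustrative:

    // The cast disambiguates the setBolt overload when the bolt
    // implements both IRichBolt and IWindowedBolt.
    TopologyBuilder builder = new TopologyBuilder();
    builder.setBolt("profileBuilder", (IWindowedBolt) bolt, 1);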

@merrimanr
Contributor

No problem @nickwallen, I agree you shouldn't have to tackle all of this on your own. I will volunteer for the ConfiguredBolt refactor task. Let me create a Jira and we can start collaborating on a design.

@merrimanr
Contributor

Here is the Jira: https://issues.apache.org/jira/browse/METRON-638
