METRON-694: Index Errors from Topologies #453
Conversation
Did you name this PR correctly? Should it be METRON-695?
METRON-694 includes both the topology changes and the Ambari MPack changes. I started on METRON-695 but decided to include both in a single PR, hence the branch being named METRON-695 but the PR name referencing METRON-694.
Can you provide an acceptance test plan for validation on vagrant or a cluster?
Whoops sorry I see that you did mention a way to test it out. Sorry, on a phone ;)
So from what I see we have a new topology and every error message has a source type of "error", correct? Is there any reason we didn't just use the normal indexing topology and index errors with the usual messages? Presumably their volume will be small.
No, you are correct: we need a more comprehensive test plan. I'm still thinking about it. Triggering errors at each point in the topologies is not straightforward. Sending in a message that won't parse is an easy one and will cover a lot but it won't test everything.
Based on this PR, what I'd probably do is:
Looks good so far; a few questions outstanding around how we're handling invalid data vs errors and how we're dealing with the writer configs for errors. It looked like there's one config but the error messages don't all have source type of "error", which would mean that they don't actually use the error writer config. Also, I didn't see any README.md changes here. We will need a big section around error handling added.
@@ -0,0 +1,17 @@
{
  "hdfs" : {
I'm a little confused here. Are we sending every error message through with a source type of "error" so that this config can be found in the error writer topology?
error is too generic.
Yes, that is correct. There is an "error_type" field that distinguishes between the different types of errors. If you think there is a strong reason to set "source.type" from the "error_type" field and add indexing configs for all the different types, we can do that.
@@ -74,7 +81,11 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
  this.writerComponent = new BulkWriterComponent<>(collector);
  this.collector = collector;
  super.prepare(stormConf, context, collector);
  messageGetter = MessageGetters.valueOf(messageGetterStr);
  if (messageGetStrategyType.equals(MessageGetters.JSON_FROM_FIELD.name()) && messageGetField != null) {
Why is this special case for JSON_FROM_FIELD required? Is it because of the arg? If so, it's probably better to adjust MessageGetters rather than doing it here. I'd suggest making a factory rather than a strategy in this case for it:

    public enum MessageGetters {
      BYTES_FROM_POSITION(arg -> new BytesFromPosition(arg)),
      JSON_FROM_POSITION(arg -> new JSONFromPosition(arg)),
      JSON_FROM_FIELD(arg -> new JSONFromField(arg));

      Function<Object, MessageGetStrategy> messageGetStrategy;

      MessageGetters(Function<Object, MessageGetStrategy> messageGetStrategy) {
        this.messageGetStrategy = messageGetStrategy;
      }

      public MessageGetStrategy get(Object arg) {
        return messageGetStrategy.apply(arg);
      }
    }

Then it becomes MessageGetters.valueOf(messageGetStrategyType).get(messageGetField).
Now that I think about it, you might want to change the arg to String from Object and either:
- insist that the implementing MessageGetter's provide an appropriate constructor
- convert in the enum (e.g. arg -> new BytesFromPosition(ConversionUtils.convert(arg, Integer.class)))
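A minimal sketch of that second option, assuming the MessageGetStrategy implementations named above and a ConversionUtils-style helper for the String-to-Integer conversion (names and signatures here are illustrative, not the final code):

    import java.util.function.Function;

    // Sketch only: BytesFromPosition, JSONFromPosition, JSONFromField and
    // MessageGetStrategy are assumed to exist as discussed above; the
    // position-based getters convert the String arg inside the enum.
    public enum MessageGetters {
      BYTES_FROM_POSITION(arg -> new BytesFromPosition(ConversionUtils.convert(arg, Integer.class))),
      JSON_FROM_POSITION(arg -> new JSONFromPosition(ConversionUtils.convert(arg, Integer.class))),
      JSON_FROM_FIELD(arg -> new JSONFromField(arg));

      private final Function<String, MessageGetStrategy> messageGetStrategy;

      MessageGetters(Function<String, MessageGetStrategy> messageGetStrategy) {
        this.messageGetStrategy = messageGetStrategy;
      }

      public MessageGetStrategy get(String arg) {
        return messageGetStrategy.apply(arg);
      }
    }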
Yes because of the arg.
I like your suggestion. Let me play around with that idea.
errorMessage.put(ErrorFields.ERROR_FIELDS.getName(), errorFieldString);
List<String> hashElements = errorFields.stream().map(errorField ->
    String.format("%s-%s", errorField, rawJSON.get(errorField))).collect(Collectors.toList());
errorMessage.put(ErrorFields.ERROR_HASH.getName(), DigestUtils.sha256Hex(String.join("|", hashElements).getBytes(UTF_8)));
I know there was some discussion around hashing on the message board, but could you recap the justification here for posterity?
If we do the hashing, it should be done in metron-common so that we can use consistent hashing approaches
Hashing was suggested to provide a unique identifier for the original information, giving you something to key on so you can count unique events and prioritize the more prevalent issues, without the concern of cyclical issues (for instance, if the problem is with indexing a specific message, trying to index it again will just fail in a loop).
Sounds good @ottobackwards. I will move that to a util class.
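As an illustration of what that shared utility might look like in metron-common (the class and method names here are hypothetical; it assumes commons-codec's DigestUtils as already used above):

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.commons.codec.digest.DigestUtils;
    import org.json.simple.JSONObject;

    // Hypothetical metron-common utility: builds a stable SHA-256 hash over a
    // chosen set of message fields so the same underlying failure can be
    // grouped and counted across topologies.
    public class HashUtils {

      public static String getMessageHash(JSONObject message, List<String> hashFields) {
        List<String> hashElements = hashFields.stream()
            .map(field -> String.format("%s-%s", field, message.get(field)))
            .collect(Collectors.toList());
        return DigestUtils.sha256Hex(
            String.join("|", hashElements).getBytes(StandardCharsets.UTF_8));
      }
    }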
}

protected List<Byte> toByteArrayList(byte[] list) {
  List<Byte> ret = new ArrayList<>();
You can use Guava's Bytes.asList(list) here.
This was carried over from ErrorUtils. Happy to change it.
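For reference, the Guava version reduces the carried-over method to a single call (assuming Guava is already on the classpath):

    import com.google.common.primitives.Bytes;
    import java.util.List;

    // Bytes.asList returns a fixed-size List<Byte> view backed by the array,
    // rather than copying each element into a new list.
    protected List<Byte> toByteArrayList(byte[] list) {
      return Bytes.asList(list);
    }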
Just real quick before I start to review:
@@ -37,6 +37,12 @@
  <display-name>Metron apps indexed HDFS dir</display-name>
</property>
<property>
Should these HDFS directories more specifically pertain to topology processing? As other use cases come into being, just 'error' may be too generic.
I think if we move the error type into the source.type field it will solve this. I believe the source type is added to the end of the HDFS path.
Actually, you can control the file name in the index config, so separate index configs solve that.
errorMessage.put(Constants.SENSOR_TYPE, "error");

/*
 * Save full stack trace in object.
Instead of having a method with multiple null checks, maybe have a method that just calls other methods, and put the checks in there:

    handleThrowable(errorMessage);
    handleRawMessages(errorMessage);

etc., etc.
I think that's a good idea. I'll break that method down into smaller methods.
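A rough sketch of that refactor, using the helper names suggested above (the field names and exact signatures are illustrative only, not what the PR ends up with):

    import java.util.List;
    import org.apache.commons.lang3.exception.ExceptionUtils;
    import org.json.simple.JSONObject;

    // Illustrative only: a single driver method delegates to small helpers, and
    // each helper owns its own null check instead of one method interleaving them.
    private void addErrorDetails(JSONObject errorMessage, Throwable throwable, List<Object> rawMessages) {
      handleThrowable(errorMessage, throwable);
      handleRawMessages(errorMessage, rawMessages);
    }

    private void handleThrowable(JSONObject errorMessage, Throwable throwable) {
      if (throwable == null) {
        return;
      }
      errorMessage.put("exception", throwable.toString());
      errorMessage.put("stack", ExceptionUtils.getStackTrace(throwable));
    }

    private void handleRawMessages(JSONObject errorMessage, List<Object> rawMessages) {
      if (rawMessages == null || rawMessages.isEmpty()) {
        return;
      }
      errorMessage.put("raw_message", rawMessages.get(0).toString());
    }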
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
This is duplicated in the MPACK isn't it? Is there not a way to have this defined once and used in multiple places?
Unless we decide we only want one indexing topology, we need separate configs. It looks like a duplicate but it's slightly different.
Sorry, I should have included this in the original description. I still need to update the various READMEs; that task is outstanding and this should not be merged until it is done. This is a big PR, so I want to get further along in the review process so I don't have to rewrite the documentation several times.
That makes complete sense; we should call that stuff out pre-review. I think what we are seeing is that folks have some really good ideas and are willing to contribute to documentation on this project, and we should take advantage.
In response to "Is there any reason we didn't just use the normal indexing topology": here are the issues I see with doing that. First, I think we should be careful about putting additional load on the indexing topology. For parser errors I think it's a wash: an error gets indexed with the original message instead of the parsed message. But for enrichment errors, new error messages are generated and passed along. If we send all errors and messages to the same indexing topology, a burst of errors could affect the latency of other messages. Here is another scenario that could cause problems. Imagine a message fails to index and an error is thrown in the indexing topology. If we send all errors and messages to the same topology, that error could be replayed in the indexing topology over and over. We could add some logic to keep that from happening, but then writer bolts become more complex and specific to the indexing topology. You will notice the error indexing topology is different in that it does not write back out to Kafka. I feel like by mixing error messages and legitimate messages we are asking for trouble. Having them separate allows us to tune the topologies separately and avoid the problems I mention above. Tuning the indexing topology as it stands now is challenging enough. I can understand not wanting to allocate additional worker slots though. Maybe I am being too conservative.
On the topic of invalid messages, they are now treated as error messages. They can still be distinguished as invalid messages though. Is there any reason they should be treated differently? As mentioned above, it's easy enough to change the source.type and add an indexing config. Is there a case where source.type is not error? If so, then that is a mistake.
What comes to mind is the 'source' of an error. Is this error raised because Metron thinks the message is invalid, or is it wrong because of some other configuration-specific evaluation? I don't think we do this now with Stellar (IF (BUSINESS_RULE(rule_valid_field, field1) == FALSE) DROP_ERROR()), however.
@cestella - how can we utilize the error indexing if we were going to, say, output errors or warnings that there were deprecated Stellar statements?
Ok, catching up on this: regarding invalid vs errors, I'm ok with the notion of treating them the same, but it's different from how we've done this in the past. Prior to this, invalid messages went into a queue untouched. Now, they will be sent along with the errors wrapped in a JSON structure (though the raw messages will be captured as a field). I'm not necessarily against this, but it's different than we originally designed and I want to call it out.

I guess I'm ok with adding a new topology, but it's added a fair amount of complexity to this JIRA and I'm not sure I buy that indexing errors (what should be a rare event, IMO, in a healthy set of topologies) would materially change how we tune the indexing topology, which has to be able to handle the unified traffic of all of the parsers anyway. I will agree that we will need to ensure that errors in indexing for a given writer are skipped by that writer on the second time around. So, if I have an error in a message with a source type of
I think removing the error indexing topology is fine as long as we're careful to avoid error messages getting stuck in a loop in the case of a failed index write. I agree that it will remove complexity and make this PR easier to digest. As of this JIRA all errors go to a single error index with source type and error type as JSON fields. To answer your question: the error indexing config. I can see the advantage of being able to configure each source type/error type combination separately but I'm not sure it matters that much since we expect it to be very low volume. I will think about that more this weekend. From an implementation standpoint it's easy to modify so the design should be driven by the queries we expect. Same applies to invalid messages. Easy to do it either way, just need to decide what makes the most sense from a user's perspective.
@ottobackwards to answer some of your questions (sorry was at RSA all week):
I lean toward having a separate kafka queue for errors. Errors are lower in volume, but are a lot more verbose. You are essentially indexing a large text document when you do this, and you can easily bring down ES when something goes wrong and you get lots of errors. I want the ability to first tune my cluster enough that the volume of errors is rather low. In my daily workflow I would just monitor the kafka queue via the command line consumer until I reduce the number of errors. Then if I needed to (and I want this to be optional) I can index them into ES. Or I can just dump them out to a file straight out of kafka.
Can we make the Kafka queue used for errors parameterized in the various topologies? For those who want one indexing topology, we can use the "indexing" queue and for those who want a separate topology we can use an error queue. If we don't, resource constraints are going to end up with us not spinning this separate topology up in vagrant. This will end in errors slipping through the cracks as this functionality won't be tested during the normal course of testing. Because it cuts across the entire system, I consider having this functioning in vagrant to be a prerequisite for my acceptance of this PR.
I think for a single node this is not an issue because all your storm logs are in the same place so you can just pipe to grep. This feature is meant to address gathering errors from different workers in large cluster deployments.
Right, I am less concerned with the act of finding errors in single node vagrant. I'm more interested in mimicking the mechanisms used in a multi node deployment in our acceptance testing environment. If we don't do that then errors will crop up around this that we do not find because it's not being exercised. We should make vagrant as similar to a real deployment as possible; if we do not then we will pay the price in the form of bugs.
I agree with @cestella here. I think by default we should have a single indexing topology, for errors and everything else. We should be able to make the code configurable enough, so that if a user runs into performance issues caused by indexing telemetry and errors in the same topology, they can easily deploy a second indexing topology that is dedicated to indexing the errors. This would give them the flexibility that @merrimanr was going for when they need it. But that should be a non-default, "advanced" configuration of Metron.
I think a configurable error topic is a reasonable request. The requirements need a little clarification though. How granular should this configuration property be? Should it be kept in the global config? Do we want to expose this property per topology? In that case it would need to be added to parser configs and flux properties. Do we want it to be per sensor? In that case it would be added to sensor, enrichment and indexing configs. Should this topology be on by default or not? If it's off by default then there is no ansible work to be done other than deploying configurations. @cestella, in the case that it's off by default, what would you consider functioning in vagrant to be?
# Conflicts:
#	metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
#	metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
#	metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
The latest commit includes the changes discussed. Most error messages now go to the indexing topic by default so there is no need for a separate error indexing topology. The error topic settings for the enrichment and indexing topologies are already exposed through flux properties so I left them there. I added a "parser.error.topic" property to the global config for parser topologies (should the indexing topic be the default if that property is missing, or should we let it fail?).

For the concern about indexing errors getting into a loop, I thought of a simple solution that required no extra work: if an error happens in the indexing topology, simply send it to a different error topic. Would this be acceptable? I wanted to throw this out there before implementing custom logic in the indexing topology classes.

The error message structure did change slightly. In my testing I realized that the type of the "raw_message" field could differ (byte[] vs JSON) depending on where the error happened (pre vs post parse). If an error message with byte[] as raw_message was indexed first, Elasticsearch would error on subsequent error messages with JSON as raw_message. To address this, I serialized JSON raw messages before storing them in this field. Now raw_message is always a string and raw_message_bytes is always a byte array.

All tests are passing and it has been tested in quick dev. Some tips on testing the different cases:
@cestella, would be good to get your opinion on the MessageGetter classes and where that ended up. Not sure I love the syntax for default message getters. The MPack changes are now minimal but it should still be tested there and that test is pending. The documentation still needs to be updated so please don't merge this until that is done.
I like the flexibility of the new MessageGetter stuff. I don't understand when I would use a DEFAULT_JSON_FROM_FIELD.
Default in this case is JSON_FROM_FIELD without specifying the field (defaults to "message"). I couldn't think of an ideal way to do this so open to ideas. We could just treat a null field to mean default and get rid of DEFAULT_JSON_FROM_FIELD. I will add unit tests.
I think how you have it is a fine approach. Just need to doc it as such.
Hmm, that seems like it would be somewhat confusing to me. Unless I'm missing something, that error would just not show up in the "typical" topic/dashboard. How about if there's an error during indexing, we retry with more bare-bones data and add a k/v pair that indicates such?
Technically generating an error message and sending it back through the indexing topology should be ok now because the original message is serialized into a string and shouldn't cause another failure. Does the loop problem still exist given this info?
EDIT: If you set
I think you're right, that could happen (didn't this happen to you at one point?). So what is the correct approach then? Do we leave off the raw message field if the error happens in the indexing topology?
Yeah. So, I edited my prior message but for clarity, I think we should just set
I think focusing on the one specific error that we've seen is not the right way to think about this. Many different types of errors would cause unexpected looping, no? When unexpected looping occurs, it is very difficult to see and diagnose, right? Considering that, I would suggest adding a simple decrementing TTL counter to avoid unexpected loops.
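A minimal sketch of that kind of guard (the field name and starting value here are hypothetical, not something agreed on in this thread):

    import org.json.simple.JSONObject;

    // Hypothetical loop guard: each time an error message is re-emitted,
    // decrement a TTL field carried on the message; once it reaches zero, stop
    // generating new error messages and fall back to a bare-bones record instead.
    public class ErrorTtl {
      private static final String ERROR_TTL_FIELD = "error.ttl";
      private static final long DEFAULT_ERROR_TTL = 3;

      @SuppressWarnings("unchecked")
      public static boolean decrementTtl(JSONObject errorMessage) {
        Object current = errorMessage.get(ERROR_TTL_FIELD);
        long ttl = current instanceof Number ? ((Number) current).longValue() : DEFAULT_ERROR_TTL;
        ttl -= 1;
        errorMessage.put(ERROR_TTL_FIELD, ttl);
        return ttl > 0; // false => give up rather than loop again
      }
    }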
I agree, I recommended the same thing on the dev list and in IRC in the past. In the case where the TTL expires, we should do something. Maybe we attempt to place a standard, simple message to be indexed which has a very high likelihood of successfully being stored and that is exempt from any sort of retries or TTLs. That said, there are no other specific looping scenarios that I'm currently able to envision, which is why I wasn't pushing that concept further for this PR. Given the one that we are aware of, if we only took the TTL route, it would just loop and then fail the entire message, where adding
I tried running this up and discovered that there's at least one error that doesn't get caught. Json parsing errors, e.g. if someone gives outright badly formatted messages to indexing (e.g. missing closing '}'), don't get caught and indexed right now. I don't believe we ever handled this type of error, because I don't think it ever occurs from our code directly. I'm inclined not to worry about it for this PR given that we never worried about it to begin with, but we may want to create a follow-on Jira to ensure that we handle cases like this well. As we add and increase visibility to extension points, we don't want things like this getting tripped by custom code. Anyone have objections to that?
Hi guys, this PR is built on one fundamental assumption: kafka is always available. The source of truth for errors, therefore, is a kafka topic. In a production setting errors should go into their own topic and the retention period (size) of that topic should be set very high so that you can retain as many errors as you can. The reason we are making this configurable is so that we can more easily test this in Ansible by throwing both errors and valid telemetry into the same topic. In production we would not do this; we would have a dedicated topic and a dedicated topology for error writing, with parallelism tuned way down to prioritize ingest of actual valid telemetry over errors.

The writing topology should attempt to write errors from the queue to either ES, HDFS, or both exactly once. If it cannot do that, it should ping whatever infrastructure monitoring component you are using to flag that your ES or HDFS is down. That, however, is a different PR and is out of context here. I will file that as follow-on work.

With that said, I personally see no problem with the way this PR is implemented. It allows for a dedicated topic and writing of errors into ES or HDFS exactly once when running in a production setting. There is an option to configure the topic so you can have telemetry and errors in the same topic for testing in Ansible. So +1 from me.
… added tests and javadocs for MessageGetters and commented out rawMessageBytes field
Just pushed out a commit to address recent comments. Commit includes:
I also commented out the rawMessageBytes field because it's unclear how it will be used.
# Conflicts:
#	metron-platform/metron-common/pom.xml
Does anyone have any outstanding comments that they need addressed?
I will create an issue to track the possibility for looping. Other than that, this looks good. +1
This PR addresses METRON-695, including updates to the Ambari MPack. A summary of the changes:
There are 2 areas of review that I would like to highlight. The first is the abstraction for handling errors in the various Metron topologies. Error handling logic is not consistent across topologies, ranging from calling the ErrorUtils.handleError method to simply logging the exception. The ErrorUtils.handleError was the most common method used so I decided to extend that to accommodate other bolts and topologies. As I worked through the details I found myself either having to add several additional ErrorUtils.handleError methods to cover all the different combinations of error message properties or creating several empty Optional objects, making the code more verbose and confusing than it should be. So I moved the error message generating logic previously in ErrorUtils.generateErrorMessage to a separate MetronError class that follows a builder/fluent design pattern. With this change, capturing an error now follows this pattern:
MetronError error = new MetronError()
    .withErrorProperty(errorProperty) // error properties could include caught exception, raw message(s), error type, etc
    ...                               // add as many properties as needed
ErrorUtils.handleError(collector, error);
I expect there will be many opinions about the correct approach here.
The other abstraction involves retrieving a message from a tuple. The primary driver for this is logging the original message that caused a failure. The BulkWriterComponent class is passed both a tuple and a message, which is then passed to the BulkMessageWriter.write method. There are 2 challenges with this. The first is that the returned BulkWriterResponse object only contains the tuples that failed and not the messages. The other is that a message could have been transformed before being passed to the BulkWriterComponent, so we need a way to get the original message again in case of a failure so that it may be replayed. To solve this, I created a MessageGetStrategy interface that can be passed around between components to retrieve the original message if needed. This could become a useful abstraction for other use cases as well, for example making the BulkMessageWriterBolt configurable through flux.
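For readers skimming the diff, the strategy interface is roughly this shape (a sketch; the exact package and signature in the PR may differ):

    import org.apache.storm.tuple.Tuple;

    // Sketch of the strategy handed to the writer components so they can recover
    // the original message from a tuple, e.g. to report or replay it after a
    // failed bulk write.
    public interface MessageGetStrategy {
      Object get(Tuple tuple);
    }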
All applicable unit and integration tests were updated (or created if they didn't already exist). This can be tested in Quick Dev by editing the elasticsearch_error.properties to match the environment and starting the topology with the start_elasticsearch_error_topology.sh script. The Bro sensor in that environment includes messages that fail to parse so error messages should be generated by default as long as that sensor is running. These steps are manual because I'm assuming Ansible is being deprecated. For a more automated approach, this can (and should) be tested with the Ambari MPack. I am still trying to think of ways to simulate errors in other parts of the topology so if anyone has ideas let me know.
I apologize for the size of this PR but error handling cuts across many different modules and classes and required a lot of updates so it was unavoidable. I decided to not update the profiler topology yet because it does not include a bolt for handling errors like the other topologies. This would require changes to the profiler topology architecture, adding to an already very large PR. I am happy to take that on in a separate pull request or this one if everyone feels it should be included now.
A special thanks to @justinleet for helping me with the Ambari MPack and @cestella for helping with the MessageGetStrategy abstraction (that he had already started with MessageGetters). Looking forward to some feedback!