
METRON-936: Fixes to pcap for performance and testing #585

Closed
wants to merge 4 commits

Conversation

@mmiklavc (Contributor)

https://issues.apache.org/jira/browse/METRON-936

I still have some cleanup and additional docs to add, but I wanted to get this out for review.

Contributor Comments

[Please place any comments here. A description of the problem/enhancement, how to reproduce the issue, your testing methodology, etc.]

Pull Request Checklist

Thank you for submitting a contribution to Apache Metron.
Please refer to our Development Guidelines for the complete guide to follow for contributions.
Please refer also to our Build Verification Guidelines for complete smoke testing guides.

In order to streamline the review of the contribution, we ask that you follow these guidelines and double-check the following:

For all changes:

  • Is there a JIRA ticket associated with this PR? If not, one needs to be created at Metron Jira.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?

For code changes:

  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?

  • Have you included steps or a guide to how the change may be verified and tested manually?

  • Have you ensured that the full suite of tests and checks has been executed in the root incubating-metron folder via:

    mvn -q clean integration-test install && build_utils/verify_licenses.sh 
    
  • Have you written or updated unit tests and or integration tests to verify your changes?

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?

  • Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

For documentation related changes:

  • Have you ensured that the format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not, run the following commands and then verify the changes via site-book/target/site/index.html:

    cd site-book
    mvn site
    

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
It is also recommended that travis-ci is set up for your personal repository such that your branches are built there before submitting a pull request.

@justinleet (Contributor) left a comment


Took an initial look through, but still need to spin up. Could you write up a more detailed test plan, etc.?

try {
  this.fs = FileSystem.get(new Configuration());
  int replicationFactor = config.getReplicationFactor();
  if (replicationFactor != -1) {
Contributor

Could we make this > 0? Nothing 0 or less makes sense to actually use. Might also make sense to log any replication factor actually used, for debugging.

Member

Agreed, > 0 is the right check here.
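
For illustration, a minimal sketch of the guard being agreed on here (assuming the surrounding class exposes fsConfig and LOG as in the diff; dfs.replication is the standard HDFS replication setting):

    int replicationFactor = config.getReplicationFactor();
    if (replicationFactor > 0) {
      // only a positive factor is meaningful; anything else falls back to the cluster default
      fsConfig.setInt("dfs.replication", replicationFactor);
      // log the factor actually applied, per the debugging suggestion above
      LOG.info("Using replication factor " + replicationFactor);
    }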

}
if(config.getHDFSConfig() != null && !config.getHDFSConfig().isEmpty()) {
  for(Map.Entry<String, Object> entry : config.getHDFSConfig().entrySet()) {
    if(entry.getValue() instanceof Integer) {
Contributor

Under the hood, don't all these fsConfig.setX calls just call String.valueOf()? I'm not sure whether everything being a String under the hood is guaranteed or just an implementation detail, but if it's guaranteed, this if-else chain simplifies significantly. I'll dig in a bit to find out.

Contributor Author

String.valueOf() takes a primitive and we don't specifically know the type of X in the incoming config map when calling fsConfig.setX (where X is a type, e.g. Boolean, Integer, etc.).
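
For reference, the kind of type dispatch in question looks roughly like this (a sketch, not the exact PR code; Hadoop's Configuration exposes a typed setter per value type):

    for (Map.Entry<String, Object> entry : config.getHDFSConfig().entrySet()) {
      Object v = entry.getValue();
      if (v instanceof Integer) {
        fsConfig.setInt(entry.getKey(), (Integer) v);
      } else if (v instanceof Long) {
        fsConfig.setLong(entry.getKey(), (Long) v);
      } else if (v instanceof Boolean) {
        fsConfig.setBoolean(entry.getKey(), (Boolean) v);
      } else {
        // fallback: store anything else in its string form
        fsConfig.set(entry.getKey(), String.valueOf(v));
      }
    }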

, SequenceFile.Writer.keyClass(LongWritable.class)
, SequenceFile.Writer.valueClass(BytesWritable.class)
, SequenceFile.Writer.stream(outputStream)
, SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)
);
//reset state
LOG.info("Turning over and writing to " + path);
LOG.info(String.format("Turning over and writing to %s: [duration=%s NS, force=%s, initial=%s, overDuration=%s, tooManyPackets=%s]", path, duration, force, initial, overDuration, tooManyPackets));
Contributor

Couldn't this be formatted using the Logger's built-in lazy token replacement, rather than the String.format?

Member

+1 to that

Contributor Author

That sounds great. I wasn't planning to switch the logging implementation from Log4j in this PR, but I can change the deps if you think we need it now. Log4j only offers signatures like org.apache.log4j.Category#info(java.lang.Object, java.lang.Throwable), with no placeholder support.

Contributor Author

I ended up removing Log4j from all classes in pcap-backend. Submitting shortly.
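
For reference, the lazy token replacement suggested above looks like this with SLF4J-style placeholders (a sketch; assumes LOG is an org.slf4j.Logger):

    // the message is only assembled if INFO is enabled, so no String.format cost on hot paths
    LOG.info("Turning over and writing to {}: [duration={} NS, force={}, initial={}, overDuration={}, tooManyPackets={}]",
        path, duration, force, initial, overDuration, tooManyPackets);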

@@ -28,6 +26,17 @@
public abstract class KeyValueDeserializer implements Serializable {
protected TimestampConverter converter;

public static class Result {
Contributor

Can we call this something more informative than Result?

Member

Well, to be fair, it's KeyValueDeserializer.Result since it's a static inner class, not just Result. I'm inclined to think the naming is fine given the parent class, but I'm open to being convinced otherwise.

Contributor

Good call, I'm fine with it given the context.

Contributor Author

I looked at this and decided to tweak the terms a bit in this inner class. After stepping away from it for a couple of days, I couldn't immediately tell what Result.result actually meant :)
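
Purely as an illustration of the kind of renaming described (the actual names chosen in the PR may differ):

    public static class Result {
      Long key;               // the extracted timestamp
      byte[] value;           // the raw packet bytes
      boolean foundTimestamp; // clearer than the ambiguous "result"
    }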

@mmiklavc (Contributor Author)

I found some additional issues with error handling in the HDFSWriterCallback, so I fixed it to throw an IllegalArgumentException when the key is null, but that revealed further problems in our test infrastructure. PcapTopologyIntegrationTest seems to be relying on data that does not provide a key. Was this by design?

I get the following exception, which comes from the null check I added on the key:

Running org.apache.metron.pcap.integration.PcapTopologyIntegrationTest
Formatting using clusterid: testClusterID
2017-05-16 11:05:39 ERROR util:0 - Async loop died!
java.lang.IllegalArgumentException: Expected a key but none provided
	at org.apache.metron.spout.pcap.HDFSWriterCallback.apply(HDFSWriterCallback.java:121)
	at org.apache.storm.kafka.CallbackCollector.emit(CallbackCollector.java:59)
	at org.apache.storm.kafka.spout.KafkaSpoutStream.emit(KafkaSpoutStream.java:79)
	at org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics.emit(KafkaSpoutStreamsNamedTopics.java:101)
	at org.apache.storm.kafka.spout.KafkaSpout.emitTupleIfNotEmitted(KafkaSpout.java:280)
	at org.apache.storm.kafka.spout.KafkaSpout.emit(KafkaSpout.java:265)
	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:212)
	at org.apache.storm.daemon.executor$fn__6503$fn__6518$fn__6549.invoke(executor.clj:651)
	at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.lang.Thread.run(Thread.java:745)
2017-05-16 11:05:39 ERROR executor:0 -
java.lang.IllegalArgumentException: Expected a key but none provided
	at org.apache.metron.spout.pcap.HDFSWriterCallback.apply(HDFSWriterCallback.java:121)
	at org.apache.storm.kafka.CallbackCollector.emit(CallbackCollector.java:59)
	at org.apache.storm.kafka.spout.KafkaSpoutStream.emit(KafkaSpoutStream.java:79)
	at org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics.emit(KafkaSpoutStreamsNamedTopics.java:101)
	at org.apache.storm.kafka.spout.KafkaSpout.emitTupleIfNotEmitted(KafkaSpout.java:280)
	at org.apache.storm.kafka.spout.KafkaSpout.emit(KafkaSpout.java:265)
	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:212)
	at org.apache.storm.daemon.executor$fn__6503$fn__6518$fn__6549.invoke(executor.clj:651)
	at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.lang.Thread.run(Thread.java:745)
2017-05-16 11:05:39 ERROR util:0 - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
	at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341)
	at clojure.lang.RestFn.invoke(RestFn.java:423)
	at org.apache.storm.daemon.worker$fn__7172$fn__7173.invoke(worker.clj:761)
	at org.apache.storm.daemon.executor$mk_executor_data$fn__6388$fn__6389.invoke(executor.clj:275)
	at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:494)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.lang.Thread.run(Thread.java:745)

When I attempt to view the PCAP file with the PcapInspector in the IDE, I get this exception.

Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.IntWritable
	at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2254)
	at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2306)
	at org.apache.metron.utils.PcapInspector.main(PcapInspector.java:142)

Process finished with exit code 1
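
The second exception suggests the sequence file's key class no longer matches what PcapInspector supplies: the writer above is configured with LongWritable keys, while the reader is apparently still passing an IntWritable. A minimal sketch of reading with a matching key instance (the path is a placeholder):

    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
        SequenceFile.Reader.file(new Path("/path/to/pcap-seq-file")));
    LongWritable key = new LongWritable();      // must match the writer's keyClass
    BytesWritable value = new BytesWritable();
    while (reader.next(key, value)) {
      System.out.println(key.get() + " -> " + value.getLength() + " bytes");
    }
    reader.close();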

@cestella (Member)

@mmiklavc it depends on which test case you're talking about. We have two modes of operation in the pcap topology and two test cases in the integration test; these are selected by the flux property kafka.pcap.ts_scheme. These modes define the deserialization logic used in the topology to convert kafka keys/values to bytes suitable for writing to HDFS:

  • FROM_PACKET: expects a fully-formed packet (with headers) in the value; it parses the packet and extracts the timestamp from it. This is a legacy mode that worked with pycapa prior to its rewrite; we should eventually deprecate and remove it. It is associated with the FromPacketDeserializer.
  • FROM_KEY: expects raw packet data in the value and a timestamp in the key. This is by far the dominant mode of operation and the one you will see with pycapa or fastcapa. It is associated with the FromKeyDeserializer.

It appears that you are doing the null check in the HDFSWriterCallback. I would recommend doing it in FromKeyDeserializer instead, since a null key is not an illegal state for the FromPacketDeserializer.
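
Concretely, that recommendation amounts to something like the following sketch (the surrounding method shape is assumed for illustration, not taken from the actual PR):

    // In FromKeyDeserializer only, since FROM_PACKET mode legitimately has no key:
    if (key == null) {
      throw new IllegalArgumentException("Expected a key but none provided");
    }
    // ... otherwise extract the timestamp from the key and carry the raw value through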

@@ -116,7 +117,11 @@ public HDFSWriterCallback withConfig(HDFSWriterConfig config) {
public List<Object> apply(List<Object> tuple, EmitContext context) {
byte[] key = (byte[]) tuple.get(0);
byte[] value = (byte[]) tuple.get(1);
if(!config.getDeserializer().deserializeKeyValue(key, value, KeyValue.key.get(), KeyValue.value.get())) {
Contributor

KeyValue as a whole is unused now, right? Can we just delete the class entirely at this point?

Contributor Author

Looks that way, good catch.

@cestella (Member)

Regarding the test data, it's not a sequence file in a format suitable for reading with PcapInspector. Depending on the test case, we construct the appropriate kafka representation: the value is modified to suit the test case (e.g. with headers and no key, or without headers and with a key) and fed into kafka. This is done in the readPcaps method of the integration test.
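
To make the two representations concrete, an illustrative sketch (the topic name and the toBytes helper are hypothetical) of how readPcaps-style test data maps onto kafka records for each mode:

    // FROM_KEY: raw packet bytes (no headers) in the value, timestamp in the key
    ProducerRecord<byte[], byte[]> fromKey =
        new ProducerRecord<>("pcap", toBytes(timestampNanos), rawPacketBytes);

    // FROM_PACKET: fully-formed packet (with headers) in the value, no key
    ProducerRecord<byte[], byte[]> fromPacket =
        new ProducerRecord<>("pcap", null, packetWithHeaders);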

@mmiklavc (Contributor Author)

I see that. As for the keys and the methods for retrieving and saving them, I'll save that refactoring and cleanup for a separate PR.

@cestella (Member)

+1 by inspection, great addition.

@asfgit closed this in c0b0825 on May 17, 2017
zezutom pushed a commit to zezutom/metron that referenced this pull request on May 18, 2017