This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-1968: Messages are lost when a parser produces multiple messages and batch size is greater than 1 #1330

Closed
wants to merge 20 commits

Conversation

@merrimanr (Contributor) commented Feb 5, 2019

Contributor Comments

This PR represents a fairly significant shift in the Writer class architecture. Currently these classes do not support tuples that result in multiple messages, mainly due to a limitation in the BulkMessageWriter interface. The write method accepts separate lists of tuples and messages, so there is no way to know which tuple is associated with which message. It has worked so far with parsers that only emit a single message from a tuple because a 1 to 1 relationship is assumed in these classes.

I experimented with several different approaches to fixing this and tried to follow a strategy that would avoid having to significantly rewrite multiple classes (this was unavoidable in certain places). I changed the BulkMessageWriter.write interface from:

BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception;

to:

BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Map<String, MESSAGE_T> messages) throws Exception;

The messages being passed in are now represented as a map where the keys are message ids and the values are the messages. This made changes to the BulkMessageWriter implementations easier because they continue to assume a 1 to 1 relationship between message ids (formerly tuples) and messages. The writers now report which message ids succeeded/failed instead of tuples.

The BulkWriterComponent continues to track batch sizes and timeouts but no longer manages tuples or error handling. Instead it calls BulkWriterResponseHandler.handleFlush when messages are flushed. This interface is injected into BulkWriterComponent and allows us to properly commit messages according to the requirements of the platform the classes are running on. For now a Storm implementation of BulkWriterResponseHandler is set up and passed into the BulkWriterComponent. Managing tuple-to-message relationships, tuple acking, and error handling are now done in a single class. The bolts now have to set up a StormBulkWriterResponseHandler object and add tuples/messages to it.
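A minimal sketch of the handler abstraction (only the handleFlush name and the BulkWriterResponse type are taken from this description; the exact signature is an assumption):

public interface BulkWriterResponseHandler {

  /**
   * Called by BulkWriterComponent when a batch for a sensor type is flushed.
   * The response reports which message ids succeeded or failed so that a
   * Storm implementation can ack or error the tuples mapped to those ids.
   */
  void handleFlush(String sensorType, BulkWriterResponse response);
}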

Changes Included

  • The BulkMessageWriter interface is updated and all implementations have been adjusted to conform to it. Changes to the implementations are small and straightforward.
  • Tuple acking and error handling are moved out of BulkWriterComponent and into StormBulkWriterResponseHandler.
  • Bolts have been updated with the new StormBulkWriterResponseHandler pattern.
  • I noticed HBaseWriter and PcapWriter are no longer being used. Rather than go to the trouble of updating their tests, I removed them.
  • Removed WriterHandler.handleAck since acks are no longer handled there.
  • Added logging to make troubleshooting writer issues easier.

There were also several significant changes that needed to be made to the tests:

  • The writer tests are updated to match the new BulkMessageWriter.write interface. In most cases these changes were simple; however, I noticed there were no unit tests for KafkaWriter.write so I added them.
  • The parser integration tests now verify that all tuples were acked.
  • There is now a parser integration test that simulates the use case described in this Jira: the jsonMapQuery parser integration test now produces multiple messages from a single tuple and sets the batch size to 5.
  • A test was added for StormBulkWriterResponseHandler that also simulates the use case described in this Jira. Error handling tests that were originally in BulkWriterComponentTest were migrated here.

Testing

This has been tested in full dev both for regression and for the use case described in this Jira. There are 3 test cases:

  • bro sensor with a batch size set to 5
  • snort sensor with a batch size set to 1
  • jsonMapQuery sensor that produces multiple messages from a tuple with a batch size set to both less than and greater than the number of messages

Setup

  1. Spin up full dev and verify data is landing in both Elasticsearch and HDFS.
  2. Stop all sensors with service sensor-stubs stop.
  3. Create the jsonMapQuery sensor topic:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper node1:2181 --create --topic jsonMapQuery --partitions 1 --replication-factor 1
  4. Copy the jsonMapQuery sample data to full dev:
cd /metron-deployment/development/centos6
vagrant scp ../../../metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/raw/jsonMapExampleOutput /tmp
  5. Stop Metron Parsers in Ambari.
  6. Navigate to Ambari > Metron > Configs > Parsers and change "Metron Parsers" from bro,snort,yaf to bro,snort,jsonMapQuery.
  7. Start Metron Parsers in Ambari.
  8. Clear the Elasticsearch indices:
curl -XDELETE http://node1:9200/bro_index*
curl -XDELETE http://node1:9200/snort_index*
  9. Stop Metron Indexing in Ambari.
  10. Clear the HDFS files:
su hdfs -c "hdfs dfs -rm /apps/metron/indexing/indexed/bro/*"
su hdfs -c "hdfs dfs -rm /apps/metron/indexing/indexed/snort/*"
  11. Start Metron Indexing in Ambari.

  12. Verify there is no bro or snort data in Elasticsearch or HDFS:

[root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep bro
[root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep snort
[root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/*.json | wc -l
cat: `/apps/metron/indexing/indexed/bro/*.json': No such file or directory
0
[root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/snort/*.json | wc -l
cat: `/apps/metron/indexing/indexed/snort/*.json': No such file or directory
0
  13. Each topology (parsing, enrichment, indexing) can be configured with a batch size and timeout. The batch timeout defaults to 1/2 the tuple timeout, so the tests described next should be done reasonably quickly (before the batch timeout fires). To make the test results easier to understand we also need to set the batch size to 1 for the topologies we are not directly testing. Set the bro parser batch size to 1:
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
  "parserClassName": "org.apache.metron.parsers.bro.BasicBroParser",
  "sensorTopic": "bro",
  "parserConfig": {
    "batchSize": 1
  }
}' 'http://user:password@node1:8082/api/v1/sensor/parser/config/bro'
  14. Set the snort parser batch size to 1:
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
  "parserClassName": "org.apache.metron.parsers.snort.BasicSnortParser",
  "sensorTopic": "snort",
  "parserConfig": {
    "batchSize": 1
  }
}' 'http://user:password@node1:8082/api/v1/sensor/parser/config/snort'
  15. Set the enrichment batch size to 1:
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
  "es.clustername": "metron",
  "es.ip": "node1:9200",
  "es.date.format": "yyyy.MM.dd.HH",
  "parser.error.topic": "indexing",
  "update.hbase.table": "metron_update",
  "update.hbase.cf": "t",
  "es.client.settings": {},
  "profiler.client.period.duration": "15",
  "profiler.client.period.duration.units": "MINUTES",
  "user.settings.hbase.table": "user_settings",
  "user.settings.hbase.cf": "cf",
  "bootstrap.servers": "node1:6667",
  "source.type.field": "source:type",
  "threat.triage.score.field": "threat:triage:score",
  "enrichment.writer.batchSize": "1",
  "profiler.writer.batchSize": "15",
  "profiler.writer.batchTimeout": "0",
  "geo.hdfs.file": "/apps/metron/geo/default/GeoLite2-City.tar.gz",
  "asn.hdfs.file": "/apps/metron/asn/default/GeoLite2-ASN.tar.gz"
}' 'http://user:password@node1:8082/api/v1/global/config'
  16. Set the jsonMapQuery indexing batch size to 1 (since we will test this sensor in parsers):
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
  "hdfs": {
    "index": "jsonmapquery",
    "batchSize": 1,
    "enabled": true
  },
  "elasticsearch": {
    "index": "jsonmapquery",
    "batchSize": 1,
    "enabled": true
  },
  "solr": {
    "index": "jsonmapquery",
    "batchSize": 1,
    "enabled": false
  }
}' 'http://user:password@node1:8082/api/v1/sensor/indexing/config/jsonMapQuery'

It can be helpful to keep a Kafka console consumer running for each topic:

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic bro
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic snort
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic jsonMapQuery
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic enrichments
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic indexing

bro

  1. Bro has a batch size of 5 in indexing by default. Write 4 bro messages:
shuf -n 4 /opt/sensor-stubs/data/bro.out | sed -e "s/\"ts\"\:[0-9]\+\./\"ts\"\:`date +%s`\./g" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic bro

There should be no data in Elasticsearch or HDFS:

[root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep bro
[root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/*.json | wc -l
cat: `/apps/metron/indexing/indexed/bro/*.json': No such file or directory
0
  2. Write an additional bro message:
shuf -n 1 /opt/sensor-stubs/data/bro.out | sed -e "s/\"ts\"\:[0-9]\+\./\"ts\"\:`date +%s`\./g" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic bro

There should now be 5 messages in Elasticsearch and HDFS:

[root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep bro
bro_index_2019.02.07.22    5
[root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/bro/*.json | wc -l
5

snort

  1. Snort has a batch size of 1 in indexing by default. Write a snort message:
shuf -n 1 /opt/sensor-stubs/data/snort.out | sed -e "s/[^,]\+ ,/`date +'%m\/%d\/%y-%H:%M:%S'`.000000 ,/g" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic snort

There should now be 1 message in Elasticsearch and HDFS:

[root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep snort
snort_index_2019.02.07.22    1
[root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/snort/*.json | wc -l
1

jsonMapQuery

  1. The jsonMapQuery sensor produces 10 messages for every tuple. Set the batch size to 5 in the parser topology:
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
  "parserClassName": "org.apache.metron.parsers.json.JSONMapParser",
  "sensorTopic": "jsonMapQuery",
  "parserConfig": {
    "jsonpQuery": "$.foo",
    "batchSize": 5
  }
}' 'http://user:password@node1:8082/api/v1/sensor/parser/config/jsonMapQuery'
  2. Write a jsonMapQuery message:
cat /tmp/jsonMapExampleOutput | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic jsonMapQuery

This exercises the scenario described in the Jira. Previously only a single message was written for each tuple. With this PR there should be 10 messages in Elasticsearch and HDFS:

[root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery
jsonmapquery_index_2019.02.07.22   10
[root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l
10
  3. Change the batch size to 15:
curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' -d '{
  "parserClassName": "org.apache.metron.parsers.json.JSONMapParser",
  "sensorTopic": "jsonMapQuery",
  "parserConfig": {
    "jsonpQuery": "$.foo",
    "batchSize": 15
  }
}' 'http://user:password@node1:8082/api/v1/sensor/parser/config/jsonMapQuery'
  4. Write another jsonMapQuery message:
cat /tmp/jsonMapExampleOutput | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic jsonMapQuery

There should still be 10 messages in Elasticsearch and HDFS (no additional messages written):

[root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery
jsonmapquery_index_2019.02.07.22   10
[root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l
10
  5. Write another jsonMapQuery message:
cat /tmp/jsonMapExampleOutput | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic jsonMapQuery

This should cause a batch to flush (the 10 messages already queued plus 10 new exceed the batch size of 15), so we should see 15 additional messages:

[root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery
jsonmapquery_index_2019.02.07.22   25
[root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l
25
  6. Write another jsonMapQuery message:
cat /tmp/jsonMapExampleOutput | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic jsonMapQuery

This should cause another batch to flush since there were 5 messages still in the batch (5 remaining + 10 new = 15). We should see another 15 messages added:

[root@node1 vagrant]# curl -s -XGET http://node1:9200/_cat/indices?h=index,docs.count | grep jsonmapquery
jsonmapquery_index_2019.02.07.22   40
[root@node1 vagrant]# hdfs dfs -cat /apps/metron/indexing/indexed/jsonMapQuery/*.json | wc -l
40

The various topologies can be tested by adjusting their batch sizes and timeouts. The previous instructions focus on testing batch sizes for bro in indexing, snort in indexing and jsonMapQuery in parsing. Other topologies and scenarios can and should be tested with different batch sizes and timeouts.

Feedback Requested

Outside of the standard code review and functional testing, there are a few areas that are not 100% clear and I would like feedback on:

  • Is the fundamental architectural approach solid? Are there any holes I'm not thinking of?
  • The message ids generated by the bolts and used to track messages in the writer classes are Java UUIDs (see the one-liner after this list). Is this good enough? Any id that uniquely identifies a message could be used here. Is there something that would perform better?
  • There is the potential for messages to pile up in StormBulkWriterResponseHandler (and probably BulkWriterComponent too) when tuples time out and are replayed. I think this is an issue now as well, but should we explore some kind of cache that can evict messages after timeouts?
  • I have added javadocs and am planning on taking another pass and adding more. Are there areas you feel need more/better explanation? My goal is for these classes to be easy to understand and navigate. Is the interaction between the bolts and response handler clear?
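For reference, the one-liner mentioned above (Java UUIDs are confirmed; the variable name is illustrative):

// Message ids are plain Java UUIDs; any unique String id would work.
String messageId = java.util.UUID.randomUUID().toString();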

Pull Request Checklist

Thank you for submitting a contribution to Apache Metron.
Please refer to our Development Guidelines for the complete guide to follow for contributions.
Please refer also to our Build Verification Guidelines for complete smoke testing guides.

In order to streamline the review of the contribution we ask you follow these guidelines and ask you to double check the following:

For all changes:

  • Is there a JIRA ticket associated with this PR? If not one needs to be created at Metron Jira.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?

For code changes:

  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?

  • Have you included steps or a guide to how the change may be verified and tested manually?

  • Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:

    mvn -q clean integration-test install && dev-utilities/build-utils/verify_licenses.sh 
    
  • Have you written or updated unit tests and or integration tests to verify your changes?

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?

  • Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

For documentation related changes:

  • Have you ensured that the format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not then run the following commands and verify the changes via site-book/target/site/index.html:

    cd site-book
    mvn site
    

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
It is also recommended that travis-ci is set up for your personal repository such that your branches are built there before submitting a pull request.

@@ -326,6 +329,10 @@ protected void handleError(String sensorType, byte[] originalMessage, Tuple tupl
ErrorUtils.handleError(collector, error);
}

Contributor

Why are we hashing here? This seems like something that could impact performance and that we could do another way. Is there no existing field or type we can use? Why not just pass the Tuple itself?

Contributor Author

I agree and it's something I called out in the PR description. What should we use? We can't pass in a Tuple anymore; that's a fundamental change this PR introduces. It needs to be a String type and it needs to uniquely identify a message. It's possible a Tuple results in multiple messages, so that won't work.

Contributor Author

After thinking about this more, hashing the message is definitely not the right answer. The most recent commit uses a java UUID instead. We can explore even more efficient options but this should be an improvement.

Contributor

@merrimanr I think that is a better change, unless we can find a consistent and already existing id to use (which I don't see either). We should just move ahead with this approach.

List<Tuple> tuples = new ArrayList<>();
List<String> messageIds = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
  Tuple t = mock(Tuple.class);
Contributor

the format string could be a static final String constant

Contributor Author

Done

/**
* This interface is used by the {@link org.apache.metron.writer.BulkWriterComponent} to report that a queue for a
* sensor type has been flushed. Different frameworks may have different requirements for committing processed messages
* so this abstraction provides a way to pass in the appropriate commit logic for the framework in use.
@ottobackwards (Contributor) commented Feb 6, 2019

s/abstraction/Interface/

Contributor Author

Done

@ottobackwards (Contributor)

This looks really good. I think that the way this works is important, maybe important enough to be in the architecture documentation.

@justinleet (Contributor)

@merrimanr Looked like the build failed. Could you look into it when you have a chance?

@merrimanr (Contributor Author)

The latest commit converts Map<String, MESSAGE_T> to List<BulkWriterMessage<MESSAGE_T>> in the BulkMessageWriter.write method (where a BulkWriterMessage contains an id and a message). This should make it clearer what the BulkMessageWriter interface is expecting.
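For illustration, a minimal sketch of the carrier class (only the BulkWriterMessage name and the id/message pairing are confirmed above; field and accessor names are assumptions):

public class BulkWriterMessage<MESSAGE_T> {

  private final String id;          // uniquely identifies the message (a UUID here)
  private final MESSAGE_T message;

  public BulkWriterMessage(String id, MESSAGE_T message) {
    this.id = id;
    this.message = message;
  }

  public String getId() {
    return id;
  }

  public MESSAGE_T getMessage() {
    return message;
  }
}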

@ottobackwards (Contributor)

really like that last change @merrimanr, nice

@merrimanr (Contributor Author)

The latest commit implements @nickwallen's suggestion of refactoring the flush decision logic in the BulkWriterComponent.write method into separate policies. I think it was a great suggestion and I like it better than what we had.

I also added several javadocs to the various classes included in this PR. I am still reviewing but if you see classes that need some javadoc love, just let me know.

Since this is a fairly significant change I tested everything again in full dev and things are still working.

/**
* This interface is used by the {@link org.apache.metron.writer.BulkWriterComponent} to determine if a batch should be flushed.
* @param <MESSAGE_T> Message type
*/
Contributor

Can I ask how MESSAGE_T is relevant to this interface? Don't you just need the size? Why does it need this?

Contributor Author

Yeah, I guess you're right. My thinking was that we might want a policy that depends on message contents at some point. We don't have an actual use for that now, so it is unnecessary. If we do have a need in the future we can just change the interface since it's internal.

Contributor

Well, there is nothing about the function of the interface (the call itself) that needs the type. You can always implement the interface with an implementing class that is specialized.

Contributor Author

I think your initial reaction was correct. Let's keep it simple for now instead of trying to predict the future. The latest commit uses batch size instead.
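For illustration, a hypothetical shape of the de-generified policy (the interface and method names are assumptions; only the switch to batch size is confirmed above):

public interface FlushPolicy {

  // Returns true if the queued batch for this sensor type should be flushed,
  // based only on the current batch size rather than message contents.
  boolean shouldFlush(String sensorType, WriterConfiguration configurations, int batchSize);
}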

@mmiklavc (Contributor) left a comment

I like the direction you guys went with the flush policy extraction. Even if we never go beyond a batch size and timeout flush policy, this improves clarity and readability pretty dramatically.

@nickwallen (Contributor) left a comment

This is looking good. I have some more to review and get my head around, but here is some initial feedback.

*/
protected long getBatchTimeout(String sensorType, WriterConfiguration configurations) {
  int batchTimeoutSecs = configurations.getBatchTimeout(sensorType);
  if (batchTimeoutSecs <= 0 || batchTimeoutSecs > defaultBatchTimeout) {
Contributor

I do not understand this logic. If I configure a timeout greater than the default (currently 6 seconds), it is going to just use the default? So if I set the timeout to 10, it is just going to ignore me and use 6?

Contributor Author

This was preexisting. I don't know the exact reason, but I suspect it was a guard against setting the batch timeout to be greater than the tuple timeout. @mmiklavc also brought up a question around the default batch timeout and I'm not quite sure if what we have is correct or what the requirements are. I know that the default batch timeout is 1/2 the tuple timeout when running in a bolt and 6 seconds otherwise (test scenarios, I think). I also know the default batch timeout is only set once at the beginning and isn't tied to anything dynamically.

How should the default batch timeout work? I think we need to keep the default at 1/2 the tuple timeout in Storm, but what about other scenarios? Should this setting be exposed in, say, global config? Should we be able to change it at runtime?
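To make the preexisting behavior concrete, a small illustration (not actual Metron code; Storm's tuple timeout comes from topology.message.timeout.secs, which defaults to 30 seconds):

// Illustration only: the preexisting default batch timeout described above.
static int defaultBatchTimeout(boolean runningInBolt, int tupleTimeoutSecs) {
  // In a bolt the default is half the tuple timeout; otherwise
  // (e.g. test scenarios) it falls back to 6 seconds.
  return runningInBolt ? tupleTimeoutSecs / 2 : 6;
}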

Contributor

I think we should just do what the user sets it to here. Changing it under the covers like this is just going to cause confusion. Especially since this code will also be run outside of Storm, it shouldn't be bound by subtle Storm constraints.

Perhaps there is somewhere in Storm-specific code that we can add a check and clear log statement or exception, if the configured batch timeout exceeds some percentage of the tuple timeout? ParserBolt maybe or elsewhere?

If this is current behavior, I'd also be open to just tabling this for a separate JIRA/PR.

Contributor

Hrm, that's a good catch @nickwallen. And I believe @merrimanr is right. The reason we want to restrict the timeout range is that if the tuple timeout ends up being short relative to our batch timeout, we'll fail a lot of tuples due to timeout. I'm not sure if there's a better way to capture that reasoning in the code, but we should definitely refactor or add comments to make this clearer.

It seems likely that the only reason this was configurable (the default, that is) in the first place was for testing. And oddly enough, that capability and flexibility made this less clear for us now. I think ultimately in the Storm world we should have that default in lockstep with the Storm concept of tuple timeout. So if it's 1/2 the tuple timeout, let's leave that be. Considering other streaming frameworks, the concept of where we derive the default should be more general.

I would be in favor of renaming "defaultBatchTimeout" to "maxBatchTimeout" and then having a "setMaxBatchTimeout" method or constructor arg that clearly sets that value. Any reliance on tuples or Storm-specific config should, I think, be kept out of the timeout policy implementation and provided externally. In this case, this option is not dynamic because changing the tuple timeout requires a topology restart. We should continue to pull the current tuple timeout from the Storm Bolt config and pass that in, per the calculation, as the "max timeout." Our Storm code cares about the max being 1/2 the tuple timeout, whereas the policy class only cares that there is the concept of a max value in the first place. That is, let's leave the construction of that value to the classes that are instantiating the policies. How's that sound to you gents?

Contributor Author

The latest commit changes "defaultBatchTimeout" to "maxBatchTimeout", moves that setting to the constructor(s), and removes the set methods. A default value no longer lives in BatchTimeoutPolicy; instead an exception is thrown if the value is less than or equal to 0. The bolts now pass the value down into the BulkWriterComponent constructor, which uses it to create the policy.

There was a default value of 6 being used previously. As I was making these changes I realized the WriterBolt class relies on this default instead of calculating what it should be, so I moved the default to that class and it continues to function as it does now. Rather than going to the trouble of figuring out the right way to set this, I think we should replace WriterBolt with BulkMessageWriterBolt: they do the same thing and are redundant. I was planning on opening a separate PR for that anyway. If anyone disagrees and would like to discuss the correct way to set "maxBatchTimeout" in this bolt, let me know.
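A sketch of the constructor-time guard described above (the class name and the exception-on-nonpositive behavior are per this comment; the exact body is an assumption):

public class BatchTimeoutPolicy {

  private final long maxBatchTimeout;

  public BatchTimeoutPolicy(long maxBatchTimeout) {
    // No default lives here anymore; the caller must supply a positive value.
    if (maxBatchTimeout <= 0) {
      throw new IllegalArgumentException(
          String.format("Max batch timeout must be greater than 0. Value supplied was %d.", maxBatchTimeout));
    }
    this.maxBatchTimeout = maxBatchTimeout;
  }
}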

nickwallen pushed a commit to nickwallen/metron that referenced this pull request Feb 15, 2019
…s and batch size is greater than 1 (merrimanr via nickwallen) closes apache#1330
@merrimanr merrimanr closed this Feb 19, 2019
@merrimanr merrimanr reopened this Feb 19, 2019
@merrimanr merrimanr closed this Feb 19, 2019
@merrimanr merrimanr reopened this Feb 19, 2019
# Conflicts:
#	metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
#	metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@merrimanr merrimanr closed this Feb 19, 2019
@merrimanr merrimanr reopened this Feb 19, 2019
# Conflicts:
#	metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterResponse.java
#	metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
#	metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@nickwallen (Contributor) left a comment

+1 by inspection only. Thanks for all the hard work on this. This looks good to me, pending the other reviewers.

@justinleet (Contributor)

The implementation of this seems pretty reasonable, from taking a look at things. I didn't dig as deep as the other reviewers, but I'm good with where this is at code-wise.

@ottobackwards requested some architecture documentation and I didn't see any updates there. This feels like a good opportunity to get a core part of our architecture well documented, making issues like this easier to understand in the future. Would you be able to write up some docs around this?

@merrimanr (Contributor Author)

@justinleet yes, I was planning on circling back to that once we made it through the code review. Now that this PR is in a good spot I'll start on that documentation. Thanks.

@ottobackwards (Contributor)

If you want to put out a DISCUSS thread for how to document this and similar things, and to control scope, I would not be opposed to a follow-on, as long as it gets done before the next release.

@merrimanr (Contributor Author)

Thanks @ottobackwards, I appreciate that. It would save me from the difficult task of keeping this big PR up to date with master. I will start a DISCUSS thread now on the topic. If there are no objections there, I will create a follow-on Jira and commit to getting it done within a few days.

@ottobackwards (Contributor)

I think we need some way to document architectural concepts that is not as onerous as a full architecture document. I can imagine almost an architectural FAQ type of thing:

Q. How do the Indexers batch writes?
A. index_batching.md

@merrimanr (Contributor Author)

@ottobackwards just started a discuss thread on the architectural documentation topic.

@merrimanr (Contributor Author)

Follow on Jira is here.

@justinleet (Contributor) commented Feb 25, 2019

I'm +1, with the follow-on for documentation (and can you post the Jira here for posterity?)

Edit: And assuming everyone else is good.


@ottobackwards (Contributor)

I am +1 as well. Thanks for working through this with us @merrimanr

@mmiklavc (Contributor)

Nice work @merrimanr, I am +1 pending agreement from @JonZeolla in the DISCUSS thread put out about documentation. Also, @merrimanr would you be able to link that thread to the Jira and tag it as required for our next release?

@merrimanr (Contributor Author)

Here is the Jira from the DISCUSS thread: https://issues.apache.org/jira/browse/METRON-2014
