NMS-13282: Fix traffic volume tests #29

Merged
merged 1 commit into master from jira/NMS-13282 on Aug 9, 2021
Conversation

swachter
Contributor

Includes two minor changes:

  • log output is generated again during tests
  • no additional Flink minicluster is started (this is already part of the local Flink runner)

and one significant change.

The significant change is that the RandomFlowIT now compares the volumes of flow summaries output by Nephron with expected results that are calculated in-memory. The generation rate is no longer taken into account. This test does not test the flow generation logic but tests whether the summaries calculated by Nephron are correct under various conditions.

This requires that Nephron's calculation is not interrupted but finishes on its own once the end of its input is reached. This is accomplished by a specific timestamp policy factory that advances the watermark to the end of time when the input has been idle for some amount of time.
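For illustration, a minimal sketch of such a policy based on Beam's KafkaIO TimestampPolicy API, mirroring the two durations passed in the test's createPipeline below; the factory name, the lastSwitched event-time accessor, and the exact idle check are assumptions, not the PR's actual timestampPolicyFactory:

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.opennms.netmgt.flows.persistence.model.FlowDocument; // assumed location

class IdleShutdownTimestampPolicyFactory {

    // Policies assign event times taken from the flow documents; once the partition
    // backlog is empty and no record has arrived for idleTimeout, the watermark is
    // advanced to the end of time so that all windows fire and the pipeline drains.
    static TimestampPolicyFactory<byte[], FlowDocument> create(Duration maxInputDelay, Duration idleTimeout) {
        return (TopicPartition tp, Optional<Instant> previousWatermark) ->
                new TimestampPolicy<byte[], FlowDocument>() {
                    private Instant lastActivity = Instant.now();
                    private Instant maxEventTime = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);

                    @Override
                    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<byte[], FlowDocument> record) {
                        lastActivity = Instant.now();
                        // assumption: flow documents carry their event time in lastSwitched
                        Instant eventTime = new Instant(record.getKV().getValue().getLastSwitched().getValue());
                        if (eventTime.isAfter(maxEventTime)) {
                            maxEventTime = eventTime;
                        }
                        return eventTime;
                    }

                    @Override
                    public Instant getWatermark(PartitionContext ctx) {
                        boolean idle = ctx.getMessageBacklog() == 0
                                && lastActivity.plus(idleTimeout).isBefore(Instant.now());
                        // while records flow, hold the watermark back by the allowed input delay;
                        // once idle long enough, declare the input exhausted
                        return idle ? BoundedWindow.TIMESTAMP_MAX_VALUE : maxEventTime.minus(maxInputDelay);
                    }
                };
    }
}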

The functionality of the Handler class is extracted into two independent pieces: the FlowDocGen class, which generates flow documents based on flow reports, and the FlowDocSender class, which sends flow documents to Kafka. This split was necessary for the in-memory calculation, which is based on flow documents.
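A rough sketch of why this split enables the in-memory calculation; all types and signatures below are illustrative stand-ins (only the class names FlowDocGen and FlowDocSender come from the PR):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Stand-ins for the real flow report and flow document types.
class FlowReport {
    final String exporter; final long bytes;
    FlowReport(String exporter, long bytes) { this.exporter = exporter; this.bytes = bytes; }
}

class FlowDoc {
    final String exporter; final long bytes;
    FlowDoc(String exporter, long bytes) { this.exporter = exporter; this.bytes = bytes; }
}

// FlowDocGen: generates flow documents based on flow reports; knows nothing about Kafka.
class FlowDocGen {
    FlowDoc generate(FlowReport report) {
        return new FlowDoc(report.exporter, report.bytes);
    }
}

// FlowDocSender: sends flow documents to Kafka (abstracted to a callback here).
class FlowDocSender {
    private final Consumer<FlowDoc> send;
    FlowDocSender(Consumer<FlowDoc> send) { this.send = send; }
    void send(FlowDoc doc) { send.accept(doc); }
}

// Because generation and sending are now independent, the test can observe every
// document on its way to Kafka and accumulate the expected volumes in memory:
class ExpectedVolumes {
    final Map<String, Long> bytesPerExporter = new ConcurrentHashMap<>();
    void observe(FlowDoc doc) {
        bytesPerExporter.merge(doc.exporter, doc.bytes, Long::sum);
    }
}

The flow summaries that Nephron emits can then be compared against these in-memory totals.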

Member

@fooker fooker left a comment

I would like to discuss whether this way of writing tests is something that we want. My main concern here is twofold:

  1. By re-implementing/duplicating the code in the test and re-doing the calculations there, we are not testing the correctness of the calculations but only the correctness of the underlying system. This may be something that we want, but the original purpose of the tests is to check the calculations themselves.

  2. The test structure has moved from a system test to something more like an integration test by interacting with the pipeline using a different approach. The oddities that the Flink cluster (consisting of more than one node) may introduce are no longer tested.

In addition, this PR is huge. I have no idea how to review these changes in any sensible way.

@swachter
Contributor Author

swachter commented Jul 7, 2021

> I would like to discuss whether this way of writing tests is something that we want. My main concern here is twofold:
>
> 1. By re-implementing/duplicating the code in the test and re-doing the calculations there, we are not testing the correctness of the calculations but only the correctness of the underlying system. This may be something that we want, but the original purpose of the tests is to check the calculations themselves.

IMHO that kind of test makes a lot of sense. However, if you insist, I can remove that part from RandomFlowIT. There is the TotalVolumeTest that does something similar, though it uses synthetic input and therefore does not cover Kafka's watermark logic.

> 2. The test structure has moved from a system test to something more like an integration test by interacting with the pipeline using a different approach. The oddities that the Flink cluster (consisting of more than one node) may introduce are no longer tested.

I do not understand your point here. If the argument --runner=FlinkRunner is applied, the test runs on a Flink mini cluster.

> In addition, this PR is huge. I have no idea how to review these changes in any sensible way.

If you look closely at the changes, you will find that all of them are straightforward:

  • The Handler class was split into two new classes, FlowDocGen and FlowDocSender.
  • New functionality was added to the testing module to support tests related to clock skew and tests that use "splitted" input sources.
  • The pipeline termination logic in the RandomFlowIT was changed. In the old version the thread running the pipeline was interrupted after some given time, which could produce nondeterministic results depending on the computing environment. Now the pipeline terminates in an orderly fashion when its input is idle for some time (see the sketch after this list). This makes the test more deterministic.
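A minimal sketch of what the orderly termination looks like on the test side, assuming the idle-aware timestamp policy sketched above; OrderlyTermination and runToCompletion are hypothetical names, not code from the PR:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;

class OrderlyTermination {

    // Blocks until the pipeline drains on its own: once the input has been idle long
    // enough, the watermark jumps past the end of time, all windows fire, and
    // waitUntilFinish returns without any thread interruption.
    static PipelineResult.State runToCompletion(Pipeline pipeline) {
        return pipeline.run().waitUntilFinish();
    }
}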

@swachter
Contributor Author

I removed the e2e test part from the PR.

@swachter swachter requested a review from fooker July 23, 2021 06:03
@@ -152,7 +152,7 @@ public void tearDown() throws IOException {
     }
 
     @Test
-    public void canPee() throws Exception {
+    public void testRates() throws Exception {
Contributor

awww, changing the name? LOL

Comment on lines 534 to 538
private org.apache.beam.sdk.Pipeline createPipeline(NephronOptions options) {
    var pipeline = org.apache.beam.sdk.Pipeline.create(options);
    registerCoders(pipeline);

    Map<String, Object> kafkaConsumerConfig = new HashMap<>();
    kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, options.getGroupId());
    // Auto-commit should be disabled when checkpointing is on:
    // the state in the checkpoints is used to derive the offsets instead
    kafkaConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, options.getAutoCommit());

    // use a timestamp policy that finishes processing when the input is idle for some time
    TimestampPolicyFactory<byte[], FlowDocument> tpf = timestampPolicyFactory(
            org.joda.time.Duration.millis(options.getDefaultMaxInputDelayMs()),
            org.joda.time.Duration.standardSeconds(5)
    );

    var readFromKafka = new Pipeline.ReadFromKafka(
            options.getBootstrapServers(),
            options.getFlowSourceTopic(),
            kafkaConsumerConfig,
            tpf
    );

    pipeline.apply(readFromKafka)
            .apply(new Pipeline.CalculateFlowStatistics(options))
            .apply(new Pipeline.WriteToElasticsearch(options));

    return pipeline;
}
Member

In contrast to the "real" pipeline used in production: is this only required to patch in the changed timestamp policy?

Would be nice to avoid this - even if it results in a longer test run time. Do we need the pipeline to end in order to flush things out? If not, why not let the pipeline run forever and kill it as soon as we get the expected results from ES?

Contributor Author

Yes, the pipeline creation is only slightly tweaked, in order to allow the use of the special timestamp policy.

I prefer not to kill the pipeline. When we wait for results to appear in ES, we can never be sure that all flow summaries for a specific timestamp have been persisted. (Some late panes may not have been persisted yet.) Waiting a couple of seconds may remedy this, but how long should we wait? CircleCI runs sometimes show big outliers in their required time.

Code duplication could be reduced by refactoring the pipeline creation logic.
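One possible shape of that refactoring, sketched from the test's createPipeline shown above; the exact parameter list is an assumption:

// Hypothetical shared assembly: production passes its usual timestamp policy factory,
// the test passes the idle-aware one, and everything else stays identical.
private static org.apache.beam.sdk.Pipeline createPipeline(
        NephronOptions options,
        Map<String, Object> kafkaConsumerConfig,
        TimestampPolicyFactory<byte[], FlowDocument> tpf) {
    var pipeline = org.apache.beam.sdk.Pipeline.create(options);
    registerCoders(pipeline);

    var readFromKafka = new Pipeline.ReadFromKafka(
            options.getBootstrapServers(),
            options.getFlowSourceTopic(),
            kafkaConsumerConfig,
            tpf);

    pipeline.apply(readFromKafka)
            .apply(new Pipeline.CalculateFlowStatistics(options))
            .apply(new Pipeline.WriteToElasticsearch(options));

    return pipeline;
}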

Member

Would be nice to avoid the code duplication if the effort is not too high.

Member

@fooker fooker left a comment

Besides the possible removal of the code duplication, this is fine.

@swachter swachter merged commit f01c2fc into master Aug 9, 2021
@swachter swachter deleted the jira/NMS-13282 branch August 9, 2021 09:40