
[BEAM-3705] ApproximateUnique resets its accumulator with each firing. #4688

Merged: 4 commits into apache:master, Feb 23, 2018

Conversation

@rangadi commented Feb 14, 2018

extractOutput() ended up resetting the underlying aggregation because it used extractOrderedList(), which removes all the elements from the heap. extractOrderedList() is costly and is not required either. extractOutput() no longer mutates the accumulator and is cheaper as well.

Merging was not tested, as the direct runner does not seem to exercise the combiner's merge path. Added a test that merges and extracts output the way a window with multiple firings would.
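For context, a minimal standalone sketch of the failure mode described above (class and method names are simplified stand-ins, not the actual Beam code):

```java
import java.util.PriorityQueue;

// Simplified model of the accumulator: a min-heap holding the sampleSize
// largest unique hashes. The estimate is derived from its minimum element
// (the k-th largest hash seen overall).
class TopHashes {
  private final PriorityQueue<Long> heap = new PriorityQueue<>();
  private final int sampleSize;

  TopHashes(int sampleSize) {
    this.sampleSize = sampleSize;
  }

  void add(long hash) {
    if (!heap.contains(hash)) {
      heap.add(hash);
      if (heap.size() > sampleSize) {
        heap.poll(); // evict the smallest, keeping the top sampleSize hashes
      }
    }
  }

  // Pre-fix shape: building an ordered list drains the heap, so the
  // accumulator is empty for the next firing of the same window.
  long kthLargestDestructive() {
    long min = heap.isEmpty() ? 0 : heap.poll(); // destructive read of the minimum
    heap.clear();                                // ...and the rest is gone too
    return min;
  }

  // Post-fix shape: peek at the minimum without removing anything.
  long kthLargestNonMutating() {
    return heap.isEmpty() ? 0 : heap.element();
  }
}
```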

@rangadi commented Feb 14, 2018

+R: @kennknowles, @reuvenlax

```java
    }
    return heap;
  }

  @Override
  public Long extractOutput(LargestUnique heap) {
    List<Long> largestHashes = heap.extractOrderedList();
```
rangadi (author):

Moved the implementation of extractOutput to LargestUnique. There is no reason to assume sampleSize is the same for both; this way the output is based entirely on state within LargestUnique.

@jkff commented Feb 14, 2018

Quick comment: can you use CombineFnTester?

@rangadi commented Feb 14, 2018

> can you use CombineFnTester?

That is great. Updated the test.

I reverted my 'improvement' to add(); it was buggy. It is fine as it is.
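For reference, the suggested pattern looks roughly like this (a sketch assuming CombineFnTester's `testCombineFn(fn, input, expected)` overload; a stock Sum combiner stands in here for ApproximateUniqueCombineFn):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.beam.sdk.testing.CombineFnTester;
import org.apache.beam.sdk.transforms.Sum;

// CombineFnTester feeds the input through many shardings and orderings,
// so mergeAccumulators() is exercised even though the direct runner's
// simple evaluation path may not hit it.
public class CombineFnTesterSketch {
  public static void main(String[] args) {
    List<Long> input =
        LongStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
    CombineFnTester.testCombineFn(Sum.ofLongs(), input, 5050L);
  }
}
```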

@reuvenlax commented Feb 14, 2018 via email

@rangadi commented Feb 14, 2018

I don't see any Approx.* combiners in Python. Python Beam can simply use existing reducers like 'sum'.

@rangadi commented Feb 15, 2018

R: @jkff

Commit: `extractOutput()` ended up resetting the underlying aggregation due
to use of `extractOrderedList()`, which removes all the elements from
the heap. `extractOrderedList()` is costly and is not required either.
`extractOutput()` does not mutate now and is cheaper too.

Updated LargestUnique.add() to avoid the 'heap.contains()' call in the
common case with large input.

Merging was not tested as the direct runner does not seem to use the
combiner. Added a test using CombineFnTester.

Commit: Put back the add() improvement; contains() is an O(n) operation.
Avoid it in the common case. I think extractOrderedList() existed mainly
to avoid this.
@rangadi commented Feb 15, 2018

Squashed commits.

@jkff left a review:

Thanks!

```
@@ -102,6 +102,7 @@
      } else {
        accumulator = fn.mergeAccumulators(Arrays.asList(accumulator, inputAccum));
      }
      fn.extractOutput(accumulator); // Extract output to simulate multiple firings.
```
jkff:

Hmm, I don't understand this comment. Is there a way to more directly simulate multiple firings, and assert that the results of each firing are correct?

rangadi (author):

I wanted to invoke fn.extractOutput() multiple times, since that is what happens on runners like Dataflow when there are multiple firings. I am not sure there is an easy way to add validation for each firing here. Since the final extractOutput() is verified, the intermediate values get some indirect validation. Without this, the test added in ApproximateUniqueTest passes even without the fix.
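In isolation, the simulation looks roughly like this (a sketch against Beam's CombineFn API; `simulateFirings` and the printout are illustrative, not the test code):

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

// Each iteration adds one shard of input, merges it into the running
// accumulator, and extracts output the way a trigger firing would.
// Because extractOutput() must not mutate the accumulator, later
// firings still see all earlier input.
public class FiringSimulation {
  static <InputT, AccumT, OutputT> void simulateFirings(
      CombineFn<InputT, AccumT, OutputT> fn, List<List<InputT>> shards) {
    AccumT accum = fn.createAccumulator();
    for (List<InputT> shard : shards) {
      AccumT shardAccum = fn.createAccumulator();
      for (InputT value : shard) {
        shardAccum = fn.addInput(shardAccum, value);
      }
      accum = fn.mergeAccumulators(Arrays.asList(accum, shardAccum));
      OutputT fired = fn.extractOutput(accum); // one "firing"
      System.out.println("firing -> " + fired);
    }
  }
}
```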

rangadi (author):

Do you have any suggestions for improving this?

```java
      if (heap.size() >= sampleSize && value < heap.element()) {
        return false; // Common case as input size increases.
      }
      if (!heap.contains(value)) {
```
jkff:

What's the complexity of contains? Is it O(log n) or O(n)?

rangadi (author):

O(n) in java.util.PriorityQueue.

jkff:

That seems quite bad - O(n) for every add(). Is this check actually needed?

```java
          break; // The remainder of this list is all smaller.
        }
      }
      iterator.next().heap.forEach(h -> heap.add(h));
```
jkff:

It's rather confusing that "heap" refers both to PriorityQueue objects and LargestUnique objects, which have different .add() behavior (one is bounded and deduplicates, the other isn't). On this line there are two things called "heap", referring to both. Maybe rename the variable "heap" to "accum"? (Applies throughout the PR wherever a LargestUnique variable, field, or parameter is called "heap".)

rangadi (author):

Agreed, I felt it too. Will update.

rangadi (author):

Done.

```java
    if (uniqueCount <= sampleSize) {
      return is(uniqueCount);
    } else {
      long maxError = (long) Math.ceil(2.0 * uniqueCount / Math.sqrt(sampleSize));
```
jkff:

Just checking: is this a probabilistic guarantee (holds 99% of the time), or a hard guarantee (holds 100% of the time)? Wouldn't want the test to be flaky.

rangadi (author):

Good point, I didn't check. In the case of these tests the input does not change from run to run, so once it passes it should always pass. At the least it won't be flaky.

rangadi (author):

I could not find where the error bound comes from or what the cutoff probability is. The estimate is roughly k * hash_key_space / (max_hash - k_th_largest_hash). ApproximateUnique is likely to be replaced by ApproximateDistinct, which estimates using HyperLogLog. The tests are not flaky since the input is deterministic.
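For a concrete feel for the bound in the matcher above (illustrative numbers, not from the test): with `sampleSize = 100` and `uniqueCount = 10000`, `maxError = ceil(2.0 * 10000 / sqrt(100)) = 2000`, so any estimate in `[8000, 12000]` is accepted.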

```java
   * Test ApproximateUniqueCombineFn. TestPipeline does not use combiners.
   */
  @RunWith(Parameterized.class)
  public static class ApproximateUniqueCombineFnTest {
```
jkff:

There are only 3 parameters. I'd prefer a private utility method and 3 test methods calling it with each of the 3 parameters.

rangadi (author):

Done.

```
@@ -276,6 +301,50 @@ public void testApproximateUniqueWithDifferentSampleSizes() {
    }
  }

  /**
   * Test ApproximateUniqueCombineFn. TestPipeline does not use combiners.
```
jkff:

It does (otherwise how would it evaluate the Combine transform), just not as extensively as CombineFnTester.

rangadi (author):

I should clarify: it does not seem to call merge() at all. I had changed merge() to return null, and it had no effect. In that sense it does not combine. I will update the comment.

rangadi (author):

Updated the comment.

@jkff left a review:

Thanks, mostly looks good; one concern.

```java
      if (heap.size() >= sampleSize && value < heap.element()) {
        return false; // Common case as input size increases.
      }
      if (!heap.contains(value)) {
```
jkff:

That seems quite bad - O(n) for every add(). Is this check actually needed?

@rangadi commented Feb 22, 2018


sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java, line 342 at r1 (raw file):

Previously, jkff (Eugene Kirpichov) wrote…

> That seems quite bad - O(n) for every add(). Is this check actually needed?

Yes. The algorithm requires keeping track of the top-k unique hashes, and PriorityQueue has no insert-if-absent operation, so the contains() check is needed. That's why reordering the conditions matters: the check is skipped more often as the number of elements increases. If we need more performance here, I would consider a data structure based on primitive longs from fastutil, maybe a sorted set. For now it's OK. I haven't looked into why we have both ApproximateDistinct (using HLL+) and this.



@jkff commented Feb 22, 2018

Can we just use a TreeSet instead of the PriorityQueue? It seems strictly better, and very easy to switch to.

@rangadi commented Feb 22, 2018

Updated to use TreeSet. We maintain minHash so that there is no O(log n) lookup on the fast path (i.e. when the number of elements exceeds sampleSize, which is the common case during merges). We can't be certain this is better in practice without a micro-benchmark: PriorityQueue uses an array, and since we always remove the min, it could cause more rebalancing than a typical TreeSet. Either way, it can't be much worse, so this is fine. We could use a primitive-long tree if we were really concerned about performance.
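A minimal sketch of that design (hypothetical names, not the committed code):

```java
import java.util.TreeSet;

// TreeSet gives O(log n) dedup plus eviction, and caching the current
// minimum keeps the common reject path at O(1).
class TopKHashes {
  private final TreeSet<Long> hashes = new TreeSet<>();
  private final int sampleSize;
  private long minHash = Long.MIN_VALUE; // meaningful once the set is full

  TopKHashes(int sampleSize) {
    this.sampleSize = sampleSize;
  }

  boolean add(long hash) {
    if (hashes.size() >= sampleSize && hash <= minHash) {
      return false; // fast path: too small (or equal to the cached min), no TreeSet lookup
    }
    if (!hashes.add(hash)) {
      return false; // duplicate
    }
    if (hashes.size() > sampleSize) {
      hashes.pollFirst(); // evict the smallest of the top-k
    }
    minHash = hashes.first();
    return true;
  }
}
```

Caching `minHash` means the common reject path costs one comparison; the TreeSet is only touched when a hash might actually enter the top-k.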



@jkff commented Feb 22, 2018

I don't see new changes; forgot to push?

@rangadi commented Feb 22, 2018

Oops. Just pushed my changes.

@jkff commented Feb 22, 2018

LGTM, please ping me when tests are green and I'll merge.

@jkff commented Feb 23, 2018

Seems CombineFnTesterTest is failing now.

Commit: …keeping track of number of merges in accumulator.
@rangadi commented Feb 23, 2018

I was just pushing a fix. I updated the failing test to keep track of the number of merges in the accumulator itself, rather than expecting extractOutput() to be called exactly once per test.

@rangadi commented Feb 23, 2018

@jkff Thanks for the review. Looks like the tests have passed.

@jkff merged commit 0be4c54 into apache:master on Feb 23, 2018