
Subtotal queries give incorrect results if query has a limit spec#10685

Closed
loquisgon wants to merge 5 commits into apache:master from loquisgon:imply-5009-subtotal-limit

Conversation

@loquisgon commented Dec 16, 2020

Description
For calculating the results for a subtotal spec, we reuse the buffer iterator that was used in the base query. However, not all the implementations of such iterators are reusable. Specifically, LimitedBufferHashGrouper#iterator is one of those non-reusable iterators. This results in subtotal specs not working if there is also a limit used as part of the query.

This PR deep-copies some shared state in the iterator's makeHeapIterator method to make sure that the iterator is reusable. It also adds a unit test to verify that the iterator can now be reused.

To summarize:

  • The issue is that the makeHeapIterator method overwrites some shared state
  • The solution in this PR is to deep-copy that state so that the iterator is reusable
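In isolation, the failure mode looks roughly like this (a minimal Java sketch with hypothetical names, not Druid's actual classes): an iterator that drains a shared heap produces correct results once, while one that drains a private copy can be re-created safely.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Hypothetical sketch: a grouper whose iterator drains a shared heap,
// so a second call to iterator() sees already-consumed state.
class NonReusableIteratorDemo
{
  private final PriorityQueue<Integer> sharedHeap = new PriorityQueue<>();

  public void add(int offset)
  {
    sharedHeap.add(offset);
  }

  // Broken: polls sharedHeap directly, destroying shared state as it iterates.
  public Iterator<Integer> brokenIterator()
  {
    return new Iterator<Integer>()
    {
      @Override
      public boolean hasNext()
      {
        return !sharedHeap.isEmpty();
      }

      @Override
      public Integer next()
      {
        if (sharedHeap.isEmpty()) {
          throw new NoSuchElementException();
        }
        return sharedHeap.poll(); // mutates the shared heap
      }
    };
  }

  // Fixed (the approach taken in this PR): drain a copy, so the
  // underlying state survives and the iterator can be re-created.
  public Iterator<Integer> reusableIterator()
  {
    final PriorityQueue<Integer> copy = new PriorityQueue<>(sharedHeap);
    return new Iterator<Integer>()
    {
      @Override
      public boolean hasNext()
      {
        return !copy.isEmpty();
      }

      @Override
      public Integer next()
      {
        if (copy.isEmpty()) {
          throw new NoSuchElementException();
        }
        return copy.poll();
      }
    };
  }
}
```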

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.

@loquisgon changed the title from "Imply 5009 subtotal limit" to "Subtotal queries give incorrect results if query has a limit spec" on Dec 16, 2020
setParameters(SqlQuery.getParameterList(parameters));
planAndAuthorize(authenticationResult);
result = execute();
result = execute(); //agt
Author:

I will remove this debug comment

Author:

Fixed...along with some style cleanup

@abhishekagarwal87 (Contributor) left a comment:

Looks mostly good except for unit tests.

private CloseableIterator<Entry<KeyType>> makeHeapIterator()
{
final int initialHeapSize = offsetHeap.getHeapSize();
ByteBufferMinMaxOffsetHeap newHeap = offsetHeap.copy();
Contributor:

can you add a comment here on why we are creating another copy?

Author:

Done in next commit

this.heapIndexUpdater = heapIndexUpdater;
}

public ByteBufferMinMaxOffsetHeap copy()
Contributor:

Let's also add some unit tests for this method.

Author:

Done in next commit

@@ -318,6 +336,8 @@ public void close()
private CloseableIterator<Entry<KeyType>> makeHeapIterator()
Contributor:

Please add some unit tests in the LimitedBufferHashGrouperTest where an iterator is created multiple times and still returns the same results.

Author:

Done in next commit

Comment on lines +211 to +243
// With minimum buffer size, after the first swap, every new key added will result in a swap
if (NullHandling.replaceWithDefault()) {
Assert.assertEquals(224, grouper.getGrowthCount());
Assert.assertEquals(104, grouper.getSize());
Assert.assertEquals(209, grouper.getBuckets());
Assert.assertEquals(104, grouper.getMaxSize());
} else {
Assert.assertEquals(899, grouper.getGrowthCount());
Assert.assertEquals(101, grouper.getSize());
Assert.assertEquals(202, grouper.getBuckets());
Assert.assertEquals(101, grouper.getMaxSize());
}
Assert.assertEquals(100, grouper.getLimit());

// Aggregate slightly different row
// Since these keys are smaller, they will evict the previous 100 top entries
// First 100 of these new rows will be the expected results.
columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("value", 11L)));
for (int i = 0; i < numRows; i++) {
Assert.assertTrue(String.valueOf(i), grouper.aggregate(i).isOk());
}
if (NullHandling.replaceWithDefault()) {
Assert.assertEquals(474, grouper.getGrowthCount());
Assert.assertEquals(104, grouper.getSize());
Assert.assertEquals(209, grouper.getBuckets());
Assert.assertEquals(104, grouper.getMaxSize());
} else {
Assert.assertEquals(1899, grouper.getGrowthCount());
Assert.assertEquals(101, grouper.getSize());
Assert.assertEquals(202, grouper.getBuckets());
Assert.assertEquals(101, grouper.getMaxSize());
}
Assert.assertEquals(100, grouper.getLimit());
Contributor:

Suggested change

Contributor:

these look unrelated to the unit test and are already being verified in a different unit test.

Author:

Done.

@abhishekagarwal87 (Contributor) left a comment:

LGTM. Thank you @loquisgon

Comment on lines +71 to +83
// deep copy buf
ByteBuffer buffer = ByteBuffer.allocateDirect(buf.capacity());

int bufPosition = buf.position();
int bufLimit = buf.limit();
buf.rewind();

buffer.put(buf);
buffer.position(bufPosition);
buffer.limit(bufLimit);

buf.position(bufPosition);
buf.limit(bufLimit);
Contributor:

nit: It seems like this should be moved in to a separate function since it's used in 2 places. I tried a little googling, and didn't find any libraries that implement deep copying of ByteBuffers which I thought was surprising.

Looking at an implementation I found in http://www.java2s.com/example/java/java.nio/performs-a-deep-copy-on-a-byte-buffer.html, it looks like this implementation does not copy the order of the original ByteBuffer. I can't tell if this is important or not. What do you think?
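The duplicated block could be factored out roughly as follows (a sketch, not the code in this PR; `ByteBufferUtils.deepCopy` is a hypothetical name). Unlike the inline version above, it also copies the byte order the reviewer mentions:

```java
import java.nio.ByteBuffer;

class ByteBufferUtils
{
  /**
   * Returns a deep copy of src: same capacity, position, limit, byte order,
   * and contents. src's position and limit are restored before returning.
   */
  public static ByteBuffer deepCopy(ByteBuffer src)
  {
    final int position = src.position();
    final int limit = src.limit();

    final ByteBuffer copy = src.isDirect()
                            ? ByteBuffer.allocateDirect(src.capacity())
                            : ByteBuffer.allocate(src.capacity());
    // Preserve byte order; this is the step the inline version skips.
    copy.order(src.order());

    src.clear(); // position = 0, limit = capacity; contents untouched
    copy.put(src);

    // Restore the original view on both buffers.
    src.position(position);
    src.limit(limit);
    copy.position(position);
    copy.limit(limit);
    return copy;
  }
}
```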

Author:

Yeah, agreed. Code should be refactored to abstract out the duplicated block.

Author:

It won't hurt to copy the order I guess. I will look more into it.

.orElse(null);

// deep copy buf
ByteBuffer buffer = ByteBuffer.allocateDirect(buf.capacity());
Contributor:

GroupBy queries use both processing buffers and merge buffers. The former is used when you compute per-segment results while the later is for all other purposes (merging per-segment results, computing subtotals, etc). Especially the broker only uses the merge buffers to process subtotals and subqueries.

The merge buffers are maintained in a BlockingPool to manage the memory usage in brokers and historicals. This is important to avoid query failures due to OOM errors. Here, you should not allocate memory directly, but get one from the merge buffer pool. Check out how the merge buffer is currently acquired.
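For illustration only, the pooling pattern the reviewer describes looks roughly like this (a minimal sketch built on `ArrayBlockingQueue`, not Druid's actual `BlockingPool` API): buffers are pre-allocated once, and callers block or time out rather than allocating new off-heap memory on demand.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of a bounded buffer pool, in the spirit of the
// merge-buffer pool described above (hypothetical class and method names).
class SimpleBufferPool
{
  private final BlockingQueue<ByteBuffer> pool;

  public SimpleBufferPool(int poolSize, int bufferCapacity)
  {
    this.pool = new ArrayBlockingQueue<>(poolSize);
    // All memory is allocated up front; total usage is bounded by the pool size.
    for (int i = 0; i < poolSize; i++) {
      pool.add(ByteBuffer.allocate(bufferCapacity));
    }
  }

  /**
   * Blocks up to timeoutMillis waiting for a free buffer; returns null on
   * timeout instead of allocating more memory.
   */
  public ByteBuffer take(long timeoutMillis)
  {
    try {
      return pool.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  /** Returns a buffer to the pool for reuse. */
  public void giveBack(ByteBuffer buffer)
  {
    buffer.clear();
    pool.add(buffer);
  }
}
```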

Author:

Yeah, I agree. My biggest concern with this code is that it is not obvious to the caller that creating new iterators of the type being changed here will allocate new off-heap memory in an unbounded fashion. This is ok if we think that not "too many" copies will be done but I cannot affirm that.

Contributor:

> Yeah, I agree. My biggest concern with this code is that it is not obvious to the caller that creating new iterators of the type being changed here will allocate new off-heap memory in an unbounded fashion. This is ok if we think that not "too many" copies will be done but I cannot affirm that.

It seems pretty dangerous to me as users are not expected to be aware of this behavior. The blocking merge buffer pool is used to avoid unexpectedly using "too many" copies at the same time like in this problematic scenario.

By the way, on the second look, I'm wondering why we copy the buffer instead of fixing the iterator of LimitedBufferHashGrouper to be re-iterable. This seems a better fix to me.
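The re-iterable alternative can be sketched as follows (hypothetical names, not the actual fix that landed in #10743): each call to iterator() builds its own small cursor over the unchanged base state instead of mutating it, so no deep copy of the backing buffer is needed.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical sketch of a re-iterable result set: the base offsets are
// never mutated, and each iterator sorts only a small per-iterator index
// array rather than deep-copying an off-heap buffer.
class ReIterableOffsets implements Iterable<Integer>
{
  private final int[] offsets;

  public ReIterableOffsets(int[] offsets)
  {
    this.offsets = offsets;
  }

  @Override
  public Iterator<Integer> iterator()
  {
    // Per-iterator state only; the shared offsets array stays untouched.
    final int[] sorted = Arrays.copyOf(offsets, offsets.length);
    Arrays.sort(sorted);
    return new Iterator<Integer>()
    {
      private int i = 0;

      @Override
      public boolean hasNext()
      {
        return i < sorted.length;
      }

      @Override
      public Integer next()
      {
        return sorted[i++];
      }
    };
  }
}
```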

Contributor:

Talked with @loganlinn offline, we agreed fixing LimitedBufferHashGrouper would be a better fix.

@suneet-s (Contributor):

Fixed by #10743

@suneet-s closed this Jan 22, 2021