ESQL: Page shouldn't close a block twice #100370

Merged
merged 6 commits into from
Oct 6, 2023
Changes from 5 commits
7 changes: 7 additions & 0 deletions docs/changelog/100370.yaml
@@ -0,0 +1,7 @@
pr: 100370
summary: "ESQL: Page shouldn't close a block twice"
area: ES|QL
type: bug
issues:
- 100356
- 100365
@@ -14,6 +14,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.Objects;

/**
@@ -88,9 +89,7 @@ private Page(Page prev, Block[] toAdd) {
this.positionCount = prev.positionCount;

this.blocks = Arrays.copyOf(prev.blocks, prev.blocks.length + toAdd.length);
for (int i = 0; i < toAdd.length; i++) {
this.blocks[prev.blocks.length + i] = toAdd[i];
}
System.arraycopy(toAdd, 0, this.blocks, prev.blocks.length, toAdd.length);
Comment on lines -91 to +93
Member Author

Small improvement.
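
A minimal sketch of the copy pattern in this hunk, assuming nothing beyond the JDK: `Arrays.copyOf` grows the first array and `System.arraycopy` bulk-copies the second one into the tail, which gives the same result as the element-by-element loop. The class and method names are hypothetical and only serve the illustration.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the PR; it only illustrates the copy pattern.
final class ArrayAppend {
    // Copies prev into a larger array, then bulk-copies toAdd right after it.
    static String[] append(String[] prev, String[] toAdd) {
        String[] result = Arrays.copyOf(prev, prev.length + toAdd.length);
        System.arraycopy(toAdd, 0, result, prev.length, toAdd.length);
        return result;
    }
}
```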

}

public Page(StreamInput in) throws IOException {
@@ -169,8 +168,8 @@ public Page appendPage(Page toAdd) {
@Override
public int hashCode() {
int result = Objects.hash(positionCount);
for (int i = 0; i < blocks.length; i++) {
result = 31 * result + Objects.hashCode(blocks[i]);
for (Block block : blocks) {
result = 31 * result + Objects.hashCode(block);
}
return result;
}
@@ -222,6 +221,43 @@ public void writeTo(StreamOutput out) throws IOException {
*/
public void releaseBlocks() {
blocksReleased = true;
Releasables.closeExpectNoException(blocks);
// blocks can be used as multiple columns
var map = new IdentityHashMap<Block, Object>(mapSize(blocks.length));
var DUMMY = new Object();
Contributor

That could be moved into a static final var.

Contributor

Or...

var map = new IdentityHashMap<Block, Boolean>(mapSize(blocks.length));
for (Block b : blocks) {
    if (map.putIfAbsent(b, Boolean.TRUE) == null) { ..
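
To illustrate the suggestion, here is a self-contained sketch of closing each distinct instance once with an `IdentityHashMap`. `CountingBlock` and `CloseOnce` are hypothetical stand-ins, not Elasticsearch classes; the stand-in deliberately makes all instances `equals()` to each other to show that the map compares by reference.

```java
import java.util.IdentityHashMap;

// Hypothetical stand-in for a releasable block; it only counts how often it is closed.
final class CountingBlock implements AutoCloseable {
    int closeCount = 0;

    @Override
    public void close() {
        closeCount++;
    }

    // All instances are "equal" on purpose: an identity map must still tell them apart.
    @Override
    public boolean equals(Object o) {
        return o instanceof CountingBlock;
    }

    @Override
    public int hashCode() {
        return 1;
    }
}

final class CloseOnce {
    // Closes each distinct instance once, even if it backs several columns.
    static void releaseAll(CountingBlock[] blocks) {
        var seen = new IdentityHashMap<CountingBlock, Boolean>();
        for (CountingBlock b : blocks) {
            // putIfAbsent returns null only the first time this instance is seen
            if (seen.putIfAbsent(b, Boolean.TRUE) == null) {
                b.close();
            }
        }
    }
}
```

With a page laid out as `{a, b, a}`, both `a` and `b` end up closed exactly once.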

for (Block b : blocks) {
if (map.putIfAbsent(b, DUMMY) == null) {
Releasables.closeExpectNoException(b);
}
}
Contributor

I think this improves the issue, but there still can be problems because non-identical blocks can be backed by the same vector or array.

Additionally, we create and populate a new hash map on each page release; that's probably quite expensive.

We can merge this to fix the problem right now, but IMO we should mark this with a TODO comment to remove this logic once possible. We want ref counting, which will fix the problem more idiomatically.
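
A rough sketch of what reference counting could look like for the case raised here, where two non-identical blocks share one backing vector. All names (`SharedVector`, `VectorView`, `share()`) are hypothetical and this is not how Elasticsearch implements it; the sketch is single-threaded and only shows that the shared storage is freed when the last holder lets go.

```java
// Hypothetical shared backing storage with a reference count; not an Elasticsearch class.
final class SharedVector {
    private int refs = 1;   // the creator holds the first reference
    int releaseCount = 0;   // how many times the underlying storage was freed

    void incRef() {
        if (refs <= 0) {
            throw new IllegalStateException("already released");
        }
        refs++;
    }

    void decRef() {
        if (refs <= 0) {
            throw new IllegalStateException("already released");
        }
        if (--refs == 0) {
            releaseCount++; // stands in for freeing the storage, reached once
        }
    }
}

// Hypothetical block view; two distinct views may share one SharedVector.
final class VectorView implements AutoCloseable {
    private final SharedVector vector;
    private boolean closed = false;

    // Takes over the reference the caller already holds on the vector.
    VectorView(SharedVector vector) {
        this.vector = vector;
    }

    // Creates a second, non-identical view over the same storage.
    VectorView share() {
        vector.incRef();
        return new VectorView(vector);
    }

    // Idempotent by choice in this sketch: a view gives up its reference only once.
    @Override
    public void close() {
        if (closed == false) {
            closed = true;
            vector.decRef();
        }
    }
}
```

With this shape, no per-release map is needed: each holder closes its own view and the count decides when the storage goes away.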

Member Author

@costin Oct 6, 2023

> non-identical blocks can be backed by the same vector or array.

Why would they?

> we create and populate a new hash map on each page release; that's probably quite expensive.

I've added a check to skip the release once it's done - we can change that if we want a page to only be released once.

}

/**
* Returns a Page from the given blocks and closes all blocks that are not included, from the current Page.
* That is, allows clean-up of the current page _after_ external manipulation of the blocks.
* The current page should no longer be used and be considered closed.
*/
public Page newPageAndRelease(Block... keep) {
Member Author

I've introduced this method to handle the case of reusing some blocks from an old page in a new one.
This happens in ProjectExec and OutputExec (inside the planner). I've moved this method from ProjectOperator into the page and cleaned it up a bit (the usual complexity reduction when comparing two lists: from O(N*M) to O(N), at the cost of extra memory for the map).
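
The complexity argument can be sketched as follows, using hypothetical stand-in types (`TrackedBlock`, `KeepAndRelease`) rather than the real `Page` and `Block`. One pass builds an identity set from the kept blocks and one pass closes everything else, instead of scanning `keep` for every block. Recording closed blocks in the same set, so that a dropped block appearing in two columns is closed once, is an addition of this sketch and not something taken from the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical stand-in for a block; it only records how many times it was closed.
final class TrackedBlock implements AutoCloseable {
    int closeCount = 0;

    @Override
    public void close() {
        closeCount++;
    }
}

final class KeepAndRelease {
    // Closes every block in current that is not in keep, comparing by identity.
    // O(N + M): one pass over keep, one pass over current.
    static void closeUnused(TrackedBlock[] current, TrackedBlock[] keep) {
        Set<TrackedBlock> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        seen.addAll(Arrays.asList(keep));
        for (TrackedBlock b : current) {
            // add() returns false for kept blocks and for blocks already closed here
            if (seen.add(b)) {
                b.close();
            }
        }
    }
}
```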

blocksReleased = true;

var newPage = new Page(positionCount, keep);
var map = new IdentityHashMap<Block, Object>(mapSize(keep.length));
var DUMMY = new Object();

// create identity set
for (Block b : keep) {
map.putIfAbsent(b, DUMMY);
Contributor

Subjective:

  // create identity set
  var set = Collections.newSetFromMap(new IdentityHashMap<Block, Boolean>(mapSize(keep.length)));
  set.addAll(Arrays.asList(keep));
  ..
      if (set.contains(b) == false) { ..

}
// close blocks that have been left out
for (Block b : blocks) {
if (map.containsKey(b) == false) {
Releasables.closeExpectNoException(b);
}
}

return newPage;
}

static int mapSize(int expectedSize) {
return expectedSize < 2 ? expectedSize + 1 : (int) (expectedSize / 0.75 + 1.0);
Contributor

@bpintea Oct 6, 2023

CollectionUtils.mapSize()?
Edit: not available in the compute package, disregard.
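
For reference, a standalone copy of the sizing formula from the diff, wrapped in a hypothetical `MapSizing` class so it can be run on its own. The formula follows the common idiom of picking a capacity at which `expectedSize` entries stay under a 0.75 load factor (the default of `java.util.HashMap`).

```java
// Standalone copy of the formula shown in the diff, for illustration only.
final class MapSizing {
    static int mapSize(int expectedSize) {
        // tiny maps get expectedSize + 1; otherwise divide by the 0.75 load factor and round up-ish
        return expectedSize < 2 ? expectedSize + 1 : (int) (expectedSize / 0.75 + 1.0);
    }
}
```

Worked values: `mapSize(0)` is 1, `mapSize(1)` is 2, `mapSize(2)` is 3, `mapSize(3)` is 5 and `mapSize(12)` is 17.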

}
}
@@ -10,9 +10,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -70,30 +68,7 @@ protected Page process(Page page) {
var block = page.getBlock(source);
blocks[b++] = block;
}
closeUnused(page, blocks);
return new Page(page.getPositionCount(), blocks);
}

/**
* Close all {@link Block}s that are in {@code page} but are not in {@code blocks}.
*/
public static void closeUnused(Page page, Block[] blocks) {
List<Releasable> blocksToRelease = new ArrayList<>();

for (int i = 0; i < page.getBlockCount(); i++) {
boolean used = false;
var current = page.getBlock(i);
for (int j = 0; j < blocks.length; j++) {
if (current == blocks[j]) {
used = true;
break;
}
}
if (used == false) {
blocksToRelease.add(current);
}
}
Releasables.close(blocksToRelease);
return page.newPageAndRelease(blocks);
}

@Override
@@ -15,14 +15,11 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.indices.breaker.CircuitBreakerService;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.LongStream;

import static org.hamcrest.Matchers.equalTo;
@@ -59,15 +56,12 @@ public void testProjection() {
var out = projection.getOutput();
assertThat(randomProjection.size(), lessThanOrEqualTo(out.getBlockCount()));

Set<Block> blks = new HashSet<>();
for (int i = 0; i < out.getBlockCount(); i++) {
var block = out.<IntBlock>getBlock(i);
assertEquals(block, page.getBlock(randomProjection.get(i)));
blks.add(block);
assertEquals(blocks[randomProjection.get(i)], block);
}

// close all blocks separately since the same block can be used by multiple columns (aliased)
Releasables.closeWhileHandlingException(blks.toArray(new Block[0]));
out.releaseBlocks();
}

private List<Integer> randomProjection(int size) {
@@ -84,8 +84,7 @@ x:integer | z:integer
4 | 8
;

# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356
renameProjectEval-Ignore
renameProjectEval
from employees | sort emp_no | eval y = languages | rename languages as x | keep x, y | eval x2 = x + 1 | eval y2 = y + 2 | limit 3;

x:integer | y:integer | x2:integer | y2:integer
@@ -94,8 +93,7 @@ x:integer | y:integer | x2:integer | y2:integer
4 | 4 | 5 | 6
;

# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356
duplicateProjectEval-Ignore
duplicateProjectEval
from employees | eval y = languages, x = languages | keep x, y | eval x2 = x + 1 | eval y2 = y + 2 | limit 3;

x:integer | y:integer | x2:integer | y2:integer
@@ -160,8 +158,7 @@ y:integer | x:date
10061 | 1985-09-17T00:00:00.000Z
;

# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356
renameIntertwinedWithSort-Ignore
renameIntertwinedWithSort
FROM employees | eval x = salary | rename x as y | rename y as x | sort x | rename x as y | limit 10;

avg_worked_seconds:l | birth_date:date | emp_no:i | first_name:s | gender:s | height:d | height.float:d | height.half_float:d | height.scaled_float:d| hire_date:date | is_rehired:bool | job_positions:s | languages:i | languages.byte:i | languages.long:l | languages.short:i | last_name:s | salary:i | salary_change:d | salary_change.int:i | salary_change.keyword:s | salary_change.long:l | still_hired:bool | y:i
@@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.action;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.Build;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -67,7 +66,6 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100365")
public class EsqlActionIT extends AbstractEsqlIntegTestCase {

long epoch = System.currentTimeMillis();
@@ -30,7 +30,6 @@
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory;
import org.elasticsearch.compute.operator.ProjectOperator;
import org.elasticsearch.compute.operator.RowOperator.RowOperatorFactory;
import org.elasticsearch.compute.operator.ShowOperator;
import org.elasticsearch.compute.operator.SinkOperator;
@@ -334,8 +333,7 @@ private static Function<Page, Page> alignPageToAttributes(List<Attribute> attrs,
for (int i = 0; i < blocks.length; i++) {
blocks[i] = p.getBlock(mappedPosition[i]);
}
ProjectOperator.closeUnused(p, blocks);
return new Page(blocks);
return p.newPageAndRelease(blocks);
} : Function.identity();

return transformer;