Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Page shouldn't close a block twice #100370

Merged
merged 6 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/100370.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 100370
summary: "ESQL: Page shouldn't close a block twice"
area: ES|QL
type: bug
issues:
- 100356
- 100365
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Objects;

/**
Expand Down Expand Up @@ -88,9 +90,7 @@ private Page(Page prev, Block[] toAdd) {
this.positionCount = prev.positionCount;

this.blocks = Arrays.copyOf(prev.blocks, prev.blocks.length + toAdd.length);
for (int i = 0; i < toAdd.length; i++) {
this.blocks[prev.blocks.length + i] = toAdd[i];
}
System.arraycopy(toAdd, 0, this.blocks, prev.blocks.length, toAdd.length);
Comment on lines -91 to +93
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small improvement.

}

public Page(StreamInput in) throws IOException {
Expand Down Expand Up @@ -169,8 +169,8 @@ public Page appendPage(Page toAdd) {
@Override
public int hashCode() {
int result = Objects.hash(positionCount);
for (int i = 0; i < blocks.length; i++) {
result = 31 * result + Objects.hashCode(blocks[i]);
for (Block block : blocks) {
result = 31 * result + Objects.hashCode(block);
}
return result;
}
Expand Down Expand Up @@ -221,7 +221,48 @@ public void writeTo(StreamOutput out) throws IOException {
* Release all blocks in this page, decrementing any breakers accounting for these blocks.
*/
public void releaseBlocks() {
if (blocksReleased) {
return;
}

blocksReleased = true;
Releasables.closeExpectNoException(blocks);

// blocks can be used as multiple columns
var map = new IdentityHashMap<Block, Boolean>(mapSize(blocks.length));
for (Block b : blocks) {
if (map.putIfAbsent(b, Boolean.TRUE) == null) {
Releasables.closeExpectNoException(b);
}
}
}

/**
* Returns a Page from the given blocks and closes all blocks that are not included, from the current Page.
* That is, allows clean-up of the current page _after_ external manipulation of the blocks.
* The current page should no longer be used and be considered closed.
*/
public Page newPageAndRelease(Block... keep) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've introduced this method for handling the case of reusing some blocks from an old page into a new one.
This happens in ProjectExec and OutputExec (inside the planner) - I've moved this method from the Project into the page and clean-it up a bit (the usual reduction complexity from using two lists - O(N*M) to O(N) with the extra memory for the map).

if (blocksReleased) {
throw new IllegalStateException("can't create new page from already released page");
}

blocksReleased = true;

var newPage = new Page(positionCount, keep);
var set = Collections.newSetFromMap(new IdentityHashMap<Block, Boolean>(mapSize(keep.length)));
set.addAll(Arrays.asList(keep));

// close blocks that have been left out
for (Block b : blocks) {
if (set.contains(b) == false) {
Releasables.closeExpectNoException(b);
}
}

return newPage;
}

static int mapSize(int expectedSize) {
return expectedSize < 2 ? expectedSize + 1 : (int) (expectedSize / 0.75 + 1.0);
Copy link
Contributor

@bpintea bpintea Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CollectionUtils.mapSize()?
Edit: not avail in compute package, disregard.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -70,30 +68,7 @@ protected Page process(Page page) {
var block = page.getBlock(source);
blocks[b++] = block;
}
closeUnused(page, blocks);
return new Page(page.getPositionCount(), blocks);
}

/**
* Close all {@link Block}s that are in {@code page} but are not in {@code blocks}.
*/
public static void closeUnused(Page page, Block[] blocks) {
List<Releasable> blocksToRelease = new ArrayList<>();

for (int i = 0; i < page.getBlockCount(); i++) {
boolean used = false;
var current = page.getBlock(i);
for (int j = 0; j < blocks.length; j++) {
if (current == blocks[j]) {
used = true;
break;
}
}
if (used == false) {
blocksToRelease.add(current);
}
}
Releasables.close(blocksToRelease);
return page.newPageAndRelease(blocks);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,25 @@ public void testSerializationListPages() throws IOException {
}
}

public void testPageMultiRelease() {
int positions = randomInt(1024);
var block = new IntArrayVector(IntStream.range(0, positions).toArray(), positions).asBlock();
Page page = new Page(block);
page.releaseBlocks();
assertThat(block.isReleased(), is(true));
page.releaseBlocks();
}

public void testNewPageAndRelease() {
int positions = randomInt(1024);
var blockA = new IntArrayVector(IntStream.range(0, positions).toArray(), positions).asBlock();
var blockB = new IntArrayVector(IntStream.range(0, positions).toArray(), positions).asBlock();
Page page = new Page(blockA, blockB);
Page newPage = page.newPageAndRelease(blockA);
assertThat(blockA.isReleased(), is(false));
assertThat(blockB.isReleased(), is(true));
}

BytesRefArray bytesRefArrayOf(String... values) {
var array = new BytesRefArray(values.length, bigArrays);
Arrays.stream(values).map(BytesRef::new).forEach(array::append);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.indices.breaker.CircuitBreakerService;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.LongStream;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -59,15 +56,12 @@ public void testProjection() {
var out = projection.getOutput();
assertThat(randomProjection.size(), lessThanOrEqualTo(out.getBlockCount()));

Set<Block> blks = new HashSet<>();
for (int i = 0; i < out.getBlockCount(); i++) {
var block = out.<IntBlock>getBlock(i);
assertEquals(block, page.getBlock(randomProjection.get(i)));
blks.add(block);
assertEquals(blocks[randomProjection.get(i)], block);
}

// close all blocks separately since the same block can be used by multiple columns (aliased)
Releasables.closeWhileHandlingException(blks.toArray(new Block[0]));
out.releaseBlocks();
}

private List<Integer> randomProjection(int size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ x:integer | z:integer
4 | 8
;

# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356
renameProjectEval-Ignore
renameProjectEval
from employees | sort emp_no | eval y = languages | rename languages as x | keep x, y | eval x2 = x + 1 | eval y2 = y + 2 | limit 3;

x:integer | y:integer | x2:integer | y2:integer
Expand All @@ -94,8 +93,7 @@ x:integer | y:integer | x2:integer | y2:integer
4 | 4 | 5 | 6
;

# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356
duplicateProjectEval-Ignore
duplicateProjectEval
from employees | eval y = languages, x = languages | keep x, y | eval x2 = x + 1 | eval y2 = y + 2 | limit 3;

x:integer | y:integer | x2:integer | y2:integer
Expand Down Expand Up @@ -160,8 +158,7 @@ y:integer | x:date
10061 | 1985-09-17T00:00:00.000Z
;

# AwaitsFix https://github.com/elastic/elasticsearch/issues/100356
renameIntertwinedWithSort-Ignore
renameIntertwinedWithSort
FROM employees | eval x = salary | rename x as y | rename y as x | sort x | rename x as y | limit 10;

avg_worked_seconds:l | birth_date:date | emp_no:i | first_name:s | gender:s | height:d | height.float:d | height.half_float:d | height.scaled_float:d| hire_date:date | is_rehired:bool | job_positions:s | languages:i | languages.byte:i | languages.long:l | languages.short:i | last_name:s | salary:i | salary_change:d | salary_change.int:i | salary_change.keyword:s | salary_change.long:l | still_hired:bool | y:i
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.action;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.Build;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
Expand Down Expand Up @@ -67,7 +66,6 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100365")
public class EsqlActionIT extends AbstractEsqlIntegTestCase {

long epoch = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory;
import org.elasticsearch.compute.operator.ProjectOperator;
import org.elasticsearch.compute.operator.RowOperator.RowOperatorFactory;
import org.elasticsearch.compute.operator.ShowOperator;
import org.elasticsearch.compute.operator.SinkOperator;
Expand Down Expand Up @@ -334,8 +333,7 @@ private static Function<Page, Page> alignPageToAttributes(List<Attribute> attrs,
for (int i = 0; i < blocks.length; i++) {
blocks[i] = p.getBlock(mappedPosition[i]);
}
ProjectOperator.closeUnused(p, blocks);
return new Page(blocks);
return p.newPageAndRelease(blocks);
} : Function.identity();

return transformer;
Expand Down