Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/114774.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114774
summary: "ESQL: Add support for multivalue fields in Arrow output"
area: ES|QL
type: enhancement
issues: []
1 change: 1 addition & 0 deletions x-pack/plugin/esql/arrow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {

testImplementation project(':test:framework')
testImplementation('org.apache.arrow:arrow-memory-unsafe:16.1.0')
testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}")
}

tasks.named("dependencyLicenses").configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class ArrowResponse implements ChunkedRestResponseBodyPart, Releasable {
public static class Column {
private final BlockConverter converter;
private final String name;
private boolean multivalued;

public Column(String esqlType, String name) {
this.converter = ESQL_CONVERTERS.get(esqlType);
Expand All @@ -61,20 +63,24 @@ public Column(String esqlType, String name) {
/**
 * Creates the response for the given columns and result pages.
 *
 * <p>The constructor first scans every page to flag the columns whose blocks may hold
 * multivalued fields, then queues the response segments in order: the schema segment
 * (kept as {@code currentSegment}), one {@link PageResponse} per page, and a final
 * {@link EndResponse}.
 *
 * @param columns the columns of the response; their {@code multivalued} flag is updated in place
 * @param pages   the result pages; block {@code i} of each page is matched with column {@code i}
 */
public ArrowResponse(List<Column> columns, List<Page> pages) {
    this.columns = columns;

    // Find multivalued columns
    int colSize = columns.size();
    for (int col = 0; col < colSize; col++) {
        for (Page page : pages) {
            if (page.getBlock(col).mayHaveMultivaluedFields()) {
                columns.get(col).multivalued = true;
                // One page is enough to mark the whole column, no need to look at the others.
                break;
            }
        }
    }

    // The multivalued flags are computed above, before the schema segment is created.
    currentSegment = new SchemaResponse(this);
    List<ResponseSegment> rest = new ArrayList<>(pages.size());

    for (Page page : pages) {
        rest.add(new PageResponse(this, page));
    }

    rest.add(new EndResponse(this));
    segments = rest.iterator();
}
Expand Down Expand Up @@ -185,6 +191,9 @@ public void close() {}
* @see <a href="https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format">IPC Streaming Format</a>
*/
private static class SchemaResponse extends ResponseSegment {

private static final FieldType LIST_FIELD_TYPE = FieldType.nullable(MinorType.LIST.getType());

private boolean done = false;

SchemaResponse(ArrowResponse response) {
Expand All @@ -204,7 +213,20 @@ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws I
}

/**
 * Builds the Arrow schema from the response columns.
 *
 * <p>A single-valued column maps to one field using the converter's Arrow field type.
 * A multivalued column maps to a nullable list field that wraps a non-nullable child
 * field named {@code "$data$"} carrying the converter's value type.
 *
 * @return the schema with one top-level field per response column, in column order
 */
private Schema arrowSchema() {
    return new Schema(response.columns.stream().map(c -> {
        var fieldType = c.converter.arrowFieldType();
        if (c.multivalued) {
            // A variable-sized list is a vector of offsets and a child vector of values
            // See https://arrow.apache.org/docs/format/Columnar.html#variable-size-list-layout
            // The list field keeps the metadata of the converter's field type.
            var listType = new FieldType(true, LIST_FIELD_TYPE.getType(), null, fieldType.getMetadata());
            // Value vector is non-nullable (ES|QL multivalues cannot contain nulls).
            // The dictionary, if any, moves to the value field.
            var valueType = new FieldType(false, fieldType.getType(), fieldType.getDictionary(), null);
            // The nested vector is named "$data$", following what the Arrow/Java library does.
            return new Field(c.name, listType, List.of(new Field("$data$", valueType, null)));
        } else {
            return new Field(c.name, fieldType, null);
        }
    }).toList());
}
}

Expand Down Expand Up @@ -257,7 +279,14 @@ protected void encodeChunk(int sizeHint, RecyclerBytesStreamOutput out) throws I

/**
 * Writes the next buffer by delegating to the buffer writer at the current index,
 * then advances that index.
 *
 * <p>The contents of {@code buffer} are not read here; only its writer index is used,
 * to check that the writer emitted the expected number of bytes.
 *
 * @param buffer the Arrow buffer whose writer index gives the expected byte count
 * @throws IOException if the underlying buffer writer fails
 * @throws IllegalStateException if the number of bytes written differs from {@code buffer.writerIndex()}
 */
@Override
public void write(ArrowBuf buffer) throws IOException {
    // Exactly one writer is consumed per call; bufIdx is advanced before the check below.
    var len = bufWriters.get(bufIdx++).write(out);
    // Consistency check
    if (len != buffer.writerIndex()) {
        throw new IllegalStateException(
            "Buffer [" + (bufIdx - 1) + "]: wrote [" + len + "] bytes, but expected [" + buffer.writerIndex() + "]"
        );
    }
    extraPosition += len;
}

@Override
Expand All @@ -277,11 +306,26 @@ public long align() throws IOException {

// Create Arrow buffers for each of the blocks in this page
for (int b = 0; b < page.getBlockCount(); b++) {
var converter = response.columns.get(b).converter;
var column = response.columns.get(b);
var converter = column.converter;

Block block = page.getBlock(b);
nodes.add(new ArrowFieldNode(block.getPositionCount(), converter.nullValuesCount(block)));
converter.convert(block, bufs, bufWriters);
if (column.multivalued) {
// List node.
nodes.add(new ArrowFieldNode(block.getPositionCount(), converter.nullValuesCount(block)));
// Value vector, does not contain nulls.
nodes.add(new ArrowFieldNode(BlockConverter.valueCount(block), 0));
} else {
nodes.add(new ArrowFieldNode(block.getPositionCount(), converter.nullValuesCount(block)));
}
converter.convert(block, column.multivalued, bufs, bufWriters);
}

// Consistency check
if (bufs.size() != bufWriters.size()) {
throw new IllegalStateException(
"Inconsistent Arrow buffers: [" + bufs.size() + "] buffers and [" + bufWriters.size() + "] writers"
);
}

// Create the batch and serialize it
Expand Down
Loading