Skip to content

Commit

Permalink
ESQL: Add type to layout (#99327)
Browse files Browse the repository at this point in the history
We want it in a few places.
  • Loading branch information
nik9000 committed Sep 8, 2023
1 parent 93b56d7 commit 443c53c
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.openjdk.jmh.annotations.Warmup;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -133,9 +134,7 @@ private static FieldAttribute intField() {

private static Layout layout(FieldAttribute... fields) {
Layout.Builder layout = new Layout.Builder();
for (FieldAttribute field : fields) {
layout.appendChannel(field.id());
}
layout.append(Arrays.asList(fields));
return layout.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public Block eval(Page page) {
return page.getBlock(channel);
}
}
int channel = layout.getChannel(attr.id());
int channel = layout.get(attr.id()).channel();
return () -> new Attribute(channel);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public final PhysicalOperation groupingPhysicalOperation(

// append channels to the layout
if (mode == AggregateExec.Mode.FINAL) {
layout.appendChannels(aggregates);
layout.append(aggregates);
} else {
layout.appendChannels(aggregateMapper.mapNonGrouping(aggregates));
layout.append(aggregateMapper.mapNonGrouping(aggregates));
}
// create the agg factories
aggregatesToFactory(
Expand All @@ -87,8 +87,8 @@ public final PhysicalOperation groupingPhysicalOperation(
if (groupAttribute == null) {
throw new EsqlIllegalArgumentException("Unexpected non-named expression[{}] as grouping in [{}]", group, aggregateExec);
}
Set<NameId> grpAttribIds = new HashSet<>();
grpAttribIds.add(groupAttribute.id());
Layout.ChannelSet groupAttributeLayout = new Layout.ChannelSet(new HashSet<>(), groupAttribute.dataType());
groupAttributeLayout.nameIds().add(groupAttribute.id());

/*
* Check for aliasing in aggregates which occurs in two cases (due to combining project + stats):
Expand All @@ -99,10 +99,9 @@ public final PhysicalOperation groupingPhysicalOperation(
if (agg instanceof Alias a) {
if (a.child() instanceof Attribute attr) {
if (groupAttribute.id().equals(attr.id())) {
grpAttribIds.add(a.id());
groupAttributeLayout.nameIds().add(a.id());
// TODO: investigate whether a break could be used since it shouldn't be possible to have multiple
// attributes
// pointing to the same attribute
// attributes pointing to the same attribute
}
// partial mode only
// check if there's any alias used in grouping - no need for the final reduction since the intermediate data
Expand All @@ -117,18 +116,19 @@ else if (mode == AggregateExec.Mode.PARTIAL) {
}
}
}
layout.appendChannel(grpAttribIds);
groupSpecs.add(new GroupSpec(source.layout.getChannel(groupAttribute.id()), groupAttribute));
layout.append(groupAttributeLayout);
Layout.ChannelAndType groupInput = source.layout.get(groupAttribute.id());
groupSpecs.add(new GroupSpec(groupInput == null ? null : groupInput.channel(), groupAttribute));
}

if (mode == AggregateExec.Mode.FINAL) {
for (var agg : aggregates) {
if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction) {
layout.appendChannel(alias.id());
layout.append(alias);
}
}
} else {
layout.appendChannels(aggregateMapper.mapGrouping(aggregates));
layout.append(aggregateMapper.mapGrouping(aggregates));
}

// create the agg factories
Expand Down Expand Up @@ -252,7 +252,7 @@ private void aggregatesToFactory(
params[i] = aggParams.get(i).fold();
}

List<Integer> inputChannels = sourceAttr.stream().map(NamedExpression::id).map(layout::getChannel).toList();
List<Integer> inputChannels = sourceAttr.stream().map(attr -> layout.get(attr.id()).channel()).toList();
assert inputChannels != null && inputChannels.size() > 0 && inputChannels.stream().allMatch(i -> i >= 0);
if (aggregateFunction instanceof ToAggregator agg) {
consumer.accept(new AggFunctionSupplierContext(agg.supplier(bigArrays, inputChannels), aggMode));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.xpack.ql.expression.NameId;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DefaultLayout implements Layout {
private final Map<NameId, ChannelAndType> layout;
private final int numberOfChannels;

DefaultLayout(Map<NameId, ChannelAndType> layout, int numberOfChannels) {
this.layout = layout;
this.numberOfChannels = numberOfChannels;
}

@Override
public ChannelAndType get(NameId id) {
return layout.get(id);
}

/**
* @return the total number of channels in the layout.
*/
@Override
public int numberOfChannels() {
return numberOfChannels;
}

@Override
public Map<Integer, Set<NameId>> inverse() {
Map<Integer, Set<NameId>> inverse = new HashMap<>();
for (Map.Entry<NameId, ChannelAndType> entry : layout.entrySet()) {
NameId key = entry.getKey();
Integer value = entry.getValue().channel();
inverse.computeIfAbsent(value, k -> new HashSet<>()).add(key);
}
return inverse;
}

/**
* @return creates a builder to append to this layout.
*/
@Override
public Layout.Builder builder() {
return new Builder(numberOfChannels, layout);
}

@Override
public String toString() {
return "Layout{" + "layout=" + layout + ", numberOfChannels=" + numberOfChannels + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi

PhysicalOperation op = source;
for (Attribute attr : fieldExtractExec.attributesToExtract()) {
layout.appendChannel(attr.id());
layout.append(attr);
Layout previousLayout = op.layout;

var sources = ValueSources.sources(
Expand All @@ -71,7 +71,7 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi
LocalExecutionPlanner.toElementType(attr.dataType())
);

int docChannel = previousLayout.getChannel(sourceAttr.id());
int docChannel = previousLayout.get(sourceAttr.id()).channel();

op = op.with(
new ValuesSourceReaderOperator.ValuesSourceReaderOperatorFactory(sources, docChannel, attr.name()),
Expand Down Expand Up @@ -137,9 +137,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
);
}
Layout.Builder layout = new Layout.Builder();
for (int i = 0; i < esQueryExec.output().size(); i++) {
layout.appendChannel(esQueryExec.output().get(i).id());
}
layout.append(esQueryExec.output());
int instanceCount = Math.max(1, luceneFactory.taskConcurrency());
context.driverParallelism(new DriverParallelism(DriverParallelism.Type.DATA_PARALLELISM, instanceCount));
return PhysicalOperation.fromSource(luceneFactory, layout.build());
Expand All @@ -155,7 +153,7 @@ public final Operator.OperatorFactory ordinalGroupingOperatorFactory(
LocalExecutionPlannerContext context
) {
var sourceAttribute = FieldExtractExec.extractSourceAttributesFrom(aggregateExec.child());
int docChannel = source.layout.getChannel(sourceAttribute.id());
int docChannel = source.layout.get(sourceAttribute.id()).channel();
// The grouping-by values are ready, let's group on them directly.
// Costin: why are they ready and not already exposed in the layout?
return new OrdinalsGroupingOperator.OrdinalsGroupingOperatorFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,32 @@

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.xpack.ql.expression.NameId;

import java.util.HashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptyMap;

/**
* Decorating layout that creates the NameId -> Value lazily based on the calls made to its content.
* Essentially it maps the existing (old) NameIds to the new ones.
*/
class ExchangeLayout extends Layout {

private final Map<NameId, Integer> delegate;
class ExchangeLayout implements Layout {
private final Layout delegate;
private final Map<Integer, Set<NameId>> inverse;
private final Map<NameId, NameId> mappingToOldLayout;
private int counter;

ExchangeLayout(Layout layout) {
super(emptyMap(), 0);
this.delegate = layout.internalLayout();
this.mappingToOldLayout = Maps.newMapWithExpectedSize(delegate.size());
this.inverse = Maps.newMapWithExpectedSize(delegate.size());

for (Map.Entry<NameId, Integer> entry : delegate.entrySet()) {
NameId key = entry.getKey();
Integer value = entry.getValue();
inverse.computeIfAbsent(value, k -> new HashSet<>()).add(key);
}
ExchangeLayout(Layout delegate) {
this.delegate = delegate;
this.inverse = delegate.inverse();
this.mappingToOldLayout = new HashMap<>(inverse.size());
}

@Override
public Integer getChannel(NameId id) {
public ChannelAndType get(NameId id) {
var oldId = mappingToOldLayout.get(id);
if (oldId == null && counter < delegate.size()) {
if (oldId == null && counter < inverse.size()) {
var names = inverse.get(counter++);
for (var name : names) {
oldId = name;
Expand All @@ -54,12 +43,22 @@ public Integer getChannel(NameId id) {
}

@Override
public int numberOfIds() {
return delegate.size();
public int numberOfChannels() {
return delegate.numberOfChannels();
}

@Override
public int numberOfChannels() {
return inverse.size();
public String toString() {
return "ExchangeLayout{" + delegate + '}';
}

@Override
public Builder builder() {
throw new UnsupportedOperationException();
}

@Override
public Map<Integer, Set<NameId>> inverse() {
throw new UnsupportedOperationException();
}
}

0 comments on commit 443c53c

Please sign in to comment.