[ML] adding pivot.max_search_page_size option for setting paging size #41920

Merged
@@ -39,32 +39,39 @@ public class PivotConfig implements ToXContentObject {

private static final ParseField GROUP_BY = new ParseField("group_by");
private static final ParseField AGGREGATIONS = new ParseField("aggregations");
private static final ParseField SIZE = new ParseField("size");

private final GroupConfig groups;
private final AggregationConfig aggregationConfig;
private final Integer size;

private static final ConstructingObjectParser<PivotConfig, Void> PARSER = new ConstructingObjectParser<>("pivot_config", true,
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1]));
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2]));

static {
PARSER.declareObject(constructorArg(), (p, c) -> (GroupConfig.fromXContent(p)), GROUP_BY);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS);
PARSER.declareInt(optionalConstructorArg(), SIZE);
}

public static PivotConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}

PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig) {
PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig, Integer size) {
this.groups = groups;
this.aggregationConfig = aggregationConfig;
this.size = size;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(GROUP_BY.getPreferredName(), groups);
builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig);
if (size != null) {
builder.field(SIZE.getPreferredName(), size);
}
builder.endObject();
return builder;
}
@@ -77,6 +84,10 @@ public GroupConfig getGroupConfig() {
return groups;
}

public Integer getSize() {
return size;
}

@Override
public boolean equals(Object other) {
if (this == other) {
@@ -89,12 +100,14 @@ public boolean equals(Object other) {

final PivotConfig that = (PivotConfig) other;

return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
return Objects.equals(this.groups, that.groups)
&& Objects.equals(this.aggregationConfig, that.aggregationConfig)
&& Objects.equals(this.size, that.size);
}

@Override
public int hashCode() {
return Objects.hash(groups, aggregationConfig);
return Objects.hash(groups, aggregationConfig, size);
}

public static Builder builder() {
@@ -104,6 +117,7 @@ public static Builder builder() {
public static class Builder {
private GroupConfig groups;
private AggregationConfig aggregationConfig;
private Integer size;

/**
* Set how to group the source data
@@ -135,8 +149,22 @@ public Builder setAggregations(AggregatorFactories.Builder aggregations) {
return this;
}

/**
* Sets the maximum paging size that the data frame transform can use when
* pulling data from the source index.
*
* If an OOM error is triggered, the paging size is dynamically reduced so that the transform can continue to gather data.
*
* @param size Integer value between 10 and 10_000
* @return the {@link Builder} with the paging size set.
*/
public Builder setSize(Integer size) {
this.size = size;
return this;
}

public PivotConfig build() {
return new PivotConfig(groups, aggregationConfig);
return new PivotConfig(groups, aggregationConfig, size);
}
}
}
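For reference, a minimal usage sketch of the new builder option (not part of this diff; groupConfig and the example aggregation are assumed placeholders):

// Build a pivot with an explicit maximum page size.
PivotConfig pivotConfig = PivotConfig.builder()
    .setGroups(groupConfig) // assumed to be built elsewhere
    .setAggregations(AggregatorFactories.builder()
        .addAggregator(AggregationBuilders.avg("avg_rating").field("stars")))
    .setSize(500) // must be between 10 and 10,000
    .build();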
@@ -32,7 +32,9 @@
public class PivotConfigTests extends AbstractXContentTestCase<PivotConfig> {

public static PivotConfig randomPivotConfig() {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000));
}

@Override
@@ -137,8 +137,9 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
// end::put-data-frame-transform-agg-config
// tag::put-data-frame-transform-pivot-config
PivotConfig pivotConfig = PivotConfig.builder()
.setGroups(groupConfig)
.setAggregationConfig(aggConfig)
.setGroups(groupConfig) // <1>
.setAggregationConfig(aggConfig) // <2>
.setSize(1000) // <3>
.build();
// end::put-data-frame-transform-pivot-config
// tag::put-data-frame-transform-config
5 changes: 5 additions & 0 deletions docs/java-rest/high-level/dataframe/put_data_frame.asciidoc
@@ -66,6 +66,11 @@ Defines the pivot function `group by` fields and the aggregation to reduce the d
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-pivot-config]
--------------------------------------------------
<1> The `GroupConfig` to use in the pivot
<2> The aggregations to use
<3> The maximum paging size for the transform when pulling data
from the source. The size dynamically adjusts as the transform
is running to recover from and prevent OOM issues.

===== GroupConfig
The grouping terms. Defines the group by and destination fields
@@ -27,6 +27,7 @@ public final class DataFrameField {
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField SIZE = new ParseField("size");
@dimitris-athanasiou (Contributor) commented on May 8, 2019:
I know size is what the search expects, and in the context of a search it is a size. However, in datafeeds and now data frame transforms, this is better described as scroll_size or page_size. I'm not feeling too strongly about this, but it's worth considering.

Another contributor replied:
+1. Rollup uses page_size; I find size too unspecific.


/**
* Fields for checkpointing
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;

public class PutDataFrameTransformAction extends Action<AcknowledgedResponse> {

public static final PutDataFrameTransformAction INSTANCE = new PutDataFrameTransformAction();
@@ -53,7 +55,15 @@ public static Request fromXContent(final XContentParser parser, final String id)

@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (config.getPivotConfig() != null
&& config.getPivotConfig().getSize() != null
&& (config.getPivotConfig().getSize() < 10 || config.getPivotConfig().getSize() > 10_000)) {
validationException = addValidationError(
"pivot.size [" + config.getPivotConfig().getSize() + "] must be greater than 10 and less than 10,000",
validationException);
}
return validationException;
}
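For illustration, a minimal sketch (not part of this diff) of a config this check now rejects; groupConfig and aggConfig are assumed placeholders:

// Sketch: a pivot page size below 10 fails request validation.
PivotConfig invalidPivot = new PivotConfig(groupConfig, aggConfig, 5);
// A Request built around a transform config containing this pivot would return
// an ActionRequestValidationException from validate() whose message includes:
//   "pivot.size [5] must be between 10 and 10,000"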

@Override
@@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.dataframe.transforms.pivot;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -29,6 +30,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
private static final String NAME = "data_frame_transform_pivot";
private final GroupConfig groups;
private final AggregationConfig aggregationConfig;
private final Integer size;
A contributor commented:
On having size in pivot instead of the transform object itself:

I think the decision on this depends on whether we imagine that a transform will always use a composite aggregation. If yes, then size should be on the top level.


private static final ConstructingObjectParser<PivotConfig, Void> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<PivotConfig, Void> LENIENT_PARSER = createParser(true);
@@ -55,33 +57,39 @@ private static ConstructingObjectParser<PivotConfig, Void> createParser(boolean
throw new IllegalArgumentException("Required [aggregations]");
}

return new PivotConfig(groups, aggregationConfig);
return new PivotConfig(groups, aggregationConfig, (Integer)args[3]);
});

parser.declareObject(constructorArg(),
(p, c) -> (GroupConfig.fromXContent(p, lenient)), DataFrameField.GROUP_BY);

parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGREGATIONS);
parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGS);
parser.declareInt(optionalConstructorArg(), DataFrameField.SIZE);

return parser;
}

public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig) {
public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig, Integer size) {
this.groups = ExceptionsHelper.requireNonNull(groups, DataFrameField.GROUP_BY.getPreferredName());
this.aggregationConfig = ExceptionsHelper.requireNonNull(aggregationConfig, DataFrameField.AGGREGATIONS.getPreferredName());
this.size = size;
A contributor commented:
We could require a non-null value for this. Then from the REST action we set it to the default value if the user did not specify it. This way the default is not hidden and the user becomes aware of the parameter. We tend to follow this approach unless the default value is dynamically calculated.

Another contributor replied:
I like the current approach, as I would not promote this setting the way query is promoted. In my opinion, it should be an expert setting.

}

public PivotConfig(StreamInput in) throws IOException {
this.groups = new GroupConfig(in);
this.aggregationConfig = new AggregationConfig(in);
this.size = in.readOptionalInt();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.GROUP_BY.getPreferredName(), groups);
builder.field(DataFrameField.AGGREGATIONS.getPreferredName(), aggregationConfig);
if (size != null) {
builder.field(DataFrameField.SIZE.getPreferredName(), size);
}
builder.endObject();
return builder;
}
@@ -107,6 +115,7 @@ public void toCompositeAggXContent(XContentBuilder builder, Params params) throw
public void writeTo(StreamOutput out) throws IOException {
groups.writeTo(out);
aggregationConfig.writeTo(out);
out.writeOptionalInt(size);
}

public AggregationConfig getAggregationConfig() {
@@ -117,6 +126,11 @@ public GroupConfig getGroupConfig() {
return groups;
}

@Nullable
public Integer getSize() {
return size;
}

@Override
public boolean equals(Object other) {
if (this == other) {
@@ -129,12 +143,14 @@ public boolean equals(Object other) {

final PivotConfig that = (PivotConfig) other;

return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
return Objects.equals(this.groups, that.groups)
&& Objects.equals(this.aggregationConfig, that.aggregationConfig)
&& Objects.equals(this.size, that.size);
}

@Override
public int hashCode() {
return Objects.hash(groups, aggregationConfig);
return Objects.hash(groups, aggregationConfig, size);
}

public boolean isValid() {
@@ -17,11 +17,15 @@
public class PivotConfigTests extends AbstractSerializingDataFrameTestCase<PivotConfig> {

public static PivotConfig randomPivotConfig() {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000));
}

public static PivotConfig randomInvalidPivotConfig() {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomInvalidAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomInvalidAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000));
}

@Override
@@ -172,7 +172,13 @@ protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregat

protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations) throws Exception {
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations));
return createPivotConfig(groups, aggregations, null);
}

protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations,
Integer size) throws Exception {
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations), size);
}

protected DataFrameTransformConfig createTransformConfig(String id,
@@ -76,13 +76,15 @@ public void deduceMappings(Client client, SourceConfig sourceConfig, final Actio
* the page size, the type of aggregations and the data. As the page size is the number of buckets we return
per page, the page size is a multiplier for the cost of aggregating each bucket.
*
* Initially this returns a default, in future it might inspect the configuration and base the initial size
* on the aggregations used.
* The user may set a maximum in the {@link PivotConfig#getSize()}, but if that is not provided,
* the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used.
*
* In future we might inspect the configuration and base the initial size on the aggregations used.
*
* @return the page size
*/
public int getInitialPageSize() {
return DEFAULT_INITIAL_PAGE_SIZE;
return config.getSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getSize();
}
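For context, a minimal sketch of the adaptive reduction that the test further below exercises (not part of this diff; the halving strategy and method name are illustrative assumptions, not the actual implementation):

// Sketch: after a CircuitBreakingException the indexer retries with a smaller
// page, failing permanently only once the minimum page size is undercut.
static int reducePageSize(int currentPageSize, int minimumPageSize) {
    int reduced = currentPageSize / 2; // assumed reduction strategy
    if (reduced < minimumPageSize) {
        throw new IllegalStateException("page size [" + reduced
            + "] fell below the minimum [" + minimumPageSize + "]");
    }
    return reduced;
}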

public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {
@@ -23,7 +23,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
@@ -39,7 +41,10 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static org.elasticsearch.xpack.core.dataframe.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomSourceConfig;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -169,19 +174,23 @@ public void setUpMocks() {
}

public void testPageSizeAdapt() throws InterruptedException {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig();
Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000);
DataFrameTransformConfig config = new DataFrameTransformConfig(randomAlphaOfLength(10),
randomSourceConfig(),
randomDestConfig(),
null,
new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);

final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize;
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
throw new SearchPhaseExecutionException("query", "Partial shards failure", new ShardSearchFailure[] {
new ShardSearchFailure(new CircuitBreakingException("too much memory", 110, 100, Durability.TRANSIENT)) });
};

Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);

Consumer<Exception> failureConsumer = e -> {
fail("expected circuit breaker exception to be handled");
};
Consumer<Exception> failureConsumer = e -> fail("expected circuit breaker exception to be handled");

final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
Expand All @@ -197,8 +206,8 @@ public void testPageSizeAdapt() throws InterruptedException {
latch.countDown();
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
long pageSizeAfterFirstReduction = indexer.getPageSize();
assertTrue(Pivot.DEFAULT_INITIAL_PAGE_SIZE > pageSizeAfterFirstReduction);
assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
assertThat(initialPageSize, greaterThan(pageSizeAfterFirstReduction));
assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE));

// run indexer a 2nd time
final CountDownLatch secondRunLatch = indexer.newLatch(1);
@@ -211,8 +220,8 @@
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);

// assert that page size has been reduced again
assertTrue(pageSizeAfterFirstReduction > indexer.getPageSize());
assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
assertThat(pageSizeAfterFirstReduction, greaterThan((long)indexer.getPageSize()));
assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE));

} finally {
executor.shutdownNow();