
[ML] adding pivot.max_page_search_size option for setting paging size (elastic#41920)

* [ML] adding pivot.size option for setting paging size

* Changing field name to address PR comments

* fixing ctor usage

* adjust hlrc for field name change
benwtrent authored and Gurkan Kaymak committed May 27, 2019
1 parent a71fbc3 commit 8fbd059
Showing 14 changed files with 154 additions and 32 deletions.
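For orientation, here is a minimal client-side sketch of the new option. It assumes a GroupConfig named groupConfig and an AggregationConfig named aggConfig (both illustrative placeholders); the serialized form in the trailing comment follows the toXContent logic in the diff below.

// Hypothetical builder usage; groupConfig and aggConfig are placeholders.
PivotConfig pivotConfig = PivotConfig.builder()
    .setGroups(groupConfig)
    .setAggregationConfig(aggConfig)
    .setMaxPageSearchSize(500) // optional; valid values are 10 to 10,000
    .build();
// Rendered by toXContent as:
// { "group_by": {...}, "aggregations": {...}, "max_page_search_size": 500 }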
@@ -39,32 +39,39 @@ public class PivotConfig implements ToXContentObject {

private static final ParseField GROUP_BY = new ParseField("group_by");
private static final ParseField AGGREGATIONS = new ParseField("aggregations");
private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");

private final GroupConfig groups;
private final AggregationConfig aggregationConfig;
private final Integer maxPageSearchSize;

private static final ConstructingObjectParser<PivotConfig, Void> PARSER = new ConstructingObjectParser<>("pivot_config", true,
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1]));
args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2]));

static {
PARSER.declareObject(constructorArg(), (p, c) -> (GroupConfig.fromXContent(p)), GROUP_BY);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS);
PARSER.declareInt(optionalConstructorArg(), MAX_PAGE_SEARCH_SIZE);
}

public static PivotConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}
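Because max_page_search_size is declared as an optional constructor argument, it arrives as null when the field is absent from the parsed document. A small plain-Java sketch of that null propagation (groups and aggs stand in for parsed GroupConfig and AggregationConfig instances):

// Field absent in the parsed JSON -> args[2] is null and flows through unchanged.
Integer maxPageSearchSize = null;
PivotConfig config = new PivotConfig(groups, aggs, maxPageSearchSize);
assert config.getMaxPageSearchSize() == null; // toXContent will then omit the field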

PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig) {
PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) {
this.groups = groups;
this.aggregationConfig = aggregationConfig;
this.maxPageSearchSize = maxPageSearchSize;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(GROUP_BY.getPreferredName(), groups);
builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig);
if (maxPageSearchSize != null) {
builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
}
builder.endObject();
return builder;
}
@@ -77,6 +84,10 @@ public GroupConfig getGroupConfig() {
return groups;
}

public Integer getMaxPageSearchSize() {
return maxPageSearchSize;
}

@Override
public boolean equals(Object other) {
if (this == other) {
@@ -89,12 +100,14 @@ public boolean equals(Object other) {

final PivotConfig that = (PivotConfig) other;

return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
return Objects.equals(this.groups, that.groups)
&& Objects.equals(this.aggregationConfig, that.aggregationConfig)
&& Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize);
}

@Override
public int hashCode() {
return Objects.hash(groups, aggregationConfig);
return Objects.hash(groups, aggregationConfig, maxPageSearchSize);
}

public static Builder builder() {
@@ -104,6 +117,7 @@ public static Builder builder() {
public static class Builder {
private GroupConfig groups;
private AggregationConfig aggregationConfig;
private Integer maxPageSearchSize;

/**
* Set how to group the source data
@@ -135,8 +149,22 @@ public Builder setAggregations(AggregatorFactories.Builder aggregations) {
return this;
}

/**
* Sets the maximum paging size that the data frame transform can use when
* pulling data from the source index.
*
* If an out-of-memory (OOM) error is triggered, the paging size is dynamically reduced so that the transform can continue to gather data.
*
* @param maxPageSearchSize Integer value between 10 and 10_000
* @return the {@link Builder} with the paging size set.
*/
public Builder setMaxPageSearchSize(Integer maxPageSearchSize) {
this.maxPageSearchSize = maxPageSearchSize;
return this;
}

public PivotConfig build() {
return new PivotConfig(groups, aggregationConfig);
return new PivotConfig(groups, aggregationConfig, maxPageSearchSize);
}
}
}
@@ -32,7 +32,9 @@
public class PivotConfigTests extends AbstractXContentTestCase<PivotConfig> {

public static PivotConfig randomPivotConfig() {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000));
}

@Override
@@ -138,8 +138,9 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
// end::put-data-frame-transform-agg-config
// tag::put-data-frame-transform-pivot-config
PivotConfig pivotConfig = PivotConfig.builder()
.setGroups(groupConfig)
.setAggregationConfig(aggConfig)
.setGroups(groupConfig) // <1>
.setAggregationConfig(aggConfig) // <2>
.setMaxPageSearchSize(1000) // <3>
.build();
// end::put-data-frame-transform-pivot-config
// tag::put-data-frame-transform-config
5 changes: 5 additions & 0 deletions docs/java-rest/high-level/dataframe/put_data_frame.asciidoc
@@ -66,6 +66,11 @@ Defines the pivot function `group by` fields and the aggregation to reduce the d
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-pivot-config]
--------------------------------------------------
<1> The `GroupConfig` to use in the pivot
<2> The aggregations to use
<3> The maximum paging size for the transform when pulling data
from the source. The size dynamically adjusts as the transform
is running to recover from and prevent OOM issues.

===== GroupConfig
The grouping terms. Defines the group by and destination fields
@@ -27,6 +27,7 @@ public final class DataFrameField {
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");

/**
* Fields for checkpointing
@@ -56,6 +56,14 @@ public static Request fromXContent(final XContentParser parser, final String id)
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (config.getPivotConfig() != null
&& config.getPivotConfig().getMaxPageSearchSize() != null
&& (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) {
validationException = addValidationError(
"pivot.max_page_search_size [" +
config.getPivotConfig().getMaxPageSearchSize() + "] must be between 10 and 10,000",
validationException);
}
for(String failure : config.getPivotConfig().aggFieldValidation()) {
validationException = addValidationError(failure, validationException);
}
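For clarity, here is the range rule above extracted into a standalone helper (bounds taken from the diff; the method name is illustrative):

// Sketch of the validation rule: null is allowed (the server default applies),
// otherwise the value must fall within [10, 10_000].
static boolean isValidMaxPageSearchSize(Integer size) {
    return size == null || (size >= 10 && size <= 10_000);
}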
@@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.dataframe.transforms.pivot;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -35,6 +36,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
private static final String NAME = "data_frame_transform_pivot";
private final GroupConfig groups;
private final AggregationConfig aggregationConfig;
private final Integer maxPageSearchSize;

private static final ConstructingObjectParser<PivotConfig, Void> STRICT_PARSER = createParser(false);
private static final ConstructingObjectParser<PivotConfig, Void> LENIENT_PARSER = createParser(true);
@@ -61,33 +63,39 @@ private static ConstructingObjectParser<PivotConfig, Void> createParser(boolean
throw new IllegalArgumentException("Required [aggregations]");
}

return new PivotConfig(groups, aggregationConfig);
return new PivotConfig(groups, aggregationConfig, (Integer)args[3]);
});

parser.declareObject(constructorArg(),
(p, c) -> (GroupConfig.fromXContent(p, lenient)), DataFrameField.GROUP_BY);

parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGREGATIONS);
parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGS);
parser.declareInt(optionalConstructorArg(), DataFrameField.MAX_PAGE_SEARCH_SIZE);

return parser;
}

public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig) {
public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) {
this.groups = ExceptionsHelper.requireNonNull(groups, DataFrameField.GROUP_BY.getPreferredName());
this.aggregationConfig = ExceptionsHelper.requireNonNull(aggregationConfig, DataFrameField.AGGREGATIONS.getPreferredName());
this.maxPageSearchSize = maxPageSearchSize;
}

public PivotConfig(StreamInput in) throws IOException {
this.groups = new GroupConfig(in);
this.aggregationConfig = new AggregationConfig(in);
this.maxPageSearchSize = in.readOptionalInt();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.GROUP_BY.getPreferredName(), groups);
builder.field(DataFrameField.AGGREGATIONS.getPreferredName(), aggregationConfig);
if (maxPageSearchSize != null) {
builder.field(DataFrameField.MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
}
builder.endObject();
return builder;
}
@@ -113,6 +121,7 @@ public void toCompositeAggXContent(XContentBuilder builder, Params params) throw
public void writeTo(StreamOutput out) throws IOException {
groups.writeTo(out);
aggregationConfig.writeTo(out);
out.writeOptionalInt(maxPageSearchSize);
}
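writeOptionalInt and readOptionalInt preserve null across the wire, so configs without the new field round-trip cleanly. A hedged round-trip sketch using the BytesStreamOutput utility (assumes an existing pivotConfig instance and the usual org.elasticsearch.common.io.stream imports plus java.util.Objects):

// Illustrative only: a null maxPageSearchSize survives serialization intact.
try (BytesStreamOutput out = new BytesStreamOutput()) {
    pivotConfig.writeTo(out);
    try (StreamInput in = out.bytes().streamInput()) {
        PivotConfig roundTripped = new PivotConfig(in);
        assert Objects.equals(pivotConfig.getMaxPageSearchSize(),
            roundTripped.getMaxPageSearchSize());
    }
}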

public AggregationConfig getAggregationConfig() {
@@ -123,6 +132,11 @@ public GroupConfig getGroupConfig() {
return groups;
}

@Nullable
public Integer getMaxPageSearchSize() {
return maxPageSearchSize;
}

@Override
public boolean equals(Object other) {
if (this == other) {
@@ -135,12 +149,14 @@ public boolean equals(Object other) {

final PivotConfig that = (PivotConfig) other;

return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
return Objects.equals(this.groups, that.groups)
&& Objects.equals(this.aggregationConfig, that.aggregationConfig)
&& Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize);
}

@Override
public int hashCode() {
return Objects.hash(groups, aggregationConfig);
return Objects.hash(groups, aggregationConfig, maxPageSearchSize);
}

public boolean isValid() {
@@ -24,11 +24,15 @@
public class PivotConfigTests extends AbstractSerializingDataFrameTestCase<PivotConfig> {

public static PivotConfig randomPivotConfig() {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000));
}

public static PivotConfig randomInvalidPivotConfig() {
return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomInvalidAggregationConfig());
return new PivotConfig(GroupConfigTests.randomGroupConfig(),
AggregationConfigTests.randomInvalidAggregationConfig(),
randomBoolean() ? null : randomIntBetween(10, 10_000));
}

@Override
@@ -172,7 +172,13 @@ protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregat

protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations) throws Exception {
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations));
return createPivotConfig(groups, aggregations, null);
}

protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
AggregatorFactories.Builder aggregations,
Integer size) throws Exception {
return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations), size);
}
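A hypothetical call from a derived integration test (groupSources and aggBuilder are illustrative names):

// Explicit page size; passing null falls back to the transform's default.
PivotConfig pivot = createPivotConfig(groupSources, aggBuilder, 500);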

protected DataFrameTransformConfig createTransformConfig(String id,
@@ -130,7 +130,7 @@ public void testGetProgress() throws Exception {
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig);
PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
DataFrameTransformConfig config = new DataFrameTransformConfig("get_progress_transform",
sourceConfig,
destConfig,
@@ -149,7 +149,7 @@


QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26"));
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig);
pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
sourceConfig = new SourceConfig(new String[]{REVIEWS_INDEX_NAME}, queryConfig);
config = new DataFrameTransformConfig("get_progress_transform",
sourceConfig,
@@ -76,13 +76,15 @@ public void deduceMappings(Client client, SourceConfig sourceConfig, final Actio
* the page size, the type of aggregations, and the data. As the page size is the number of buckets returned
* per page, it acts as a multiplier for the cost of aggregating each bucket.
*
* Initially this returns a default, in future it might inspect the configuration and base the initial size
* on the aggregations used.
* The user may set a maximum in the {@link PivotConfig#getMaxPageSearchSize()}, but if that is not provided,
* the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used.
*
* In future we might inspect the configuration and base the initial size on the aggregations used.
*
* @return the page size
*/
public int getInitialPageSize() {
return DEFAULT_INITIAL_PAGE_SIZE;
return config.getMaxPageSearchSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getMaxPageSearchSize();
}
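The fallback above, spelled out (names taken from the surrounding class):

// Equivalent logic: a user-supplied maximum wins, otherwise use the default.
int initialPageSize = config.getMaxPageSearchSize() != null
    ? config.getMaxPageSearchSize()
    : Pivot.DEFAULT_INITIAL_PAGE_SIZE;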

public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {
@@ -23,7 +23,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
@@ -39,7 +41,10 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static org.elasticsearch.xpack.core.dataframe.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomSourceConfig;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -169,19 +174,23 @@ public void setUpMocks() {
}

public void testPageSizeAdapt() throws InterruptedException {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig();
Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000);
DataFrameTransformConfig config = new DataFrameTransformConfig(randomAlphaOfLength(10),
randomSourceConfig(),
randomDestConfig(),
null,
new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);

final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize;
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
throw new SearchPhaseExecutionException("query", "Partial shards failure", new ShardSearchFailure[] {
new ShardSearchFailure(new CircuitBreakingException("too much memory", 110, 100, Durability.TRANSIENT)) });
};

Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);

Consumer<Exception> failureConsumer = e -> {
fail("expected circuit breaker exception to be handled");
};
Consumer<Exception> failureConsumer = e -> fail("expected circuit breaker exception to be handled");

final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
@@ -197,8 +206,8 @@ public void testPageSizeAdapt() throws InterruptedException {
latch.countDown();
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
long pageSizeAfterFirstReduction = indexer.getPageSize();
assertTrue(Pivot.DEFAULT_INITIAL_PAGE_SIZE > pageSizeAfterFirstReduction);
assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
assertThat(initialPageSize, greaterThan(pageSizeAfterFirstReduction));
assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE));

// run indexer a 2nd time
final CountDownLatch secondRunLatch = indexer.newLatch(1);
@@ -211,8 +220,8 @@
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);

// assert that page size has been reduced again
assertTrue(pageSizeAfterFirstReduction > indexer.getPageSize());
assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
assertThat(pageSizeAfterFirstReduction, greaterThan((long)indexer.getPageSize()));
assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE));

} finally {
executor.shutdownNow();
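The test verifies that the indexer shrinks its page size after each circuit-breaker failure but never below DataFrameIndexer.MINIMUM_PAGE_SIZE. A simplified sketch of that adaptive loop; the real reduction policy lives in DataFrameIndexer and may derive the new size from the breaker's limits, so the halving here is only illustrative:

// Illustrative only: shrink on memory pressure, clamp at the minimum.
// searchTripsCircuitBreaker() is a hypothetical stand-in for the failing search.
long pageSize = initialPageSize;
while (searchTripsCircuitBreaker() && pageSize > DataFrameIndexer.MINIMUM_PAGE_SIZE) {
    pageSize = Math.max(DataFrameIndexer.MINIMUM_PAGE_SIZE, pageSize / 2);
    // retry the search with the reduced page size...
}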
