Skip to content

Commit

Permalink
[7.13] [ML] [Transform] fix bug where group_by ordering could break w…
Browse files Browse the repository at this point in the history
…hen serializing between nodes (#72016) (#72027)

* [ML] [Transform] fix bug where group_by ordering could break when serializing between nodes (#72016)

If a PUT request for a transform is not made against a master node, it is possible that the transform config is serialized to another node. When that occurs, the group_by keys could be arbitrarily ordered (because reading from the wire stream did not respect insertion order).

This commit adds a test to make sure that group_by order is respected when:

- serializing between nodes
- serializing to/from xcontent
- writing composite aggregation sources

The bug fix is rather simple: change the wire stream input to respect the insertion order.
  • Loading branch information
benwtrent committed Apr 21, 2021
1 parent 45067c0 commit cb87e46
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public GroupConfig(final Map<String, Object> source, final Map<String, SingleGro

public GroupConfig(StreamInput in) throws IOException {
source = in.readMap();
groups = in.readMap(StreamInput::readString, (stream) -> {
groups = in.readOrderedMap(StreamInput::readString, (stream) -> {
SingleGroupSource.Type groupType = SingleGroupSource.Type.fromId(stream.readByte());
switch (groupType) {
case TERMS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.RemoteClusterMinimumVersionValidation;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SourceDestValidation;
import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
Expand All @@ -28,10 +32,13 @@

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.TestMatchers.matchesPattern;
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomInvalidSourceConfig;
Expand All @@ -41,6 +48,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.contains;

public class TransformConfigTests extends AbstractSerializingTransformTestCase<TransformConfig> {

Expand Down Expand Up @@ -588,6 +596,65 @@ public void testGetAdditionalValidations_WithRuntimeMappings() throws IOExceptio
assertThat(remoteClusterMinimumVersionValidation.getReason(), is(equalTo("source.runtime_mappings field was set")));
}

/**
 * Checks that the {@code group_by} keys of a pivot transform keep their declared order
 * ({@code time}, {@code alert}, {@code id}) after a wire copy of the config, after an
 * xcontent round trip of that copy, and in the composite aggregation sources built from
 * either one.
 *
 * @throws IOException if parsing or serializing the transform config fails
 */
public void testGroupByStayInOrder() throws IOException {
    String json = "{"
        + " \"id\" : \"" + transformId +"\","
        + " \"source\" : {"
        + " \"index\":\"src\""
        + "},"
        + " \"dest\" : {\"index\": \"dest\"},"
        + " \"pivot\" : {"
        + " \"group_by\": {"
        + " \"time\": {"
        + " \"date_histogram\": {"
        + " \"field\": \"timestamp\","
        + " \"fixed_interval\": \"1d\""
        + "} },"
        + " \"alert\": {"
        + " \"terms\": {"
        + " \"field\": \"alert\""
        + "} },"
        + " \"id\": {"
        + " \"terms\": {"
        + " \"field\": \"id\""
        + "} } },"
        + " \"aggs\": {"
        + " \"avg\": {"
        + " \"avg\": {"
        + " \"field\": \"points\""
        + "} } } } }";
    TransformConfig transformConfig = createTransformConfigFromString(json, transformId, true);
    List<String> originalGroups = new ArrayList<>(transformConfig.getPivotConfig().getGroupConfig().getGroups().keySet());
    // Baseline: the freshly parsed config lists the groups in declaration order.
    assertThat(
        originalGroups,
        contains("time", "alert", "id")
    );
    // Repeat the round trips several times; each pass picks a random xcontent type below.
    for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
        // Wire serialization order guarantees
        TransformConfig serialized = this.copyInstance(transformConfig);
        List<String> serializedGroups = new ArrayList<>(serialized.getPivotConfig().getGroupConfig().getGroups().keySet());
        assertThat(serializedGroups, equalTo(originalGroups));
        CompositeAggregationBuilder compositeAggregationBuilder = createCompositeAggregationSources(serialized.getPivotConfig());
        assertThat(
            compositeAggregationBuilder.sources().stream().map(CompositeValuesSourceBuilder::name).collect(Collectors.toList()),
            equalTo(originalGroups)
        );

        // Now test xcontent serialization and parsing on wire serialized object
        XContentType xContentType = randomFrom(XContentType.values());
        BytesReference ref = XContentHelper.toXContent(serialized, xContentType, getToXContentParams(), false);
        TransformConfig parsed;
        // Close the parser once the instance has been read so it is not leaked on every run.
        try (XContentParser parser = this.createParser(XContentFactory.xContent(xContentType), ref)) {
            parsed = doParseInstance(parser);
        }
        List<String> parsedGroups = new ArrayList<>(parsed.getPivotConfig().getGroupConfig().getGroups().keySet());
        assertThat(parsedGroups, equalTo(originalGroups));
        compositeAggregationBuilder = createCompositeAggregationSources(parsed.getPivotConfig());
        assertThat(
            compositeAggregationBuilder.sources().stream().map(CompositeValuesSourceBuilder::name).collect(Collectors.toList()),
            equalTo(originalGroups)
        );
    }
}

/**
 * Convenience overload that parses {@code json} into a {@link TransformConfig} by delegating
 * to the three-argument variant with {@code false} as its last argument.
 *
 * @param json the transform configuration as a JSON string
 * @param id the transform id handed to the three-argument variant
 * @return the parsed transform configuration
 * @throws IOException if the delegate fails to parse the JSON
 */
private TransformConfig createTransformConfigFromString(String json, String id) throws IOException {
    final boolean lenient = false;
    return createTransformConfigFromString(json, id, lenient);
}
Expand All @@ -597,4 +664,22 @@ private TransformConfig createTransformConfigFromString(String json, String id,
.createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
return TransformConfig.fromXContent(parser, id, lenient);
}

/**
 * Writes the pivot configuration's composite-aggregation xcontent to a JSON builder and parses
 * it back with {@code CompositeAggregationBuilder.PARSER}, so that tests can inspect the
 * resulting aggregation sources.
 *
 * @param config the pivot configuration whose {@code toCompositeAggXContent} output is parsed
 * @return the composite aggregation builder parsed from that output
 * @throws IOException if writing or parsing the xcontent fails
 */
private CompositeAggregationBuilder createCompositeAggregationSources(PivotConfig config) throws IOException {
    try (XContentBuilder builder = jsonBuilder()) {
        config.toCompositeAggXContent(builder);
        // Close the parser as well as the builder; both are closeable resources.
        try (
            XContentParser parser = builder.generator()
                .contentType()
                .xContent()
                .createParser(
                    xContentRegistry(),
                    DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
                    BytesReference.bytes(builder).streamInput()
                )
        ) {
            return CompositeAggregationBuilder.PARSER.parse(parser, "composite_agg");
        }
    }
}
}

0 comments on commit cb87e46

Please sign in to comment.