Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML-DataFrame] add HLRC protocol tests for transform state and stats #40766

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public abstract class IndexerJobStats {
private final long indexFailures;
private final long searchFailures;

public IndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
public IndexerJobStats(long numPages, long numInputDocuments, long numOutputDocuments, long numInvocations,
long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
this.numPages = numPages;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOuputDocuments;
this.numOuputDocuments = numOutputDocuments;
this.numInvocations = numInvocations;
this.indexTime = indexTime;
this.indexTotal = indexTotal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,25 @@

public class DataFrameIndexerTransformStats extends IndexerJobStats {

public static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> PARSER = new ConstructingObjectParser<>(
public static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
NAME, true, args -> new DataFrameIndexerTransformStats((long) args[0], (long) args[1], (long) args[2],
(long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));

static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}

public static DataFrameIndexerTransformStats fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
return LENIENT_PARSER.parse(parser, null);
}

public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOuputDocuments,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DataFrameIndexerTransformStats extends IndexerJobStats {
private static final String DEFAULT_TRANSFORM_ID = "_all";
public static final String DEFAULT_TRANSFORM_ID = "_all";

public static final String NAME = "data_frame_indexer_transform_stats";
public static ParseField NUM_PAGES = new ParseField("pages_processed");
Expand All @@ -37,23 +37,25 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");

public static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> PARSER = new ConstructingObjectParser<>(
NAME, args -> new DataFrameIndexerTransformStats((String) args[0], (long) args[1], (long) args[2], (long) args[3],
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9], (long) args[10]));
private static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
NAME, true,
args -> new DataFrameIndexerTransformStats(args[0] != null ? (String) args[0] : DEFAULT_TRANSFORM_ID,
(long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7],
(long) args[8], (long) args[9], (long) args[10]));

static {
PARSER.declareString(optionalConstructorArg(), DataFrameField.ID);
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
PARSER.declareString(optionalConstructorArg(), DataFrameField.INDEX_DOC_TYPE);
LENIENT_PARSER.declareString(optionalConstructorArg(), DataFrameField.ID);
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
LENIENT_PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareString(optionalConstructorArg(), DataFrameField.INDEX_DOC_TYPE);
}

private final String transformId;
Expand Down Expand Up @@ -197,7 +199,7 @@ public int hashCode() {

public static DataFrameIndexerTransformStats fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);
return LENIENT_PARSER.parse(parser, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -29,7 +30,7 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
private final DataFrameIndexerTransformStats transformStats;

public static final ConstructingObjectParser<DataFrameTransformStateAndStats, Void> PARSER = new ConstructingObjectParser<>(
NAME,
NAME, true,
a -> new DataFrameTransformStateAndStats((String) a[0], (DataFrameTransformState) a[1], (DataFrameIndexerTransformStats) a[2]));

static {
Expand Down Expand Up @@ -110,4 +111,9 @@ public DataFrameIndexerTransformStats getTransformStats() {
public DataFrameTransformState getTransformState() {
return transformState;
}

@Override
public String toString() {
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.transforms.hlrc;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.protocol.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStatsTests;

import java.io.IOException;

public class DataFrameIndexerTransformStatsHlrcTests extends AbstractHlrcXContentTestCase<
DataFrameIndexerTransformStats,
org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats> {

public static DataFrameIndexerTransformStats fromHlrc(
org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats instance) {
return DataFrameIndexerTransformStats.withDefaultTransformId(instance.getNumPages(), instance.getNumDocuments(),
instance.getOutputDocuments(), instance.getNumInvocations(), instance.getIndexTime(), instance.getSearchTime(),
instance.getIndexTotal(), instance.getSearchTotal(), instance.getIndexFailures(), instance.getSearchFailures());
}

@Override
public org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats doHlrcParseInstance(XContentParser parser)
throws IOException {
return org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats.fromXContent(parser);
}

@Override
public DataFrameIndexerTransformStats convertHlrcToInternal(
org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats instance) {
return fromHlrc(instance);
}

@Override
protected DataFrameIndexerTransformStats createTestInstance() {
return DataFrameIndexerTransformStatsTests.randomStats(DataFrameIndexerTransformStats.DEFAULT_TRANSFORM_ID);
}

@Override
protected DataFrameIndexerTransformStats doParseInstance(XContentParser parser) throws IOException {
return DataFrameIndexerTransformStats.fromXContent(parser);
}

@Override
protected boolean supportsUnknownFields() {
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.transforms.hlrc;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.protocol.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;

import java.io.IOException;
import java.util.function.Predicate;

public class DataFrameTransformStateAndStatsHlrcTests extends AbstractHlrcXContentTestCase<DataFrameTransformStateAndStats,
org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats> {

@Override
public org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats doHlrcParseInstance(XContentParser parser)
throws IOException {
return org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats.fromXContent(parser);
}

@Override
public DataFrameTransformStateAndStats convertHlrcToInternal(
org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats instance) {
return new DataFrameTransformStateAndStats(instance.getId(),
DataFrameTransformStateHlrcTests.fromHlrc(instance.getTransformState()),
DataFrameIndexerTransformStatsHlrcTests.fromHlrc(instance.getTransformStats()));
}

@Override
protected DataFrameTransformStateAndStats createTestInstance() {
// the transform id is not part of HLRC as it's only to a field for internal storage, therefore use a default id
return DataFrameTransformStateAndStatsTests
.randomDataFrameTransformStateAndStats(DataFrameIndexerTransformStats.DEFAULT_TRANSFORM_ID);
}

@Override
protected DataFrameTransformStateAndStats doParseInstance(XContentParser parser) throws IOException {
return DataFrameTransformStateAndStats.PARSER.apply(parser, null);
}

@Override
protected boolean supportsUnknownFields() {
return true;
}

@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> field.equals("state.current_position");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe.transforms;
package org.elasticsearch.xpack.core.dataframe.transforms.hlrc;

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.protocol.AbstractHlrcXContentTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.indexing.IndexerState;

import java.io.IOException;
Expand All @@ -16,6 +19,12 @@
public class DataFrameTransformStateHlrcTests extends AbstractHlrcXContentTestCase<DataFrameTransformState,
org.elasticsearch.client.dataframe.transforms.DataFrameTransformState> {

public static DataFrameTransformState fromHlrc(org.elasticsearch.client.dataframe.transforms.DataFrameTransformState instance) {
return new DataFrameTransformState(DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
IndexerState.fromString(instance.getIndexerState().value()), instance.getPosition(), instance.getGeneration(),
instance.getReason());
}

@Override
public org.elasticsearch.client.dataframe.transforms.DataFrameTransformState doHlrcParseInstance(XContentParser parser)
throws IOException {
Expand All @@ -24,9 +33,7 @@ public org.elasticsearch.client.dataframe.transforms.DataFrameTransformState doH

@Override
public DataFrameTransformState convertHlrcToInternal(org.elasticsearch.client.dataframe.transforms.DataFrameTransformState instance) {
return new DataFrameTransformState(DataFrameTransformTaskState.fromString(instance.getTaskState().value()),
IndexerState.fromString(instance.getIndexerState().value()),
instance.getPosition(), instance.getGeneration(), instance.getReason());
return fromHlrc(instance);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private static DataFrameIndexerTransformStats parseFromSource(BytesReference sou
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
return DataFrameIndexerTransformStats.PARSER.apply(parser, null);
return DataFrameIndexerTransformStats.fromXContent(parser);
}
}
}