Skip to content

Commit

Permalink
[Transform] cleanup code from < 7.17 wire serialization checks (#91150)
Browse files Browse the repository at this point in the history
remove unnecessary wire serialization checks and parsers
  • Loading branch information
Hendrik Muhs committed Oct 27, 2022
1 parent bb44f7f commit bf12b75
Show file tree
Hide file tree
Showing 17 changed files with 104 additions and 465 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.core.transform.transforms;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -58,11 +57,7 @@ public DestConfig(String index, String pipeline) {

public DestConfig(final StreamInput in) throws IOException {
index = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
pipeline = in.readOptionalString();
} else {
pipeline = null;
}
pipeline = in.readOptionalString();
}

public String getIndex() {
Expand All @@ -85,9 +80,7 @@ public void checkForDeprecations(String id, NamedXContentRegistry namedXContentR
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalString(pipeline);
}
out.writeOptionalString(pipeline);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,10 @@ public SettingsConfig(
public SettingsConfig(final StreamInput in) throws IOException {
this.maxPageSearchSize = in.readOptionalInt();
this.docsPerSecond = in.readOptionalFloat();
if (in.getVersion().onOrAfter(Version.V_7_11_0)) {
this.datesAsEpochMillis = in.readOptionalInt();
} else {
this.datesAsEpochMillis = DEFAULT_DATES_AS_EPOCH_MILLIS;
}
if (in.getVersion().onOrAfter(Version.V_7_15_0)) {
this.alignCheckpoints = in.readOptionalInt();
} else {
this.alignCheckpoints = DEFAULT_ALIGN_CHECKPOINTS;
}
if (in.getVersion().onOrAfter(Version.V_7_16_1)) {
this.usePit = in.readOptionalInt();
} else {
this.usePit = DEFAULT_USE_PIT;
}
this.datesAsEpochMillis = in.readOptionalInt();
this.alignCheckpoints = in.readOptionalInt();
this.usePit = in.readOptionalInt();

if (in.getVersion().onOrAfter(Version.V_8_1_0)) {
deduceMappings = in.readOptionalInt();
} else {
Expand Down Expand Up @@ -286,15 +275,10 @@ public void checkForDeprecations(String id, NamedXContentRegistry namedXContentR
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalInt(maxPageSearchSize);
out.writeOptionalFloat(docsPerSecond);
if (out.getVersion().onOrAfter(Version.V_7_11_0)) {
out.writeOptionalInt(datesAsEpochMillis);
}
if (out.getVersion().onOrAfter(Version.V_7_15_0)) {
out.writeOptionalInt(alignCheckpoints);
}
if (out.getVersion().onOrAfter(Version.V_7_16_1)) {
out.writeOptionalInt(usePit);
}
out.writeOptionalInt(datesAsEpochMillis);
out.writeOptionalInt(alignCheckpoints);
out.writeOptionalInt(usePit);

if (out.getVersion().onOrAfter(Version.V_8_1_0)) {
out.writeOptionalInt(deduceMappings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.core.transform.transforms;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -100,11 +99,7 @@ public SourceConfig(String[] index, QueryConfig queryConfig, Map<String, Object>
public SourceConfig(final StreamInput in) throws IOException {
index = in.readStringArray();
queryConfig = new QueryConfig(in);
if (in.getVersion().onOrAfter(Version.V_7_12_0)) {
runtimeMappings = in.readMap();
} else {
runtimeMappings = Collections.emptyMap();
}
runtimeMappings = in.readMap();
}

public String[] getIndex() {
Expand Down Expand Up @@ -142,9 +137,7 @@ public boolean requiresRemoteCluster() {
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(index);
queryConfig.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_12_0)) {
out.writeGenericMap(runtimeMappings);
}
out.writeGenericMap(runtimeMappings);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,16 @@

package org.elasticsearch.xpack.core.transform.transforms;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.transform.TransformField;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Checkpoint stats data for 1 checkpoint
*
Expand All @@ -37,28 +32,6 @@ public class TransformCheckpointStats implements Writeable, ToXContentObject {
private final long timestampMillis;
private final long timeUpperBoundMillis;

static final ConstructingObjectParser<TransformCheckpointStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
"data_frame_transform_checkpoint_stats",
true,
args -> {
long checkpoint = args[0] == null ? 0L : (Long) args[0];
TransformIndexerPosition position = (TransformIndexerPosition) args[1];
TransformProgress checkpointProgress = (TransformProgress) args[2];
long timestamp = args[3] == null ? 0L : (Long) args[3];
long timeUpperBound = args[4] == null ? 0L : (Long) args[4];

return new TransformCheckpointStats(checkpoint, position, checkpointProgress, timestamp, timeUpperBound);
}
);

static {
LENIENT_PARSER.declareLong(optionalConstructorArg(), TransformField.CHECKPOINT);
LENIENT_PARSER.declareObject(optionalConstructorArg(), TransformIndexerPosition.PARSER, TransformField.POSITION);
LENIENT_PARSER.declareObject(optionalConstructorArg(), TransformProgress.PARSER, TransformField.CHECKPOINT_PROGRESS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), TransformField.TIMESTAMP_MILLIS);
LENIENT_PARSER.declareLong(optionalConstructorArg(), TransformField.TIME_UPPER_BOUND_MILLIS);
}

public TransformCheckpointStats(
final long checkpoint,
final TransformIndexerPosition position,
Expand All @@ -74,21 +47,15 @@ public TransformCheckpointStats(
}

public TransformCheckpointStats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
this.checkpoint = in.readVLong();
if (in.readBoolean()) {
this.position = new TransformIndexerPosition(in);
} else {
this.position = null;
}
if (in.readBoolean()) {
this.checkpointProgress = new TransformProgress(in);
} else {
this.checkpointProgress = null;
}
this.checkpoint = in.readVLong();
if (in.readBoolean()) {
this.position = new TransformIndexerPosition(in);
} else {
this.checkpoint = 0;
this.position = null;
}
if (in.readBoolean()) {
this.checkpointProgress = new TransformProgress(in);
} else {
this.checkpointProgress = null;
}
this.timestampMillis = in.readLong();
Expand Down Expand Up @@ -145,20 +112,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeVLong(checkpoint);
if (position != null) {
out.writeBoolean(true);
position.writeTo(out);
} else {
out.writeBoolean(false);
}
if (checkpointProgress != null) {
out.writeBoolean(true);
checkpointProgress.writeTo(out);
} else {
out.writeBoolean(false);
}
out.writeVLong(checkpoint);
if (position != null) {
out.writeBoolean(true);
position.writeTo(out);
} else {
out.writeBoolean(false);
}
if (checkpointProgress != null) {
out.writeBoolean(true);
checkpointProgress.writeTo(out);
} else {
out.writeBoolean(false);
}
out.writeLong(timestampMillis);
out.writeLong(timeUpperBoundMillis);
Expand Down Expand Up @@ -187,8 +152,4 @@ public boolean equals(Object other) {
&& this.timestampMillis == that.timestampMillis
&& this.timeUpperBoundMillis == that.timeUpperBoundMillis;
}

public static TransformCheckpointStats fromXContent(XContentParser p) {
return LENIENT_PARSER.apply(p, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,12 @@

package org.elasticsearch.xpack.core.transform.transforms;

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.time.TimeUtils;

import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -145,60 +139,21 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi
null
);

public static final ParseField LAST_CHECKPOINT = new ParseField("last");
public static final ParseField NEXT_CHECKPOINT = new ParseField("next");
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at");
public static final ParseField LAST_SEARCH_TIME = new ParseField("last_search_time");
public static final String LAST_CHECKPOINT = "last";
public static final String NEXT_CHECKPOINT = "next";
public static final String OPERATIONS_BEHIND = "operations_behind";
public static final String CHANGES_LAST_DETECTED_AT = "changes_last_detected_at";
public static final String CHANGES_LAST_DETECTED_AT_HUMAN = CHANGES_LAST_DETECTED_AT + "_string";

public static final String LAST_SEARCH_TIME = "last_search_time";
public static final String LAST_SEARCH_TIME_HUMAN = LAST_SEARCH_TIME + "_string";

private final TransformCheckpointStats last;
private final TransformCheckpointStats next;
private final long operationsBehind;
private final Instant changesLastDetectedAt;
private final Instant lastSearchTime;

private static final ConstructingObjectParser<TransformCheckpointingInfo, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
"data_frame_transform_checkpointing_info",
true,
a -> {
long behind = a[2] == null ? 0L : (Long) a[2];
Instant changesLastDetectedAt = (Instant) a[3];
Instant lastSearchTime = (Instant) a[4];
return new TransformCheckpointingInfo(
a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0],
a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1],
behind,
changesLastDetectedAt,
lastSearchTime
);
}
);

static {
LENIENT_PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
TransformCheckpointStats.LENIENT_PARSER::apply,
LAST_CHECKPOINT
);
LENIENT_PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
TransformCheckpointStats.LENIENT_PARSER::apply,
NEXT_CHECKPOINT
);
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
LENIENT_PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()),
CHANGES_LAST_DETECTED_AT,
ObjectParser.ValueType.VALUE
);
LENIENT_PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtils.parseTimeFieldToInstant(p, LAST_SEARCH_TIME.getPreferredName()),
LAST_SEARCH_TIME,
ObjectParser.ValueType.VALUE
);
}

/**
* Create checkpoint stats object with checkpoint information about the last and next checkpoint as well as the current state
* of source.
Expand Down Expand Up @@ -227,16 +182,8 @@ public TransformCheckpointingInfo(StreamInput in) throws IOException {
last = new TransformCheckpointStats(in);
next = new TransformCheckpointStats(in);
operationsBehind = in.readLong();
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
changesLastDetectedAt = in.readOptionalInstant();
} else {
changesLastDetectedAt = null;
}
if (in.getVersion().onOrAfter(Version.V_7_12_0)) {
lastSearchTime = in.readOptionalInstant();
} else {
lastSearchTime = null;
}
changesLastDetectedAt = in.readOptionalInstant();
lastSearchTime = in.readOptionalInstant();
}

public TransformCheckpointStats getLast() {
Expand All @@ -262,26 +209,18 @@ public Instant getLastSearchTime() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(LAST_CHECKPOINT.getPreferredName(), last);
builder.field(LAST_CHECKPOINT, last);
if (next.getCheckpoint() > 0) {
builder.field(NEXT_CHECKPOINT.getPreferredName(), next);
builder.field(NEXT_CHECKPOINT, next);
}
if (operationsBehind > 0) {
builder.field(OPERATIONS_BEHIND.getPreferredName(), operationsBehind);
builder.field(OPERATIONS_BEHIND, operationsBehind);
}
if (changesLastDetectedAt != null) {
builder.timeField(
CHANGES_LAST_DETECTED_AT.getPreferredName(),
CHANGES_LAST_DETECTED_AT.getPreferredName() + "_string",
changesLastDetectedAt.toEpochMilli()
);
builder.timeField(CHANGES_LAST_DETECTED_AT, CHANGES_LAST_DETECTED_AT_HUMAN, changesLastDetectedAt.toEpochMilli());
}
if (lastSearchTime != null) {
builder.timeField(
LAST_SEARCH_TIME.getPreferredName(),
LAST_SEARCH_TIME.getPreferredName() + "_string",
lastSearchTime.toEpochMilli()
);
builder.timeField(LAST_SEARCH_TIME, LAST_SEARCH_TIME_HUMAN, lastSearchTime.toEpochMilli());
}
builder.endObject();
return builder;
Expand All @@ -292,16 +231,8 @@ public void writeTo(StreamOutput out) throws IOException {
last.writeTo(out);
next.writeTo(out);
out.writeLong(operationsBehind);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeOptionalInstant(changesLastDetectedAt);
}
if (out.getVersion().onOrAfter(Version.V_7_12_0)) {
out.writeOptionalInstant(lastSearchTime);
}
}

public static TransformCheckpointingInfo fromXContent(XContentParser p) {
return LENIENT_PARSER.apply(p, null);
out.writeOptionalInstant(changesLastDetectedAt);
out.writeOptionalInstant(lastSearchTime);
}

@Override
Expand Down

0 comments on commit bf12b75

Please sign in to comment.