Skip to content

Commit

Permalink
[7.x][Transform] implement retention policy to delete data from a tra…
Browse files Browse the repository at this point in the history
…nsform (#67832) (#68814)

add a retention policy to transform to delete data that is considered outdated as part of a
transform checkpoint.

backport #67832
fixes #67916
  • Loading branch information
Hendrik Muhs committed Feb 11, 2021
1 parent f8b9876 commit 040b9a8
Show file tree
Hide file tree
Showing 72 changed files with 2,393 additions and 794 deletions.
Expand Up @@ -8,7 +8,9 @@

package org.elasticsearch.client.transform;

import org.elasticsearch.client.transform.transforms.RetentionPolicyConfig;
import org.elasticsearch.client.transform.transforms.SyncConfig;
import org.elasticsearch.client.transform.transforms.TimeRetentionPolicyConfig;
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -22,9 +24,13 @@ public class TransformNamedXContentProvider implements NamedXContentProvider {
@Override
public List<NamedXContentRegistry.Entry> getNamedXContentParsers() {
return Arrays.asList(
new NamedXContentRegistry.Entry(SyncConfig.class,
new ParseField(TimeSyncConfig.NAME),
TimeSyncConfig::fromXContent));
new NamedXContentRegistry.Entry(SyncConfig.class, new ParseField(TimeSyncConfig.NAME), TimeSyncConfig::fromXContent),
new NamedXContentRegistry.Entry(
RetentionPolicyConfig.class,
new ParseField(TimeRetentionPolicyConfig.NAME),
TimeRetentionPolicyConfig::fromXContent
)
);
}

}
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.client.transform.transforms;

import org.elasticsearch.common.xcontent.ToXContentObject;

public interface RetentionPolicyConfig extends ToXContentObject {

/**
* Returns the name of the writeable object
*/
String getName();
}
@@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.client.transform.transforms;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

public class TimeRetentionPolicyConfig implements RetentionPolicyConfig {

public static final String NAME = "time";

private static final ParseField FIELD = new ParseField("field");
private static final ParseField MAX_AGE = new ParseField("max_age");

private final String field;
private final TimeValue maxAge;

private static final ConstructingObjectParser<TimeRetentionPolicyConfig, Void> PARSER = new ConstructingObjectParser<>(
"time_retention_policy_config",
true,
args -> new TimeRetentionPolicyConfig((String) args[0], args[1] != null ? (TimeValue) args[1] : TimeValue.ZERO)
);

static {
PARSER.declareString(constructorArg(), FIELD);
PARSER.declareField(
constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_AGE.getPreferredName()),
MAX_AGE,
ObjectParser.ValueType.STRING
);
}

public static TimeRetentionPolicyConfig fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

public TimeRetentionPolicyConfig(String field, TimeValue maxAge) {
this.field = field;
this.maxAge = maxAge;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FIELD.getPreferredName(), field);
builder.field(MAX_AGE.getPreferredName(), maxAge.getStringRep());
builder.endObject();
return builder;
}

public String getField() {
return field;
}

public TimeValue getMaxAge() {
return maxAge;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

final TimeRetentionPolicyConfig that = (TimeRetentionPolicyConfig) other;

return Objects.equals(this.field, that.field) && Objects.equals(this.maxAge, that.maxAge);
}

@Override
public int hashCode() {
return Objects.hash(field, maxAge);
}

@Override
public String getName() {
return NAME;
}
}
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -41,6 +40,7 @@ public class TransformConfig implements ToXContentObject {
public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField RETENTION_POLICY = new ParseField("retention_policy");
// types of transforms
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
public static final ParseField LATEST_TRANSFORM = new ParseField("latest");
Expand All @@ -54,6 +54,7 @@ public class TransformConfig implements ToXContentObject {
private final PivotConfig pivotConfig;
private final LatestConfig latestConfig;
private final String description;
private final RetentionPolicyConfig retentionPolicyConfig;
private final Version transformVersion;
private final Instant createTime;

Expand All @@ -70,8 +71,9 @@ public class TransformConfig implements ToXContentObject {
LatestConfig latestConfig = (LatestConfig) args[6];
String description = (String) args[7];
SettingsConfig settings = (SettingsConfig) args[8];
Instant createTime = (Instant) args[9];
String transformVersion = (String) args[10];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[9];
Instant createTime = (Instant) args[10];
String transformVersion = (String) args[11];
return new TransformConfig(
id,
source,
Expand All @@ -82,6 +84,7 @@ public class TransformConfig implements ToXContentObject {
latestConfig,
description,
settings,
retentionPolicyConfig,
createTime,
transformVersion
);
Expand All @@ -98,11 +101,16 @@ public class TransformConfig implements ToXContentObject {
FREQUENCY,
ObjectParser.ValueType.STRING
);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC);
PARSER.declareNamedObject(optionalConstructorArg(), (p, c, n) -> p.namedObject(SyncConfig.class, n, c), SYNC);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> LatestConfig.fromXContent(p), LATEST_TRANSFORM);
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p), SETTINGS);
PARSER.declareNamedObject(
optionalConstructorArg(),
(p, c, n) -> p.namedObject(RetentionPolicyConfig.class, n, c),
RETENTION_POLICY
);
PARSER.declareField(
optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()),
Expand All @@ -112,14 +120,6 @@ public class TransformConfig implements ToXContentObject {
PARSER.declareString(optionalConstructorArg(), VERSION);
}

private static SyncConfig parseSyncConfig(XContentParser parser) throws IOException {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
SyncConfig syncConfig = parser.namedObject(SyncConfig.class, parser.currentName(), true);
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);
return syncConfig;
}

public static TransformConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}
Expand All @@ -136,7 +136,7 @@ public static TransformConfig fromXContent(final XContentParser parser) {
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
*/
public static TransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null, null);
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null, null, null);
}

/**
Expand All @@ -151,7 +151,7 @@ public static TransformConfig forPreview(final SourceConfig source, final PivotC
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
*/
public static TransformConfig forPreview(final SourceConfig source, final LatestConfig latestConfig) {
return new TransformConfig(null, source, null, null, null, null, latestConfig, null, null, null, null);
return new TransformConfig(null, source, null, null, null, null, latestConfig, null, null, null, null, null);
}

TransformConfig(
Expand All @@ -164,6 +164,7 @@ public static TransformConfig forPreview(final SourceConfig source, final Latest
final LatestConfig latestConfig,
final String description,
final SettingsConfig settings,
final RetentionPolicyConfig retentionPolicyConfig,
final Instant createTime,
final String version
) {
Expand All @@ -176,6 +177,7 @@ public static TransformConfig forPreview(final SourceConfig source, final Latest
this.latestConfig = latestConfig;
this.description = description;
this.settings = settings;
this.retentionPolicyConfig = retentionPolicyConfig;
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
this.transformVersion = version == null ? null : Version.fromString(version);
}
Expand Down Expand Up @@ -226,6 +228,11 @@ public SettingsConfig getSettings() {
return settings;
}

@Nullable
public RetentionPolicyConfig getRetentionPolicyConfig() {
return retentionPolicyConfig;
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -258,6 +265,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
if (settings != null) {
builder.field(SETTINGS.getPreferredName(), settings);
}
if (retentionPolicyConfig != null) {
builder.startObject(RETENTION_POLICY.getPreferredName());
builder.field(retentionPolicyConfig.getName(), retentionPolicyConfig);
builder.endObject();
}
if (createTime != null) {
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
}
Expand Down Expand Up @@ -290,13 +302,26 @@ public boolean equals(Object other) {
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.pivotConfig, that.pivotConfig)
&& Objects.equals(this.latestConfig, that.latestConfig);
&& Objects.equals(this.latestConfig, that.latestConfig)
&& Objects.equals(this.retentionPolicyConfig, that.retentionPolicyConfig);
}

@Override
public int hashCode() {
return Objects.hash(
id, source, dest, frequency, syncConfig, settings, createTime, transformVersion, pivotConfig, latestConfig, description);
id,
source,
dest,
frequency,
syncConfig,
settings,
createTime,
transformVersion,
pivotConfig,
latestConfig,
description,
retentionPolicyConfig
);
}

@Override
Expand All @@ -319,6 +344,7 @@ public static class Builder {
private LatestConfig latestConfig;
private SettingsConfig settings;
private String description;
private RetentionPolicyConfig retentionPolicyConfig;

public Builder setId(String id) {
this.id = id;
Expand Down Expand Up @@ -365,9 +391,26 @@ public Builder setSettings(SettingsConfig settings) {
return this;
}

public Builder setRetentionPolicyConfig(RetentionPolicyConfig retentionPolicyConfig) {
this.retentionPolicyConfig = retentionPolicyConfig;
return this;
}

public TransformConfig build() {
return new TransformConfig(
id, source, dest, frequency, syncConfig, pivotConfig, latestConfig, description, settings, null, null);
id,
source,
dest,
frequency,
syncConfig,
pivotConfig,
latestConfig,
description,
settings,
retentionPolicyConfig,
null,
null
);
}
}
}

0 comments on commit 040b9a8

Please sign in to comment.