Skip to content

Commit

Permalink
[7.x] Optimistic concurrency control for updating ingest pipelines (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Oct 18, 2021
1 parent 171ae21 commit 8ebc800
Show file tree
Hide file tree
Showing 9 changed files with 561 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
---
"Test pipeline versioned updates":
- skip:
version: " - 7.15.99"
reason: "added versioned updates in 7.16.0"

- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

# conditional update fails because of missing version
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"version": 1,
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 1 }

# required version does not match specified version
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
if_version: 99
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
# may not update to same version
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"version": 1,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
# cannot conditionally update non-existent pipeline
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline2"
if_version: 1
body: >
{
"version": 1,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
# conditionally update to specified version
- do:
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"version": 99,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 99 }

# conditionally update without specified version
- do:
ingest.put_pipeline:
id: "my_pipeline"
if_version: 99
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 100 }

- do:
ingest.delete_pipeline:
id: "my_pipeline"
- match: { acknowledged: true }
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
]
},
"params":{
"if_version":{
"type":"int",
"description":"Required version for optimistic concurrency control for pipeline updates"
},
"master_timeout":{
"type":"time",
"description":"Explicit operation timeout for connection to master node"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -22,27 +23,39 @@

public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest> implements ToXContentObject {

private String id;
private BytesReference source;
private XContentType xContentType;
private final String id;
private final BytesReference source;
private final XContentType xContentType;
private final Integer version;

/**
* Create a new pipeline request with the id and source along with the content type of the source
*/
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType, Integer version) {
this.id = Objects.requireNonNull(id);
this.source = Objects.requireNonNull(source);
this.xContentType = Objects.requireNonNull(xContentType);
this.version = version;
}

public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
this(id, source, xContentType, null);
}

public PutPipelineRequest(StreamInput in) throws IOException {
super(in);
id = in.readString();
source = in.readBytesReference();
xContentType = in.readEnum(XContentType.class);
if (in.getVersion().onOrAfter(Version.V_7_16_0)) {
version = in.readOptionalInt();
} else {
version = null;
}
}

PutPipelineRequest() {
this(null, null, null, null);
}

@Override
Expand All @@ -62,12 +75,19 @@ public XContentType getXContentType() {
return xContentType;
}

public Integer getVersion() {
return version;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBytesReference(source);
out.writeEnum(xContentType);
if (out.getVersion().onOrAfter(Version.V_7_16_0)) {
out.writeOptionalInt(version);
}
}

@Override
Expand Down
59 changes: 57 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -55,7 +56,9 @@
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -394,7 +397,9 @@ public void putPipeline(

Map<String, Object> pipelineConfig = null;
IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) {
if (request.getVersion() == null &&
currentIngestMetadata != null &&
currentIngestMetadata.getPipelines().containsKey(request.getId())) {
pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
PipelineConfiguration currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
Expand Down Expand Up @@ -494,16 +499,66 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(Compound
return processorMetrics;
}

// visible for testing
public static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
IngestMetadata currentIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE);

BytesReference pipelineSource = request.getSource();
if (request.getVersion() != null) {
PipelineConfiguration currentPipeline = currentIngestMetadata != null
? currentIngestMetadata.getPipelines().get(request.getId())
: null;
if (currentPipeline == null) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but no pipeline was found",
request.getVersion(),
request.getId()
));
}

final Integer currentVersion = currentPipeline.getVersion();
if (Objects.equals(request.getVersion(), currentVersion) == false) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but current version is [%s]",
request.getVersion(),
request.getId(),
currentVersion
));
}

Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"cannot update pipeline [%s] with the same version [%s]",
request.getId(),
request.getVersion()
));
}

// if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
if (specifiedVersion == null) {
pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
try {
XContentBuilder builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig);
pipelineSource = BytesReference.bytes(builder);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}

Map<String, PipelineConfiguration> pipelines;
if (currentIngestMetadata != null) {
pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
} else {
pipelines = new HashMap<>();
}

pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType()));
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metadata(Metadata.builder(currentState.getMetadata())
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ BytesReference getConfig() {
return config;
}

public Integer getVersion() {
Map<String, Object> configMap = getConfigAsMap();
if (configMap.containsKey("version")) {
Object o = configMap.get("version");
if (o == null) {
return null;
} else if (o instanceof Number) {
return ((Number) o).intValue();
} else {
throw new IllegalStateException("unexpected version type [" + o.getClass().getName() + "]");
}
} else {
return null;
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down

0 comments on commit 8ebc800

Please sign in to comment.