Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void testAppendingToTheSameField() {
execProcessor(processor, ingestDocument, (result, e) -> {});
assertThat(testProcessor.getInvokedCounter(), equalTo(2));
ingestDocument.removeField("_ingest._value");
assertThat(ingestDocument, equalTo(originalIngestDocument));
assertIngestDocument(ingestDocument, originalIngestDocument);
}

public void testRemovingFromTheSameField() {
Expand All @@ -355,7 +355,7 @@ public void testRemovingFromTheSameField() {
execProcessor(processor, ingestDocument, (result, e) -> {});
assertThat(testProcessor.getInvokedCounter(), equalTo(2));
ingestDocument.removeField("_ingest._value");
assertThat(ingestDocument, equalTo(originalIngestDocument));
assertIngestDocument(ingestDocument, originalIngestDocument);
}

public void testMapIteration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testMatchWithoutCaptures() throws Exception {
MatcherWatchdog.noop()
);
processor.execute(doc);
assertThat(doc, equalTo(originalDoc));
assertIngestDocument(doc, originalDoc);
}

public void testNullField() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,17 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
}

public SimulateDocumentBaseResult(IngestDocument ingestDocument) {
Exception failure = null;
WriteableIngestDocument wid = null;
if (ingestDocument != null) {
this.ingestDocument = new WriteableIngestDocument(ingestDocument);
} else {
this.ingestDocument = null;
try {
wid = new WriteableIngestDocument(ingestDocument);
} catch (Exception ex) {
failure = ex;
}
}
this.failure = null;
this.ingestDocument = wid;
this.failure = failure;
}

public SimulateDocumentBaseResult(Exception failure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,19 @@ public SimulateProcessorResult(
) {
this.processorTag = processorTag;
this.description = description;
this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument);
WriteableIngestDocument wid = null;
if (ingestDocument != null) {
try {
wid = new WriteableIngestDocument(ingestDocument);
} catch (Exception ex) {
// if there was a failure already, then track it as a suppressed exception
if (failure != null) {
ex.addSuppressed(failure);
}
failure = ex;
}
}
this.ingestDocument = wid;
this.failure = failure;
this.conditionalWithResult = conditionalWithResult;
this.type = type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
Expand Down Expand Up @@ -57,7 +56,7 @@ final class WriteableIngestDocument implements Writeable, ToXContentFragment {
sourceAndMetadata.put(Metadata.VERSION_TYPE.getFieldName(), a[4]);
}
Map<String, Object> ingestMetadata = (Map<String, Object>) a[6];
return new WriteableIngestDocument(new IngestDocument(sourceAndMetadata, ingestMetadata));
return new WriteableIngestDocument(sourceAndMetadata, ingestMetadata);
}
);
static {
Expand All @@ -83,17 +82,30 @@ final class WriteableIngestDocument implements Writeable, ToXContentFragment {
PARSER.declareObject(constructorArg(), INGEST_DOC_PARSER, new ParseField(DOC_FIELD));
}

/**
* Builds a writeable ingest document that wraps a copy of the passed-in, non-null ingest document.
*
* @throws IllegalArgumentException if the passed-in ingest document references itself
*/
WriteableIngestDocument(IngestDocument ingestDocument) {
assert ingestDocument != null;
this.ingestDocument = ingestDocument;
this.ingestDocument = new IngestDocument(ingestDocument); // internal defensive copy
}

WriteableIngestDocument(StreamInput in) throws IOException {
Map<String, Object> sourceAndMetadata = in.readMap();
Map<String, Object> ingestMetadata = in.readMap();
/**
* Builds a writeable ingest document by constructing the wrapped ingest document from the passed-in maps.
* <p>
* This is intended for cases like deserialization, where we know the passed-in maps aren't self-referencing,
* and where a defensive copy is unnecessary.
*/
private WriteableIngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object> ingestMetadata) {
this.ingestDocument = new IngestDocument(sourceAndMetadata, ingestMetadata);
}

WriteableIngestDocument(StreamInput in) throws IOException {
this(in.readMap(), in.readMap());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeGenericMap(ingestDocument.getSourceAndMetadata());
Expand Down Expand Up @@ -127,23 +139,6 @@ public static WriteableIngestDocument fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WriteableIngestDocument that = (WriteableIngestDocument) o;
return Objects.equals(ingestDocument, that.ingestDocument);
}

@Override
public int hashCode() {
return Objects.hash(ingestDocument);
}

@Override
public String toString() {
return ingestDocument.toString();
Expand Down
33 changes: 14 additions & 19 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.ingest;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -94,14 +95,26 @@ public IngestDocument(String index, String id, long version, String routing, Ver

/**
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided.
*
* @throws IllegalArgumentException if the passed-in ingest document references itself
*/
public IngestDocument(IngestDocument other) {
this(
new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
new IngestCtxMap(deepCopyMap(ensureNoSelfReferences(other.ctxMap.getSource())), other.ctxMap.getMetadata().clone()),
deepCopyMap(other.ingestMetadata)
);
}

/**
* Internal helper utility method to get around the issue that a {@code this(...) } constructor call must be the first statement
* in a constructor. This is only for use in the {@link IngestDocument#IngestDocument(IngestDocument)} copy constructor, it's not a
* general purpose method.
*/
private static Map<String, Object> ensureNoSelfReferences(Map<String, Object> source) {
CollectionUtils.ensureNoSelfReferences(source, null);
return source;
}

/**
* Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
*/
Expand Down Expand Up @@ -898,24 +911,6 @@ public void doNoSelfReferencesCheck(boolean doNoSelfReferencesCheck) {
this.doNoSelfReferencesCheck = doNoSelfReferencesCheck;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}

IngestDocument other = (IngestDocument) obj;
return Objects.equals(ctxMap, other.ctxMap) && Objects.equals(ingestMetadata, other.ingestMetadata);
}

@Override
public int hashCode() {
return Objects.hash(ctxMap, ingestMetadata);
}

@Override
public String toString() {
return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
pipelineProcessor.getType(),
pipelineProcessor.getTag(),
pipelineProcessor.getDescription(),
new IngestDocument(ingestDocument),
ingestDocument,
e,
conditionalWithResult
)
Expand Down Expand Up @@ -148,7 +148,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
actualProcessor.getType(),
actualProcessor.getTag(),
actualProcessor.getDescription(),
new IngestDocument(ingestDocument),
ingestDocument,
e,
conditionalWithResult
)
Expand All @@ -172,7 +172,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
actualProcessor.getType(),
actualProcessor.getTag(),
actualProcessor.getDescription(),
new IngestDocument(ingestDocument),
ingestDocument,
conditionalWithResult
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected Predicate<String> getRandomFieldsExcludeFilter() {
}

public static void assertEqualDocs(SimulateDocumentBaseResult response, SimulateDocumentBaseResult parsedResponse) {
assertEquals(response.getIngestDocument(), parsedResponse.getIngestDocument());
assertIngestDocument(response.getIngestDocument(), parsedResponse.getIngestDocument());
if (response.getFailure() != null) {
assertNotNull(parsedResponse.getFailure());
assertThat(parsedResponse.getFailure().getMessage(), containsString(response.getFailure().getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testExecuteItem() throws Exception {
assertThat(processor.getInvokedCounter(), equalTo(2));
assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
assertThat(simulateDocumentBaseResult.getIngestDocument(), equalTo(ingestDocument));
assertIngestDocument(simulateDocumentBaseResult.getIngestDocument(), ingestDocument);
assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected Predicate<String> getRandomFieldsExcludeFilter() {

static void assertEqualProcessorResults(SimulateProcessorResult response, SimulateProcessorResult parsedResponse) {
assertEquals(response.getProcessorTag(), parsedResponse.getProcessorTag());
assertEquals(response.getIngestDocument(), parsedResponse.getIngestDocument());
assertIngestDocument(response.getIngestDocument(), parsedResponse.getIngestDocument());
if (response.getFailure() != null) {
assertNotNull(parsedResponse.getFailure());
assertThat(parsedResponse.getFailure().getMessage(), containsString(response.getFailure().getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand All @@ -41,74 +40,18 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;

public class WriteableIngestDocumentTests extends AbstractXContentTestCase<WriteableIngestDocument> {

public void testEqualsAndHashcode() throws Exception {
Map<String, Object> sourceAndMetadata = RandomDocumentPicks.randomSource(random());
int numFields = randomIntBetween(1, IngestDocument.Metadata.values().length);
sourceAndMetadata.put(VERSION.getFieldName(), TestIngestDocument.randomVersion());
for (int i = 0; i < numFields; i++) {
Tuple<String, Object> metadata = TestIngestDocument.randomMetadata();
sourceAndMetadata.put(metadata.v1(), metadata.v2());
}
Map<String, Object> ingestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
ingestMetadata.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
}
WriteableIngestDocument ingestDocument = new WriteableIngestDocument(new IngestDocument(sourceAndMetadata, ingestMetadata));

boolean changed = false;
Map<String, Object> otherSourceAndMetadata;
if (randomBoolean()) {
otherSourceAndMetadata = RandomDocumentPicks.randomSource(random());
otherSourceAndMetadata.put(VERSION.getFieldName(), TestIngestDocument.randomVersion());
changed = true;
} else {
otherSourceAndMetadata = new HashMap<>(sourceAndMetadata);
}
if (randomBoolean()) {
numFields = randomIntBetween(1, IngestDocument.Metadata.values().length);
for (int i = 0; i < numFields; i++) {
Tuple<String, Object> metadata = randomValueOtherThanMany(
t -> t.v2().equals(sourceAndMetadata.get(t.v1())),
TestIngestDocument::randomMetadata
);
otherSourceAndMetadata.put(metadata.v1(), metadata.v2());
}
changed = true;
}

Map<String, Object> otherIngestMetadata;
if (randomBoolean()) {
otherIngestMetadata = new HashMap<>();
numFields = randomIntBetween(1, 5);
for (int i = 0; i < numFields; i++) {
otherIngestMetadata.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10));
}
changed = true;
} else {
otherIngestMetadata = Collections.unmodifiableMap(ingestMetadata);
}
@Override
protected boolean assertToXContentEquivalence() {
return false;
}

WriteableIngestDocument otherIngestDocument = new WriteableIngestDocument(
new IngestDocument(otherSourceAndMetadata, otherIngestMetadata)
);
if (changed) {
assertThat(ingestDocument, not(equalTo(otherIngestDocument)));
assertThat(otherIngestDocument, not(equalTo(ingestDocument)));
} else {
assertThat(ingestDocument, equalTo(otherIngestDocument));
assertThat(otherIngestDocument, equalTo(ingestDocument));
assertThat(ingestDocument.hashCode(), equalTo(otherIngestDocument.hashCode()));
WriteableIngestDocument thirdIngestDocument = new WriteableIngestDocument(
new IngestDocument(Collections.unmodifiableMap(sourceAndMetadata), Collections.unmodifiableMap(ingestMetadata))
);
assertThat(thirdIngestDocument, equalTo(ingestDocument));
assertThat(ingestDocument, equalTo(thirdIngestDocument));
assertThat(ingestDocument.hashCode(), equalTo(thirdIngestDocument.hashCode()));
}
@Override
protected void assertEqualInstances(WriteableIngestDocument expectedInstance, WriteableIngestDocument newInstance) {
assertIngestDocument(expectedInstance.getIngestDocument(), newInstance.getIngestDocument());
}

public void testSerialization() throws IOException {
Expand Down Expand Up @@ -164,8 +107,7 @@ public void testToXContent() throws IOException {
sourceAndMetadata.putAll(toXContentSource);
ingestDocument.getMetadata().keySet().forEach(k -> sourceAndMetadata.put(k, ingestDocument.getMetadata().get(k)));
IngestDocument serializedIngestDocument = new IngestDocument(sourceAndMetadata, toXContentIngestMetadata);
// TODO(stu): is this test correct? Comparing against ingestDocument fails due to incorrectly failed byte array comparisons
assertThat(serializedIngestDocument, equalTo(serializedIngestDocument));
assertIngestDocument(writeableIngestDocument.getIngestDocument(), serializedIngestDocument);
}

public void testXContentHashSetSerialization() throws Exception {
Expand All @@ -183,6 +125,14 @@ public void testXContentHashSetSerialization() throws Exception {
}
}

public void testCopiesTheIngestDocument() {
IngestDocument document = createRandomIngestDoc();
WriteableIngestDocument wid = new WriteableIngestDocument(document);

assertIngestDocument(wid.getIngestDocument(), document);
assertThat(wid.getIngestDocument(), not(sameInstance(document)));
}

static IngestDocument createRandomIngestDoc() {
XContentType xContentType = randomFrom(XContentType.values());
BytesReference sourceBytes = RandomObjects.randomSource(random(), xContentType);
Expand Down
Loading