Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change exact match processor to match processor. #46041

Merged
merged 6 commits into from Sep 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -38,7 +38,7 @@ public class EnrichIT extends ESRestHighLevelClientTestCase {
public void testCRUD() throws Exception {
final EnrichClient enrichClient = highLevelClient().enrich();
PutPolicyRequest putPolicyRequest =
new PutPolicyRequest("my-policy", "exact_match", List.of("my-index"), "enrich_key", List.of("enrich_value"));
new PutPolicyRequest("my-policy", "match", List.of("my-index"), "enrich_key", List.of("enrich_value"));
AcknowledgedResponse putPolicyResponse = execute(putPolicyRequest, enrichClient::putPolicy, enrichClient::putPolicyAsync);
assertThat(putPolicyResponse.isAcknowledged(), is(true));

Expand All @@ -50,9 +50,9 @@ public void testCRUD() throws Exception {
List<?> responsePolicies = (List<?>) responseBody.get("policies");
assertThat(responsePolicies.size(), equalTo(1));
Map<?, ?> responsePolicy = (Map<?, ?>) responsePolicies.get(0);
assertThat(XContentMapValues.extractValue("exact_match.indices", responsePolicy), equalTo(putPolicyRequest.getIndices()));
assertThat(XContentMapValues.extractValue("exact_match.match_field", responsePolicy), equalTo(putPolicyRequest.getMatchField()));
assertThat(XContentMapValues.extractValue("exact_match.enrich_fields", responsePolicy),
assertThat(XContentMapValues.extractValue("match.indices", responsePolicy), equalTo(putPolicyRequest.getIndices()));
assertThat(XContentMapValues.extractValue("match.match_field", responsePolicy), equalTo(putPolicyRequest.getMatchField()));
assertThat(XContentMapValues.extractValue("match.enrich_fields", responsePolicy),
equalTo(putPolicyRequest.getEnrichFields()));

DeletePolicyRequest deletePolicyRequest = new DeletePolicyRequest("my-policy");
Expand Down
Expand Up @@ -49,7 +49,7 @@ public void testPutPolicy() throws Exception {
RestHighLevelClient client = highLevelClient();
// tag::enrich-put-policy-request
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "exact_match", List.of("users"),
"users-policy", "match", List.of("users"),
"email", List.of("address", "zip", "city", "state"));
// end::enrich-put-policy-request

Expand Down Expand Up @@ -95,7 +95,7 @@ public void testDeletePolicy() throws Exception {
{
// Add a policy, so that it can be deleted:
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "exact_match", List.of("users"),
"users-policy", "match", List.of("users"),
"email", List.of("address", "zip", "city", "state"));
client.enrich().putPolicy(putPolicyRequest, RequestOptions.DEFAULT);
}
Expand Down
30 changes: 16 additions & 14 deletions docs/reference/ingest/ingest-node.asciidoc
Expand Up @@ -811,7 +811,7 @@ The policy type of the policy determines what kind of enrichment an `enrich` pro

The following policy types are currently supported:

* `exact_match` - Can lookup exactly one document and use its content to enrich the document being ingested.
* `match` - Can lookup documents by running a term query and use the retrieved content to enrich the document being ingested.

[[enrich-processor-getting-started]]
=== Getting started
Expand Down Expand Up @@ -843,7 +843,7 @@ Create an enrich policy:
--------------------------------------------------
PUT /_enrich/policy/users-policy
{
"exact_match": {
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
Expand Down Expand Up @@ -923,15 +923,17 @@ Which returns:
"_seq_no": 55,
"_primary_term": 1,
"_source": {
"user": {
"email": "mardy.brown@email.me",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"address": "6649 N Blue Gum St",
"city": "New Orleans",
"state": "LA"
},
"user": [
{
"email": "mardy.brown@email.me",
"first_name": "Mardy",
"last_name": "Brown",
"zip": 70116,
"address": "6649 N Blue Gum St",
"city": "New Orleans",
"state": "LA"
}
],
"email": "mardy.brown@email.me"
}
}
Expand Down Expand Up @@ -976,7 +978,7 @@ Request:
--------------------------------------------------
PUT /_enrich/policy/my-policy
{
"exact_match": {
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "address", "city", "zip", "state"]
Expand Down Expand Up @@ -1016,7 +1018,7 @@ Response:
{
"policies": [
{
"exact_match": {
"match": {
"name" : "my-policy",
"indices" : ["users"],
"match_field" : "email",
Expand Down Expand Up @@ -1053,7 +1055,7 @@ Response:
{
"policies": [
{
"exact_match": {
"match": {
"name" : "my-policy",
"indices" : ["users"],
"match_field" : "email",
Expand Down
1 change: 1 addition & 0 deletions docs/reference/ingest/processors/enrich.asciidoc
Expand Up @@ -17,5 +17,6 @@ check out the <<enrich-processor-getting-started,getting started>> to get famili
| `target_field` | yes | - | The field that will be used for the enrichment data.
| `ignore_missing` | no | false | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
| `max_matches` | no | 1 | The maximum number of matched documents to include under the configured target field. In order to avoid documents getting too large, the maximum allowed value is 128.
include::common-options.asciidoc[]
|======
Expand Up @@ -33,8 +33,8 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {

public static final String ENRICH_INDEX_NAME_BASE = ".enrich-";

public static final String EXACT_MATCH_TYPE = "exact_match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{EXACT_MATCH_TYPE};
public static final String MATCH_TYPE = "match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE};

private static final ParseField QUERY = new ParseField("query");
private static final ParseField INDICES = new ParseField("indices");
Expand Down
Expand Up @@ -28,13 +28,13 @@
public abstract class CommonEnrichRestTestCase extends ESRestTestCase {

@After
private void deletePolicies() throws Exception {
public void deletePolicies() throws Exception {
Map<String, Object> responseMap = toMap(adminClient().performRequest(new Request("GET", "/_enrich/policy")));
@SuppressWarnings("unchecked")
List<Map<?,?>> policies = (List<Map<?,?>>) responseMap.get("policies");

for (Map<?, ?> entry: policies) {
client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("exact_match.name", entry)));
client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("match.name", entry)));
}
}

Expand Down Expand Up @@ -71,7 +71,8 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception
// Check if document has been enriched
Request getRequest = new Request("GET", "/my-index/_doc/1");
Map<String, Object> response = toMap(client().performRequest(getRequest));
Map<?, ?> _source = (Map<?, ?>) ((Map<?, ?>) response.get("_source")).get("entry");
List<?> entries = (List<?>) ((Map<?, ?>) response.get("_source")).get("entry");
Map<?, ?> _source = (Map<?, ?>) entries.get(0);
assertThat(_source.size(), equalTo(4));
assertThat(_source.get("host"), equalTo("elastic.co"));
assertThat(_source.get("tld"), equalTo("co"));
Expand Down Expand Up @@ -132,7 +133,7 @@ public void testDeleteExistingPipeline() throws Exception {
}

public static String generatePolicySource(String index) throws IOException {
XContentBuilder source = jsonBuilder().startObject().startObject("exact_match");
XContentBuilder source = jsonBuilder().startObject().startObject("match");
{
source.field("indices", index);
if (randomBoolean()) {
Expand Down
Expand Up @@ -5,7 +5,7 @@
enrich.put_policy:
name: policy-crud
body:
exact_match:
match:
indices: ["bar*"]
match_field: baz
enrich_fields: ["a", "b"]
Expand All @@ -20,18 +20,18 @@
enrich.get_policy:
name: policy-crud
- length: { policies: 1 }
- match: { policies.0.exact_match.name: policy-crud }
- match: { policies.0.exact_match.indices: ["bar*"] }
- match: { policies.0.exact_match.match_field: baz }
- match: { policies.0.exact_match.enrich_fields: ["a", "b"] }
- match: { policies.0.match.name: policy-crud }
- match: { policies.0.match.indices: ["bar*"] }
- match: { policies.0.match.match_field: baz }
- match: { policies.0.match.enrich_fields: ["a", "b"] }

- do:
enrich.get_policy: {}
- length: { policies: 1 }
- match: { policies.0.exact_match.name: policy-crud }
- match: { policies.0.exact_match.indices: ["bar*"] }
- match: { policies.0.exact_match.match_field: baz }
- match: { policies.0.exact_match.enrich_fields: ["a", "b"] }
- match: { policies.0.match.name: policy-crud }
- match: { policies.0.match.indices: ["bar*"] }
- match: { policies.0.match.match_field: baz }
- match: { policies.0.match.enrich_fields: ["a", "b"] }

- do:
enrich.delete_policy:
Expand Down
Expand Up @@ -192,9 +192,9 @@ private void validateField(Map<?, ?> properties, String fieldName, boolean field
}

private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
// Currently the only supported policy type is EnrichPolicy.EXACT_MATCH_TYPE, which is a keyword type
// Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type
String keyType;
if (EnrichPolicy.EXACT_MATCH_TYPE.equals(policy.getType())) {
if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
keyType = "keyword";
} else {
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
Expand Down
Expand Up @@ -49,12 +49,16 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin

boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");;
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
int maxMatches = ConfigurationUtils.readIntProperty(TYPE, tag, config, "max_matches", 1);
if (maxMatches < 1 || maxMatches > 128) {
throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 1024");
}

switch (policyType) {
case EnrichPolicy.EXACT_MATCH_TYPE:
return new ExactMatchProcessor(tag, client, policyName, field, targetField, matchField,
ignoreMissing, overrideEnabled);
case EnrichPolicy.MATCH_TYPE:
return new MatchProcessor(tag, client, policyName, field, targetField, matchField,
ignoreMissing, overrideEnabled, maxMatches);
default:
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
}
Expand Down
Expand Up @@ -18,54 +18,60 @@
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public final class ExactMatchProcessor extends AbstractEnrichProcessor {

static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
public final class MatchProcessor extends AbstractEnrichProcessor {

private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String field;
private final String targetField;
private final String matchField;
private final boolean ignoreMissing;
private final boolean overrideEnabled;

ExactMatchProcessor(String tag,
Client client,
String policyName,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled) {
private final int maxMatches;

MatchProcessor(String tag,
Client client,
String policyName,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled,
int maxMatches) {
this(
tag,
createSearchRunner(client),
policyName,
field,
targetField,
matchField, ignoreMissing,
overrideEnabled
matchField,
ignoreMissing,
overrideEnabled,
maxMatches
);
}

ExactMatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled) {
MatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
String matchField,
boolean ignoreMissing,
boolean overrideEnabled,
int maxMatches) {
super(tag, policyName);
this.searchRunner = searchRunner;
this.field = field;
this.targetField = targetField;
this.matchField = matchField;
this.ignoreMissing = ignoreMissing;
this.overrideEnabled = overrideEnabled;
this.maxMatches = maxMatches;
}

@Override
Expand All @@ -82,7 +88,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery);
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.from(0);
searchBuilder.size(1);
searchBuilder.size(maxMatches);
searchBuilder.trackScores(false);
searchBuilder.fetchSource(true);
searchBuilder.query(constantScore);
Expand All @@ -105,16 +111,15 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
if (searchHits.length < 1) {
handler.accept(ingestDocument, null);
return;
} else if (searchHits.length > 1) {
handler.accept(null, new IllegalStateException("more than one doc id matching for [" + matchField + "]"));
return;
}

// If a document is returned, add its fields to the document
Map<String, Object> enrichDocument = searchHits[0].getSourceAsMap();
assert enrichDocument != null : "enrich document for id [" + field + "] was empty despite non-zero search hits length";
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
ingestDocument.setFieldValue(targetField, enrichDocument);
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
for (SearchHit searchHit : searchHits) {
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
enrichDocuments.add(enrichDocument);
}
ingestDocument.setFieldValue(targetField, enrichDocuments);
}
handler.accept(ingestDocument, null);
});
Expand Down Expand Up @@ -153,6 +158,10 @@ boolean isOverrideEnabled() {
return overrideEnabled;
}

int getMaxMatches() {
return maxMatches;
}

private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
Expand Down