Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/
public final class EnrichPolicy implements Writeable, ToXContentFragment {

private static final String ELASTICEARCH_VERSION_DEPRECATION_MESSAGE =
private static final String ELASTICSEARCH_VERSION_DEPRECATION_MESSAGE =
"the [elasticsearch_version] field of an enrich policy has no effect and will be removed in Elasticsearch 9.0";

private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(EnrichPolicy.class);
Expand Down Expand Up @@ -146,7 +146,7 @@ private EnrichPolicy(
deprecationLogger.warn(
DeprecationCategory.OTHER,
"enrich_policy_with_elasticsearch_version",
ELASTICEARCH_VERSION_DEPRECATION_MESSAGE
ELASTICSEARCH_VERSION_DEPRECATION_MESSAGE
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
// If the index is empty, return the unchanged document
// If the enrich key does not exist in the index, throw an error
// If no documents match the key, return the unchanged document
if (searchHits.size() < 1) {
if (searchHits.isEmpty()) {
handler.accept(ingestDocument, null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
/**
* A simple cache for enrich that uses {@link Cache}. There is one instance of this cache and
* multiple enrich processors with different policies will use this cache.
*
* <p>
* The key of the cache is based on the search request and the enrich index that will be used.
* Search requests that enrich generates target the alias for an enrich policy, this class
* resolves the alias to the actual enrich index and uses that for the cache key. This way
* no stale entries will be returned if a policy execution happens and a new enrich index is created.
*
* <p>
* There is no cleanup mechanism of stale entries in case a new enrich index is created
* as part of a policy execution. This shouldn't be needed as cache entries for prior enrich
* indices will be eventually evicted, because these entries will not end up being used. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor {
) {
super(tag, description, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
this.shapeRelation = shapeRelation;
parser = new GeometryParser(orientation.getAsBoolean(), true, true);
this.parser = new GeometryParser(orientation.getAsBoolean(), true, true);
}

@Override
Expand All @@ -50,8 +50,4 @@ public QueryBuilder getQueryBuilder(Object fieldValue) {
shapeQuery.relation(shapeRelation);
return shapeQuery;
}

public ShapeRelation getShapeRelation() {
return shapeRelation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void coordinateLookups() {
assert slots.isEmpty() == false;
remoteRequestsTotal.increment();
final MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
slots.forEach(slot -> multiSearchRequest.add(slot.searchRequest));
slots.forEach(slot -> multiSearchRequest.add(slot.request));
lookupFunction.accept(multiSearchRequest, (response, e) -> handleResponse(slots, response, e));
}
}
Expand All @@ -193,13 +193,13 @@ void handleResponse(List<Slot> slots, MultiSearchResponse response, Exception e)
Slot slot = slots.get(i);

if (responseItem.isFailure()) {
slot.actionListener.onFailure(responseItem.getFailure());
slot.listener.onFailure(responseItem.getFailure());
} else {
slot.actionListener.onResponse(responseItem.getResponse());
slot.listener.onResponse(responseItem.getResponse());
}
}
} else if (e != null) {
slots.forEach(slot -> slot.actionListener.onFailure(e));
slots.forEach(slot -> slot.listener.onFailure(e));
} else {
throw new AssertionError("no response and no error");
}
Expand All @@ -208,14 +208,10 @@ void handleResponse(List<Slot> slots, MultiSearchResponse response, Exception e)
coordinateLookups();
}

static class Slot {

final SearchRequest searchRequest;
final ActionListener<SearchResponse> actionListener;

Slot(SearchRequest searchRequest, ActionListener<SearchResponse> actionListener) {
this.searchRequest = Objects.requireNonNull(searchRequest);
this.actionListener = Objects.requireNonNull(actionListener);
record Slot(SearchRequest request, ActionListener<SearchResponse> listener) {
Slot {
Objects.requireNonNull(request);
Objects.requireNonNull(listener);
}
}

Expand Down