Skip to content

Commit

Permalink
Speed up merging field-caps response (#83704)
Browse files Browse the repository at this point in the history
- Sort the index responses before merging to avoid sorting them for each 
  field name

- Track the number of searchable indices to avoid looping to find the 
  number of non-searchable indices for each field name
  • Loading branch information
dnhatn committed Feb 9, 2022
1 parent 5366db9 commit 21533cc
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 60 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/83704.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83704
summary: Speed up merging field-caps response
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -476,22 +477,19 @@ static class Builder {
private final String name;
private final String type;
private boolean isMetadataField;
private boolean isSearchable;
private boolean isAggregatable;
private boolean isDimension;
private int searchableIndices = 0;
private int aggregatableIndices = 0;
private int dimensionIndices = 0;
private TimeSeriesParams.MetricType metricType;
private boolean metricTypeIsSet;
private List<IndexCaps> indiceList;
private Map<String, Set<String>> meta;
private boolean hasConflictMetricType;
private final List<IndexCaps> indiceList;
private final Map<String, Set<String>> meta;

Builder(String name, String type) {
this.name = name;
this.type = type;
this.isSearchable = true;
this.isAggregatable = true;
this.isDimension = true;
this.metricType = null;
this.metricTypeIsSet = false;
this.hasConflictMetricType = false;
this.indiceList = new ArrayList<>();
this.meta = new HashMap<>();
}
Expand All @@ -508,81 +506,101 @@ void add(
TimeSeriesParams.MetricType metricType,
Map<String, String> meta
) {
IndexCaps indexCaps = new IndexCaps(index, search, agg, isDimension, metricType);
indiceList.add(indexCaps);
this.isSearchable &= search;
this.isAggregatable &= agg;
assert indiceList.isEmpty() || indiceList.get(indiceList.size() - 1).name.compareTo(index) < 0
: "indices aren't sorted; previous [" + indiceList.get(indiceList.size() - 1).name + "], current [" + index + "]";
if (search) {
searchableIndices++;
}
if (agg) {
aggregatableIndices++;
}
if (isDimension) {
dimensionIndices++;
}
this.isMetadataField |= isMetadataField;
this.isDimension &= isDimension;
// If we have discrepancy in metric types or in some indices this field is not marked as a metric field - we will
// treat is a non-metric field and report this discrepancy in metricConflictsIndices
if (this.metricTypeIsSet) {
if (this.metricType != metricType) {
this.metricType = null;
}
} else {
this.metricTypeIsSet = true;
if (indiceList.isEmpty()) {
this.metricType = metricType;
} else if (this.metricType != metricType) {
hasConflictMetricType = true;
this.metricType = null;
}
IndexCaps indexCaps = new IndexCaps(index, search, agg, isDimension, metricType);
indiceList.add(indexCaps);
for (Map.Entry<String, String> entry : meta.entrySet()) {
this.meta.computeIfAbsent(entry.getKey(), key -> new HashSet<>()).add(entry.getValue());
}
}

List<String> getIndices() {
return indiceList.stream().map(c -> c.name).collect(Collectors.toList());
void getIndices(Collection<String> indices) {
indiceList.forEach(cap -> indices.add(cap.name));
}

FieldCapabilities build(boolean withIndices) {
final String[] indices;
Collections.sort(indiceList, Comparator.comparing(o -> o.name));
if (withIndices) {
indices = indiceList.stream().map(caps -> caps.name).toArray(String[]::new);
} else {
indices = null;
}

// Iff this field is searchable in some indices AND non-searchable in others
// we record the list of non-searchable indices
final boolean isSearchable = searchableIndices == indiceList.size();
final String[] nonSearchableIndices;
if (isSearchable == false && indiceList.stream().anyMatch((caps) -> caps.isSearchable)) {
// Iff this field is searchable in some indices AND non-searchable in others
// we record the list of non-searchable indices
nonSearchableIndices = indiceList.stream()
.filter((caps) -> caps.isSearchable == false)
.map(caps -> caps.name)
.toArray(String[]::new);
} else {
if (isSearchable || searchableIndices == 0) {
nonSearchableIndices = null;
} else {
nonSearchableIndices = new String[indiceList.size() - searchableIndices];
int index = 0;
for (IndexCaps indexCaps : indiceList) {
if (indexCaps.isSearchable == false) {
nonSearchableIndices[index++] = indexCaps.name;
}
}
}

// Iff this field is aggregatable in some indices AND non-aggregatable in others
// we keep the list of non-aggregatable indices
final boolean isAggregatable = aggregatableIndices == indiceList.size();
final String[] nonAggregatableIndices;
if (isAggregatable == false && indiceList.stream().anyMatch((caps) -> caps.isAggregatable)) {
// Iff this field is aggregatable in some indices AND non-searchable in others
// we keep the list of non-aggregatable indices
nonAggregatableIndices = indiceList.stream()
.filter((caps) -> caps.isAggregatable == false)
.map(caps -> caps.name)
.toArray(String[]::new);
} else {
if (isAggregatable || aggregatableIndices == 0) {
nonAggregatableIndices = null;
} else {
nonAggregatableIndices = new String[indiceList.size() - aggregatableIndices];
int index = 0;
for (IndexCaps indexCaps : indiceList) {
if (indexCaps.isAggregatable == false) {
nonAggregatableIndices[index++] = indexCaps.name;
}
}
}

// Collect all indices that have dimension == false if this field is marked as a dimension in at least one index
final boolean isDimension = dimensionIndices == indiceList.size();
final String[] nonDimensionIndices;
if (isDimension == false && indiceList.stream().anyMatch((caps) -> caps.isDimension)) {
// Collect all indices that have dimension == false if this field is marked as a dimension in at least one index
nonDimensionIndices = indiceList.stream()
.filter((caps) -> caps.isDimension == false)
.map(caps -> caps.name)
.toArray(String[]::new);
} else {
if (isDimension || dimensionIndices == 0) {
nonDimensionIndices = null;
} else {
nonDimensionIndices = new String[indiceList.size() - dimensionIndices];
int index = 0;
for (IndexCaps indexCaps : indiceList) {
if (indexCaps.isDimension == false) {
nonDimensionIndices[index++] = indexCaps.name;
}
}
}

final String[] metricConflictsIndices;
if (indiceList.stream().anyMatch((caps) -> caps.metricType != metricType)) {
if (hasConflictMetricType) {
// Collect all indices that have this field. If it is marked differently in different indices, we cannot really
// make a decisions which index is "right" and which index is "wrong" so collecting all indices where this field
// is present is probably the only sensible thing to do here
metricConflictsIndices = indiceList.stream().map(caps -> caps.name).toArray(String[]::new);
metricConflictsIndices = Objects.requireNonNullElseGet(
indices,
() -> indiceList.stream().map(caps -> caps.name).toArray(String[]::new)
);
} else {
metricConflictsIndices = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -219,13 +219,17 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(
}

private FieldCapabilitiesResponse merge(
Map<String, FieldCapabilitiesIndexResponse> indexResponses,
Map<String, FieldCapabilitiesIndexResponse> indexResponsesMap,
boolean includeUnmapped,
List<FieldCapabilitiesFailure> failures
) {
String[] indices = indexResponses.keySet().stream().sorted().toArray(String[]::new);
final List<FieldCapabilitiesIndexResponse> indexResponses = indexResponsesMap.values()
.stream()
.sorted(Comparator.comparing(FieldCapabilitiesIndexResponse::getIndexName))
.toList();
final String[] indices = indexResponses.stream().map(FieldCapabilitiesIndexResponse::getIndexName).toArray(String[]::new);
final Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder = new HashMap<>();
for (FieldCapabilitiesIndexResponse response : indexResponses.values()) {
for (FieldCapabilitiesIndexResponse response : indexResponses) {
innerMerge(responseMapBuilder, response);
}
final Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
Expand All @@ -245,14 +249,16 @@ private FieldCapabilitiesResponse merge(
}

private void addUnmappedFields(String[] indices, String field, Map<String, FieldCapabilities.Builder> typeMap) {
Set<String> unmappedIndices = new HashSet<>(Arrays.asList(indices));
typeMap.values().forEach((b) -> b.getIndices().forEach(unmappedIndices::remove));
if (unmappedIndices.isEmpty() == false) {
FieldCapabilities.Builder unmapped = new FieldCapabilities.Builder(field, "unmapped");
typeMap.put("unmapped", unmapped);
for (String index : unmappedIndices) {
unmapped.add(index, false, false, false, false, null, Collections.emptyMap());
final Set<String> mappedIndices = new HashSet<>();
typeMap.values().forEach(t -> t.getIndices(mappedIndices));
if (mappedIndices.size() != indices.length) {
final FieldCapabilities.Builder unmapped = new FieldCapabilities.Builder(field, "unmapped");
for (String index : indices) {
if (mappedIndices.contains(index) == false) {
unmapped.add(index, false, false, false, false, null, Collections.emptyMap());
}
}
typeMap.put("unmapped", unmapped);
}
}

Expand Down

0 comments on commit 21533cc

Please sign in to comment.