Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrency issues of the _parent field data. #9030

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,6 +22,7 @@
import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableSortedSet;

import org.apache.lucene.index.*;
import org.apache.lucene.index.MultiDocValues.OrdinalMap;
import org.apache.lucene.util.Accountable;
Expand All @@ -35,6 +36,8 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -271,71 +274,49 @@ public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
}
}

private static OrdinalMap buildOrdinalMap(AtomicParentChildFieldData[] atomicFD, String parentType) throws IOException {
final SortedDocValues[] ordinals = new SortedDocValues[atomicFD.length];
for (int i = 0; i < ordinals.length; ++i) {
ordinals[i] = atomicFD[i].getOrdinalsValues(parentType);
}
return OrdinalMap.build(null, ordinals, PackedInts.DEFAULT);
}

private static class OrdinalMapAndAtomicFieldData {
final OrdinalMap ordMap;
final AtomicParentChildFieldData[] fieldData;

public OrdinalMapAndAtomicFieldData(OrdinalMap ordMap, AtomicParentChildFieldData[] fieldData) {
this.ordMap = ordMap;
this.fieldData = fieldData;
}
}

@Override
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
final long startTime = System.nanoTime();
final Map<String, SortedDocValues[]> types = new HashMap<>();
final Set<String> parentTypes = new HashSet<>();
synchronized (lock) {
for (BytesRef type : parentTypes) {
final SortedDocValues[] values = new SortedDocValues[indexReader.leaves().size()];
Arrays.fill(values, DocValues.emptySorted());
types.put(type.utf8ToString(), values);
for (BytesRef type : this.parentTypes) {
parentTypes.add(type.utf8ToString());
}
}

for (Map.Entry<String, SortedDocValues[]> entry : types.entrySet()) {
final String parentType = entry.getKey();
final SortedDocValues[] values = entry.getValue();
long ramBytesUsed = 0;
final Map<String, OrdinalMapAndAtomicFieldData> perType = new HashMap<>();
for (String type : parentTypes) {
final AtomicParentChildFieldData[] fieldData = new AtomicParentChildFieldData[indexReader.leaves().size()];
for (LeafReaderContext context : indexReader.leaves()) {
SortedDocValues vals = load(context).getOrdinalsValues(parentType);
if (vals != null) {
values[context.ord] = vals;
}
fieldData[context.ord] = load(context);
}
final OrdinalMap ordMap = buildOrdinalMap(fieldData, type);
ramBytesUsed += ordMap.ramBytesUsed();
perType.put(type, new OrdinalMapAndAtomicFieldData(ordMap, fieldData));
}

long ramBytesUsed = 0;
@SuppressWarnings("unchecked")
final Map<String, SortedDocValues>[] global = new Map[indexReader.leaves().size()];
for (Map.Entry<String, SortedDocValues[]> entry : types.entrySet()) {
final String parentType = entry.getKey();
final SortedDocValues[] values = entry.getValue();
final OrdinalMap ordinalMap = OrdinalMap.build(null, entry.getValue(), PackedInts.DEFAULT);
ramBytesUsed += ordinalMap.ramBytesUsed();
for (int i = 0; i < values.length; ++i) {
final SortedDocValues segmentValues = values[i];
final LongValues globalOrds = ordinalMap.getGlobalOrds(i);
final SortedDocValues globalSortedValues = new SortedDocValues() {
@Override
public BytesRef lookupOrd(int ord) {
final int segmentNum = ordinalMap.getFirstSegmentNumber(ord);
final int segmentOrd = (int) ordinalMap.getFirstSegmentOrd(ord);
return values[segmentNum].lookupOrd(segmentOrd);
}

@Override
public int getValueCount() {
return (int) ordinalMap.getValueCount();
}

@Override
public int getOrd(int docID) {
final int segmentOrd = segmentValues.getOrd(docID);
// TODO: is there a way we can get rid of this branch?
if (segmentOrd >= 0) {
return (int) globalOrds.get(segmentOrd);
} else {
return segmentOrd;
}
}
};
Map<String, SortedDocValues> perSegmentGlobal = global[i];
if (perSegmentGlobal == null) {
perSegmentGlobal = new HashMap<>(1);
global[i] = perSegmentGlobal;
}
perSegmentGlobal.put(parentType, globalSortedValues);
}
final AtomicParentChildFieldData[] fielddata = new AtomicParentChildFieldData[indexReader.leaves().size()];
for (int i = 0; i < fielddata.length; ++i) {
fielddata[i] = new GlobalAtomicFieldData(parentTypes, perType, i);
}

breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed);
Expand All @@ -346,59 +327,102 @@ public int getOrd(int docID) {
);
}

return new GlobalFieldData(indexReader, global, ramBytesUsed);
return new GlobalFieldData(indexReader, fielddata, ramBytesUsed);
}

private class GlobalFieldData implements IndexParentChildFieldData, Accountable {
private static class GlobalAtomicFieldData extends AbstractAtomicParentChildFieldData {

private final AtomicParentChildFieldData[] atomicFDs;
private final IndexReader reader;
private final long ramBytesUsed;
private final Set<String> types;
private final Map<String, OrdinalMapAndAtomicFieldData> atomicFD;
private final int segmentIndex;

GlobalFieldData(IndexReader reader, final Map<String, SortedDocValues>[] globalValues, long ramBytesUsed) {
this.reader = reader;
this.ramBytesUsed = ramBytesUsed;
this.atomicFDs = new AtomicParentChildFieldData[globalValues.length];
for (int i = 0; i < globalValues.length; ++i) {
final int ord = i;
atomicFDs[i] = new AbstractAtomicParentChildFieldData() {
@Override
public long ramBytesUsed() {
return 0;
}
public GlobalAtomicFieldData(Set<String> types, Map<String, OrdinalMapAndAtomicFieldData> atomicFD, int segmentIndex) {
this.types = types;
this.atomicFD = atomicFD;
this.segmentIndex = segmentIndex;
}

@Override
public Iterable<Accountable> getChildResources() {
// TODO: is this really the best?
return Collections.emptyList();
}
@Override
public Set<String> types() {
return types;
}

@Override
public void close() {
}
@Override
public SortedDocValues getOrdinalsValues(String type) {
final OrdinalMapAndAtomicFieldData atomicFD = this.atomicFD.get(type);
final OrdinalMap ordMap = atomicFD.ordMap;
final SortedDocValues[] allSegmentValues = new SortedDocValues[atomicFD.fieldData.length];
for (int i = 0; i < allSegmentValues.length; ++i) {
allSegmentValues[i] = atomicFD.fieldData[i].getOrdinalsValues(type);
}
final SortedDocValues segmentValues = allSegmentValues[segmentIndex];
if (segmentValues.getValueCount() == ordMap.getValueCount()) {
// ords are already global
return segmentValues;
}
final LongValues globalOrds = ordMap.getGlobalOrds(segmentIndex);
return new SortedDocValues() {

@Override
public BytesRef lookupOrd(int ord) {
final int segmentIndex = ordMap.getFirstSegmentNumber(ord);
final int segmentOrd = (int) ordMap.getFirstSegmentOrd(ord);
return allSegmentValues[segmentIndex].lookupOrd(segmentOrd);
}

@Override
public Set<String> types() {
return Collections.unmodifiableSet(globalValues[ord].keySet());
}
@Override
public int getValueCount() {
return (int) ordMap.getValueCount();
}

@Override
public SortedDocValues getOrdinalsValues(String type) {
SortedDocValues dv = globalValues[ord].get(type);
if (dv == null) {
dv = DocValues.emptySorted();
}
return dv;
@Override
public int getOrd(int docID) {
final int segmentOrd = segmentValues.getOrd(docID);
// TODO: is there a way we can get rid of this branch?
if (segmentOrd >= 0) {
return (int) globalOrds.get(segmentOrd);
} else {
return segmentOrd;
}
};
}
}
};
}

@Override
public long ramBytesUsed() {
// this class does not take memory on its own, the index-level field data does
// it through the use of ordinal maps
return 0;
}

@Override
public Iterable<Accountable> getChildResources() {
return Collections.emptyList();
}

@Override
public void close() throws ElasticsearchException {
List<Releasable> closeables = new ArrayList<>();
for (OrdinalMapAndAtomicFieldData fds : atomicFD.values()) {
closeables.addAll(Arrays.asList(fds.fieldData));
}
Releasables.close(closeables);
}

}

private class GlobalFieldData implements IndexParentChildFieldData, Accountable {

private final AtomicParentChildFieldData[] fielddata;
private final IndexReader reader;
private final long ramBytesUsed;

GlobalFieldData(IndexReader reader, AtomicParentChildFieldData[] fielddata, long ramBytesUsed) {
this.reader = reader;
this.ramBytesUsed = ramBytesUsed;
this.fielddata = fielddata;
}

@Override
public Names getFieldNames() {
return ParentChildIndexFieldData.this.getFieldNames();
Expand All @@ -412,7 +436,7 @@ public FieldDataType getFieldDataType() {
@Override
public AtomicParentChildFieldData load(LeafReaderContext context) {
assert context.reader().getCoreCacheKey() == reader.leaves().get(context.ord).reader().getCoreCacheKey();
return atomicFDs[context.ord];
return fielddata[context.ord];
}

@Override
Expand Down Expand Up @@ -445,6 +469,11 @@ public long ramBytesUsed() {
return ramBytesUsed;
}

@Override
public Iterable<Accountable> getChildResources() {
return Collections.emptyList();
}

@Override
public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
if (indexReader.getCoreCacheKey() == reader.getCoreCacheKey()) {
Expand Down
Expand Up @@ -23,19 +23,34 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.*;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource;
import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.search.MultiValueMode;
import org.junit.Before;
import org.junit.Test;

import static org.hamcrest.Matchers.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.nullValue;

/**
*/
Expand Down Expand Up @@ -184,6 +199,62 @@ public void testSorting() throws Exception {
assertThat(((FieldDoc) topDocs.scoreDocs[7]).fields[0], nullValue());
}

public void testThreads() throws Throwable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just throw Exception? (the error variable generic type should then get changed as well)

final ParentChildIndexFieldData indexFieldData = getForField(childType);
final DirectoryReader reader = DirectoryReader.open(writer, true);
final IndexParentChildFieldData global = indexFieldData.loadGlobal(reader);
final AtomicReference<Throwable> error = new AtomicReference<>();
final int numThreads = scaledRandomIntBetween(3, 8);
final Thread[] threads = new Thread[numThreads];
final CountDownLatch latch = new CountDownLatch(1);

final Map<Object, BytesRef[]> expected = new HashMap<>();
for (LeafReaderContext context : reader.leaves()) {
AtomicParentChildFieldData leafData = global.load(context);
SortedDocValues parentIds = leafData.getOrdinalsValues(parentType);
final BytesRef[] ids = new BytesRef[parentIds.getValueCount()];
for (int j = 0; j < parentIds.getValueCount(); ++j) {
final BytesRef id = parentIds.lookupOrd(j);
if (id != null) {
ids[j] = BytesRef.deepCopyOf(id);
}
}
expected.put(context.reader().getCoreCacheKey(), ids);
}

for (int i = 0; i < numThreads; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
try {
latch.await();
for (int i = 0; i < 100000; ++i) {
for (LeafReaderContext context : reader.leaves()) {
AtomicParentChildFieldData leafData = global.load(context);
SortedDocValues parentIds = leafData.getOrdinalsValues(parentType);
final BytesRef[] expectedIds = expected.get(context.reader().getCoreCacheKey());
for (int j = 0; j < parentIds.getValueCount(); ++j) {
final BytesRef id = parentIds.lookupOrd(j);
assertEquals(expectedIds[j], id);
}
}
}
} catch (Throwable t) {
error.compareAndSet(null, t);
}
}
};
threads[i].start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
if (error.get() != null) {
throw error.get();
}
}

@Override
protected FieldDataType getFieldDataType() {
return new FieldDataType("_parent");
Expand Down