Skip to content

Commit

Permalink
Parent/child: Fix concurrency issues of the _parent field data.
Browse files Browse the repository at this point in the history
`_parent` field data mistakenly shared some stateful data-structures across
threads.

Close elastic#8396
  • Loading branch information
jpountz committed Dec 24, 2014
1 parent 6558783 commit 7f35edc
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 88 deletions.
Expand Up @@ -22,6 +22,7 @@
import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableSortedSet;

import org.apache.lucene.index.*;
import org.apache.lucene.index.MultiDocValues.OrdinalMap;
import org.apache.lucene.util.Accountable;
Expand All @@ -35,6 +36,8 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -271,71 +274,49 @@ public IndexParentChildFieldData loadGlobal(IndexReader indexReader) {
}
}

private static OrdinalMap buildOrdinalMap(AtomicParentChildFieldData[] atomicFD, String parentType) throws IOException {
final SortedDocValues[] ordinals = new SortedDocValues[atomicFD.length];
for (int i = 0; i < ordinals.length; ++i) {
ordinals[i] = atomicFD[i].getOrdinalsValues(parentType);
}
return OrdinalMap.build(null, ordinals, PackedInts.DEFAULT);
}

private static class OrdinalMapAndAtomicFieldData {
final OrdinalMap ordMap;
final AtomicParentChildFieldData[] fieldData;

public OrdinalMapAndAtomicFieldData(OrdinalMap ordMap, AtomicParentChildFieldData[] fieldData) {
this.ordMap = ordMap;
this.fieldData = fieldData;
}
}

@Override
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
final long startTime = System.nanoTime();
final Map<String, SortedDocValues[]> types = new HashMap<>();
final Set<String> parentTypes = new HashSet<>();
synchronized (lock) {
for (BytesRef type : parentTypes) {
final SortedDocValues[] values = new SortedDocValues[indexReader.leaves().size()];
Arrays.fill(values, DocValues.emptySorted());
types.put(type.utf8ToString(), values);
for (BytesRef type : this.parentTypes) {
parentTypes.add(type.utf8ToString());
}
}

for (Map.Entry<String, SortedDocValues[]> entry : types.entrySet()) {
final String parentType = entry.getKey();
final SortedDocValues[] values = entry.getValue();
long ramBytesUsed = 0;
final Map<String, OrdinalMapAndAtomicFieldData> perType = new HashMap<>();
for (String type : parentTypes) {
final AtomicParentChildFieldData[] fieldData = new AtomicParentChildFieldData[indexReader.leaves().size()];
for (AtomicReaderContext context : indexReader.leaves()) {
SortedDocValues vals = load(context).getOrdinalsValues(parentType);
if (vals != null) {
values[context.ord] = vals;
}
fieldData[context.ord] = load(context);
}
final OrdinalMap ordMap = buildOrdinalMap(fieldData, type);
ramBytesUsed += ordMap.ramBytesUsed();
perType.put(type, new OrdinalMapAndAtomicFieldData(ordMap, fieldData));
}

long ramBytesUsed = 0;
@SuppressWarnings("unchecked")
final Map<String, SortedDocValues>[] global = new Map[indexReader.leaves().size()];
for (Map.Entry<String, SortedDocValues[]> entry : types.entrySet()) {
final String parentType = entry.getKey();
final SortedDocValues[] values = entry.getValue();
final OrdinalMap ordinalMap = OrdinalMap.build(null, entry.getValue(), PackedInts.DEFAULT);
ramBytesUsed += ordinalMap.ramBytesUsed();
for (int i = 0; i < values.length; ++i) {
final SortedDocValues segmentValues = values[i];
final LongValues globalOrds = ordinalMap.getGlobalOrds(i);
final SortedDocValues globalSortedValues = new SortedDocValues() {
@Override
public BytesRef lookupOrd(int ord) {
final int segmentNum = ordinalMap.getFirstSegmentNumber(ord);
final int segmentOrd = (int) ordinalMap.getFirstSegmentOrd(ord);
return values[segmentNum].lookupOrd(segmentOrd);
}

@Override
public int getValueCount() {
return (int) ordinalMap.getValueCount();
}

@Override
public int getOrd(int docID) {
final int segmentOrd = segmentValues.getOrd(docID);
// TODO: is there a way we can get rid of this branch?
if (segmentOrd >= 0) {
return (int) globalOrds.get(segmentOrd);
} else {
return segmentOrd;
}
}
};
Map<String, SortedDocValues> perSegmentGlobal = global[i];
if (perSegmentGlobal == null) {
perSegmentGlobal = new HashMap<>(1);
global[i] = perSegmentGlobal;
}
perSegmentGlobal.put(parentType, globalSortedValues);
}
final AtomicParentChildFieldData[] fielddata = new AtomicParentChildFieldData[indexReader.leaves().size()];
for (int i = 0; i < fielddata.length; ++i) {
fielddata[i] = new GlobalAtomicFieldData(parentTypes, perType, i);
}

breakerService.getBreaker(CircuitBreaker.Name.FIELDDATA).addWithoutBreaking(ramBytesUsed);
Expand All @@ -346,46 +327,95 @@ public int getOrd(int docID) {
);
}

return new GlobalFieldData(indexReader, global, ramBytesUsed);
return new GlobalFieldData(indexReader, fielddata, ramBytesUsed);
}

private class GlobalFieldData implements IndexParentChildFieldData, Accountable {
private static class GlobalAtomicFieldData extends AbstractAtomicParentChildFieldData {

private final AtomicParentChildFieldData[] atomicFDs;
private final IndexReader reader;
private final long ramBytesUsed;
private final Set<String> types;
private final Map<String, OrdinalMapAndAtomicFieldData> atomicFD;
private final int segmentIndex;

GlobalFieldData(IndexReader reader, final Map<String, SortedDocValues>[] globalValues, long ramBytesUsed) {
this.reader = reader;
this.ramBytesUsed = ramBytesUsed;
this.atomicFDs = new AtomicParentChildFieldData[globalValues.length];
for (int i = 0; i < globalValues.length; ++i) {
final int ord = i;
atomicFDs[i] = new AbstractAtomicParentChildFieldData() {
@Override
public long ramBytesUsed() {
return 0;
}
public GlobalAtomicFieldData(Set<String> types, Map<String, OrdinalMapAndAtomicFieldData> atomicFD, int segmentIndex) {
this.types = types;
this.atomicFD = atomicFD;
this.segmentIndex = segmentIndex;
}

@Override
public void close() {
}
@Override
public Set<String> types() {
return types;
}

@Override
public Set<String> types() {
return Collections.unmodifiableSet(globalValues[ord].keySet());
}
@Override
public SortedDocValues getOrdinalsValues(String type) {
final OrdinalMapAndAtomicFieldData atomicFD = this.atomicFD.get(type);
final OrdinalMap ordMap = atomicFD.ordMap;
final SortedDocValues[] allSegmentValues = new SortedDocValues[atomicFD.fieldData.length];
for (int i = 0; i < allSegmentValues.length; ++i) {
allSegmentValues[i] = atomicFD.fieldData[i].getOrdinalsValues(type);
}
final SortedDocValues segmentValues = allSegmentValues[segmentIndex];
if (segmentValues.getValueCount() == ordMap.getValueCount()) {
// ords are already global
return segmentValues;
}
final LongValues globalOrds = ordMap.getGlobalOrds(segmentIndex);
return new SortedDocValues() {

@Override
public BytesRef lookupOrd(int ord) {
final int segmentIndex = ordMap.getFirstSegmentNumber(ord);
final int segmentOrd = (int) ordMap.getFirstSegmentOrd(ord);
return allSegmentValues[segmentIndex].lookupOrd(segmentOrd);
}

@Override
public int getValueCount() {
return (int) ordMap.getValueCount();
}

@Override
public SortedDocValues getOrdinalsValues(String type) {
SortedDocValues dv = globalValues[ord].get(type);
if (dv == null) {
dv = DocValues.emptySorted();
}
return dv;
@Override
public int getOrd(int docID) {
final int segmentOrd = segmentValues.getOrd(docID);
// TODO: is there a way we can get rid of this branch?
if (segmentOrd >= 0) {
return (int) globalOrds.get(segmentOrd);
} else {
return segmentOrd;
}
};
}
};
}

@Override
public long ramBytesUsed() {
// this class does not take memory on its own, the index-level field data does
// it through the use of ordinal maps
return 0;
}

@Override
public void close() throws ElasticsearchException {
List<Releasable> closeables = new ArrayList<>();
for (OrdinalMapAndAtomicFieldData fds : atomicFD.values()) {
closeables.addAll(Arrays.asList(fds.fieldData));
}
Releasables.close(closeables);
}

}

private class GlobalFieldData implements IndexParentChildFieldData, Accountable {

private final AtomicParentChildFieldData[] fielddata;
private final IndexReader reader;
private final long ramBytesUsed;

GlobalFieldData(IndexReader reader, AtomicParentChildFieldData[] fielddata, long ramBytesUsed) {
this.reader = reader;
this.ramBytesUsed = ramBytesUsed;
this.fielddata = fielddata;
}

@Override
Expand All @@ -401,7 +431,7 @@ public FieldDataType getFieldDataType() {
@Override
public AtomicParentChildFieldData load(AtomicReaderContext context) {
assert context.reader().getCoreCacheKey() == reader.leaves().get(context.ord).reader().getCoreCacheKey();
return atomicFDs[context.ord];
return fielddata[context.ord];
}

@Override
Expand Down
Expand Up @@ -22,20 +22,35 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.*;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource;
import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.search.MultiValueMode;
import org.junit.Before;
import org.junit.Test;

import static org.hamcrest.Matchers.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.nullValue;

/**
*/
Expand Down Expand Up @@ -184,6 +199,62 @@ public void testSorting() throws Exception {
assertThat(((FieldDoc) topDocs.scoreDocs[7]).fields[0], nullValue());
}

public void testThreads() throws Exception {
final ParentChildIndexFieldData indexFieldData = getForField(childType);
final DirectoryReader reader = DirectoryReader.open(writer, true);
final IndexParentChildFieldData global = indexFieldData.loadGlobal(reader);
final AtomicReference<Exception> error = new AtomicReference<>();
final int numThreads = scaledRandomIntBetween(3, 8);
final Thread[] threads = new Thread[numThreads];
final CountDownLatch latch = new CountDownLatch(1);

final Map<Object, BytesRef[]> expected = new HashMap<>();
for (AtomicReaderContext context : reader.leaves()) {
AtomicParentChildFieldData leafData = global.load(context);
SortedDocValues parentIds = leafData.getOrdinalsValues(parentType);
final BytesRef[] ids = new BytesRef[parentIds.getValueCount()];
for (int j = 0; j < parentIds.getValueCount(); ++j) {
final BytesRef id = parentIds.lookupOrd(j);
if (id != null) {
ids[j] = BytesRef.deepCopyOf(id);
}
}
expected.put(context.reader().getCoreCacheKey(), ids);
}

for (int i = 0; i < numThreads; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
try {
latch.await();
for (int i = 0; i < 100000; ++i) {
for (AtomicReaderContext context : reader.leaves()) {
AtomicParentChildFieldData leafData = global.load(context);
SortedDocValues parentIds = leafData.getOrdinalsValues(parentType);
final BytesRef[] expectedIds = expected.get(context.reader().getCoreCacheKey());
for (int j = 0; j < parentIds.getValueCount(); ++j) {
final BytesRef id = parentIds.lookupOrd(j);
assertEquals(expectedIds[j], id);
}
}
}
} catch (Exception e) {
error.compareAndSet(null, e);
}
}
};
threads[i].start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
if (error.get() != null) {
throw error.get();
}
}

@Override
protected FieldDataType getFieldDataType() {
return new FieldDataType("_parent");
Expand Down

0 comments on commit 7f35edc

Please sign in to comment.