Skip to content

Commit

Permalink
Fix FieldDataWeighter generics to accept RamUsage instead of AtomicFi…
Browse files Browse the repository at this point in the history
…eldData

The `FieldDataWeighter` allowed to use a concrete subclass of the caches
generic type to be used that causes ClassCastException and also trips the
CirciutBreaker to not be decremented appropriately.

This was tripped by settings randomization also part of this commit.

Closes #6260
  • Loading branch information
s1monw committed May 21, 2014
1 parent 03402c7 commit 17d34d5
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 29 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.SegmentReader;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.SegmentReaderUtils;
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsIndexFieldData;
import org.elasticsearch.index.mapper.FieldMapper;
Expand Down Expand Up @@ -76,9 +77,11 @@ static abstract class FieldBased implements IndexFieldDataCache, SegmentReader.C
private final FieldDataType fieldDataType;
private final Cache<Key, RamUsage> cache;
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
private final ESLogger logger;

protected FieldBased(IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
protected FieldBased(ESLogger logger, IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
assert indexService != null;
this.logger = logger;
this.indexService = indexService;
this.fieldNames = fieldNames;
this.fieldDataType = fieldDataType;
Expand All @@ -100,7 +103,11 @@ public void onRemoval(RemovalNotification<Key, RamUsage> notification) {
sizeInBytes = value.getMemorySizeInBytes();
}
for (Listener listener : key.listeners) {
listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes);
try {
listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes);
} catch (Throwable e) {
logger.error("Failed to call listener on field data cache unloading", e);
}
}
}

Expand All @@ -112,8 +119,7 @@ public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(fina
@Override
public AtomicFieldData call() throws Exception {
SegmentReaderUtils.registerCoreListener(context.reader(), FieldBased.this);
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
key.sizeInBytes = fieldData.getMemorySizeInBytes();

key.listeners.add(indicesFieldDataCacheListener);
final ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null) {
Expand All @@ -122,8 +128,15 @@ public AtomicFieldData call() throws Exception {
key.listeners.add(shard.fieldData());
}
}
final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
key.sizeInBytes = fieldData.getMemorySizeInBytes();
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, fieldData);
try {
listener.onLoad(fieldNames, fieldDataType, fieldData);
} catch (Throwable e) {
// load anyway since listeners should not throw exceptions
logger.error("Failed to call listener on atomic field data loading", e);
}
}
return fieldData;
}
Expand All @@ -137,8 +150,7 @@ public <IFD extends IndexFieldData.WithOrdinals<?>> IFD load(final IndexReader i
@Override
public GlobalOrdinalsIndexFieldData call() throws Exception {
indexReader.addReaderClosedListener(FieldBased.this);
GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader);
key.sizeInBytes = ifd.getMemorySizeInBytes();

key.listeners.add(indicesFieldDataCacheListener);
final ShardId shardId = ShardUtils.extractShardId(indexReader);
if (shardId != null) {
Expand All @@ -147,8 +159,15 @@ public GlobalOrdinalsIndexFieldData call() throws Exception {
key.listeners.add(shard.fieldData());
}
}
GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader);
key.sizeInBytes = ifd.getMemorySizeInBytes();
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, ifd);
try {
listener.onLoad(fieldNames, fieldDataType, ifd);
} catch (Throwable e) {
// load anyway since listeners should not throw exceptions
logger.error("Failed to call listener on global ordinals loading", e);
}
}

return ifd;
Expand Down Expand Up @@ -207,15 +226,15 @@ public int hashCode() {

static class Resident extends FieldBased {

public Resident(IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener);
public Resident(ESLogger logger, IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(logger, indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener);
}
}

static class Soft extends FieldBased {

public Soft(IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener);
public Soft(ESLogger logger, IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(logger, indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener);
}
}
}
Expand Up @@ -246,9 +246,9 @@ public <IFD extends IndexFieldData<?>> IFD getForField(FieldMapper<?> mapper) {
// this means changing the node level settings is simple, just set the bounds there
String cacheType = type.getSettings().get("cache", indexSettings.get("index.fielddata.cache", "node"));
if ("resident".equals(cacheType)) {
cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type, indicesFieldDataCacheListener);
cache = new IndexFieldDataCache.Resident(logger, indexService, fieldNames, type, indicesFieldDataCacheListener);
} else if ("soft".equals(cacheType)) {
cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type, indicesFieldDataCacheListener);
cache = new IndexFieldDataCache.Soft(logger, indexService, fieldNames, type, indicesFieldDataCacheListener);
} else if ("node".equals(cacheType)) {
cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type);
} else {
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.index.SegmentReader;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.SegmentReaderUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void close() {
}

public IndexFieldDataCache buildIndexFieldDataCache(IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
return new IndexFieldCache(cache, indicesFieldDataCacheListener, indexService, index, fieldNames, fieldDataType);
return new IndexFieldCache(logger, cache, indicesFieldDataCacheListener, indexService, index, fieldNames, fieldDataType);
}

public Cache<Key, RamUsage> getCache() {
Expand All @@ -95,15 +96,20 @@ public void onRemoval(RemovalNotification<Key, RamUsage> notification) {
sizeInBytes = value.getMemorySizeInBytes();
}
for (IndexFieldDataCache.Listener listener : key.listeners) {
listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes);
try {
listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes);
} catch (Throwable e) {
// load anyway since listeners should not throw exceptions
logger.error("Failed to call listener on field data cache unloading", e);
}
}
}

public static class FieldDataWeigher implements Weigher<Key, AtomicFieldData> {
public static class FieldDataWeigher implements Weigher<Key, RamUsage> {

@Override
public int weigh(Key key, AtomicFieldData fieldData) {
int weight = (int) Math.min(fieldData.getMemorySizeInBytes(), Integer.MAX_VALUE);
public int weigh(Key key, RamUsage ramUsage) {
int weight = (int) Math.min(ramUsage.getMemorySizeInBytes(), Integer.MAX_VALUE);
return weight == 0 ? 1 : weight;
}
}
Expand All @@ -112,14 +118,16 @@ public int weigh(Key key, AtomicFieldData fieldData) {
* A specific cache instance for the relevant parameters of it (index, fieldNames, fieldType).
*/
static class IndexFieldCache implements IndexFieldDataCache, SegmentReader.CoreClosedListener, IndexReader.ReaderClosedListener {

private final ESLogger logger;
private final IndexService indexService;
final Index index;
final FieldMapper.Names fieldNames;
final FieldDataType fieldDataType;
private final Cache<Key, RamUsage> cache;
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;

IndexFieldCache(final Cache<Key, RamUsage> cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener, IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
IndexFieldCache(ESLogger logger,final Cache<Key, RamUsage> cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener, IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
this.logger = logger;
this.indexService = indexService;
this.index = index;
this.fieldNames = fieldNames;
Expand All @@ -129,8 +137,6 @@ static class IndexFieldCache implements IndexFieldDataCache, SegmentReader.CoreC
assert indexService != null;
}

private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;

@Override
public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(final AtomicReaderContext context, final IFD indexFieldData) throws Exception {
final Key key = new Key(this, context.reader().getCoreCacheKey());
Expand All @@ -139,8 +145,7 @@ public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(fina
@Override
public AtomicFieldData call() throws Exception {
SegmentReaderUtils.registerCoreListener(context.reader(), IndexFieldCache.this);
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
key.sizeInBytes = fieldData.getMemorySizeInBytes();

key.listeners.add(indicesFieldDataCacheListener);
final ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null) {
Expand All @@ -149,22 +154,29 @@ public AtomicFieldData call() throws Exception {
key.listeners.add(shard.fieldData());
}
}
final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, fieldData);
try {
listener.onLoad(fieldNames, fieldDataType, fieldData);
} catch (Throwable e) {
// load anyway since listeners should not throw exceptions
logger.error("Failed to call listener on atomic field data loading", e);
}
}
key.sizeInBytes = fieldData.getMemorySizeInBytes();
return fieldData;
}
});
}

public <IFD extends IndexFieldData.WithOrdinals<?>> IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception {
final Key key = new Key(this, indexReader.getCoreCacheKey());

//noinspection unchecked
return (IFD) cache.get(key, new Callable<RamUsage>() {
@Override
public RamUsage call() throws Exception {
indexReader.addReaderClosedListener(IndexFieldCache.this);
GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader);
key.listeners.add(indicesFieldDataCacheListener);
final ShardId shardId = ShardUtils.extractShardId(indexReader);
if (shardId != null) {
Expand All @@ -173,8 +185,14 @@ public RamUsage call() throws Exception {
key.listeners.add(shard.fieldData());
}
}
final GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader);
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, ifd);
try {
listener.onLoad(fieldNames, fieldDataType, ifd);
} catch (Throwable e) {
// load anyway since listeners should not throw exceptions
logger.error("Failed to call listener on global ordinals loading", e);
}
}
return ifd;
}
Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest;

/**
*/
Expand Down
10 changes: 10 additions & 0 deletions src/test/java/org/elasticsearch/test/TestCluster.java
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArraysModule;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand Down Expand Up @@ -310,6 +311,15 @@ private static Settings getRandomNodeSettings(long seed) {
} else {
builder.put(EsExecutors.PROCESSORS, AbstractRandomizedTest.TESTS_PROCESSORS);
}

if (random.nextBoolean()) {
if (random.nextBoolean()) {
builder.put("indices.fielddata.cache.size", 1 + random.nextInt(1000), ByteSizeUnit.MB);
}
if (random.nextBoolean()) {
builder.put("indices.fielddata.cache.expire", TimeValue.timeValueMillis(1 + random.nextInt(10000)));
}
}
return builder.build();
}

Expand Down

0 comments on commit 17d34d5

Please sign in to comment.