Skip to content

Commit

Permalink
[STORE] Add simple cache for StoreStats
Browse files Browse the repository at this point in the history
this commit tries to reduce the filesystem calls to fetch metadata
by using a simple cache on top of the stats call.

Relates to elastic#9683

Closes elastic#9709
  • Loading branch information
s1monw authored and bleskes committed Mar 3, 2015
1 parent c817fd3 commit a7fc58b
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 50 deletions.
82 changes: 82 additions & 0 deletions src/main/java/org/elasticsearch/common/util/SingleObjectCache.java
@@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;

import org.elasticsearch.common.unit.TimeValue;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* A very simple single object cache that allows non-blocking refresh calls
* triggered by expiry time.
*/
public abstract class SingleObjectCache<T> {

private volatile T cached;
private Lock refreshLock = new ReentrantLock();
private final TimeValue refreshInterval;
protected long lastRefreshTimestamp = 0;

protected SingleObjectCache(TimeValue refreshInterval, T initialValue) {
if (initialValue == null) {
throw new IllegalArgumentException("initialValue must not be null");
}
this.refreshInterval = refreshInterval;
cached = initialValue;
}


/**
* Returns the currently cached object and potentially refreshes the cache before returning.
*/
public T getOrRefresh() {
if (needsRefresh()) {
if (refreshLock.tryLock()) {
try {
if (needsRefresh()) { // check again!
cached = refresh();
assert cached != null;
lastRefreshTimestamp = System.currentTimeMillis();
}
} finally {
refreshLock.unlock();
}
}
}
assert cached != null;
return cached;
}

/**
* Returns a new instance to cache
*/
protected abstract T refresh();

/**
* Returns <code>true</code> iff the cache needs to be refreshed.
*/
protected boolean needsRefresh() {
if (refreshInterval.millis() == 0) {
return true;
}
final long currentTime = System.currentTimeMillis();
return (currentTime - lastRefreshTimestamp) > refreshInterval.millis();
}
}
45 changes: 43 additions & 2 deletions src/main/java/org/elasticsearch/index/store/Store.java
Expand Up @@ -29,6 +29,7 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand All @@ -38,9 +39,10 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.settings.IndexSettings;
Expand Down Expand Up @@ -84,13 +86,15 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
private static final int VERSION_START = 0;
private static final int VERSION = VERSION_STACK_TRACE;
private static final String CORRUPTED = "corrupted_";
public static final String INDEX_STORE_STATS_REFRESH_INTERVAL = "index.store.stats_refresh_interval";

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicInteger refCount = new AtomicInteger(1);
private final CodecService codecService;
private final DirectoryService directoryService;
private final StoreDirectory directory;
private final DistributorDirectory distributorDirectory;
private final SingleObjectCache<StoreStats> statsCache;

@Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecService codecService, DirectoryService directoryService, Distributor distributor) throws IOException {
Expand All @@ -99,6 +103,9 @@ public Store(ShardId shardId, @IndexSettings Settings indexSettings, CodecServic
this.directoryService = directoryService;
this.distributorDirectory = new DistributorDirectory(distributor);
this.directory = new StoreDirectory(distributorDirectory);
final TimeValue refreshInterval = indexSettings.getAsTime(INDEX_STORE_STATS_REFRESH_INTERVAL, TimeValue.timeValueSeconds(10));
this.statsCache = new StoreStatsCache(refreshInterval, directory, directoryService);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
}


Expand Down Expand Up @@ -213,7 +220,7 @@ public void deleteContent() throws IOException {

public StoreStats stats() throws IOException {
ensureOpen();
return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos());
return statsCache.getOrRefresh();
}

public void renameFile(String from, String to) throws IOException {
Expand Down Expand Up @@ -550,6 +557,7 @@ private void innerClose() throws IOException {
public String toString() {
return "store(" + in.toString() + ")";
}

}

/** Log that we are about to delete this file, to the index.store.deletes component. */
Expand Down Expand Up @@ -1167,4 +1175,37 @@ public void markStoreCorrupted(CorruptIndexException exception) throws IOExcepti
directory().sync(Collections.singleton(uuid));
}
}

private static class StoreStatsCache extends SingleObjectCache<StoreStats> {
private final Directory directory;
private final DirectoryService directoryService;

public StoreStatsCache(TimeValue refreshInterval, Directory directory, DirectoryService directoryService) throws IOException {
super(refreshInterval, new StoreStats(estimateSize(directory), directoryService.throttleTimeInNanos()));
this.directory = directory;
this.directoryService = directoryService;
}

@Override
protected StoreStats refresh() {
try {
return new StoreStats(estimateSize(directory), directoryService.throttleTimeInNanos());
} catch (IOException ex) {
throw new ElasticsearchException("failed to refresh store stats");
}
}

private static long estimateSize(Directory directory) throws IOException {
long estimatedSize = 0;
String[] files = directory.listAll();
for (String file : files) {
try {
estimatedSize += directory.fileLength(file);
} catch (NoSuchFileException | FileNotFoundException e) {
// ignore, the file is not there no more
}
}
return estimatedSize;
}
}
}
27 changes: 16 additions & 11 deletions src/main/java/org/elasticsearch/monitor/fs/FsService.java
Expand Up @@ -23,33 +23,38 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;

/**
*/
public class FsService extends AbstractComponent {

private final FsProbe probe;

private final TimeValue refreshInterval;

private FsStats cachedStats;
private final SingleObjectCache<FsStats> fsStatsCache;

@Inject
public FsService(Settings settings, FsProbe probe) {
super(settings);
this.probe = probe;
this.cachedStats = probe.stats();

this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(1));

TimeValue refreshInterval = settings.getAsTime("monitor.fs.refresh_interval", TimeValue.timeValueSeconds(1));
fsStatsCache = new FsStatsCache(refreshInterval, probe.stats());
logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);
}

public synchronized FsStats stats() {
if ((System.currentTimeMillis() - cachedStats.getTimestamp()) > refreshInterval.millis()) {
cachedStats = probe.stats();
public FsStats stats() {
return fsStatsCache.getOrRefresh();
}

private class FsStatsCache extends SingleObjectCache<FsStats> {
public FsStatsCache(TimeValue interval, FsStats initValue) {
super(interval, initValue);
}

@Override
protected FsStats refresh() {
return probe.stats();
}
return cachedStats;
}

}
29 changes: 18 additions & 11 deletions src/main/java/org/elasticsearch/monitor/network/NetworkService.java
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;

import java.net.InetAddress;
import java.net.NetworkInterface;
Expand All @@ -31,29 +32,26 @@
/**
*
*/
public class NetworkService extends AbstractComponent {
public final class NetworkService extends AbstractComponent {

private final NetworkProbe probe;

private final NetworkInfo info;

private final TimeValue refreshInterval;

private NetworkStats cachedStats;
private final SingleObjectCache<NetworkStats> networkStatsCache;

@Inject
public NetworkService(Settings settings, NetworkProbe probe) {
super(settings);
this.probe = probe;

this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(5));
TimeValue refreshInterval = settings.getAsTime("monitor.network.refresh_interval", TimeValue.timeValueSeconds(5));

logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);

this.info = probe.networkInfo();
this.info.refreshInterval = refreshInterval.millis();
this.cachedStats = probe.networkStats();

networkStatsCache = new NetworkStatsCache(refreshInterval, probe.networkStats());
if (logger.isDebugEnabled()) {
StringBuilder netDebug = new StringBuilder("net_info");
try {
Expand Down Expand Up @@ -104,17 +102,26 @@ public NetworkService(Settings settings, NetworkProbe probe) {
if (logger.isTraceEnabled()) {
logger.trace("ifconfig\n\n" + ifconfig());
}
stats(); // pull the stats one time
}

public NetworkInfo info() {
return this.info;
}

public synchronized NetworkStats stats() {
if ((System.currentTimeMillis() - cachedStats.timestamp()) > refreshInterval.millis()) {
cachedStats = probe.networkStats();
public NetworkStats stats() {
return networkStatsCache.getOrRefresh();
}

private class NetworkStatsCache extends SingleObjectCache<NetworkStats> {
public NetworkStatsCache(TimeValue interval, NetworkStats initValue) {
super(interval, initValue);
}

@Override
protected NetworkStats refresh() {
return probe.networkStats();
}
return cachedStats;
}

public String ifconfig() {
Expand Down
24 changes: 15 additions & 9 deletions src/main/java/org/elasticsearch/monitor/os/OsService.java
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;

/**
*
Expand All @@ -33,22 +34,19 @@ public class OsService extends AbstractComponent {

private final OsInfo info;

private final TimeValue refreshInterval;

private OsStats cachedStats;
private SingleObjectCache<OsStats> osStatsCache;

@Inject
public OsService(Settings settings, OsProbe probe) {
super(settings);
this.probe = probe;

this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(1));
TimeValue refreshInterval = settings.getAsTime("monitor.os.refresh_interval", TimeValue.timeValueSeconds(1));

this.info = probe.osInfo();
this.info.refreshInterval = refreshInterval.millis();
this.info.availableProcessors = Runtime.getRuntime().availableProcessors();
this.cachedStats = probe.osStats();

osStatsCache = new OsStatsCache(refreshInterval, probe.osStats());
logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);
}

Expand All @@ -57,9 +55,17 @@ public OsInfo info() {
}

public synchronized OsStats stats() {
if ((System.currentTimeMillis() - cachedStats.timestamp()) > refreshInterval.millis()) {
cachedStats = probe.osStats();
return osStatsCache.getOrRefresh();
}

private class OsStatsCache extends SingleObjectCache<OsStats> {
public OsStatsCache(TimeValue interval, OsStats initValue) {
super(interval, initValue);
}

@Override
protected OsStats refresh() {
return probe.osStats();
}
return cachedStats;
}
}

0 comments on commit a7fc58b

Please sign in to comment.