diff --git a/src/main/java/org/apache/lucene/index/TrackingMergeScheduler.java b/src/main/java/org/apache/lucene/index/TrackingMergeScheduler.java index cd09e513df4b1..57ec000c0f29b 100644 --- a/src/main/java/org/apache/lucene/index/TrackingMergeScheduler.java +++ b/src/main/java/org/apache/lucene/index/TrackingMergeScheduler.java @@ -1,3 +1,22 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.lucene.index; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; diff --git a/src/main/java/org/apache/lucene/store/StoreRateLimiting.java b/src/main/java/org/apache/lucene/store/StoreRateLimiting.java new file mode 100644 index 0000000000000..829a54debfbb0 --- /dev/null +++ b/src/main/java/org/apache/lucene/store/StoreRateLimiting.java @@ -0,0 +1,76 @@ +package org.apache.lucene.store; + +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.RateLimiter; +import org.elasticsearch.common.unit.ByteSizeValue; + +/** + */ +public class StoreRateLimiting { + + public static interface Provider { + + StoreRateLimiting rateLimiting(); + } + + public interface Listener { + + void onPause(long nanos); + } + + public static enum Type { + NONE, + MERGE, + ALL; + + public static Type fromString(String type) throws ElasticSearchIllegalArgumentException { + if ("none".equalsIgnoreCase(type)) { + return NONE; + } else if ("merge".equalsIgnoreCase(type)) { + return MERGE; + } else if ("all".equalsIgnoreCase(type)) { + return ALL; + } + throw new ElasticSearchIllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none]"); + } + } + + private final RateLimiter rateLimiter = new RateLimiter(0); + private volatile RateLimiter actualRateLimiter; + + private volatile Type type; + + public StoreRateLimiting() { + + } + + @Nullable + public RateLimiter getRateLimiter() { + return actualRateLimiter; + } + + public void setMaxRate(ByteSizeValue rate) { + if (rate.bytes() <= 0) { + actualRateLimiter = null; + } else if (actualRateLimiter == null) { + actualRateLimiter = rateLimiter; + actualRateLimiter.setMaxRate(rate.mbFrac()); + } else { + assert rateLimiter == actualRateLimiter; + rateLimiter.setMaxRate(rate.mbFrac()); + } + } + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public void setType(String type) throws ElasticSearchIllegalArgumentException { + this.type = Type.fromString(type); + } +} diff --git a/src/main/java/org/apache/lucene/store/XFSIndexOutput.java b/src/main/java/org/apache/lucene/store/XFSIndexOutput.java new file mode 100644 index 0000000000000..479c27287cc04 --- /dev/null +++ b/src/main/java/org/apache/lucene/store/XFSIndexOutput.java @@ -0,0 +1,26 @@ +package org.apache.lucene.store; + +import org.elasticsearch.common.RateLimiter; + +import java.io.IOException; + +/** + */ +class XFSIndexOutput extends FSDirectory.FSIndexOutput { + + private final RateLimiter rateLimiter; + + private final StoreRateLimiting.Listener rateListener; + + XFSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter, StoreRateLimiting.Listener rateListener) throws IOException { + super(parent, name); + this.rateLimiter = rateLimiter; + this.rateListener = rateListener; + } + + @Override + public void flushBuffer(byte[] b, int offset, int size) throws IOException { + rateListener.onPause(rateLimiter.pause(size)); + super.flushBuffer(b, offset, size); + } +} diff --git a/src/main/java/org/apache/lucene/store/XMMapFSDirectory.java b/src/main/java/org/apache/lucene/store/XMMapFSDirectory.java new file mode 100644 index 0000000000000..9b9b2c09dd8f8 --- /dev/null +++ b/src/main/java/org/apache/lucene/store/XMMapFSDirectory.java @@ -0,0 +1,64 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lucene.store; + +import org.apache.lucene.index.TrackingMergeScheduler; +import org.elasticsearch.common.RateLimiter; + +import java.io.File; +import java.io.IOException; + +/** + */ +public class XMMapFSDirectory extends NIOFSDirectory { + + private final StoreRateLimiting.Provider rateLimitingProvider; + + private final StoreRateLimiting.Listener rateListener; + + public XMMapFSDirectory(File path, LockFactory lockFactory, StoreRateLimiting.Provider rateLimitingProvider, StoreRateLimiting.Listener rateListener) throws IOException { + super(path, lockFactory); + this.rateLimitingProvider = rateLimitingProvider; + this.rateListener = rateListener; + } + + @Override + public IndexOutput createOutput(String name) throws IOException { + StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting(); + StoreRateLimiting.Type type = rateLimiting.getType(); + RateLimiter limiter = rateLimiting.getRateLimiter(); + if (type == StoreRateLimiting.Type.NONE || limiter == null) { + return super.createOutput(name); + } + if (TrackingMergeScheduler.getCurrentMerge() != null) { + // we are mering, and type is either MERGE or ALL, rate limit... + ensureOpen(); + ensureCanWrite(name); + return new XFSIndexOutput(this, name, limiter, rateListener); + } + if (type == StoreRateLimiting.Type.ALL) { + ensureOpen(); + ensureCanWrite(name); + return new XFSIndexOutput(this, name, limiter, rateListener); + } + // we shouldn't really get here... + return super.createOutput(name); + } +} diff --git a/src/main/java/org/apache/lucene/store/XNIOFSDirectory.java b/src/main/java/org/apache/lucene/store/XNIOFSDirectory.java new file mode 100644 index 0000000000000..32c936d1fe9e4 --- /dev/null +++ b/src/main/java/org/apache/lucene/store/XNIOFSDirectory.java @@ -0,0 +1,64 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lucene.store; + +import org.apache.lucene.index.TrackingMergeScheduler; +import org.elasticsearch.common.RateLimiter; + +import java.io.File; +import java.io.IOException; + +/** + */ +public class XNIOFSDirectory extends NIOFSDirectory { + + private final StoreRateLimiting.Provider rateLimitingProvider; + + private final StoreRateLimiting.Listener rateListener; + + public XNIOFSDirectory(File path, LockFactory lockFactory, StoreRateLimiting.Provider rateLimitingProvider, StoreRateLimiting.Listener rateListener) throws IOException { + super(path, lockFactory); + this.rateLimitingProvider = rateLimitingProvider; + this.rateListener = rateListener; + } + + @Override + public IndexOutput createOutput(String name) throws IOException { + StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting(); + StoreRateLimiting.Type type = rateLimiting.getType(); + RateLimiter limiter = rateLimiting.getRateLimiter(); + if (type == StoreRateLimiting.Type.NONE || limiter == null) { + return super.createOutput(name); + } + if (TrackingMergeScheduler.getCurrentMerge() != null) { + // we are mering, and type is either MERGE or ALL, rate limit... + ensureOpen(); + ensureCanWrite(name); + return new XFSIndexOutput(this, name, limiter, rateListener); + } + if (type == StoreRateLimiting.Type.ALL) { + ensureOpen(); + ensureCanWrite(name); + return new XFSIndexOutput(this, name, limiter, rateListener); + } + // we shouldn't really get here... + return super.createOutput(name); + } +} diff --git a/src/main/java/org/apache/lucene/store/XSimpleFSDirectory.java b/src/main/java/org/apache/lucene/store/XSimpleFSDirectory.java new file mode 100644 index 0000000000000..860123b2f34bc --- /dev/null +++ b/src/main/java/org/apache/lucene/store/XSimpleFSDirectory.java @@ -0,0 +1,64 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lucene.store; + +import org.apache.lucene.index.TrackingMergeScheduler; +import org.elasticsearch.common.RateLimiter; + +import java.io.File; +import java.io.IOException; + +/** + */ +public class XSimpleFSDirectory extends SimpleFSDirectory { + + private final StoreRateLimiting.Provider rateLimitingProvider; + + private final StoreRateLimiting.Listener rateListener; + + public XSimpleFSDirectory(File path, LockFactory lockFactory, StoreRateLimiting.Provider rateLimitingProvider, StoreRateLimiting.Listener rateListener) throws IOException { + super(path, lockFactory); + this.rateLimitingProvider = rateLimitingProvider; + this.rateListener = rateListener; + } + + @Override + public IndexOutput createOutput(String name) throws IOException { + StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting(); + StoreRateLimiting.Type type = rateLimiting.getType(); + RateLimiter limiter = rateLimiting.getRateLimiter(); + if (type == StoreRateLimiting.Type.NONE || limiter == null) { + return super.createOutput(name); + } + if (TrackingMergeScheduler.getCurrentMerge() != null) { + // we are mering, and type is either MERGE or ALL, rate limit... + ensureOpen(); + ensureCanWrite(name); + return new XFSIndexOutput(this, name, limiter, rateListener); + } + if (type == StoreRateLimiting.Type.ALL) { + ensureOpen(); + ensureCanWrite(name); + return new XFSIndexOutput(this, name, limiter, rateListener); + } + // we shouldn't really get here... + return super.createOutput(name); + } +} diff --git a/src/main/java/org/elasticsearch/common/RateLimiter.java b/src/main/java/org/elasticsearch/common/RateLimiter.java index b6f3fde6d855b..c9595c80078d8 100644 --- a/src/main/java/org/elasticsearch/common/RateLimiter.java +++ b/src/main/java/org/elasticsearch/common/RateLimiter.java @@ -53,7 +53,7 @@ public void setMaxRate(double mbPerSec) { * might exceed the target). It's best to call this * with a biggish count, not one byte at a time. */ - public void pause(long bytes) { + public long pause(long bytes) { // TODO: this is purely instantenous rate; maybe we // should also offer decayed recent history one? @@ -65,11 +65,13 @@ public void pause(long bytes) { // While loop because Thread.sleep doesn't alway sleep // enough: + long totalPauseTime = 0; while (true) { final long pauseNS = targetNS - curNS; if (pauseNS > 0) { try { Thread.sleep((int) (pauseNS / 1000000), (int) (pauseNS % 1000000)); + totalPauseTime += pauseNS; } catch (InterruptedException ie) { throw new ElasticSearchInterruptedException("interrupted while rate limiting", ie); } @@ -78,5 +80,6 @@ public void pause(long bytes) { } break; } + return totalPauseTime; } } diff --git a/src/main/java/org/elasticsearch/index/service/IndexService.java b/src/main/java/org/elasticsearch/index/service/IndexService.java index d26c8a61cb3c7..6613610292ddc 100644 --- a/src/main/java/org/elasticsearch/index/service/IndexService.java +++ b/src/main/java/org/elasticsearch/index/service/IndexService.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.percolator.PercolatorService; import org.elasticsearch.index.query.IndexQueryParserService; +import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.IndexStore; @@ -47,6 +48,8 @@ public interface IndexService extends IndexComponent, Iterable { IndexCache cache(); + IndexSettingsService settingsService(); + PercolatorService percolateService(); AnalysisService analysisService(); diff --git a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java index 139c53502da44..35a283babd0a7 100644 --- a/src/main/java/org/elasticsearch/index/service/InternalIndexService.java +++ b/src/main/java/org/elasticsearch/index/service/InternalIndexService.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.search.stats.ShardSearchModule; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.IndexShardCreationException; import org.elasticsearch.index.shard.IndexShardManagement; import org.elasticsearch.index.shard.IndexShardModule; @@ -117,6 +118,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde private final IndexStore indexStore; + private final IndexSettingsService settingsService; + private volatile ImmutableMap shardsInjectors = ImmutableMap.of(); private volatile ImmutableMap shards = ImmutableMap.of(); @@ -127,7 +130,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool, PercolatorService percolatorService, AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService, IndexAliasesService aliasesService, - IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore) { + IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService) { super(index, indexSettings); this.injector = injector; this.nodeEnv = nodeEnv; @@ -143,6 +146,7 @@ public InternalIndexService(Injector injector, Index index, @IndexSettings Setti this.indexEngine = indexEngine; this.indexGateway = indexGateway; this.indexStore = indexStore; + this.settingsService = settingsService; this.pluginsService = injector.getInstance(PluginsService.class); this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class); @@ -192,6 +196,11 @@ public IndexGateway gateway() { return indexGateway; } + @Override + public IndexSettingsService settingsService() { + return this.settingsService; + } + @Override public IndexStore store() { return indexStore; diff --git a/src/main/java/org/elasticsearch/index/store/DirectoryService.java b/src/main/java/org/elasticsearch/index/store/DirectoryService.java index 769aae01aa78c..8d3d88a557654 100644 --- a/src/main/java/org/elasticsearch/index/store/DirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/DirectoryService.java @@ -29,6 +29,8 @@ public interface DirectoryService { Directory[] build() throws IOException; + long throttleTimeInNanos(); + void renameFile(Directory dir, String from, String to) throws IOException; void fullDelete(Directory dir) throws IOException; diff --git a/src/main/java/org/elasticsearch/index/store/IndexStore.java b/src/main/java/org/elasticsearch/index/store/IndexStore.java index 95e0f3bfe6391..becc80d9af50d 100644 --- a/src/main/java/org/elasticsearch/index/store/IndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/IndexStore.java @@ -19,24 +19,32 @@ package org.elasticsearch.index.store; +import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.index.IndexComponent; +import org.elasticsearch.index.CloseableIndexComponent; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.store.IndicesStore; import java.io.IOException; /** * Index store is an index level information of the {@link Store} each shard will use. - * - * */ -public interface IndexStore extends IndexComponent { +public interface IndexStore extends CloseableIndexComponent { /** * Is the store a persistent store that can survive full restarts. */ boolean persistent(); + IndicesStore indicesStore(); + + /** + * Returns the rate limiting, either of the index is explicitly configured, or + * the node level one (defaults to the node level one). + */ + StoreRateLimiting rateLimiting(); + /** * The shard store class that should be used for each shard. */ diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index 6b80acf5878d2..18d98058eebc2 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -173,7 +173,7 @@ public void fullDelete() throws IOException { } public StoreStats stats() throws IOException { - return new StoreStats(Directories.estimateSize(directory)); + return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos()); } public ByteSizeValue estimateSize() throws IOException { diff --git a/src/main/java/org/elasticsearch/index/store/StoreStats.java b/src/main/java/org/elasticsearch/index/store/StoreStats.java index fb1d0d7100cc4..2de5cfc6c05f8 100644 --- a/src/main/java/org/elasticsearch/index/store/StoreStats.java +++ b/src/main/java/org/elasticsearch/index/store/StoreStats.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; @@ -35,12 +36,15 @@ public class StoreStats implements Streamable, ToXContent { private long sizeInBytes; + private long throttleTimeInNanos; + public StoreStats() { } - public StoreStats(long sizeInBytes) { + public StoreStats(long sizeInBytes, long throttleTimeInNanos) { this.sizeInBytes = sizeInBytes; + this.throttleTimeInNanos = throttleTimeInNanos; } public void add(StoreStats stats) { @@ -48,6 +52,7 @@ public void add(StoreStats stats) { return; } sizeInBytes += stats.sizeInBytes; + throttleTimeInNanos += stats.throttleTimeInNanos; } @@ -67,6 +72,14 @@ public ByteSizeValue getSize() { return size(); } + public TimeValue throttleTime() { + return TimeValue.timeValueNanos(throttleTimeInNanos); + } + + public TimeValue getThrottleTime() { + return throttleTime(); + } + public static StoreStats readStoreStats(StreamInput in) throws IOException { StoreStats store = new StoreStats(); store.readFrom(in); @@ -76,11 +89,13 @@ public static StoreStats readStoreStats(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException { sizeInBytes = in.readVLong(); + throttleTimeInNanos = in.readVLong(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(sizeInBytes); + out.writeVLong(throttleTimeInNanos); } @Override @@ -88,6 +103,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(Fields.STORE); builder.field(Fields.SIZE, size().toString()); builder.field(Fields.SIZE_IN_BYTES, sizeInBytes); + builder.field(Fields.THROTTLE_TIME, throttleTime().toString()); + builder.field(Fields.THROTTLE_TIME_IN_MILLIS, throttleTime().millis()); builder.endObject(); return builder; } @@ -96,5 +113,8 @@ static final class Fields { static final XContentBuilderString STORE = new XContentBuilderString("store"); static final XContentBuilderString SIZE = new XContentBuilderString("size"); static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes"); + + static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time"); + static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis"); } } diff --git a/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java b/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java index e475ee099dced..a3c78fa131124 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java @@ -21,6 +21,7 @@ import org.apache.lucene.store.*; import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; @@ -35,15 +36,27 @@ /** */ -public abstract class FsDirectoryService extends AbstractIndexShardComponent implements DirectoryService { +public abstract class FsDirectoryService extends AbstractIndexShardComponent implements DirectoryService, StoreRateLimiting.Listener, StoreRateLimiting.Provider { protected final FsIndexStore indexStore; + private final CounterMetric rateLimitingTimeInNanos = new CounterMetric(); + public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { super(shardId, indexSettings); this.indexStore = (FsIndexStore) indexStore; } + @Override + public long throttleTimeInNanos() { + return rateLimitingTimeInNanos.count(); + } + + @Override + public StoreRateLimiting rateLimiting() { + return indexStore.rateLimiting(); + } + protected LockFactory buildLockFactory() throws IOException { String fsLock = componentSettings.get("lock", componentSettings.get("fs_lock", "native")); LockFactory lockFactory = NoLockFactory.getNoLockFactory(); @@ -98,4 +111,9 @@ public void fullDelete(Directory dir) throws IOException { FileSystemUtils.deleteRecursively(fsDirectory.getDirectory().getParentFile()); } } + + @Override + public void onPause(long nanos) { + rateLimitingTimeInNanos.inc(nanos); + } } diff --git a/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java b/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java index 537147faf0dfe..d5ea9b22447d4 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/fs/FsIndexStore.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.support.AbstractIndexStore; +import org.elasticsearch.indices.store.IndicesStore; import java.io.File; import java.io.IOException; @@ -42,8 +43,8 @@ public abstract class FsIndexStore extends AbstractIndexStore { private final File[] locations; - public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { - super(index, indexSettings, indexService); + public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) { + super(index, indexSettings, indexService, indicesStore); this.nodeEnv = nodeEnv; if (nodeEnv.hasNodeFile()) { this.locations = nodeEnv.indexLocations(index); diff --git a/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java b/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java index ad3f258928340..e3ce41bf50b36 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/fs/MmapFsDirectoryService.java @@ -20,7 +20,7 @@ package org.elasticsearch.index.store.fs; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.MMapDirectory; +import org.apache.lucene.store.XMMapFSDirectory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; @@ -46,7 +46,7 @@ public Directory[] build() throws IOException { Directory[] dirs = new Directory[locations.length]; for (int i = 0; i < dirs.length; i++) { FileSystemUtils.mkdirs(locations[i]); - dirs[i] = new MMapDirectory(locations[i], buildLockFactory()); + dirs[i] = new XMMapFSDirectory(locations[i], buildLockFactory(), this, this); } return dirs; } diff --git a/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java b/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java index 3942fe7b7c90e..8d0c4d9def395 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/fs/MmapFsIndexStore.java @@ -26,6 +26,7 @@ import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.indices.store.IndicesStore; /** * @@ -33,8 +34,8 @@ public class MmapFsIndexStore extends FsIndexStore { @Inject - public MmapFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { - super(index, indexSettings, indexService, nodeEnv); + public MmapFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) { + super(index, indexSettings, indexService, indicesStore, nodeEnv); } @Override diff --git a/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java b/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java index 5474a75bd1003..022d1d7888423 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/fs/NioFsDirectoryService.java @@ -20,7 +20,7 @@ package org.elasticsearch.index.store.fs; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.NIOFSDirectory; +import org.apache.lucene.store.XNIOFSDirectory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; @@ -46,7 +46,7 @@ public Directory[] build() throws IOException { Directory[] dirs = new Directory[locations.length]; for (int i = 0; i < dirs.length; i++) { FileSystemUtils.mkdirs(locations[i]); - dirs[i] = new NIOFSDirectory(locations[i], buildLockFactory()); + dirs[i] = new XNIOFSDirectory(locations[i], buildLockFactory(), this, this); } return dirs; } diff --git a/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java b/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java index bfd12df5fcaf6..d6094969c91e9 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/fs/NioFsIndexStore.java @@ -26,6 +26,7 @@ import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.indices.store.IndicesStore; /** * @@ -33,8 +34,8 @@ public class NioFsIndexStore extends FsIndexStore { @Inject - public NioFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { - super(index, indexSettings, indexService, nodeEnv); + public NioFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) { + super(index, indexSettings, indexService, indicesStore, nodeEnv); } @Override diff --git a/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java b/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java index 174d186616281..03abafae7fdc1 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/fs/SimpleFsDirectoryService.java @@ -20,7 +20,7 @@ package org.elasticsearch.index.store.fs; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.SimpleFSDirectory; +import org.apache.lucene.store.XSimpleFSDirectory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; @@ -46,7 +46,7 @@ public Directory[] build() throws IOException { Directory[] dirs = new Directory[locations.length]; for (int i = 0; i < dirs.length; i++) { FileSystemUtils.mkdirs(locations[i]); - dirs[i] = new SimpleFSDirectory(locations[i], buildLockFactory()); + dirs[i] = new XSimpleFSDirectory(locations[i], buildLockFactory(), this, this); } return dirs; } diff --git a/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java b/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java index 8726d52073558..6f5cb3e94ad93 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/fs/SimpleFsIndexStore.java @@ -26,6 +26,7 @@ import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.indices.store.IndicesStore; /** * @@ -33,8 +34,8 @@ public class SimpleFsIndexStore extends FsIndexStore { @Inject - public SimpleFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { - super(index, indexSettings, indexService, nodeEnv); + public SimpleFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) { + super(index, indexSettings, indexService, indicesStore, nodeEnv); } @Override diff --git a/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java b/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java index 72a071193b002..fdf41ac05dbc9 100644 --- a/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/memory/ByteBufferDirectoryService.java @@ -47,6 +47,11 @@ public ByteBufferDirectoryService(ShardId shardId, @IndexSettings Settings index this.byteBufferCache = byteBufferCache; } + @Override + public long throttleTimeInNanos() { + return 0; + } + @Override public Directory[] build() { return new Directory[]{new CustomByteBufferDirectory(byteBufferCache)}; diff --git a/src/main/java/org/elasticsearch/index/store/memory/ByteBufferIndexStore.java b/src/main/java/org/elasticsearch/index/store/memory/ByteBufferIndexStore.java index fc78d960b4154..6f166bfe29ec7 100644 --- a/src/main/java/org/elasticsearch/index/store/memory/ByteBufferIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/memory/ByteBufferIndexStore.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.support.AbstractIndexStore; +import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.jvm.JvmStats; @@ -41,8 +42,8 @@ public class ByteBufferIndexStore extends AbstractIndexStore { @Inject public ByteBufferIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, - ByteBufferCache byteBufferCache) { - super(index, indexSettings, indexService); + ByteBufferCache byteBufferCache, IndicesStore indicesStore) { + super(index, indexSettings, indexService, indicesStore); this.direct = byteBufferCache.direct(); } diff --git a/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java b/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java index 54460023377cc..9d94d337873fc 100644 --- a/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/ram/RamDirectoryService.java @@ -41,6 +41,11 @@ public RamDirectoryService(ShardId shardId, @IndexSettings Settings indexSetting super(shardId, indexSettings); } + @Override + public long throttleTimeInNanos() { + return 0; + } + @Override public Directory[] build() { return new Directory[]{new CustomRAMDirectory()}; diff --git a/src/main/java/org/elasticsearch/index/store/ram/RamIndexStore.java b/src/main/java/org/elasticsearch/index/store/ram/RamIndexStore.java index 8d096aaf20bf6..486cafc61c2fe 100644 --- a/src/main/java/org/elasticsearch/index/store/ram/RamIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/ram/RamIndexStore.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.support.AbstractIndexStore; +import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.jvm.JvmStats; @@ -36,8 +37,8 @@ public class RamIndexStore extends AbstractIndexStore { @Inject - public RamIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService) { - super(index, indexSettings, indexService); + public RamIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore) { + super(index, indexSettings, indexService, indicesStore); } @Override diff --git a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java index b30b7e26e8f0a..6c98be8ec9357 100644 --- a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java @@ -19,13 +19,19 @@ package org.elasticsearch.index.store.support; +import org.apache.lucene.store.StoreRateLimiting; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStore; +import org.elasticsearch.indices.store.IndicesStore; import java.io.IOException; @@ -34,11 +40,75 @@ */ public abstract class AbstractIndexStore extends AbstractIndexComponent implements IndexStore { + static { + IndexMetaData.addDynamicSettings( + "index.store.throttle.type", + "index.store.throttle.max_bytes_per_sec" + ); + } + + class ApplySettings implements IndexSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + String rateLimitingType = indexSettings.get("index.store.throttle.type", AbstractIndexStore.this.rateLimitingType); + if (!rateLimitingType.equals(AbstractIndexStore.this.rateLimitingType)) { + logger.info("updating index.store.throttle.type from [{}] to [{}]", AbstractIndexStore.this.rateLimitingType, rateLimitingType); + if (rateLimitingType.equalsIgnoreCase("node")) { + AbstractIndexStore.this.rateLimitingType = rateLimitingType; + AbstractIndexStore.this.nodeRateLimiting = true; + } else { + StoreRateLimiting.Type.fromString(rateLimitingType); + AbstractIndexStore.this.rateLimitingType = rateLimitingType; + AbstractIndexStore.this.nodeRateLimiting = false; + AbstractIndexStore.this.rateLimiting.setType(rateLimitingType); + } + } + + ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize("index.store.throttle.max_bytes_per_sec", AbstractIndexStore.this.rateLimitingThrottle); + if (!rateLimitingThrottle.equals(AbstractIndexStore.this.rateLimitingThrottle)) { + logger.info("updating index.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", AbstractIndexStore.this.rateLimitingThrottle, rateLimitingThrottle, AbstractIndexStore.this.rateLimitingType); + AbstractIndexStore.this.rateLimitingThrottle = rateLimitingThrottle; + AbstractIndexStore.this.rateLimiting.setMaxRate(rateLimitingThrottle); + } + } + } + + protected final IndexService indexService; - protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService) { + protected final IndicesStore indicesStore; + + private volatile String rateLimitingType; + private volatile ByteSizeValue rateLimitingThrottle; + private volatile boolean nodeRateLimiting; + + private final StoreRateLimiting rateLimiting = new StoreRateLimiting(); + + private final ApplySettings applySettings = new ApplySettings(); + + protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore) { super(index, indexSettings); this.indexService = indexService; + this.indicesStore = indicesStore; + + this.rateLimitingType = indexSettings.get("index.store.throttle.type", "node"); + if (rateLimitingType.equalsIgnoreCase("node")) { + nodeRateLimiting = true; + } else { + nodeRateLimiting = false; + rateLimiting.setType(rateLimitingType); + } + this.rateLimitingThrottle = indexSettings.getAsBytesSize("index.store.throttle.max_bytes_per_sec", new ByteSizeValue(0)); + rateLimiting.setMaxRate(rateLimitingThrottle); + + logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle); + + indexService.settingsService().addListener(applySettings); + } + + @Override + public void close(boolean delete) throws ElasticSearchException { + indexService.settingsService().removeListener(applySettings); } @Override @@ -50,4 +120,14 @@ public boolean canDeleteUnallocated(ShardId shardId) { public void deleteUnallocated(ShardId shardId) throws IOException { // do nothing here... } + + @Override + public IndicesStore indicesStore() { + return indicesStore; + } + + @Override + public StoreRateLimiting rateLimiting() { + return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting; + } } diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index 032b8bec395f9..29ed0ba003dfa 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -32,6 +32,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.indices.ttl.IndicesTTLService; @@ -61,6 +62,7 @@ protected void configure() { bind(RecoveryTarget.class).asEagerSingleton(); bind(RecoverySource.class).asEagerSingleton(); + bind(IndicesStore.class).asEagerSingleton(); bind(IndicesClusterStateService.class).asEagerSingleton(); bind(IndexingMemoryController.class).asEagerSingleton(); bind(IndicesFilterCache.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 1a370d1fb81c4..f783cb985ff2d 100644 --- a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -62,6 +62,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.similarity.SimilarityModule; +import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.indices.analysis.IndicesAnalysisService; @@ -349,6 +350,8 @@ private void deleteIndex(String index, boolean delete, String reason, @Nullable indexInjector.getInstance(MapperService.class).close(); indexInjector.getInstance(IndexQueryParserService.class).close(); + indexInjector.getInstance(IndexStore.class).close(delete); + Injectors.close(injector); indicesLifecycle.afterIndexClosed(indexService.index(), delete); diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 170d11549b863..acfbe71c0e109 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -19,14 +19,17 @@ package org.elasticsearch.indices.store; +import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.env.NodeEnvironment; @@ -34,6 +37,7 @@ import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; import java.io.File; @@ -45,8 +49,39 @@ */ public class IndicesStore extends AbstractComponent implements ClusterStateListener { + static { + MetaData.addDynamicSettings( + "indices.store.throttle.type", + "indices.store.throttle.max_bytes_per_sec" + ); + } + + class ApplySettings implements NodeSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + String rateLimitingType = settings.get("indices.store.throttle.type", IndicesStore.this.rateLimitingType); + // try and parse the type + StoreRateLimiting.Type.fromString(rateLimitingType); + if (!rateLimitingType.equals(IndicesStore.this.rateLimitingType)) { + logger.info("updating indices.store.throttle.type from [{}] to [{}]", IndicesStore.this.rateLimitingType, rateLimitingType); + IndicesStore.this.rateLimitingType = rateLimitingType; + IndicesStore.this.rateLimiting.setType(rateLimitingType); + } + + ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize("indices.store.throttle.max_bytes_per_sec", IndicesStore.this.rateLimitingThrottle); + if (!rateLimitingThrottle.equals(IndicesStore.this.rateLimitingThrottle)) { + logger.info("updating indices.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", IndicesStore.this.rateLimitingThrottle, rateLimitingThrottle, IndicesStore.this.rateLimitingType); + IndicesStore.this.rateLimitingThrottle = rateLimitingThrottle; + IndicesStore.this.rateLimiting.setMaxRate(rateLimitingThrottle); + } + } + } + + private final NodeEnvironment nodeEnv; + private final NodeSettingsService nodeSettingsService; + private final IndicesService indicesService; private final ClusterService clusterService; @@ -59,6 +94,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private final Object danglingMutex = new Object(); + private volatile String rateLimitingType; + private volatile ByteSizeValue rateLimitingThrottle; + private final StoreRateLimiting rateLimiting = new StoreRateLimiting(); + + private final ApplySettings applySettings = new ApplySettings(); + static class DanglingIndex { public final String index; public final ScheduledFuture future; @@ -70,19 +111,33 @@ static class DanglingIndex { } @Inject - public IndicesStore(Settings settings, NodeEnvironment nodeEnv, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) { + public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) { super(settings); this.nodeEnv = nodeEnv; + this.nodeSettingsService = nodeSettingsService; this.indicesService = indicesService; this.clusterService = clusterService; this.threadPool = threadPool; + this.rateLimitingType = componentSettings.get("throttle.type", "none"); + rateLimiting.setType(rateLimitingType); + this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(0)); + rateLimiting.setMaxRate(rateLimitingThrottle); + this.danglingTimeout = componentSettings.getAsTime("dangling_timeout", TimeValue.timeValueHours(2)); + logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle); + + nodeSettingsService.addListener(applySettings); clusterService.addLast(this); } + public StoreRateLimiting rateLimiting() { + return this.rateLimiting; + } + public void close() { + nodeSettingsService.removeListener(applySettings); clusterService.remove(this); }