Skip to content

Commit

Permalink
Store Throttling (node level and/or index level) with options on merg…
Browse files Browse the repository at this point in the history
…e or all, closes elastic#2041.

Allow to configure store throttling (only applied on file system based storage), which allows to control the maximum bytes per sec written to the file system. It can be configured to only apply while merging, or on all output operations. The setting can eb set on the node level (in which case the throttling is done across all shards allocated on the node), or index level, in which case it only applied to that index.

The node level settings are indices.store.throttle.type to set the type, with values of none, merge and all (defaults to none). And, also, indices.store.throttle.max_bytes_per_sec (defaults to 0), which can be set to something like 1mb.

The index level settings is index.store.throttle.type for the type, with values of node, none, merge, and all. Defaults to node which will use the "shared" throttling on the node level. And, index.store.throttle.max_bytes_per_sec (defaults to 0).
  • Loading branch information
kimchy committed Jun 21, 2012
1 parent 202387f commit dc6637f
Show file tree
Hide file tree
Showing 29 changed files with 561 additions and 29 deletions.
19 changes: 19 additions & 0 deletions src/main/java/org/apache/lucene/index/TrackingMergeScheduler.java
@@ -1,3 +1,22 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.lucene.index;

import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand Down
76 changes: 76 additions & 0 deletions src/main/java/org/apache/lucene/store/StoreRateLimiting.java
@@ -0,0 +1,76 @@
package org.apache.lucene.store;

import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.RateLimiter;
import org.elasticsearch.common.unit.ByteSizeValue;

/**
*/
public class StoreRateLimiting {

public static interface Provider {

StoreRateLimiting rateLimiting();
}

public interface Listener {

void onPause(long nanos);
}

public static enum Type {
NONE,
MERGE,
ALL;

public static Type fromString(String type) throws ElasticSearchIllegalArgumentException {
if ("none".equalsIgnoreCase(type)) {
return NONE;
} else if ("merge".equalsIgnoreCase(type)) {
return MERGE;
} else if ("all".equalsIgnoreCase(type)) {
return ALL;
}
throw new ElasticSearchIllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none]");
}
}

private final RateLimiter rateLimiter = new RateLimiter(0);
private volatile RateLimiter actualRateLimiter;

private volatile Type type;

public StoreRateLimiting() {

}

@Nullable
public RateLimiter getRateLimiter() {
return actualRateLimiter;
}

public void setMaxRate(ByteSizeValue rate) {
if (rate.bytes() <= 0) {
actualRateLimiter = null;
} else if (actualRateLimiter == null) {
actualRateLimiter = rateLimiter;
actualRateLimiter.setMaxRate(rate.mbFrac());
} else {
assert rateLimiter == actualRateLimiter;
rateLimiter.setMaxRate(rate.mbFrac());
}
}

public Type getType() {
return type;
}

public void setType(Type type) {
this.type = type;
}

public void setType(String type) throws ElasticSearchIllegalArgumentException {
this.type = Type.fromString(type);
}
}
26 changes: 26 additions & 0 deletions src/main/java/org/apache/lucene/store/XFSIndexOutput.java
@@ -0,0 +1,26 @@
package org.apache.lucene.store;

import org.elasticsearch.common.RateLimiter;

import java.io.IOException;

/**
*/
class XFSIndexOutput extends FSDirectory.FSIndexOutput {

private final RateLimiter rateLimiter;

private final StoreRateLimiting.Listener rateListener;

XFSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter, StoreRateLimiting.Listener rateListener) throws IOException {
super(parent, name);
this.rateLimiter = rateLimiter;
this.rateListener = rateListener;
}

@Override
public void flushBuffer(byte[] b, int offset, int size) throws IOException {
rateListener.onPause(rateLimiter.pause(size));
super.flushBuffer(b, offset, size);
}
}
64 changes: 64 additions & 0 deletions src/main/java/org/apache/lucene/store/XMMapFSDirectory.java
@@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.lucene.store;

import org.apache.lucene.index.TrackingMergeScheduler;
import org.elasticsearch.common.RateLimiter;

import java.io.File;
import java.io.IOException;

/**
*/
public class XMMapFSDirectory extends NIOFSDirectory {

private final StoreRateLimiting.Provider rateLimitingProvider;

private final StoreRateLimiting.Listener rateListener;

public XMMapFSDirectory(File path, LockFactory lockFactory, StoreRateLimiting.Provider rateLimitingProvider, StoreRateLimiting.Listener rateListener) throws IOException {
super(path, lockFactory);
this.rateLimitingProvider = rateLimitingProvider;
this.rateListener = rateListener;
}

@Override
public IndexOutput createOutput(String name) throws IOException {
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
StoreRateLimiting.Type type = rateLimiting.getType();
RateLimiter limiter = rateLimiting.getRateLimiter();
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
return super.createOutput(name);
}
if (TrackingMergeScheduler.getCurrentMerge() != null) {
// we are mering, and type is either MERGE or ALL, rate limit...
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
if (type == StoreRateLimiting.Type.ALL) {
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
// we shouldn't really get here...
return super.createOutput(name);
}
}
64 changes: 64 additions & 0 deletions src/main/java/org/apache/lucene/store/XNIOFSDirectory.java
@@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.lucene.store;

import org.apache.lucene.index.TrackingMergeScheduler;
import org.elasticsearch.common.RateLimiter;

import java.io.File;
import java.io.IOException;

/**
*/
public class XNIOFSDirectory extends NIOFSDirectory {

private final StoreRateLimiting.Provider rateLimitingProvider;

private final StoreRateLimiting.Listener rateListener;

public XNIOFSDirectory(File path, LockFactory lockFactory, StoreRateLimiting.Provider rateLimitingProvider, StoreRateLimiting.Listener rateListener) throws IOException {
super(path, lockFactory);
this.rateLimitingProvider = rateLimitingProvider;
this.rateListener = rateListener;
}

@Override
public IndexOutput createOutput(String name) throws IOException {
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
StoreRateLimiting.Type type = rateLimiting.getType();
RateLimiter limiter = rateLimiting.getRateLimiter();
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
return super.createOutput(name);
}
if (TrackingMergeScheduler.getCurrentMerge() != null) {
// we are mering, and type is either MERGE or ALL, rate limit...
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
if (type == StoreRateLimiting.Type.ALL) {
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
// we shouldn't really get here...
return super.createOutput(name);
}
}
64 changes: 64 additions & 0 deletions src/main/java/org/apache/lucene/store/XSimpleFSDirectory.java
@@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.lucene.store;

import org.apache.lucene.index.TrackingMergeScheduler;
import org.elasticsearch.common.RateLimiter;

import java.io.File;
import java.io.IOException;

/**
*/
public class XSimpleFSDirectory extends SimpleFSDirectory {

private final StoreRateLimiting.Provider rateLimitingProvider;

private final StoreRateLimiting.Listener rateListener;

public XSimpleFSDirectory(File path, LockFactory lockFactory, StoreRateLimiting.Provider rateLimitingProvider, StoreRateLimiting.Listener rateListener) throws IOException {
super(path, lockFactory);
this.rateLimitingProvider = rateLimitingProvider;
this.rateListener = rateListener;
}

@Override
public IndexOutput createOutput(String name) throws IOException {
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
StoreRateLimiting.Type type = rateLimiting.getType();
RateLimiter limiter = rateLimiting.getRateLimiter();
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
return super.createOutput(name);
}
if (TrackingMergeScheduler.getCurrentMerge() != null) {
// we are mering, and type is either MERGE or ALL, rate limit...
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
if (type == StoreRateLimiting.Type.ALL) {
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
// we shouldn't really get here...
return super.createOutput(name);
}
}
5 changes: 4 additions & 1 deletion src/main/java/org/elasticsearch/common/RateLimiter.java
Expand Up @@ -53,7 +53,7 @@ public void setMaxRate(double mbPerSec) {
* might exceed the target). It's best to call this
* with a biggish count, not one byte at a time.
*/
public void pause(long bytes) {
public long pause(long bytes) {

// TODO: this is purely instantenous rate; maybe we
// should also offer decayed recent history one?
Expand All @@ -65,11 +65,13 @@ public void pause(long bytes) {

// While loop because Thread.sleep doesn't alway sleep
// enough:
long totalPauseTime = 0;
while (true) {
final long pauseNS = targetNS - curNS;
if (pauseNS > 0) {
try {
Thread.sleep((int) (pauseNS / 1000000), (int) (pauseNS % 1000000));
totalPauseTime += pauseNS;
} catch (InterruptedException ie) {
throw new ElasticSearchInterruptedException("interrupted while rate limiting", ie);
}
Expand All @@ -78,5 +80,6 @@ public void pause(long bytes) {
}
break;
}
return totalPauseTime;
}
}
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore;
Expand All @@ -47,6 +48,8 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard> {

IndexCache cache();

IndexSettingsService settingsService();

PercolatorService percolateService();

AnalysisService analysisService();
Expand Down

0 comments on commit dc6637f

Please sign in to comment.