Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use TransportBulkAction for internal request from IndicesTTLService #5795

Merged
merged 1 commit into from
Apr 15, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@
<exclude>org/elasticsearch/bootstrap/Bootstrap.class</exclude>
<exclude>org/elasticsearch/Version.class</exclude>
<exclude>org/elasticsearch/index/merge/Merges.class</exclude>
<exclude>org/elasticsearch/common/lucene/search/Queries$QueryWrapperFilterFactory.class</exclude>
<exclude>org/elasticsearch/common/lucene/search/Queries$QueryWrapperFilterFactory.class</exclude>
<!-- end excludes for valid system-out -->
<!-- start excludes for Unsafe -->
<exclude>org/elasticsearch/common/util/UnsafeUtils.class</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -178,7 +180,18 @@ private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> r
return false;
}

private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses) {
/**
* This method executes the {@link BulkRequest} and calls the given listener once the request returns.
* This method will not create any indices even if auto-create indices is enabled.
*
* @see #doExecute(BulkRequest, org.elasticsearch.action.ActionListener)
*/
public void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTime = System.currentTimeMillis();
executeBulk(bulkRequest, startTime, listener, new AtomicArray<BulkItemResponse>(bulkRequest.requests.size()));
}

private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
ClusterState clusterState = clusterService.state();
// TODO use timeout to wait here if its blocked...
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
Expand Down
149 changes: 108 additions & 41 deletions src/main/java/org/elasticsearch/indices/ttl/IndicesTTLService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand All @@ -44,7 +42,6 @@
import org.elasticsearch.index.fieldvisitor.UidAndRoutingFieldsVisitor;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.FieldMappers;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
Expand All @@ -57,6 +54,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


/**
Expand All @@ -69,68 +71,83 @@ public class IndicesTTLService extends AbstractLifecycleComponent<IndicesTTLServ

private final ClusterService clusterService;
private final IndicesService indicesService;
private final Client client;
private final TransportBulkAction bulkAction;

private volatile TimeValue interval;
private final int bulkSize;
private PurgerThread purgerThread;

@Inject
public IndicesTTLService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeSettingsService nodeSettingsService, Client client) {
public IndicesTTLService(Settings settings, ClusterService clusterService, IndicesService indicesService, NodeSettingsService nodeSettingsService, TransportBulkAction bulkAction) {
super(settings);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.client = client;
this.interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(60));
TimeValue interval = componentSettings.getAsTime("interval", TimeValue.timeValueSeconds(60));
this.bulkAction = bulkAction;
this.bulkSize = componentSettings.getAsInt("bulk_size", 10000);
this.purgerThread = new PurgerThread(EsExecutors.threadName(settings, "[ttl_expire]"), interval);

nodeSettingsService.addListener(new ApplySettings());
}

@Override
protected void doStart() throws ElasticsearchException {
this.purgerThread = new PurgerThread(EsExecutors.threadName(settings, "[ttl_expire]"));
this.purgerThread.start();
}

@Override
protected void doStop() throws ElasticsearchException {
this.purgerThread.doStop();
this.purgerThread.interrupt();
try {
this.purgerThread.shutdown();
} catch (InterruptedException e) {
Thread.interrupted();
}
}

@Override
protected void doClose() throws ElasticsearchException {
}

private class PurgerThread extends Thread {
volatile boolean running = true;
private final AtomicBoolean running = new AtomicBoolean(true);
private final Notifier notifier;
private final CountDownLatch shutdownLatch = new CountDownLatch(1);


public PurgerThread(String name) {
public PurgerThread(String name, TimeValue interval) {
super(name);
setDaemon(true);
this.notifier = new Notifier(interval);
}

public void shutdown() throws InterruptedException {
if (running.compareAndSet(true, false)) {
notifier.doNotify();
shutdownLatch.await();
}

}

public void doStop() {
running = false;
public void resetInterval(TimeValue interval) {
notifier.setTimeout(interval);
}

public void run() {
while (running) {
try {
List<IndexShard> shardsToPurge = getShardsToPurge();
purgeShards(shardsToPurge);
} catch (Throwable e) {
if (running) {
logger.warn("failed to execute ttl purge", e);
try {
while (running.get()) {
try {
List<IndexShard> shardsToPurge = getShardsToPurge();
purgeShards(shardsToPurge);
} catch (Throwable e) {
if (running.get()) {
logger.warn("failed to execute ttl purge", e);
}
}
if (running.get()) {
notifier.await();
}
}
try {
Thread.sleep(interval.millis());
} catch (InterruptedException e) {
// ignore, if we are interrupted because we are shutting down, running will be false
}

} finally {
shutdownLatch.countDown();
}
}

Expand Down Expand Up @@ -174,6 +191,10 @@ private List<IndexShard> getShardsToPurge() {
}
return shardsToPurge;
}

public TimeValue getInterval() {
return notifier.getTimeout();
}
}

private void purgeShards(List<IndexShard> shardsToPurge) {
Expand All @@ -182,11 +203,13 @@ private void purgeShards(List<IndexShard> shardsToPurge) {
Engine.Searcher searcher = shardToPurge.acquireSearcher("indices_ttl");
try {
logger.debug("[{}][{}] purging shard", shardToPurge.routingEntry().index(), shardToPurge.routingEntry().id());
ExpiredDocsCollector expiredDocsCollector = new ExpiredDocsCollector(shardToPurge.routingEntry().index());
ExpiredDocsCollector expiredDocsCollector = new ExpiredDocsCollector();
searcher.searcher().search(query, expiredDocsCollector);
List<DocToPurge> docsToPurge = expiredDocsCollector.getDocsToPurge();
BulkRequestBuilder bulkRequest = client.prepareBulk();

BulkRequest bulkRequest = new BulkRequest();
for (DocToPurge docToPurge : docsToPurge) {

bulkRequest.add(new DeleteRequest().index(shardToPurge.routingEntry().index()).type(docToPurge.type).id(docToPurge.id).version(docToPurge.version).routing(docToPurge.routing));
bulkRequest = processBulkIfNeeded(bulkRequest, false);
}
Expand Down Expand Up @@ -214,12 +237,10 @@ public DocToPurge(String type, String id, long version, String routing) {
}

private class ExpiredDocsCollector extends Collector {
private final MapperService mapperService;
private AtomicReaderContext context;
private List<DocToPurge> docsToPurge = new ArrayList<>();

public ExpiredDocsCollector(String index) {
mapperService = indicesService.indexService(index).mapperService();
public ExpiredDocsCollector() {
}

public void setScorer(Scorer scorer) {
Expand Down Expand Up @@ -250,10 +271,10 @@ public List<DocToPurge> getDocsToPurge() {
}
}

private BulkRequestBuilder processBulkIfNeeded(BulkRequestBuilder bulkRequest, boolean force) {
private BulkRequest processBulkIfNeeded(BulkRequest bulkRequest, boolean force) {
if ((force && bulkRequest.numberOfActions() > 0) || bulkRequest.numberOfActions() >= bulkSize) {
try {
bulkRequest.execute(new ActionListener<BulkResponse>() {
bulkAction.executeBulk(bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
logger.trace("bulk took " + bulkResponse.getTookInMillis() + "ms");
Expand All @@ -267,18 +288,64 @@ public void onFailure(Throwable e) {
} catch (Exception e) {
logger.warn("failed to process bulk", e);
}
bulkRequest = client.prepareBulk();
bulkRequest = new BulkRequest();
}
return bulkRequest;
}

class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
TimeValue interval = settings.getAsTime(INDICES_TTL_INTERVAL, IndicesTTLService.this.interval);
if (!interval.equals(IndicesTTLService.this.interval)) {
logger.info("updating indices.ttl.interval from [{}] to [{}]", IndicesTTLService.this.interval, interval);
IndicesTTLService.this.interval = interval;
final TimeValue currentInterval = IndicesTTLService.this.purgerThread.getInterval();
final TimeValue interval = settings.getAsTime(INDICES_TTL_INTERVAL, currentInterval);
if (!interval.equals(currentInterval)) {
logger.info("updating indices.ttl.interval from [{}] to [{}]",currentInterval, interval);
IndicesTTLService.this.purgerThread.resetInterval(interval);

}
}
}


private static final class Notifier {

private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile TimeValue timeout;

public Notifier(TimeValue timeout) {
assert timeout != null;
this.timeout = timeout;
}

public void await() {
lock.lock();
try {
condition.await(timeout.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.interrupted();
} finally {
lock.unlock();
}

}

public void setTimeout(TimeValue timeout) {
assert timeout != null;
this.timeout = timeout;
doNotify();
}

public TimeValue getTimeout() {
return timeout;
}

public void doNotify() {
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
}
Expand Down
55 changes: 53 additions & 2 deletions src/test/java/org/elasticsearch/percolator/TTLPercolatorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
Expand All @@ -51,14 +52,12 @@ protected Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder()
.put(super.nodeSettings(nodeOrdinal))
.put("indices.ttl.interval", PURGE_INTERVAL)
.put("action.auto_create_index", false) // see #5766
.build();
}

@Test
public void testPercolatingWithTimeToLive() throws Exception {
final Client client = client();
client.admin().indices().prepareDelete("_all").execute().actionGet();
ensureGreen();

String percolatorMapping = XContentFactory.jsonBuilder().startObject().startObject(PercolatorService.TYPE_NAME)
Expand Down Expand Up @@ -150,4 +149,56 @@ public boolean apply(Object input) {
assertThat(percolateResponse.getMatches(), emptyArray());
}


@Test
public void testEnsureTTLDoesNotCreateIndex() throws IOException, InterruptedException {
final Client client = client();
ensureGreen();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
.put("indices.ttl.interval", 60) // 60 sec
.build()).get();

String typeMapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("_ttl").field("enabled", true).endObject()
.endObject().endObject().string();

client.admin().indices().prepareCreate("test")
.setSettings(settingsBuilder().put("index.number_of_shards", 1))
.addMapping("type1", typeMapping)
.execute().actionGet();
ensureGreen();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder()
.put("indices.ttl.interval", 1) // 60 sec
.build()).get();

for (int i = 0; i < 100; i++) {
logger.info("index: " + i);
client.prepareIndex("test", "type1", "" + i).setSource(jsonBuilder()
.startObject()
.startObject("query")
.startObject("term")
.field("field1", "value1")
.endObject()
.endObject()
.endObject()
).setTTL(randomIntBetween(10, 500)).execute().actionGet();
}
refresh();
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
IndicesStatsResponse indicesStatsResponse = client.admin().indices().prepareStats("test").clear().setIndexing(true).get();
logger.info("delete count [{}]", indicesStatsResponse.getIndices().get("test").getTotal().getIndexing().getTotal().getDeleteCount());
// TTL deletes one doc, but it is indexed in the primary shard and replica shards
return indicesStatsResponse.getIndices().get("test").getTotal().getIndexing().getTotal().getDeleteCount() != 0;
}
}, 5, TimeUnit.SECONDS), equalTo(true));
cluster().wipeIndices("test");
client.admin().indices().prepareCreate("test")
.addMapping("type1", typeMapping)
.execute().actionGet();


}

}