Skip to content

Commit

Permalink
JAMES-2719 Refactor delete by query on ES backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Arsnael authored and chibenwa committed May 16, 2019
1 parent 00026aa commit 45e3224
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 95 deletions.
@@ -0,0 +1,39 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.backends.es.v6;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeleteByQueryActionListener implements ActionListener<BulkByScrollResponse> {
private static final Logger LOGGER = LoggerFactory.getLogger(DeleteByQueryActionListener.class);

@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {

}

@Override
public void onFailure(Exception e) {
LOGGER.warn("Error during the ES delete by query operation: ", e);
}
}

This file was deleted.

Expand Up @@ -21,8 +21,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -34,8 +32,10 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,29 +45,30 @@
public class ElasticSearchIndexer {
private static final int DEBUG_MAX_LENGTH_CONTENT = 1000;
private static final int DEFAULT_BATCH_SIZE = 100;
private static final TimeValue TIMEOUT = new TimeValue(60000);

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);

private final RestHighLevelClient client;
private final DeleteByQueryPerformer deleteByQueryPerformer;
private final AliasName aliasName;
private final TypeName typeName;
private final int batchSize;

public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
public ElasticSearchIndexer(RestHighLevelClient client,
WriteAliasName aliasName,
TypeName typeName) {
this(client, executor, aliasName, typeName, DEFAULT_BATCH_SIZE);
this(client, aliasName, typeName, DEFAULT_BATCH_SIZE);
}

@VisibleForTesting
public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
public ElasticSearchIndexer(RestHighLevelClient client,
WriteAliasName aliasName,
TypeName typeName,
int batchSize) {
this.client = client;
this.deleteByQueryPerformer = new DeleteByQueryPerformer(client, executor, batchSize, aliasName, typeName);
this.aliasName = aliasName;
this.typeName = typeName;
this.batchSize = batchSize;
}

public IndexResponse index(String id, String content) throws IOException {
Expand Down Expand Up @@ -112,8 +113,14 @@ public Optional<BulkResponse> delete(List<String> ids) throws IOException {
}
}

public Future<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder) {
return deleteByQueryPerformer.perform(queryBuilder);
public void deleteAllMatchingQuery(QueryBuilder queryBuilder) {
DeleteByQueryRequest request = new DeleteByQueryRequest(aliasName.getValue())
.setDocTypes(typeName.getValue())
.setScroll(TIMEOUT)
.setQuery(queryBuilder)
.setBatchSize(batchSize);

client.deleteByQueryAsync(request, RequestOptions.DEFAULT, new DeleteByQueryActionListener());
}

private void checkArgument(String content) {
Expand Down
Expand Up @@ -21,11 +21,10 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;

import java.util.concurrent.Executors;

import org.apache.james.util.concurrent.NamedThreadFactory;
import org.awaitility.Duration;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
Expand Down Expand Up @@ -55,9 +54,7 @@ public void setup() {
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
.createIndexAndAliases(getESClient());
testee = new ElasticSearchIndexer(getESClient(),
Executors.newSingleThreadExecutor(NamedThreadFactory.withClassName(getClass())),
ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
testee = new ElasticSearchIndexer(getESClient(), ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
}

private RestHighLevelClient getESClient() {
Expand Down Expand Up @@ -150,16 +147,17 @@ public void deleteByQueryShouldWorkOnSingleMessage() throws Exception {
testee.index(messageId, content);
elasticSearch.awaitForElasticSearch();

testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
testee.deleteAllMatchingQuery(termQuery("property", "1"));
elasticSearch.awaitForElasticSearch();

try (RestHighLevelClient client = getESClient()) {
SearchResponse searchResponse = client.search(
new SearchRequest(INDEX_NAME.getValue())
.types(TYPE_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
RequestOptions.DEFAULT);
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
await().atMost(Duration.TEN_SECONDS)
.until(() -> client.search(
new SearchRequest(INDEX_NAME.getValue())
.types(TYPE_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
RequestOptions.DEFAULT)
.getHits().getTotalHits() == 0);
}
}

Expand All @@ -181,16 +179,17 @@ public void deleteByQueryShouldWorkWhenMultipleMessages() throws Exception {
testee.index(messageId3, content3);
elasticSearch.awaitForElasticSearch();

testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
testee.deleteAllMatchingQuery(termQuery("property", "1"));
elasticSearch.awaitForElasticSearch();

try (RestHighLevelClient client = getESClient()) {
SearchResponse searchResponse = client.search(
new SearchRequest(INDEX_NAME.getValue())
.types(TYPE_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
RequestOptions.DEFAULT);
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
await().atMost(Duration.TEN_SECONDS)
.until(() -> client.search(
new SearchRequest(INDEX_NAME.getValue())
.types(TYPE_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
RequestOptions.DEFAULT)
.getHits().getTotalHits() == 1);
}
}

Expand Down

0 comments on commit 45e3224

Please sign in to comment.