Skip to content

Commit

Permalink
SONAR-6521 Pull ES scroll method to utility class
Browse files Browse the repository at this point in the history
  • Loading branch information
jblievremont committed May 5, 2015
1 parent 861a040 commit 2039593
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 82 deletions.
37 changes: 37 additions & 0 deletions server/sonar-server/src/main/java/org/sonar/server/es/EsUtils.java
Expand Up @@ -21,7 +21,9 @@

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.common.joda.time.format.ISODateTimeFormat;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
Expand All @@ -30,14 +32,21 @@
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;

public class EsUtils {

public static final int SCROLL_TIME_IN_MINUTES = 3;

private EsUtils() {
// only static methods
}
Expand Down Expand Up @@ -84,4 +93,32 @@ public static String formatDateTime(@Nullable Date date) {
return null;
}

public static <D extends BaseDoc> Iterator<D> scroll(final EsClient esClient, final String scrollId, final Function<Map<String, Object>, D> docConverter) {
return new Iterator<D>() {
private final Queue<SearchHit> hits = new ArrayDeque<>();

@Override
public boolean hasNext() {
if (hits.isEmpty()) {
SearchScrollRequestBuilder esRequest = esClient.prepareSearchScroll(scrollId)
.setScroll(TimeValue.timeValueMinutes(SCROLL_TIME_IN_MINUTES));
Collections.addAll(hits, esRequest.get().getHits().getHits());
}
return !hits.isEmpty();
}

@Override
public D next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return docConverter.apply(hits.poll().getSource());
}

@Override
public void remove() {
throw new UnsupportedOperationException("Cannot remove item when scrolling");
}
};
}
}
Expand Up @@ -21,16 +21,23 @@

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.*;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.index.query.BoolFilterBuilder;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.OrFilterBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
Expand All @@ -48,7 +55,12 @@
import org.sonar.api.utils.System2;
import org.sonar.core.component.ComponentDto;
import org.sonar.core.util.NonNullInputFunction;
import org.sonar.server.es.*;
import org.sonar.server.es.BaseIndex;
import org.sonar.server.es.EsClient;
import org.sonar.server.es.EsUtils;
import org.sonar.server.es.SearchOptions;
import org.sonar.server.es.SearchResult;
import org.sonar.server.es.Sorting;
import org.sonar.server.exceptions.NotFoundException;
import org.sonar.server.issue.IssueQuery;
import org.sonar.server.issue.filter.IssueFilterParameters;
Expand All @@ -61,7 +73,15 @@
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;

import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.regex.Pattern;

import static com.google.common.collect.Lists.newArrayList;
Expand All @@ -72,9 +92,6 @@
*/
public class IssueIndex extends BaseIndex {


private static final int SCROLL_TIME_IN_MINUTES = 3;

private static final String SUBSTRING_MATCH_REGEXP = ".*%s.*";

public static final List<String> SUPPORTED_FACETS = ImmutableList.of(
Expand Down Expand Up @@ -714,7 +731,7 @@ public Iterator<IssueDoc> selectIssuesForBatch(ComponentDto component) {
.prepareSearch(IssueIndexDefinition.INDEX)
.setTypes(IssueIndexDefinition.TYPE_ISSUE)
.setSearchType(SearchType.SCAN)
.setScroll(TimeValue.timeValueMinutes(SCROLL_TIME_IN_MINUTES))
.setScroll(TimeValue.timeValueMinutes(EsUtils.SCROLL_TIME_IN_MINUTES))
.setSize(10000)
.setFetchSource(
new String[] {IssueIndexDefinition.FIELD_ISSUE_KEY, IssueIndexDefinition.FIELD_ISSUE_RULE_KEY, IssueIndexDefinition.FIELD_ISSUE_MODULE_UUID,
Expand All @@ -726,36 +743,6 @@ public Iterator<IssueDoc> selectIssuesForBatch(ComponentDto component) {
.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filter));
SearchResponse response = requestBuilder.get();

return scroll(response.getScrollId());
}

// Scrolling within the index
private Iterator<IssueDoc> scroll(final String scrollId) {
return new Iterator<IssueDoc>() {
private final Queue<SearchHit> hits = new ArrayDeque<>();

@Override
public boolean hasNext() {
if (hits.isEmpty()) {
SearchScrollRequestBuilder esRequest = getClient().prepareSearchScroll(scrollId)
.setScroll(TimeValue.timeValueMinutes(SCROLL_TIME_IN_MINUTES));
Collections.addAll(hits, esRequest.get().getHits().getHits());
}
return !hits.isEmpty();
}

@Override
public IssueDoc next() {
if(!hasNext()){
throw new NoSuchElementException();
}
return new IssueDoc(hits.poll().getSource());
}

@Override
public void remove() {
throw new UnsupportedOperationException("Cannot remove item when scrolling");
}
};
return EsUtils.scroll(getClient(), response.getScrollId(), DOC_CONVERTER);
}
}
Expand Up @@ -20,12 +20,12 @@

package org.sonar.server.user.index;

import com.google.common.base.Function;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolFilterBuilder;
Expand All @@ -35,29 +35,33 @@
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.sonar.api.ServerComponent;
import org.sonar.core.util.NonNullInputFunction;
import org.sonar.server.es.EsClient;
import org.sonar.server.es.EsUtils;
import org.sonar.server.exceptions.NotFoundException;

import javax.annotation.CheckForNull;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Map;

public class UserIndex implements ServerComponent {

private static final int SCROLL_TIME_IN_MINUTES = 3;

private final EsClient esClient;
private static final Function<Map<String, Object>, UserDoc> DOC_CONVERTER = new NonNullInputFunction<Map<String, Object>, UserDoc>() {
@Override
protected UserDoc doApply(Map<String, Object> input) {
return new UserDoc(input);
}
};

public UserIndex(EsClient esClient) {
this.esClient = esClient;
}

private final EsClient esClient;

@CheckForNull
public UserDoc getNullableByLogin(String login) {
GetRequestBuilder request = esClient.prepareGet(UserIndexDefinition.INDEX, UserIndexDefinition.TYPE_USER, login)
Expand Down Expand Up @@ -123,45 +127,14 @@ public Iterator<UserDoc> selectUsersForBatch(List<String> logins) {
.setTypes(UserIndexDefinition.TYPE_USER)
.setSearchType(SearchType.SCAN)
.addSort(SortBuilders.fieldSort(UserIndexDefinition.FIELD_LOGIN).order(SortOrder.ASC))
.setScroll(TimeValue.timeValueMinutes(SCROLL_TIME_IN_MINUTES))
.setScroll(TimeValue.timeValueMinutes(EsUtils.SCROLL_TIME_IN_MINUTES))
.setSize(10000)
.setFetchSource(
new String[] {UserIndexDefinition.FIELD_LOGIN, UserIndexDefinition.FIELD_NAME},
null)
.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filter));
SearchResponse response = requestBuilder.get();

return scroll(response.getScrollId());
}

// Scrolling within the index
private Iterator<UserDoc> scroll(final String scrollId) {
return new Iterator<UserDoc>() {
private final Queue<SearchHit> hits = new ArrayDeque<>();

@Override
public boolean hasNext() {
if (hits.isEmpty()) {
SearchScrollRequestBuilder esRequest = esClient.prepareSearchScroll(scrollId)
.setScroll(TimeValue.timeValueMinutes(SCROLL_TIME_IN_MINUTES));
Collections.addAll(hits, esRequest.get().getHits().getHits());
}
return !hits.isEmpty();
}

@Override
public UserDoc next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return new UserDoc(hits.poll().getSource());
}

@Override
public void remove() {
throw new UnsupportedOperationException("Cannot remove item when scrolling");
}
};
return EsUtils.scroll(esClient, response.getScrollId(), DOC_CONVERTER);
}

}

0 comments on commit 2039593

Please sign in to comment.