Skip to content

Commit

Permalink
HSEARCH-2178 Limiting the number of requests per bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarmorling authored and Sanne committed May 16, 2016
1 parent 50fd086 commit 8af4645
Showing 1 changed file with 50 additions and 14 deletions.
Expand Up @@ -43,6 +43,11 @@ public class BackendRequestProcessor implements Service, Startable, Stoppable {

private static final Log LOG = LoggerFactory.make( Log.class );

/**
* Maximum number of requests sent in a single bulk. Could be made an option if needed.
*/
private static final int MAX_BULK_SIZE = 250;

private final AsyncBackendRequestProcessor asyncProcessor;
private ErrorHandler errorHandler;
private JestClient jestClient;
Expand Down Expand Up @@ -89,7 +94,7 @@ private void doExecute(Iterable<BackendRequest<?>> requests) {
nextBulk = backendRequestGroup;

if ( LOG.isTraceEnabled() ) {
LOG.tracef( "Processing bulk of %s items", nextBulk.getSize() );
LOG.tracef( "Processing bulk of %s items on index(es) %s", nextBulk.getSize(), nextBulk.getTouchedIndexes() );
}

nextBulk.execute();
Expand All @@ -105,29 +110,31 @@ private void doExecute(Iterable<BackendRequest<?>> requests) {
*/
private List<ExecutableRequest> createRequestGroups(Iterable<BackendRequest<?>> requests) {
List<ExecutableRequest> groups = new ArrayList<>();
List<BackendRequest<?>> currentBulk = new ArrayList<>();
Set<String> currentIndexNames = new HashSet<>();
BulkRequestBuilder bulkBuilder = new BulkRequestBuilder();

for ( BackendRequest<?> request : requests ) {
boolean currentRequestBulkable = request.getAction() instanceof BulkableAction;
boolean currentBulkNeedsFinishing = ( !bulkBuilder.canAddMore() || !currentRequestBulkable ) && !bulkBuilder.isEmpty();

// finish up current bulk
if ( currentBulkNeedsFinishing ) {
groups.add( bulkBuilder.build( false ) );
bulkBuilder = new BulkRequestBuilder();
}

// either add to current bulk...
if ( request.getAction() instanceof BulkableAction ) {
currentBulk.add( request );
currentIndexNames.add( request.getIndexName() );
if ( currentRequestBulkable ) {
bulkBuilder.add( request );
}
// ... or finish up current bulk and add single request for non-bulkable request
// ... or add single request for non-bulkable request
else {
if ( !currentBulk.isEmpty() ) {
groups.add( new BulkRequest( jestClient, errorHandler, currentBulk, currentIndexNames, false ) );
currentBulk.clear();
currentIndexNames.clear();
}
groups.add( new SingleRequest( jestClient, errorHandler, request ) );
}
}

// finish up last bulk
if ( !currentBulk.isEmpty() ) {
groups.add( new BulkRequest( jestClient, errorHandler, currentBulk, currentIndexNames, true ) );
if ( !bulkBuilder.isEmpty() ) {
groups.add( bulkBuilder.build( true ) );
}

return groups;
Expand Down Expand Up @@ -237,4 +244,33 @@ public void run() {
}
}
}

private class BulkRequestBuilder {

private final List<BackendRequest<?>> bulk = new ArrayList<>();
private final Set<String> indexNames = new HashSet<>();
private int size = 0;

private void add(BackendRequest<?> request) {
bulk.add( request );
indexNames.add( request.getIndexName() );
size++;
}
private boolean canAddMore() {
return size < MAX_BULK_SIZE;
}

private boolean isEmpty() {
return size == 0;
}

private ExecutableRequest build(boolean refresh) {
if ( size > 1 ) {
return new BulkRequest( jestClient, errorHandler, bulk, indexNames, refresh );
}
else {
return new SingleRequest( jestClient, errorHandler, bulk.iterator().next() );
}
}
}
}

0 comments on commit 8af4645

Please sign in to comment.