Skip to content

Commit 6615fa9

Browse files
committed
HSEARCH-1725 Improve allocation rate of the new Lucene synchronous batching backend
1 parent a0fe686 commit 6615fa9

File tree

6 files changed

+226
-40
lines changed

6 files changed

+226
-40
lines changed

engine/src/main/java/org/hibernate/search/backend/impl/lucene/BatchSyncProcessor.java

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@
1111
import org.hibernate.search.util.logging.impl.Log;
1212
import org.hibernate.search.util.logging.impl.LoggerFactory;
1313

14-
import java.util.LinkedList;
1514
import java.util.List;
16-
import java.util.concurrent.BlockingQueue;
17-
import java.util.concurrent.LinkedBlockingDeque;
1815
import java.util.concurrent.locks.LockSupport;
1916

2017
/**
@@ -35,7 +32,8 @@ public class BatchSyncProcessor {
3532

3633
private static final Log log = LoggerFactory.make();
3734

38-
private final BlockingQueue<Changeset> transferQueue = new LinkedBlockingDeque<>();
35+
private final MultiWriteDrainableLinkedList<Changeset> transferQueue = new MultiWriteDrainableLinkedList<>();
36+
3937
private volatile LuceneBackendResources resources;
4038
private final String indexName;
4139
private volatile boolean stop = false;
@@ -72,7 +70,7 @@ public void submit(List<LuceneWork> workList, IndexingMonitor monitor) {
7270
wakeUpConsumer();
7371
boolean interrupted = false;
7472
while ( ! changeset.isProcessed() && ! interrupted ) {
75-
LockSupport.park();
73+
parkCurrentThread();
7674
if ( Thread.interrupted() ) {
7775
interrupted = true;
7876
}
@@ -111,26 +109,36 @@ void updateResources(LuceneBackendResources resources) {
111109
private class Consumer implements Runnable {
112110
@Override
113111
public void run() {
112+
Iterable<Changeset> changesets;
114113
while ( ! stop ) {
115-
while ( transferQueue.isEmpty() && ! stop ) {
114+
changesets = transferQueue.drainToDetachedIterable();
115+
while ( changesets == null && ! stop ) {
116116
// Avoid busy wait
117-
LockSupport.park();
117+
parkCurrentThread();
118+
changesets = transferQueue.drainToDetachedIterable();
118119
}
119-
if ( ! transferQueue.isEmpty() ) {
120-
List<Changeset> changesets = new LinkedList<>();
121-
transferQueue.drainTo( changesets );
122-
ChangesetList changesetList = new ChangesetList( changesets );
123-
try {
124-
LuceneBackendQueueTask luceneBackendQueueTask = new LuceneBackendQueueTask( changesetList.getWork(), resources, null );
125-
luceneBackendQueueTask.run();
126-
}
127-
finally {
128-
changesetList.markProcessed();
129-
}
120+
if ( changesets != null ) {
121+
applyChangesets( changesets );
130122
}
131123
}
132124
log.stoppingSyncConsumerThread( indexName );
133125
}
126+
127+
private void applyChangesets(Iterable<Changeset> changesets) {
128+
ChangesetList changesetList = new ChangesetList( changesets );
129+
try {
130+
LuceneBackendQueueTask luceneBackendQueueTask = new LuceneBackendQueueTask( changesetList, resources, null );
131+
luceneBackendQueueTask.run();
132+
}
133+
finally {
134+
changesetList.markProcessed();
135+
}
136+
}
137+
}
138+
139+
private void parkCurrentThread() {
140+
//Always use some safety margin when parking threads:
141+
LockSupport.parkNanos( 1_000_000_000 );
134142
}
135143

136144
}

engine/src/main/java/org/hibernate/search/backend/impl/lucene/Changeset.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.hibernate.search.backend.IndexingMonitor;
1010
import org.hibernate.search.backend.LuceneWork;
1111

12+
import java.util.Iterator;
1213
import java.util.List;
1314
import java.util.concurrent.locks.LockSupport;
1415

@@ -30,8 +31,8 @@ final class Changeset {
3031
this.monitor = monitor;
3132
}
3233

33-
List<LuceneWork> getWorkList() {
34-
return workList;
34+
Iterator<LuceneWork> getWorkListIterator() {
35+
return workList.iterator();
3536
}
3637

3738
IndexingMonitor getMonitor() {

engine/src/main/java/org/hibernate/search/backend/impl/lucene/ChangesetList.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,36 +6,72 @@
66
*/
77
package org.hibernate.search.backend.impl.lucene;
88

9-
import org.hibernate.search.backend.LuceneWork;
9+
import java.util.Collections;
10+
import java.util.Iterator;
1011

11-
import java.util.ArrayList;
12-
import java.util.List;
12+
import org.hibernate.search.backend.LuceneWork;
1313

1414
/**
1515
* Aggregator for {@link org.hibernate.search.backend.impl.lucene.Changeset}
1616
*
1717
* @author gustavonalle
1818
*/
19-
final class ChangesetList {
19+
final class ChangesetList implements Iterable<LuceneWork> {
2020

21-
private final List<Changeset> changesets;
21+
private final Iterable<Changeset> changesets;
2222

23-
ChangesetList(List<Changeset> changesets) {
23+
ChangesetList(Iterable<Changeset> changesets) {
2424
this.changesets = changesets;
2525
}
2626

27-
List<LuceneWork> getWork() {
28-
ArrayList<LuceneWork> luceneWorks = new ArrayList<>();
27+
void markProcessed() {
2928
for ( Changeset changeset : changesets ) {
30-
luceneWorks.addAll( changeset.getWorkList() );
29+
changeset.markProcessed();
3130
}
32-
return luceneWorks;
3331
}
3432

35-
void markProcessed() {
36-
for ( Changeset changeset : changesets ) {
37-
changeset.markProcessed();
33+
@Override
34+
public Iterator<LuceneWork> iterator() {
35+
return new WorkIterator( changesets.iterator() );
36+
}
37+
38+
/**
39+
* A shallow iterator on all LuceneWork which avoids collection copies.
40+
* Optimized as this code area is very hot at runtime.
41+
*/
42+
private static class WorkIterator implements Iterator<LuceneWork> {
43+
44+
private Iterator<Changeset> outerIterator;
45+
private Iterator<LuceneWork> current = Collections.<LuceneWork>emptyIterator();
46+
47+
public WorkIterator(Iterator<Changeset> iterator) {
48+
this.outerIterator = iterator;
3849
}
50+
51+
@Override
52+
public boolean hasNext() {
53+
return current.hasNext() || outerIterator.hasNext();
54+
}
55+
56+
@Override
57+
public LuceneWork next() {
58+
if ( current.hasNext() ) {
59+
//advance the inner loop only
60+
return current.next();
61+
}
62+
else {
63+
//advance outer loop first
64+
Changeset next = outerIterator.next();
65+
current = next.getWorkListIterator();
66+
return current.next();
67+
}
68+
}
69+
70+
@Override
71+
public void remove() {
72+
throw new UnsupportedOperationException( "This iterator is unable to remove elements" );
73+
}
74+
3975
}
4076

4177
}

engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueTask.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77
package org.hibernate.search.backend.impl.lucene;
88

9-
import java.util.List;
109
import java.util.concurrent.ExecutionException;
1110
import java.util.concurrent.locks.Lock;
1211

@@ -31,10 +30,10 @@ final class LuceneBackendQueueTask implements Runnable {
3130

3231
private final Lock modificationLock;
3332
private final LuceneBackendResources resources;
34-
private final List<LuceneWork> workList;
33+
private final Iterable<LuceneWork> workList;
3534
private final IndexingMonitor monitor;
3635

37-
LuceneBackendQueueTask(List<LuceneWork> workList, LuceneBackendResources resources, IndexingMonitor monitor) {
36+
LuceneBackendQueueTask(Iterable<LuceneWork> workList, LuceneBackendResources resources, IndexingMonitor monitor) {
3837
this.workList = workList;
3938
this.resources = resources;
4039
this.monitor = monitor;
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Hibernate Search, full-text search for your domain model
3+
*
4+
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
5+
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
6+
*/
7+
package org.hibernate.search.backend.impl.lucene;
8+
9+
import java.util.Iterator;
10+
import java.util.NoSuchElementException;
11+
import java.util.concurrent.locks.ReentrantLock;
12+
13+
14+
/**
15+
* A custom structure similar to a concurrent linked list.
16+
*
17+
* This could be functionally replaced by a LinkedBlockingDeque, but we only
18+
* need a specific subset of its functionality.
19+
* Specifically, we need to maintain the order of elements being added, but on
20+
* a drain we'll only ever need to iterate the list sequentially, and the
21+
* drain needs to atomically reset the queue.
22+
*
23+
* @author Sanne Grinovero <sanne@hibernate.org> (C) 2014 Red Hat Inc.
24+
* @since 5.0
25+
*/
26+
final class MultiWriteDrainableLinkedList<T> {
27+
28+
private final ReentrantLock lock = new ReentrantLock();
29+
30+
//Guarded by lock
31+
private Node<T> first = null;
32+
33+
//Guarded by lock
34+
private Node<T> last = null;
35+
36+
/**
37+
* Adds a new entry to this list.
38+
*/
39+
void add(T element) {
40+
final Node<T> newnode = new Node<T>( element );
41+
final ReentrantLock lock = this.lock;
42+
lock.lock();
43+
try {
44+
if ( first == null ) {
45+
first = newnode;
46+
last = newnode;
47+
}
48+
else {
49+
last.next = newnode;
50+
last = newnode;
51+
}
52+
}
53+
finally {
54+
lock.unlock();
55+
}
56+
}
57+
58+
/**
59+
* Returns an Iterable over all results added so far, but
60+
* atomically clears the structure as well.
61+
* The returned iterable will be the only entry point to
62+
* read the previously appended data.
63+
* @return an Iterable, or null if there is no data.
64+
*/
65+
Iterable<T> drainToDetachedIterable() {
66+
final Node<T> head;
67+
final ReentrantLock lock = this.lock;
68+
lock.lock();
69+
try {
70+
head = first;
71+
first = null;
72+
last = null;
73+
}
74+
finally {
75+
lock.unlock();
76+
}
77+
if ( head != null ) {
78+
return new DetachedNodeIterable<T>( head );
79+
}
80+
else {
81+
//The choice to return null rather than an empty iterator
82+
//allows the client to not need an isEmpty() method, which would
83+
//need a different level of lock granularity.
84+
return null;
85+
}
86+
}
87+
88+
static final class Node<T> {
89+
T value;
90+
Node<T> next;
91+
Node(T x) {
92+
value = x;
93+
}
94+
}
95+
96+
static final class DetachedNodeIterable<T> implements Iterable<T> {
97+
98+
private Node<T> head;
99+
100+
public DetachedNodeIterable(Node<T> head) {
101+
this.head = head;
102+
}
103+
104+
@Override
105+
public Iterator<T> iterator() {
106+
return new DetachedNodeIterator<T>( head );
107+
}
108+
}
109+
110+
static final class DetachedNodeIterator<T> implements Iterator<T> {
111+
112+
private Node<T> current;
113+
114+
DetachedNodeIterator(Node head) {
115+
this.current = head;
116+
}
117+
118+
@Override
119+
public boolean hasNext() {
120+
return current != null;
121+
}
122+
123+
@Override
124+
public T next() {
125+
if ( current == null ) {
126+
throw new NoSuchElementException();
127+
}
128+
T v = current.value;
129+
current = current.next;
130+
return v;
131+
}
132+
133+
@Override
134+
public void remove() {
135+
throw new UnsupportedOperationException( "This iterator is unable to remove elements" );
136+
}
137+
}
138+
139+
}

engine/src/main/java/org/hibernate/search/exception/impl/ErrorContextBuilder.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
public class ErrorContextBuilder {
2121

2222
private Throwable th;
23-
private List<LuceneWork> workToBeDone;
23+
private Iterable<LuceneWork> workToBeDone;
2424
private List<LuceneWork> failingOperations;
2525
private List<LuceneWork> operationsThatWorked;
2626

@@ -45,8 +45,8 @@ public ErrorContextBuilder workCompleted(LuceneWork luceneWork) {
4545

4646
}
4747

48-
public ErrorContextBuilder allWorkToBeDone(List<LuceneWork> workOnWriter) {
49-
this.workToBeDone = new ArrayList<LuceneWork>( workOnWriter );
48+
public ErrorContextBuilder allWorkToBeDone(Iterable<LuceneWork> workOnWriter) {
49+
this.workToBeDone = workOnWriter;
5050
return this;
5151
}
5252

@@ -57,7 +57,10 @@ public ErrorContext createErrorContext() {
5757

5858
// for situation when there is a primary failure
5959
if ( workToBeDone != null ) {
60-
List<LuceneWork> workLeft = new ArrayList<LuceneWork>( workToBeDone );
60+
List<LuceneWork> workLeft = new ArrayList<LuceneWork>();
61+
for ( LuceneWork work : workToBeDone ) {
62+
workLeft.add( work );
63+
}
6164
if ( operationsThatWorked != null ) {
6265
workLeft.removeAll( operationsThatWorked );
6366
}

0 commit comments

Comments
 (0)