Skip to content

Commit 9ad4931

Browse files
author
Gustavo Fernandes
committed
HSEARCH-1699 Batching IndexWriter commits for multiple synchronous worksets
1 parent cea1b57 commit 9ad4931

File tree

5 files changed

+296
-19
lines changed

5 files changed

+296
-19
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Hibernate, Relational Persistence for Idiomatic Java
3+
*
4+
* Copyright (c) 2014, Red Hat, Inc. and/or its affiliates or third-party contributors as
5+
* indicated by the @author tags or express copyright attribution
6+
* statements applied by the authors. All third-party contributions are
7+
* distributed under license by Red Hat, Inc.
8+
*
9+
* This copyrighted material is made available to anyone wishing to use, modify,
10+
* copy, or redistribute it subject to the terms and conditions of the GNU
11+
* Lesser General Public License, as published by the Free Software Foundation.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15+
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
16+
* for more details.
17+
*
18+
* You should have received a copy of the GNU Lesser General Public License
19+
* along with this distribution; if not, write to:
20+
* Free Software Foundation, Inc.
21+
* 51 Franklin Street, Fifth Floor
22+
* Boston, MA 02110-1301 USA
23+
*/
24+
package org.hibernate.search.backend.impl.lucene;
25+
26+
import org.hibernate.search.backend.IndexingMonitor;
27+
import org.hibernate.search.backend.LuceneWork;
28+
import org.hibernate.search.util.logging.impl.Log;
29+
import org.hibernate.search.util.logging.impl.LoggerFactory;
30+
31+
import java.util.LinkedList;
32+
import java.util.List;
33+
import java.util.concurrent.BlockingQueue;
34+
import java.util.concurrent.LinkedBlockingDeque;
35+
import java.util.concurrent.locks.LockSupport;
36+
37+
/**
38+
* Processes changesets in batches, maintaining sync guarantees.
39+
*<p>
40+
* Multiple threads produce one or more {@link org.hibernate.search.backend.LuceneWork}
41+
* by calling {@link #submit(java.util.List, org.hibernate.search.backend.IndexingMonitor)},
42+
* and are blocked until their changes have been applied to the index.</p>
43+
* The {@link org.hibernate.search.backend.impl.lucene.BatchSyncProcessor.Consumer} thread will
44+
* coalesce changes from multiple threads and apply them in the index, releasing the waiting threads
45+
* at the end.
46+
* <p>
47+
* In the absence of work to be applied, the Consumer thread is parked to avoid busy waiting.</p>
48+
*
49+
* @author gustavonalle
50+
*/
51+
public class BatchSyncProcessor {
52+
53+
private static final Log log = LoggerFactory.make();
54+
55+
private final BlockingQueue<Changeset> transferQueue = new LinkedBlockingDeque<Changeset>();
56+
private volatile LuceneBackendResources resources;
57+
private final String indexName;
58+
private volatile boolean stop = false;
59+
final Thread consumerThread;
60+
61+
/**
62+
* Constructor
63+
* @param resources LuceneResources to obtain the workspace
64+
* @param indexName for debugging purposes
65+
*/
66+
public BatchSyncProcessor(LuceneBackendResources resources, String indexName) {
67+
this.resources = resources;
68+
this.indexName = indexName;
69+
consumerThread = new Thread( new Consumer(), "Hibernate Search sync consumer thread for index " + indexName );
70+
consumerThread.setDaemon( true );
71+
}
72+
73+
/**
74+
* Start processing
75+
*/
76+
public void start() {
77+
log.startingSyncConsumerThread( indexName );
78+
consumerThread.start();
79+
}
80+
81+
/**
82+
* Submit work and wait for it to be applied to the index
83+
* @param workList list of work
84+
* @param monitor for statistics collection
85+
*/
86+
public void submit(List<LuceneWork> workList, IndexingMonitor monitor) {
87+
Changeset changeset = new Changeset( workList, Thread.currentThread(), monitor );
88+
transferQueue.add( changeset );
89+
wakeUpConsumer();
90+
boolean interrupted = false;
91+
while ( ! changeset.isProcessed() && ! interrupted ) {
92+
LockSupport.park();
93+
if ( Thread.interrupted() ) {
94+
interrupted = true;
95+
}
96+
}
97+
if ( interrupted ) {
98+
Thread.currentThread().interrupt();
99+
}
100+
}
101+
102+
/**
103+
* Wakes up consumer thread if necessary
104+
*/
105+
private void wakeUpConsumer() {
106+
LockSupport.unpark( consumerThread );
107+
}
108+
109+
/**
110+
* Dispose resources
111+
*/
112+
public void shutdown() {
113+
stop = true;
114+
LockSupport.unpark( consumerThread );
115+
}
116+
117+
/**
118+
* Handle on the fly rebuilds
119+
* @param resources new instance of {@link org.hibernate.search.backend.impl.lucene.LuceneBackendResources}
120+
*/
121+
void updateResources(LuceneBackendResources resources) {
122+
this.resources = resources;
123+
}
124+
125+
/**
126+
* Consumer thread
127+
*/
128+
private class Consumer implements Runnable {
129+
@Override
130+
public void run() {
131+
while ( ! stop ) {
132+
while ( transferQueue.isEmpty() && ! stop ) {
133+
// Avoid busy wait
134+
LockSupport.park();
135+
}
136+
if ( ! transferQueue.isEmpty() ) {
137+
List<Changeset> changesets = new LinkedList<Changeset>();
138+
transferQueue.drainTo( changesets );
139+
ChangesetList changesetList = new ChangesetList( changesets );
140+
try {
141+
LuceneBackendQueueTask luceneBackendQueueTask = new LuceneBackendQueueTask( changesetList.getWork(), resources, null );
142+
luceneBackendQueueTask.run();
143+
}
144+
finally {
145+
changesetList.markProcessed();
146+
}
147+
}
148+
}
149+
log.stoppingSyncConsumerThread( indexName );
150+
}
151+
}
152+
153+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Hibernate, Relational Persistence for Idiomatic Java
3+
*
4+
* Copyright (c) 2014, Red Hat, Inc. and/or its affiliates or third-party contributors as
5+
* indicated by the @author tags or express copyright attribution
6+
* statements applied by the authors. All third-party contributions are
7+
* distributed under license by Red Hat, Inc.
8+
*
9+
* This copyrighted material is made available to anyone wishing to use, modify,
10+
* copy, or redistribute it subject to the terms and conditions of the GNU
11+
* Lesser General Public License, as published by the Free Software Foundation.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15+
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
16+
* for more details.
17+
*
18+
* You should have received a copy of the GNU Lesser General Public License
19+
* along with this distribution; if not, write to:
20+
* Free Software Foundation, Inc.
21+
* 51 Franklin Street, Fifth Floor
22+
* Boston, MA 02110-1301 USA
23+
*/
24+
package org.hibernate.search.backend.impl.lucene;
25+
26+
import org.hibernate.search.backend.IndexingMonitor;
27+
import org.hibernate.search.backend.LuceneWork;
28+
29+
import java.util.List;
30+
import java.util.concurrent.locks.LockSupport;
31+
32+
/**
33+
* A Changeset is a work to be applied to the index and its associated producer
34+
*
35+
* @author gustavonalle
36+
*/
37+
final class Changeset {
38+
39+
private final List<LuceneWork> workList;
40+
private final Thread producer;
41+
private final IndexingMonitor monitor;
42+
private volatile boolean processed = false;
43+
44+
Changeset(List<LuceneWork> workList, Thread producer, IndexingMonitor monitor) {
45+
this.workList = workList;
46+
this.producer = producer;
47+
this.monitor = monitor;
48+
}
49+
50+
List<LuceneWork> getWorkList() {
51+
return workList;
52+
}
53+
54+
IndexingMonitor getMonitor() {
55+
return monitor;
56+
}
57+
58+
boolean isProcessed() {
59+
return processed;
60+
}
61+
62+
public void markProcessed() {
63+
processed = true;
64+
LockSupport.unpark( producer );
65+
}
66+
67+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Hibernate, Relational Persistence for Idiomatic Java
3+
*
4+
* Copyright (c) 2014, Red Hat, Inc. and/or its affiliates or third-party contributors as
5+
* indicated by the @author tags or express copyright attribution
6+
* statements applied by the authors. All third-party contributions are
7+
* distributed under license by Red Hat, Inc.
8+
*
9+
* This copyrighted material is made available to anyone wishing to use, modify,
10+
* copy, or redistribute it subject to the terms and conditions of the GNU
11+
* Lesser General Public License, as published by the Free Software Foundation.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
15+
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
16+
* for more details.
17+
*
18+
* You should have received a copy of the GNU Lesser General Public License
19+
* along with this distribution; if not, write to:
20+
* Free Software Foundation, Inc.
21+
* 51 Franklin Street, Fifth Floor
22+
* Boston, MA 02110-1301 USA
23+
*/
24+
package org.hibernate.search.backend.impl.lucene;
25+
26+
import org.hibernate.search.backend.LuceneWork;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
/**
32+
* Aggregator for {@link org.hibernate.search.backend.impl.lucene.Changeset}
33+
*
34+
* @author gustavonalle
35+
*/
36+
final class ChangesetList {
37+
38+
private final List<Changeset> changesets;
39+
40+
ChangesetList(List<Changeset> changesets) {
41+
this.changesets = changesets;
42+
}
43+
44+
List<LuceneWork> getWork() {
45+
ArrayList<LuceneWork> luceneWorks = new ArrayList<LuceneWork>();
46+
for ( Changeset changeset : changesets ) {
47+
luceneWorks.addAll( changeset.getWorkList() );
48+
}
49+
return luceneWorks;
50+
}
51+
52+
void markProcessed() {
53+
for ( Changeset changeset : changesets ) {
54+
changeset.markProcessed();
55+
}
56+
}
57+
58+
}

engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,8 @@
2525

2626
import java.util.List;
2727
import java.util.Properties;
28-
import java.util.concurrent.ExecutionException;
29-
import java.util.concurrent.Future;
3028
import java.util.concurrent.locks.Lock;
3129

32-
import org.hibernate.search.SearchException;
3330
import org.hibernate.search.backend.BackendFactory;
3431
import org.hibernate.search.backend.IndexingMonitor;
3532
import org.hibernate.search.backend.LuceneWork;
@@ -57,6 +54,7 @@ public class LuceneBackendQueueProcessor implements BackendQueueProcessor {
5754
private boolean sync;
5855
private AbstractWorkspaceImpl workspaceOverride;
5956
private LuceneBackendTaskStreamer streamWorker;
57+
private BatchSyncProcessor batchSyncProcessor;
6058

6159
@Override
6260
public void initialize(Properties props, WorkerBuildContext context, DirectoryBasedIndexManager indexManager) {
@@ -68,11 +66,14 @@ public void initialize(Properties props, WorkerBuildContext context, DirectoryBa
6866
}
6967
resources = new LuceneBackendResources( context, indexManager, props, workspaceOverride );
7068
streamWorker = new LuceneBackendTaskStreamer( resources );
69+
batchSyncProcessor = new BatchSyncProcessor( resources, indexManager.getIndexName() );
70+
batchSyncProcessor.start();
7171
}
7272

7373
@Override
7474
public void close() {
7575
resources.shutdown();
76+
batchSyncProcessor.shutdown();
7677
}
7778

7879
@Override
@@ -88,25 +89,15 @@ public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
8889
if ( workList == null ) {
8990
throw new IllegalArgumentException( "workList should not be null" );
9091
}
91-
LuceneBackendQueueTask luceneBackendQueueProcessor = new LuceneBackendQueueTask(
92-
workList,
93-
resources,
94-
monitor
95-
);
9692
if ( sync ) {
97-
Future<?> future = resources.getQueueingExecutor().submit( luceneBackendQueueProcessor );
98-
try {
99-
future.get();
100-
}
101-
catch (InterruptedException e) {
102-
log.interruptedWhileWaitingForIndexActivity( e );
103-
Thread.currentThread().interrupt();
104-
}
105-
catch (ExecutionException e) {
106-
throw new SearchException( "Error applying updates to the Lucene index", e.getCause() );
107-
}
93+
batchSyncProcessor.submit( workList, monitor );
10894
}
10995
else {
96+
LuceneBackendQueueTask luceneBackendQueueProcessor = new LuceneBackendQueueTask(
97+
workList,
98+
resources,
99+
monitor
100+
);
110101
resources.getQueueingExecutor().execute( luceneBackendQueueProcessor );
111102
}
112103
}
@@ -133,6 +124,7 @@ public void setCustomWorkspace(AbstractWorkspaceImpl workspace) {
133124
@Override
134125
public void indexMappingChanged() {
135126
resources = resources.onTheFlyRebuild();
127+
batchSyncProcessor.updateResources( resources );
136128
}
137129

138130
}

engine/src/main/java/org/hibernate/search/util/logging/impl/Log.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,4 +774,11 @@ public interface Log extends BasicLogger {
774774
@Message(id = 229, value = "Property " + Environment.INDEX_FLUSH_INTERVAL + "for the index '%s' needs to be positive." )
775775
SearchException flushIntervalNeedsToBePositive(String indexName);
776776

777+
@LogMessage(level = INFO)
778+
@Message(id = 230, value = "Starting sync consumer thread for index '%s'" )
779+
void startingSyncConsumerThread(String indexName);
780+
781+
@LogMessage(level = INFO)
782+
@Message(id = 231, value = "Stopping sync consumer thread for index '%s'" )
783+
void stoppingSyncConsumerThread(String indexName);
777784
}

0 commit comments

Comments
 (0)