/
LuceneIndexIndexer.java
129 lines (112 loc) · 4.87 KB
/
LuceneIndexIndexer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.lucene.work.execution.impl;
import java.util.concurrent.CompletableFuture;
import org.hibernate.search.backend.lucene.document.impl.LuceneIndexEntry;
import org.hibernate.search.backend.lucene.document.impl.LuceneIndexEntryFactory;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneSerialWorkOrchestrator;
import org.hibernate.search.backend.lucene.work.impl.IndexingWork;
import org.hibernate.search.backend.lucene.work.impl.LuceneWorkFactory;
import org.hibernate.search.engine.backend.session.spi.BackendSessionContext;
import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.engine.backend.work.execution.spi.DocumentContributor;
import org.hibernate.search.engine.backend.work.execution.spi.DocumentReferenceProvider;
import org.hibernate.search.engine.backend.work.execution.spi.IndexIndexer;
/**
 * {@link IndexIndexer} implementation for the Lucene backend.
 * <p>
 * Each operation is turned into an {@link IndexingWork} by the {@link LuceneWorkFactory}
 * and handed to the {@link LuceneSerialWorkOrchestrator} that the index manager context
 * returns for the document identifier and routing key.
 */
public class LuceneIndexIndexer implements IndexIndexer {

	private final LuceneWorkFactory factory;
	private final LuceneIndexEntryFactory indexEntryFactory;
	private final WorkExecutionIndexManagerContext indexManagerContext;
	// Tenant identifier read once from the session context at construction time.
	private final String tenantId;

	/**
	 * @param factory The factory used to create add/update/delete works.
	 * @param indexEntryFactory The factory used to create index entries from document contributors.
	 * @param indexManagerContext The context providing the mapped type name and the orchestrators.
	 * @param sessionContext The session context from which the tenant identifier is read.
	 */
	public LuceneIndexIndexer(LuceneWorkFactory factory,
			LuceneIndexEntryFactory indexEntryFactory,
			WorkExecutionIndexManagerContext indexManagerContext,
			BackendSessionContext sessionContext) {
		this.factory = factory;
		this.indexEntryFactory = indexEntryFactory;
		this.indexManagerContext = indexManagerContext;
		this.tenantId = sessionContext.tenantIdentifier();
	}

	/**
	 * Creates an index entry for the document and submits an "add" work for it.
	 *
	 * @return A future for the submitted work; see {@code submit} for how forced strategies affect it.
	 */
	@Override
	public CompletableFuture<?> add(DocumentReferenceProvider referenceProvider,
			DocumentContributor documentContributor,
			DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
		String documentId = referenceProvider.identifier();
		String routing = referenceProvider.routingKey();
		LuceneIndexEntry entry = indexEntryFactory.create( tenantId, documentId, routing, documentContributor );
		IndexingWork<?> addWork = factory.add(
				tenantId, indexManagerContext.mappedTypeName(),
				referenceProvider.entityIdentifier(), documentId,
				entry
		);
		return submit( documentId, routing, addWork, commitStrategy, refreshStrategy );
	}

	/**
	 * Creates an index entry for the document and submits an "update" work for it.
	 *
	 * @return A future for the submitted work; see {@code submit} for how forced strategies affect it.
	 */
	@Override
	public CompletableFuture<?> addOrUpdate(DocumentReferenceProvider referenceProvider,
			DocumentContributor documentContributor,
			DocumentCommitStrategy commitStrategy,
			DocumentRefreshStrategy refreshStrategy) {
		String documentId = referenceProvider.identifier();
		String routing = referenceProvider.routingKey();
		LuceneIndexEntry entry = indexEntryFactory.create( tenantId, documentId, routing, documentContributor );
		IndexingWork<?> updateWork = factory.update(
				tenantId, indexManagerContext.mappedTypeName(),
				referenceProvider.entityIdentifier(), documentId,
				entry
		);
		return submit( documentId, routing, updateWork, commitStrategy, refreshStrategy );
	}

	/**
	 * Submits a "delete" work for the referenced document; no index entry is created.
	 *
	 * @return A future for the submitted work; see {@code submit} for how forced strategies affect it.
	 */
	@Override
	public CompletableFuture<?> delete(DocumentReferenceProvider referenceProvider,
			DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
		String documentId = referenceProvider.identifier();
		String routing = referenceProvider.routingKey();
		IndexingWork<?> deleteWork = factory.delete(
				tenantId, indexManagerContext.mappedTypeName(),
				referenceProvider.entityIdentifier(), documentId
		);
		return submit( documentId, routing, deleteWork, commitStrategy, refreshStrategy );
	}

	/**
	 * Hands the work over to the orchestrator selected for the given document id and routing key.
	 * <p>
	 * When a commit and/or a refresh is forced, the returned future is a dependent stage
	 * that performs them once the work's own future has completed.
	 *
	 * @param documentId The document identifier, used to select the orchestrator.
	 * @param routingKey The routing key, used to select the orchestrator.
	 * @param work The work to submit.
	 * @param commitStrategy A commit is forced only when this equals {@link DocumentCommitStrategy#FORCE}.
	 * @param refreshStrategy A refresh is forced only when this equals {@link DocumentRefreshStrategy#FORCE}.
	 * @return The future to expose to the caller.
	 */
	private <T> CompletableFuture<T> submit(String documentId, String routingKey, IndexingWork<T> work,
			DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
		// Route the work to the appropriate shard
		LuceneSerialWorkOrchestrator shardOrchestrator =
				indexManagerContext.indexingOrchestrator( documentId, routingKey );
		CompletableFuture<T> workFuture = new CompletableFuture<>();

		boolean commitForced = DocumentCommitStrategy.FORCE.equals( commitStrategy );
		boolean refreshForced = DocumentRefreshStrategy.FORCE.equals( refreshStrategy );

		if ( !commitForced && !refreshForced ) {
			// Nothing to do after the work: the caller gets the work's future directly.
			shardOrchestrator.submit( workFuture, work );
			return workFuture;
		}

		// The dependent stage must be registered *before* the work is submitted:
		// that way the commit/refresh is executed in the background
		// rather than in the current thread, which we do not want to block.
		CompletableFuture<T> callerFuture = workFuture.thenApply(
				result -> commitAndRefreshAsRequested( shardOrchestrator, commitForced, refreshForced, result )
		);
		shardOrchestrator.submit( workFuture, work );
		return callerFuture;
	}

	/**
	 * Performs the forced commit, then the forced refresh, as requested, and passes the result through.
	 */
	private static <T> T commitAndRefreshAsRequested(LuceneSerialWorkOrchestrator orchestrator,
			boolean commitForced, boolean refreshForced, T result) {
		if ( commitForced ) {
			orchestrator.forceCommitInCurrentThread();
		}
		if ( refreshForced ) {
			orchestrator.forceRefreshInCurrentThread();
		}
		return result;
	}
}