/
JGroupsMasterMessageListener.java
125 lines (108 loc) · 3.86 KB
/
JGroupsMasterMessageListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.jgroups.impl;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.backend.jgroups.logging.impl.Log;
import org.hibernate.search.backend.spi.OperationDispatcher;
import org.hibernate.search.exception.SearchException;
import org.hibernate.search.indexes.serialization.spi.LuceneWorkSerializer;
import org.hibernate.search.spi.BuildContext;
import org.hibernate.search.spi.SearchIntegrator;
import org.hibernate.search.util.logging.impl.LoggerFactory;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.Receiver;
import org.jgroups.View;
/**
* A {@link Receiver} that listens for messages from slave nodes and apply them.
*
* @author Lukasz Moren
* @author Sanne Grinovero (C) 2011 Red Hat Inc.
* @author Ales Justin
*
*/
public class JGroupsMasterMessageListener implements Receiver {
private static final Log log = LoggerFactory.make( Log.class );
private final SearchIntegrator integrator;
private final NodeSelectorService selector;
private final LuceneWorkSerializer luceneWorkSerializer;
private volatile OperationDispatcher operationDispatcher;
public JGroupsMasterMessageListener(BuildContext context, NodeSelectorService masterNodeSelector, LuceneWorkSerializer luceneWorkSerializer) {
this.integrator = context.getUninitializedSearchIntegrator();
this.selector = masterNodeSelector;
this.luceneWorkSerializer = luceneWorkSerializer;
}
@Override
public void receive(Message message) {
final int offset = message.getOffset();
final int bufferLength = message.getLength();
final byte[] rawBuffer = message.getRawBuffer();
try {
byte[] serializedQueue = MessageSerializationHelper.extractSerializedQueue( offset, bufferLength, rawBuffer );
List<LuceneWork> queue = luceneWorkSerializer.toLuceneWorks( serializedQueue );
applyLuceneWorkLocally( queue, message );
}
catch (ClassCastException e) {
log.illegalObjectRetrievedFromMessage( e );
}
catch (SearchException e) {
log.illegalObjectRetrievedFromMessage( e );
}
}
private void applyLuceneWorkLocally(List<LuceneWork> queue, Message message) {
if ( queue != null && !queue.isEmpty() ) {
if ( log.isDebugEnabled() ) {
log.debugf(
"There are %d Lucene docs received from slave node %s to be processed if this node is the master",
(Integer) queue.size(),
message.getSrc()
);
}
OperationDispatcher operationDispatcher = getOperationDispatcher();
operationDispatcher.dispatch( queue, null );
}
else {
log.receivedEmptyLuceneWorksInMessage();
}
}
private OperationDispatcher getOperationDispatcher() {
if ( operationDispatcher == null ) {
operationDispatcher = integrator.createRemoteOperationDispatcher(
indexManager -> selector.getMasterNodeSelector( indexManager.getIndexName() ).isIndexOwnerLocal()
);
}
return operationDispatcher;
}
// ------------------------------------------------------------------------------------------------------------------
// Implementations of JGroups interfaces
// ------------------------------------------------------------------------------------------------------------------
@Override
public void viewAccepted(View view) {
log.jGroupsReceivedNewClusterView( view );
selector.viewAccepted( view );
}
@Override
public void suspect(Address suspected_mbr) {
//no-op
}
@Override
public void block() {
//no-op
}
@Override
public void getState(OutputStream arg0) throws Exception {
}
@Override
public void setState(InputStream arg0) throws Exception {
}
@Override
public void unblock() {
}
}