/
JGroupsChannelProvider.java
188 lines (165 loc) · 6.46 KB
/
JGroupsChannelProvider.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* Copyright (c) 2012, Red Hat, Inc. and/or its affiliates or third-party contributors as
* indicated by the @author tags or express copyright attribution
* statements applied by the authors. All third-party contributions are
* distributed under license by Red Hat, Inc.
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.hibernate.search.backend.impl.jgroups;
import java.net.URL;
import java.util.Properties;
import org.hibernate.search.spi.BuildContext;
import org.hibernate.search.spi.ServiceProvider;
import org.hibernate.search.util.configuration.impl.ConfigurationParseHelper;
import org.hibernate.search.util.logging.impl.Log;
import org.hibernate.search.util.logging.impl.LoggerFactory;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.MessageListener;
import org.jgroups.UpHandler;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.mux.MuxMessageDispatcher;
import org.jgroups.blocks.mux.Muxer;
/**
* Service to initialize a JGroups Channel. This needs to be centralized to
* allow sharing of channels across different index managers.
*
* @author Lukasz Moren
* @author Sanne Grinovero <sanne@hibernate.org> (C) 2012 Red Hat Inc.
* @author Ales Justin
*/
public class JGroupsChannelProvider implements ServiceProvider<MessageSender> {
private static final Log log = LoggerFactory.make();
public static final String JGROUPS_PREFIX = "hibernate.search.services.jgroups.";
public static final String CONFIGURATION_FILE = JGROUPS_PREFIX + "configurationFile";
public static final String CLUSTER_NAME = JGROUPS_PREFIX + "clusterName";
public static final String CHANNEL_INJECT = JGROUPS_PREFIX + "providedChannel";
public static final String CLASSLOADER = JGROUPS_PREFIX + "classloader";
public static final String MUX_ID = JGROUPS_PREFIX + "mux_id";
private static final String DEFAULT_JGROUPS_CONFIGURATION_FILE = "flush-udp.xml";
private static final String DEFAULT_CLUSTER_NAME = "Hibernate Search Cluster";
private Channel channel;
private MessageSender sender;
private BuildContext context;
@Override
public void start(Properties props, BuildContext context) {
this.context = context;
log.jGroupsStartingChannel();
boolean channelIsManaged = buildChannel( props );
String clusterName = props.getProperty( JGroupsChannelProvider.CLUSTER_NAME, DEFAULT_CLUSTER_NAME );
NodeSelectorStrategyHolder masterNodeSelector = context.requestService( MasterSelectorServiceProvider.class );
JGroupsMasterMessageListener listener = new JGroupsMasterMessageListener( context, masterNodeSelector );
UpHandler handler = channel.getUpHandler();
if ( handler instanceof Muxer ) {
Short muxId = (Short) props.get( MUX_ID );
if ( muxId == null ) {
throw log.missingJGroupsMuxId();
}
@SuppressWarnings("unchecked")
Muxer<UpHandler> muxer = (Muxer<UpHandler>) handler;
if ( muxer.get( muxId ) != null ) {
throw log.jGroupsMuxIdAlreadyTaken( muxId );
}
ClassLoader cl = (ClassLoader) props.get( CLASSLOADER );
MessageListener wrapper = ( cl != null ) ? new ClassloaderMessageListener( listener, cl ) : listener;
MessageListenerToRequestHandlerAdapter adapter = new MessageListenerToRequestHandlerAdapter( wrapper );
MessageDispatcher dispatcher = new MuxMessageDispatcher( muxId, channel, wrapper, listener, adapter );
sender = new DispatcherMessageSender( dispatcher );
}
else {
// TODO -- perhaps port previous multi-handling?
channel.setReceiver( listener );
sender = new ChannelMessageSender( channel, channelIsManaged, clusterName);
}
sender.start();
masterNodeSelector.setLocalAddress( channel.getAddress() );
log.jGroupsConnectedToCluster( clusterName, channel.getAddress() );
if ( !channel.flushSupported() ) {
log.jGroupsFlushNotPresentInStack();
}
}
@Override
public MessageSender getService() {
return sender;
}
@Override
public void stop() {
context.releaseService( MasterSelectorServiceProvider.class );
context = null;
try {
channel = null;
if ( sender != null ) {
sender.stop();
sender = null;
}
}
catch ( Exception toLog ) {
log.jGroupsClosingChannelError( toLog );
}
}
/**
* Reads configuration and builds channel with its base.
* In order of preference - we first look for an external JGroups file, then a set of XML properties, and
* finally the legacy JGroups String properties.
*
* @param props configuration file
* @return true if channel is managed, false otherwise
*/
private boolean buildChannel(Properties props) {
boolean channelIsManaged = true;
if ( props != null ) {
if ( props.containsKey( JGroupsChannelProvider.CHANNEL_INJECT ) ) {
Object channelObject = props.get( JGroupsChannelProvider.CHANNEL_INJECT );
try {
channel = (org.jgroups.JChannel) channelObject;
channelIsManaged = false;
}
catch ( ClassCastException e ) {
throw log.jGroupsChannelInjectionError( e, channelObject.getClass() );
}
}
if ( props.containsKey( JGroupsChannelProvider.CONFIGURATION_FILE ) ) {
String cfg = props.getProperty( JGroupsChannelProvider.CONFIGURATION_FILE );
try {
channel = new JChannel( ConfigurationParseHelper.locateConfig( cfg ) );
}
catch ( Exception e ) {
throw log.jGroupsChannelCreationUsingFileError( cfg, e );
}
}
}
if ( channel == null ) {
log.jGroupsConfigurationNotFoundInProperties( props );
try {
URL fileUrl = ConfigurationParseHelper.locateConfig( JGroupsChannelProvider.DEFAULT_JGROUPS_CONFIGURATION_FILE );
if ( fileUrl != null ) {
channel = new JChannel( fileUrl );
}
else {
log.jGroupsDefaultConfigurationFileNotFound();
channel = new JChannel();
}
}
catch ( Exception e ) {
throw log.unableToStartJGroupsChannel( e );
}
}
return channelIsManaged;
}
}