Permalink
Browse files

HSEARCH-1198 MessageSender should be stopped when provider is stopped

  • Loading branch information...
1 parent 4d3c4db commit 19e226ccdb1b4e66d9c3456d2a2f6fd883481b31 @alesj alesj committed with Sanne Oct 5, 2012
@@ -24,21 +24,50 @@
package org.hibernate.search.backend.impl.jgroups;
+import org.hibernate.search.util.logging.impl.Log;
+import org.hibernate.search.util.logging.impl.LoggerFactory;
import org.jgroups.Channel;
import org.jgroups.Message;
/**
* Channel message sender.
- *
+ *
* @author <a href="mailto:ales.justin@jboss.org">Ales Justin</a>
+ * @author Sanne Grinovero <sanne@hibernate.org> (C) 2012 Red Hat Inc.
*/
-class ChannelMessageSender extends AbstractMessageSender {
+final class ChannelMessageSender extends AbstractMessageSender {
+
+ private static final Log log = LoggerFactory.make();
- ChannelMessageSender(Channel channel) {
+ private final boolean channelIsManaged;
+ private final String clusterName;
+
+ ChannelMessageSender(Channel channel, boolean channelIsManaged, String clusterName) {
super( channel );
+ this.channelIsManaged = channelIsManaged;
+ this.clusterName = clusterName;
+ }
+
+ public void start() {
+ if ( channel != null && channelIsManaged ) {
+ try {
+ channel.connect( clusterName );
+ }
+ catch ( Exception e ) {
+ throw log.unableConnectingToJGroupsCluster( clusterName, e );
+ }
+ }
+ }
+
+ public void stop() {
+ if ( channel != null && channel.isOpen() && channelIsManaged ) {
+ log.jGroupsDisconnectingAndClosingChannel( clusterName );
+ channel.disconnect();
+ channel.close();
+ }
}
- public void send(Message message) throws Exception {
+ public void send(final Message message) throws Exception {
channel.send( message );
}
}
@@ -42,7 +42,15 @@
this.dispatcher = dispatcher;
}
+ public void stop() {
+ dispatcher.stop();
+ }
+
public void send(final Message message) throws Exception {
dispatcher.castMessage( null, message, RequestOptions.ASYNC() );
}
+
+ @Override
+ public void start() {
+ }
}
@@ -61,71 +61,33 @@
private static final String DEFAULT_JGROUPS_CONFIGURATION_FILE = "flush-udp.xml";
private static final String DEFAULT_CLUSTER_NAME = "Hibernate Search Cluster";
- protected String clusterName;
-
private Channel channel;
- private boolean channelIsManaged = true;
- private short muxId;
private MessageSender sender;
private BuildContext context;
@Override
public void start(Properties props, BuildContext context) {
- this.clusterName = props.getProperty( JGroupsChannelProvider.CLUSTER_NAME, DEFAULT_CLUSTER_NAME );
- prepareJGroupsChannel( props, context );
- }
-
- @Override
- public MessageSender getService() {
- return sender;
- }
-
- @Override
- public void stop() {
- context.releaseService( MasterSelectorServiceProvider.class );
- context = null;
- try {
- if ( channel != null && channel.isOpen() ) {
- UpHandler handler = channel.getUpHandler();
- if ( handler instanceof Muxer ) {
- Muxer muxer = (Muxer) handler;
- muxer.remove( muxId );
- }
- else {
- if ( channelIsManaged ) {
- log.jGroupsDisconnectingAndClosingChannel();
- channel.disconnect();
- channel.close();
- }
- }
- }
- }
- catch ( Exception toLog ) {
- log.jGroupsClosingChannelError( toLog );
- channel = null;
- }
- }
-
- private void prepareJGroupsChannel(Properties props, BuildContext context) {
this.context = context;
log.jGroupsStartingChannel();
- buildChannel( props );
+
+ boolean channelIsManaged = buildChannel( props );
+ String clusterName = props.getProperty( JGroupsChannelProvider.CLUSTER_NAME, DEFAULT_CLUSTER_NAME );
+
NodeSelectorStrategyHolder masterNodeSelector = context.requestService( MasterSelectorServiceProvider.class );
JGroupsMasterMessageListener listener = new JGroupsMasterMessageListener( context, masterNodeSelector );
UpHandler handler = channel.getUpHandler();
if ( handler instanceof Muxer ) {
- Short n = (Short) props.get( MUX_ID );
- if ( n == null ) {
+ Short muxId = (Short) props.get( MUX_ID );
+ if ( muxId == null ) {
throw log.missingJGroupsMuxId();
}
@SuppressWarnings("unchecked")
Muxer<UpHandler> muxer = (Muxer<UpHandler>) handler;
- if ( muxer.get( n ) != null ) {
- throw log.jGroupsMuxIdAlreadyTaken( n );
+ if ( muxer.get( muxId ) != null ) {
+ throw log.jGroupsMuxIdAlreadyTaken( muxId );
}
- muxId = n;
ClassLoader cl = (ClassLoader) props.get( CLASSLOADER );
MessageListener wrapper = ( cl != null ) ? new ClassloaderMessageListener( listener, cl ) : listener;
MessageListenerToRequestHandlerAdapter adapter = new MessageListenerToRequestHandlerAdapter( wrapper );
@@ -135,16 +97,9 @@ private void prepareJGroupsChannel(Properties props, BuildContext context) {
else {
// TODO -- perhaps port previous multi-handling?
channel.setReceiver( listener );
- if ( channelIsManaged ) {
- try {
- channel.connect( clusterName );
- }
- catch ( Exception e ) {
- throw log.unableConnectingToJGroupsCluster( clusterName, e );
- }
- }
- sender = new ChannelMessageSender( channel );
+ sender = new ChannelMessageSender( channel, channelIsManaged, clusterName);
}
+ sender.start();
masterNodeSelector.setLocalAddress( channel.getAddress() );
log.jGroupsConnectedToCluster( clusterName, channel.getAddress() );
@@ -154,15 +109,39 @@ private void prepareJGroupsChannel(Properties props, BuildContext context) {
}
}
+ @Override
+ public MessageSender getService() {
+ return sender;
+ }
+
+ @Override
+ public void stop() {
+ context.releaseService( MasterSelectorServiceProvider.class );
+ context = null;
+ try {
+ channel = null;
+
+ if ( sender != null ) {
+ sender.stop();
+ sender = null;
+ }
+ }
+ catch ( Exception toLog ) {
+ log.jGroupsClosingChannelError( toLog );
+ }
+ }
+
/**
* Reads configuration and builds channel with its base.
* In order of preference - we first look for an external JGroups file, then a set of XML properties, and
* finally the legacy JGroups String properties.
*
* @param props configuration file
+ * @return true if channel is managed, false otherwise
*/
- private void buildChannel(Properties props) {
- String cfg;
+ private boolean buildChannel(Properties props) {
+ boolean channelIsManaged = true;
+
if ( props != null ) {
if ( props.containsKey( JGroupsChannelProvider.CHANNEL_INJECT ) ) {
Object channelObject = props.get( JGroupsChannelProvider.CHANNEL_INJECT );
@@ -176,7 +155,7 @@ private void buildChannel(Properties props) {
}
if ( props.containsKey( JGroupsChannelProvider.CONFIGURATION_FILE ) ) {
- cfg = props.getProperty( JGroupsChannelProvider.CONFIGURATION_FILE );
+ String cfg = props.getProperty( JGroupsChannelProvider.CONFIGURATION_FILE );
try {
channel = new JChannel( ConfigurationParseHelper.locateConfig( cfg ) );
}
@@ -202,6 +181,8 @@ private void buildChannel(Properties props) {
throw log.unableToStartJGroupsChannel( e );
}
}
+
+ return channelIsManaged;
}
}
@@ -37,6 +37,16 @@
public interface MessageSender {
/**
+ * Start sender.
+ */
+ void start();
+
+ /**
+ * Stop sender.
+ */
+ void stop();
+
+ /**
* Send message.
*
* @param message the JGroups message
@@ -104,8 +104,8 @@
void jGroupsDefaultConfigurationFileNotFound();
@LogMessage(level = INFO)
- @Message(id = 13, value = "Disconnecting and closing JGroups Channel")
- void jGroupsDisconnectingAndClosingChannel();
+ @Message(id = 13, value = "Disconnecting and closing JGroups Channel to cluster '%1$s'")
+ void jGroupsDisconnectingAndClosingChannel(String clusterName);
@LogMessage(level = ERROR)
@Message(id = 14, value = "Problem closing channel; setting it to null")

0 comments on commit 19e226c

Please sign in to comment.