Skip to content
Browse files

Merge branch 'maint/3.2/master' into maint/3.2/security

  • Loading branch information...
2 parents 822b24b + 73b693b commit 2e6b106d9b41d826e5e4e8247a283a8bf6ff41b9 @viglesiasce viglesiasce committed Feb 12, 2013
Showing with 114 additions and 39 deletions.
  1. +114 −39 clc/modules/msgs/conf/scripts/setup_membership.groovy
View
153 clc/modules/msgs/conf/scripts/setup_membership.groovy
@@ -68,33 +68,54 @@ import org.jgroups.protocols.VERIFY_SUSPECT
import org.jgroups.protocols.FD
import org.jgroups.protocols.FD_SOCK
import java.net.InetAddress
+import java.net.InetSocketAddress
import java.net.UnknownHostException
import org.apache.log4j.Logger
+import org.jgroups.protocols.Discovery
import org.jgroups.protocols.FC
import org.jgroups.protocols.FD
import org.jgroups.protocols.FD_SOCK
import org.jgroups.protocols.FRAG2
import org.jgroups.protocols.MERGE2
import org.jgroups.protocols.PING
+import org.jgroups.protocols.TCP
+import org.jgroups.protocols.TCPGOSSIP;
+import org.jgroups.protocols.TP
import org.jgroups.protocols.UDP
import org.jgroups.protocols.UNICAST
import org.jgroups.protocols.VERIFY_SUSPECT
import org.jgroups.protocols.pbcast.GMS
import org.jgroups.protocols.pbcast.NAKACK
import org.jgroups.protocols.pbcast.STABLE
import org.jgroups.protocols.pbcast.STATE_TRANSFER
+import org.jgroups.stack.GossipRouter;
import com.eucalyptus.bootstrap.BootstrapArgs
import com.eucalyptus.bootstrap.Hosts
+import com.eucalyptus.bootstrap.OrderedShutdown;
import com.eucalyptus.bootstrap.SystemIds
import com.eucalyptus.empyrean.Empyrean
import com.eucalyptus.system.Threads
import com.eucalyptus.util.Internets
+import com.google.common.collect.Sets;
Logger LOG = Logger.getLogger( "com.eucalyptus.scripts.setup_membership" );
+/**
+ * UDP/Multicast configuration
+ */
String multicastAddress = "228.7.7.3";
InetAddress multicastInetAddress = InetAddress.getByName( multicastAddress );
Integer multicastPort = 8773;
+/**
+ * TCP/TCPGOSSIP configuration
+ */
+Integer tcpPortBase = 8700;
+Integer tcpPortRange = 100;
+Integer gossipPort = 8778;
+String gossipBindAddr = Internets.localHostAddress( );
+/**
+ * General Transport thread configuration
+ */
Integer threadPoolMaxThreads = 25;
Integer threadPoolMinThreads = 2;
Integer threadPoolKeepAliveTime = 5000;
@@ -109,44 +130,98 @@ defaultThreads = Threads.lookup( Empyrean.class, Hosts.class );
normalThreads = Threads.lookup( Empyrean.class, Hosts.class, "normal-pool" );
oobThreads = Threads.lookup( Empyrean.class, Hosts.class, "oob-pool" );
+def udpTransport = {
+ UDP udp = new UDP( );
+ try {
+ LOG.info( "Setting membership addres: " + Internets.localHostAddress( ) );
+ udp.setBindAddress( Internets.localHostAddress( ) );
+ udp.setBindPort( 8773 );
+ udp.setBindToAllInterfaces( false );//this sets receive_on_all_interfaces
+ } catch ( UnknownHostException ex ) {
+ LOG.error( ex, ex );
+ }
+ udp.setMulticastAddress( multicastInetAddress );
+ udp.setMulticastPort( multicastPort );
+ udp.setDiscardIncompatiblePackets( true );
+ udp.setLogDiscardMessages( false );
+ udp.setMaxBundleSize( 60000 );
+ udp.setMaxBundleTimeout( 30 );
+ udp
+}
+
+def udpDiscovery = {
+ PING pingDiscovery = new PING( );
+ pingDiscovery.setTimeout( 2000 );
+ pingDiscovery.setNumInitialMembers( 2 );
+
+ pingDiscovery
+}
+
+def tcpTransport = {
+ TCP tcp = new TCP()
+ tcp.setBindAddress( Internets.localHostAddress( ) )
+ tcp.setBindPort( tcpPortBase )
+ tcp.setReaperInterval( 30000 )
+ tcp.setPortRange( tcpPortRange )//go from 8776-9000
+ tcp
+}
+
+def tcpDiscovery = {
+ TCPGOSSIP tcpGossip = new TCPGOSSIP( );
+ tcpGossip.setTimeout( 10000 )
+ tcpGossip.setNumInitialMembers( 2 )
+ initialHosts = Sets.newHashSet( BootstrapArgs.parseBootstrapHosts( ) ).collect{ new InetSocketAddress( it, gossipPort ) }
+ LOG.info( "TCPGOSSIP: ${initialHosts}" )
+ tcpGossip.setInitialHosts( initialHosts )
+ tcpGossip
+}
+
+def gossipRouter = {
+ GossipRouter router = new GossipRouter(gossipPort, gossipBindAddr,true)
+ OrderedShutdown.registerPreShutdownHook{
+ if(router.isRunning( )) {
+ router.stop( );
+ }
+ }
+ LOG.info( "GossipRouter starting on: ${gossipBindAddr}:${gossipPort}" )
+ try {
+ router.start( );
+ LOG.info( "GossipRouter started on: ${gossipBindAddr}:${gossipPort} (see jmx object jgroups:name=GossipRouter)" )
+ } catch( Exception e ) {
+ LOG.error( "GossipRouter failed to start: ${e.getMessage()}", e );
+ }
+ router
+}
-UDP udp = new UDP( );
-try {
- LOG.info( "Setting membership addres: " + Internets.localHostAddress( ) );
- udp.setBindAddress( Internets.localHostAddress( ) );
- udp.setBindPort( 8773 );
- udp.setBindToAllInterfaces( false );//this sets receive_on_all_interfaces
-} catch ( UnknownHostException ex ) {
- LOG.error( ex, ex );
+def transportSupplier = udpTransport
+def discoverySupplier = udpDiscovery
+
+if ( !BootstrapArgs.parseBootstrapHosts( ).isEmpty( ) ) {
+ gossipRouter()
+ transportSupplier = tcpTransport
+ discoverySupplier = tcpDiscovery
}
-udp.setMulticastAddress( multicastInetAddress );
-udp.setMulticastPort( multicastPort );
-udp.setDiscardIncompatiblePackets( true );
-udp.setLogDiscardMessages( false );
-udp.setMaxBundleSize( 60000 );
-udp.setMaxBundleTimeout( 30 );
-udp.setValue( "singleton_name", SystemIds.membershipUdpMcastTransportName( ) );
-
-//udp.setDefaultThreadPool( defaultThreads );
-udp.setDefaultThreadPoolThreadFactory( defaultThreads );
-
-udp.setThreadFactory( normalThreads );
-udp.setThreadPoolMaxThreads( threadPoolMaxThreads );
-udp.setThreadPoolKeepAliveTime( threadPoolKeepAliveTime );
-udp.setThreadPoolMinThreads( threadPoolMinThreads );
-udp.setThreadPoolQueueEnabled( threadPoolQueueEnabled );
-udp.setRegularRejectionPolicy( regularRejectionPolicy );
-
-udp.setOOBThreadPoolThreadFactory( oobThreads );
-//udp.setOOBThreadPool( oobThreads );
-udp.setOOBThreadPoolMaxThreads( oobThreadPoolMaxThreads );
-udp.setOOBThreadPoolKeepAliveTime( oobThreadPoolKeepAliveTime );
-udp.setOOBThreadPoolMinThreads( oobThreadPoolMinThreads );
-udp.setOOBRejectionPolicy( oobRejectionPolicy );
-
-PING pingDiscovery = new PING( );
-pingDiscovery.setTimeout( 2000 );
-pingDiscovery.setNumInitialMembers( 2 );
+
+TP transport = transportSupplier()
+transport.setValue( "singleton_name", SystemIds.membershipUdpMcastTransportName( ) );
+//transport.setDefaultThreadPool( defaultThreads );
+transport.setDefaultThreadPoolThreadFactory( defaultThreads );
+//transport thread factories
+transport.setThreadFactory( normalThreads );
+transport.setThreadPoolMaxThreads( threadPoolMaxThreads );
+transport.setThreadPoolKeepAliveTime( threadPoolKeepAliveTime );
+transport.setThreadPoolMinThreads( threadPoolMinThreads );
+transport.setThreadPoolQueueEnabled( threadPoolQueueEnabled );
+transport.setRegularRejectionPolicy( regularRejectionPolicy );
+//transport OOB thread factories
+transport.setOOBThreadPoolThreadFactory( oobThreads );
+//transport.setOOBThreadPool( oobThreads );
+transport.setOOBThreadPoolMaxThreads( oobThreadPoolMaxThreads );
+transport.setOOBThreadPoolKeepAliveTime( oobThreadPoolKeepAliveTime );
+transport.setOOBThreadPoolMinThreads( oobThreadPoolMinThreads );
+transport.setOOBRejectionPolicy( oobRejectionPolicy );
+
+Discovery discovery = discoverySupplier()
MERGE2 mergeHandler = new MERGE2( );
mergeHandler.setMaxInterval( 30000 );
@@ -180,13 +255,13 @@ flowControl.setMaxCredits( 20000000 );
flowControl.setMinThreshold( 0.1 );
return [
- udp,
- pingDiscovery,
+ transport,
+ discovery,
mergeHandler,
fdSocket,
new FD(),
new VERIFY_SUSPECT(),
- negackBroadcast,
+ negackBroadcast,
new UNICAST(),
stableBroadcast,
groupMembership,

0 comments on commit 2e6b106

Please sign in to comment.
Something went wrong with that request. Please try again.