Skip to content

Commit

Permalink
the discovery address/port now configured in standalone.xml via socke…
Browse files Browse the repository at this point in the history
…t binding
  • Loading branch information
jmazzitelli committed Jan 27, 2015
1 parent 742134a commit 8e96e0c
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ public class BrokerService implements Service<BrokerService> {
*/
final InjectedValue<SocketBinding> connectorSocketBinding = new InjectedValue<SocketBinding>();

/**
* Our subsystem add-step handler will inject this as a dependency for us. This object will provide the multicast
* address and port for the broker's network connector which is used to discover other brokers.
*/
final InjectedValue<SocketBinding> discoverySocketBinding = new InjectedValue<SocketBinding>();

/**
* The broker configuration file that is used to completely configure the broker. This is the "out-of-box"
* configuration that can be customized with overrides via {@link #customConfigProperties}.
Expand Down Expand Up @@ -111,11 +117,11 @@ protected void startBroker() throws StartException {
log.info("Starting the broker now");
try {
// make sure we pre-configure the broker with some settings taken from our runtime environment

// get the socket the transport connector is to bind to - make sure we do not bind "to all"
SocketBinding connectorSocketBindingValue = connectorSocketBinding.getValue();
String connectorAddress = connectorSocketBindingValue.getAddress().getHostAddress();
String connectorPort = String.valueOf(connectorSocketBindingValue.getAbsolutePort());

// just pick one if we weren't given one - we don't want to bind "to all"
if (connectorAddress.equals("0.0.0.0") || connectorAddress.equals("::/128")) {
connectorAddress = InetAddress.getLocalHost().getCanonicalHostName();
}
Expand All @@ -124,6 +130,13 @@ protected void startBroker() throws StartException {
customConfigProperties.put(BrokerSubsystemExtension.BROKER_CONNECTOR_PORT_SYSPROP, connectorPort);
log.info("Broker told to bind socket to [" + connectorAddress + ":" + connectorPort + "]");

SocketBinding discoverySocketBindingValue = discoverySocketBinding.getValue();
String discoveryAddress = discoverySocketBindingValue.getMulticastAddress().getHostAddress();
String discoveryPort = String.valueOf(discoverySocketBindingValue.getMulticastPort());
customConfigProperties.put(BrokerSubsystemExtension.BROKER_DISCOVERY_ADDRESS_SYSPROP, discoveryAddress);
customConfigProperties.put(BrokerSubsystemExtension.BROKER_DISCOVERY_PORT_SYSPROP, discoveryPort);
log.info("Broker told to discover other brokers via [" + discoveryAddress + ":" + discoveryPort + "]");

ServerEnvironment env = envServiceValue.getValue();
BrokerConfigurationSetup configSetup = new BrokerConfigurationSetup(configurationFile, customConfigProperties, env);
log.info("Broker told to use configuration file [" + configSetup.getConfigurationFile() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ protected void populateModel(ModelNode operation, ModelNode model) throws Operat
BrokerSubsystemDefinition.CONNECTOR_NAME_ATTRIBDEF.validateAndSet(operation, model);
BrokerSubsystemDefinition.CONNECTOR_PROTOCOL_ATTRIBDEF.validateAndSet(operation, model);
BrokerSubsystemDefinition.SOCKET_BINDING_ATTRIBDEF.validateAndSet(operation, model);
BrokerSubsystemDefinition.DISCOVERY_SOCKET_BINDING_ATTRIBDEF.validateAndSet(operation, model);
log.debug("Populating the Broker subsystem model: " + operation + "=" + model);
}

Expand Down Expand Up @@ -154,13 +155,18 @@ protected void performRuntime(OperationContext context, ModelNode operation, Mod

// install the service
String binding = BrokerSubsystemDefinition.SOCKET_BINDING_ATTRIBDEF.resolveModelAttribute(context, model).asString();
String discoveryBinding = BrokerSubsystemDefinition.DISCOVERY_SOCKET_BINDING_ATTRIBDEF.resolveModelAttribute(
context, model).asString();
ServiceName name = BrokerService.SERVICE_NAME;
ServiceController<BrokerService> controller = context.getServiceTarget() //
.addService(name, service) //
.addDependency(ServerEnvironmentService.SERVICE_NAME, ServerEnvironment.class, service.envServiceValue) //
.addDependency(SocketBinding.JBOSS_BINDING_NAME.append(binding), SocketBinding.class, service.connectorSocketBinding) //
.addListener(verificationHandler) //
.setInitialMode(Mode.ACTIVE) //
ServiceController<BrokerService> controller = context.getServiceTarget()
.addService(name, service)
.addDependency(ServerEnvironmentService.SERVICE_NAME, ServerEnvironment.class, service.envServiceValue)
.addDependency(SocketBinding.JBOSS_BINDING_NAME.append(binding), SocketBinding.class,
service.connectorSocketBinding)
.addDependency(SocketBinding.JBOSS_BINDING_NAME.append(discoveryBinding), SocketBinding.class,
service.discoverySocketBinding)
.addListener(verificationHandler)
.setInitialMode(Mode.ACTIVE)
.install();
newControllers.add(controller);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,16 @@ public class BrokerSubsystemDefinition extends SimpleResourceDefinition {
.setDefaultValue(new ModelNode(BrokerSubsystemExtension.CONNECTOR_PROTOCOL_DEFAULT)).setAllowNull(true).build();

protected static final SimpleAttributeDefinition SOCKET_BINDING_ATTRIBDEF = new SimpleAttributeDefinitionBuilder(
BrokerSubsystemExtension.CONNECTOR_SOCKET_BINDING_ATTR, ModelType.STRING).setFlags(AttributeAccess.Flag.RESTART_RESOURCE_SERVICES)
.setDefaultValue(new ModelNode("org.hawkular.bus.broker")).setValidator(new StringLengthValidator(1)).setAllowNull(false).build();
BrokerSubsystemExtension.CONNECTOR_SOCKET_BINDING_ATTR, ModelType.STRING)
.setFlags(AttributeAccess.Flag.RESTART_RESOURCE_SERVICES)
.setDefaultValue(new ModelNode(BrokerSubsystemExtension.CONNECTOR_SOCKET_BINDING_DEFAULT))
.setValidator(new StringLengthValidator(1)).setAllowNull(false).build();

protected static final SimpleAttributeDefinition DISCOVERY_SOCKET_BINDING_ATTRIBDEF = new SimpleAttributeDefinitionBuilder(
BrokerSubsystemExtension.DISCOVERY_SOCKET_BINDING_ELEMENT, ModelType.STRING)
.setFlags(AttributeAccess.Flag.RESTART_RESOURCE_SERVICES)
.setDefaultValue(new ModelNode(BrokerSubsystemExtension.DISCOVERY_SOCKET_BINDING_DEFAULT))
.setValidator(new StringLengthValidator(1)).setAllowNull(false).build();

// operation parameters
protected static final SimpleAttributeDefinition START_OP_PARAM_RESTART = new SimpleAttributeDefinitionBuilder("restart", ModelType.BOOLEAN)
Expand All @@ -77,6 +85,7 @@ public void registerAttributes(ManagementResourceRegistration rr) {
registerReloadRequiredWriteAttributeHandler(rr, CONNECTOR_NAME_ATTRIBDEF);
registerReloadRequiredWriteAttributeHandler(rr, CONNECTOR_PROTOCOL_ATTRIBDEF);
registerReloadRequiredWriteAttributeHandler(rr, SOCKET_BINDING_ATTRIBDEF);
registerReloadRequiredWriteAttributeHandler(rr, DISCOVERY_SOCKET_BINDING_ATTRIBDEF);
}

private void registerReloadRequiredWriteAttributeHandler(ManagementResourceRegistration rr, AttributeDefinition def) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class BrokerSubsystemExtension implements Extension {
protected static final String BROKER_CONNECTOR_PROTOCOL_SYSPROP = "org.hawkular.bus.broker.connector.protocol";
protected static final String BROKER_CONNECTOR_ADDRESS_SYSPROP = "org.hawkular.bus.broker.connector.address";
protected static final String BROKER_CONNECTOR_PORT_SYSPROP = "org.hawkular.bus.broker.connector.port";
protected static final String BROKER_DISCOVERY_ADDRESS_SYSPROP = "org.hawkular.bus.broker.discovery.address";
protected static final String BROKER_DISCOVERY_PORT_SYSPROP = "org.hawkular.bus.broker.discovery.port";

// The following define the XML elements and attributes of the extension itself (these appear in WildFly's
// standalone.xml for this extension).
Expand All @@ -76,9 +78,13 @@ public class BrokerSubsystemExtension implements Extension {
protected static final String CONNECTOR_NAME_ATTR = BROKER_CONNECTOR_NAME_SYSPROP;
protected static final String CONNECTOR_PROTOCOL_ATTR = BROKER_CONNECTOR_PROTOCOL_SYSPROP;
protected static final String CONNECTOR_SOCKET_BINDING_ATTR = "socket-binding";
protected static final String CONNECTOR_SOCKET_BINDING_DEFAULT = "org.hawkular.bus.broker";
protected static final String CONNECTOR_NAME_DEFAULT = "openwire";
protected static final String CONNECTOR_PROTOCOL_DEFAULT = "tcp";

protected static final String DISCOVERY_SOCKET_BINDING_ELEMENT = "discovery-socket-binding";
protected static final String DISCOVERY_SOCKET_BINDING_DEFAULT = "org.hawkular.bus.broker.discovery";

protected static final String CUSTOM_CONFIG_ELEMENT = "custom-configuration";
protected static final String PROPERTY_ELEMENT = "property";

Expand Down Expand Up @@ -162,6 +168,9 @@ public void readElement(XMLExtendedStreamReader reader, List<ModelNode> list) th
opAdd.get(PERSISTENT_ELEMENT).set(new ValueExpression(reader.getElementText()));
} else if (elementName.equals(USE_JMX_ELEMENT)) {
opAdd.get(USE_JMX_ELEMENT).set(new ValueExpression(reader.getElementText()));
} else if (elementName.equals(DISCOVERY_SOCKET_BINDING_ELEMENT)) {
// we don't support expression here, must be the actual name
opAdd.get(DISCOVERY_SOCKET_BINDING_ELEMENT).set(reader.getElementText());
} else {
throw ParseUtils.unexpectedElement(reader);
}
Expand Down Expand Up @@ -192,7 +201,7 @@ public void writeContent(final XMLExtendedStreamWriter writer, final SubsystemMa
writer.writeAttribute(BROKER_ENABLED_ATTR, node.get(BROKER_ENABLED_ATTR).asString());
writer.writeAttribute(BROKER_CONFIG_FILE_ATTR, node.get(BROKER_CONFIG_FILE_ATTR).asString());

// our config elements
// our main broker config elements
writeElement(writer, node, BROKER_NAME_ELEMENT);
writeElement(writer, node, PERSISTENT_ELEMENT);
writeElement(writer, node, USE_JMX_ELEMENT);
Expand All @@ -215,6 +224,9 @@ public void writeContent(final XMLExtendedStreamWriter writer, final SubsystemMa
// </connector>
writer.writeEndElement();

// <socket-binding-element />
writeElement(writer, node, DISCOVERY_SOCKET_BINDING_ELEMENT);

// <custom-configuration>
writer.writeStartElement(CUSTOM_CONFIG_ELEMENT);
ModelNode configNode = node.get(CUSTOM_CONFIG_ELEMENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
updateClusterClients="true"
updateClusterClientsOnRemove="true"
rebalanceClusterClients="true"
discoveryUri="multicast://224.0.0.4:61612?group=hawkular"
discoveryUri="multicast://${org.hawkular.bus.broker.discovery.address}:${org.hawkular.bus.broker.discovery.port}?group=hawkular"
/>
</transportConnectors>

Expand All @@ -30,7 +30,7 @@
multicast://hostname:port
-->
<networkConnectors>
<networkConnector uri="multicast://224.0.0.4:61612?group=hawkular" duplex="true"/>
<networkConnector uri="multicast://${org.hawkular.bus.broker.discovery.address}:${org.hawkular.bus.broker.discovery.port}?group=hawkular"/>
</networkConnectors>

<persistenceAdapter>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ hawkular-bus-broker.enabled=When true, the Hawkular Broker will be deployed and
hawkular-bus-broker.configuration-file=The configuration file that further defines the Hawkular Broker setup and behavior. If an absolute path, is used as-is; otherwise the file can be in the server configuration directory or in the module config directory - the former takes precedence.
hawkular-bus-broker.custom-configuration=Additional configuration values used to configure the broker. The broker will set these as system properties.
hawkular-bus-broker.socket-binding=Determines the binding address and port the broker listens to for incoming messages.
hawkular-bus-broker.discovery-socket-binding=Determines the multicast address and port the broker uses to discover other brokers.
hawkular-bus-broker.org.hawkular.bus.broker.name=The name of the Hawkular Broker.
hawkular-bus-broker.org.hawkular.bus.broker.persistent=Determines if the Hawkular Broker will persist its messages for fault tolerance.
hawkular-bus-broker.org.hawkular.bus.broker.use-jmx=Determines if the Hawkular Broker will enable its JMX MBeans.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<xs:element name="org.hawkular.bus.broker.persistent" type="xs:boolean" use="optional" />
<xs:element name="org.hawkular.bus.broker.use-jmx" type="xs:boolean"use="optional" />
<xs:element name="connector" type="connectorType" use="required" />
<xs:element name="discovery-socket-binding" type="xs:string" use="required" />
<xs:element name="custom-configuration" type="customConfigurationType" use="optional" />
</xs:all>
</xs:complexType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public void testResourceDescription() throws Exception {
List<Property> attributes = content.get("attributes").asPropertyList();

List<String> expectedAttributes = Arrays.asList( //
BrokerSubsystemExtension.DISCOVERY_SOCKET_BINDING_ELEMENT, //
BrokerSubsystemExtension.CONNECTOR_SOCKET_BINDING_ATTR, //
BrokerSubsystemExtension.CONNECTOR_NAME_ATTR, //
BrokerSubsystemExtension.CONNECTOR_PROTOCOL_ATTR, //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
org.hawkular.bus.broker.connector.name="test-connector-name"
org.hawkular.bus.broker.connector.protocol="test-connector-protocol" />

<discovery-socket-binding>test-discovery-socket-binding</discovery-socket-binding>

<custom-configuration>
<property name="custom-prop" value="custom-prop-val"/>
<property name="custom-prop2" value="custom-prop-val2"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ public void testSendRPC() throws Exception {
}

// make sure the message flowed properly
assertFalse(future.isCancelled());
assertTrue(future.isDone());
assertNotNull("Didn't receive response", receivedSpecificMessage);
assertEquals("RESPONSE:" + specificMessage.getMessage(), receivedSpecificMessage.getMessage());
assertEquals(specificMessage.getDetails(), receivedSpecificMessage.getDetails());
assertEquals("RESPONSE:" + specificMessage.getSpecific(), receivedSpecificMessage.getSpecific());
assertFalse(future.isCancelled());
assertTrue(future.isDone());

// use the future.get(timeout) method and make sure it returns the same
try {
Expand Down Expand Up @@ -272,12 +272,12 @@ public void testRPCTimeout() throws Exception {
}

// make sure the message flowed properly
assertFalse(future.isCancelled());
assertTrue(future.isDone());
assertNotNull("Didn't receive response", receivedSpecificMessage);
assertEquals("RESPONSE:" + specificMessage.getMessage(), receivedSpecificMessage.getMessage());
assertEquals(specificMessage.getDetails(), receivedSpecificMessage.getDetails());
assertEquals("RESPONSE:" + specificMessage.getSpecific(), receivedSpecificMessage.getSpecific());
assertFalse(future.isCancelled());
assertTrue(future.isDone());

} finally {
// close everything
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
updateClusterClients="true"
updateClusterClientsOnRemove="true"
rebalanceClusterClients="true"
discoveryUri="multicast://224.0.0.4:61612?group=hawkular"
discoveryUri="multicast://${org.hawkular.bus.broker.discovery.address}:${org.hawkular.bus.broker.discovery.port}?group=hawkular"
/>
</transportConnectors>

Expand All @@ -30,7 +30,7 @@
multicast://hostname:port
-->
<networkConnectors>
<networkConnector uri="multicast://224.0.0.4:61612?group=hawkular" duplex="true"/>
<networkConnector uri="multicast://${org.hawkular.bus.broker.discovery.address}:${org.hawkular.bus.broker.discovery.port}?group=hawkular" />
</networkConnectors>

<persistenceAdapter>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@
<org.hawkular.bus.broker.persistent>false</org.hawkular.bus.broker.persistent>
<org.hawkular.bus.broker.use-jmx>false</org.hawkular.bus.broker.use-jmx>
<connector org.hawkular.bus.broker.connector.name="openwire" org.hawkular.bus.broker.connector.protocol="tcp" socket-binding="org.hawkular.bus.broker"/>
<discovery-socket-binding>org.hawkular.bus.broker.discovery</discovery-socket-binding>
<custom-configuration>
<property name="custom-prop" value="custom-prop-val"/>
</custom-configuration>
Expand Down Expand Up @@ -450,6 +451,7 @@
<outbound-socket-binding name="mail-smtp">
<remote-destination host="localhost" port="25"/>
</outbound-socket-binding>
<socket-binding name="org.hawkular.bus.broker" port="61616"/>
<socket-binding name="org.hawkular.bus.broker" port="62626"/>
<socket-binding name="org.hawkular.bus.broker.discovery" port="0" multicast-address="224.0.0.4" multicast-port="63636"/>
</socket-binding-group>
</server>

0 comments on commit 8e96e0c

Please sign in to comment.