Browse files

Network bridging now working with a large load of messages.

git-svn-id: https://svn.apache.org/repos/asf/activemq/activemq-apollo/trunk@1348133 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 1c00e17 commit 64379e9a86154162d890ce788f3dc0eb14a2a274 @chirino chirino committed Jun 8, 2012
View
9 ...-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala
@@ -39,16 +39,15 @@ trait BrokerLoadListener {
def on_load_change(broker_load:LoadStatusDTO)
}
object RestLoadMonitor extends Log
-class RestLoadMonitor extends BaseService with BrokerLoadMonitor {
+class RestLoadMonitor(manager:NetworkManager) extends BaseService with BrokerLoadMonitor {
import collection.JavaConversions._
import RestLoadMonitor._
val dispatch_queue = createQueue("rest load monitor")
val members = HashMap[String, LoadMonitor]()
- var poll_interval = 5*1000;
protected def _start(on_completed: Task) = {
- schedule_reoccurring(1, SECONDS) {
+ schedule_reoccurring(manager.monitoring_interval, SECONDS) {
for(monitor <- members.values) {
monitor.poll
}
@@ -110,7 +109,9 @@ class RestLoadMonitor extends BaseService with BrokerLoadMonitor {
dispatch_queue {
for(service <- member.services) {
if( service.kind == "web_admin" ) {
- members.put(member.id, LoadMonitor(member.id, new URL(service.address)))
+ var monitor = LoadMonitor(member.id, new URL(service.address))
+ members.put(member.id, monitor)
+ monitor.poll
}
}
}
View
2 apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala
@@ -63,7 +63,7 @@ class BrokerMetrics() {
dest_load.message_size = dest.message_size
// Lets not include the network consumers in the the consumer rates..
- val consumers = dest.consumers.filter(_.user == network_user).toArray
+ val consumers = dest.consumers.filter(_.user != network_user).toArray
dest_load.consumer_count = consumers.size
dest_load.dequeue_size_rate = 0
View
8 apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala
@@ -56,6 +56,10 @@ class NetworkManager(broker: Broker) extends BaseService with ClusterMembershipL
var metrics_map = HashMap[String, BrokerMetrics]()
val bridges = HashMap[BridgeInfo, BridgeDeployer]()
+ def network_user = Option(config.user).getOrElse("network")
+ def network_password = config.password
+ def monitoring_interval = OptionSupport(config.monitoring_interval).getOrElse(5)
+
protected def _start(on_completed: Task) = {
import collection.JavaConversions._
@@ -65,7 +69,7 @@ class NetworkManager(broker: Broker) extends BaseService with ClusterMembershipL
membership_monitor.listener = this
membership_monitor.start(NOOP)
- load_monitor = new RestLoadMonitor
+ load_monitor = new RestLoadMonitor(this)
load_monitor.listener = this
load_monitor.start(NOOP)
@@ -93,7 +97,7 @@ class NetworkManager(broker: Broker) extends BaseService with ClusterMembershipL
}
def on_load_change(dto: LoadStatusDTO) = dispatch_queue {
- metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto, config.user)
+ metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto, network_user)
}
def load_analysis = {
View
55 ...work/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala
@@ -40,20 +40,22 @@ class StompBridgingStrategy(val manager:NetworkManager) extends BridgingStrategy
val bridges = HashMap[(String, String), Bridge]()
- def bridge_user = manager.config.user
- def bridge_password = manager.config.password
+ def network_user = manager.network_user
+ def network_password = manager.network_password
- def deploy(info:BridgeInfo) = {
+ def deploy(bridge_info:BridgeInfo) = {
dispatch_queue.assertExecuting()
- val bridge = bridges.getOrElseUpdate((info.from, info.to), new Bridge(info.from, info.to))
- bridge.deploy(info.kind, info.dest)
+ val bridge = bridges.getOrElseUpdate((bridge_info.from, bridge_info.to), new Bridge(bridge_info.from, bridge_info.to))
+ info("Deploying bridge for destination %s, from %s to %s", bridge_info.dest, bridge_info.from, bridge_info.to)
+ bridge.deploy(bridge_info.kind, bridge_info.dest)
}
- def undeploy(info:BridgeInfo) = {
+ def undeploy(bridge_info:BridgeInfo) = {
dispatch_queue.assertExecuting()
- for( bridge <- bridges.get((info.from, info.to)) ) {
- bridge.undeploy(info.kind, info.dest)
+ for( bridge <- bridges.get((bridge_info.from, bridge_info.to)) ) {
+ info("Undeploying bridge for destination %s, from %s to %s", bridge_info.dest, bridge_info.from, bridge_info.to)
+ bridge.undeploy(bridge_info.kind, bridge_info.dest)
}
}
@@ -69,21 +71,22 @@ class StompBridgingStrategy(val manager:NetworkManager) extends BridgingStrategy
case MESSAGE =>
// forward it..
frame.action(SEND)
- println("forwarding message: "+frame.getHeader(MESSAGE_ID))
+ var msgid = frame.getHeader(MESSAGE_ID)
+ debug("forwarding message: %s", msgid)
to_connection.send(frame, ()=>{
// Ack it if the original connection is still up...
// TODO: if it's not a we will probably get a dup/redelivery.
// Might want to introduce some dup detection at this point.
if( from_connection.state eq original_state ) {
val ack = new StompFrame(ACK);
ack.addHeader(SUBSCRIPTION, frame.getHeader(SUBSCRIPTION))
- ack.addHeader(MESSAGE_ID, frame.getHeader(MESSAGE_ID))
+ ack.addHeader(MESSAGE_ID, msgid)
from_connection.send(ack, null)
- println("forwarded message, now acking: "+frame.getHeader(MESSAGE_ID))
+ debug("forwarded message, now acking: %s", msgid)
}
})
case _ =>
- println("unhandled stomp frame: "+frame)
+ println("unhandled stomp frame: %s", frame)
}
}
@@ -128,8 +131,8 @@ class StompBridgingStrategy(val manager:NetworkManager) extends BridgingStrategy
val to_stomp = new Stomp()
to_stomp.setDispatchQueue(dispatch_queue)
to_stomp.setRemoteURI(uri)
- to_stomp.setLogin(bridge_user)
- to_stomp.setPasscode(bridge_password)
+ to_stomp.setLogin(network_user)
+ to_stomp.setPasscode(network_password)
to_stomp.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL)
val headers = new Properties()
headers.put("client-type", "apollo-bridge")
@@ -175,17 +178,19 @@ class StompBridgingStrategy(val manager:NetworkManager) extends BridgingStrategy
// Reconnect any subscriptions.
subscriptions.keySet.foreach(subscribe(_))
// Re-send messages..
- pending_sends.values.foreach(x => do_send(x._1, x._2))
+ pending_sends.values.foreach(x => request(x._1, x._2))
}
- def do_send(frame:StompFrame, on_complete: ()=>Unit) = {
+ def request(frame:StompFrame, on_complete: ()=>Unit) = {
connection.request(frame, new org.fusesource.stomp.client.Callback[StompFrame] {
override def onSuccess(response: StompFrame) = on_complete()
override def onFailure(value: Throwable) = failed(value)
})
}
+ def send(frame:StompFrame) = connection.send(frame, null)
+
def failed(value: Throwable)= {
debug("Bridge connection to %s failed due to: ", uri, value)
close(ReconnectDelayState(1000))
@@ -235,14 +240,18 @@ class StompBridgingStrategy(val manager:NetworkManager) extends BridgingStrategy
}
}
- def send(destination:StompFrame, on_complete: ()=>Unit) = {
- val id = next_id
- val cb = ()=>{
- pending_sends.remove(id)
- on_complete()
+ def send(frame:StompFrame, on_complete: ()=>Unit) = {
+ if( on_complete!=null ) {
+ val id = next_id
+ val cb = ()=>{
+ pending_sends.remove(id)
+ on_complete()
+ }
+ pending_sends.put(id, (frame, cb))
+ react[ConnectedState] { state => state.request(frame, cb) }
+ } else {
+ react[ConnectedState] { state => state.send(frame) }
}
- pending_sends.put(id, (destination, cb))
- react[ConnectedState] { state => state.do_send(destination, cb) }
}
}
View
3 ...twork/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java
@@ -43,6 +43,9 @@
@XmlAttribute(name="duplex")
public Boolean duplex;
+ @XmlAttribute(name="monitoring_interval")
+ public Integer monitoring_interval;
+
@XmlElement(name="member")
public ArrayList<ClusterMemberDTO> members = new ArrayList<ClusterMemberDTO>();
}
View
27 apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala
@@ -19,12 +19,13 @@ package org.apache.activemq.apollo.broker.network
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
-import org.apache.activemq.apollo.broker.MultiBrokerTestSupport
import javax.jms.Session._
import org.fusesource.stomp.jms.{StompJmsDestination, StompJmsConnectionFactory}
import collection.mutable.ListBuffer
import javax.jms.{Message, TextMessage, Connection, ConnectionFactory}
import java.util.concurrent.TimeUnit._
+import org.apache.activemq.apollo.broker.{Broker, MultiBrokerTestSupport}
+import org.fusesource.hawtdispatch._
class NetworkTest extends MultiBrokerTestSupport with ShouldMatchers with BeforeAndAfterEach {
@@ -69,17 +70,27 @@ class NetworkTest extends MultiBrokerTestSupport with ShouldMatchers with Before
case _ => None
}
- test("forward one message") {
+ test("forward 10000 messages") {
val connections = create_connections
-
- val s0 = connections(0).createSession(false, AUTO_ACKNOWLEDGE)
- val p0 = s0.createProducer(test_destination())
- p0.send(s0.createTextMessage("1"))
+ val message_count = 10000;
+
+ var dest = test_destination()
+ val data = "x" * 1024
+
+ Broker.BLOCKABLE_THREAD_POOL {
+ val s0 = connections(0).createSession(false, AUTO_ACKNOWLEDGE)
+ val p0 = s0.createProducer(dest)
+ for( i <- 0 until message_count ) {
+ p0.send(s0.createTextMessage(i+":"+data))
+ }
+ }
val s1 = connections(1).createSession(false, AUTO_ACKNOWLEDGE)
- val c1 = s1.createConsumer(test_destination())
+ val c1 = s1.createConsumer(dest)
within(30, SECONDS) {
- text(c1.receive()) should be(Some("1"))
+ for( i <- 0 until message_count ) {
+ text(c1.receive()) should be(Some(i+":"+data))
+ }
}
}

0 comments on commit 64379e9

Please sign in to comment.