diff --git a/docs/user-manual/en/HornetQ_User_Manual.xml b/docs/user-manual/en/HornetQ_User_Manual.xml
index 66ee1a2bec4..3ddf95485db 100644
--- a/docs/user-manual/en/HornetQ_User_Manual.xml
+++ b/docs/user-manual/en/HornetQ_User_Manual.xml
@@ -65,6 +65,7 @@
+
diff --git a/docs/user-manual/en/vertx-integration.xml b/docs/user-manual/en/vertx-integration.xml
new file mode 100644
index 00000000000..71bb11023ae
--- /dev/null
+++ b/docs/user-manual/en/vertx-integration.xml
@@ -0,0 +1,115 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ %BOOK_ENTITIES;
+ ]>
+
+ Vert.x Integration
+ Vert.x is a lightweight, high performance application platform for the
+ JVM that's designed for modern mobile, web, and enterprise applications. Vert.x provides a distributed event bus that
+ allows messages to be sent across vert.x instances and clients. You can now redirect and persist any vert.x messages
+ to HornetQ and route those messages to a specified vertx address by configuring HornetQ vertx incoming and outgoing
+ vertx connector services.
+
+
+
+ Configuring a Vertx Incoming Connector Service
+ Vertx Incoming Connector services receive messages from vertx event bus and route them to a HornetQ queue.
+ Such a service can be configured as follows:
+
+ <connector-service name="vertx-incoming-connector">
+ <factory-class>org.hornetq.integration.vertx.VertxIncomingConnectorServiceFactory</factory-class>
+ <param key="host" value="127.0.0.1"/>
+ <param key="port" value="0"/>
+ <param key="queue" value="jms.queue.vertxQueue"/>
+ <param key="vertx-address" value="vertx.in.eventaddress"/>
+ </connector-service>
+
+ Shown are the required params for the connector service:
+
+
+ queue . The name of the HornetQ queue to send message to.
+
+
+ As well as these required paramaters there are the following optional parameters
+
+
+ host . The host name on which the vertx target container is running. Default is localhost.
+
+
+ port . The port number to which the target vertx listens. Default is zero.
+
+
+ quorum-size . The quorum size of the target vertx instance.
+
+
+ ha-group . The name of the ha-group of target vertx instance. Default is hornetq .
+
+
+ vertx-address . The vertx address to listen to. default is org.hornetq.
+
+
+
+
+
+ Configuring a Vertx Outgoing Connector Service
+ Vertx Outgoing Connector services fetch vertx messages from a HornetQ queue and put them to vertx event bus.
+ Such a service can be configured as follows:
+
+ <connector-service name="vertx-outgoing-connector">
+ <factory-class>org.hornetq.integration.vertx.VertxOutgoingConnectorServiceFactory</factory-class>
+ <param key="host" value="127.0.0.1"/>
+ <param key="port" value="0"/>
+ <param key="queue" value="jms.queue.vertxQueue"/>
+ <param key="vertx-address" value="vertx.out.eventaddress"/>
+ <param key="publish" value="true"/>
+ </connector-service>
+
+ Shown are the required params for the connector service:
+
+
+ queue . The name of the HornetQ queue to fetch message from.
+
+
+ As well as these required paramaters there are the following optional parameters
+
+
+ host . The host name on which the vertx target container is running. Default is localhost.
+
+
+ port . The port number to which the target vertx listens. Default is zero.
+
+
+ quorum-size . The quorum size of the target vertx instance.
+
+
+ ha-group . The name of the ha-group of target vertx instance. Default is hornetq .
+
+
+ vertx-address . The vertx address to put messages to. default is org.hornetq.
+
+
+ publish . How messages is sent to vertx event bus. "true" means using publish style.
+ "false" means using send style. Default is false.
+
+
+
+
diff --git a/examples/core/vertx-connector/pom.xml b/examples/core/vertx-connector/pom.xml
new file mode 100644
index 00000000000..a2cc7f39357
--- /dev/null
+++ b/examples/core/vertx-connector/pom.xml
@@ -0,0 +1,181 @@
+
+ 4.0.0
+
+
+ org.hornetq.examples.core
+ core-examples
+ 2.5.0-SNAPSHOT
+
+
+ hornetq-vertx-example
+ jar
+ HornetQ Vert.x Example
+
+
+ 2.1RC1
+
+
+
+ org.hornetq
+ hornetq-server
+ ${project.version}
+
+
+ org.hornetq
+ hornetq-core-client
+ ${project.version}
+
+
+ org.hornetq
+ hornetq-commons
+ ${project.version}
+
+
+ io.netty
+ netty-all
+ ${netty.version}
+
+
+ org.jboss.javaee
+ jboss-jms-api
+ 1.1.0.GA
+
+
+ org.jboss.naming
+ jnp-client
+ 5.0.5.Final
+
+
+ org.jboss.spec.javax.jms
+ jboss-jms-api_2.0_spec
+
+
+ io.vertx
+ vertx-core
+ ${vertx.version}
+ provided
+
+
+ io.vertx
+ vertx-platform
+ ${vertx.version}
+ provided
+
+
+ io.vertx
+ vertx-hazelcast
+ ${vertx.version}
+ provided
+
+
+
+
+
+
+
+ org.hornetq
+ hornetq-maven-plugin
+
+
+ start
+
+ start
+
+
+
+
+ build.directory
+ ${basedir}/target/
+
+
+
+
+
+ runClient
+
+ runClient
+
+
+ org.hornetq.core.example.VertxConnectorExample
+
+
+
+ stop
+
+ stop
+
+
+
+
+
+ org.hornetq.examples.core
+ hornetq-vertx-example
+ ${project.version}
+
+
+ org.hornetq
+ hornetq-vertx-integration
+ ${project.version}
+
+
+ org.hornetq
+ hornetq-core-client
+ ${project.version}
+
+
+ org.hornetq
+ hornetq-server
+ ${project.version}
+
+
+ org.hornetq
+ hornetq-jms-client
+ ${project.version}
+
+
+ org.hornetq
+ hornetq-jms-server
+ ${project.version}
+
+
+ io.netty
+ netty-all
+ ${netty.version}
+
+
+ org.jboss.javaee
+ jboss-jms-api
+ 1.1.0.GA
+
+
+ org.jboss.naming
+ jnpserver
+ 5.0.3.GA
+
+
+ io.vertx
+ vertx-core
+ ${vertx.version}
+
+
+ io.vertx
+ vertx-platform
+ ${vertx.version}
+
+
+ io.vertx
+ vertx-hazelcast
+ ${vertx.version}
+
+
+
+ false
+ ${basedir}/target/classes/server0
+
+
+
+
+
+
+
diff --git a/examples/core/vertx-connector/readme.html b/examples/core/vertx-connector/readme.html
new file mode 100644
index 00000000000..59538f93727
--- /dev/null
+++ b/examples/core/vertx-connector/readme.html
@@ -0,0 +1,84 @@
+
+
+ HornetQ Vert.x Connector Service Example
+
+
+
+
+
+ Vert.x Connector Service Example
+
+ This example shows you how to configure HornetQ to use the Vert.x Connector Service.
+
+ HornetQ supports 2 types of Vert.x connector, incoming and outgoing.
+ Incoming connector consumes from Vert.x event bus and forwards to a configurable address.
+ Outgoing connector consumes from a configurable address and forwards to a configurable Vert.x event bus.
+
+
+ In this example, an incoming connector and an outgoing connector are configured. A simple java Verticle
+ is deployed. The verticle registers a message handler on the outgoing connector's address ("outgoing.vertx.address").
+ A String message is sent to Vert.x event bus on the incoming connector's address("incoming.vertx.address").
+ The message then will be forwarded to a HornetQ queue by the incoming connector. The outgoing connector listens to
+ the HornetQ queue and forwards the message from HornetQ to Vert.x event bus on the outgoing connector's address.
+ The verticle finally receives the message from it's event bus.
+
+ For more information on Vert.x concept please visit the Vertx site
+
+ Example step-by-step
+ To run the server, simply type mvn verify
+ from this directory.
+
+
+ First we need to create a Vert.x PlatformManager
+
+ platformManager = PlatformLocator.factory.createPlatformManager(PORT, HOST);
+
+
+ We deploy a Verticle using the platformManager
+
+ String verticle = "org.hornetq.core.example.ExampleVerticle";
+ platformManager.deployVerticle(verticle, null, new URL[0], 1, null,
+ new Handler>(){
+
+ @Override
+ public void handle(AsyncResult result)
+ {
+ if (!result.succeeded())
+ {
+ throw new RuntimeException("failed to deploy verticle", result.cause());
+ }
+ latch0.countDown();
+ }
+
+ });
+
+
+ We register a message handler with the event bus in the Verticle to listen on the outgoing connector's address.
+
+ EventBus eventBus = vertx.eventBus();
+ eventBus.registerHandler(VertxConnectorExample.OUTGOING,
+ new Handler>() {
+ @Override
+ public void handle(Message> startMsg)
+ {
+ Object body = startMsg.body();
+ System.out.println("Verticle receives a message: " + body);
+ VertxConnectorExample.result.set(VertxConnectorExample.MSG.equals(body));
+ latch0.countDown();
+ }
+ });
+
+
+
+ We send a message to incoming connector's address via event bus
+
+
+ EventBus bus = platformManager.vertx().eventBus();
+ bus.send(INCOMING, MSG);
+
+
+
+ The message will eventually arrives at the Verticle's message handler.
+
+
+
diff --git a/examples/core/vertx-connector/src/main/java/org/hornetq/core/example/ExampleVerticle.java b/examples/core/vertx-connector/src/main/java/org/hornetq/core/example/ExampleVerticle.java
new file mode 100644
index 00000000000..a9491c6149a
--- /dev/null
+++ b/examples/core/vertx-connector/src/main/java/org/hornetq/core/example/ExampleVerticle.java
@@ -0,0 +1,43 @@
+package org.hornetq.core.example;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.eventbus.EventBus;
+import org.vertx.java.core.eventbus.Message;
+import org.vertx.java.platform.Verticle;
+
+public class ExampleVerticle extends Verticle
+{
+ @Override
+ public void start()
+ {
+ EventBus eventBus = vertx.eventBus();
+
+ final CountDownLatch latch0 = new CountDownLatch(1);
+
+ // Register a handler on the outgoing connector's address
+ eventBus.registerHandler(VertxConnectorExample.OUTGOING,
+ new Handler>() {
+ @Override
+ public void handle(Message> startMsg)
+ {
+ Object body = startMsg.body();
+ System.out.println("Verticle receives a message: " + body);
+ VertxConnectorExample.result.set(VertxConnectorExample.MSG.equals(body));
+ latch0.countDown();
+ //Tell the example to finish.
+ VertxConnectorExample.latch.countDown();
+ }
+ });
+
+ try
+ {
+ latch0.await(5000, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+}
diff --git a/examples/core/vertx-connector/src/main/java/org/hornetq/core/example/VertxConnectorExample.java b/examples/core/vertx-connector/src/main/java/org/hornetq/core/example/VertxConnectorExample.java
new file mode 100644
index 00000000000..12a1e8b4580
--- /dev/null
+++ b/examples/core/vertx-connector/src/main/java/org/hornetq/core/example/VertxConnectorExample.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.example;
+
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.vertx.java.core.AsyncResult;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.eventbus.EventBus;
+import org.vertx.java.platform.PlatformLocator;
+import org.vertx.java.platform.PlatformManager;
+import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
+
+/**
+ * A simple example of using Vert.x connector service.
+ *
+ * @author Howard Gao
+ */
+public class VertxConnectorExample
+{
+ public static final String INCOMING = "incoming.vertx.address";
+ public static final String OUTGOING = "outgoing.vertx.address";
+ public static final String MSG = "Welcome to Vertx world!";
+
+ public final static CountDownLatch latch = new CountDownLatch(1);
+ public final static AtomicBoolean result = new AtomicBoolean(false);
+
+ private static final String HOST = "127.0.0.1";
+ private static final int PORT = 0;
+
+ public static void main(final String[] args) throws Exception
+ {
+ System.setProperty("vertx.clusterManagerFactory",
+ HazelcastClusterManagerFactory.class.getName());
+ PlatformManager platformManager = null;
+
+ try
+ {
+ // Step 1 Create a Vert.x PlatformManager
+ platformManager = PlatformLocator.factory.createPlatformManager(PORT, HOST);
+
+ final CountDownLatch latch0 = new CountDownLatch(1);
+
+ // Step 2 Deploy a Verticle to receive message
+ String verticle = "org.hornetq.core.example.ExampleVerticle";
+ platformManager.deployVerticle(verticle, null, new URL[0], 1, null,
+ new Handler>(){
+
+ @Override
+ public void handle(AsyncResult result)
+ {
+ if (!result.succeeded())
+ {
+ throw new RuntimeException("failed to deploy verticle", result.cause());
+ }
+ latch0.countDown();
+ }
+
+ });
+
+ latch0.await();
+
+ // Step 3 Send a message to the incoming connector's address
+ EventBus bus = platformManager.vertx().eventBus();
+ bus.send(INCOMING, MSG);
+
+ // Step 4 Waiting for the Verticle to process the message
+ latch.await(10000, TimeUnit.MILLISECONDS);
+ }
+ finally
+ {
+ if(platformManager != null)
+ {
+ platformManager.undeployAll(null);
+ platformManager.stop();
+ }
+ reportResultAndExit();
+ }
+ }
+
+ private static void reportResultAndExit()
+ {
+ if (!result.get())
+ {
+ System.err.println();
+ System.err.println("#####################");
+ System.err.println("### FAILURE! ###");
+ System.err.println("#####################");
+ System.exit(1);
+ }
+ else
+ {
+ System.out.println();
+ System.out.println("#####################");
+ System.out.println("### SUCCESS! ###");
+ System.out.println("#####################");
+ System.exit(0);
+ }
+ }
+
+}
diff --git a/examples/core/vertx-connector/src/main/resources/server0/hornetq-beans.xml b/examples/core/vertx-connector/src/main/resources/server0/hornetq-beans.xml
new file mode 100644
index 00000000000..171d3739eb7
--- /dev/null
+++ b/examples/core/vertx-connector/src/main/resources/server0/hornetq-beans.xml
@@ -0,0 +1,59 @@
+
+
+
+
+
+
+
+
+
+
+
+ 1099
+ localhost
+ 1098
+ localhost
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/examples/core/vertx-connector/src/main/resources/server0/hornetq-configuration.xml b/examples/core/vertx-connector/src/main/resources/server0/hornetq-configuration.xml
new file mode 100644
index 00000000000..32df46609b0
--- /dev/null
+++ b/examples/core/vertx-connector/src/main/resources/server0/hornetq-configuration.xml
@@ -0,0 +1,61 @@
+
+
+
+ target/server0/data/messaging/bindings
+
+ target/server0/data/messaging/journal
+
+ target/server0/data/messaging/largemessages
+
+ target/server0/data/messaging/paging
+
+
+
+
+ org.hornetq.core.remoting.impl.netty.NettyConnectorFactory
+
+
+
+
+
+
+ org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ queue.vertxQueue
+
+
+
+
+
+ org.hornetq.integration.vertx.VertxIncomingConnectorServiceFactory
+
+
+
+
+
+
+ org.hornetq.integration.vertx.VertxOutgoingConnectorServiceFactory
+
+
+
+
+
+
+
+
diff --git a/examples/core/vertx-connector/src/main/resources/server0/hornetq-jms.xml b/examples/core/vertx-connector/src/main/resources/server0/hornetq-jms.xml
new file mode 100644
index 00000000000..678e7f5214b
--- /dev/null
+++ b/examples/core/vertx-connector/src/main/resources/server0/hornetq-jms.xml
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/core/vertx-connector/src/main/resources/server0/hornetq-users.xml b/examples/core/vertx-connector/src/main/resources/server0/hornetq-users.xml
new file mode 100644
index 00000000000..934306c4b5e
--- /dev/null
+++ b/examples/core/vertx-connector/src/main/resources/server0/hornetq-users.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/integration/hornetq-vertx-integration/pom.xml b/integration/hornetq-vertx-integration/pom.xml
new file mode 100644
index 00000000000..4412e888144
--- /dev/null
+++ b/integration/hornetq-vertx-integration/pom.xml
@@ -0,0 +1,127 @@
+
+
+ 4.0.0
+
+
+ org.hornetq
+ hornetq-pom
+ 2.5.0-SNAPSHOT
+ ../../pom.xml
+
+
+ hornetq-vertx-integration
+ jar
+ HornetQ Vert.x Integration
+
+
+
+ ${project.parent.basedir}
+
+ UTF-8
+
+
+ false
+
+
+ false
+
+
+ ${project.groupId}~${project.artifactId}~${project.version}
+
+
+ target/mods
+
+
+ 2.1RC1
+ 2.0.2-final
+ 4.11
+
+
+ 3.0
+ 2.6
+ 2.5
+ 2.0.1-final
+ 2.14
+ 2.14
+ 2.14
+ 2.9
+ 2.7
+
+
+
+
+ sonatype-nexus-snapshots
+ https://oss.sonatype.org/content/repositories/public
+
+
+
+
+
+ org.jboss.logging
+ jboss-logging-processor
+
+
+
+
+ org.jboss.logging
+ jboss-logging
+
+
+ org.hornetq
+ hornetq-server
+ ${project.version}
+
+
+
+
+ io.vertx
+ vertx-core
+ ${vertx.version}
+ provided
+
+
+ io.vertx
+ vertx-platform
+ ${vertx.version}
+ provided
+
+
+ io.vertx
+ vertx-hazelcast
+ ${vertx.version}
+ provided
+
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+ io.vertx
+ testtools
+ ${vertx.testtools.version}
+ test
+
+
+
+
+
+
+
diff --git a/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/HornetQVertxLogger.java b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/HornetQVertxLogger.java
new file mode 100644
index 00000000000..a0e1a02dc77
--- /dev/null
+++ b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/HornetQVertxLogger.java
@@ -0,0 +1,63 @@
+/*
+* JBoss, Home of Professional Open Source.
+* Copyright 2010, Red Hat, Inc., and individual contributors
+* as indicated by the @author tags. See the copyright.txt file in the
+* distribution for a full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.hornetq.integration.vertx;
+
+import org.hornetq.core.server.ServerMessage;
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.LogMessage;
+import org.jboss.logging.Logger;
+import org.jboss.logging.Message;
+import org.jboss.logging.MessageLogger;
+
+/**
+ * @author Howard Gao
+ *
+ * Logger Code 19
+ *
+ * each message id must be 6 digits long starting with 19, the 3rd digit donates the level so
+ *
+ * INF0 1
+ * WARN 2
+ * DEBUG 3
+ * ERROR 4
+ * TRACE 5
+ * FATAL 6
+ *
+ * so an INFO message would be 191000 to 191999
+ */
+@MessageLogger(projectCode = "HQ")
+public interface HornetQVertxLogger extends BasicLogger
+{
+ /**
+ * The vertx logger.
+ */
+ HornetQVertxLogger LOGGER = Logger.getMessageLogger(HornetQVertxLogger.class, HornetQVertxLogger.class.getPackage().getName());
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 192001, value = "Non vertx message: {0}", format = Message.Format.MESSAGE_FORMAT)
+ void nonVertxMessage(ServerMessage message);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 192002, value = "Invalid vertx type: {0}", format = Message.Format.MESSAGE_FORMAT)
+ void invalidVertxType(Integer type);
+
+}
diff --git a/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/IncomingVertxEventHandler.java b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/IncomingVertxEventHandler.java
new file mode 100644
index 00000000000..667b758b1df
--- /dev/null
+++ b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/IncomingVertxEventHandler.java
@@ -0,0 +1,319 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.integration.vertx;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.ConnectorService;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.ConfigurationHelper;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.eventbus.EventBus;
+import org.vertx.java.core.eventbus.Message;
+import org.vertx.java.core.eventbus.ReplyException;
+import org.vertx.java.core.eventbus.impl.PingMessage;
+import org.vertx.java.core.json.JsonArray;
+import org.vertx.java.core.json.JsonObject;
+import org.vertx.java.platform.PlatformLocator;
+import org.vertx.java.platform.PlatformManager;
+import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
+
+public class IncomingVertxEventHandler implements ConnectorService
+{
+ private final String connectorName;
+
+ private final String queueName;
+
+ private final int port;
+
+ private final String host;
+
+ private final int quorumSize;
+
+ private final String haGroup;
+
+ private final String vertxAddress;
+
+ private EventBus eventBus;
+
+ private PlatformManager platformManager;
+
+ private EventHandler handler;
+
+ private final StorageManager storageManager;
+
+ private final PostOffice postOffice;
+
+ private boolean isStarted = false;
+
+ public IncomingVertxEventHandler(String connectorName, Map configuration,
+ StorageManager storageManager, PostOffice postOffice,
+ ScheduledExecutorService scheduledThreadPool)
+ {
+ this.connectorName = connectorName;
+ this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null,
+ configuration);
+
+ this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, configuration);
+ this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost",
+ configuration);
+ this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE,
+ -1, configuration);
+ this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP,
+ "hornetq", configuration);
+ this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS,
+ "org.hornetq", configuration);
+
+ this.storageManager = storageManager;
+ this.postOffice = postOffice;
+ }
+
+ @Override
+ public void start() throws Exception
+ {
+ if (this.isStarted)
+ {
+ return;
+ }
+ System.setProperty("vertx.clusterManagerFactory",
+ HazelcastClusterManagerFactory.class.getName());
+ if (quorumSize != -1)
+ {
+ platformManager = PlatformLocator.factory.createPlatformManager(port, host, quorumSize, haGroup);
+ }
+ else
+ {
+ platformManager = PlatformLocator.factory.createPlatformManager(port, host);
+ }
+
+ eventBus = platformManager.vertx().eventBus();
+
+ Binding b = postOffice.getBinding(new SimpleString(queueName));
+ if (b == null)
+ {
+ throw new Exception(connectorName + ": queue " + queueName + " not found");
+ }
+
+ handler = new EventHandler();
+ eventBus.registerHandler(vertxAddress, handler);
+
+ isStarted = true;
+ HornetQVertxLogger.LOGGER.debug(connectorName + ": started");
+ }
+
+ @Override
+ public void stop() throws Exception
+ {
+ if (!isStarted)
+ {
+ return;
+ }
+ eventBus.unregisterHandler(vertxAddress, handler);
+ platformManager.stop();
+ System.clearProperty("vertx.clusterManagerFactory");
+ isStarted = false;
+ HornetQVertxLogger.LOGGER.debug(connectorName + ": stopped");
+ }
+
+ @Override
+ public boolean isStarted()
+ {
+ return isStarted;
+ }
+
+ @Override
+ public String getName()
+ {
+ return connectorName;
+ }
+
+ private class EventHandler implements Handler>
+ {
+ @Override
+ public void handle(Message> message)
+ {
+ ServerMessage msg = new ServerMessageImpl(storageManager.generateUniqueID(),
+ VertxConstants.INITIAL_MESSAGE_BUFFER_SIZE);
+ msg.setAddress(new SimpleString(queueName));
+ msg.setDurable(true);
+ msg.encodeMessageIDToBuffer();
+
+ String replyAddress = message.replyAddress();
+ if (replyAddress != null)
+ {
+ msg.putStringProperty(VertxConstants.VERTX_MESSAGE_REPLYADDRESS, replyAddress);
+ }
+
+ // it'd be better that Message expose its type information
+ int type = getMessageType(message);
+
+ msg.putIntProperty(VertxConstants.VERTX_MESSAGE_TYPE, type);
+
+ manualEncodeVertxMessageBody(msg.getBodyBuffer(), message.body(), type);
+
+ try
+ {
+ postOffice.route(msg, false);
+ }
+ catch (Exception e)
+ {
+ HornetQVertxLogger.LOGGER.error("failed to route msg " + msg, e);
+ }
+ }
+
+ private void manualEncodeVertxMessageBody(HornetQBuffer bodyBuffer, Object body, int type)
+ {
+ switch (type)
+ {
+ case VertxConstants.TYPE_BOOLEAN:
+ bodyBuffer.writeBoolean(((Boolean) body));
+ break;
+ case VertxConstants.TYPE_BUFFER:
+ Buffer buff = (Buffer) body;
+ int len = buff.length();
+ bodyBuffer.writeInt(len);
+ bodyBuffer.writeBytes(((Buffer) body).getBytes());
+ break;
+ case VertxConstants.TYPE_BYTEARRAY:
+ byte[] bytes = (byte[]) body;
+ bodyBuffer.writeInt(bytes.length);
+ bodyBuffer.writeBytes(bytes);
+ break;
+ case VertxConstants.TYPE_BYTE:
+ bodyBuffer.writeByte((byte) body);
+ break;
+ case VertxConstants.TYPE_CHARACTER:
+ bodyBuffer.writeChar((Character) body);
+ break;
+ case VertxConstants.TYPE_DOUBLE:
+ bodyBuffer.writeDouble((double) body);
+ break;
+ case VertxConstants.TYPE_FLOAT:
+ bodyBuffer.writeFloat((Float) body);
+ break;
+ case VertxConstants.TYPE_INT:
+ bodyBuffer.writeInt((Integer) body);
+ break;
+ case VertxConstants.TYPE_LONG:
+ bodyBuffer.writeLong((Long) body);
+ break;
+ case VertxConstants.TYPE_SHORT:
+ bodyBuffer.writeShort((Short) body);
+ break;
+ case VertxConstants.TYPE_STRING:
+ case VertxConstants.TYPE_PING:
+ bodyBuffer.writeString((String) body);
+ break;
+ case VertxConstants.TYPE_JSON_OBJECT:
+ bodyBuffer.writeString(((JsonObject) body).encode());
+ break;
+ case VertxConstants.TYPE_JSON_ARRAY:
+ bodyBuffer.writeString(((JsonArray) body).encode());
+ break;
+ case VertxConstants.TYPE_REPLY_FAILURE:
+ ReplyException except = (ReplyException) body;
+ bodyBuffer.writeInt(except.failureType().toInt());
+ bodyBuffer.writeInt(except.failureCode());
+ bodyBuffer.writeString(except.getMessage());
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid body type: " + type);
+ }
+ }
+
+ private int getMessageType(Message> message)
+ {
+
+ Object body = message.body();
+
+ if (message instanceof PingMessage)
+ {
+ return VertxConstants.TYPE_PING;
+ }
+ else if (body instanceof Buffer)
+ {
+ return VertxConstants.TYPE_BUFFER;
+ }
+ else if (body instanceof Boolean)
+ {
+ return VertxConstants.TYPE_BOOLEAN;
+ }
+ else if (body instanceof byte[])
+ {
+ return VertxConstants.TYPE_BYTEARRAY;
+ }
+ else if (body instanceof Byte)
+ {
+ return VertxConstants.TYPE_BYTE;
+ }
+ else if (body instanceof Character)
+ {
+ return VertxConstants.TYPE_CHARACTER;
+ }
+ else if (body instanceof Double)
+ {
+ return VertxConstants.TYPE_DOUBLE;
+ }
+ else if (body instanceof Float)
+ {
+ return VertxConstants.TYPE_FLOAT;
+ }
+ else if (body instanceof Integer)
+ {
+ return VertxConstants.TYPE_INT;
+ }
+ else if (body instanceof Long)
+ {
+ return VertxConstants.TYPE_LONG;
+ }
+ else if (body instanceof Short)
+ {
+ return VertxConstants.TYPE_SHORT;
+ }
+ else if (body instanceof String)
+ {
+ return VertxConstants.TYPE_STRING;
+ }
+ else if (body instanceof JsonArray)
+ {
+ return VertxConstants.TYPE_JSON_ARRAY;
+ }
+ else if (body instanceof JsonObject)
+ {
+ return VertxConstants.TYPE_JSON_OBJECT;
+ }
+ else if (body instanceof ReplyException)
+ {
+ return VertxConstants.TYPE_REPLY_FAILURE;
+ }
+
+ throw new IllegalArgumentException("Type not supported: " + message);
+ }
+
+ }
+
+ @Override
+ public String toString()
+ {
+ return "[IncomingVertxEventHandler(" + connectorName + "), queueName: " + queueName
+ + " host: " + host + " port: " + port + " vertxAddress: " + vertxAddress + "]";
+ }
+}
diff --git a/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/OutgoingVertxEventHandler.java b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/OutgoingVertxEventHandler.java
new file mode 100644
index 00000000000..cb020616173
--- /dev/null
+++ b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/OutgoingVertxEventHandler.java
@@ -0,0 +1,337 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc., and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.hornetq.integration.vertx;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.filter.Filter;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.ConnectorService;
+import org.hornetq.core.server.Consumer;
+import org.hornetq.core.server.HandleStatus;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.ConfigurationHelper;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.eventbus.EventBus;
+import org.vertx.java.core.eventbus.ReplyException;
+import org.vertx.java.core.eventbus.ReplyFailure;
+import org.vertx.java.core.json.JsonArray;
+import org.vertx.java.core.json.JsonObject;
+import org.vertx.java.platform.PlatformLocator;
+import org.vertx.java.platform.PlatformManager;
+import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
+
+public class OutgoingVertxEventHandler implements Consumer, ConnectorService
+{
+ private final String connectorName;
+
+ private final String queueName;
+
+ private final int port;
+
+ private final String host;
+
+ private final int quorumSize;
+
+ private final String haGroup;
+
+ private final String vertxAddress;
+
+ private final boolean publish;
+
+ private final PostOffice postOffice;
+
+ private Queue queue = null;
+
+ private Filter filter = null;
+
+ private EventBus eventBus;
+
+ private PlatformManager platformManager;
+
+ private boolean isStarted = false;
+
+ public OutgoingVertxEventHandler(String connectorName, Map configuration,
+ StorageManager storageManager, PostOffice postOffice,
+ ScheduledExecutorService scheduledThreadPool)
+ {
+ this.connectorName = connectorName;
+ this.queueName = ConfigurationHelper.getStringProperty(VertxConstants.QUEUE_NAME, null,
+ configuration);
+ this.postOffice = postOffice;
+
+ this.port = ConfigurationHelper.getIntProperty(VertxConstants.PORT, 0, configuration);
+ this.host = ConfigurationHelper.getStringProperty(VertxConstants.HOST, "localhost",
+ configuration);
+ this.quorumSize = ConfigurationHelper.getIntProperty(VertxConstants.VERTX_QUORUM_SIZE,
+ -1, configuration);
+ this.haGroup = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_HA_GROUP,
+ "hornetq", configuration);
+ this.vertxAddress = ConfigurationHelper.getStringProperty(VertxConstants.VERTX_ADDRESS,
+ "org.hornetq", configuration);
+ this.publish = ConfigurationHelper.getBooleanProperty(VertxConstants.VERTX_PUBLISH, false,
+ configuration);
+ }
+
+ @Override
+ public void start() throws Exception
+ {
+ if (this.isStarted)
+ {
+ return;
+ }
+ System.setProperty("vertx.clusterManagerFactory",
+ HazelcastClusterManagerFactory.class.getName());
+ if (quorumSize != -1)
+ {
+ platformManager = PlatformLocator.factory.createPlatformManager(port, host, quorumSize, haGroup);
+ }
+ else
+ {
+ platformManager = PlatformLocator.factory.createPlatformManager(port, host);
+ }
+
+ eventBus = platformManager.vertx().eventBus();
+
+ if (this.connectorName == null || this.connectorName.trim().equals(""))
+ {
+ throw new Exception("invalid connector name: " + this.connectorName);
+ }
+
+ if (this.queueName == null || this.queueName.trim().equals(""))
+ {
+ throw new Exception("invalid queue name: " + queueName);
+ }
+
+ SimpleString name = new SimpleString(this.queueName);
+ Binding b = this.postOffice.getBinding(name);
+ if (b == null)
+ {
+ throw new Exception(connectorName + ": queue " + queueName + " not found");
+ }
+ this.queue = (Queue) b.getBindable();
+ this.queue.addConsumer(this);
+
+ this.queue.deliverAsync();
+ this.isStarted = true;
+
+ HornetQVertxLogger.LOGGER.debug(connectorName + ": started");
+ }
+
+ @Override
+ public void stop() throws Exception
+ {
+ if (!this.isStarted)
+ {
+ return;
+ }
+
+ HornetQVertxLogger.LOGGER.debug(connectorName + ": receive shutdown request");
+
+ this.queue.removeConsumer(this);
+
+ this.platformManager.stop();
+ System.clearProperty("vertx.clusterManagerFactory");
+ this.isStarted = false;
+ HornetQVertxLogger.LOGGER.debug(connectorName + ": stopped");
+ }
+
+ @Override
+ public boolean isStarted()
+ {
+ return this.isStarted;
+ }
+
+ @Override
+ public String getName()
+ {
+ return this.connectorName;
+ }
+
+ @Override
+ public HandleStatus handle(MessageReference ref) throws Exception
+ {
+ if (filter != null && !filter.match(ref.getMessage()))
+ {
+ return HandleStatus.NO_MATCH;
+ }
+
+ synchronized (this)
+ {
+ ref.handled();
+
+ ServerMessage message = ref.getMessage();
+
+ Object vertxMsgBody = null;
+ // extract information from message
+ Integer type = message.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+
+ if (type == null)
+ {
+ // log a warning and default to raw bytes
+ HornetQVertxLogger.LOGGER.nonVertxMessage(message);
+ type = VertxConstants.TYPE_RAWBYTES;
+ }
+
+ // from vertx
+ vertxMsgBody = extractMessageBody(message, type);
+
+ if (vertxMsgBody == null)
+ {
+ return HandleStatus.NO_MATCH;
+ }
+
+ // send to bus
+ if (!publish)
+ {
+ eventBus.send(vertxAddress, vertxMsgBody);
+ }
+ else
+ {
+ eventBus.publish(vertxAddress, vertxMsgBody);
+ }
+
+ queue.acknowledge(ref);
+
+ HornetQVertxLogger.LOGGER.debug(connectorName + ": forwarded to vertx: "
+ + message.getMessageID());
+ return HandleStatus.HANDLED;
+ }
+ }
+
+ private Object extractMessageBody(ServerMessage message, Integer type) throws Exception
+ {
+ Object vertxMsgBody = null;
+ HornetQBuffer bodyBuffer = message.getBodyBuffer();
+ switch (type)
+ {
+ case VertxConstants.TYPE_PING:
+ case VertxConstants.TYPE_STRING:
+ bodyBuffer.resetReaderIndex();
+ vertxMsgBody = bodyBuffer.readString();
+ break;
+ case VertxConstants.TYPE_BUFFER:
+ int len = bodyBuffer.readInt();
+ byte[] bytes = new byte[len];
+ bodyBuffer.readBytes(bytes);
+ vertxMsgBody = new Buffer(bytes);
+ break;
+ case VertxConstants.TYPE_BOOLEAN:
+ vertxMsgBody = bodyBuffer.readBoolean();
+ break;
+ case VertxConstants.TYPE_BYTEARRAY:
+ int length = bodyBuffer.readInt();
+ byte[] byteArray = new byte[length];
+ bodyBuffer.readBytes(byteArray);
+ vertxMsgBody = byteArray;
+ break;
+ case VertxConstants.TYPE_BYTE:
+ vertxMsgBody = bodyBuffer.readByte();
+ break;
+ case VertxConstants.TYPE_CHARACTER:
+ vertxMsgBody = bodyBuffer.readChar();
+ break;
+ case VertxConstants.TYPE_DOUBLE:
+ vertxMsgBody = bodyBuffer.readDouble();
+ break;
+ case VertxConstants.TYPE_FLOAT:
+ vertxMsgBody = bodyBuffer.readFloat();
+ break;
+ case VertxConstants.TYPE_INT:
+ vertxMsgBody = bodyBuffer.readInt();
+ break;
+ case VertxConstants.TYPE_LONG:
+ vertxMsgBody = bodyBuffer.readLong();
+ break;
+ case VertxConstants.TYPE_SHORT:
+ vertxMsgBody = bodyBuffer.readShort();
+ break;
+ case VertxConstants.TYPE_JSON_OBJECT:
+ vertxMsgBody = new JsonObject(bodyBuffer.readString());
+ break;
+ case VertxConstants.TYPE_JSON_ARRAY:
+ vertxMsgBody = new JsonArray(bodyBuffer.readString());
+ break;
+ case VertxConstants.TYPE_REPLY_FAILURE:
+ int failureType = bodyBuffer.readInt();
+ int failureCode = bodyBuffer.readInt();
+ String errMsg = bodyBuffer.readString();
+ vertxMsgBody = new ReplyException(ReplyFailure.fromInt(failureType), failureCode,
+ errMsg);
+ break;
+ case VertxConstants.TYPE_RAWBYTES:
+ int size = bodyBuffer.readableBytes();
+ byte[] rawBytes = new byte[size];
+ bodyBuffer.readBytes(rawBytes);
+ vertxMsgBody = rawBytes;
+ break;
+ default:
+ HornetQVertxLogger.LOGGER.invalidVertxType(type);
+ break;
+ }
+ return vertxMsgBody;
+ }
+
+ @Override
+ public void proceedDeliver(MessageReference reference) throws Exception
+ {
+ // no op
+ }
+
+ @Override
+ public Filter getFilter()
+ {
+ return this.filter;
+ }
+
+ @Override
+ public String debug()
+ {
+ return null;
+ }
+
+ @Override
+ public String toManagementString()
+ {
+ return null;
+ }
+
+ @Override
+ public List getDeliveringMessages()
+ {
+ return null;
+ }
+
+ @Override
+ public void disconnect()
+ {
+ }
+
+}
diff --git a/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/VertxConstants.java b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/VertxConstants.java
new file mode 100644
index 00000000000..884ce35ae95
--- /dev/null
+++ b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/VertxConstants.java
@@ -0,0 +1,89 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc., and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.hornetq.integration.vertx;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class VertxConstants
+{
+ // org.vertx.java.core.eventbus.impl.MessageFactory
+ public static final int TYPE_PING = 0;
+ public static final int TYPE_BUFFER = 1;
+ public static final int TYPE_BOOLEAN = 2;
+ public static final int TYPE_BYTEARRAY = 3;
+ public static final int TYPE_BYTE = 4;
+ public static final int TYPE_CHARACTER = 5;
+ public static final int TYPE_DOUBLE = 6;
+ public static final int TYPE_FLOAT = 7;
+ public static final int TYPE_INT = 8;
+ public static final int TYPE_LONG = 9;
+ public static final int TYPE_SHORT = 10;
+ public static final int TYPE_STRING = 11;
+ public static final int TYPE_JSON_OBJECT = 12;
+ public static final int TYPE_JSON_ARRAY = 13;
+ public static final int TYPE_REPLY_FAILURE = 100;
+ public static final int TYPE_RAWBYTES = 200;
+
+
+ public static final String PORT = "port";
+ public static final String HOST = "host";
+ public static final String QUEUE_NAME = "queue";
+ public static final String VERTX_ADDRESS = "vertx-address";
+ public static final String VERTX_PUBLISH = "publish";
+ public static final String VERTX_QUORUM_SIZE = "quorum-size";
+ public static final String VERTX_HA_GROUP = "ha-group";
+
+ public static final Set ALLOWABLE_INCOMING_CONNECTOR_KEYS;
+ public static final Set REQUIRED_INCOMING_CONNECTOR_KEYS;
+ public static final Set ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
+ public static final Set REQUIRED_OUTGOING_CONNECTOR_KEYS;
+ public static final int INITIAL_MESSAGE_BUFFER_SIZE = 50;
+ public static final String VERTX_MESSAGE_REPLYADDRESS = "vertx.message.replyaddress";
+ public static final String VERTX_MESSAGE_TYPE = "vertx.message.type";
+
+ static
+ {
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet();
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(PORT);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(HOST);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_ADDRESS);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_QUORUM_SIZE);
+ ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(VERTX_HA_GROUP);
+
+ REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet();
+ REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
+
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet();
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(PORT);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(HOST);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_ADDRESS);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_PUBLISH);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_QUORUM_SIZE);
+ ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(VERTX_HA_GROUP);
+
+ REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet();
+ REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
+ }
+}
diff --git a/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/VertxIncomingConnectorServiceFactory.java b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/VertxIncomingConnectorServiceFactory.java
new file mode 100644
index 00000000000..0bbf35fef07
--- /dev/null
+++ b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/VertxIncomingConnectorServiceFactory.java
@@ -0,0 +1,38 @@
+package org.hornetq.integration.vertx;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.ConnectorService;
+import org.hornetq.core.server.ConnectorServiceFactory;
+
+public class VertxIncomingConnectorServiceFactory implements ConnectorServiceFactory
+{
+
+ @Override
+ public ConnectorService createConnectorService(String connectorName,
+ Map configuration, StorageManager storageManager,
+ PostOffice postOffice, ScheduledExecutorService scheduledThreadPool)
+ {
+
+ return new IncomingVertxEventHandler(connectorName, configuration, storageManager,
+ postOffice, scheduledThreadPool);
+
+ }
+
+ @Override
+ public Set getAllowableProperties()
+ {
+ return VertxConstants.ALLOWABLE_INCOMING_CONNECTOR_KEYS;
+ }
+
+ @Override
+ public Set getRequiredProperties()
+ {
+ return VertxConstants.REQUIRED_INCOMING_CONNECTOR_KEYS;
+ }
+
+}
diff --git a/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/VertxOutgoingConnectorServiceFactory.java b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/VertxOutgoingConnectorServiceFactory.java
new file mode 100644
index 00000000000..46b28af76e6
--- /dev/null
+++ b/integration/hornetq-vertx-integration/src/main/java/org/hornetq/integration/vertx/VertxOutgoingConnectorServiceFactory.java
@@ -0,0 +1,36 @@
+package org.hornetq.integration.vertx;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.ConnectorService;
+import org.hornetq.core.server.ConnectorServiceFactory;
+
+public class VertxOutgoingConnectorServiceFactory implements ConnectorServiceFactory
+{
+
+ @Override
+ public ConnectorService createConnectorService(String connectorName,
+ Map configuration, StorageManager storageManager,
+ PostOffice postOffice, ScheduledExecutorService scheduledThreadPool)
+ {
+ return new OutgoingVertxEventHandler(connectorName, configuration, storageManager,
+ postOffice, scheduledThreadPool);
+ }
+
+ @Override
+ public Set getAllowableProperties()
+ {
+ return VertxConstants.ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
+ }
+
+ @Override
+ public Set getRequiredProperties()
+ {
+ return VertxConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS;
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index a16e09c0d04..0bf9367556c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
- 4.0.13.Final
+ 4.0.17.Final
Wild Hornet
2
5
@@ -485,6 +485,7 @@
integration/hornetq-spring-integration
integration/hornetq-twitter-integration
integration/hornetq-aerogear-integration
+ integration/hornetq-vertx-integration
tests
@@ -507,6 +508,7 @@
integration/hornetq-spring-integration
integration/hornetq-twitter-integration
integration/hornetq-aerogear-integration
+ integration/hornetq-vertx-integration
examples
@@ -529,6 +531,7 @@
integration/hornetq-spring-integration
integration/hornetq-twitter-integration
integration/hornetq-aerogear-integration
+ integration/hornetq-vertx-integration
examples
docs
distribution
@@ -553,6 +556,7 @@
integration/hornetq-spring-integration
integration/hornetq-twitter-integration
integration/hornetq-aerogear-integration
+ integration/hornetq-vertx-integration
tests
@@ -588,6 +592,7 @@
integration/hornetq-spring-integration
integration/hornetq-twitter-integration
integration/hornetq-aerogear-integration
+ integration/hornetq-vertx-integration
tests
@@ -619,6 +624,7 @@
integration/hornetq-spring-integration
integration/hornetq-twitter-integration
integration/hornetq-aerogear-integration
+ integration/hornetq-vertx-integration
tests
examples
@@ -852,7 +858,7 @@
false
-
+
**/filter/impl/Identifier.java
**/filter/impl/Operator.java
**/filter/impl/RegExp.java
@@ -890,6 +895,7 @@
+-->
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index b36c178341b..86d9925dcb1 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -13,6 +13,8 @@
${project.parent.parent.basedir}
+ 2.1RC1
+ 2.0.2-final
@@ -72,6 +74,11 @@
hornetq-spring-integration
${project.version}
+
+ org.hornetq
+ hornetq-vertx-integration
+ ${project.version}
+
org.hornetq
hornetq-journal
@@ -113,6 +120,11 @@
org.twitter4j
twitter4j-core
+
+ com.hazelcast
+ hazelcast
+ 2.6.6
+
junit
junit
@@ -162,6 +174,31 @@
org.jboss.jbossts.jts
jbossjts-jacorb
+
+
+ io.vertx
+ vertx-core
+ ${vertx.version}
+ provided
+
+
+ io.vertx
+ vertx-platform
+ ${vertx.version}
+ provided
+
+
+ io.vertx
+ vertx-hazelcast
+ ${vertx.version}
+ provided
+
+
+ io.vertx
+ testtools
+ ${vertx.testtools.version}
+ test
+
diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/vertx/HornetQVertxUnitTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/vertx/HornetQVertxUnitTest.java
new file mode 100644
index 00000000000..73894d25c55
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/vertx/HornetQVertxUnitTest.java
@@ -0,0 +1,852 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.vertx;
+
+import java.util.HashMap;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.ConnectorServiceConfiguration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.vertx.VertxConstants;
+import org.hornetq.integration.vertx.VertxIncomingConnectorServiceFactory;
+import org.hornetq.integration.vertx.VertxOutgoingConnectorServiceFactory;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.Vertx;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.eventbus.impl.BaseMessage;
+import org.vertx.java.core.json.JsonArray;
+import org.vertx.java.core.json.JsonObject;
+import org.vertx.java.platform.PlatformLocator;
+import org.vertx.java.platform.PlatformManager;
+import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
+
+/**
+ * This class tests the basics of HornetQ
+ * vertx integration
+ */
+public class HornetQVertxUnitTest extends ServiceTestBase
+{
+ protected PlatformManager vertxManager;
+ protected HornetQServer server;
+
+ protected String host = "localhost";
+ protected String port = "0";
+
+ protected String incomingQueue1 = "vertxTestIncomingQueue1";
+ protected String incomingVertxAddress1 = "org.hornetq.test.incoming1";
+
+ //outgoing using send
+ protected String inOutQueue1 = "vertxTestInOutQueue1";
+ protected String incomingVertxAddress2 = "org.hornetq.test.incoming2";
+ protected String outgoingVertxAddress1 = "org.hornetq.test.outgoing1";
+
+ //outgoing using publish
+ protected String inOutQueue2 = "vertxTestInOutQueue2";
+ protected String incomingVertxAddress3 = "org.hornetq.test.incoming3";
+ protected String outgoingVertxAddress2 = "org.hornetq.test.outgoing2";
+
+ //subclasses may override this method
+ //in order to get a server with different connector services
+ @Before @Override
+ public void setUp() throws Exception
+ {
+ createVertxService();
+
+ super.setUp();
+
+ Configuration configuration = createDefaultConfig(false);
+
+ //all queues
+ CoreQueueConfiguration qc = new CoreQueueConfiguration(incomingQueue1, incomingQueue1, null, true);
+ configuration.getQueueConfigurations().add(qc);
+ qc = new CoreQueueConfiguration(inOutQueue1, inOutQueue1, null, true);
+ configuration.getQueueConfigurations().add(qc);
+ qc = new CoreQueueConfiguration(inOutQueue2, inOutQueue2, null, true);
+ configuration.getQueueConfigurations().add(qc);
+
+ //incoming
+ HashMap config1 = new HashMap();
+ config1.put(VertxConstants.HOST, host);
+ config1.put(VertxConstants.PORT, port);
+ config1.put(VertxConstants.QUEUE_NAME, incomingQueue1);
+ config1.put(VertxConstants.VERTX_ADDRESS, incomingVertxAddress1);
+
+ ConnectorServiceConfiguration inconf1 =
+ new ConnectorServiceConfiguration(
+ VertxIncomingConnectorServiceFactory.class.getName(),
+ config1, "test-vertx-incoming-connector1");
+ configuration.getConnectorServiceConfigurations().add(inconf1);
+
+ //outgoing send style
+ HashMap config2 = new HashMap();
+ config2.put(VertxConstants.HOST, host);
+ config2.put(VertxConstants.PORT, port);
+ config2.put(VertxConstants.QUEUE_NAME, inOutQueue1);
+ config2.put(VertxConstants.VERTX_ADDRESS, incomingVertxAddress2);
+
+ ConnectorServiceConfiguration inconf2 =
+ new ConnectorServiceConfiguration(
+ VertxIncomingConnectorServiceFactory.class.getName(),
+ config2, "test-vertx-incoming-connector2");
+ configuration.getConnectorServiceConfigurations().add(inconf2);
+
+ HashMap config3 = new HashMap();
+ config3.put(VertxConstants.HOST, host);
+ config3.put(VertxConstants.PORT, port);
+ config3.put(VertxConstants.QUEUE_NAME, inOutQueue1);
+ config3.put(VertxConstants.VERTX_ADDRESS, outgoingVertxAddress1);
+
+ ConnectorServiceConfiguration outconf1 =
+ new ConnectorServiceConfiguration(
+ VertxOutgoingConnectorServiceFactory.class.getName(),
+ config3, "test-vertx-outgoing-connector1");
+ configuration.getConnectorServiceConfigurations().add(outconf1);
+
+ //outgoing publish style
+ HashMap config4 = new HashMap();
+ config4.put(VertxConstants.HOST, host);
+ config4.put(VertxConstants.PORT, port);
+ config4.put(VertxConstants.QUEUE_NAME, inOutQueue2);
+ config4.put(VertxConstants.VERTX_ADDRESS, incomingVertxAddress3);
+
+ ConnectorServiceConfiguration inconf3 =
+ new ConnectorServiceConfiguration(
+ VertxIncomingConnectorServiceFactory.class.getName(),
+ config4, "test-vertx-incoming-connector3");
+ configuration.getConnectorServiceConfigurations().add(inconf3);
+
+ HashMap config5 = new HashMap();
+ config5.put(VertxConstants.HOST, host);
+ config5.put(VertxConstants.PORT, port);
+ config5.put(VertxConstants.QUEUE_NAME, inOutQueue2);
+ config5.put(VertxConstants.VERTX_ADDRESS, outgoingVertxAddress2);
+ config5.put(VertxConstants.VERTX_PUBLISH, "true");
+
+ ConnectorServiceConfiguration outconf2 =
+ new ConnectorServiceConfiguration(
+ VertxOutgoingConnectorServiceFactory.class.getName(),
+ config5, "test-vertx-outgoing-connector2");
+ configuration.getConnectorServiceConfigurations().add(outconf2);
+
+ server = createServer(false, configuration);
+ server.start();
+ }
+
+ /**
+ * (vertx events) ===> (incomingQueue1) ===> (hornetq consumer)
+ * @throws Exception
+ */
+ @Test
+ public void testIncomingEvents() throws Exception
+ {
+ Vertx vertx = vertxManager.vertx();
+
+ //send a string message
+ String greeting = "Hello World!";
+ vertx.eventBus().send(incomingVertxAddress1, greeting);
+
+ ClientMessage msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ System.out.println("==== received msg: " + msg);
+
+ int vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_STRING, vertxType);
+
+ String body = msg.getBodyBuffer().readString();
+ System.out.println("==== body: " + body);
+
+ assertEquals(greeting, body);
+
+ //send a Buffer message
+ final byte[] content = greeting.getBytes("UTF-8");
+ Buffer buffer = new Buffer(content);
+ vertx.eventBus().send(incomingVertxAddress1, buffer);
+
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_BUFFER, vertxType);
+
+ HornetQBuffer hqBuf = msg.getBodyBuffer();
+ int len = hqBuf.readInt();
+ System.out.println("==== len is: " + len);
+ assertEquals(content.length, len);
+ byte[] bytes = new byte[len];
+ hqBuf.readBytes(bytes);
+
+ //bytes must match
+ for (int i = 0; i < len; i++)
+ {
+ assertEquals(content[i], bytes[i]);
+ }
+
+ //send a boolean
+ vertx.eventBus().send(incomingVertxAddress1, Boolean.TRUE);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_BOOLEAN, vertxType);
+
+ Boolean booleanValue = msg.getBodyBuffer().readBoolean();
+ assertEquals(Boolean.TRUE, booleanValue);
+
+ //send a byte array
+ vertx.eventBus().send(incomingVertxAddress1, content);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_BYTEARRAY, vertxType);
+
+ len = msg.getBodyBuffer().readInt();
+ byte[] recvBytes = new byte[len];
+ msg.getBodyBuffer().readBytes(recvBytes);
+ //bytes must match
+ for (int i = 0; i < len; i++)
+ {
+ assertEquals(content[i], recvBytes[i]);
+ }
+
+ //send a byte
+ Byte aByte = new Byte((byte)15);
+ vertx.eventBus().send(incomingVertxAddress1, aByte);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_BYTE, vertxType);
+
+ Byte recvByte = msg.getBodyBuffer().readByte();
+ assertEquals(aByte, recvByte);
+
+ //send a Character
+ Character aChar = new Character('a');
+ vertx.eventBus().send(incomingVertxAddress1, aChar);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_CHARACTER, vertxType);
+ Character recvChar = msg.getBodyBuffer().readChar();
+ assertEquals(aChar, recvChar);
+
+ //send a Double
+ Double aDouble = new Double(1234.56d);
+ vertx.eventBus().send(incomingVertxAddress1, aDouble);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_DOUBLE, vertxType);
+ Double recvDouble = msg.getBodyBuffer().readDouble();
+ assertEquals(aDouble, recvDouble);
+
+ //send a Float
+ Float aFloat = new Float(1234.56f);
+ vertx.eventBus().send(incomingVertxAddress1, aFloat);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_FLOAT, vertxType);
+ Float recvFloat = msg.getBodyBuffer().readFloat();
+ assertEquals(aFloat, recvFloat);
+
+ //send a Integer
+ Integer aInt = new Integer(1234);
+ vertx.eventBus().send(incomingVertxAddress1, aInt);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_INT, vertxType);
+ Integer recvInt = msg.getBodyBuffer().readInt();
+ assertEquals(aInt, recvInt);
+
+ //send a Long
+ Long aLong = new Long(12345678L);
+ vertx.eventBus().send(incomingVertxAddress1, aLong);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_LONG, vertxType);
+ Long recvLong = msg.getBodyBuffer().readLong();
+ assertEquals(aLong, recvLong);
+
+ //send a Short
+ Short aShort = new Short((short)321);
+ vertx.eventBus().send(incomingVertxAddress1, aShort);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_SHORT, vertxType);
+ Short recvShort = msg.getBodyBuffer().readShort();
+ assertEquals(aShort, recvShort);
+
+ //send a JsonObject
+ String jsonObjectString = "{\n" +
+ "\"Image\": {\n" +
+ "\"Width\": 800,\n" +
+ "\"Height\": 600,\n" +
+ "\"Title\": \"View from 15th Floor\",\n" +
+ "\"Thumbnail\": {\n" +
+ "\"Url\": \"http://www.example.com/image/481989943\",\n" +
+ "\"Height\": 125,\n" +
+ "\"Width\": 100\n" +
+ "},\n" +
+ "\"IDs\": [116, 943, 234, 38793]\n" +
+ "}\n" +
+ "}";
+ JsonObject aJsonObj = new JsonObject(jsonObjectString);
+ vertx.eventBus().send(incomingVertxAddress1, aJsonObj);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_JSON_OBJECT, vertxType);
+ String recvJsonString = msg.getBodyBuffer().readString();
+ System.out.println("==== received json: " + recvJsonString);
+ assertEquals(aJsonObj, new JsonObject(recvJsonString));
+
+ //send a JsonArray
+ String jsonArrayString = "[\n" +
+ "{\n" +
+ "\"precision\": \"zip\",\n" +
+ "\"Latitude\": 37.7668,\n" +
+ "\"Longitude\": -122.3959,\n" +
+ "\"Address\": \"\",\n" +
+ "\"City\": \"SAN FRANCISCO\",\n" +
+ "\"State\": \"CA\",\n" +
+ "\"Zip\": \"94107\",\n" +
+ "\"Country\": \"US\"\n" +
+ "},\n" +
+ "{\n" +
+ "\"precision\": \"zip\",\n" +
+ "\"Latitude\": 37.371991,\n" +
+ "\"Longitude\": -122.026020,\n" +
+ "\"Address\": \"\",\n" +
+ "\"City\": \"SUNNYVALE\",\n" +
+ "\"State\": \"CA\",\n" +
+ "\"Zip\": \"94085\",\n" +
+ "\"Country\": \"US\"\n" +
+ "}\n" +
+ "]";
+ JsonArray aJsonArray = new JsonArray(jsonArrayString);
+ System.out.println("a json array string: " + aJsonArray);
+ vertx.eventBus().send(incomingVertxAddress1, aJsonArray);
+
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_JSON_ARRAY, vertxType);
+ recvJsonString = msg.getBodyBuffer().readString();
+ System.out.println("==== received json: " + recvJsonString);
+ assertEquals(aJsonArray, new JsonArray(recvJsonString));
+
+ //send a ReplyFailure
+ /*
+ ReplyFailure replyFailure = ReplyFailure.TIMEOUT;
+ int fakeFailureCode = 1234;
+ String failureMsg = "Test failure message";
+ ReplyException aReplyEx = new ReplyException(replyFailure, fakeFailureCode, failureMsg);
+ vertx.eventBus().send(incomingVertxAddress1, aReplyEx);
+ msg = receiveFromQueue(incomingQueue1);
+ assertNotNull(msg);
+ vertxType = msg.getIntProperty(VertxConstants.VERTX_MESSAGE_TYPE);
+ assertEquals(VertxConstants.TYPE_REPLY_FAILURE, vertxType);
+ int recvType = msg.getBodyBuffer().readInt();
+ int recvCode = msg.getBodyBuffer().readInt();
+ String recvFailureMsg = msg.getBodyBuffer().readString();
+ assertEquals(replyFailure.toInt(), recvType);
+ assertEquals(fakeFailureCode, recvCode);
+ assertEquals(failureMsg, recvFailureMsg);
+ */
+ }
+
+ /**
+ * vertx events (incomingVertxAddress2)
+ * ===> (inOutQueue1)
+ * ===> (outgoing handler)
+ * ===> send to vertx (outgoingVertxAddress1)
+ * @throws Exception
+ */
+ @Test
+ public void testOutgoingEvents() throws Exception
+ {
+ Vertx vertx = vertxManager.vertx();
+
+ //regiser a handler to receive outgoing messages
+ VertxTestHandler handler = new VertxTestHandler();
+ vertx.eventBus().registerHandler(outgoingVertxAddress1, handler);
+
+ //send a string message
+ String greeting = "Hello World!";
+ vertx.eventBus().send(incomingVertxAddress2, greeting);
+
+ //check message in handler
+ handler.checkStringMessageReceived(greeting);
+
+ //send a Buffer message
+ final byte[] content = greeting.getBytes("UTF-8");
+ Buffer buffer = new Buffer(content);
+ vertx.eventBus().send(incomingVertxAddress2, buffer);
+
+ handler.checkBufferMessageReceived(buffer);
+
+ //send a boolean
+ Boolean boolValue = Boolean.TRUE;
+ vertx.eventBus().send(incomingVertxAddress2, boolValue);
+
+ handler.checkBooleanMessageReceived(boolValue);
+
+ byte[] byteArray = greeting.getBytes("UTF-8");
+ vertx.eventBus().send(incomingVertxAddress2, byteArray);
+
+ handler.checkByteArrayMessageReceived(byteArray);
+
+ //send a byte
+ Byte aByte = new Byte((byte)15);
+ vertx.eventBus().send(incomingVertxAddress2, aByte);
+
+ handler.checkByteMessageReceived(aByte);
+
+ //send a Character
+ Character aChar = new Character('a');
+ vertx.eventBus().send(incomingVertxAddress2, aChar);
+
+ handler.checkCharacterMessageReceived(aChar);
+
+ //send a Double
+ Double aDouble = new Double(1234.56d);
+ vertx.eventBus().send(incomingVertxAddress2, aDouble);
+
+ handler.checkDoubleMessageReceived(aDouble);
+
+ //send a Float
+ Float aFloat = new Float(1234.56f);
+ vertx.eventBus().send(incomingVertxAddress2, aFloat);
+
+ handler.checkFloatMessageReceived(aFloat);
+
+ //send a Integer
+ Integer aInt = new Integer(1234);
+ vertx.eventBus().send(incomingVertxAddress2, aInt);
+
+ handler.checkIntegerMessageReceived(aInt);
+
+ //send a Long
+ Long aLong = new Long(12345678L);
+ vertx.eventBus().send(incomingVertxAddress2, aLong);
+
+ handler.checkLongMessageReceived(aLong);
+
+ //send a Short
+ Short aShort = new Short((short)321);
+ vertx.eventBus().send(incomingVertxAddress2, aShort);
+
+ handler.checkShortMessageReceived(aShort);
+
+ //send a JsonObject
+ String jsonObjectString = "{\n" +
+ "\"Image\": {\n" +
+ "\"Width\": 800,\n" +
+ "\"Height\": 600,\n" +
+ "\"Title\": \"View from 15th Floor\",\n" +
+ "\"Thumbnail\": {\n" +
+ "\"Url\": \"http://www.example.com/image/481989943\",\n" +
+ "\"Height\": 125,\n" +
+ "\"Width\": 100\n" +
+ "},\n" +
+ "\"IDs\": [116, 943, 234, 38793]\n" +
+ "}\n" +
+ "}";
+ JsonObject aJsonObj = new JsonObject(jsonObjectString);
+ vertx.eventBus().send(incomingVertxAddress2, aJsonObj);
+
+ handler.checkJsonObjectMessageReceived(aJsonObj);
+
+ //send a JsonArray
+ String jsonArrayString = "[\n" +
+ "{\n" +
+ "\"precision\": \"zip\",\n" +
+ "\"Latitude\": 37.7668,\n" +
+ "\"Longitude\": -122.3959,\n" +
+ "\"Address\": \"\",\n" +
+ "\"City\": \"SAN FRANCISCO\",\n" +
+ "\"State\": \"CA\",\n" +
+ "\"Zip\": \"94107\",\n" +
+ "\"Country\": \"US\"\n" +
+ "},\n" +
+ "{\n" +
+ "\"precision\": \"zip\",\n" +
+ "\"Latitude\": 37.371991,\n" +
+ "\"Longitude\": -122.026020,\n" +
+ "\"Address\": \"\",\n" +
+ "\"City\": \"SUNNYVALE\",\n" +
+ "\"State\": \"CA\",\n" +
+ "\"Zip\": \"94085\",\n" +
+ "\"Country\": \"US\"\n" +
+ "}\n" +
+ "]";
+ JsonArray aJsonArray = new JsonArray(jsonArrayString);
+ vertx.eventBus().send(incomingVertxAddress2, aJsonArray);
+
+ handler.checkJsonArrayMessageReceived(aJsonArray);
+ }
+
+ /**
+ * vertx events (incomingVertxAddress3)
+ * ===> (inOutQueue2)
+ * ===> (outgoing handler)
+ * ===> public to vertx (outgoingVertxAddress2)
+ * @throws Exception
+ */
+ @Test
+ public void testOutgoingEvents2() throws Exception
+ {
+ Vertx vertx = vertxManager.vertx();
+
+ //regiser two handlers to receive outgoing messages
+ VertxTestHandler handler1 = new VertxTestHandler();
+ vertx.eventBus().registerHandler(outgoingVertxAddress2, handler1);
+ VertxTestHandler handler2 = new VertxTestHandler();
+ vertx.eventBus().registerHandler(outgoingVertxAddress2, handler2);
+
+ //send a string message
+ String greeting = "Hello World!";
+ vertx.eventBus().send(incomingVertxAddress3, greeting);
+
+ //check message in handler
+ handler1.checkStringMessageReceived(greeting);
+ handler2.checkStringMessageReceived(greeting);
+
+ //send a Buffer message
+ final byte[] content = greeting.getBytes("UTF-8");
+ Buffer buffer = new Buffer(content);
+ vertx.eventBus().send(incomingVertxAddress3, buffer);
+
+ handler1.checkBufferMessageReceived(buffer);
+ handler2.checkBufferMessageReceived(buffer);
+
+ //send a boolean
+ Boolean boolValue = Boolean.TRUE;
+ vertx.eventBus().send(incomingVertxAddress3, boolValue);
+
+ handler1.checkBooleanMessageReceived(boolValue);
+ handler2.checkBooleanMessageReceived(boolValue);
+
+ byte[] byteArray = greeting.getBytes("UTF-8");
+ vertx.eventBus().send(incomingVertxAddress3, byteArray);
+
+ handler1.checkByteArrayMessageReceived(byteArray);
+ handler2.checkByteArrayMessageReceived(byteArray);
+
+ //send a byte
+ Byte aByte = new Byte((byte)15);
+ vertx.eventBus().send(incomingVertxAddress3, aByte);
+
+ handler1.checkByteMessageReceived(aByte);
+ handler2.checkByteMessageReceived(aByte);
+
+ //send a Character
+ Character aChar = new Character('a');
+ vertx.eventBus().send(incomingVertxAddress3, aChar);
+
+ handler1.checkCharacterMessageReceived(aChar);
+ handler2.checkCharacterMessageReceived(aChar);
+
+ //send a Double
+ Double aDouble = new Double(1234.56d);
+ vertx.eventBus().send(incomingVertxAddress3, aDouble);
+
+ handler1.checkDoubleMessageReceived(aDouble);
+ handler2.checkDoubleMessageReceived(aDouble);
+
+ //send a Float
+ Float aFloat = new Float(1234.56f);
+ vertx.eventBus().send(incomingVertxAddress3, aFloat);
+
+ handler1.checkFloatMessageReceived(aFloat);
+ handler2.checkFloatMessageReceived(aFloat);
+
+ //send a Integer
+ Integer aInt = new Integer(1234);
+ vertx.eventBus().send(incomingVertxAddress3, aInt);
+
+ handler1.checkIntegerMessageReceived(aInt);
+ handler2.checkIntegerMessageReceived(aInt);
+
+ //send a Long
+ Long aLong = new Long(12345678L);
+ vertx.eventBus().send(incomingVertxAddress3, aLong);
+
+ handler1.checkLongMessageReceived(aLong);
+ handler2.checkLongMessageReceived(aLong);
+
+ //send a Short
+ Short aShort = new Short((short)321);
+ vertx.eventBus().send(incomingVertxAddress3, aShort);
+
+ handler1.checkShortMessageReceived(aShort);
+ handler2.checkShortMessageReceived(aShort);
+
+ //send a JsonObject
+ String jsonObjectString = "{\n" +
+ "\"Image\": {\n" +
+ "\"Width\": 800,\n" +
+ "\"Height\": 600,\n" +
+ "\"Title\": \"View from 15th Floor\",\n" +
+ "\"Thumbnail\": {\n" +
+ "\"Url\": \"http://www.example.com/image/481989943\",\n" +
+ "\"Height\": 125,\n" +
+ "\"Width\": 100\n" +
+ "},\n" +
+ "\"IDs\": [116, 943, 234, 38793]\n" +
+ "}\n" +
+ "}";
+ JsonObject aJsonObj = new JsonObject(jsonObjectString);
+ vertx.eventBus().send(incomingVertxAddress3, aJsonObj);
+
+ handler1.checkJsonObjectMessageReceived(aJsonObj);
+ handler2.checkJsonObjectMessageReceived(aJsonObj);
+
+ //send a JsonArray
+ String jsonArrayString = "[\n" +
+ "{\n" +
+ "\"precision\": \"zip\",\n" +
+ "\"Latitude\": 37.7668,\n" +
+ "\"Longitude\": -122.3959,\n" +
+ "\"Address\": \"\",\n" +
+ "\"City\": \"SAN FRANCISCO\",\n" +
+ "\"State\": \"CA\",\n" +
+ "\"Zip\": \"94107\",\n" +
+ "\"Country\": \"US\"\n" +
+ "},\n" +
+ "{\n" +
+ "\"precision\": \"zip\",\n" +
+ "\"Latitude\": 37.371991,\n" +
+ "\"Longitude\": -122.026020,\n" +
+ "\"Address\": \"\",\n" +
+ "\"City\": \"SUNNYVALE\",\n" +
+ "\"State\": \"CA\",\n" +
+ "\"Zip\": \"94085\",\n" +
+ "\"Country\": \"US\"\n" +
+ "}\n" +
+ "]";
+ JsonArray aJsonArray = new JsonArray(jsonArrayString);
+ vertx.eventBus().send(incomingVertxAddress3, aJsonArray);
+
+ handler1.checkJsonArrayMessageReceived(aJsonArray);
+ handler2.checkJsonArrayMessageReceived(aJsonArray);
+
+ }
+
+ private ClientMessage receiveFromQueue(String queueName) throws Exception
+ {
+ ClientMessage msg = null;
+
+ ServerLocator locator = null;
+ ClientSessionFactory sf = null;
+ ClientSession session = null;
+
+ try
+ {
+ TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
+ locator = HornetQClient.createServerLocatorWithoutHA(tpconf);
+
+ sf = createSessionFactory(locator);
+
+ session = sf.createSession(false, true, true);
+ ClientConsumer consumer = session.createConsumer(queueName);
+ session.start();
+ msg = consumer.receive(60 * 1000);
+ msg.acknowledge();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ session.commit();
+ session.close();
+ }
+ if (sf != null) sf.close();
+ if (locator != null) locator.close();
+ }
+ return msg;
+ }
+
+ private void createVertxService()
+ {
+ System.setProperty("vertx.clusterManagerFactory", HazelcastClusterManagerFactory.class.getName());
+ vertxManager = PlatformLocator.factory.createPlatformManager(Integer.valueOf(port),
+ host);
+
+// vertxManager = PlatformLocator.factory.createPlatformManager(Integer.valueOf(port),
+// host, quorumSize, haGroup + System.currentTimeMillis());
+ }
+
+ private class VertxTestHandler implements Handler>
+ {
+ private volatile BaseMessage> vertxMsg = null;
+ private Object lock = new Object();
+
+ @Override
+ public void handle(BaseMessage> arg0)
+ {
+ synchronized (lock)
+ {
+ vertxMsg = arg0;
+ lock.notify();
+ }
+ }
+
+ public void checkJsonArrayMessageReceived(JsonArray aJsonArray)
+ {
+ BaseMessage> msg = waitMessage();
+ JsonArray body = (JsonArray)msg.body();
+ assertEquals(aJsonArray, body);
+ }
+
+ public void checkJsonObjectMessageReceived(final JsonObject aJsonObj)
+ {
+ BaseMessage> msg = waitMessage();
+ JsonObject body = (JsonObject)msg.body();
+ assertEquals(aJsonObj, body);
+ }
+
+ public void checkShortMessageReceived(final Short aShort)
+ {
+ BaseMessage> msg = waitMessage();
+ Short body = (Short)msg.body();
+ assertEquals(aShort, body);
+ }
+
+ public void checkLongMessageReceived(final Long aLong)
+ {
+ BaseMessage> msg = waitMessage();
+ Long body = (Long)msg.body();
+ assertEquals(aLong, body);
+ }
+
+ public void checkIntegerMessageReceived(final Integer aInt)
+ {
+ BaseMessage> msg = waitMessage();
+ Integer body = (Integer)msg.body();
+ assertEquals(aInt, body);
+ }
+
+ public void checkFloatMessageReceived(final Float aFloat)
+ {
+ BaseMessage> msg = waitMessage();
+ Float body = (Float)msg.body();
+ assertEquals(aFloat, body);
+ }
+
+ public void checkDoubleMessageReceived(final Double aDouble)
+ {
+ BaseMessage> msg = waitMessage();
+ Double body = (Double)msg.body();
+ assertEquals(aDouble, body);
+ }
+
+ public void checkCharacterMessageReceived(final Character aChar)
+ {
+ BaseMessage> msg = waitMessage();
+ Character body = (Character)msg.body();
+ assertEquals(aChar, body);
+ }
+
+ public void checkByteMessageReceived(final Byte aByte)
+ {
+ BaseMessage> msg = waitMessage();
+ Byte body = (Byte)msg.body();
+ assertEquals(aByte, body);
+ }
+
+ public void checkByteArrayMessageReceived(final byte[] byteArray)
+ {
+ BaseMessage> msg = waitMessage();
+ byte[] body = (byte[])msg.body();
+ assertEquals(byteArray.length, body.length);
+ for (int i = 0; i < byteArray.length; i++)
+ {
+ assertEquals(byteArray[i], body[i]);
+ }
+ }
+
+ public void checkBooleanMessageReceived(final Boolean boolValue)
+ {
+ BaseMessage> msg = waitMessage();
+ Boolean body = (Boolean)msg.body();
+ assertEquals(boolValue, body);
+ }
+
+ public void checkStringMessageReceived(final String str)
+ {
+ BaseMessage> msg = waitMessage();
+ String body = (String)msg.body();
+ assertEquals(str, body);
+ }
+
+ public void checkBufferMessageReceived(final Buffer buffer)
+ {
+ byte[] source = buffer.getBytes();
+ BaseMessage> msg = waitMessage();
+ Buffer body = (Buffer)msg.body();
+ byte[] bytes = body.getBytes();
+ assertEquals(source.length, bytes.length);
+ for (int i = 0; i < bytes.length; i++)
+ {
+ assertEquals(source[i], bytes[i]);
+ }
+ }
+
+ private BaseMessage> waitMessage()
+ {
+ BaseMessage> msg = null;
+ synchronized (lock)
+ {
+ if (vertxMsg == null)
+ {
+ try
+ {
+ lock.wait(10000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ msg = vertxMsg;
+ vertxMsg = null;
+ }
+ assertNotNull("Message didn't arrive after 10 seconds.", msg);
+ return msg;
+ }
+
+ }
+
+ @After @Override
+ public void tearDown() throws Exception
+ {
+ vertxManager.stop();
+ server.stop();
+ server = null;
+ }
+}