Switch branches/tags
Nothing to show
Find file History
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
..
Failed to load latest commit information.
AzureServiceBusService.java
AzureServiceBusServiceFactory.java
MessageListener.java
MessageListenerContainer.java
README.md

README.md

##Use AMQP with Microsoft Azure Service Bus Queue

Microsoft Azure Service Bus 是微软提供的消息服务总线云服务,支持消息队列, 主题订阅,点对点消息, Event Hubs功能。类似于JAVA的ActiveMQ的功能, Service Bus是支持AMQP 1.0协议的,所以基于AMQP 1.0协议实现的JMS客户端均都可以连接 连接 Service Bus发送接受消息。

本文将采用Apache Qpid 来连接Service Bus 的消息队列,简单实现一个发送接收消息队列的功能。

实现类图: QQ图片20160224113228.png

代码结构说明 AzureServiceBusServiceFactory 单例类,初始化Service Bus服务,并创建AzureServiceBusService AzureServiceBusService 简单封装Service Bus接口, 发送消息,注册接收消息监听器 MessageListener 接收消息监听处理接口 MessageListenerContainer 维护Service Bus 队列和监听器的对应关系,并启动线程接收Service Bus消息

代码下载: 地址

使用类库

  1. Apache Maven
  2. Apache Qpid JMS Library

配置maven pom文件,引入Qpid JMS依赖

<dependency>
	<groupId>org.apache.qpid</groupId>
	<artifactId>qpid-amqp-1-0-client</artifactId>
	<version>0.32</version>
</dependency>
<dependency>
	<groupId>org.apache.qpid</groupId>
	<artifactId>qpid-amqp-1-0-client-jms</artifactId>
	<version>0.32</version>
</dependency>
<dependency>
	<groupId>org.apache.qpid</groupId>
	<artifactId>qpid-amqp-1-0-common</artifactId>
	<version>0.32</version>
</dependency>	
<dependency>
	<groupId>org.apache.geronimo.specs</groupId>
	<artifactId>geronimo-jms_1.1_spec</artifactId>
	<version>1.1.1</version>
</dependency>

初始化Service Bus Connection Factory

使用JNDI 初始化Apache Qpid JMS Connection Factory,注意代码需要进行URL-encoded,具体请参见encode方法。

//AzureServiceBusServiceFactory.java 代码片段
PropertiesFactoryBean properties = SpringUtil.getBean(PropertiesFactoryBean.class);
this.username = (String) properties.getObject().get("azure.servicebus.username");
this.password = (String) properties.getObject().get("azure.servicebus.password");
this.host = (String) properties.getObject().get("azure.servicebus.host");
String defaultQueue = (String) properties.getObject().get("azure.servicebus.queue");
				
String connectionString = "amqps://"+username+":" + encode(password) + "@" + host;
Hashtable<String, String> env = new Hashtable<String, String>();
env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
env.put("connectionfactory.ServiceBusConnectionFactory", connectionString);
				
Context context = new InitialContext(env);
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("ServiceBusConnectionFactory");

使用JMS 发送消息

这段代码展示如何向Service Bus Queue 发送消息.

//AzureServiceBusService.java 代码片段
public void sendTextMessage(String queue, String message) {
	Connection connection = null;
	Session session = null;
	MessageProducer producer = null;
	try {
		connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		producer = session.createProducer(QueueImpl.createQueue(queue));
		producer.send(session.createTextMessage(message));
	} catch (JMSException e) {
		e.printStackTrace();
	} finally {
		try {
			session.close();
			producer.close();
			connection.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

使用JMS 接收消息

这段代码展示如何从Service Bus Queue 接收消息.

//MessageListenerContainer 代码片段
private class ReceiveRunnable implements Runnable {

	@Override
	public void run() {
		while(true) {
			if(!queuelisteners.isEmpty()) {
				try {
					Connection connection = connectFactory.createConnection();
					connection.start();
					for(String queue : queuelisteners.keySet()) {
						Message got = this.receiveMessage(connection, queue, DEFAULT_TIMEOUT);
						if(got != null) {
						    // 分发给对应的监听器
							for(MessageListener listener : queuelisteners.get(queue)) {
								listener.receive(got);
							}
						}
					}
					connection.close();
					Thread.sleep(1000);
				} catch (JMSException | InterruptedException e) {
					e.printStackTrace();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	private Message receiveMessage(Connection connection, String queueName, Long timeout) throws JMSException {
	    // 接收消息
		Session receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		MessageConsumer receiveConsumer = receiveSession.createConsumer(QueueImpl.createQueue(queueName));
		Message got = receiveConsumer.receive(timeout);
		if (got != null) {
			return got;
		}
		receiveConsumer.close();
		receiveSession.close();
		return null;
	}
	
}

如何构建稳健的程序

JMS规范定义了如何编写捕获JMS异常的方法,这里有几点需要注意,示例代码中只是简单实现,并未实现对异常的处理。

  1. 注册 ExceptionListener, JMS规范规定可以对 JMS connection 注册异常监听器,这样客户端就可以监听连接是否正常,这样开发者就可以判断是否需要重新创建connection , Session, MessageProducer and MessageConsumer。

  2. 验证消息是否发送成功。确保已经配置qpid.sync_publish 这个系统属性。设置这个属性之后,程序在发送消息后,会等待发送结果反馈后才返回,如果有异常出现,程序将会抛出JMSException。触发异常的通常是以下两种情况:

  • Service bus 拒绝发送的消息,会反馈MessageRejectedException异常。这个消息将会被Service Bus忽略。
  • 如果Service bus 关闭了JMS的连接,会反馈InvalidDestinationException 异常。这时就需要重新创建连接,并重新发送消息。

using AMQP with Service Bus Topic

使用Topic和使用Queue类似,不同的是,两者发送的目标不一样,而且Topic 消息只有相应的订阅者才能够接受消息。

发送消息

public void sendTextMessage(String topic, String message) {
	Connection connection = null;
	Session session = null;
	MessageProducer producer = null;
	try {
		connection = connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		producer = session.createProducer(TopicImpl.createTopic(topic));
		producer.send(session.createTextMessage(message));
	} catch (JMSException e) {
		e.printStackTrace();
	} finally {
		try {
			session.close();
			producer.close();
			connection.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}

接受消息

Session receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = TopicImpl.createTopic(topicName);
MessageConsumer receiveConsumer = receiveSession.createConsumer(topic);
TopicSubscriber subscriber = receiveSession.createDurableSubscriber(topic, "subscription1");
Message got = subscriber.receive(timeout);
if (got != null) {
	return got;
}
receiveConsumer.close();
receiveSession.close();
return null;

Troubleshoot

PKIX:unable to find valid certification path to requested target

请参见: