Skip to content

Commit ea181fd

Browse files
committed
Spring jmstemplate
1 parent af885f9 commit ea181fd

File tree

1 file changed

+201
-0
lines changed

1 file changed

+201
-0
lines changed
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
# Spring JmsTemplate
2+
- Author: [HuiFer](https://github.com/huifer)
3+
- 源码阅读仓库: [SourceHot-spring](https://github.com/SourceHot/spring-framework-read)
4+
- 源码路径: `org.springframework.jms.core.JmsTemplate`
5+
6+
7+
## 源码分析
8+
### send 发送消息
9+
10+
```java
11+
@Override
12+
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
13+
// 执行.
14+
execute(session -> {
15+
Destination destination = resolveDestinationName(session, destinationName);
16+
doSend(session, destination, messageCreator);
17+
return null;
18+
}, false);
19+
}
20+
21+
```
22+
23+
```java
24+
@Nullable
25+
public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
26+
Assert.notNull(action, "Callback object must not be null");
27+
Connection conToClose = null;
28+
Session sessionToClose = null;
29+
try {
30+
Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
31+
obtainConnectionFactory(), this.transactionalResourceFactory, startConnection);
32+
if (sessionToUse == null) {
33+
// 创建链接
34+
conToClose = createConnection();
35+
// 创建session
36+
sessionToClose = createSession(conToClose);
37+
if (startConnection) {
38+
conToClose.start();
39+
}
40+
sessionToUse = sessionToClose;
41+
}
42+
if (logger.isDebugEnabled()) {
43+
logger.debug("Executing callback on JMS Session: " + sessionToUse);
44+
}
45+
/**
46+
* sessionCallback 执行
47+
* {@link JmsTemplate#doSend(Session, javax.jms.Destination, org.springframework.jms.core.MessageCreator)}
48+
*/
49+
return action.doInJms(sessionToUse);
50+
} catch (JMSException ex) {
51+
throw convertJmsAccessException(ex);
52+
} finally {
53+
// 资源释放
54+
JmsUtils.closeSession(sessionToClose);
55+
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
56+
}
57+
}
58+
59+
```
60+
61+
- 最后`action.doInJms(sessionToUse)`的操作
62+
```java
63+
Destination destination = resolveDestinationName(session, destinationName);
64+
doSend(session, destination, messageCreator);
65+
return null;
66+
```
67+
68+
- `doSend`真正做的发送方法
69+
```java
70+
protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
71+
throws JMSException {
72+
73+
Assert.notNull(messageCreator, "MessageCreator must not be null");
74+
75+
// 创建消息生产者
76+
MessageProducer producer = createProducer(session, destination);
77+
try {
78+
// 创建消息
79+
Message message = messageCreator.createMessage(session);
80+
if (logger.isDebugEnabled()) {
81+
logger.debug("Sending created message: " + message);
82+
}
83+
// 发送
84+
doSend(producer, message);
85+
// Check commit - avoid commit call within a JTA transaction.
86+
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
87+
// Transacted session created by this template -> commit.
88+
JmsUtils.commitIfNecessary(session);
89+
}
90+
} finally {
91+
// 关闭消息生产者
92+
JmsUtils.closeMessageProducer(producer);
93+
}
94+
}
95+
96+
```
97+
1. `createProducer`中通过`javax.jms.Session.createProducer`创建`MessageProducer`,第三方消息中间件独立实现
98+
2. `createMessage`
99+
```java
100+
@Override
101+
public javax.jms.Message createMessage(Session session) throws JMSException {
102+
try {
103+
// 消息转换
104+
return this.messageConverter.toMessage(this.message, session);
105+
} catch (Exception ex) {
106+
throw new MessageConversionException("Could not convert '" + this.message + "'", ex);
107+
}
108+
}
109+
```
110+
- 消息转换后续在更新
111+
3. `doSend` 这里也是第三方消息中间件实现
112+
```java
113+
protected void doSend(MessageProducer producer, Message message) throws JMSException {
114+
if (this.deliveryDelay >= 0) {
115+
producer.setDeliveryDelay(this.deliveryDelay);
116+
}
117+
if (isExplicitQosEnabled()) {
118+
// 发送消息,第三方消息中间件实现
119+
producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
120+
} else {
121+
producer.send(message);
122+
}
123+
}
124+
```
125+
4. `closeMessageProducer` 这个方法特别,直接关闭
126+
```java
127+
public static void closeMessageProducer(@Nullable MessageProducer producer) {
128+
if (producer != null) {
129+
try {
130+
producer.close();
131+
} catch (JMSException ex) {
132+
logger.trace("Could not close JMS MessageProducer", ex);
133+
} catch (Throwable ex) {
134+
// We don't trust the JMS provider: It might throw RuntimeException or Error.
135+
logger.trace("Unexpected exception on closing JMS MessageProducer", ex);
136+
}
137+
}
138+
}
139+
140+
```
141+
142+
### receive 接收消息
143+
```java
144+
@Override
145+
@Nullable
146+
public Message receive(String destinationName) throws JmsException {
147+
return receiveSelected(destinationName, null);
148+
}
149+
@Override
150+
@Nullable
151+
public Message receiveSelected(final String destinationName, @Nullable final String messageSelector) throws JmsException {
152+
return execute(session -> {
153+
Destination destination = resolveDestinationName(session, destinationName);
154+
return doReceive(session, destination, messageSelector);
155+
}, true);
156+
}
157+
@Nullable
158+
protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector)
159+
throws JMSException {
160+
161+
return doReceive(session, createConsumer(session, destination, messageSelector));
162+
}
163+
@Nullable
164+
protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
165+
try {
166+
// Use transaction timeout (if available).
167+
long timeout = getReceiveTimeout();
168+
// 链接工厂
169+
ConnectionFactory connectionFactory = getConnectionFactory();
170+
// JMS 资源信息
171+
JmsResourceHolder resourceHolder = null;
172+
if (connectionFactory != null) {
173+
// 从连接对象中获取JMS 资源信息
174+
resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory);
175+
}
176+
if (resourceHolder != null && resourceHolder.hasTimeout()) {
177+
// 超时时间
178+
timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
179+
}
180+
// 具体的消息
181+
Message message = receiveFromConsumer(consumer, timeout);
182+
if (session.getTransacted()) {
183+
// 事务性操作
184+
// Commit necessary - but avoid commit call within a JTA transaction.
185+
if (isSessionLocallyTransacted(session)) {
186+
// Transacted session created by this template -> commit.
187+
JmsUtils.commitIfNecessary(session);
188+
}
189+
} else if (isClientAcknowledge(session)) {
190+
// Manually acknowledge message, if any.
191+
if (message != null) {
192+
message.acknowledge();
193+
}
194+
}
195+
return message;
196+
} finally {
197+
JmsUtils.closeMessageConsumer(consumer);
198+
}
199+
}
200+
201+
```

0 commit comments

Comments
 (0)