-
Notifications
You must be signed in to change notification settings - Fork 33
/
DirectMDBSession.java
100 lines (88 loc) · 3.78 KB
/
DirectMDBSession.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
* Copyright (c) 2000, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package com.sun.messaging.jms.ra;
import jakarta.jms.*;
import com.sun.messaging.jmq.jmsservice.JMSAck;
import com.sun.messaging.jmq.io.JMSPacket;
import com.sun.messaging.jmq.jmsservice.JMSService;
import com.sun.messaging.jmq.jmsservice.JMSService.SessionAckMode;
import com.sun.messaging.jmq.jmsservice.ConsumerClosedNoDeliveryException;
/**
 * A {@link DirectSession} whose delivery path builds a JMS message from a packet, hands it
 * synchronously to a {@link jakarta.jms.MessageListener}, and returns the acknowledgement (if any)
 * to the caller.
 *
 * <p>NOTE(review): the name suggests this session backs message-driven bean delivery in the
 * resource adapter; that is not established by the code in this class - confirm against callers.
 */
public class DirectMDBSession extends DirectSession {

    /** Class name passed to the logger when tracing method entry. */
    private static final String _className = "com.sun.messaging.jms.ra.DirectMDBSession";

    /**
     * Creates a new instance of DirectMDBSession. All arguments are forwarded unchanged to the
     * {@link DirectSession} constructor.
     *
     * @param dc the connection this session belongs to
     * @param jmsservice the JMSService used by this session
     * @param sessionId the identifier of this session
     * @param ackMode the acknowledgement mode of this session
     * @throws JMSException if the superclass constructor fails
     */
    public DirectMDBSession(DirectConnection dc, JMSService jmsservice, long sessionId, SessionAckMode ackMode) throws JMSException {
        super(dc, jmsservice, sessionId, ackMode);
    }

    /**
     * Session initialization hook. This override performs no initialization work; it only records
     * a method-entry trace on the {@code _loggerOC} logger.
     */
    @Override
    protected void _initSession() {
        _loggerOC.entering(_className, "constructor():_init()");
    }

    /**
     * Deliver a message from this DirectSession - only one thread can do this at a time (the
     * method is {@code synchronized} on this session).
     *
     * <p>The packet is converted to a JMS message, passed to {@code msgListener.onMessage}, and,
     * unless the session is in {@code CLIENT_ACKNOWLEDGE} mode, an acknowledgement for the
     * message is built and returned. An {@link Exception} raised by the listener (or while
     * building the acknowledgement) is reported on standard output and results in a
     * {@code null} return value rather than being propagated.
     *
     * @param msgListener the listener that receives the message; must not be {@code null}
     * @param jmsPacket the packet to convert into a JMS message; must not be {@code null}
     * @param consumerId the identifier of the consumer the message is delivered for
     * @return the acknowledgement for the delivered message, or {@code null} when the session is
     *         in {@code CLIENT_ACKNOWLEDGE} mode or when delivery raised an exception
     * @throws RuntimeException if thread checking is enabled and the calling thread differs from
     *         the thread of the first delivery, if {@code msgListener} or {@code jmsPacket} is
     *         {@code null}, or if no JMS message could be constructed from the packet (the
     *         underlying failure, when there is one, is attached as the cause)
     * @throws ConsumerClosedNoDeliveryException NOTE(review): tag retained from the original
     *         documentation; no code in this override throws it - confirm against DirectSession
     */
    @Override
    protected synchronized JMSAck _deliverMessage(jakarta.jms.MessageListener msgListener, JMSPacket jmsPacket, long consumerId) {
        if (this.enableThreadCheck) {
            // Relies on the *same* thread being used to deliver all messages
            // while this session is alive
            long tId = Thread.currentThread().getId();
            if (this.deliverThreadId == 0L) {
                // first time: remember the delivering thread
                this.deliverThreadId = tId;
            } else if (this.deliverThreadId != tId) {
                throw new RuntimeException("Invalid to call deliver from two different threads!");
            }
        }

        if (msgListener == null) {
            throw new RuntimeException("DirectConsumer:MessageListener not set!");
        }
        if (jmsPacket == null) {
            throw new RuntimeException("DirectConsumer:JMSPacket is null!");
        }

        jakarta.jms.Message jmsMsg;
        try {
            jmsMsg = DirectPacket.constructMessage(jmsPacket, consumerId, this, this.jmsservice, false);
        } catch (Exception e) {
            // Same exception type and message as the null-message case below, but keep the
            // real failure as the cause instead of silently discarding it.
            throw new RuntimeException("DirectConsumer:JMS Message in Packet is null!", e);
        }
        if (jmsMsg == null) {
            throw new RuntimeException("DirectConsumer:JMS Message in Packet is null!");
        }

        JMSAck jmsAck = null;
        try {
            this.inDeliver = true;
            try {
                msgListener.onMessage(jmsMsg);
            } finally {
                // Always clear the flag, even when the listener throws; otherwise the session
                // would appear to be inside a delivery for the rest of its life.
                this.inDeliver = false;
            }
            if (this.ackMode != SessionAckMode.CLIENT_ACKNOWLEDGE) {
                jmsAck = new DirectAck(this.connectionId, this.sessionId, consumerId, ((DirectPacket) jmsMsg).getReceivedSysMessageID(),
                        JMSService.MessageAckType.ACKNOWLEDGE);
            }
        } catch (Exception e) {
            // Best effort: a failing listener must not propagate to the caller; returning a
            // null ack leaves the message unacknowledged. Print the exception itself so its
            // type is visible even when it carries no message.
            System.out.println("DirectConsumer:Caught Exception delivering message: " + e);
        }
        return jmsAck;
    }

    /**
     * Acknowledgement hook. The body is empty, so calling this method has no effect beyond
     * acquiring and releasing this session's monitor.
     *
     * @throws Exception declared by the signature; never thrown by this implementation
     */
    protected synchronized void _acknowledgeMDBMessage() throws Exception {
    }
}