Skip to content

Commit

Permalink
JAMES-1890 reuse JMS connection as it's a thread-safe object
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaechler committed Dec 19, 2016
1 parent 9aff85c commit 52316f6
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 91 deletions.
Expand Up @@ -288,17 +288,13 @@ protected Message copy(Session session, Message m) throws JMSException {
@Override
public long getSize() throws MailQueueException {

Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
MessageProducer producer = null;
TemporaryQueue replyTo = null;
long size;

try {
connection = connectionFactory.createConnection();
connection.start();

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
replyTo = session.createTemporaryQueue();
consumer = session.createConsumer(replyTo);
Expand Down Expand Up @@ -364,13 +360,6 @@ public long getSize() throws MailQueueException {
} catch (JMSException e1) {
// ignore here
}

try {
if (connection != null)
connection.close();
} catch (JMSException e1) {
// ignore here
}
}

// if we came to this point we should just fallback to super method
Expand Down
Expand Up @@ -49,6 +49,7 @@

import org.apache.james.core.MailImpl;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.metrics.api.Metric;
import org.apache.james.queue.api.MailPrioritySupport;
import org.apache.james.queue.api.MailQueue;
Expand All @@ -58,6 +59,8 @@
import org.apache.mailet.MailAddress;
import org.slf4j.Logger;

import com.google.common.base.Throwables;

/**
* <p>
* {@link MailQueue} implementation which use a JMS Queue for the<br>
Expand All @@ -69,17 +72,22 @@
* {@link Mail} objects.
* </p>
*/
public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport {
public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport, Disposable {

protected final String queueName;
protected final ConnectionFactory connectionFactory;
protected final Connection connection;
protected final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
protected final Metric enqueuedMailsMetric;
protected final Logger logger;
public final static String FORCE_DELIVERY = "FORCE_DELIVERY";

public JMSMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queueName, Metric enqueuedMailsMetric, Logger logger) {
this.connectionFactory = connectionFactory;
try {
connection = connectionFactory.createConnection();
connection.start();
} catch (JMSException e) {
throw Throwables.propagate(e);
}
this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
this.queueName = queueName;
this.enqueuedMailsMetric = enqueuedMailsMetric;
Expand All @@ -100,16 +108,12 @@ public JMSMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorF
*/
@Override
public MailQueueItem deQueue() throws MailQueueException {
Connection connection = null;
Session session = null;
Message message;
MessageConsumer consumer = null;

while (true) {
try {
connection = connectionFactory.createConnection();
connection.start();

session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
consumer = session.createConsumer(queue, getMessageSelector());
Expand All @@ -135,13 +139,6 @@ public MailQueueItem deQueue() throws MailQueueException {
} catch (JMSException e1) {
// ignore here
}

try {
if (connection != null)
connection.close();
} catch (JMSException e1) {
// ignore here
}
}

} catch (Exception e) {
Expand All @@ -168,12 +165,6 @@ public MailQueueItem deQueue() throws MailQueueException {
// ignore here
}

try {
if (connection != null)
connection.close();
} catch (JMSException e1) {
// ignore here
}
throw new MailQueueException("Unable to dequeue next message", e);
}
}
Expand All @@ -182,7 +173,6 @@ public MailQueueItem deQueue() throws MailQueueException {

@Override
public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException {
Connection connection = null;
Session session = null;

long mydelay = 0;
Expand All @@ -193,8 +183,6 @@ public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueExcept

try {

connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

int msgPrio = NORMAL_PRIORITY;
Expand Down Expand Up @@ -226,12 +214,6 @@ public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueExcept
// ignore here
}

try {
if (connection != null)
connection.close();
} catch (JMSException e) {
// ignore here
}
}
}

Expand Down Expand Up @@ -491,13 +473,10 @@ protected String getMessageSelector() {
@SuppressWarnings("unchecked")
@Override
public long getSize() throws MailQueueException {
Connection connection = null;
Session session = null;
QueueBrowser browser = null;
int size = 0;
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);

Expand Down Expand Up @@ -528,27 +507,18 @@ public long getSize() throws MailQueueException {
// ignore here
}

try {
if (connection != null)
connection.close();
} catch (JMSException e1) {
// ignore here
}
}
}

@Override
public long flush() throws MailQueueException {
Connection connection = null;
Session session = null;
Message message = null;
MessageConsumer consumer = null;
MessageProducer producer = null;
boolean first = true;
long count = 0;
try {
connection = connectionFactory.createConnection();
connection.start();

session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
Expand Down Expand Up @@ -606,12 +576,6 @@ public long flush() throws MailQueueException {
// ignore here
}

try {
if (connection != null)
connection.close();
} catch (JMSException e1) {
// ignore here
}
}
}

Expand All @@ -635,17 +599,13 @@ protected long count(List<Message> msgs) {
* @return messages
*/
public List<Message> removeWithSelector(String selector) throws MailQueueException {
Connection connection = null;
Session session = null;
Message message = null;
MessageConsumer consumer = null;
boolean first = true;
List<Message> messages = new ArrayList<Message>();

try {
connection = connectionFactory.createConnection();
connection.start();

session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
consumer = session.createConsumer(queue, selector);
Expand Down Expand Up @@ -688,12 +648,6 @@ public List<Message> removeWithSelector(String selector) throws MailQueueExcepti
// ignore here
}

try {
if (connection != null)
connection.close();
} catch (JMSException e1) {
// ignore here
}
}
}

Expand Down Expand Up @@ -738,20 +692,16 @@ public long remove(Type type, String value) throws MailQueueException {
@Override
@SuppressWarnings("unchecked")
public MailQueueIterator browse() throws MailQueueException {
Connection connection = null;
Session session = null;
QueueBrowser browser = null;
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);

browser = session.createBrowser(queue);

final Enumeration<Message> messages = browser.getEnumeration();

final Connection myConnection = connection;
final Session mySession = session;
final QueueBrowser myBrowser = browser;

Expand Down Expand Up @@ -813,12 +763,6 @@ public void close() {
// ignore here
}

try {
if (myConnection != null)
myConnection.close();
} catch (JMSException e1) {
// ignore here
}
}
};

Expand All @@ -838,15 +782,17 @@ public void close() {
// ignore here
}

try {
if (connection != null)
connection.close();
} catch (JMSException e1) {
// ignore here
}
logger.error("Unable to browse queue " + queueName, e);
throw new MailQueueException("Unable to browse queue " + queueName, e);
}
}

@Override
public void dispose() {
try {
connection.close();
} catch (JMSException e) {
}
}

}
Expand Up @@ -74,13 +74,6 @@ public void done(boolean success) throws MailQueueException {
} catch (JMSException e) {
// ignore here
}

try {
if (connection != null)
connection.close();
} catch (JMSException e) {
// ignore here
}
}
}

Expand Down

0 comments on commit 52316f6

Please sign in to comment.