Skip to content

Commit

Permalink
fix(jans-auth-server): corrected thread-safety bug in ApplicationAudi…
Browse files Browse the repository at this point in the history
…tLogger #803

#803
  • Loading branch information
yuriyz committed Mar 21, 2022
1 parent 8876a57 commit ef73c2b
Showing 1 changed file with 27 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,19 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

@Named
@ApplicationScoped
@DependsOn("appInitializer")
public class ApplicationAuditLogger {

private final String BROKER_URL_PREFIX = "failover:(";
private final String BROKER_URL_SUFFIX = ")?timeout=5000&jms.useAsyncSend=true";
private final int ACK_MODE = Session.AUTO_ACKNOWLEDGE;
private final String CLIENT_QUEUE_NAME = "oauth2.audit.logging";
private final boolean transacted = false;
private static final String BROKER_URL_PREFIX = "failover:(";
private static final String BROKER_URL_SUFFIX = ")?timeout=5000&jms.useAsyncSend=true";
private static final int ACK_MODE = Session.AUTO_ACKNOWLEDGE;
private static final String CLIENT_QUEUE_NAME = "oauth2.audit.logging";
private static final boolean TRANSACTED = false;

private final ReentrantLock lock = new ReentrantLock();

Expand All @@ -55,7 +56,7 @@ public class ApplicationAuditLogger {
@Inject
private AppConfiguration appConfiguration;

private volatile PooledConnectionFactory pooledConnectionFactory;
private final AtomicReference<PooledConnectionFactory> pooledConnectionFactory = new AtomicReference<>();
private Set<String> jmsBrokerURISet;
private String jmsUserName;
private String jmsPassword;
Expand Down Expand Up @@ -89,10 +90,8 @@ public void sendMessage(OAuth2AuditLog oAuth2AuditLog) {
}

boolean messageDelivered = false;
if (sendAuditJms) {
if (tryToEstablishJMSConnection()) {
messageDelivered = loggingThroughJMS(oAuth2AuditLog);
}
if (sendAuditJms && tryToEstablishJMSConnection()) {
messageDelivered = loggingThroughJMS(oAuth2AuditLog);
}

if (!messageDelivered) {
Expand All @@ -102,23 +101,22 @@ public void sendMessage(OAuth2AuditLog oAuth2AuditLog) {

@PreDestroy
public void destroy() {
if (this.pooledConnectionFactory == null) {
if (this.pooledConnectionFactory.get() == null) {
return;
}

this.pooledConnectionFactory.clear();
this.pooledConnectionFactory = null;
this.pooledConnectionFactory.getAndSet(null).clear();
}

private boolean tryToEstablishJMSConnection() {
if (this.pooledConnectionFactory != null) {
if (this.pooledConnectionFactory.get() != null) {
return true;
}

lock.lock();
try {
// Check if another thread initialized JMS pool already
if (this.pooledConnectionFactory == null) {
if (this.pooledConnectionFactory.get() == null) {
return tryToEstablishJMSConnectionImpl();
}

Expand All @@ -129,16 +127,16 @@ private boolean tryToEstablishJMSConnection() {
}

private boolean tryToEstablishJMSConnectionImpl() {
Set<String> jmsBrokerURISet = appConfiguration.getJmsBrokerURISet();
if (!enabled || CollectionUtils.isEmpty(jmsBrokerURISet)) {
Set<String> uriSet = appConfiguration.getJmsBrokerURISet();
if (!enabled || CollectionUtils.isEmpty(uriSet)) {
return false;
}

this.jmsBrokerURISet = new HashSet<>(jmsBrokerURISet);
this.jmsBrokerURISet = new HashSet<>(uriSet);
this.jmsUserName = appConfiguration.getJmsUserName();
this.jmsPassword = appConfiguration.getJmsPassword();

Iterator<String> jmsBrokerURIIterator = jmsBrokerURISet.iterator();
Iterator<String> jmsBrokerURIIterator = uriSet.iterator();

StringBuilder uriBuilder = new StringBuilder();
while (jmsBrokerURIIterator.hasNext()) {
Expand All @@ -153,20 +151,21 @@ private boolean tryToEstablishJMSConnectionImpl() {
String brokerUrl = BROKER_URL_PREFIX + uriBuilder + BROKER_URL_SUFFIX;

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.jmsUserName, this.jmsPassword, brokerUrl);
this.pooledConnectionFactory = new PooledConnectionFactory(connectionFactory);

pooledConnectionFactory.setIdleTimeout(5000);
pooledConnectionFactory.setMaxConnections(10);
pooledConnectionFactory.start();
final PooledConnectionFactory pool = new PooledConnectionFactory(connectionFactory);
pool.setIdleTimeout(5000);
pool.setMaxConnections(10);
pool.start();

this.pooledConnectionFactory.set(pool);
return true;
}

private boolean loggingThroughJMS(OAuth2AuditLog oAuth2AuditLog) {
try (QueueConnection connection = pooledConnectionFactory.createQueueConnection()) {
try (QueueConnection connection = pooledConnectionFactory.get().createQueueConnection()) {
connection.start();

try (QueueSession session = connection.createQueueSession(transacted, ACK_MODE);
try (QueueSession session = connection.createQueueSession(TRANSACTED, ACK_MODE);
MessageProducer producer = session.createProducer(session.createQueue(CLIENT_QUEUE_NAME))) {
TextMessage txtMessage = session.createTextMessage();
txtMessage.setText(ServerUtil.asPrettyJson(oAuth2AuditLog));
Expand All @@ -182,7 +181,9 @@ private boolean loggingThroughJMS(OAuth2AuditLog oAuth2AuditLog) {

private void loggingThroughFile(OAuth2AuditLog oAuth2AuditLog) {
try {
log.info(ServerUtil.asPrettyJson(oAuth2AuditLog));
if (log.isInfoEnabled()) {
log.info(ServerUtil.asPrettyJson(oAuth2AuditLog));
}
} catch (IOException e) {
log.error("Can't serialize the audit log", e);
}
Expand Down

0 comments on commit ef73c2b

Please sign in to comment.