Skip to content

Commit

Permalink
Make MessageProducerClient idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
allanmckenzie committed Sep 5, 2022
1 parent 1358c7f commit 153bf8a
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [11.0.0-M20] - 2022-09-05
### Changed
- MessageProducerClient is now idempotent when calling `startProducer(...)`

## [11.0.0-M19] - 2022-08-31
### Added
- Added MessageProducerClientBuilder for creating configurable message producer clients
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

<properties>
<cpp.repo.name>microservice_framework</cpp.repo.name>
<framework-libraries.version>11.0.0-M20</framework-libraries.version>
<framework-libraries.version>11.0.0-M21</framework-libraries.version>
</properties>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ public class MessageProducerClient implements AutoCloseable {
private Session session;
private MessageProducer messageProducer;
private Connection connection;
private String topicName;

private ActiveMQConnectionFactory activeMQConnectionFactory;
private final ActiveMQConnectionFactory activeMQConnectionFactory;

@Deprecated(since = "Please use MessageProducerClientBuilder to create instead of using this constructor")
public MessageProducerClient() {
Expand All @@ -40,32 +41,48 @@ public MessageProducerClient() {
}

/**
* Starts the message producer for a specific topic. Must be called
* before any messages can be sent.
* Starts the message producer for a specific topic. Must be called before any messages can be
* sent.
*
* @param topicName the name of the topic to send to
*/
public void startProducer(final String topicName) {

if (topicName.equals(this.topicName)) {
return;
}

try {
activeMQConnectionFactory.setBrokerURL(QUEUE_URI);
connection = activeMQConnectionFactory.createConnection();
connection.start();
if (connection == null) {
createConnection();
}

if (messageProducer != null) {
close(messageProducer);
}

session = connection.createSession(false, AUTO_ACKNOWLEDGE);
final Destination destination = session.createTopic(topicName);
messageProducer = session.createProducer(destination);
} catch (JMSException e) {

this.topicName = topicName;
} catch (final JMSException e) {
close();
throw new RuntimeException("Failed to create message producer to topic: '" + topicName + "', queue uri: '" + QUEUE_URI + "'", e);
throw new MessageProducerClientException("Failed to create message producer to topic: '" + topicName + "', queue uri: '" + QUEUE_URI + "'", e);
}
}

private void createConnection() throws JMSException {
activeMQConnectionFactory.setBrokerURL(QUEUE_URI);
connection = activeMQConnectionFactory.createConnection();
connection.start();
session = connection.createSession(false, AUTO_ACKNOWLEDGE);
}

/**
* Sends a message to the topic specified in <code>startProducer(...)</code>
*
* @param commandName the name of the command
* @param payload the payload to be wrapped in a simple JsonEnvelope
* @param payload the payload to be wrapped in a simple JsonEnvelope
*/
public void sendMessage(final String commandName, final JsonObject payload) {

Expand All @@ -77,7 +94,7 @@ public void sendMessage(final String commandName, final JsonObject payload) {
/**
* Sends a message to the topic specified in <code>startProducer(...)</code>
*
* @param commandName the name of the command
* @param commandName the name of the command
* @param jsonEnvelope the full JsonEnvelope to send as a message
*/
public void sendMessage(final String commandName, final JsonEnvelope jsonEnvelope) {
Expand All @@ -86,8 +103,7 @@ public void sendMessage(final String commandName, final JsonEnvelope jsonEnvelop
throw new RuntimeException("Message producer not started. Please call startProducer(...) first.");
}

@SuppressWarnings("deprecation")
final String json = jsonEnvelope.toDebugStringPrettyPrint();
@SuppressWarnings("deprecation") final String json = jsonEnvelope.toDebugStringPrettyPrint();

try {
final TextMessage message = session.createTextMessage();
Expand All @@ -96,9 +112,9 @@ public void sendMessage(final String commandName, final JsonEnvelope jsonEnvelop
message.setStringProperty("CPPNAME", commandName);

messageProducer.send(message);
} catch (JMSException e) {
} catch (final JMSException e) {
close();
throw new RuntimeException("Failed to send message. commandName: '" + commandName + "', json: " + json, e);
throw new MessageProducerClientException("Failed to send message. commandName: '" + commandName + "', json: " + json, e);
}
}

Expand All @@ -114,6 +130,7 @@ public void close() {
session = null;
messageProducer = null;
connection = null;
topicName = null;
}

private void close(final AutoCloseable closeable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package uk.gov.justice.services.test.utils.core.messaging;

public class MessageProducerClientException extends RuntimeException {

public MessageProducerClientException(final String message, final Throwable cause) {
super(message, cause);
}
}
Loading

0 comments on commit 153bf8a

Please sign in to comment.