Skip to content
This repository has been archived by the owner on Aug 13, 2020. It is now read-only.

Core jms adapter #11

Merged
merged 1 commit into from
Mar 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.skyscreamer</groupId>
<artifactId>jsonassert</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@
*/
public enum Component {

COMMAND_API, COMMAND_CONTROLLER, COMMAND_HANDLER;
COMMAND_API("commands", "api"), COMMAND_CONTROLLER("commands", "controller"), COMMAND_HANDLER("commands", "handler");

private final String pillar;
private final String tier;

Component(final String pillar, final String tier) {
this.pillar = pillar;
this.tier = tier;
}

/**
* Retrieves the Component of the provided {@link ServiceComponent}
Expand All @@ -29,4 +37,12 @@ public static Component getComponentFromAdapter(final Class<Adapter> clazz) {
return adapter.value();
}

public String pillar() {
return pillar;
}

public String tier() {
return tier;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package uk.gov.justice.services.core.annotation.exception;

public class MissingAnnotationException extends RuntimeException {

public MissingAnnotationException(final String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package uk.gov.justice.services.core.context;

import uk.gov.justice.services.core.exception.InvalidNameException;

public final class ContextName {
private ContextName() {
}

/**
* Extracts context name from the logical action or event name.
*
* @param name logical name of the action or event.
* @return context name
*/
public static String fromName(final String name) {
if (!name.contains(".")) {
throw new InvalidNameException("Invalid action or event name " + name);
} else return name.split("\\.")[0];
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.gov.justice.services.core.dispatcher;


import uk.gov.justice.services.core.handler.exception.MissingHandlerException;
import uk.gov.justice.services.core.handler.registry.HandlerRegistry;
import uk.gov.justice.services.messaging.Envelope;

Expand Down Expand Up @@ -33,7 +34,7 @@ public void dispatch(final Envelope envelope) {
if (handlerRegistry.canHandle(name)) {
handlerRegistry.get(name).execute(envelope);
} else {
throw new IllegalArgumentException("No handler registered to handle action :" + name);
throw new MissingHandlerException("No handler registered to handle action :" + name);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
* Dispatches command to the correct handler.
* The framework will inject the correct implementation based on the {@link Adapter} annotation.
*/
@FunctionalInterface
public interface Dispatcher {

/**
* Dispatches the {@code envelope} to the correct handler.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ private Object instantiateHandler(final Bean<Object> bean) {
}

private AsynchronousDispatcher getDispatcher(final Component component) {
dispatcherMap.putIfAbsent(component, new AsynchronousDispatcher());
return dispatcherMap.get(component);
return dispatcherMap.computeIfAbsent(component, c -> new AsynchronousDispatcher());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package uk.gov.justice.services.core.exception;

public class InvalidNameException extends RuntimeException {

public InvalidNameException(final String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package uk.gov.justice.services.core.handler.exception;

public class MissingHandlerException extends RuntimeException {

public MissingHandlerException(final String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package uk.gov.justice.services.core.jms;


import uk.gov.justice.services.core.dispatcher.Dispatcher;
import uk.gov.justice.services.core.jms.converter.EnvelopeConverter;
import uk.gov.justice.services.core.jms.exception.InvalildJmsMessageTypeException;

import javax.inject.Inject;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
* Abstract JMS Listener. Dispatches the message to the correct handler.
*/
public abstract class AbstractJMSListener implements MessageListener {

@Inject
EnvelopeConverter envelopeConverter;

protected abstract Dispatcher getDispatcher();

@Override
public void onMessage(final Message message) {
if (!(message instanceof TextMessage)) {
try {
throw new InvalildJmsMessageTypeException(String.format("Message is not an instance of TextMessage %s", message.getJMSMessageID()));
} catch (JMSException e) {
throw new InvalildJmsMessageTypeException(String.format("Message is not an instance of TextMessage. Failed to retrieve messageId %s",
message), e);
}
}

getDispatcher().dispatch(envelopeConverter.fromMessage((TextMessage) message));
}

}
64 changes: 40 additions & 24 deletions core/src/main/java/uk/gov/justice/services/core/jms/JmsSender.java
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
package uk.gov.justice.services.core.jms;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.gov.justice.services.core.annotation.Component;
import uk.gov.justice.services.core.context.ContextName;
import uk.gov.justice.services.core.jms.converter.EnvelopeConverter;
import uk.gov.justice.services.core.jms.exception.JmsSenderException;
import uk.gov.justice.services.core.util.JsonObjectConverter;
import uk.gov.justice.services.core.sender.Sender;
import uk.gov.justice.services.messaging.Envelope;

import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.enterprise.inject.Alternative;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Objects;

@ApplicationScoped
public class JmsSender {
@Alternative
public class JmsSender implements Sender {

static final String JMS_HEADER_CPPNAME = "CPPNAME";

Logger logger = LoggerFactory.getLogger(JmsSender.class);

@Inject
JsonObjectConverter jsonObjectConverter;

@Resource(mappedName = "java:comp/DefaultJMSConnectionFactory")
QueueConnectionFactory queueConnectionFactory;
private final JmsEndpoints jmsEndpoints;
private final Component destinationComponent;
private final QueueConnectionFactory queueConnectionFactory;

Context initialContext;
private EnvelopeConverter envelopeConverter;

public JmsSender(final Component destinationComponent, final EnvelopeConverter envelopeConverter, final JmsEndpoints jmsEndpoints,
final QueueConnectionFactory queueConnectionFactory) {
this.envelopeConverter = envelopeConverter;
this.destinationComponent = destinationComponent;
this.jmsEndpoints = jmsEndpoints;
this.queueConnectionFactory = queueConnectionFactory;
}

private Context getInitialContext() throws NamingException {
if (initialContext == null) {
Expand All @@ -42,14 +44,32 @@ private Context getInitialContext() throws NamingException {
return initialContext;
}

@Override
public void send(Envelope envelope) {
final String contextName = ContextName.fromName(envelope.metadata().name());
send(jmsEndpoints.getEndpoint(destinationComponent, contextName), envelope);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JmsSender jmsSender = (JmsSender) o;
return destinationComponent == jmsSender.destinationComponent;
}

@Override
public int hashCode() {
return Objects.hash(destinationComponent);
}

/**
* Sends the <code>envelope</code> to the JMS queue <code>queueName</code>
*
* @param queueName Name of the queue.
* @param envelope Envelope that needs to be sent.
*/
public void send(final String queueName, final Envelope envelope) {
private void send(final String queueName, final Envelope envelope) {

try {
final Queue queue = (Queue) getInitialContext().lookup(queueName);
Expand All @@ -58,12 +78,8 @@ public void send(final String queueName, final Envelope envelope) {

try (QueueSession session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE)) {

final String envelopeAsString = jsonObjectConverter.asString(jsonObjectConverter.fromEnvelope(envelope));
final TextMessage textMessage = session.createTextMessage(envelopeAsString);
textMessage.setStringProperty(JMS_HEADER_CPPNAME, envelope.metadata().name());

try (QueueSender sender = session.createSender(queue)) {
sender.send(textMessage);
sender.send(envelopeConverter.toMessage(envelope, session));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package uk.gov.justice.services.core.jms;

import uk.gov.justice.services.core.annotation.Component;
import uk.gov.justice.services.core.jms.converter.EnvelopeConverter;

import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.QueueConnectionFactory;

/**
* Factory to create {@link JmsSender}.
*/
@ApplicationScoped
public class JmsSenderFactory {

@Inject
JmsEndpoints jmsEndpoints;

@Inject
EnvelopeConverter envelopeConverter;

@Resource(mappedName = "java:comp/DefaultJMSConnectionFactory")
QueueConnectionFactory queueConnectionFactory;

/**
* Creates a {@link JmsSender} based on the componentDestination.
*
* @param componentDestination message destination component.
* @return a new JmsSender instance.
*/
public JmsSender createJmsSender(final Component componentDestination) {
return new JmsSender(componentDestination, envelopeConverter, jmsEndpoints, queueConnectionFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package uk.gov.justice.services.core.jms.converter;

import uk.gov.justice.services.core.jms.exception.JmsConverterException;
import uk.gov.justice.services.core.util.JsonObjectConverter;
import uk.gov.justice.services.messaging.Envelope;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
* Implementation of {@link MessageConverter} for {@link uk.gov.justice.services.messaging.Envelope}
*/
@ApplicationScoped
public class EnvelopeConverter implements MessageConverter<Envelope, TextMessage> {

static final String JMS_HEADER_CPPNAME = "CPPNAME";

@Inject
JsonObjectConverter jsonObjectConverter;

@Override
public Envelope fromMessage(final TextMessage message) {
String messageAsString = null;

try {
messageAsString = message.getText();
return jsonObjectConverter.asEnvelope(jsonObjectConverter.fromString(messageAsString));
} catch (JMSException e) {
throw createJmsConverterException(message, e);
}
}

@Override
public TextMessage toMessage(final Envelope envelope, final Session session) {
final String envelopeAsString = jsonObjectConverter.asString(jsonObjectConverter.fromEnvelope(envelope));

try {
final TextMessage textMessage = session.createTextMessage(envelopeAsString);
textMessage.setStringProperty(JMS_HEADER_CPPNAME, envelope.metadata().name());
return textMessage;
} catch (JMSException e) {
throw new JmsConverterException(String.format("Exception while creating message from envelope %s", envelopeAsString), e);
}
}

private JmsConverterException createJmsConverterException(final TextMessage message, final Throwable e) {
try {
return new JmsConverterException(String.format("Exception while creating envelope from message %s", message.getJMSMessageID()), e);
} catch (JMSException e1) {
return new JmsConverterException(String.format("Exception while creating envelope from message. Failed to retrieve messageId from %s", message), e1);
}
}

}
Loading