Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test coverage for additional JMS message types #2609

Merged
merged 1 commit into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/modules/ROOT/pages/reference/extensions/jms.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,11 @@ Or add the coordinates to your existing project:
----

Check the xref:user-guide/index.adoc[User guide] for more information about writing Camel Quarkus applications.

== Usage

=== Message mapping with `org.w3c.dom.Node`

The Camel JMS component supports message mapping between `javax.jms.Message` and `org.apache.camel.Message`. When wanting to convert a Camel message body type of `org.w3c.dom.Node`,
you must ensure that the `camel-quarkus-jaxp` extension is present on the classpath.

4 changes: 4 additions & 0 deletions extensions/jms/runtime/src/main/doc/usage.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
=== Message mapping with `org.w3c.dom.Node`

The Camel JMS component supports message mapping between `javax.jms.Message` and `org.apache.camel.Message`. When wanting to convert a Camel message body type of `org.w3c.dom.Node`,
you must ensure that the `camel-quarkus-jaxp` extension is present on the classpath.
17 changes: 17 additions & 0 deletions integration-tests/messaging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-sjms2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-xml-jaxp</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
Expand Down Expand Up @@ -163,6 +167,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-xml-jaxp-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,30 @@
package org.apache.camel.quarkus.component.messaging.it;

import javax.inject.Named;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import javax.ws.rs.Produces;

import org.apache.camel.component.jms.MessageListenerContainerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

public class JmsProducers {

@Produces
@Named("customMessageListener")
public MessageListenerContainerFactory createCustomMessageListenerContainerFactory() {
@Named
public MessageListenerContainerFactory customMessageListener() {
return jmsEndpoint -> new DefaultMessageListenerContainer();
}

@Produces
@Named("customDestinationResolver")
public DestinationResolver createCustomDestinationResolver() {
@Named
public DestinationResolver customDestinationResolver() {
return (session, destinationName, pubSubDomain) -> {
if (destinationName.equals("ignored")) {
// Ignore and override the original queue name
Expand All @@ -47,9 +50,32 @@ public DestinationResolver createCustomDestinationResolver() {
};
}

@Produces
PlatformTransactionManager createTransactionManager(UserTransaction userTransaction,
@Named
public PlatformTransactionManager transactionManager(UserTransaction userTransaction,
TransactionManager transactionManager) {
return new JtaTransactionManager(userTransaction, transactionManager);
}

@Named
public MessageConverter customMessageConverter() {
return new MessageConverter() {
@Override
public Message toMessage(Object o, Session session) throws JMSException, MessageConversionException {
if (o instanceof String) {
TextMessage message = session.createTextMessage("converter prefix " + o);
return message;
}
return null;
}

@Override
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
return textMessage.getText() + " converter suffix";
}
return null;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
package org.apache.camel.quarkus.component.messaging.it;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.inject.Inject;
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.TextMessage;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand All @@ -31,6 +36,10 @@
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.w3c.dom.Element;

import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
Expand Down Expand Up @@ -74,14 +83,39 @@ public Response produceJmsQueueMessage(@PathParam("queueName") String queueName,
}

@Path("/jms/type/{type}")
@GET
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response jmsMessageType(@PathParam("type") String type) throws Exception {
public Response jmsMessageType(@PathParam("type") String type, String messageBody) throws Exception {
MockEndpoint mockEndpoint = context.getEndpoint("mock:jmsType", MockEndpoint.class);
mockEndpoint.reset();
mockEndpoint.expectedMessageCount(1);

producerTemplate.sendBodyAndHeader("jms:queue:typeTest", "test message payload", "type", type);
Object payload;
switch (type) {
case "bytes":
payload = messageBody.getBytes(StandardCharsets.UTF_8);
break;
case "file":
java.nio.file.Path path = Files.createTempFile("jms", ".txt");
Files.write(path, messageBody.getBytes(StandardCharsets.UTF_8));
payload = path.toFile();
break;
case "node":
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
Document document = factory.newDocumentBuilder().newDocument();
Element element = document.createElement("test");
element.setTextContent(messageBody);
payload = element;
break;
case "string":
payload = messageBody;
break;
default:
throw new IllegalArgumentException("Unknown type: " + type);
}

producerTemplate.sendBody("jms:queue:typeTest", payload);

mockEndpoint.assertIsSatisfied(5000);

Expand All @@ -90,11 +124,19 @@ public Response jmsMessageType(@PathParam("type") String type) throws Exception
.getMessage(JmsMessage.class)
.getJmsMessage();

boolean result;
if (type.equals("Text")) {
result = (message instanceof javax.jms.TextMessage);
Object result;
if (type.equals("string") || type.equals("node")) {
assert message instanceof javax.jms.TextMessage;
TextMessage textMessage = (TextMessage) message;
result = textMessage.getText();
} else {
result = (message instanceof javax.jms.BytesMessage);
assert message instanceof javax.jms.BytesMessage;
BytesMessage byteMessage = (BytesMessage) message;
byteMessage.reset();
byte[] byteData;
byteData = new byte[(int) byteMessage.getBodyLength()];
byteMessage.readBytes(byteData);
result = new String(byteData);
}

return Response.ok().entity(result).build();
Expand Down Expand Up @@ -130,6 +172,7 @@ public Response jmsMapMessage(Map<String, String> payload) throws Exception {

@Path("/jms/custom/message/listener/factory")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public String customMessageListenerContainerFactory(String message) {
producerTemplate.sendBody("jms:queue:listener?messageListenerContainerFactory=#customMessageListener", message);
Expand All @@ -138,6 +181,7 @@ public String customMessageListenerContainerFactory(String message) {

@Path("/jms/custom/destination/resolver")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public String customDestinationResolver(String message) {
producerTemplate.sendBody("jms:queue:ignored?destinationResolver=#customDestinationResolver", message);
Expand All @@ -146,6 +190,15 @@ public String customDestinationResolver(String message) {
return consumerTemplate.receiveBody("jms:queue:destinationOverride", 5000, String.class);
}

@Path("/jms/custom/message/converter")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public String customMessageConverter(String message) {
producerTemplate.sendBody("jms:queue:converter?messageConverter=#customMessageConverter", message);
return consumerTemplate.receiveBody("jms:queue:converter?messageConverter=#customMessageConverter", 5000, String.class);
}

@Path("/jms/selector/{expression}")
@GET
@Produces(MediaType.TEXT_PLAIN)
Expand All @@ -169,15 +222,54 @@ public Response jmsTransaction() throws Exception {

producerTemplate.sendBody("jms:queue:txTest?transacted=true", "Test JMS Transaction");

mockEndpoint.assertIsSatisfied(15000);
mockEndpoint.assertIsSatisfied(5000);

Exchange exchange = mockEndpoint.getExchanges().get(0);
Message message = exchange.getMessage();

return Response.ok().entity(message.getBody()).build();
}

@Path("/jms/object")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
@POST
public Response testObjectMessage(String name) throws InterruptedException {
MockEndpoint mockEndpoint = context.getEndpoint("mock:objectTestResult", MockEndpoint.class);
mockEndpoint.expectedMessageCount(1);

producerTemplate.sendBody("jms:queue:objectTest", new Person(name));

mockEndpoint.assertIsSatisfied();

List<Exchange> exchanges = mockEndpoint.getExchanges();
Exchange exchange = exchanges.get(0);
Person body = exchange.getMessage().getBody(Person.class);

return Response.ok().entity(body.getName()).build();
}

@Path("/jms/transfer/exchange")
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
@POST
public Response testTransferExchange(String message) throws InterruptedException {
MockEndpoint mockEndpoint = context.getEndpoint("mock:transferExchangeResult", MockEndpoint.class);
mockEndpoint.expectedMessageCount(1);

producerTemplate.sendBody("jms:queue:transferExchange?transferExchange=true", message);

mockEndpoint.assertIsSatisfied();

List<Exchange> exchanges = mockEndpoint.getExchanges();
Exchange exchange = exchanges.get(0);
String result = exchange.getMessage().getBody(String.class);

return Response.ok().entity(result).build();
}

@Path("/jms/topic")
@Consumes(MediaType.TEXT_PLAIN)
@POST
public void topicPubSub(String message) throws Exception {
MockEndpoint topicResultA = context.getEndpoint("mock:topicResultA", MockEndpoint.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ public class JmsRoutes extends RouteBuilder {

@Override
public void configure() throws Exception {
// jmsMessageType text / binary routes
from("jms:queue:typeTest?jmsMessageType=Text")
.toD("jms:queue:typeTestResult?jmsMessageType=${header.type}");
from("jms:queue:typeTest?concurrentConsumers=5")
.toD("jms:queue:typeTestResult");
aldettinger marked this conversation as resolved.
Show resolved Hide resolved

from("jms:queue:typeTestResult")
from("jms:queue:typeTestResult?artemisStreamingEnabled=false")
.to("mock:jmsType");

// Map message type routes
Expand Down Expand Up @@ -59,6 +58,12 @@ public void process(Exchange exchange) throws Exception {
})
.to("mock:txResult");

from("jms:queue:transferExchange?transferExchange=true")
.to("mock:transferExchangeResult");

from("jms:queue:objectTest")
.to("mock:objectTestResult");

// Topic routes
from("jms:topic:test?clientId=123&durableSubscriptionName=camel-quarkus")
.to("mock:topicResultA");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.messaging.it;

import java.io.Serializable;

public class Person implements Serializable {
String name;

public Person(String name) {
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}