Skip to content

Commit

Permalink
JMS: Adapt to JMS 2.0 API change in tests (#1496)
Browse files Browse the repository at this point in the history
* Add methods required by JMS 2.0
* Put ibmmq in sorted list
  • Loading branch information
ennru authored Feb 14, 2019
1 parent cec1d54 commit 4811c14
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 167 deletions.
16 changes: 8 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ services:
- 60010:60010
- 60020:60020
- 60030:60030
ibmmq:
image: ibmcom/mq:9.1.1.0
environment:
LICENSE: accept
MQ_QMGR_NAME: QM1
ports:
- "1414:1414"
- "9443:9443"
ironauth:
image: iron/auth
ports:
Expand Down Expand Up @@ -191,11 +199,3 @@ services:
environment:
- "ORIENTDB_ROOT_PASSWORD=root"
command: /orientdb/bin/server.sh -Dmemory.chunk.size=268435456
ibmmq:
image: ibmcom/mq:9.1.1.0
environment:
LICENSE: accept
MQ_QMGR_NAME: QM1
ports:
- "1414:1414"
- "9443:9443"
314 changes: 157 additions & 157 deletions jms/src/test/java/docs/javadsl/JmsIbmmqConnectorsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,170 +45,170 @@

public class JmsIbmmqConnectorsTest {

private static ActorSystem system;
private static Materializer materializer;
private static MQQueueConnectionFactory queueConnectionFactory;
private static MQTopicConnectionFactory topicConnectionFactory;

@BeforeClass
public static void setup() throws JMSException {
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);
// #ibmmq-connection-factory
// Create the IBM MQ MQQueueConnectionFactory
MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();

// #ibmmq-connection-factory
JmsIbmmqConnectorsTest.queueConnectionFactory =
(MQQueueConnectionFactory) initDefaultFactory(connectionFactory);
JmsIbmmqConnectorsTest.topicConnectionFactory =
(MQTopicConnectionFactory) initDefaultFactory(new MQTopicConnectionFactory());
}

private static MQConnectionFactory initDefaultFactory(MQConnectionFactory connectionFactory)
throws JMSException {
// #ibmmq-connection-factory
// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost");
connectionFactory.setPort(1414);
connectionFactory.setQueueManager("QM1");
connectionFactory.setChannel("DEV.APP.SVRCONN");

// #ibmmq-connection-factory
return connectionFactory;
}

@AfterClass
public static void teardown() {
TestKit.shutdownActorSystem(system);
}

@Test
public void publishAndConsumeQueue()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

// #ibmmq-queue
// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String queueName = "DEV.QUEUE.1";

Sink<String, CompletionStage<Done>> jmsSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// #ibmmq-queue

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
Source.from(in).runWith(jmsSink, materializer);

// #ibmmq-queue
// Option1: create Source using default factory with just name
Source<TxEnvelope, JmsConsumerControl> txJmsSource =
JmsConsumer.txSource(
JmsConsumerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// #ibmmq-queue

CompletionStage<List<String>> result =
txJmsSource
.take(in.size())
.map(
envelope -> {
envelope.commit();
if (envelope.message() instanceof TextMessage) {
TextMessage message = (TextMessage) envelope.message();
return message.getText();
} else {
throw new RuntimeException(
"unexpected message type " + envelope.message().getClass());
}
})
.runWith(Sink.seq(), materializer);

List<String> out = new ArrayList<>(result.toCompletableFuture().get(3, TimeUnit.SECONDS));
Collections.sort(out);
assertEquals(in, out);
}

@Test
public void publishAndConsumeCustomDestination()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);

// #ibmmq-custom-destination
// Option2: create Source using custom factory
String customQueue = "DEV.QUEUE.3";
// #ibmmq-custom-destination

// "app" clients are limited to perform create queue
// existing queue in use
Sink<String, CompletionStage<Done>> jmsSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, queueConnectionFactory)
.withDestination(new CustomDestination("custom", createQueue(customQueue))));

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");

Source.from(in).runWith(jmsSink, materializer);

// #ibmmq-custom-destination
Source<String, JmsConsumerControl> jmsSource =
JmsConsumer.textSource(
JmsConsumerSettings.create(system, queueConnectionFactory)
.withDestination(new CustomDestination("custom", createQueue(customQueue))));

// #ibmmq-custom-destination

CompletionStage<List<String>> result =
jmsSource.take(in.size()).runWith(Sink.seq(), materializer);

assertEquals(in.size(), result.toCompletableFuture().get(5, TimeUnit.SECONDS).size());
}

@Test
public void publishAndConsumeTopic()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
private static ActorSystem system;
private static Materializer materializer;
private static MQQueueConnectionFactory queueConnectionFactory;
private static MQTopicConnectionFactory topicConnectionFactory;

@BeforeClass
public static void setup() throws JMSException {
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);
// #ibmmq-connection-factory
// Create the IBM MQ MQQueueConnectionFactory
MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();

// #ibmmq-connection-factory
JmsIbmmqConnectorsTest.queueConnectionFactory =
(MQQueueConnectionFactory) initDefaultFactory(connectionFactory);
JmsIbmmqConnectorsTest.topicConnectionFactory =
(MQTopicConnectionFactory) initDefaultFactory(new MQTopicConnectionFactory());
}

private static MQConnectionFactory initDefaultFactory(MQConnectionFactory connectionFactory)
throws JMSException {
// #ibmmq-connection-factory
// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost");
connectionFactory.setPort(1414);
connectionFactory.setQueueManager("QM1");
connectionFactory.setChannel("DEV.APP.SVRCONN");

// #ibmmq-connection-factory
return connectionFactory;
}

@AfterClass
public static void teardown() {
TestKit.shutdownActorSystem(system);
}

@Test
public void publishAndConsumeQueue()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

// #ibmmq-queue
// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String queueName = "DEV.QUEUE.1";

Sink<String, CompletionStage<Done>> jmsSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// #ibmmq-queue

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
Source.from(in).runWith(jmsSink, materializer);

// #ibmmq-queue
// Option1: create Source using default factory with just name
Source<TxEnvelope, JmsConsumerControl> txJmsSource =
JmsConsumer.txSource(
JmsConsumerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// #ibmmq-queue

CompletionStage<List<String>> result =
txJmsSource
.take(in.size())
.map(
envelope -> {
envelope.commit();
if (envelope.message() instanceof TextMessage) {
TextMessage message = (TextMessage) envelope.message();
return message.getText();
} else {
throw new RuntimeException(
"unexpected message type " + envelope.message().getClass());
}
})
.runWith(Sink.seq(), materializer);

List<String> out = new ArrayList<>(result.toCompletableFuture().get(3, TimeUnit.SECONDS));
Collections.sort(out);
assertEquals(in, out);
}

@Test
public void publishAndConsumeCustomDestination()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);

// #ibmmq-topic
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testTopicName = "dev/";
// #ibmmq-custom-destination
// Option2: create Source using custom factory
String customQueue = "DEV.QUEUE.3";
// #ibmmq-custom-destination

// "app" clients are limited to perform create queue
// existing queue in use
Sink<String, CompletionStage<Done>> jmsSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, queueConnectionFactory)
.withDestination(new CustomDestination("custom", createQueue(customQueue))));

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");

Sink<String, CompletionStage<Done>> jmsTopicSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));
Source.from(in).runWith(jmsSink, materializer);

// Option1: create Source using default factory with just name
Source<String, JmsConsumerControl> jmsTopicSource =
JmsConsumer.textSource(
JmsConsumerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));
// #ibmmq-topic
// #ibmmq-custom-destination
Source<String, JmsConsumerControl> jmsSource =
JmsConsumer.textSource(
JmsConsumerSettings.create(system, queueConnectionFactory)
.withDestination(new CustomDestination("custom", createQueue(customQueue))));

CompletionStage<List<String>> result =
jmsTopicSource.take(in.size()).runWith(Sink.seq(), materializer);
// #ibmmq-custom-destination

Thread.sleep(500);
CompletionStage<List<String>> result =
jmsSource.take(in.size()).runWith(Sink.seq(), materializer);

Source.from(in).runWith(jmsTopicSink, materializer);
assertEquals(in.size(), result.toCompletableFuture().get(5, TimeUnit.SECONDS).size());
}

assertEquals(in, result.toCompletableFuture().get(5, TimeUnit.SECONDS));
}
@Test
public void publishAndConsumeTopic()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

// #ibmmq-custom-destination
Function<Session, Destination> createQueue(String destinationName) {
return (session) -> {
// cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
MQQueueSession mqSession = (MQQueueSession) session;
try {
return mqSession.createQueue(destinationName);
} catch (JMSException e) {
throw new RuntimeException(e);
}
};
}
// #ibmmq-custom-destination
List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");

// #ibmmq-topic
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testTopicName = "dev/";

Sink<String, CompletionStage<Done>> jmsTopicSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));

// Option1: create Source using default factory with just name
Source<String, JmsConsumerControl> jmsTopicSource =
JmsConsumer.textSource(
JmsConsumerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));
// #ibmmq-topic

CompletionStage<List<String>> result =
jmsTopicSource.take(in.size()).runWith(Sink.seq(), materializer);

Thread.sleep(500);

Source.from(in).runWith(jmsTopicSink, materializer);

assertEquals(in, result.toCompletableFuture().get(5, TimeUnit.SECONDS));
}

// #ibmmq-custom-destination
Function<Session, Destination> createQueue(String destinationName) {
return (session) -> {
// cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
MQQueueSession mqSession = (MQQueueSession) session;
try {
return mqSession.createQueue(destinationName);
} catch (JMSException e) {
throw new RuntimeException(e);
}
};
}
// #ibmmq-custom-destination

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import org.apache.activemq.ActiveMQConnection

/**
* a silly cached connection factory, not thread safe
*
* @param url
*/
class CachedConnectionFactory(connFactory: ConnectionFactory) extends ConnectionFactory {

Expand All @@ -25,4 +23,11 @@ class CachedConnectionFactory(connFactory: ConnectionFactory) extends Connection
}

override def createConnection(s: String, s1: String): Connection = cachedConnection

// added in JMS 2.0
// see https://github.com/akka/alpakka/issues/1493
def createContext(x$1: Int): javax.jms.JMSContext = ???
def createContext(x$1: String, x$2: String, x$3: Int): javax.jms.JMSContext = ???
def createContext(x$1: String, x$2: String): javax.jms.JMSContext = ???
def createContext(): javax.jms.JMSContext = ???
}

0 comments on commit 4811c14

Please sign in to comment.