Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JMS: Adapt to JMS 2.0 API change in tests #1496

Merged
merged 3 commits into from
Feb 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ services:
- 60010:60010
- 60020:60020
- 60030:60030
ibmmq:
image: ibmcom/mq:9.1.1.0
environment:
LICENSE: accept
MQ_QMGR_NAME: QM1
ports:
- "1414:1414"
- "9443:9443"
ironauth:
image: iron/auth
ports:
Expand Down Expand Up @@ -191,11 +199,3 @@ services:
environment:
- "ORIENTDB_ROOT_PASSWORD=root"
command: /orientdb/bin/server.sh -Dmemory.chunk.size=268435456
ibmmq:
image: ibmcom/mq:9.1.1.0
environment:
LICENSE: accept
MQ_QMGR_NAME: QM1
ports:
- "1414:1414"
- "9443:9443"
314 changes: 157 additions & 157 deletions jms/src/test/java/docs/javadsl/JmsIbmmqConnectorsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,170 +45,170 @@

public class JmsIbmmqConnectorsTest {

private static ActorSystem system;
private static Materializer materializer;
private static MQQueueConnectionFactory queueConnectionFactory;
private static MQTopicConnectionFactory topicConnectionFactory;

@BeforeClass
public static void setup() throws JMSException {
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);
// #ibmmq-connection-factory
// Create the IBM MQ MQQueueConnectionFactory
MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();

// #ibmmq-connection-factory
JmsIbmmqConnectorsTest.queueConnectionFactory =
(MQQueueConnectionFactory) initDefaultFactory(connectionFactory);
JmsIbmmqConnectorsTest.topicConnectionFactory =
(MQTopicConnectionFactory) initDefaultFactory(new MQTopicConnectionFactory());
}

private static MQConnectionFactory initDefaultFactory(MQConnectionFactory connectionFactory)
throws JMSException {
// #ibmmq-connection-factory
// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost");
connectionFactory.setPort(1414);
connectionFactory.setQueueManager("QM1");
connectionFactory.setChannel("DEV.APP.SVRCONN");

// #ibmmq-connection-factory
return connectionFactory;
}

@AfterClass
public static void teardown() {
TestKit.shutdownActorSystem(system);
}

@Test
public void publishAndConsumeQueue()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

// #ibmmq-queue
// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String queueName = "DEV.QUEUE.1";

Sink<String, CompletionStage<Done>> jmsSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// #ibmmq-queue

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
Source.from(in).runWith(jmsSink, materializer);

// #ibmmq-queue
// Option1: create Source using default factory with just name
Source<TxEnvelope, JmsConsumerControl> txJmsSource =
JmsConsumer.txSource(
JmsConsumerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// #ibmmq-queue

CompletionStage<List<String>> result =
txJmsSource
.take(in.size())
.map(
envelope -> {
envelope.commit();
if (envelope.message() instanceof TextMessage) {
TextMessage message = (TextMessage) envelope.message();
return message.getText();
} else {
throw new RuntimeException(
"unexpected message type " + envelope.message().getClass());
}
})
.runWith(Sink.seq(), materializer);

List<String> out = new ArrayList<>(result.toCompletableFuture().get(3, TimeUnit.SECONDS));
Collections.sort(out);
assertEquals(in, out);
}

@Test
public void publishAndConsumeCustomDestination()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);

// #ibmmq-custom-destination
// Option2: create Source using custom factory
String customQueue = "DEV.QUEUE.3";
// #ibmmq-custom-destination

// "app" clients are limited to perform create queue
// existing queue in use
Sink<String, CompletionStage<Done>> jmsSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, queueConnectionFactory)
.withDestination(new CustomDestination("custom", createQueue(customQueue))));

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");

Source.from(in).runWith(jmsSink, materializer);

// #ibmmq-custom-destination
Source<String, JmsConsumerControl> jmsSource =
JmsConsumer.textSource(
JmsConsumerSettings.create(system, queueConnectionFactory)
.withDestination(new CustomDestination("custom", createQueue(customQueue))));

// #ibmmq-custom-destination

CompletionStage<List<String>> result =
jmsSource.take(in.size()).runWith(Sink.seq(), materializer);

assertEquals(in.size(), result.toCompletableFuture().get(5, TimeUnit.SECONDS).size());
}

@Test
public void publishAndConsumeTopic()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
private static ActorSystem system;
private static Materializer materializer;
private static MQQueueConnectionFactory queueConnectionFactory;
private static MQTopicConnectionFactory topicConnectionFactory;

@BeforeClass
public static void setup() throws JMSException {
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);
// #ibmmq-connection-factory
// Create the IBM MQ MQQueueConnectionFactory
MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();

// #ibmmq-connection-factory
JmsIbmmqConnectorsTest.queueConnectionFactory =
(MQQueueConnectionFactory) initDefaultFactory(connectionFactory);
JmsIbmmqConnectorsTest.topicConnectionFactory =
(MQTopicConnectionFactory) initDefaultFactory(new MQTopicConnectionFactory());
}

private static MQConnectionFactory initDefaultFactory(MQConnectionFactory connectionFactory)
throws JMSException {
// #ibmmq-connection-factory
// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost");
connectionFactory.setPort(1414);
connectionFactory.setQueueManager("QM1");
connectionFactory.setChannel("DEV.APP.SVRCONN");

// #ibmmq-connection-factory
return connectionFactory;
}

@AfterClass
public static void teardown() {
TestKit.shutdownActorSystem(system);
}

@Test
public void publishAndConsumeQueue()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

// #ibmmq-queue
// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String queueName = "DEV.QUEUE.1";

Sink<String, CompletionStage<Done>> jmsSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// #ibmmq-queue

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
Source.from(in).runWith(jmsSink, materializer);

// #ibmmq-queue
// Option1: create Source using default factory with just name
Source<TxEnvelope, JmsConsumerControl> txJmsSource =
JmsConsumer.txSource(
JmsConsumerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// #ibmmq-queue

CompletionStage<List<String>> result =
txJmsSource
.take(in.size())
.map(
envelope -> {
envelope.commit();
if (envelope.message() instanceof TextMessage) {
TextMessage message = (TextMessage) envelope.message();
return message.getText();
} else {
throw new RuntimeException(
"unexpected message type " + envelope.message().getClass());
}
})
.runWith(Sink.seq(), materializer);

List<String> out = new ArrayList<>(result.toCompletableFuture().get(3, TimeUnit.SECONDS));
Collections.sort(out);
assertEquals(in, out);
}

@Test
public void publishAndConsumeCustomDestination()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);

// #ibmmq-topic
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testTopicName = "dev/";
// #ibmmq-custom-destination
// Option2: create Source using custom factory
String customQueue = "DEV.QUEUE.3";
// #ibmmq-custom-destination

// "app" clients are limited to perform create queue
// existing queue in use
Sink<String, CompletionStage<Done>> jmsSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, queueConnectionFactory)
.withDestination(new CustomDestination("custom", createQueue(customQueue))));

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");

Sink<String, CompletionStage<Done>> jmsTopicSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));
Source.from(in).runWith(jmsSink, materializer);

// Option1: create Source using default factory with just name
Source<String, JmsConsumerControl> jmsTopicSource =
JmsConsumer.textSource(
JmsConsumerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));
// #ibmmq-topic
// #ibmmq-custom-destination
Source<String, JmsConsumerControl> jmsSource =
JmsConsumer.textSource(
JmsConsumerSettings.create(system, queueConnectionFactory)
.withDestination(new CustomDestination("custom", createQueue(customQueue))));

CompletionStage<List<String>> result =
jmsTopicSource.take(in.size()).runWith(Sink.seq(), materializer);
// #ibmmq-custom-destination

Thread.sleep(500);
CompletionStage<List<String>> result =
jmsSource.take(in.size()).runWith(Sink.seq(), materializer);

Source.from(in).runWith(jmsTopicSink, materializer);
assertEquals(in.size(), result.toCompletableFuture().get(5, TimeUnit.SECONDS).size());
}

assertEquals(in, result.toCompletableFuture().get(5, TimeUnit.SECONDS));
}
@Test
public void publishAndConsumeTopic()
throws JMSException, InterruptedException, ExecutionException, TimeoutException {

// #ibmmq-custom-destination
Function<Session, Destination> createQueue(String destinationName) {
return (session) -> {
// cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
MQQueueSession mqSession = (MQQueueSession) session;
try {
return mqSession.createQueue(destinationName);
} catch (JMSException e) {
throw new RuntimeException(e);
}
};
}
// #ibmmq-custom-destination
List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");

// #ibmmq-topic
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testTopicName = "dev/";

Sink<String, CompletionStage<Done>> jmsTopicSink =
JmsProducer.textSink(
JmsProducerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));

// Option1: create Source using default factory with just name
Source<String, JmsConsumerControl> jmsTopicSource =
JmsConsumer.textSource(
JmsConsumerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));
// #ibmmq-topic

CompletionStage<List<String>> result =
jmsTopicSource.take(in.size()).runWith(Sink.seq(), materializer);

Thread.sleep(500);

Source.from(in).runWith(jmsTopicSink, materializer);

assertEquals(in, result.toCompletableFuture().get(5, TimeUnit.SECONDS));
}

// #ibmmq-custom-destination
Function<Session, Destination> createQueue(String destinationName) {
return (session) -> {
// cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
MQQueueSession mqSession = (MQQueueSession) session;
try {
return mqSession.createQueue(destinationName);
} catch (JMSException e) {
throw new RuntimeException(e);
}
};
}
// #ibmmq-custom-destination

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import org.apache.activemq.ActiveMQConnection

/**
* a silly cached connection factory, not thread safe
*
* @param url
*/
class CachedConnectionFactory(connFactory: ConnectionFactory) extends ConnectionFactory {

Expand All @@ -25,4 +23,11 @@ class CachedConnectionFactory(connFactory: ConnectionFactory) extends Connection
}

override def createConnection(s: String, s1: String): Connection = cachedConnection

// added in JMS 2.0
// see https://github.com/akka/alpakka/issues/1493
def createContext(x$1: Int): javax.jms.JMSContext = ???
def createContext(x$1: String, x$2: String, x$3: Int): javax.jms.JMSContext = ???
def createContext(x$1: String, x$2: String): javax.jms.JMSContext = ???
def createContext(): javax.jms.JMSContext = ???
}