Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.x] JMS intermittent test fix #6392

Merged
merged 1 commit into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import static io.helidon.messaging.connectors.jms.AcknowledgeMode.AUTO_ACKNOWLEDGE;

public class AbstractJmsTest {

static final String BROKER_URL = "vm://localhost?broker.persistent=false";
Expand All @@ -45,4 +50,19 @@ static void tearDown() throws Exception {
session.close();
}

static void clearQueue(String queueName){
var cf = JakartaJms.create(new ActiveMQConnectionFactory(AbstractJmsTest.BROKER_URL));
try (Connection conn = cf.createConnection();
var s = conn.createSession(false, AUTO_ACKNOWLEDGE.getAckMode())) {
conn.start();
Queue queue = s.createQueue(queueName);
MessageConsumer cons = s.createConsumer(queue);
jakarta.jms.Message m;
do {
m = cons.receive(100L);
} while (m != null);
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,12 +85,13 @@ Stream<Message> consumeAllCurrent(String topic) {
Message m;
List<Message> result = new ArrayList<>();
for (; ; ) {
m = consumer.receive(50L);
m = consumer.receive(500L);
if (m == null) {
break;
}
result.add(m);
}
consumer.close();
return result.stream();
} catch (JMSException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,6 @@ protected void countDown(String method) {
}
}

@ApplicationScoped
public static class ChannelAck extends AbstractSampleBean {

@Incoming("test-channel-ack-1")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Void> channelAck(Message<String> msg) {
LOGGER.fine(() -> String.format("Received %s", msg.getPayload()));
consumed().add(msg.getPayload());
if (msg.getPayload().startsWith("NO_ACK")) {
LOGGER.fine(() -> String.format("NOT Acked %s", msg.getPayload()));
} else {
LOGGER.fine(() -> String.format("Acked %s", msg.getPayload()));
msg.ack();
}
countDown("channel1()");
return CompletableFuture.completedFuture(null);
}
}

@ApplicationScoped
public static class Channel1 extends AbstractSampleBean {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

package io.helidon.messaging.connectors.jms;

import java.lang.annotation.Annotation;
import java.time.Duration;
import java.util.List;

import io.helidon.messaging.connectors.mock.MockConnector;
import io.helidon.messaging.connectors.mock.TestConnector;
import io.helidon.microprofile.config.ConfigCdiExtension;
import io.helidon.microprofile.messaging.MessagingCdiExtension;
import io.helidon.microprofile.tests.junit5.AddBean;
Expand All @@ -29,19 +33,24 @@
import io.helidon.microprofile.tests.junit5.DisableDiscovery;
import io.helidon.microprofile.tests.junit5.HelidonTest;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.inject.se.SeContainer;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import static java.lang.System.Logger.Level.DEBUG;

@HelidonTest(resetPerTest = true)
@DisableDiscovery
@AddBeans({
@AddBean(JmsConnector.class),
@AddBean(AbstractSampleBean.ChannelAck.class),
@AddBean(MockConnector.class),
})
@AddExtensions({
@AddExtension(ConfigCdiExtension.class),
Expand All @@ -59,42 +68,56 @@
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.acknowledge-mode", value = "CLIENT_ACKNOWLEDGE"),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.type", value = "queue"),
@AddConfig(key = "mp.messaging.incoming.test-channel-ack-1.destination", value = AckMpTest.TEST_QUEUE_ACK),

@AddConfig(key = "mp.messaging.outgoing.mock-conn-channel.connector", value = MockConnector.CONNECTOR_NAME),
})
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class AckMpTest extends AbstractMPTest {

static final String TEST_QUEUE_ACK = "queue-ack";

@PostConstruct
void cleanupBefore() {
//cleanup not acked messages
consumeAllCurrent(TEST_QUEUE_ACK)
.map(JmsMessage::of)
.forEach(Message::ack);
private static final System.Logger LOGGER = System.getLogger(AckMpTest.class.getName());
private static final Annotation TEST_CONNECTOR_ANNOTATION = MockConnector.class.getAnnotation(TestConnector.class);

@Incoming("test-channel-ack-1")
@Outgoing("mock-conn-channel")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Message<String> channelAck(Message<String> msg) {
LOGGER.log(DEBUG, () -> String.format("Received %s", msg.getPayload()));
if (msg.getPayload().startsWith("NO_ACK")) {
LOGGER.log(DEBUG, () -> String.format("NOT Acked %s", msg.getPayload()));
} else {
LOGGER.log(DEBUG, () -> String.format("Acked %s", msg.getPayload()));
msg.ack();
}
return msg;
}

@Test
@Order(1)
void resendAckTestPart1(SeContainer cdi) {
MockConnector mockConnector = cdi.select(MockConnector.class, TEST_CONNECTOR_ANNOTATION).get();
//Messages starting with NO_ACK is not acked by ChannelAck bean
List<String> testData = List.of("0", "1", "2", "NO_ACK-1", "NO_ACK-2", "NO_ACK-3");
AbstractSampleBean bean = cdi.select(AbstractSampleBean.ChannelAck.class).get();
produceAndCheck(bean, testData, TEST_QUEUE_ACK, testData);
bean.restart();
produce(TEST_QUEUE_ACK, testData, m -> {});
mockConnector.outgoing("mock-conn-channel", String.class)
.awaitPayloads(Duration.ofSeconds(5), testData.toArray(String[]::new));
}

@Test
@Order(2)
void resendAckTestPart2(SeContainer cdi) {
try {
AbstractSampleBean bean = cdi.select(AbstractSampleBean.ChannelAck.class).get();
//Send nothing just check if not acked messages are redelivered
produceAndCheck(bean, List.of(), TEST_QUEUE_ACK, List.of("NO_ACK-1", "NO_ACK-2", "NO_ACK-3"));
} finally {
//cleanup not acked messages
consumeAllCurrent(TEST_QUEUE_ACK)
.map(JmsMessage::of)
.forEach(Message::ack);
}
MockConnector mockConnector = cdi.select(MockConnector.class, TEST_CONNECTOR_ANNOTATION).get();

//Check if not acked messages are redelivered
mockConnector.outgoing("mock-conn-channel", String.class)
.requestMax()
.awaitCount(Duration.ofSeconds(5), 1)
.awaitPayloads(Duration.ofSeconds(5), "NO_ACK-1", "NO_ACK-2", "NO_ACK-3");
}

@AfterAll
static void afterAll() {
AbstractJmsTest.clearQueue(TEST_QUEUE_ACK);
}
}