From 90df9873693e657b7f15d483a993b2dd6bf87650 Mon Sep 17 00:00:00 2001 From: JiriOndrusek Date: Mon, 22 Apr 2024 10:52:02 +0200 Subject: [PATCH] JT400 tests can not be run in parallel #6018 --- integration-tests/jt400/README.adoc | 47 +-- .../src/main/resources/application.properties | 10 +- .../quarkus/component/jt400/it/Jt400Test.java | 48 +-- .../component/jt400/it/Jt400TestResource.java | 306 +++++++----------- 4 files changed, 175 insertions(+), 236 deletions(-) diff --git a/integration-tests/jt400/README.adoc b/integration-tests/jt400/README.adoc index 241dbf8c7725..a95f2cca831e 100644 --- a/integration-tests/jt400/README.adoc +++ b/integration-tests/jt400/README.adoc @@ -88,6 +88,31 @@ CRTDTAQ DTAQ(LIBRARY/TESTKEYED) SEQ(*KEYED) KEYLEN(20) MAXLEN(100) CRTDTAQ DTAQ(LIBRARY/TESTLIFO) SEQ(*LIFO) MAXLEN(100) ``` +==== Synchronization for parallel executions + +The tests do not work by default for parallel executions. +For parallel scenario, the locking file has to be provided. + +You can create such file by running + +``` +QSH CMD('touch #file_lock_path') +for example QSH CMD('touch /home/#username/cq_jt400_lock') +``` + +How to provide a locking file: + +``` +export JT400_LOCK_FILE=#file_lock_path +``` + +or for Windows: + +``` +$Env:JT400_LOCK_FILE="#file_lock_path" +``` +*If locking file is not provided, tests may fail their executions in parallel mode* + ==== Using different object names If your test object names are different from the default ones, you can override default values via environmental variable @@ -109,27 +134,5 @@ $Env:JT400_LIFO_QUEUE="#lifoqueue_if_not_TESTLIFO.DTAQe" $Env:JT400_KEYED_QUEUE="#lkeyedqueue_if_not_TESTKEYED.DTAQ" $Env:JT400_MESSAGE_QUEUE="#messagequeue_if_not_TESTMSGQ.MSGQ" $Env:JT400_MESSAGE_REPLYTO_QUEUE="#messagequeueinquiry_if_not_REPLYMSGQ.MSGQ" -$Env:JT400_USER_SPACE="#userspace_if_not_PROGCALL" -``` - -=== Clear queues after unexpected failures - -If tests finishes without unexpected failure, tests are taking care of clearing the data. -In some cases data might stay written into the real server if test fails unexpectedly. -This state should might alter following executions. - -To force full clear (of each queue) can be achieved by add ing parameter -``` --Dcq.jt400.clear-all=true ``` -Be aware that with `-Dcq.jt400.clear-all=true`, the tests can not successfully finish in parallel run. - -Usage of clear queues parameter is *strongly* suggested during development - - -==== Parallel runs and locking - -Simple locking mechanism is implemented for the test to allow parallel executions. -Whenever test is started, new entry is written into keyed data queue `JT400_KEYED_QUEUE` with the key `cq.jt400.global-lock` and entry is removed after the run. -Tests are able to clear this lock even if previous execution fails unexpectedly. \ No newline at end of file diff --git a/integration-tests/jt400/src/main/resources/application.properties b/integration-tests/jt400/src/main/resources/application.properties index 296c10e1a058..0dd299e81571 100644 --- a/integration-tests/jt400/src/main/resources/application.properties +++ b/integration-tests/jt400/src/main/resources/application.properties @@ -14,12 +14,6 @@ ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- -#quarkus.test.flat-class-path = ${quarkus.test.flat-class-path} - -# workaround for mocked tests, should be solvable by excluding mocked java files from compilation of skip-mock-tests profile -# I can not make it work though, but to not block the native support by this, I'm setting flat path to true for all tests -quarkus.test.flat-class-path = true - #jt400 server connection information cq.jt400.url=${JT400_URL:system} cq.jt400.username=${JT400_USERNAME:username} @@ -31,4 +25,6 @@ cq.jt400.user-space=${JT400_USER_SPACE:PROGCALL} cq.jt400.message-queue=${JT400_MESSAGE_QUEUE:TESTMSGQ.MSGQ} cq.jt400.message-replyto-queue=${JT400_MESSAGE_REPLYTO_QUEUE:REPLYMSGQ.MSGQ} cq.jt400.keyed-queue=${JT400_KEYED_QUEUE:TESTKEYED.DTAQ} -cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ} \ No newline at end of file +cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ} + +cq.jt400.lock-file=${JT400_LOCK_FILE} \ No newline at end of file diff --git a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java index 3627dd198aaa..2f9bfd7c6955 100644 --- a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java +++ b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java @@ -28,7 +28,6 @@ import org.awaitility.Awaitility; import org.hamcrest.Matchers; import org.jboss.logging.Logger; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; @@ -41,25 +40,34 @@ public class Jt400Test { private final int MSG_LENGTH = 20; //tests may be executed in parallel, therefore the timeout is a little bigger in case the test has to wait for another one - private final int WAIT_IN_SECONDS = 20; + private static final int WAIT_IN_SECONDS = 30; @BeforeAll public static void beforeAll() throws Exception { + //lock execution + getClientHelper().lock(); + //for development purposes // logQueues(); - //lock execution - Jt400TestResource.CLIENT_HELPER.lock(); - } - - @AfterAll - public static void afterAll() throws Exception { - getClientHelper().unlock(); + //clear al data in advance to be sure that there is no data in the queues + //it is not possible to clear data after the run because of CPF2451 Message queue REPLYMSGQ is allocated to another job + //wait is required also because of CPF2451, usually takes ~20 seconds to release connections to a reply queue + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until( + () -> { + try { + return getClientHelper().clear(); + } catch (Exception e) { + LOGGER.debug("Clear failed because of: " + e.getMessage()); + return false; + } + }, + Matchers.is(true)); } private static void logQueues() throws Exception { StringBuilder sb = new StringBuilder("\n"); - sb.append("**********************************************************"); + sb.append("************************************************************"); sb.append(getClientHelper().dumpQueues()); sb.append("\n**********************************************************\n"); LOGGER.info(sb.toString()); @@ -67,7 +75,7 @@ private static void logQueues() throws Exception { @Test public void testDataQueue() { - LOGGER.debug("** testDataQueue() ** has started "); + LOGGER.debug("**** testDataQueue() ** has started "); String msg = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); String answer = "Hello From DQ: " + msg; @@ -91,7 +99,7 @@ public void testDataQueue() { @Test public void testDataQueueBinary() throws Exception { - LOGGER.debug("** testDataQueueBinary() ** has started "); + LOGGER.debug("**** testDataQueueBinary() ** has started "); String msg = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); String answer = "Hello (bin) " + msg; @@ -118,7 +126,7 @@ public void testDataQueueBinary() throws Exception { @Test public void testKeyedDataQueue() { - LOGGER.debug("** testKeyedDataQueue() ** has started "); + LOGGER.debug("**** testKeyedDataQueue() ** has started "); String msg1 = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); String msg2 = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); String answer1 = "Hello From KDQ: " + msg1; @@ -170,7 +178,7 @@ public void testKeyedDataQueue() { @Test public void testMessageQueue() throws Exception { - LOGGER.debug("** testMessageQueue() ** has started "); + LOGGER.debug("**** testMessageQueue() ** has started "); //write String msg = RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT); String answer = "Hello from MQ: " + msg; @@ -205,19 +213,20 @@ public void testMessageQueue() throws Exception { @Test public void testInquiryMessageQueue() throws Exception { - LOGGER.debug("** testInquiryMessageQueue() **: has started "); + LOGGER.debug("**** testInquiryMessageQueue() **: has started "); + String msg = RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT); String replyMsg = "reply to: " + msg; - - LOGGER.debug("testInquiryMessageQueue: writing " + msg); + getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.replyToQueueu, msg); + getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.replyToQueueu, replyMsg); //sending a message using the same client as component getClientHelper().sendInquiry(msg); + LOGGER.debug("testInquiryMessageQueue: message " + msg + " written via client"); //register deletion of the message in case some following task fails QueuedMessage queuedMessage = getClientHelper().peekReplyToQueueMessage(msg); if (queuedMessage != null) { - getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.replyToQueueu, queuedMessage.getKey()); LOGGER.debug("testInquiryMessageQueue: message confirmed by peek: " + msg); } @@ -227,6 +236,7 @@ public void testInquiryMessageQueue() throws Exception { .post("/jt400/inquiryMessageSetExpected") .then() .statusCode(204); + //start route before sending message (and wait for start) Awaitility.await().atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until( () -> RestAssured.get("/jt400/route/start/inquiryRoute") @@ -237,7 +247,7 @@ public void testInquiryMessageQueue() throws Exception { LOGGER.debug("testInquiryMessageQueue: inquiry route started"); //await to be processed - Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(20, TimeUnit.SECONDS).until( + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until( () -> RestAssured.get("/jt400/inquiryMessageProcessed") .then() .statusCode(200) diff --git a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java index 3c09d0f562b6..99bdf6d1487a 100644 --- a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java +++ b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java @@ -16,7 +16,6 @@ */ package org.apache.camel.quarkus.component.jt400.it; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Enumeration; @@ -24,7 +23,6 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -32,22 +30,18 @@ import java.util.stream.Collectors; import com.ibm.as400.access.AS400; -import com.ibm.as400.access.AS400SecurityException; import com.ibm.as400.access.DataQueue; import com.ibm.as400.access.DataQueueEntry; -import com.ibm.as400.access.ErrorCompletingRequestException; +import com.ibm.as400.access.IFSFileInputStream; +import com.ibm.as400.access.IFSKey; import com.ibm.as400.access.KeyedDataQueue; -import com.ibm.as400.access.KeyedDataQueueEntry; import com.ibm.as400.access.MessageQueue; -import com.ibm.as400.access.ObjectDoesNotExistException; import com.ibm.as400.access.QueuedMessage; import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; -import org.apache.commons.lang3.RandomStringUtils; import org.awaitility.Awaitility; import org.eclipse.microprofile.config.ConfigProvider; import org.hamcrest.Matchers; import org.jboss.logging.Logger; -import org.junit.jupiter.api.Assertions; public class Jt400TestResource implements QuarkusTestResourceLifecycleManager { private static final Logger LOGGER = Logger.getLogger(Jt400TestResource.class); @@ -59,8 +53,6 @@ public static enum RESOURCE_TYPE { replyToQueueu; } - private static final Optional JT400_CLEAR_ALL = ConfigProvider.getConfig().getOptionalValue("cq.jt400.clear-all", - String.class); private static final String JT400_URL = ConfigProvider.getConfig().getValue("cq.jt400.url", String.class); private static final String JT400_USERNAME = ConfigProvider.getConfig().getValue("cq.jt400.username", String.class); private static final String JT400_PASSWORD = ConfigProvider.getConfig().getValue("cq.jt400.password", String.class); @@ -73,14 +65,15 @@ public static enum RESOURCE_TYPE { private static final String JT400_LIFO_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.lifo-queue", String.class); private static final String JT400_KEYED_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.keyed-queue", String.class); + private static final Optional JT400_LOCK_FILE = ConfigProvider.getConfig().getOptionalValue("cq.jt400.lock-file", + String.class); //depth of repetitive reads for lifo queue clearing private final static int CLEAR_DEPTH = 100; - public final static String LOCK_KEY = "cq.jt400.global-lock"; - //5 minute timeout to obtain a log for the tests execution - private final static int LOCK_TIMEOUT = 300000; + //10 minute timeout to obtain a log for the tests execution + private final static int LOCK_TIMEOUT = 600000; - private static AS400 as400 = new AS400(JT400_URL, JT400_USERNAME, JT400_PASSWORD);; + private static AS400 lockAs400; @Override public Map start() { @@ -90,13 +83,9 @@ public Map start() { @Override public void stop() { - if (as400 != null) { - try { - CLIENT_HELPER.clearAll(JT400_CLEAR_ALL.isPresent() && Boolean.parseBoolean(JT400_CLEAR_ALL.get())); - } catch (Exception e) { - LOGGER.debug("Clearing of the external queues failed", e); - } - as400.close(); + //no need to unlock, once the as400 connection is release, the lock is released + if (lockAs400 != null) { + lockAs400.close(); } } @@ -106,27 +95,36 @@ private static String getObjectPath(String object) { public static Jt400ClientHelper CLIENT_HELPER = new Jt400ClientHelper() { - private String key = null; + private boolean cleared = false; Map> toRemove = new HashMap<>(); + IFSFileInputStream lockFile; + IFSKey lockKey; + @Override public QueuedMessage peekReplyToQueueMessage(String msg) throws Exception { return getQueueMessage(JT400_REPLY_TO_MESSAGE_QUEUE, msg); } private QueuedMessage getQueueMessage(String queue, String msg) throws Exception { - MessageQueue messageQueue = new MessageQueue(as400, - getObjectPath(queue)); - Enumeration msgs = messageQueue.getMessages(); + AS400 as400 = createAs400(); + try { + MessageQueue messageQueue = new MessageQueue(as400, + getObjectPath(queue)); + Enumeration msgs = messageQueue.getMessages(); - while (msgs.hasMoreElements()) { - QueuedMessage queuedMessage = msgs.nextElement(); + while (msgs.hasMoreElements()) { + QueuedMessage queuedMessage = msgs.nextElement(); - if (msg.equals(queuedMessage.getText())) { - return queuedMessage; + if (msg.equals(queuedMessage.getText())) { + return queuedMessage; + } } + return null; + + } finally { + as400.close(); } - return null; } @Override @@ -141,192 +139,126 @@ public void registerForRemoval(RESOURCE_TYPE type, Object value) { } @Override - public void clearAll(boolean all) throws Exception { - //message queue - MessageQueue mq = new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE)); - if (all) { - mq.remove(); - } else if (toRemove.containsKey(RESOURCE_TYPE.messageQueue)) { - clearMessageQueue(RESOURCE_TYPE.messageQueue, mq); + public boolean clear() throws Exception { + + //clear only once + if (cleared) { + return false; } - //lifo queue - DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE)); - if (all) { + try (AS400 as400 = createAs400()) { + //reply-to queue + new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).remove(); + + //message queue + new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE)).remove(); + + //lifo queue + DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE)); for (int i = 01; i < CLEAR_DEPTH; i++) { if (dq.read() == null) { break; } } - } else if (toRemove.containsKey(RESOURCE_TYPE.lifoQueueu)) { - for (Object entry : toRemove.get(RESOURCE_TYPE.lifoQueueu)) { - List otherMessages = new LinkedList<>(); - DataQueueEntry dqe = dq.read(); - while (dqe != null && !(entry.equals(dqe.getString()) - || entry.equals(new String(dqe.getData(), StandardCharsets.UTF_8)))) { - otherMessages.add(dqe.getData()); - dqe = dq.read(); - } - //write back other messages in reverse order (it is a lifo) - Collections.reverse(otherMessages); - for (byte[] msg : otherMessages) { - dq.write(msg); - } - } - } - //reply-to queue - MessageQueue rq = new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)); - if (all) { - rq.remove(); - } else if (toRemove.containsKey(RESOURCE_TYPE.replyToQueueu)) { - clearMessageQueue(RESOURCE_TYPE.replyToQueueu, rq); - } - //keyed queue - KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE)); - if (all) { - kdq.clear(); - } else if (toRemove.containsKey(RESOURCE_TYPE.keyedDataQue)) { - for (Object entry : toRemove.get(RESOURCE_TYPE.keyedDataQue)) { - kdq.clear((String) entry); - } + //keyed queue + new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE)).clear(); } - } - private void clearMessageQueue(RESOURCE_TYPE type, MessageQueue mq) throws AS400SecurityException, - ErrorCompletingRequestException, InterruptedException, IOException, ObjectDoesNotExistException { - if (!toRemove.get(type).isEmpty()) { - List msgs = Collections.list(mq.getMessages()); - Map keys = msgs.stream().collect(Collectors.toMap(q -> q.getText(), q -> q.getKey())); - for (Object entry : toRemove.get(type)) { - if (entry instanceof String) { - mq.remove(keys.get((String) entry)); - } else { - mq.remove((byte[]) entry); - } - } - } + return true; } /** - * Keyed dataque (FIFO) is used for locking purposes. - * - * - Each participant saves unique token into a key cq.jt400.global-lock - * - Each participant the reads the FIFO queue and if the resulted string is its own unique token, execution is allowed - * - When execution ends, the key is removed - * - * If the token is not its own - * -read of the token is repeated until timeout or its own token is returned (so the second participant waits, until the - * first participant removes its token) - * - * Dead lock prevention - * - * - part of the unique token is timestamp, if participant finds a token, which is too old, token is removed - * - action to clear-all data removes also the locking tokens - * - * - * Therefore only 1 token (thus 1 participant) is allowed to run the tests, the others have to wait - * - * @throws Exception + * Locking is implemented via file locking, which is present in JTOpen. */ @Override public void lock() throws Exception { - if (key == null) { - key = generateKey(); - //write key into keyed queue - KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE)); - - Assertions.assertTrue(kdq.isFIFO(), "keyed dataqueue has to be FIFO"); - - kdq.write(LOCK_KEY, key); - - //added 5 seconds for the timeout, to have some spare time for removing old locks - Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(LOCK_TIMEOUT + 5000, TimeUnit.SECONDS) - .until( - () -> { - KeyedDataQueueEntry kdqe = kdq.peek(LOCK_KEY); - if (kdqe == null) { - //if kdqe is null, try to lock again - LOGGER.debug("locked in the queueu was removed, locking again with " + key); - kdq.write(LOCK_KEY, key); - } - String peekedKey = kdqe == null ? null : kdqe.getString(); - //if waiting takes more than 300s, check whether the actual lock can be removed - LOGGER.debug("peeked lock " + peekedKey + "(my lock is " + key + ")"); - - if (peekedKey != null && !key.equals(peekedKey)) { - long peekedTime = Long.parseLong(peekedKey.substring(11)); - if (System.currentTimeMillis() - peekedTime > LOCK_TIMEOUT) { - //read the key (therefore remove it) - String readKey = kdq.read(LOCK_KEY).getString(); - System.out.println("Removed old lock " + readKey); - peekedKey = kdq.peek(LOCK_KEY).getString(); - } - } - return peekedKey; - }, - Matchers.is(key)); - } - } - @Override - public void unlock() throws Exception { - Assertions.assertEquals(key, - new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE)).read(LOCK_KEY).getString()); - //clear key - key = null; - } + //if no lock file is proposed, throw an error + if (JT400_LOCK_FILE.isEmpty()) { + throw new IllegalStateException("No file for locking is provided."); + } - private String generateKey() { - return RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT) + ":" + System.currentTimeMillis(); + if (lockKey == null) { + lockAs400 = createAs400(); + lockFile = new IFSFileInputStream(lockAs400, JT400_LOCK_FILE.get()); + + LOGGER.debug("Asked for lock."); + + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(LOCK_TIMEOUT, TimeUnit.SECONDS) + .until(() -> { + try { + lockKey = lockFile.lock(1l); + } catch (Exception e) { + //lock was not acquired + return false; + } + LOGGER.debug("Acquired lock (for file `" + JT400_LOCK_FILE.get() + "`."); + return true; + + }, + Matchers.is(true)); + } } @Override public String dumpQueues() throws Exception { - StringBuilder sb = new StringBuilder(); - - sb.append("\n* MESSAGE QUEUE\n"); - sb.append("\t" + Collections.list(new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE)).getMessages()) - .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", "))); - - sb.append("\n* INQUIRY QUEUE\n"); - sb.append("\t" + Collections - .list(new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).getMessages()) - .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", "))); - - sb.append("\n* LIFO QUEUE\n"); - DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE)); - DataQueueEntry dqe; - List lifoMessages = new LinkedList<>(); - List lifoTexts = new LinkedList<>(); - do { - dqe = dq.read(); - if (dqe != null) { - lifoTexts.add(dqe.getString() + " (" + new String(dqe.getData(), StandardCharsets.UTF_8) + ")"); - lifoMessages.add(dqe.getData()); + AS400 as400 = createAs400(); + try { + StringBuilder sb = new StringBuilder(); + + sb.append("\n* MESSAGE QUEUE\n"); + sb.append("\t" + Collections.list(new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE)).getMessages()) + .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", "))); + + sb.append("\n* INQUIRY QUEUE\n"); + sb.append("\t" + Collections + .list(new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).getMessages()) + .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", "))); + + sb.append("\n* LIFO QUEUE\n"); + DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE)); + DataQueueEntry dqe; + List lifoMessages = new LinkedList<>(); + List lifoTexts = new LinkedList<>(); + do { + dqe = dq.read(); + if (dqe != null) { + lifoTexts.add(dqe.getString() + " (" + new String(dqe.getData(), StandardCharsets.UTF_8) + ")"); + lifoMessages.add(dqe.getData()); + } + } while (dqe != null); + + //write back other messages in reverse order (it is a lifo) + Collections.reverse(lifoMessages); + for (byte[] msg : lifoMessages) { + dq.write(msg); } - } while (dqe != null); + sb.append(lifoTexts.stream().collect(Collectors.joining(", "))); - //write back other messages in reverse order (it is a lifo) - Collections.reverse(lifoMessages); - for (byte[] msg : lifoMessages) { - dq.write(msg); - } - sb.append(lifoTexts.stream().collect(Collectors.joining(", "))); + //there is no api to list keyed queue, without knowledge of keys + return sb.toString(); - sb.append("\n* KEYED DATA QUEUE\n"); - KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE)); - KeyedDataQueueEntry kdqe = kdq.peek(LOCK_KEY); - sb.append("\tlock: " + (kdqe == null ? "null" : kdqe.getString())); - return sb.toString(); + } finally { + as400.close(); + } } public void sendInquiry(String msg) throws Exception { - new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).sendInquiry(msg, - getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)); + AS400 as400 = createAs400(); + try { + new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).sendInquiry(msg, + getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)); + } finally { + as400.close(); + } } }; + private static AS400 createAs400() { + return new AS400(JT400_URL, JT400_USERNAME, JT400_PASSWORD); + } + } interface Jt400ClientHelper { @@ -339,14 +271,12 @@ interface Jt400ClientHelper { //------------------- clear listeners ------------------------------ - void clearAll(boolean all) throws Exception; + boolean clear() throws Exception; //----------------------- locking void lock() throws Exception; - void unlock() throws Exception; - String dumpQueues() throws Exception; }