diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 23ab622..0d78e9c 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,7 +1,7 @@ [package] org = "ballerinax" name = "ibm.ibmmq" -version = "0.1.0" +version = "0.1.1" authors = ["Ballerina"] keywords = ["ibm-mq"] repository = "https://github.com/ballerina-platform/module-ballerinax-ibm.ibmmq" @@ -12,8 +12,8 @@ distribution = "2201.8.0" [[platform.java17.dependency]] groupId = "io.ballerina.stdlib" artifactId = "ibm.ibmmq-native" -version = "0.1.0" -path = "../native/build/libs/ibm.ibmmq-native-0.1.0.jar" +version = "0.1.1" +path = "../native/build/libs/ibm.ibmmq-native-0.1.1-SNAPSHOT.jar" [[platform.java17.dependency]] groupId = "org.json" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 7e95fda..d21012b 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -7,6 +7,19 @@ dependencies-toml-version = "2" distribution-version = "2201.8.0" +[[package]] +org = "ballerina" +name = "io" +version = "1.6.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.value"} +] +modules = [ + {org = "ballerina", packageName = "io", moduleName = "io"} +] + [[package]] org = "ballerina" name = "jballerina.java" @@ -24,6 +37,15 @@ dependencies = [ {org = "ballerina", name = "jballerina.java"} ] +[[package]] +org = "ballerina" +name = "lang.value" +version = "0.0.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] + [[package]] org = "ballerina" name = "test" @@ -37,13 +59,27 @@ modules = [ {org = "ballerina", packageName = "test", moduleName = "test"} ] +[[package]] +org = "ballerina" +name = "time" +version = "2.4.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"} +] +modules = [ + {org = "ballerina", packageName = "time", moduleName = "time"} +] + [[package]] org = "ballerinax" name = "ibm.ibmmq" -version = "0.1.0" +version = "0.1.1" dependencies = [ + {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, - {org = "ballerina", name = "test"} + {org = "ballerina", name = "test"}, + {org = "ballerina", name = "time"} ] modules = [ {org = "ballerinax", packageName = "ibm.ibmmq", moduleName = "ibm.ibmmq"} diff --git a/ballerina/tests/queue_producer_consumer_tests.bal b/ballerina/tests/queue_producer_consumer_tests.bal index b477555..cc210de 100644 --- a/ballerina/tests/queue_producer_consumer_tests.bal +++ b/ballerina/tests/queue_producer_consumer_tests.bal @@ -13,7 +13,9 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + import ballerina/test; +import ballerina/time; @test:Config { groups: ["ibmmqQueue"] @@ -136,3 +138,43 @@ function consumeFromAnInvalidQueueNameTest() returns error? { } check queueManager.disconnect(); } + +@test:Config { + groups: ["ibmmqQueue"] +} +function produceAndConsumerMessageWithAdditionalPropertiesTest() returns error? { + QueueManager queueManager = check new (name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN"); + Queue producer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_OUTPUT); + Queue consumer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_INPUT_AS_Q_DEF); + time:Utc timeNow = time:utcNow(); + check producer->put({ + payload: "Hello World".toBytes(), + correlationId: "1234".toBytes(), + expiry: timeNow[0], + format: "mqformat", + messageId: "test-id".toBytes(), + messageType: 2, + persistence: 0, + priority: 4, + putApplicationType: 28, + replyToQueueManagerName: "QM1", + replyToQueueName: "DEV.QUEUE.1" + }); + Message? message = check consumer->get(); + if message !is () { + test:assertEquals(string:fromBytes(message.payload), "Hello World"); + test:assertEquals(message.expiry, timeNow[0]); + test:assertEquals(message.format, "mqformat"); + test:assertEquals(message.messageType, 2); + test:assertEquals(message.persistence, 0); + test:assertEquals(message.priority, 4); + test:assertEquals(message.putApplicationType, 28); + test:assertEquals(message.replyToQueueManagerName, "QM1"); + test:assertEquals(message.replyToQueueName, "DEV.QUEUE.1"); + } else { + test:assertFail("Expected a value for message"); + } + check producer->close(); + check consumer->close(); + check queueManager.disconnect(); +} diff --git a/ballerina/types.bal b/ballerina/types.bal index 465e248..2b96a6d 100644 --- a/ballerina/types.bal +++ b/ballerina/types.bal @@ -56,8 +56,28 @@ public type Property record {| # Represents an IBM MQ message. # # + properties - Message properties +# + format - Format associated with the header +# + messageId - Message identifier +# + correlationId - Correlation identifier +# + expiry - Message lifetime +# + priority - Message priority +# + persistence - Message persistence +# + messageType - Message type +# + putApplicationType - Type of application that put the message +# + replyToQueueName - Name of reply queue +# + replyToQueueManagerName - Name of reply queue manager # + payload - Message payload public type Message record {| map properties?; + string format?; + byte[] messageId?; + byte[] correlationId?; + int expiry?; + int priority?; + int persistence?; + int messageType?; + int putApplicationType?; + string replyToQueueName?; + string replyToQueueManagerName?; byte[] payload; |}; diff --git a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java index 295a071..5bb9414 100644 --- a/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java +++ b/native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java @@ -32,6 +32,7 @@ import io.ballerina.runtime.api.values.BString; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Enumeration; import java.util.Objects; @@ -63,32 +64,53 @@ public class CommonUtils { private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor"); private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval"); private static final BString OPTIONS = StringUtils.fromString("options"); + private static final BString FORMAT_FIELD = StringUtils.fromString("format"); + private static final BString MESSAGE_ID_FIELD = StringUtils.fromString("messageId"); + private static final BString CORRELATION_ID_FIELD = StringUtils.fromString("correlationId"); + private static final BString EXPIRY_FIELD = StringUtils.fromString("expiry"); + private static final BString PRIORITY_FIELD = StringUtils.fromString("priority"); + private static final BString PERSISTENCE_FIELD = StringUtils.fromString("persistence"); + private static final BString MESSAGE_TYPE_FIELD = StringUtils.fromString("messageType"); + private static final BString PUT_APPLICATION_TYPE_FIELD = StringUtils.fromString("putApplicationType"); + private static final BString REPLY_TO_QUEUE_NAME_FIELD = StringUtils.fromString("replyToQueueName"); + private static final BString REPLY_TO_QM_NAME_FIELD = StringUtils.fromString("replyToQueueManagerName"); private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor(); public static MQMessage getMqMessageFromBMessage(BMap bMessage) { - byte[] payload = bMessage.getArrayValue(MESSAGE_PAYLOAD).getBytes(); MQMessage mqMessage = new MQMessage(); + BMap properties = (BMap) bMessage.getMapValue(MESSAGE_PROPERTIES); + if (Objects.nonNull(properties)) { + populateMQProperties(properties, mqMessage); + } + byte[] payload = bMessage.getArrayValue(MESSAGE_PAYLOAD).getBytes(); + assignOptionalFieldsToMqMessage(bMessage, mqMessage); try { mqMessage.write(payload); } catch (IOException e) { throw createError(IBMMQ_ERROR, String.format("Error occurred while populating payload: %s", e.getMessage()), e); } - BMap properties = (BMap) bMessage.getMapValue(MESSAGE_PROPERTIES); - if (Objects.nonNull(properties)) { - populateMQProperties(properties, mqMessage); - } return mqMessage; } public static BMap getBMessageFromMQMessage(MQMessage mqMessage) { BMap bMessage = ValueCreator.createRecordValue(getModule(), BMESSAGE_NAME); try { - byte[] payload = new byte[mqMessage.getDataLength()]; - mqMessage.readFully(payload); - bMessage.put(MESSAGE_PAYLOAD, ValueCreator.createArrayValue(payload)); bMessage.put(MESSAGE_PROPERTY, getBProperties(mqMessage)); + bMessage.put(FORMAT_FIELD, StringUtils.fromString(mqMessage.format)); + bMessage.put(MESSAGE_ID_FIELD, ValueCreator.createArrayValue(mqMessage.messageId)); + bMessage.put(CORRELATION_ID_FIELD, ValueCreator.createArrayValue((mqMessage.correlationId))); + bMessage.put(EXPIRY_FIELD, mqMessage.expiry); + bMessage.put(PRIORITY_FIELD, mqMessage.priority); + bMessage.put(PERSISTENCE_FIELD, mqMessage.persistence); + bMessage.put(MESSAGE_TYPE_FIELD, mqMessage.messageType); + bMessage.put(PUT_APPLICATION_TYPE_FIELD, mqMessage.putApplicationType); + bMessage.put(REPLY_TO_QUEUE_NAME_FIELD, StringUtils.fromString(mqMessage.replyToQueueName.strip())); + bMessage.put(REPLY_TO_QM_NAME_FIELD, StringUtils.fromString(mqMessage.replyToQueueManagerName.strip())); + byte[] payload = mqMessage.readStringOfByteLength(mqMessage.getDataLength()) + .getBytes(StandardCharsets.UTF_8); + bMessage.put(MESSAGE_PAYLOAD, ValueCreator.createArrayValue(payload)); return bMessage; } catch (MQException | IOException e) { throw createError(IBMMQ_ERROR, @@ -154,6 +176,39 @@ private static void handlePropertyValue(BMap properties, MQMess } } + private static void assignOptionalFieldsToMqMessage(BMap bMessage, MQMessage mqMessage) { + if (bMessage.containsKey(FORMAT_FIELD)) { + mqMessage.format = bMessage.getStringValue(FORMAT_FIELD).getValue(); + } + if (bMessage.containsKey(MESSAGE_ID_FIELD)) { + mqMessage.messageId = bMessage.getArrayValue(MESSAGE_ID_FIELD).getByteArray(); + } + if (bMessage.containsKey(CORRELATION_ID_FIELD)) { + mqMessage.correlationId = bMessage.getArrayValue(CORRELATION_ID_FIELD).getByteArray(); + } + if (bMessage.containsKey(EXPIRY_FIELD)) { + mqMessage.expiry = bMessage.getIntValue(EXPIRY_FIELD).intValue(); + } + if (bMessage.containsKey(PRIORITY_FIELD)) { + mqMessage.priority = bMessage.getIntValue(PRIORITY_FIELD).intValue(); + } + if (bMessage.containsKey(PERSISTENCE_FIELD)) { + mqMessage.persistence = bMessage.getIntValue(PERSISTENCE_FIELD).intValue(); + } + if (bMessage.containsKey(MESSAGE_TYPE_FIELD)) { + mqMessage.messageType = bMessage.getIntValue(MESSAGE_TYPE_FIELD).intValue(); + } + if (bMessage.containsKey(PUT_APPLICATION_TYPE_FIELD)) { + mqMessage.putApplicationType = bMessage.getIntValue(PUT_APPLICATION_TYPE_FIELD).intValue(); + } + if (bMessage.containsKey(REPLY_TO_QUEUE_NAME_FIELD)) { + mqMessage.replyToQueueName = bMessage.getStringValue(REPLY_TO_QUEUE_NAME_FIELD).getValue(); + } + if (bMessage.containsKey(REPLY_TO_QM_NAME_FIELD)) { + mqMessage.replyToQueueManagerName = bMessage.getStringValue(REPLY_TO_QM_NAME_FIELD).getValue(); + } + } + private static MQPropertyDescriptor getMQPropertyDescriptor(BMap descriptor) { MQPropertyDescriptor propertyDescriptor = new MQPropertyDescriptor(); if (descriptor.containsKey(PD_VERSION)) {