Skip to content

Commit

Permalink
Add test case
Browse files Browse the repository at this point in the history
  • Loading branch information
dilanSachi committed Nov 7, 2023
1 parent 1671151 commit 9b4768d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
42 changes: 42 additions & 0 deletions ballerina/tests/queue_producer_consumer_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/test;
import ballerina/time;

@test:Config {
groups: ["ibmmqQueue"]
Expand Down Expand Up @@ -136,3 +138,43 @@ function consumeFromAnInvalidQueueNameTest() returns error? {
}
check queueManager.disconnect();
}

@test:Config {
groups: ["ibmmqQueue"]
}
function produceAndConsumerMessageWithAdditionalPropertiesTest() returns error? {
QueueManager queueManager = check new (name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Queue producer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_OUTPUT);
Queue consumer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_INPUT_AS_Q_DEF);
time:Utc timeNow = time:utcNow();
check producer->put({
payload: "Hello World".toBytes(),
correlationId: "1234".toBytes(),
expiry: timeNow[0],
format: "mqformat",
messageId: "test-id".toBytes(),
messageType: 2,
persistence: 0,
priority: 4,
putApplicationType: 28,
replyToQueueManagerName: "QM1",
replyToQueueName: "DEV.QUEUE.1"
});
Message? message = check consumer->get();
if message !is () {
test:assertEquals(string:fromBytes(message.payload), "Hello World");
test:assertEquals(message.expiry, timeNow[0]);
test:assertEquals(message.format, "mqformat");
test:assertEquals(message.messageType, 2);
test:assertEquals(message.persistence, 0);
test:assertEquals(message.priority, 4);
test:assertEquals(message.putApplicationType, 28);
test:assertEquals(message.replyToQueueManagerName, "QM1");
test:assertEquals(message.replyToQueueName, "DEV.QUEUE.1");
} else {
test:assertFail("Expected a value for message");
}
check producer->close();
check consumer->close();
check queueManager.disconnect();
}
25 changes: 17 additions & 8 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import io.ballerina.runtime.api.creators.TypeCreator;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.utils.StringUtils;
import io.ballerina.runtime.api.values.BArray;
import io.ballerina.runtime.api.values.BError;
import io.ballerina.runtime.api.values.BMap;
import io.ballerina.runtime.api.values.BString;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Objects;
Expand Down Expand Up @@ -64,6 +64,16 @@ public class CommonUtils {
private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor");
private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval");
private static final BString OPTIONS = StringUtils.fromString("options");
private static final BString FORMAT_FIELD = StringUtils.fromString("format");
private static final BString MESSAGE_ID_FIELD = StringUtils.fromString("messageId");
private static final BString CORRELATION_ID_FIELD = StringUtils.fromString("correlationId");
private static final BString EXPIRY_FIELD = StringUtils.fromString("expiry");
private static final BString PRIORITY_FIELD = StringUtils.fromString("priority");
private static final BString PERSISTENCE_FIELD = StringUtils.fromString("persistence");
private static final BString MESSAGE_TYPE_FIELD = StringUtils.fromString("messageType");
private static final BString PUT_APPLICATION_TYPE_FIELD = StringUtils.fromString("putApplicationType");
private static final BString REPLY_TO_QUEUE_NAME_FIELD = StringUtils.fromString("replyToQueueName");
private static final BString REPLY_TO_QM_NAME_FIELD = StringUtils.fromString("replyToQueueManagerName");

private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor();

Expand All @@ -84,21 +94,20 @@ public static MQMessage getMqMessageFromBMessage(BMap<BString, Object> bMessage)
return mqMessage;
}

public static BMap<BString, Object> getBMessageFromMQMessage(Runtime runtime, MQMessage mqMessage) {
public static BMap<BString, Object> getBMessageFromMQMessage(MQMessage mqMessage) {
BMap<BString, Object> bMessage = ValueCreator.createRecordValue(getModule(), BMESSAGE_NAME);
try {
bMessage.put(MESSAGE_HEADERS, getBHeaders(runtime, mqMessage));
bMessage.put(MESSAGE_PROPERTY, getBProperties(mqMessage));
bMessage.put(FORMAT_FIELD, StringUtils.fromString(mqMessage.format));
bMessage.put(MESSAGE_ID_FIELD, ValueCreator.createArrayValue(mqMessage.messageId));
bMessage.put(CORRELATION_ID_FIELD, ValueCreator.createArrayValue(mqMessage.correlationId));
bMessage.put(CORRELATION_ID_FIELD, ValueCreator.createArrayValue((mqMessage.correlationId)));
bMessage.put(EXPIRY_FIELD, mqMessage.expiry);
bMessage.put(PRIORITY_FIELD, mqMessage.priority);
bMessage.put(PERSISTENCE_FIELD, mqMessage.persistence);
bMessage.put(MESSAGE_TYPE_FIELD, mqMessage.messageType);
bMessage.put(PUT_APPLICATION_TYPE_FIELD, mqMessage.putApplicationType);
bMessage.put(REPLY_TO_QUEUE_NAME_FIELD, StringUtils.fromString(mqMessage.replyToQueueName));
bMessage.put(REPLY_TO_QM_NAME_FIELD, StringUtils.fromString(mqMessage.replyToQueueManagerName));
bMessage.put(REPLY_TO_QUEUE_NAME_FIELD, StringUtils.fromString(mqMessage.replyToQueueName.strip()));
bMessage.put(REPLY_TO_QM_NAME_FIELD, StringUtils.fromString(mqMessage.replyToQueueManagerName.strip()));
byte[] payload = mqMessage.readStringOfByteLength(mqMessage.getDataLength())
.getBytes(StandardCharsets.UTF_8);
bMessage.put(MESSAGE_PAYLOAD, ValueCreator.createArrayValue(payload));
Expand Down Expand Up @@ -172,10 +181,10 @@ private static void assignOptionalFieldsToMqMessage(BMap<BString, Object> bMessa
mqMessage.format = bMessage.getStringValue(FORMAT_FIELD).getValue();
}
if (bMessage.containsKey(MESSAGE_ID_FIELD)) {
mqMessage.messageId = bMessage.getArrayValue(MESSAGE_ID_FIELD).getBytes();
mqMessage.messageId = bMessage.getArrayValue(MESSAGE_ID_FIELD).getByteArray();
}
if (bMessage.containsKey(CORRELATION_ID_FIELD)) {
mqMessage.correlationId = bMessage.getArrayValue(CORRELATION_ID_FIELD).getBytes();
mqMessage.correlationId = bMessage.getArrayValue(CORRELATION_ID_FIELD).getByteArray();
}
if (bMessage.containsKey(EXPIRY_FIELD)) {
mqMessage.expiry = bMessage.getIntValue(EXPIRY_FIELD).intValue();
Expand Down

0 comments on commit 9b4768d

Please sign in to comment.