Skip to content

Commit

Permalink
Merge pull request #22 from dilanSachi/add-message-properties
Browse files Browse the repository at this point in the history
Add additional message properties
  • Loading branch information
dilanSachi committed Nov 8, 2023
2 parents 0e2639a + 9b4768d commit 8792364
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 13 deletions.
6 changes: 3 additions & 3 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "ballerinax"
name = "ibm.ibmmq"
version = "0.1.0"
version = "0.1.1"
authors = ["Ballerina"]
keywords = ["ibm-mq"]
repository = "https://github.com/ballerina-platform/module-ballerinax-ibm.ibmmq"
Expand All @@ -12,8 +12,8 @@ distribution = "2201.8.0"
[[platform.java17.dependency]]
groupId = "io.ballerina.stdlib"
artifactId = "ibm.ibmmq-native"
version = "0.1.0"
path = "../native/build/libs/ibm.ibmmq-native-0.1.0.jar"
version = "0.1.1"
path = "../native/build/libs/ibm.ibmmq-native-0.1.1-SNAPSHOT.jar"

[[platform.java17.dependency]]
groupId = "org.json"
Expand Down
40 changes: 38 additions & 2 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@
dependencies-toml-version = "2"
distribution-version = "2201.8.0"

[[package]]
org = "ballerina"
name = "io"
version = "1.6.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.value"}
]
modules = [
{org = "ballerina", packageName = "io", moduleName = "io"}
]

[[package]]
org = "ballerina"
name = "jballerina.java"
Expand All @@ -24,6 +37,15 @@ dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "lang.value"
version = "0.0.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]

[[package]]
org = "ballerina"
name = "test"
Expand All @@ -37,13 +59,27 @@ modules = [
{org = "ballerina", packageName = "test", moduleName = "test"}
]

[[package]]
org = "ballerina"
name = "time"
version = "2.4.0"
scope = "testOnly"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
modules = [
{org = "ballerina", packageName = "time", moduleName = "time"}
]

[[package]]
org = "ballerinax"
name = "ibm.ibmmq"
version = "0.1.0"
version = "0.1.1"
dependencies = [
{org = "ballerina", name = "io"},
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "test"}
{org = "ballerina", name = "test"},
{org = "ballerina", name = "time"}
]
modules = [
{org = "ballerinax", packageName = "ibm.ibmmq", moduleName = "ibm.ibmmq"}
Expand Down
42 changes: 42 additions & 0 deletions ballerina/tests/queue_producer_consumer_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/test;
import ballerina/time;

@test:Config {
groups: ["ibmmqQueue"]
Expand Down Expand Up @@ -136,3 +138,43 @@ function consumeFromAnInvalidQueueNameTest() returns error? {
}
check queueManager.disconnect();
}

@test:Config {
groups: ["ibmmqQueue"]
}
function produceAndConsumerMessageWithAdditionalPropertiesTest() returns error? {
QueueManager queueManager = check new (name = "QM1", host = "localhost", channel = "DEV.APP.SVRCONN");
Queue producer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_OUTPUT);
Queue consumer = check queueManager.accessQueue("DEV.QUEUE.1", MQOO_INPUT_AS_Q_DEF);
time:Utc timeNow = time:utcNow();
check producer->put({
payload: "Hello World".toBytes(),
correlationId: "1234".toBytes(),
expiry: timeNow[0],
format: "mqformat",
messageId: "test-id".toBytes(),
messageType: 2,
persistence: 0,
priority: 4,
putApplicationType: 28,
replyToQueueManagerName: "QM1",
replyToQueueName: "DEV.QUEUE.1"
});
Message? message = check consumer->get();
if message !is () {
test:assertEquals(string:fromBytes(message.payload), "Hello World");
test:assertEquals(message.expiry, timeNow[0]);
test:assertEquals(message.format, "mqformat");
test:assertEquals(message.messageType, 2);
test:assertEquals(message.persistence, 0);
test:assertEquals(message.priority, 4);
test:assertEquals(message.putApplicationType, 28);
test:assertEquals(message.replyToQueueManagerName, "QM1");
test:assertEquals(message.replyToQueueName, "DEV.QUEUE.1");
} else {
test:assertFail("Expected a value for message");
}
check producer->close();
check consumer->close();
check queueManager.disconnect();
}
20 changes: 20 additions & 0 deletions ballerina/types.bal
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,28 @@ public type Property record {|
# Represents an IBM MQ message.
#
# + properties - Message properties
# + format - Format associated with the header
# + messageId - Message identifier
# + correlationId - Correlation identifier
# + expiry - Message lifetime
# + priority - Message priority
# + persistence - Message persistence
# + messageType - Message type
# + putApplicationType - Type of application that put the message
# + replyToQueueName - Name of reply queue
# + replyToQueueManagerName - Name of reply queue manager
# + payload - Message payload
public type Message record {|
map<Property> properties?;
string format?;
byte[] messageId?;
byte[] correlationId?;
int expiry?;
int priority?;
int persistence?;
int messageType?;
int putApplicationType?;
string replyToQueueName?;
string replyToQueueManagerName?;
byte[] payload;
|};
71 changes: 63 additions & 8 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/CommonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.ballerina.runtime.api.values.BString;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Objects;
Expand Down Expand Up @@ -63,32 +64,53 @@ public class CommonUtils {
private static final BString PROPERTY_DESCRIPTOR = StringUtils.fromString("descriptor");
private static final BString WAIT_INTERVAL = StringUtils.fromString("waitInterval");
private static final BString OPTIONS = StringUtils.fromString("options");
private static final BString FORMAT_FIELD = StringUtils.fromString("format");
private static final BString MESSAGE_ID_FIELD = StringUtils.fromString("messageId");
private static final BString CORRELATION_ID_FIELD = StringUtils.fromString("correlationId");
private static final BString EXPIRY_FIELD = StringUtils.fromString("expiry");
private static final BString PRIORITY_FIELD = StringUtils.fromString("priority");
private static final BString PERSISTENCE_FIELD = StringUtils.fromString("persistence");
private static final BString MESSAGE_TYPE_FIELD = StringUtils.fromString("messageType");
private static final BString PUT_APPLICATION_TYPE_FIELD = StringUtils.fromString("putApplicationType");
private static final BString REPLY_TO_QUEUE_NAME_FIELD = StringUtils.fromString("replyToQueueName");
private static final BString REPLY_TO_QM_NAME_FIELD = StringUtils.fromString("replyToQueueManagerName");

private static final MQPropertyDescriptor defaultPropertyDescriptor = new MQPropertyDescriptor();

public static MQMessage getMqMessageFromBMessage(BMap<BString, Object> bMessage) {
byte[] payload = bMessage.getArrayValue(MESSAGE_PAYLOAD).getBytes();
MQMessage mqMessage = new MQMessage();
BMap<BString, Object> properties = (BMap<BString, Object>) bMessage.getMapValue(MESSAGE_PROPERTIES);
if (Objects.nonNull(properties)) {
populateMQProperties(properties, mqMessage);
}
byte[] payload = bMessage.getArrayValue(MESSAGE_PAYLOAD).getBytes();
assignOptionalFieldsToMqMessage(bMessage, mqMessage);
try {
mqMessage.write(payload);
} catch (IOException e) {
throw createError(IBMMQ_ERROR,
String.format("Error occurred while populating payload: %s", e.getMessage()), e);
}
BMap<BString, Object> properties = (BMap<BString, Object>) bMessage.getMapValue(MESSAGE_PROPERTIES);
if (Objects.nonNull(properties)) {
populateMQProperties(properties, mqMessage);
}
return mqMessage;
}

public static BMap<BString, Object> getBMessageFromMQMessage(MQMessage mqMessage) {
BMap<BString, Object> bMessage = ValueCreator.createRecordValue(getModule(), BMESSAGE_NAME);
try {
byte[] payload = new byte[mqMessage.getDataLength()];
mqMessage.readFully(payload);
bMessage.put(MESSAGE_PAYLOAD, ValueCreator.createArrayValue(payload));
bMessage.put(MESSAGE_PROPERTY, getBProperties(mqMessage));
bMessage.put(FORMAT_FIELD, StringUtils.fromString(mqMessage.format));
bMessage.put(MESSAGE_ID_FIELD, ValueCreator.createArrayValue(mqMessage.messageId));
bMessage.put(CORRELATION_ID_FIELD, ValueCreator.createArrayValue((mqMessage.correlationId)));
bMessage.put(EXPIRY_FIELD, mqMessage.expiry);
bMessage.put(PRIORITY_FIELD, mqMessage.priority);
bMessage.put(PERSISTENCE_FIELD, mqMessage.persistence);
bMessage.put(MESSAGE_TYPE_FIELD, mqMessage.messageType);
bMessage.put(PUT_APPLICATION_TYPE_FIELD, mqMessage.putApplicationType);
bMessage.put(REPLY_TO_QUEUE_NAME_FIELD, StringUtils.fromString(mqMessage.replyToQueueName.strip()));
bMessage.put(REPLY_TO_QM_NAME_FIELD, StringUtils.fromString(mqMessage.replyToQueueManagerName.strip()));
byte[] payload = mqMessage.readStringOfByteLength(mqMessage.getDataLength())
.getBytes(StandardCharsets.UTF_8);
bMessage.put(MESSAGE_PAYLOAD, ValueCreator.createArrayValue(payload));
return bMessage;
} catch (MQException | IOException e) {
throw createError(IBMMQ_ERROR,
Expand Down Expand Up @@ -154,6 +176,39 @@ private static void handlePropertyValue(BMap<BString, Object> properties, MQMess
}
}

private static void assignOptionalFieldsToMqMessage(BMap<BString, Object> bMessage, MQMessage mqMessage) {
if (bMessage.containsKey(FORMAT_FIELD)) {
mqMessage.format = bMessage.getStringValue(FORMAT_FIELD).getValue();
}
if (bMessage.containsKey(MESSAGE_ID_FIELD)) {
mqMessage.messageId = bMessage.getArrayValue(MESSAGE_ID_FIELD).getByteArray();
}
if (bMessage.containsKey(CORRELATION_ID_FIELD)) {
mqMessage.correlationId = bMessage.getArrayValue(CORRELATION_ID_FIELD).getByteArray();
}
if (bMessage.containsKey(EXPIRY_FIELD)) {
mqMessage.expiry = bMessage.getIntValue(EXPIRY_FIELD).intValue();
}
if (bMessage.containsKey(PRIORITY_FIELD)) {
mqMessage.priority = bMessage.getIntValue(PRIORITY_FIELD).intValue();
}
if (bMessage.containsKey(PERSISTENCE_FIELD)) {
mqMessage.persistence = bMessage.getIntValue(PERSISTENCE_FIELD).intValue();
}
if (bMessage.containsKey(MESSAGE_TYPE_FIELD)) {
mqMessage.messageType = bMessage.getIntValue(MESSAGE_TYPE_FIELD).intValue();
}
if (bMessage.containsKey(PUT_APPLICATION_TYPE_FIELD)) {
mqMessage.putApplicationType = bMessage.getIntValue(PUT_APPLICATION_TYPE_FIELD).intValue();
}
if (bMessage.containsKey(REPLY_TO_QUEUE_NAME_FIELD)) {
mqMessage.replyToQueueName = bMessage.getStringValue(REPLY_TO_QUEUE_NAME_FIELD).getValue();
}
if (bMessage.containsKey(REPLY_TO_QM_NAME_FIELD)) {
mqMessage.replyToQueueManagerName = bMessage.getStringValue(REPLY_TO_QM_NAME_FIELD).getValue();
}
}

private static MQPropertyDescriptor getMQPropertyDescriptor(BMap descriptor) {
MQPropertyDescriptor propertyDescriptor = new MQPropertyDescriptor();
if (descriptor.containsKey(PD_VERSION)) {
Expand Down

0 comments on commit 8792364

Please sign in to comment.