From eee9e53b13e9a10b271f1b5b0ca8c08eb4f12972 Mon Sep 17 00:00:00 2001 From: Joel Hanson Date: Wed, 12 Nov 2025 18:14:03 +0530 Subject: [PATCH] fix: allow headers when message body is not jms Signed-off-by: Joel Hanson --- CONTRIBUTING.md | 4 + .../connect/mqsource/MQSourceTaskIT.java | 286 ++++++++++++++++-- .../builders/DefaultRecordBuilderIT.java | 143 +++++++++ .../builders/JsonRecordBuilderIT.java | 55 ++++ .../mqsource/builders/BaseRecordBuilder.java | 43 ++- 5 files changed, 478 insertions(+), 53 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ce2aa1c..a83953a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -11,6 +11,10 @@ To contribute code or documentation, please submit a [pull request](https://gith A good way to familiarize yourself with the codebase and contribution process is to look for and tackle low-hanging fruit in the [issue tracker](https://github.com/ibm-messaging/kafka-connect-mq-source/issues). +## Testing with ARM64 + +For instructions on testing with ARM64, see [IBM MQ 9.3.3.0 container image now available for Apple Silicon](https://community.ibm.com/community/user/blogs/richard-coppen/2023/06/30/ibm-mq-9330-container-image-now-available-for-appl). + ## Create issues If you would like to implement a new feature, please [raise an issue](https://github.com/ibm-messaging/kafka-connect-mq-source/issues) diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index ebf05e4..50d2651 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -1067,34 +1067,21 @@ public void verifyHeadersWithErrorTolerance() throws Exception { assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); // Expected DLQ Headers - // ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic, - // value=mytopic, schema=Schema{STRING}), - // ConnectHeader(key=__connect.errors.class.name, - // value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}), - // ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[] - // to Kafka Connect data failed due to serialization error: , - // schema=Schema{STRING}), ConnectHeader(key=__connect.errors.timestamp, - // value=1749036171558, schema=Schema{STRING}), - // ConnectHeader(key=__connect.errors.cause.message, - // value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token - // 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', - // 'true' or 'false') - // at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` - // disabled); line: 1, column: 9], schema=Schema{STRING}), - // ConnectHeader(key=__connect.errors.cause.class, - // value=org.apache.kafka.common.errors.SerializationException, - // schema=Schema{STRING}), - // ConnectHeader(key=__connect.errors.exception.stacktrace, - // value=org.apache.kafka.connect.errors.DataException: Converting byte[] to - // Kafka Connect data failed due to serialization error: - // at - // org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333) - // at - // com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81) - // at - // com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238) - // at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork... - // [truncated], schema=Schema{STRING})]) + /** + * ConnectHeaders(headers=[ConnectHeader(key=__connect.errors.topic, value=mytopic, schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.class.name, value=org.apache.kafka.connect.errors.DataException, schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.exception.message, value=Converting byte[] to Kafka Connect data failed due to serialization error: , schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.timestamp, value=1749036171558, schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.cause.message, value=com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Invalid': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false') + * at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 9], schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.cause.class, value=org.apache.kafka.common.errors.SerializationException, schema=Schema{STRING}), + * ConnectHeader(key=__connect.errors.exception.stacktrace, value=org.apache.kafka.connect.errors.DataException: + * Converting byte[] to Kafka Connect data failed due to serialization error: + * at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333) + * at com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder.getValue(JsonRecordBuilder.java:81) + * at com.ibm.eventstreams.connect.mqsource.builders.BaseRecordBuilder.toSourceRecord(BaseRecordBuilder.java:238) + * at com.ibm.eventstreams.connect.mqsource.JMSWorker.toSourceRecord(JMSWork... [truncated], schema=Schema{STRING})]) + */ assertThat(headers.lastWithName("__connect.errors.topic").value()) .isEqualTo("mytopic"); assertThat(headers.lastWithName("__connect.errors.exception.class.name").value()) @@ -1497,4 +1484,247 @@ public void testPollWithShortMaxPollTime() throws Exception { assertThat(records.size() < 100); } + + @Test + public void shouldSetJmsPropertiesIFJMSIsDisabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "false"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("message"); + message.setStringProperty("teststring", "myvalue"); + message.setIntProperty("volume", 11); + message.setDoubleProperty("decimalmeaning", 42.0); + + // Both invalid and valid messages are received + final List testMessages = Arrays.asList( + message + ); + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, testMessages); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + assertThat(processedRecords.get(0).topic()).isEqualTo("mytopic"); + + final Headers headers = processedRecords.get(0).headers(); + + // Actual headers + assertThat(headers.lastWithName("teststring").value()).isEqualTo("myvalue"); + assertThat(headers.lastWithName("volume").value()).isEqualTo("11"); + assertThat(headers.lastWithName("decimalmeaning").value()).isEqualTo("42.0"); + } + + @Test + public void shouldSetJmsPropertiesWithDefaultRecordBuilderWhenJMSIsEnabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("test message content"); + message.setStringProperty("customHeader", "headerValue"); + message.setIntProperty("priority", 5); + message.setDoubleProperty("price", 99.99); + message.setBooleanProperty("isActive", true); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + assertThat(processedRecords.get(0).topic()).isEqualTo("mytopic"); + assertThat(processedRecords.get(0).value()).isEqualTo("test message content"); + + final Headers headers = processedRecords.get(0).headers(); + + // Verify JMS properties are copied to Kafka headers + assertThat(headers.lastWithName("customHeader").value()).isEqualTo("headerValue"); + assertThat(headers.lastWithName("priority").value()).isEqualTo("5"); + assertThat(headers.lastWithName("price").value()).isEqualTo("99.99"); + assertThat(headers.lastWithName("isActive").value()).isEqualTo("true"); + } + + @Test + public void shouldNotSetJmsPropertiesWithDefaultRecordBuilderWhenCopyIsDisabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "false"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("test message"); + message.setStringProperty("shouldNotAppear", "value"); + message.setIntProperty("count", 42); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + assertThat(processedRecords.get(0).value()).isEqualTo("test message"); + + final Headers headers = processedRecords.get(0).headers(); + + // Verify no JMS properties are copied when disabled + assertThat(headers.lastWithName("shouldNotAppear")).isNull(); + assertThat(headers.lastWithName("count")).isNull(); + } + + @Test + public void shouldSetJmsPropertiesWithJsonRecordBuilderWhenJMSIsEnabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("{ \"id\": 123, \"name\": \"test\" }"); + message.setStringProperty("correlationId", "corr-123"); + message.setIntProperty("retryCount", 3); + message.setDoubleProperty("amount", 150.75); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + assertThat(processedRecords.get(0).topic()).isEqualTo("mytopic"); + + // Verify JSON content is parsed correctly + final Map value = (Map) processedRecords.get(0).value(); + assertThat(value.get("id")).isEqualTo(123L); + assertThat(value.get("name")).isEqualTo("test"); + + final Headers headers = processedRecords.get(0).headers(); + + // Verify JMS properties are copied to Kafka headers + assertThat(headers.lastWithName("correlationId").value()).isEqualTo("corr-123"); + assertThat(headers.lastWithName("retryCount").value()).isEqualTo("3"); + assertThat(headers.lastWithName("amount").value()).isEqualTo("150.75"); + } + + @Test + public void shouldNotSetJmsPropertiesWithJsonRecordBuilderWhenCopyIsDisabled() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "false"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("{ \"data\": \"value\" }"); + message.setStringProperty("hiddenProperty", "shouldNotAppear"); + message.setIntProperty("hiddenCount", 99); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + + final Headers headers = processedRecords.get(0).headers(); + + // Verify no JMS properties are copied when disabled + assertThat(headers.lastWithName("hiddenProperty")).isNull(); + assertThat(headers.lastWithName("hiddenCount")).isNull(); + } + + @Test + public void shouldHandleMultipleJmsPropertiesWithDifferentTypesDefaultBuilder() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("multi-property message"); + message.setStringProperty("stringProp", "text"); + message.setIntProperty("intProp", 100); + message.setLongProperty("longProp", 999999999L); + message.setFloatProperty("floatProp", 3.14f); + message.setDoubleProperty("doubleProp", 2.71828); + message.setBooleanProperty("boolProp", false); + message.setByteProperty("byteProp", (byte) 127); + message.setShortProperty("shortProp", (short) 32000); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + + final Headers headers = processedRecords.get(0).headers(); + + // Verify all property types are correctly converted to string headers + assertThat(headers.lastWithName("stringProp").value()).isEqualTo("text"); + assertThat(headers.lastWithName("intProp").value()).isEqualTo("100"); + assertThat(headers.lastWithName("longProp").value()).isEqualTo("999999999"); + assertThat(headers.lastWithName("floatProp").value()).isEqualTo("3.14"); + assertThat(headers.lastWithName("doubleProp").value()).isEqualTo("2.71828"); + assertThat(headers.lastWithName("boolProp").value()).isEqualTo("false"); + assertThat(headers.lastWithName("byteProp").value()).isEqualTo("127"); + assertThat(headers.lastWithName("shortProp").value()).isEqualTo("32000"); + } + + @Test + public void shouldHandleMultipleJmsPropertiesWithDifferentTypesJsonBuilder() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_TOPIC, "mytopic"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_MESSAGE_BODY_JMS, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, "true"); + connectorConfigProps.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, + "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + connectTask.start(connectorConfigProps); + + final TextMessage message = getJmsContext().createTextMessage("{ \"type\": \"multi-prop\" }"); + message.setStringProperty("env", "production"); + message.setIntProperty("maxRetries", 5); + message.setLongProperty("createdAt", 1609459200000L); + message.setDoubleProperty("threshold", 0.95); + message.setBooleanProperty("enabled", true); + + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, Arrays.asList(message)); + + final List processedRecords = connectTask.poll(); + assertThat(processedRecords).hasSize(1); + + final Headers headers = processedRecords.get(0).headers(); + + // Verify all property types are correctly converted + assertThat(headers.lastWithName("env").value()).isEqualTo("production"); + assertThat(headers.lastWithName("maxRetries").value()).isEqualTo("5"); + assertThat(headers.lastWithName("createdAt").value()).isEqualTo("1609459200000"); + assertThat(headers.lastWithName("threshold").value()).isEqualTo("0.95"); + assertThat(headers.lastWithName("enabled").value()).isEqualTo("true"); + } } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java index 41dad6f..2607440 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilderIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -31,10 +32,12 @@ import javax.jms.TextMessage; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; import com.ibm.eventstreams.connect.mqsource.AbstractJMSContextIT; +import com.ibm.eventstreams.connect.mqsource.JMSWorker; public class DefaultRecordBuilderIT extends AbstractJMSContextIT { @@ -169,4 +172,144 @@ public void testBuildWithOffset() throws Exception { assertNull(record.valueSchema()); } + @Test + public void testToSourceRecord_DefaultRecordBuilder_AnyMessage_JmsFalse_ByteArrayConverter() throws Exception { + // Test: DefaultRecordBuilder with mq.message.body.jms=false + // Expected: String data output + Map connectorProps = getDefaultConnectorProperties(); + connectorProps.put("mq.message.body.jms", "false"); + connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorProps.put("mq.jms.properties.copy.to.kafka.headers", "true"); + + JMSWorker worker = new JMSWorker(); + worker.configure(getPropertiesConfig(connectorProps)); + worker.connect(); + + try { + TextMessage textMessage = getJmsContext().createTextMessage("Text message"); + textMessage.setStringProperty("customHeader", "headerValue"); + textMessage.setIntProperty("priority", 5); + textMessage.setDoubleProperty("price", 99.99); + + Map sourceOffset = new HashMap<>(); + sourceOffset.put("sequence-id", 1L); + + Map sourcePartition = new HashMap<>(); + sourcePartition.put("source", "myqmgr/myq"); + + SourceRecord sourceRecord = worker.toSourceRecord(textMessage, true, sourceOffset, sourcePartition); + + assertThat(sourceRecord).isNotNull(); + assertThat(sourceRecord.value()).isInstanceOf(String.class); + assertThat(sourceRecord.valueSchema()).isNull(); + + // Verify data + String value = (String) sourceRecord.value(); + assertThat(value).isNotNull(); + + // Verify JMS properties are copied to Kafka headers + Headers headers = sourceRecord.headers(); + assertThat(headers.lastWithName("customHeader").value()).isEqualTo("headerValue"); + assertThat(headers.lastWithName("priority").value()).isEqualTo("5"); + assertThat(headers.lastWithName("price").value()).isEqualTo("99.99"); + } finally { + worker.stop(); + } + } + + @Test + public void testToSourceRecord_DefaultRecordBuilder_BytesMessage_JmsTrue() throws Exception { + // Test: DefaultRecordBuilder with JMS BytesMessage, mq.message.body.jms=true + // Expected: Binary data output + Map connectorProps = getDefaultConnectorProperties(); + connectorProps.put("mq.message.body.jms", "true"); + connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorProps.put("mq.jms.properties.copy.to.kafka.headers", "true"); + + JMSWorker worker = new JMSWorker(); + worker.configure(getPropertiesConfig(connectorProps)); + worker.connect(); + + try { + BytesMessage bytesMessage = getJmsContext().createBytesMessage(); + byte[] testData = "Binary bytes message".getBytes(StandardCharsets.UTF_8); + bytesMessage.writeBytes(testData); + bytesMessage.setStringProperty("messageType", "binary"); + bytesMessage.setIntProperty("version", 2); + bytesMessage.setLongProperty("timestamp", 1234567890L); + bytesMessage.reset(); // Reset to make the message readable + + Map sourceOffset = new HashMap<>(); + sourceOffset.put("sequence-id", 2L); + + Map sourcePartition = new HashMap<>(); + sourcePartition.put("source", "myqmgr/myq"); + + SourceRecord sourceRecord = worker.toSourceRecord(bytesMessage, true, sourceOffset, sourcePartition); + + assertThat(sourceRecord).isNotNull(); + assertThat(sourceRecord.value()).isInstanceOf(byte[].class); + assertThat(sourceRecord.valueSchema()).isNull(); + + // Verify binary data matches + byte[] value = (byte[]) sourceRecord.value(); + assertArrayEquals(testData, value); + + // Verify JMS properties are copied to Kafka headers + Headers headers = sourceRecord.headers(); + assertThat(headers.lastWithName("messageType").value()).isEqualTo("binary"); + assertThat(headers.lastWithName("version").value()).isEqualTo("2"); + assertThat(headers.lastWithName("timestamp").value()).isEqualTo("1234567890"); + } finally { + worker.stop(); + } + } + + @Test + public void testToSourceRecord_DefaultRecordBuilder_TextMessage_JmsTrue() throws Exception { + // Test: DefaultRecordBuilder with JMS TextMessage, mq.message.body.jms=true and StringConverter + // Expected: String data output + Map connectorProps = getDefaultConnectorProperties(); + connectorProps.put("mq.message.body.jms", "true"); + connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + connectorProps.put("mq.jms.properties.copy.to.kafka.headers", "true"); + + JMSWorker worker = new JMSWorker(); + worker.configure(getPropertiesConfig(connectorProps)); + worker.connect(); + + try { + String testText = "This is a text message"; + TextMessage textMessage = getJmsContext().createTextMessage(testText); + textMessage.setStringProperty("source", "system-a"); + textMessage.setIntProperty("retryCount", 3); + textMessage.setDoubleProperty("threshold", 0.95); + textMessage.setBooleanProperty("enabled", true); + + Map sourceOffset = new HashMap<>(); + sourceOffset.put("sequence-id", 3L); + + Map sourcePartition = new HashMap<>(); + sourcePartition.put("source", "myqmgr/myq"); + + SourceRecord sourceRecord = worker.toSourceRecord(textMessage, true, sourceOffset, sourcePartition); + + assertThat(sourceRecord).isNotNull(); + assertThat(sourceRecord.value()).isInstanceOf(String.class); + assertNull(sourceRecord.valueSchema()); + + // Verify string data matches + String value = (String) sourceRecord.value(); + assertEquals(testText, value); + + // Verify JMS properties are copied to Kafka headers + Headers headers = sourceRecord.headers(); + assertThat(headers.lastWithName("source").value()).isEqualTo("system-a"); + assertThat(headers.lastWithName("retryCount").value()).isEqualTo("3"); + assertThat(headers.lastWithName("threshold").value()).isEqualTo("0.95"); + assertThat(headers.lastWithName("enabled").value()).isEqualTo("true"); + } finally { + worker.stop(); + } + } } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java index ded71b4..93e33df 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java @@ -15,6 +15,7 @@ */ package com.ibm.eventstreams.connect.mqsource.builders; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -29,10 +30,12 @@ import javax.jms.TextMessage; import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; import com.ibm.eventstreams.connect.mqsource.AbstractJMSContextIT; +import com.ibm.eventstreams.connect.mqsource.JMSWorker; public class JsonRecordBuilderIT extends AbstractJMSContextIT { @@ -150,4 +153,56 @@ public void buildFromJmsTestErrorToleranceNone() throws Exception { builder.toSourceRecord(getJmsContext(), topic, isJMS, message); }); } + + + @Test + public void testToSourceRecord_JsonRecordBuilder_JsonMessage() throws Exception { + // Test: JsonRecordBuilder with JSON message and JsonConverter + // Expected: JSON output with no schema + Map connectorProps = getDefaultConnectorProperties(); + connectorProps.put("mq.message.body.jms", "true"); // Not used by JsonRecordBuilder + connectorProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorProps.put("mq.jms.properties.copy.to.kafka.headers", "true"); + + JMSWorker worker = new JMSWorker(); + worker.configure(getPropertiesConfig(connectorProps)); + worker.connect(); + + try { + String jsonText = "{ \"id\": 123, \"name\": \"test\", \"active\": true }"; + TextMessage textMessage = getJmsContext().createTextMessage(jsonText); + textMessage.setStringProperty("source", "system-a"); + textMessage.setIntProperty("retryCount", 3); + textMessage.setDoubleProperty("threshold", 0.95); + textMessage.setBooleanProperty("enabled", true); + + Map sourceOffset = new HashMap<>(); + sourceOffset.put("sequence-id", 4L); + + Map sourcePartition = new HashMap<>(); + sourcePartition.put("source", "myqmgr/myq"); + + SourceRecord sourceRecord = worker.toSourceRecord(textMessage, true, sourceOffset, sourcePartition); + + assertThat(sourceRecord).isNotNull(); + assertThat(sourceRecord.value()).isInstanceOf(Map.class); + assertNull(sourceRecord.valueSchema()); // JSON with no schema + + // Verify JSON data + @SuppressWarnings("unchecked") + Map value = (Map) sourceRecord.value(); + assertEquals(123L, value.get("id")); + assertEquals("test", value.get("name")); + assertEquals(true, value.get("active")); + + // Verify JMS properties are copied to Kafka headers + Headers headers = sourceRecord.headers(); + assertThat(headers.lastWithName("source").value()).isEqualTo("system-a"); + assertThat(headers.lastWithName("retryCount").value()).isEqualTo("3"); + assertThat(headers.lastWithName("threshold").value()).isEqualTo("0.95"); + assertThat(headers.lastWithName("enabled").value()).isEqualTo("true"); + } finally { + worker.stop(); + } + } } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java index 4d18a6f..8d0c545 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java @@ -1,5 +1,5 @@ /** - * Copyright 2018, 2019, 2023, 2024 IBM Corporation + * Copyright 2018, 2019, 2023, 2024, 2025 IBM Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -203,31 +204,23 @@ public SourceRecord toSourceRecord(final JMSContext context, final String topic, // Extract key and value final SchemaAndValue value = this.getValue(context, topic, messageBodyJms, message); key = this.getKey(context, topic, message); - // Create and return appropriate record based on configuration - if (copyJmsPropertiesFlag && messageBodyJms) { - return new SourceRecord( - sourceQueuePartition, - sourceOffset, - topic, - null, - key.schema(), - key.value(), - value.schema(), - value.value(), - message.getJMSTimestamp(), - jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message)); - } else { - return new SourceRecord( - sourceQueuePartition, - sourceOffset, - topic, - null, - key.schema(), - key.value(), - value.schema(), - value.value()); - } + final Long timestamp = messageBodyJms ? message.getJMSTimestamp() : null; + final Iterable
headers = copyJmsPropertiesFlag + ? jmsToKafkaHeaderConverter.convertJmsPropertiesToKafkaHeaders(message) + : null; + + return new SourceRecord( + sourceQueuePartition, + sourceOffset, + topic, + null, + key.schema(), + key.value(), + value.schema(), + value.value(), + timestamp, + headers); } catch (final Exception e) { // Log the error using error handler errorHandler.logError(e, topic, message);