Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 29 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,30 +213,31 @@ For troubleshooting, or to better understand the handshake performed by the IBM
## Configuration
The configuration options for the Kafka Connect source connector for IBM MQ are as follows:

| Name | Description | Type | Default | Valid values |
| ---------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | ------------------------------------------------------- |
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
| mq.password | The password for authenticating with the queue manager | string | | Password |
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | |
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |
| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | |
| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file |
| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | |
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |
| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | |
| topic | The name of the target Kafka topic | string | | Topic name |
| Name | Description | Type | Default | Valid values |
| --------------------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | ------------------------------------------------------- |
| topic | The name of the target Kafka topic | string | | Topic name |
| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name |
| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings |
| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] |
| mq.channel.name | The name of the server-connection channel | string | | MQ channel name |
| mq.queue | The name of the source MQ queue | string | | MQ queue name |
| mq.user.name | The user name for authenticating with the queue manager | string | | User name |
| mq.password | The password for authenticating with the queue manager | string | | Password |
| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | |
| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file |
| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder |
| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | |
| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination |
| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | |
| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite |
| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern |
| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file |
| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | |
| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file |
| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | |
| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | |
| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater |
| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | |


### Using a CCDT file
Expand Down Expand Up @@ -277,6 +278,10 @@ To use a file for the `mq.password` in Kubernetes, you create a Secret using the

You may receive an `org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed` error when trying to run the MQ source connector using SSL to connect to your Kafka cluster. In the case that the error is caused by the following exception: `Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching XXXXX found.`, Java may be replacing the IP address of your cluster with the corresponding hostname in your `/etc/hosts` file. For example, to push Docker images to a custom Docker repository, you may add an entry in this file which corresponds to the IP of your repository e.g. `123.456.78.90 mycluster.icp`. To fix this, you can comment out this line in your `/etc/hosts` file.

### Unsupported cipher suite

When configuring TLS connection to MQ, you may find that the queue manager rejects the cipher suite, in spite of the name looking correct. There are two different naming conventions for cipher suites (https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q113220_.htm). Setting the configuration option `mq.ssl.use.ibm.cipher.mappings=false` often resolves cipher suite problems.


## Support
A commercially supported version of this connector is available for customers with a support entitlement for [IBM Event Streams](https://www.ibm.com/cloud/event-streams).
Expand Down
24 changes: 18 additions & 6 deletions config/mq-source.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2017, 2018, 2019 IBM Corporation
# Copyright 2017, 2020 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,9 @@ connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
# You can increase this for higher throughput, but message ordering will be lost
tasks.max=1

# The name of the target Kafka topic - required
topic=

# The name of the MQ queue manager - required
mq.queue.manager=

Expand Down Expand Up @@ -73,17 +76,26 @@ mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBu
# The distinguished name pattern of the TLS (SSL) peer - optional
# mq.ssl.peer.name=

# Location and password for the keystore and truststore for SSL (TLS) connections
# mq.ssl.keystore.location=
# mq.ssl.keystore.password=
# mq.ssl.truststore.location=
# mq.ssl.truststore.password=

# Whether to set system property to control use of IBM cipher mappings - optional
# mq.ssl.use.ibm.cipher.mappings=false

# Whether to enable reading of all MQMD fields (default false) - optional
# mq.message.mqmd.read=

# The name of the target Kafka topic - required
topic=
# The maximum number of messages in a batch (unit of work) - optional
# mq.batch.size=250

# The converters control conversion of data between the internal Kafka Connect representation and the messages in Kafka.
# key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
# key.converter=org.apache.kafka.connect.json.JsonConverter

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# value.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# value.converter=org.apache.kafka.connect.json.JsonConverter
25 changes: 18 additions & 7 deletions src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017, 2018, 2019 IBM Corporation
* Copyright 2017, 2020 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,9 @@
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.*;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -36,8 +38,10 @@
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -108,8 +112,13 @@ public void configure(Map<String, String> props) {
String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION);
String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD);
String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP);
String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);
String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC);

if (useIBMCipherMappings != null) {
System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings);
}

int transportType = WMQConstants.WMQ_CM_CLIENT;
if (connectionMode != null) {
if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) {
Expand Down Expand Up @@ -489,23 +498,26 @@ private SSLContext buildSslContext(String sslKeystoreLocation, String sslKeystor
log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName());

try {
final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
KeyManager[] keyManagers = null;
TrustManager[] trustManagers = null;

if (sslKeystoreLocation != null) {
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray());
keyManagers = kmf.getKeyManagers();
}

if (sslTruststoreLocation != null) {
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(loadKeyStore(sslTruststoreLocation, sslTruststorePassword));
trustManagers = tmf.getTrustManagers();
}

final SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
sslContext.init(keyManagers, trustManagers, new SecureRandom());

log.trace("[{}] Exit {}.buildSslContext, retval={}", Thread.currentThread().getId(), this.getClass().getName(), sslContext);
return sslContext;

} catch (GeneralSecurityException e) {
throw new ConnectException("Error creating SSLContext", e);
}
Expand All @@ -520,7 +532,6 @@ private KeyStore loadKeyStore(String location, String password) throws GeneralSe

log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(), this.getClass().getName(), ks);
return ks;

} catch (IOException e) {
throw new ConnectException("Error reading keystore " + location, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD = "The password of the JKS truststore to use for the TLS (SSL) connection.";
public static final String CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD = "SSL truststore password";

public static final String CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "mq.ssl.use.ibm.cipher.mappings";
public static final String CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Whether to set system property to control use of IBM cipher mappings.";
public static final String CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Use IBM cipher mappings";

public static final String CONFIG_NAME_MQ_BATCH_SIZE = "mq.batch.size";
public static final String CONFIG_DOCUMENTATION_MQ_BATCH_SIZE = "The maximum number of messages in a batch. A batch uses a single unit of work.";
public static final String CONFIG_DISPLAY_MQ_BATCH_SIZE = "Batch size";
Expand Down Expand Up @@ -297,6 +301,10 @@ public class MQSourceConnector extends SourceConnector {
CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, CONFIG_GROUP_MQ, 21, Width.MEDIUM,
CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER);

config.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, Type.BOOLEAN, null, Importance.LOW,
CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 22, Width.SHORT,
CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS);

config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH,
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
CONFIG_DISPLAY_TOPIC);
Expand Down