From 7ec833896824d65aee553b781cb5e9f9ec5bc746 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 6 May 2020 20:57:06 +0100 Subject: [PATCH] Enable one-way TLS with config properties. Signed-off-by: Andrew Schofield --- README.md | 53 ++++++++++--------- config/mq-source.properties | 24 ++++++--- .../connect/mqsource/JMSReader.java | 25 ++++++--- .../connect/mqsource/MQSourceConnector.java | 8 +++ 4 files changed, 73 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index a46c73c..d120900 100644 --- a/README.md +++ b/README.md @@ -213,30 +213,31 @@ For troubleshooting, or to better understand the handshake performed by the IBM ## Configuration The configuration options for the Kafka Connect source connector for IBM MQ are as follows: -| Name | Description | Type | Default | Valid values | -| ---------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | ------------------------------------------------------- | -| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name | -| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings | -| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] | -| mq.channel.name | The name of the server-connection channel | string | | MQ channel name | -| mq.queue | The name of the source MQ queue | string | | MQ queue name | -| mq.user.name | The user name for authenticating with the queue manager | string | | User name | -| mq.password | The password for authenticating with the queue manager | string | | Password | -| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file | -| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder | -| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | | -| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination | -| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | | -| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite | -| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern | -| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file | -| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | | -| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file | -| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | | -| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater | -| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | | -| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | | -| topic | The name of the target Kafka topic | string | | Topic name | +| Name | Description | Type | Default | Valid values | +| --------------------------------------- | ---------------------------------------------------------------------- | ------- | -------------- | ------------------------------------------------------- | +| topic | The name of the target Kafka topic | string | | Topic name | +| mq.queue.manager | The name of the MQ queue manager | string | | MQ queue manager name | +| mq.connection.mode | The connection mode - bindings or client | string | client | client, bindings | +| mq.connection.name.list | List of connection names for queue manager | string | | host(port)[,host(port),...] | +| mq.channel.name | The name of the server-connection channel | string | | MQ channel name | +| mq.queue | The name of the source MQ queue | string | | MQ queue name | +| mq.user.name | The user name for authenticating with the queue manager | string | | User name | +| mq.password | The password for authenticating with the queue manager | string | | Password | +| mq.user.authentication.mqcsp | Whether to use MQ connection security parameters (MQCSP) | boolean | true | | +| mq.ccdt.url | The URL for the CCDT file containing MQ connection details | string | | URL for obtaining a CCDT file | +| mq.record.builder | The class used to build the Kafka Connect record | string | | Class implementing RecordBuilder | +| mq.message.body.jms | Whether to interpret the message body as a JMS message type | boolean | false | | +| mq.record.builder.key.header | The JMS message header to use as the Kafka record key | string | | JMSMessageID, JMSCorrelationID, JMSCorrelationIDAsBytes, JMSDestination | +| mq.jms.properties.copy.to.kafka.headers | Whether to copy JMS message properties to Kafka headers | boolean | false | | +| mq.ssl.cipher.suite | The name of the cipher suite for TLS (SSL) connection | string | | Blank or valid cipher suite | +| mq.ssl.peer.name | The distinguished name pattern of the TLS (SSL) peer | string | | Blank or DN pattern | +| mq.ssl.keystore.location | The path to the JKS keystore to use for SSL (TLS) connections | string | JVM keystore | Local path to a JKS file | +| mq.ssl.keystore.password | The password of the JKS keystore to use for SSL (TLS) connections | string | | | +| mq.ssl.truststore.location | The path to the JKS truststore to use for SSL (TLS) connections | string | JVM truststore | Local path to a JKS file | +| mq.ssl.truststore.password | The password of the JKS truststore to use for SSL (TLS) connections | string | | | +| mq.ssl.use.ibm.cipher.mappings | Whether to set system property to control use of IBM cipher mappings | boolean | | | +| mq.batch.size | The maximum number of messages in a batch (unit of work) | integer | 250 | 1 or greater | +| mq.message.mqmd.read | Whether to enable reading of all MQMD fields | boolean | false | | ### Using a CCDT file @@ -277,6 +278,10 @@ To use a file for the `mq.password` in Kubernetes, you create a Secret using the You may receive an `org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed` error when trying to run the MQ source connector using SSL to connect to your Kafka cluster. In the case that the error is caused by the following exception: `Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching XXXXX found.`, Java may be replacing the IP address of your cluster with the corresponding hostname in your `/etc/hosts` file. For example, to push Docker images to a custom Docker repository, you may add an entry in this file which corresponds to the IP of your repository e.g. `123.456.78.90 mycluster.icp`. To fix this, you can comment out this line in your `/etc/hosts` file. +### Unsupported cipher suite + +When configuring TLS connection to MQ, you may find that the queue manager rejects the cipher suite, in spite of the name looking correct. There are two different naming conventions for cipher suites (https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q113220_.htm). Setting the configuration option `mq.ssl.use.ibm.cipher.mappings=false` often resolves cipher suite problems. + ## Support A commercially supported version of this connector is available for customers with a support entitlement for [IBM Event Streams](https://www.ibm.com/cloud/event-streams). diff --git a/config/mq-source.properties b/config/mq-source.properties index 6ebc1b9..5628f39 100644 --- a/config/mq-source.properties +++ b/config/mq-source.properties @@ -1,4 +1,4 @@ -# Copyright 2017, 2018, 2019 IBM Corporation +# Copyright 2017, 2020 IBM Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +18,9 @@ connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector # You can increase this for higher throughput, but message ordering will be lost tasks.max=1 +# The name of the target Kafka topic - required +topic= + # The name of the MQ queue manager - required mq.queue.manager= @@ -73,17 +76,26 @@ mq.record.builder=com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBu # The distinguished name pattern of the TLS (SSL) peer - optional # mq.ssl.peer.name= +# Location and password for the keystore and truststore for SSL (TLS) connections +# mq.ssl.keystore.location= +# mq.ssl.keystore.password= +# mq.ssl.truststore.location= +# mq.ssl.truststore.password= + +# Whether to set system property to control use of IBM cipher mappings - optional +# mq.ssl.use.ibm.cipher.mappings=false + # Whether to enable reading of all MQMD fields (default false) - optional # mq.message.mqmd.read= -# The name of the target Kafka topic - required -topic= +# The maximum number of messages in a batch (unit of work) - optional +# mq.batch.size=250 # The converters control conversion of data between the internal Kafka Connect representation and the messages in Kafka. # key.converter=org.apache.kafka.connect.converters.ByteArrayConverter -# key.converter=org.apache.kafka.connect.storage.StringConverter +key.converter=org.apache.kafka.connect.storage.StringConverter # key.converter=org.apache.kafka.connect.json.JsonConverter -value.converter=org.apache.kafka.connect.converters.ByteArrayConverter -# value.converter=org.apache.kafka.connect.storage.StringConverter +# value.converter=org.apache.kafka.connect.converters.ByteArrayConverter +value.converter=org.apache.kafka.connect.storage.StringConverter # value.converter=org.apache.kafka.connect.json.JsonConverter diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java index 2e488a2..34f6274 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java @@ -1,5 +1,5 @@ /** - * Copyright 2017, 2018, 2019 IBM Corporation + * Copyright 2017, 2020 IBM Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,9 @@ import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; -import java.security.*; +import java.security.GeneralSecurityException; +import java.security.KeyStore; +import java.security.SecureRandom; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,8 +38,10 @@ import javax.jms.JMSException; import javax.jms.JMSRuntimeException; import javax.jms.Message; +import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import org.apache.kafka.connect.errors.ConnectException; @@ -108,8 +112,13 @@ public void configure(Map props) { String sslTruststoreLocation = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_LOCATION); String sslTruststorePassword = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_TRUSTSTORE_PASSWORD); String useMQCSP = props.get(MQSourceConnector.CONFIG_NAME_MQ_USER_AUTHENTICATION_MQCSP); + String useIBMCipherMappings = props.get(MQSourceConnector.CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); String topic = props.get(MQSourceConnector.CONFIG_NAME_TOPIC); + if (useIBMCipherMappings != null) { + System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", useIBMCipherMappings); + } + int transportType = WMQConstants.WMQ_CM_CLIENT; if (connectionMode != null) { if (connectionMode.equals(MQSourceConnector.CONFIG_VALUE_MQ_CONNECTION_MODE_CLIENT)) { @@ -489,23 +498,26 @@ private SSLContext buildSslContext(String sslKeystoreLocation, String sslKeystor log.trace("[{}] Entry {}.buildSslContext", Thread.currentThread().getId(), this.getClass().getName()); try { - final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + KeyManager[] keyManagers = null; + TrustManager[] trustManagers = null; if (sslKeystoreLocation != null) { + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(loadKeyStore(sslKeystoreLocation, sslKeystorePassword), sslKeystorePassword.toCharArray()); + keyManagers = kmf.getKeyManagers(); } if (sslTruststoreLocation != null) { + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); tmf.init(loadKeyStore(sslTruststoreLocation, sslTruststorePassword)); + trustManagers = tmf.getTrustManagers(); } final SSLContext sslContext = SSLContext.getInstance("TLS"); - sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); + sslContext.init(keyManagers, trustManagers, new SecureRandom()); log.trace("[{}] Exit {}.buildSslContext, retval={}", Thread.currentThread().getId(), this.getClass().getName(), sslContext); return sslContext; - } catch (GeneralSecurityException e) { throw new ConnectException("Error creating SSLContext", e); } @@ -520,7 +532,6 @@ private KeyStore loadKeyStore(String location, String password) throws GeneralSe log.trace("[{}] Exit {}.loadKeyStore, retval={}", Thread.currentThread().getId(), this.getClass().getName(), ks); return ks; - } catch (IOException e) { throw new ConnectException("Error reading keystore " + location, e); } diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index 3f6d769..bd47cc1 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -113,6 +113,10 @@ public class MQSourceConnector extends SourceConnector { public static final String CONFIG_DOCUMENTATION_MQ_SSL_TRUSTSTORE_PASSWORD = "The password of the JKS truststore to use for the TLS (SSL) connection."; public static final String CONFIG_DISPLAY_MQ_SSL_TRUSTSTORE_PASSWORD = "SSL truststore password"; + public static final String CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "mq.ssl.use.ibm.cipher.mappings"; + public static final String CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Whether to set system property to control use of IBM cipher mappings."; + public static final String CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS = "Use IBM cipher mappings"; + public static final String CONFIG_NAME_MQ_BATCH_SIZE = "mq.batch.size"; public static final String CONFIG_DOCUMENTATION_MQ_BATCH_SIZE = "The maximum number of messages in a batch. A batch uses a single unit of work."; public static final String CONFIG_DISPLAY_MQ_BATCH_SIZE = "Batch size"; @@ -297,6 +301,10 @@ public class MQSourceConnector extends SourceConnector { CONFIG_DOCUMENTATION_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER, CONFIG_GROUP_MQ, 21, Width.MEDIUM, CONFIG_DISPLAY_MQ_JMS_PROPERTY_COPY_TO_KAFKA_HEADER); + config.define(CONFIG_NAME_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, Type.BOOLEAN, null, Importance.LOW, + CONFIG_DOCUMENTATION_MQ_SSL_USE_IBM_CIPHER_MAPPINGS, CONFIG_GROUP_MQ, 22, Width.SHORT, + CONFIG_DISPLAY_MQ_SSL_USE_IBM_CIPHER_MAPPINGS); + config.define(CONFIG_NAME_TOPIC, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, Importance.HIGH, CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM, CONFIG_DISPLAY_TOPIC);