Skip to content

Commit

Permalink
🎉 Source DB2: added ssl support (#7355)
Browse files Browse the repository at this point in the history
* added ssl conection to source-db2 and added new acceptance tests

* updated ssl source-db2 version

* updated ssl source-db2 version

* updated ssl source-db2 version

* updated ssl source-db2 version

* updated spec

* fixed code style

* marked encryption as required and add NPE checker

* fixed remarks

* fixed remarks

* bump new version
  • Loading branch information
andriikorotkov committed Nov 9, 2021
1 parent 25c89a5 commit fdda80c
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "447e0381-3780-4b46-bb62-00a4e3c8b8e2",
"name": "IBM Db2",
"dockerRepository": "airbyte/source-db2",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/db2"
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@
- name: IBM Db2
sourceDefinitionId: 447e0381-3780-4b46-bb62-00a4e3c8b8e2
dockerRepository: airbyte/source-db2
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.io/integrations/sources/db2
sourceType: database
- name: Instagram
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-db2/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-db2
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -19,6 +25,9 @@ public class Db2Source extends AbstractJdbcSource implements Source {
private static final Logger LOGGER = LoggerFactory.getLogger(Db2Source.class);
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";

private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8);
private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks";

public Db2Source() {
super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration());
}
Expand All @@ -32,14 +41,31 @@ public static void main(final String[] args) throws Exception {

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("jdbc_url", String.format("jdbc:db2://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("db").asText()))
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:db2://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("db").asText()));

var result = Jsons.jsonNode(ImmutableMap.builder()
.put("jdbc_url", jdbcUrl.toString())
.put("username", config.get("username").asText())
.put("password", config.get("password").asText())
.build());

// assume ssl if not explicitly mentioned.
var additionalParams = obtainConnectionOptions(config.get("encryption"));
if (!additionalParams.isEmpty()) {
jdbcUrl.append(":").append(String.join(";", additionalParams));
jdbcUrl.append(";");
result = Jsons.jsonNode(ImmutableMap.builder()
.put("jdbc_url", jdbcUrl.toString())
.put("username", config.get("username").asText())
.put("password", config.get("password").asText())
.put("connection_properties", additionalParams)
.build());
}

return result;
}

@Override
Expand All @@ -49,4 +75,53 @@ public Set<String> getExcludedInternalNameSpaces() {
"SYSPROC", "SYSPUBLIC", "SYSSTAT", "SYSTOOLS");
}

/* Helpers */

private List<String> obtainConnectionOptions(JsonNode encryption) {
List<String> additionalParameters = new ArrayList<>();
if (!encryption.isNull()) {
String encryptionMethod = encryption.get("encryption_method").asText();
if ("encrypted_verify_certificate".equals(encryptionMethod)) {
var keyStorePassword = getKeyStorePassword(encryption.get("key_store_password"));
try {
convertAndImportCertificate(encryption.get("ssl_certificate").asText(), keyStorePassword);
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}
additionalParameters.add("sslConnection=true");
additionalParameters.add("sslTrustStoreLocation=" + KEY_STORE_FILE_PATH);
additionalParameters.add("sslTrustStorePassword=" + keyStorePassword);
}
}
return additionalParameters;
}

private static String getKeyStorePassword(JsonNode encryptionKeyStorePassword) {
var keyStorePassword = KEY_STORE_PASS;
if (!encryptionKeyStorePassword.isNull() || !encryptionKeyStorePassword.isEmpty()) {
keyStorePassword = encryptionKeyStorePassword.asText();
}
return keyStorePassword;
}

private static void convertAndImportCertificate(String certificate, String keyStorePassword)
throws IOException, InterruptedException {
Runtime run = Runtime.getRuntime();
try (PrintWriter out = new PrintWriter("certificate.pem")) {
out.print(certificate);
}
runProcess("openssl x509 -outform der -in certificate.pem -out certificate.der", run);
runProcess(
"keytool -import -alias rds-root -keystore " + KEY_STORE_FILE_PATH + " -file certificate.der -storepass " + keyStorePassword + " -noprompt",
run);
}

private static void runProcess(String cmd, Runtime run) throws IOException, InterruptedException {
Process pr = run.exec(cmd);
if (!pr.waitFor(30, TimeUnit.SECONDS)) {
pr.destroy();
throw new RuntimeException("Timeout while executing: " + cmd);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,88 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "IBM Db2 Source Spec",
"type": "object",
"required": ["host", "port", "db", "username", "password"],
"required": ["host", "port", "db", "username", "password", "encryption"],
"additionalProperties": false,
"properties": {
"host": {
"description": "Host of the Db2.",
"type": "string"
"type": "string",
"order": 0
},
"port": {
"description": "Port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 8123,
"examples": ["8123"]
"examples": ["8123"],
"order": 1
},
"db": {
"description": "Name of the database.",
"type": "string",
"examples": ["default"]
"examples": ["default"],
"order": 2
},
"username": {
"description": "Username to use to access the database.",
"type": "string"
"type": "string",
"order": 3
},
"password": {
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true
"airbyte_secret": true,
"order": 4
},
"encryption": {
"title": "Encryption",
"type": "object",
"description": "Encryption method to use when communicating with the database",
"order": 5,
"oneOf": [
{
"title": "Unencrypted",
"additionalProperties": false,
"description": "Data transfer will not be encrypted.",
"required": ["encryption_method"],
"properties": {
"encryption_method": {
"type": "string",
"const": "unencrypted",
"enum": ["unencrypted"],
"default": "unencrypted"
}
}
},
{
"title": "TLS Encrypted (verify certificate)",
"additionalProperties": false,
"description": "Verify and use the cert provided by the server.",
"required": ["encryption_method", "ssl_certificate"],
"properties": {
"encryption_method": {
"type": "string",
"const": "encrypted_verify_certificate",
"enum": ["encrypted_verify_certificate"],
"default": "encrypted_verify_certificate"
},
"ssl_certificate": {
"title": "SSL PEM file",
"description": "Privacy Enhanced Mail (PEM) files are concatenated certificate containers frequently used in certificate installations",
"type": "string",
"airbyte_secret": true,
"multiline": true
},
"key_store_password": {
"title": "Key Store Password. This field is optional. If you do not fill in this field, the password will be randomly generated.",
"description": "Key Store Password",
"type": "string",
"airbyte_secret": true
}
}
}
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.put("db", db.getDatabaseName())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "unencrypted")
.build()))
.build());

database = Databases.createJdbcDatabase(
Expand Down

0 comments on commit fdda80c

Please sign in to comment.