Skip to content

Commit

Permalink
🎉 New Source DB2 secure-only (#7792)
Browse files Browse the repository at this point in the history
* added ssl conection to source-db2 and added new acceptance tests

* updated ssl source-db2 version

* updated ssl source-db2 version

* updated ssl source-db2 version

* updated ssl source-db2 version

* updated spec

* fixed code style

* marked encryption as required and add NPE checker

* created source-db2-strict-encrypt

* fixed minor remarks

* fixed code style

* fixed code style

* fixed code style

* updated acceptance tests

* added java doc for certificate generation
  • Loading branch information
andriikorotkov committed Nov 11, 2021
1 parent 760f429 commit def1667
Show file tree
Hide file tree
Showing 10 changed files with 611 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION source-db2-strict-encrypt

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-db2-strict-encrypt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-db2-strict-encrypt:dev
tests:
spec:
- spec_path: "src/test/resources/expected_spec.json"
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.source.db2_strict_encrypt.Db2StrictEncryptSource'
applicationDefaultJvmArgs = ['-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:connectors:source-db2')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')
implementation project(':airbyte-protocol:models')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

implementation group: 'com.ibm.db2', name: 'jcc', version: '11.5.5.0'

testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation project(':airbyte-test-utils')
testImplementation "org.testcontainers:db2:1.15.3"

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-db2')
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation 'org.apache.commons:commons-lang3:3.11'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2_strict_encrypt;

import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Db2JdbcStreamingQueryConfiguration implements
JdbcStreamingQueryConfiguration {

@Override
public void accept(final Connection connection, final PreparedStatement preparedStatement)
throws SQLException {
connection.setAutoCommit(false);
preparedStatement.setFetchSize(1000);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2_strict_encrypt;

import com.fasterxml.jackson.databind.node.ArrayNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.spec_modification.SpecModifyingSource;
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Db2StrictEncryptSource extends SpecModifyingSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(Db2StrictEncryptSource.class);
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";

public Db2StrictEncryptSource() {
super(new Db2Source());
}

@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
// We need to remove the first item from one Of, which is responsible for connecting to the source
// without encrypted.
((ArrayNode) spec.getConnectionSpecification().get("properties").get("encryption").get("oneOf")).remove(0);
return spec;
}

public static void main(final String[] args) throws Exception {
final Source source = new Db2StrictEncryptSource();
LOGGER.info("starting source: {}", Db2StrictEncryptSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("completed source: {}", Db2StrictEncryptSource.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.testcontainers.containers.Db2Container;

public class Db2StrictEncryptSourceCertificateAcceptanceTest extends SourceAcceptanceTest {

private static final String SCHEMA_NAME = "SOURCE_INTEGRATION_TEST";
private static final String STREAM_NAME1 = "ID_AND_NAME1";
private static final String STREAM_NAME2 = "ID_AND_NAME2";

private static final String TEST_KEY_STORE_PASS = "Passw0rd";
private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks";
private static final String SSL_CONFIG = ":sslConnection=true;sslTrustStoreLocation=" + KEY_STORE_FILE_PATH +
";sslTrustStorePassword=" + TEST_KEY_STORE_PASS + ";";

private Db2Container db;
private JsonNode config;
private JdbcDatabase database;

@Override
protected String getImageName() {
return "airbyte/source-db2-strict-encrypt:dev";
}

@Override
protected ConnectorSpecification getSpec() throws Exception {
return Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class);
}

@Override
protected JsonNode getConfig() {
return config;
}

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("ID"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", SCHEMA_NAME, STREAM_NAME1),
Field.of("ID", JsonSchemaPrimitive.NUMBER),
Field.of("NAME", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", SCHEMA_NAME, STREAM_NAME2),
Field.of("ID", JsonSchemaPrimitive.NUMBER),
Field.of("NAME", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}

@Override
protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected List<String> getRegexTests() {
return Collections.emptyList();
}

@Override
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
db = new Db2Container("ibmcom/db2:11.5.5.0").withCommand().acceptLicense()
.withExposedPorts(50000);
db.start();

var certificate = getCertificate();
try {
convertAndImportCertificate(certificate);
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Failed to import certificate into Java Keystore");
}

config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("port", db.getMappedPort(50000))
.put("db", db.getDatabaseName())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("encryption", Jsons.jsonNode(ImmutableMap.builder()
.put("encryption_method", "encrypted_verify_certificate")
.put("ssl_certificate", certificate)
.put("key_store_password", TEST_KEY_STORE_PASS)
.build()))
.build());

String jdbcUrl = String.format("jdbc:db2://%s:%s/%s",
config.get("host").asText(),
db.getMappedPort(50000),
config.get("db").asText()) + SSL_CONFIG;

database = Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
jdbcUrl,
Db2Source.DRIVER_CLASS);

final String createSchemaQuery = String.format("CREATE SCHEMA %s", SCHEMA_NAME);
final String createTableQuery1 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME1);
final String createTableQuery2 = String
.format("CREATE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, STREAM_NAME2);
final String insertIntoTableQuery1 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME1);
final String insertIntoTableQuery2 = String
.format("INSERT INTO %s.%s (ID, NAME) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash')",
SCHEMA_NAME, STREAM_NAME2);

database.execute(createSchemaQuery);
database.execute(createTableQuery1);
database.execute(createTableQuery2);
database.execute(insertIntoTableQuery1);
database.execute(insertIntoTableQuery2);

database.close();
}

@Override
protected void tearDown(TestDestinationEnv testEnv) {
new File("certificate.pem").delete();
new File("certificate.der").delete();
new File(KEY_STORE_FILE_PATH).delete();
db.close();
}

/* Helpers */

private String getCertificate() throws IOException, InterruptedException {
// To enable SSL connection on the server, we need to generate self-signed certificates for the server and add them to the configuration.
// Then you need to enable SSL connection and specify on which port it will work. These changes will take effect after restart.
// The certificate for generating a user certificate has the extension *.arm.
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -keydb -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS + "\" -stash");
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -cert -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS
+ "\" -label \"mylabel\" -dn \"CN=testcompany\" -size 2048 -sigalg SHA256_WITH_RSA");
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -cert -extract -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS
+ "\" -label \"mylabel\" -target \"server.arm\" -format ascii -fips");

db.execInContainer("su", "-", "db2inst1", "-c", "db2 update dbm cfg using SSL_SVR_KEYDB /database/config/db2inst1/server.kdb");
db.execInContainer("su", "-", "db2inst1", "-c", "db2 update dbm cfg using SSL_SVR_STASH /database/config/db2inst1/server.sth");
db.execInContainer("su", "-", "db2inst1", "-c", "db2 update dbm cfg using SSL_SVR_LABEL mylabel");
db.execInContainer("su", "-", "db2inst1", "-c", "db2 update dbm cfg using SSL_SVCENAME 50000");
db.execInContainer("su", "-", "db2inst1", "-c", "db2set -i db2inst1 DB2COMM=SSL");
db.execInContainer("su", "-", "db2inst1", "-c", "db2stop force");
db.execInContainer("su", "-", "db2inst1", "-c", "db2start");
return db.execInContainer("su", "-", "db2inst1", "-c", "cat server.arm").getStdout();
}

private static void convertAndImportCertificate(String certificate) throws IOException, InterruptedException {
Runtime run = Runtime.getRuntime();
try (PrintWriter out = new PrintWriter("certificate.pem")) {
out.print(certificate);
}
runProcess("openssl x509 -outform der -in certificate.pem -out certificate.der", run);
runProcess(
"keytool -import -alias rds-root -keystore " + KEY_STORE_FILE_PATH + " -file certificate.der -storepass " + TEST_KEY_STORE_PASS
+ " -noprompt",
run);
}

private static void runProcess(String cmd, Runtime run) throws IOException, InterruptedException {
Process pr = run.exec(cmd);
if (!pr.waitFor(30, TimeUnit.SECONDS)) {
pr.destroy();
throw new RuntimeException("Timeout while executing: " + cmd);
}
}

}

0 comments on commit def1667

Please sign in to comment.