diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index 909240a918920..adea5c0496d6b 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -286,41 +286,37 @@ jobs: - name: Run End-to-End Frontend Tests run: ./tools/bin/e2e_test.sh + test_kube: + runs-on: ubuntu-latest + name: Run Acceptance Tests (Kube) + steps: + - name: Checkout Airbyte + uses: actions/checkout@v2 -# DISABLED UNTIL WE HAVE TEMPORAL ON KUBE -# test_kube: -# runs-on: ubuntu-latest -# steps: -# - name: Checkout Airbyte -# uses: actions/checkout@v2 -# -# - uses: actions/setup-java@v1 -# with: -# java-version: '14' -# -# - uses: actions/setup-node@v1 -# with: -# node-version: '14.7' -# -# - uses: actions/setup-python@v2 -# with: -# python-version: '3.7' -# -# - name: Setup Minikube -# uses: manusa/actions-setup-minikube@v2.3.0 -# with: -# minikube version: 'v1.16.0' -# kubernetes version: 'v1.19.2' -# -# - name: Install socat -# run: sudo apt-get install socat -# -# - name: Build Core Docker Images and Run Tests -# run: CORE_ONLY=true ./gradlew --no-daemon composeBuild test -x :airbyte-webapp:test --scan -# env: -# GIT_REVISION: ${{ github.sha }} -# CORE_ONLY: true -# -# - name: Run Kubernetes End-to-End Acceptance Tests -# run: | -# ./tools/bin/acceptance_test_kube.sh + - uses: actions/setup-java@v1 + with: + java-version: '14' + + - uses: actions/setup-node@v1 + with: + node-version: '14.7' + + - uses: actions/setup-python@v2 + with: + python-version: '3.7' + + - name: Setup Minikube + uses: manusa/actions-setup-minikube@v2.4.1 + with: + minikube version: 'v1.21.0-beta.0' + kubernetes version: 'v1.20.7' + + - name: Install socat (required for port forwarding) + run: sudo apt-get install socat + + - name: Build Core Docker Images and Run Tests + run: CORE_ONLY=true ./gradlew --no-daemon build --scan --rerun-tasks + + - name: Run Kubernetes End-to-End Acceptance Tests + run: | + IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh diff --git a/airbyte-commons-docker/build.gradle b/airbyte-commons-docker/build.gradle new file mode 100644 index 0000000000000..4c2ec5099bfc2 --- /dev/null +++ b/airbyte-commons-docker/build.gradle @@ -0,0 +1,11 @@ +plugins { + id "java-library" +} + +dependencies { + implementation 'org.apache.commons:commons-compress:1.20' + implementation 'com.github.docker-java:docker-java:3.2.8' + implementation 'com.github.docker-java:docker-java-transport-httpclient5:3.2.8' + + testImplementation 'org.apache.commons:commons-lang3:3.11' +} diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/docker/DockerUtils.java b/airbyte-commons-docker/src/main/java/io/airbyte/commons/docker/DockerUtils.java similarity index 53% rename from airbyte-commons/src/main/java/io/airbyte/commons/docker/DockerUtils.java rename to airbyte-commons-docker/src/main/java/io/airbyte/commons/docker/DockerUtils.java index 3f29b82e3bec2..26f015f2f6919 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/docker/DockerUtils.java +++ b/airbyte-commons-docker/src/main/java/io/airbyte/commons/docker/DockerUtils.java @@ -24,10 +24,36 @@ package io.airbyte.commons.docker; +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.BuildImageResultCallback; +import com.github.dockerjava.core.DefaultDockerClientConfig; +import com.github.dockerjava.core.DockerClientConfig; +import com.github.dockerjava.core.DockerClientImpl; +import com.github.dockerjava.httpclient5.ApacheDockerHttpClient; +import com.github.dockerjava.transport.DockerHttpClient; +import java.io.File; +import java.util.Set; + public class DockerUtils { + private static final DockerClientConfig CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build(); + private static final DockerHttpClient HTTP_CLIENT = new ApacheDockerHttpClient.Builder() + .dockerHost(CONFIG.getDockerHost()) + .sslConfig(CONFIG.getSSLConfig()) + .maxConnections(100) + .build(); + private static final DockerClient DOCKER_CLIENT = DockerClientImpl.getInstance(CONFIG, HTTP_CLIENT); + public static String getTaggedImageName(String dockerRepository, String tag) { return String.join(":", dockerRepository, tag); } + public static String buildImage(String dockerFilePath, String tag) { + return DOCKER_CLIENT.buildImageCmd() + .withDockerfile(new File(dockerFilePath)) + .withTags(Set.of(tag)) + .exec(new BuildImageResultCallback()) + .awaitImageId(); + } + } diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/docker/DockerUtilsTest.java b/airbyte-commons-docker/src/test/java/io/airbyte/commons/docker/DockerUtilsTest.java similarity index 100% rename from airbyte-commons/src/test/java/io/airbyte/commons/docker/DockerUtilsTest.java rename to airbyte-commons-docker/src/test/java/io/airbyte/commons/docker/DockerUtilsTest.java diff --git a/airbyte-commons/build.gradle b/airbyte-commons/build.gradle index b3b1fd9f49947..7ea1d6aa92f9c 100644 --- a/airbyte-commons/build.gradle +++ b/airbyte-commons/build.gradle @@ -3,7 +3,6 @@ plugins { } dependencies { - testImplementation 'org.apache.commons:commons-lang3:3.11' - - implementation group: 'org.apache.commons', name: 'commons-compress', version: '1.20' + implementation 'org.apache.commons:commons-compress:1.20' + implementation 'org.apache.commons:commons-lang3:3.11' } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/string/Strings.java b/airbyte-commons/src/main/java/io/airbyte/commons/string/Strings.java index 18db80d069f89..9c6067b9d351c 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/string/Strings.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/string/Strings.java @@ -26,6 +26,7 @@ import com.google.common.collect.Streams; import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomStringUtils; public class Strings { @@ -35,4 +36,8 @@ public static String join(Iterable iterable, CharSequence separator) { .collect(Collectors.joining(separator)); } + public static String addRandomSuffix(String base, String separator, int suffixLength) { + return base + separator + RandomStringUtils.randomAlphabetic(suffixLength).toLowerCase(); + } + } diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 56756ca9e9a65..afee5bee1b6af 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -25,6 +25,7 @@ package io.airbyte.config; import java.nio.file.Path; +import java.util.Set; public interface Configs { @@ -62,6 +63,8 @@ public interface Configs { String getTemporalHost(); + Set getTemporalWorkerPorts(); + enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 161050c4a0d8e..be36464db307b 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -26,8 +26,11 @@ import com.google.common.base.Preconditions; import java.nio.file.Path; +import java.util.Arrays; import java.util.Optional; +import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +56,7 @@ public class EnvConfigs implements Configs { private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS"; private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB"; private static final String TEMPORAL_HOST = "TEMPORAL_HOST"; + private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS"; private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1; private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60; @@ -166,6 +170,13 @@ public String getTemporalHost() { return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233"); } + @Override + public Set getTemporalWorkerPorts() { + return Arrays.stream(getEnvOrDefault(TEMPORAL_WORKER_PORTS, "").split(",")) + .map(Integer::valueOf) + .collect(Collectors.toSet()); + } + private String getEnvOrDefault(String key, String defaultValue) { return getEnvOrDefault(key, defaultValue, Function.identity()); } diff --git a/airbyte-db/src/test/java/io/airbyte/db/PostgresUtilsTest.java b/airbyte-db/src/test/java/io/airbyte/db/PostgresUtilsTest.java index 6fb77d2156370..5542a61ef96df 100644 --- a/airbyte-db/src/test/java/io/airbyte/db/PostgresUtilsTest.java +++ b/airbyte-db/src/test/java/io/airbyte/db/PostgresUtilsTest.java @@ -31,12 +31,12 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.SQLException; import org.apache.commons.dbcp2.BasicDataSource; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -58,7 +58,7 @@ static void init() { @BeforeEach void setup() throws Exception { - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10); final JsonNode config = getConfig(PSQL_DB, dbName); diff --git a/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java b/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java index 04c2e018f845f..b18ec728ac72a 100644 --- a/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java +++ b/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java @@ -31,13 +31,13 @@ import com.google.common.collect.Lists; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Databases; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.SQLException; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -65,7 +65,7 @@ static void init() { @BeforeEach void setup() throws Exception { - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10); config = getConfig(PSQL_DB, dbName); diff --git a/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java b/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java index 046c75e1632d3..59244a0524bef 100644 --- a/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java +++ b/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java @@ -34,6 +34,7 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.stream.MoreStreams; +import io.airbyte.commons.string.Strings; import io.airbyte.protocol.models.JsonSchemaPrimitive; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.math.BigDecimal; @@ -48,7 +49,6 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.dbcp2.BasicDataSource; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -75,7 +75,7 @@ static void init() { @BeforeEach void setup() throws Exception { - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10); final JsonNode config = getConfig(PSQL_DB, dbName); diff --git a/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java b/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java index d1a8457d97ebd..03bfcbf86d793 100644 --- a/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java +++ b/airbyte-db/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java @@ -38,6 +38,7 @@ import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.Connection; import java.sql.PreparedStatement; @@ -47,7 +48,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.dbcp2.BasicDataSource; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -79,7 +79,7 @@ static void init() { void setup() throws Exception { jdbcStreamingQueryConfiguration = mock(JdbcStreamingQueryConfiguration.class); - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10); final JsonNode config = getConfig(PSQL_DB, dbName); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java index b9b540d9a4c1b..ff73b2fbf8d0d 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java @@ -43,6 +43,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.StandardNameTransformer; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; @@ -55,7 +56,6 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -190,7 +190,8 @@ protected void setup(TestDestinationEnv testEnv) throws Exception { final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText(); final String datasetLocation = "US"; - final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8); + final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); + config = Jsons.jsonNode(ImmutableMap.builder() .put(CONFIG_PROJECT_ID, projectId) .put(CONFIG_CREDS, credentialsJsonString) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index 5d12c31859b6c..b7e67f76eb56c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -43,6 +43,7 @@ import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.string.Strings; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.JavaBaseConstants; @@ -69,7 +70,6 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -136,7 +136,7 @@ void setup(TestInfo info) throws IOException { .build() .getService(); - final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8); + final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8); final String datasetLocation = "EU"; MESSAGE_USERS1.getRecord().setNamespace(datasetId); MESSAGE_USERS2.getRecord().setNamespace(datasetId); diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java index 0aa7f45f58a79..0c9d0dd3c822b 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.integrations.base.JavaBaseConstants; @@ -37,7 +38,6 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import org.apache.commons.lang3.RandomStringUtils; import org.jooq.JSONFormat; import org.jooq.JSONFormat.RecordFormat; import org.junit.jupiter.api.AfterAll; @@ -166,7 +166,7 @@ private static Database getDatabase(JsonNode config) { @Override protected void setup(TestDestinationEnv testEnv) throws SQLException { configWithoutDbName = getConfig(db); - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10); final Database database = getDatabase(configWithoutDbName); database.query(ctx -> { diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java index e68e9088ea9ed..8c9f8ed2a0d84 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.integrations.base.JavaBaseConstants; @@ -37,7 +38,6 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import org.apache.commons.lang3.RandomStringUtils; import org.jooq.JSONFormat; import org.jooq.JSONFormat.RecordFormat; import org.junit.jupiter.api.AfterAll; @@ -175,7 +175,7 @@ private static Database getDatabase(JsonNode config) { @Override protected void setup(TestDestinationEnv testEnv) throws SQLException { configWithoutDbName = getConfig(db); - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10); final Database database = getDatabase(configWithoutDbName); database.query(ctx -> { diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java index b242ca5519ac6..d15576c67d1a3 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftCopyDestinationAcceptanceTest.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.integrations.base.JavaBaseConstants; @@ -37,7 +38,6 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import org.apache.commons.lang3.RandomStringUtils; import org.jooq.JSONFormat; import org.jooq.JSONFormat.RecordFormat; @@ -133,7 +133,7 @@ private List retrieveRecordsFromTable(String tableName, String schemaN // for each test we create a new schema in the database. run the test in there and then remove it. @Override protected void setup(TestDestinationEnv testEnv) throws Exception { - final String schemaName = ("integration_test_" + RandomStringUtils.randomAlphanumeric(5)); + final String schemaName = Strings.addRandomSuffix("integration_test", "_", 5); final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); baseConfig = getStaticConfig(); getDatabase().query(ctx -> ctx.execute(createSchemaQuery)); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java index bbf84e9fa8aeb..c054132dfe418 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java @@ -29,6 +29,7 @@ import com.google.common.base.Preconditions; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; @@ -38,7 +39,6 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import org.apache.commons.lang3.RandomStringUtils; public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAcceptanceTest { @@ -130,7 +130,7 @@ private List retrieveRecordsFromTable(String tableName, String schema) // for each test we create a new schema in the database. run the test in there and then remove it. @Override protected void setup(TestDestinationEnv testEnv) throws Exception { - final String schemaName = ("integration_test_" + RandomStringUtils.randomAlphanumeric(5)); + final String schemaName = Strings.addRandomSuffix("integration_test", "_", 5); final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); baseConfig = getStaticConfig(); diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java index 5d0bb9b96c3c4..8a29b9013da8f 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java @@ -28,13 +28,13 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.util.Set; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -62,7 +62,7 @@ static void init() { @BeforeEach public void setup() throws Exception { - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); config = Jsons.jsonNode(ImmutableMap.builder() .put("host", PSQL_DB.getHost()) diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcStressTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcStressTest.java index 22971c90a4400..308c2a5528f1a 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcStressTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/DefaultJdbcStressTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; @@ -35,7 +36,6 @@ import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.util.Optional; import java.util.Set; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -65,7 +65,7 @@ static void init() { @BeforeEach public void setup() throws Exception { - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10); config = Jsons.jsonNode(ImmutableMap.of("host", "localhost", "port", 5432, "database", "charles", "username", "postgres", "password", "")); diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceStressTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceStressTest.java index 739833ed53280..d495fe3001d75 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceStressTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceStressTest.java @@ -35,7 +35,6 @@ import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.util.Optional; import java.util.Set; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -65,7 +64,7 @@ static void init() { @BeforeEach public void setup() throws Exception { - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String schemaName = Strings.addRandomSuffix("db", "_", 10);; config = Jsons.jsonNode(ImmutableMap.builder() .put("host", PSQL_DB.getHost()) diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java index 3ef5f0e40c12c..69926fab1cfda 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/MssqlSourceAcceptanceTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; @@ -42,7 +43,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import org.apache.commons.lang3.RandomStringUtils; import org.testcontainers.containers.MSSQLServerContainer; public class MssqlSourceAcceptanceTest extends SourceAcceptanceTest { @@ -63,7 +63,7 @@ protected void setupEnvironment(TestDestinationEnv environment) throws SQLExcept .put("username", db.getUsername()) .put("password", db.getPassword()) .build()); - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); final Database database = getDatabase(configWithoutDbName); database.query(ctx -> { diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java index 7c9d1333789e2..e127e198bcc32 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java @@ -28,11 +28,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -67,7 +67,7 @@ public void setup() throws Exception { configWithoutDbName.get("port").asInt()), "com.microsoft.sqlserver.jdbc.SQLServerDriver"); - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); database.execute(ctx -> ctx.createStatement().execute(String.format("CREATE DATABASE %s;", dbName))); diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java index bc3024c25644d..aa1d26203d081 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.protocol.models.AirbyteCatalog; @@ -40,7 +41,6 @@ import io.airbyte.protocol.models.SyncMode; import java.sql.SQLException; import java.util.List; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -77,7 +77,7 @@ static void init() { @BeforeEach void setup() throws SQLException { configWithoutDbName = getConfig(db); - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); final Database database = getDatabase(configWithoutDbName); database.query(ctx -> { diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlStressTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlStressTest.java index b6f4c3ad3e617..3f1d1d562beb7 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlStressTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlStressTest.java @@ -28,12 +28,12 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcStressTest; import java.util.Optional; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -70,7 +70,7 @@ public void setup() throws Exception { configWithoutDbName.get("port").asInt()), "com.microsoft.sqlserver.jdbc.SQLServerDriver"); - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); database.execute(ctx -> ctx.createStatement().execute(String.format("CREATE DATABASE %s;", dbName))); diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java index 1d7520a22dcc5..29d2262605fad 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -34,7 +35,6 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import org.apache.commons.lang3.RandomStringUtils; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -68,7 +68,7 @@ public void setup() throws Exception { config = Jsons.jsonNode(ImmutableMap.builder() .put("host", container.getHost()) .put("port", container.getFirstMappedPort()) - .put("database", "db_" + RandomStringUtils.randomAlphabetic(10)) + .put("database", Strings.addRandomSuffix("db", "_", 10)) .put("username", TEST_USER) .put("password", TEST_PASSWORD) .build()); diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTests.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTests.java index db2ce0e0a3f18..57a6668901103 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTests.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTests.java @@ -29,12 +29,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.protocol.models.AirbyteConnectionStatus; import java.sql.Connection; import java.sql.DriverManager; import java.util.Properties; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.Test; import org.testcontainers.containers.MySQLContainer; @@ -60,7 +60,7 @@ public void testSettingTimezones() throws Exception { Properties properties = new Properties(); properties.putAll(ImmutableMap.of("user", "root", "password", TEST_PASSWORD, "serverTimezone", "Europe/Moscow")); DriverManager.getConnection(container.getJdbcUrl(), properties); - String dbName = "db_" + RandomStringUtils.randomAlphabetic(10); + final String dbName = Strings.addRandomSuffix("db", "_", 10); config = getConfig(container, dbName, "serverTimezone=Europe/Moscow"); try (Connection connection = DriverManager.getConnection(container.getJdbcUrl(), properties)) { diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlStressTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlStressTest.java index 187c03471cc85..fc0fb9a2c3514 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlStressTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlStressTest.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; @@ -35,7 +36,6 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.Optional; -import org.apache.commons.lang3.RandomStringUtils; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -71,7 +71,7 @@ public void setup() throws Exception { config = Jsons.jsonNode(ImmutableMap.builder() .put("host", container.getHost()) .put("port", container.getFirstMappedPort()) - .put("database", "db_" + RandomStringUtils.randomAlphabetic(10)) + .put("database", Strings.addRandomSuffix("db", "_", 10)) .put("username", TEST_USER) .put("password", TEST_PASSWORD) .build()); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index 4031d7366e4d0..eb82777543873 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -41,6 +41,7 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.string.Strings; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.db.Database; @@ -72,7 +73,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.commons.lang3.RandomStringUtils; import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; @@ -159,7 +159,7 @@ static void tearDown() { void setup() throws Exception { source = new PostgresSource(); - dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); final String initScriptName = "init_" + dbName.concat(".sql"); final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";"); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index aca5ed3128fc8..e38104b73c3a7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -28,10 +28,10 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.test.utils.PostgreSQLContainerHelper; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -52,7 +52,7 @@ static void init() { @BeforeEach public void setup() throws Exception { - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); config = Jsons.jsonNode(ImmutableMap.builder() .put("host", PSQL_DB.getHost()) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java index e8e8799795dc0..f6052a9f17920 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceSSLTest.java @@ -35,6 +35,7 @@ import com.google.common.collect.Sets; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.Database; import io.airbyte.db.Databases; @@ -56,7 +57,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.RandomStringUtils; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -113,7 +113,7 @@ static void init() { @BeforeEach void setup() throws Exception { - dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); final String initScriptName = "init_" + dbName.concat(".sql"); final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";"); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 010efdf9bf659..5326c4fab0599 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -35,6 +35,7 @@ import com.google.common.collect.Sets; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.Database; import io.airbyte.db.Databases; @@ -56,7 +57,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.RandomStringUtils; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -115,7 +115,7 @@ static void init() { @BeforeEach void setup() throws Exception { - dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); final String initScriptName = "init_" + dbName.concat(".sql"); final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";"); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresStressTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresStressTest.java index 188aa4cf7c3f5..d2e43980f8e08 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresStressTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresStressTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; @@ -36,7 +37,6 @@ import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.util.Optional; import java.util.Set; -import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -66,7 +66,7 @@ static void init() { @BeforeEach public void setup() throws Exception { - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); config = Jsons.jsonNode(ImmutableMap.builder() .put("host", PSQL_DB.getHost()) diff --git a/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftSourceAcceptanceTest.java index 758bb978474a6..1b70cbd471d01 100644 --- a/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftSourceAcceptanceTest.java @@ -28,6 +28,7 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; @@ -44,7 +45,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import org.apache.commons.lang3.RandomStringUtils; public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest { @@ -71,7 +71,7 @@ protected void setupEnvironment(TestDestinationEnv environment) throws Exception config.get("database").asText()), RedshiftSource.DRIVER_CLASS); - schemaName = ("integration_test_" + RandomStringUtils.randomAlphanumeric(5)).toLowerCase(); + schemaName = Strings.addRandomSuffix("integration_test", "_", 5).toLowerCase(); final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); database.execute(connection -> { connection.createStatement().execute(createSchemaQuery); diff --git a/airbyte-scheduler/app/build.gradle b/airbyte-scheduler/app/build.gradle index 43b2deaed2bca..0317ba9423a62 100644 --- a/airbyte-scheduler/app/build.gradle +++ b/airbyte-scheduler/app/build.gradle @@ -3,6 +3,10 @@ plugins { } dependencies { + implementation 'io.fabric8:kubernetes-client:5.3.1' + implementation 'io.kubernetes:client-java-api:10.0.0' + implementation 'io.kubernetes:client-java:10.0.0' + implementation 'io.kubernetes:client-java-extended:10.0.0' implementation 'io.temporal:temporal-sdk:1.0.4' implementation project(':airbyte-analytics') diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 2565b0e8323d3..c855603570940 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -49,6 +49,8 @@ import io.airbyte.workers.temporal.TemporalClient; import io.airbyte.workers.temporal.TemporalPool; import io.airbyte.workers.temporal.TemporalUtils; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.nio.file.Path; @@ -56,8 +58,10 @@ import java.time.Instant; import java.util.Map; import java.util.Optional; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -161,7 +165,9 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi private static ProcessFactory getProcessBuilderFactory(Configs configs) { if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { - return new KubeProcessFactory(configs.getWorkspaceRoot()); + final KubernetesClient kubeClient = new DefaultKubernetesClient(); + final BlockingQueue ports = new LinkedBlockingDeque<>(configs.getTemporalWorkerPorts()); + return new KubeProcessFactory("default", kubeClient, ports); } else { return new DockerProcessFactory( configs.getWorkspaceRoot(), diff --git a/airbyte-scheduler/persistence/build.gradle b/airbyte-scheduler/persistence/build.gradle index 6d8bed7cf08da..4025f0552f980 100644 --- a/airbyte-scheduler/persistence/build.gradle +++ b/airbyte-scheduler/persistence/build.gradle @@ -4,6 +4,7 @@ plugins { dependencies { implementation project(':airbyte-analytics') + implementation project(':airbyte-commons-docker') implementation project(':airbyte-config:models') implementation project(':airbyte-config:persistence') implementation project(':airbyte-db') diff --git a/airbyte-server/build.gradle b/airbyte-server/build.gradle index dfb935359b4d4..45cc73bfb3b1e 100644 --- a/airbyte-server/build.gradle +++ b/airbyte-server/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation project(':airbyte-analytics') implementation project(':airbyte-api') + implementation project(':airbyte-commons-docker') implementation project(':airbyte-config:models') implementation project(':airbyte-config:persistence') implementation project(':airbyte-config:init') diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/PostgreSQLContainerHelper.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/PostgreSQLContainerHelper.java index 2d66b07f0b550..e717f99ef01a1 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/PostgreSQLContainerHelper.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/PostgreSQLContainerHelper.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; @@ -35,7 +36,6 @@ import java.util.UUID; import org.jooq.SQLDialect; import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils; import org.testcontainers.utility.MountableFile; public class PostgreSQLContainerHelper { @@ -53,7 +53,7 @@ public static void runSqlScript(MountableFile file, PostgreSQLContainer db) { } public static JsonNode createDatabaseWithRandomNameAndGetPostgresConfig(PostgreSQLContainer psqlDb) { - final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); return createDatabaseAndGetPostgresConfig(psqlDb, dbName); } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 644f1b4eaf456..3a3fe33cb877a 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -26,7 +26,6 @@ import static io.airbyte.api.client.model.ConnectionSchedule.TimeUnitEnum.MINUTES; import static java.lang.Thread.sleep; -import static java.time.temporal.ChronoUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -86,6 +85,7 @@ import io.airbyte.db.Databases; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.io.IOException; +import java.net.Inet4Address; import java.net.URI; import java.nio.charset.Charset; import java.sql.SQLException; @@ -102,7 +102,6 @@ import org.jooq.JSONB; import org.jooq.Record; import org.jooq.Result; -import org.junit.Assume; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -125,8 +124,8 @@ public class AcceptanceTests { private static final Logger LOGGER = LoggerFactory.getLogger(AcceptanceTests.class); - // Skip networking related failures on kube using this flag private static final boolean IS_KUBE = System.getenv().containsKey("KUBE"); + private static final boolean IS_MINIKUBE = System.getenv().containsKey("IS_MINIKUBE"); private static final String OUTPUT_NAMESPACE_PREFIX = "output_namespace_"; private static final String OUTPUT_NAMESPACE = OUTPUT_NAMESPACE_PREFIX + "${SOURCE_NAMESPACE}"; @@ -438,9 +437,6 @@ public void testIncrementalSync() throws Exception { @Test @Order(9) public void testScheduledSync() throws Exception { - // skip with Kube. HTTP client error with port forwarding - Assume.assumeFalse(IS_KUBE); - final String connectionName = "test-connection"; final UUID sourceId = createPostgresSource().getSourceId(); final UUID destinationId = createDestination().getDestinationId(); @@ -455,7 +451,10 @@ public void testScheduledSync() throws Exception { // When a new connection is created, Airbyte might sync it immediately (before the sync interval). // Then it will wait the sync interval. - sleep(Duration.of(30, SECONDS).toMillis()); + // todo: wait for two attempts in the UI + // if the wait isn't long enough, failures say "Connection refused" because the assert kills the + // syncs in progress + sleep(Duration.ofMinutes(2).toMillis()); assertSourceAndTargetDbInSync(sourcePsql, false); } @@ -826,11 +825,18 @@ private JsonNode getDbConfig(PostgreSQLContainer psql, boolean hiddenPassword, b try { final Map dbConfig = new HashMap<>(); - // todo (cgardens) - hack to get building passing in CI. need to follow up on why this was necessary - // (and affect on the k8s version of these tests). - dbConfig.put("host", "localhost"); - // necessary for minikube tests on Github Actions instead of psql.getHost() - // dbConfig.put("host", Inet4Address.getLocalHost().getHostAddress()); + // don't use psql.getHost() directly since the ip we need differs depending on environment + if (IS_KUBE) { + if (IS_MINIKUBE) { + // used with minikube driver=none instance + dbConfig.put("host", Inet4Address.getLocalHost().getHostAddress()); + } else { + // used on a single node with docker driver + dbConfig.put("host", "host.docker.internal"); + } + } else { + dbConfig.put("host", "localhost"); + } if (hiddenPassword) { dbConfig.put("password", "**********"); @@ -919,7 +925,7 @@ private static void waitForSuccessfulJob(JobsApi jobsApi, JobRead originalJob) t private static JobRead waitForJob(JobsApi jobsApi, JobRead originalJob, Set jobStatuses) throws InterruptedException, ApiException { JobRead job = originalJob; int count = 0; - while (count < 60 && jobStatuses.contains(job.getStatus())) { + while (count < 200 && jobStatuses.contains(job.getStatus())) { sleep(1000); count++; diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 255ccc8a46b71..6140d0a9d63b2 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -10,13 +10,13 @@ configurations { } dependencies { - - implementation 'org.apache.commons:commons-lang3:3.11' + implementation 'io.fabric8:kubernetes-client:5.3.1' implementation 'io.kubernetes:client-java-api:10.0.0' implementation 'io.kubernetes:client-java:10.0.0' implementation 'io.kubernetes:client-java-extended:10.0.0' implementation 'io.temporal:temporal-sdk:1.0.4' implementation 'org.apache.ant:ant:1.10.10' + implementation 'org.apache.commons:commons-lang3:3.11' implementation project(':airbyte-config:models') implementation project(':airbyte-db') @@ -24,9 +24,11 @@ dependencies { implementation project(':airbyte-protocol:models') testImplementation 'org.mockito:mockito-inline:2.13.0' - testImplementation 'org.testcontainers:testcontainers:1.14.3' + testImplementation 'org.testcontainers:testcontainers:1.15.3' testImplementation 'org.testcontainers:postgresql:1.15.1' testImplementation 'org.postgresql:postgresql:42.2.18' + + testImplementation project(':airbyte-commons-docker') } jsonSchema2Pojo { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java index 02bf21ea8f17f..59670c1cc9178 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java @@ -25,7 +25,7 @@ package io.airbyte.workers; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.io.IOs; +import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.OperatorDbt; @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.tools.ant.types.Commandline; import org.slf4j.Logger; @@ -76,8 +77,8 @@ public boolean run(String jobId, int attempt, Path jobRoot, JsonNode config, Ope } public boolean transform(String jobId, int attempt, Path jobRoot, JsonNode config, OperatorDbt dbtConfig) throws Exception { - IOs.writeFile(jobRoot, DBT_ENTRYPOINT_SH, MoreResources.readResource("dbt_transformation_entrypoint.sh")); try { + final Map files = ImmutableMap.of(DBT_ENTRYPOINT_SH, MoreResources.readResource("dbt_transformation_entrypoint.sh")); final List dbtArguments = new ArrayList<>(); dbtArguments.add("/data/job/transform/" + DBT_ENTRYPOINT_SH); Collections.addAll(dbtArguments, Commandline.translateCommandline(dbtConfig.getDbtArguments())); @@ -87,7 +88,7 @@ public boolean transform(String jobId, int attempt, Path jobRoot, JsonNode confi if (!dbtConfig.getDbtArguments().contains("--project-dir=")) { dbtArguments.add("--project-dir=/data/job/transform/git_repo/"); } - process = processFactory.create(jobId, attempt, jobRoot, dbtConfig.getDockerImage(), "/bin/bash", dbtArguments); + process = processFactory.create(jobId, attempt, jobRoot, dbtConfig.getDockerImage(), false, files, "/bin/bash", dbtArguments); LineGobbler.gobble(process.getInputStream(), LOGGER::info); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java index 977a9453105a0..680327140a20a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java @@ -24,7 +24,6 @@ package io.airbyte.workers; -import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; @@ -66,11 +65,11 @@ public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLaunche @Override public StandardCheckConnectionOutput run(StandardCheckConnectionInput input, Path jobRoot) throws WorkerException { - final JsonNode configDotJson = input.getConnectionConfiguration(); - IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(configDotJson)); - try { - process = integrationLauncher.check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME); + process = integrationLauncher.check( + jobRoot, + WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, + Jsons.serialize(input.getConnectionConfiguration())); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); @@ -98,7 +97,7 @@ public StandardCheckConnectionOutput run(StandardCheckConnectionInput input, Pat } } catch (Exception e) { - throw new WorkerException("Error while getting checking connection."); + throw new WorkerException("Error while getting checking connection.", e); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java index 0699425127e4c..ca78819537ec5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java @@ -24,7 +24,6 @@ package io.airbyte.workers; -import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; @@ -62,12 +61,11 @@ public DefaultDiscoverCatalogWorker(final IntegrationLauncher integrationLaunche @Override public AirbyteCatalog run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException { - - final JsonNode configDotJson = discoverSchemaInput.getConnectionConfiguration(); - IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(configDotJson)); - try { - process = integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME); + process = integrationLauncher.discover( + jobRoot, + WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, + Jsons.serialize(discoverSchemaInput.getConnectionConfiguration())); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java index 725a250b6efcf..a151841bc6518 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -113,7 +113,7 @@ static void gentleCloseWithHeartbeat(final Process process, try { process.waitFor(gracefulShutdownDuration.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - LOGGER.error("Exception during grace period for process to finish", e); + LOGGER.error("Exception during grace period for process to finish. This can happen when cancelling jobs."); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index 4c87a3904af1b..555de7fb96271 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -26,7 +26,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; -import io.airbyte.commons.io.IOs; +import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; import io.airbyte.config.OperatorDbt; @@ -36,6 +36,7 @@ import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.process.ProcessFactory; import java.nio.file.Path; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,8 +66,10 @@ public DefaultNormalizationRunner(final DestinationType destinationType, final P @Override public boolean configureDbt(String jobId, int attempt, Path jobRoot, JsonNode config, OperatorDbt dbtConfig) throws Exception { - IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config)); - return runProcess(jobId, attempt, jobRoot, "configure-dbt", + final Map files = ImmutableMap.of( + WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config)); + + return runProcess(jobId, attempt, jobRoot, files, "configure-dbt", "--integration-type", destinationType.toString().toLowerCase(), "--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, "--git-repo", dbtConfig.getGitRepoUrl(), @@ -75,18 +78,19 @@ public boolean configureDbt(String jobId, int attempt, Path jobRoot, JsonNode co @Override public boolean normalize(String jobId, int attempt, Path jobRoot, JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception { - IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config)); - IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog)); + final Map files = ImmutableMap.of( + WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config), + WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog)); - return runProcess(jobId, attempt, jobRoot, "run", + return runProcess(jobId, attempt, jobRoot, files, "run", "--integration-type", destinationType.toString().toLowerCase(), "--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, "--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME); } - private boolean runProcess(String jobId, int attempt, Path jobRoot, final String... args) throws Exception { + private boolean runProcess(String jobId, int attempt, Path jobRoot, Map files, final String... args) throws Exception { try { - process = processFactory.create(jobId, attempt, jobRoot, NORMALIZATION_IMAGE_NAME, null, args); + process = processFactory.create(jobId, attempt, jobRoot, NORMALIZATION_IMAGE_NAME, false, files, null, args); LineGobbler.gobble(process.getInputStream(), LOGGER::info); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 2ee209eec8778..1f6e12d5fcb99 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -24,10 +24,15 @@ package io.airbyte.workers.process; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.workers.WorkerException; import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,29 +63,35 @@ public Process spec(final Path jobRoot) throws WorkerException { attempt, jobRoot, imageName, + false, + Collections.emptyMap(), null, "spec"); } @Override - public Process check(final Path jobRoot, final String configFilename) throws WorkerException { + public Process check(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException { return processFactory.create( jobId, attempt, jobRoot, imageName, + false, + ImmutableMap.of(configFilename, configContents), null, "check", "--config", configFilename); } @Override - public Process discover(final Path jobRoot, final String configFilename) throws WorkerException { + public Process discover(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException { return processFactory.create( jobId, attempt, jobRoot, imageName, + false, + ImmutableMap.of(configFilename, configContents), null, "discover", "--config", configFilename); @@ -89,17 +100,27 @@ public Process discover(final Path jobRoot, final String configFilename) throws @Override public Process read(final Path jobRoot, final String configFilename, + final String configContents, final String catalogFilename, - final String stateFilename) + final String catalogContents, + final String stateFilename, + final String stateContents) throws WorkerException { final List arguments = Lists.newArrayList( "read", "--config", configFilename, "--catalog", catalogFilename); + final Map files = new HashMap<>(); + files.put(configFilename, configContents); + files.put(catalogFilename, catalogContents); + if (stateFilename != null) { arguments.add("--state"); arguments.add(stateFilename); + + Preconditions.checkNotNull(stateContents); + files.put(stateFilename, stateContents); } return processFactory.create( @@ -107,17 +128,30 @@ public Process read(final Path jobRoot, attempt, jobRoot, imageName, + false, + files, null, arguments); } @Override - public Process write(Path jobRoot, String configFilename, String catalogFilename) throws WorkerException { + public Process write(final Path jobRoot, + final String configFilename, + final String configContents, + final String catalogFilename, + final String catalogContents) + throws WorkerException { + final Map files = ImmutableMap.of( + configFilename, configContents, + catalogFilename, catalogContents); + return processFactory.create( jobId, attempt, jobRoot, imageName, + true, + files, null, "write", "--config", configFilename, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java index 9b4bc8ef7a74b..4dc766504741f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/DockerProcessFactory.java @@ -38,6 +38,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,38 +80,52 @@ private static Path prepareImageExistsScript() { } @Override - public Process create(String jobId, int attempt, final Path jobRoot, final String imageName, final String entrypoint, final String... args) + public Process create(String jobId, + int attempt, + final Path jobRoot, + final String imageName, + final boolean usesStdin, + final Map files, + final String entrypoint, + final String... args) throws WorkerException { + try { + if (!checkImageExists(imageName)) { + throw new WorkerException("Could not find image: " + imageName); + } - if (!checkImageExists(imageName)) { - throw new WorkerException("Could not find image: " + imageName); - } + if (!jobRoot.toFile().exists()) { + Files.createDirectory(jobRoot); + } - final List cmd = - Lists.newArrayList( - "docker", - "run", - "--rm", - "--init", - "-i", - "-v", - String.format("%s:%s", workspaceMountSource, DATA_MOUNT_DESTINATION), - "-v", - String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION), - "-w", - rebasePath(jobRoot).toString(), - "--network", - networkName); - if (!Strings.isNullOrEmpty(entrypoint)) { - cmd.add("--entrypoint"); - cmd.add(entrypoint); - } - cmd.add(imageName); - cmd.addAll(Arrays.asList(args)); + for (Map.Entry file : files.entrySet()) { + IOs.writeFile(jobRoot, file.getKey(), file.getValue()); + } + + final List cmd = + Lists.newArrayList( + "docker", + "run", + "--rm", + "--init", + "-i", + "-v", + String.format("%s:%s", workspaceMountSource, DATA_MOUNT_DESTINATION), + "-v", + String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION), + "-w", + rebasePath(jobRoot).toString(), + "--network", + networkName); + if (!Strings.isNullOrEmpty(entrypoint)) { + cmd.add("--entrypoint"); + cmd.add(entrypoint); + } + cmd.add(imageName); + cmd.addAll(Arrays.asList(args)); - LOGGER.info("Preparing command: {}", Joiner.on(" ").join(cmd)); + LOGGER.info("Preparing command: {}", Joiner.on(" ").join(cmd)); - try { return new ProcessBuilder(cmd).start(); } catch (IOException e) { throw new WorkerException(e.getMessage(), e); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/IntegrationLauncher.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/IntegrationLauncher.java index cda95cdec595f..96f4ac403c098 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/IntegrationLauncher.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/IntegrationLauncher.java @@ -31,33 +31,33 @@ public interface IntegrationLauncher { Process spec(final Path jobRoot) throws WorkerException; - Process check(final Path jobRoot, final String configFilename) throws WorkerException; + Process check(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException; - Process discover(final Path jobRoot, final String configFilename) throws WorkerException; + Process discover(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException; Process read(final Path jobRoot, final String configFilename, + final String configContents, final String catalogFilename, - final String stateFilename) + final String catalogContents, + final String stateFilename, + final String stateContents) throws WorkerException; default Process read(final Path jobRoot, final String configFilename, - final String catalogFilename) + final String configContents, + final String catalogFilename, + final String catalogContents) throws WorkerException { - return read(jobRoot, configFilename, catalogFilename, null); + return read(jobRoot, configFilename, configContents, catalogFilename, catalogContents, null, null); } Process write(final Path jobRoot, final String configFilename, - final String catalogFilename) + final String configContents, + final String catalogFilename, + final String catalogContents) throws WorkerException; - // TODO: this version should be removed once we've moved away from singer protocol - default Process write(final Path jobRoot, - final String configFilename) - throws WorkerException { - return write(jobRoot, configFilename, null); - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java new file mode 100644 index 0000000000000..0a3fa3d9b27e5 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -0,0 +1,507 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.workers.process; + +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.string.Strings; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.DeletionPropagation; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import org.apache.commons.io.output.NullOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Process abstraction backed by a Kube Pod running in a Kubernetes cluster 'somewhere'. The + * parent process starting a Kube Pod Process needs to exist within the Kube networking space. This + * is so the parent process can forward data into the child's stdin and read the child's stdout and + * stderr streams and copy configuration files over. + * + * This is made possible by: + *
  • 1) An init container that creates 3 named pipes corresponding to stdin, stdout and std err on + * a shared volume.
  • + *
  • 2) Config files (e.g. config.json, catalog.json etc) are copied from the parent process into + * a shared volume.
  • + *
  • 3) Redirecting the stdin named pipe to the original image's entrypoint and it's output into + * the respective named pipes for stdout and stderr.
  • + *
  • 4) Each named pipe has a corresponding side car. Each side car forwards its stream + * accordingly using socat. e.g. stderr/stdout is forwarded to parent process while input from the + * parent process is forwarded into stdin.
  • + *
  • 5) The parent process listens on the stdout and stederr sockets for an incoming TCP + * connection. It also initiates a TCP connection to the child process aka the Kube pod on the + * specified stdin socket.
  • + *
  • 6) The child process is able to access configuration data via the shared volume. It's inputs + * and outputs - stdin, stdout and stderr - are forwarded the parent process via the sidecars.
  • + * + * + * See the constructor for more information. + */ + +// TODO(Davin): Better test for this. See https://github.com/airbytehq/airbyte/issues/3700. +public class KubePodProcess extends Process { + + private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class); + + private static final String INIT_CONTAINER_NAME = "init"; + + private static final String PIPES_DIR = "/pipes"; + private static final String STDIN_PIPE_FILE = PIPES_DIR + "/stdin"; + private static final String STDOUT_PIPE_FILE = PIPES_DIR + "/stdout"; + private static final String STDERR_PIPE_FILE = PIPES_DIR + "/stderr"; + private static final String CONFIG_DIR = "/config"; + private static final String SUCCESS_FILE_NAME = "FINISHED_UPLOADING"; + + // 143 is the typical SIGTERM exit code. + private static final int KILLED_EXIT_CODE = 143; + private static final int STDIN_REMOTE_PORT = 9001; + + private final KubernetesClient client; + private final Pod podDefinition; + // Necessary since it is not possible to retrieve the pod's actual exit code upon termination. This + // is because the Kube API server does not keep + // terminated pod history like it does for successful pods. + // This variable should be set in functions where the pod is forcefully terminated. See + // getReturnCode() for more info. + private final AtomicBoolean wasKilled = new AtomicBoolean(false); + + private final OutputStream stdin; + private InputStream stdout; + private InputStream stderr; + + private final Consumer portReleaser; + private final ServerSocket stdoutServerSocket; + private final int stdoutLocalPort; + private final ServerSocket stderrServerSocket; + private final int stderrLocalPort; + private final ExecutorService executorService; + + // TODO(Davin): Cache this result. + public static String getCommandFromImage(KubernetesClient client, String imageName, String namespace) throws InterruptedException { + final String podName = Strings.addRandomSuffix("airbyte-command-fetcher", "-", 5); + + Container commandFetcher = new ContainerBuilder() + .withName("airbyte-command-fetcher") + .withImage(imageName) + .withCommand("sh", "-c", "echo \"AIRBYTE_ENTRYPOINT=$AIRBYTE_ENTRYPOINT\"") + .build(); + + Pod pod = new PodBuilder() + .withApiVersion("v1") + .withNewMetadata() + .withName(podName) + .endMetadata() + .withNewSpec() + .withRestartPolicy("Never") + .withContainers(commandFetcher) + .endSpec() + .build(); + LOGGER.info("Creating pod..."); + Pod podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod); + LOGGER.info("Waiting until command fetcher pod completes..."); + // TODO(Davin): If a pod is missing, this will wait for up to 2 minutes before error-ing out. + // Figure out a better way. + client.resource(podDefinition).waitUntilCondition(p -> p.getStatus().getPhase().equals("Succeeded"), 2, TimeUnit.MINUTES); + + var logs = client.pods().inNamespace(namespace).withName(podName).getLog(); + if (!logs.contains("AIRBYTE_ENTRYPOINT")) { + throw new RuntimeException( + "Missing AIRBYTE_ENTRYPOINT from command fetcher logs. This should not happen. Check the echo command has not been changed."); + } + + var envVal = logs.split("=")[1].strip(); + if (envVal.isEmpty()) { + // default to returning default entrypoint in bases + return "/airbyte/base.sh"; + } + + return envVal; + } + + public static String getPodIP(KubernetesClient client, String podName, String namespace) { + var pod = client.pods().inNamespace(namespace).withName(podName).get(); + if (pod == null) { + throw new RuntimeException("Error: unable to find pod!"); + } + return pod.getStatus().getPodIP(); + } + + private static Container getInit(boolean usesStdin, List mainVolumeMounts) { + var initEntrypointStr = String.format("mkfifo %s && mkfifo %s", STDOUT_PIPE_FILE, STDERR_PIPE_FILE); + + if (usesStdin) { + initEntrypointStr = String.format("mkfifo %s && ", STDIN_PIPE_FILE) + initEntrypointStr; + } + + initEntrypointStr = initEntrypointStr + String.format(" && until [ -f %s ]; do sleep 5; done;", SUCCESS_FILE_NAME); + + return new ContainerBuilder() + .withName(INIT_CONTAINER_NAME) + .withImage("busybox:1.28") + .withWorkingDir(CONFIG_DIR) + .withCommand("sh", "-c", initEntrypointStr) + .withVolumeMounts(mainVolumeMounts) + .build(); + } + + private static Container getMain(String image, boolean usesStdin, String entrypoint, List mainVolumeMounts, String[] args) { + var argsStr = String.join(" ", args); + var entrypointStr = entrypoint + " " + argsStr + " "; + + var entrypointStrWithPipes = entrypointStr + String.format(" 2> %s > %s", STDERR_PIPE_FILE, STDOUT_PIPE_FILE); + if (usesStdin) { + entrypointStrWithPipes = String.format("cat %s | ", STDIN_PIPE_FILE) + entrypointStrWithPipes; + } + + return new ContainerBuilder() + .withName("main") + .withImage(image) + .withCommand("sh", "-c", entrypointStrWithPipes) + .withWorkingDir(CONFIG_DIR) + .withVolumeMounts(mainVolumeMounts) + .build(); + } + + private static void copyFilesToKubeConfigVolume(KubernetesClient client, String podName, String namespace, Map files) { + List> fileEntries = new ArrayList<>(files.entrySet()); + + for (Map.Entry file : fileEntries) { + Path tmpFile = null; + try { + tmpFile = Path.of(IOs.writeFileToRandomTmpDir(file.getKey(), file.getValue())); + + LOGGER.info("Uploading file: " + file.getKey()); + + client.pods().inNamespace(namespace).withName(podName).inContainer(INIT_CONTAINER_NAME) + .file(CONFIG_DIR + "/" + file.getKey()) + .upload(tmpFile); + + } finally { + if (tmpFile != null) { + tmpFile.toFile().delete(); + } + } + } + } + + /** + * The calls in this function aren't straight-forward due to api limitations. There is no proper way + * to directly look for containers within a pod or query if a container is in a running state beside + * checking if the getRunning field is set. We could put this behind an interface, but that seems + * heavy-handed compared to the 10 lines here. + */ + private static void waitForInitPodToRun(KubernetesClient client, Pod podDefinition) throws InterruptedException { + LOGGER.info("Waiting for init container to be ready before copying files..."); + client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()) + .waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES); + LOGGER.info("Init container present.."); + client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()) + .waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().get(0).getState().getRunning() != null, 5, TimeUnit.MINUTES); + LOGGER.info("Init container ready.."); + } + + public KubePodProcess(KubernetesClient client, + Consumer portReleaser, + String podName, + String namespace, + String image, + int stdoutLocalPort, + int stderrLocalPort, + boolean usesStdin, + final Map files, + final String entrypointOverride, + final String... args) + throws IOException, InterruptedException { + this.client = client; + this.portReleaser = portReleaser; + this.stdoutLocalPort = stdoutLocalPort; + this.stderrLocalPort = stderrLocalPort; + + stdoutServerSocket = new ServerSocket(stdoutLocalPort); + stderrServerSocket = new ServerSocket(stderrLocalPort); + executorService = Executors.newFixedThreadPool(2); + setupStdOutAndStdErrListeners(); + + String entrypoint = entrypointOverride == null ? getCommandFromImage(client, image, namespace) : entrypointOverride; + LOGGER.info("Found entrypoint: {}", entrypoint); + + Volume pipeVolume = new VolumeBuilder() + .withName("airbyte-pipes") + .withNewEmptyDir() + .endEmptyDir() + .build(); + + VolumeMount pipeVolumeMount = new VolumeMountBuilder() + .withName("airbyte-pipes") + .withMountPath(PIPES_DIR) + .build(); + + Volume configVolume = new VolumeBuilder() + .withName("airbyte-config") + .withNewEmptyDir() + .endEmptyDir() + .build(); + + VolumeMount configVolumeMount = new VolumeMountBuilder() + .withName("airbyte-config") + .withMountPath(CONFIG_DIR) + .build(); + + var volumes = List.of(pipeVolume, configVolume); + var mainVolumeMounts = List.of(pipeVolumeMount, configVolumeMount); + + Container init = getInit(usesStdin, mainVolumeMounts); + Container main = getMain(image, usesStdin, entrypoint, mainVolumeMounts, args); + + Container remoteStdin = new ContainerBuilder() + .withName("remote-stdin") + .withImage("alpine/socat:1.7.4.1-r1") + .withCommand("sh", "-c", "socat -d -d -d TCP-L:9001 STDOUT > " + STDIN_PIPE_FILE) + .withVolumeMounts(pipeVolumeMount) + .build(); + + var localIp = InetAddress.getLocalHost().getHostAddress(); + Container relayStdout = new ContainerBuilder() + .withName("relay-stdout") + .withImage("alpine/socat:1.7.4.1-r1") + .withCommand("sh", "-c", String.format("cat %s | socat -d -d -d - TCP:%s:%s", STDOUT_PIPE_FILE, localIp, stdoutLocalPort)) + .withVolumeMounts(pipeVolumeMount) + .build(); + Container relayStderr = new ContainerBuilder() + .withName("relay-stderr") + .withImage("alpine/socat:1.7.4.1-r1") + .withCommand("sh", "-c", String.format("cat %s | socat -d -d -d - TCP:%s:%s", STDERR_PIPE_FILE, localIp, stderrLocalPort)) + .withVolumeMounts(pipeVolumeMount) + .build(); + + List containers = usesStdin ? List.of(main, remoteStdin, relayStdout, relayStderr) : List.of(main, relayStdout, relayStderr); + + Pod pod = new PodBuilder() + .withApiVersion("v1") + .withNewMetadata() + .withName(podName) + .endMetadata() + .withNewSpec() + .withRestartPolicy("Never") + .withInitContainers(init) + .withContainers(containers) + .withVolumes(volumes) + .endSpec() + .build(); + + LOGGER.info("Creating pod..."); + this.podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod); + waitForInitPodToRun(client, podDefinition); + + LOGGER.info("Copying files..."); + Map filesWithSuccess = new HashMap<>(files); + + // We always copy the empty success file to ensure our waiting step can detect the init container in + // RUNNING. Otherwise, the container can complete and exit before we are able to detect it. + filesWithSuccess.put(SUCCESS_FILE_NAME, ""); + copyFilesToKubeConfigVolume(client, podName, namespace, filesWithSuccess); + + LOGGER.info("Waiting until pod is ready..."); + client.resource(podDefinition).waitUntilReady(30, TimeUnit.MINUTES); + + // allow writing stdin to pod + LOGGER.info("Reading pod IP..."); + var podIp = getPodIP(client, podName, namespace); + LOGGER.info("Pod IP: {}", podIp); + + if (usesStdin) { + LOGGER.info("Creating stdin socket..."); + var socketToDestStdIo = new Socket(podIp, STDIN_REMOTE_PORT); + this.stdin = socketToDestStdIo.getOutputStream(); + } else { + LOGGER.info("Using null stdin output stream..."); + this.stdin = NullOutputStream.NULL_OUTPUT_STREAM; + } + } + + private void setupStdOutAndStdErrListeners() { + executorService.submit(() -> { + try { + LOGGER.info("Creating stdout socket server..."); + var socket = stdoutServerSocket.accept(); // blocks until connected + LOGGER.info("Setting stdout..."); + this.stdout = socket.getInputStream(); + } catch (IOException e) { + e.printStackTrace(); // todo: propagate exception / join at the end of constructor + } + }); + executorService.submit(() -> { + try { + LOGGER.info("Creating stderr socket server..."); + var socket = stderrServerSocket.accept(); // blocks until connected + LOGGER.info("Setting stderr..."); + this.stderr = socket.getInputStream(); + } catch (IOException e) { + e.printStackTrace(); // todo: propagate exception / join at the end of constructor + } + }); + } + + @Override + public OutputStream getOutputStream() { + return this.stdin; + } + + @Override + public InputStream getInputStream() { + return this.stdout; + } + + @Override + public InputStream getErrorStream() { + return this.stderr; + } + + /** + * Immediately terminates the Kube Pod backing this process and cleans up IO resources. + */ + @Override + public int waitFor() throws InterruptedException { + try { + Pod refreshedPod = client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get(); + client.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS); + wasKilled.set(true); + return exitValue(); + } finally { + close(); + } + } + + /** + * Intended to gracefully clean up after a completed Kube Pod. This should only be called if the + * process is successful. + */ + @Override + public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException { + try { + return super.waitFor(timeout, unit); + } finally { + close(); + } + } + + /** + * Immediately terminates the Kube Pod backing this process and cleans up IO resources. + */ + @Override + public void destroy() { + LOGGER.info("Destroying Kube process: {}", podDefinition.getMetadata().getName()); + try { + client.resource(podDefinition).withPropagationPolicy(DeletionPropagation.FOREGROUND).delete(); + wasKilled.set(true); + } finally { + close(); + LOGGER.info("Destroyed Kube process: {}", podDefinition.getMetadata().getName()); + } + } + + /** + * Close all open resource in the opposite order of resource creation. + */ + private void close() { + Exceptions.swallow(this.stdin::close); + Exceptions.swallow(this.stdout::close); + Exceptions.swallow(this.stderr::close); + Exceptions.swallow(this.stdoutServerSocket::close); + Exceptions.swallow(this.stderrServerSocket::close); + Exceptions.swallow(this.executorService::shutdownNow); + Exceptions.swallow(() -> portReleaser.accept(stdoutLocalPort)); + Exceptions.swallow(() -> portReleaser.accept(stderrLocalPort)); + } + + private boolean isTerminal(Pod pod) { + if (pod.getStatus() != null) { + return pod.getStatus() + .getContainerStatuses() + .stream() + .anyMatch(e -> e.getState() != null && e.getState().getTerminated() != null); + } else { + return false; + } + } + + private int getReturnCode(Pod pod) { + var name = pod.getMetadata().getName(); + Pod refreshedPod = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(name).get(); + if (refreshedPod == null) { + if (wasKilled.get()) { + LOGGER.info("Unable to find pod {} to retrieve exit value. Defaulting to value {}. This is expected if the job was cancelled.", name, + KILLED_EXIT_CODE); + return KILLED_EXIT_CODE; + } + // If the pod cannot be found and was not killed, it either means 1) the pod was not created + // properly 2) this method is incorrectly called. + throw new RuntimeException("Cannot find pod while trying to retrieve exit code. This probably means the Pod was not correctly created."); + } + if (!isTerminal(refreshedPod)) { + throw new IllegalThreadStateException("Kube pod process has not exited yet."); + } + + return refreshedPod.getStatus().getContainerStatuses() + .stream() + .filter(containerStatus -> containerStatus.getState() != null && containerStatus.getState().getTerminated() != null) + .map(containerStatus -> { + int statusCode = containerStatus.getState().getTerminated().getExitCode(); + LOGGER.info("Exit code for pod {}, container {} is {}", name, containerStatus.getName(), statusCode); + return statusCode; + }) + .reduce(Integer::sum) + .orElseThrow(); + } + + @Override + public int exitValue() { + return getReturnCode(podDefinition); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 9d9bf88646b97..2ff6d351ae975 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -24,17 +24,14 @@ package io.airbyte.workers.process; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; import io.airbyte.workers.WorkerException; +import io.fabric8.kubernetes.client.KubernetesClient; import java.nio.file.Path; -import java.util.Arrays; -import java.util.List; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.function.Consumer; import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,54 +40,61 @@ public class KubeProcessFactory implements ProcessFactory { private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class); - private static final Path WORKSPACE_MOUNT_DESTINATION = Path.of("/workspace"); + private final String namespace; + private final KubernetesClient kubeClient; + private final BlockingQueue ports; + private final Set claimedPorts = new HashSet<>(); - private final Path workspaceRoot; - - public KubeProcessFactory(Path workspaceRoot) { - this.workspaceRoot = workspaceRoot; + public KubeProcessFactory(String namespace, KubernetesClient kubeClient, BlockingQueue ports) { + this.namespace = namespace; + this.kubeClient = kubeClient; + this.ports = ports; } @Override - public Process create(String jobId, int attempt, final Path jobRoot, final String imageName, final String entrypoint, final String... args) + public Process create(String jobId, + int attempt, + final Path jobRoot, + final String imageName, + final boolean usesStdin, + final Map files, + final String entrypoint, + final String... args) throws WorkerException { - try { - final String template = MoreResources.readResource("kube_runner_template.yaml"); - // used to differentiate source and destination processes with the same id and attempt final String suffix = RandomStringUtils.randomAlphabetic(5).toLowerCase(); - - ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); - - final String rendered = template.replaceAll("JOBID", jobId) - .replaceAll("ATTEMPTID", String.valueOf(attempt)) - .replaceAll("IMAGE", imageName) - .replaceAll("SUFFIX", suffix) - .replaceAll("ARGS", Jsons.serialize(Arrays.asList(args))) - .replaceAll("WORKDIR", jobRoot.toString()); - - final JsonNode node = yamlMapper.readTree(rendered); - final String overrides = Jsons.serialize(node); - final String podName = "airbyte-worker-" + jobId + "-" + attempt + "-" + suffix; - final List cmd = - Lists.newArrayList( - "kubectl", - "run", - "--generator=run-pod/v1", - "--rm", - "-i", - "--pod-running-timeout=24h", - "--image=" + imageName, - "--restart=Never", - "--overrides=" + overrides, // fails if you add quotes around the overrides string - podName); - // TODO handle entrypoint override (to run DbtTransformationRunner for example) - LOGGER.debug("Preparing command: {}", Joiner.on(" ").join(cmd)); - - return new ProcessBuilder(cmd).start(); + final int stdoutLocalPort = ports.take(); + claimedPorts.add(stdoutLocalPort); + LOGGER.info("stdoutLocalPort = " + stdoutLocalPort); + + final int stderrLocalPort = ports.take(); + claimedPorts.add(stderrLocalPort); + LOGGER.info("stderrLocalPort = " + stderrLocalPort); + + final Consumer portReleaser = port -> { + if (!ports.contains(port)) { + ports.add(port); + LOGGER.info("Port consumer releasing: " + port); + } else { + LOGGER.info("Port consumer skipping releasing: " + port); + } + }; + + return new KubePodProcess( + kubeClient, + portReleaser, + podName, + namespace, + imageName, + stdoutLocalPort, + stderrLocalPort, + usesStdin, + files, + entrypoint, + args); } catch (Exception e) { throw new WorkerException(e.getMessage()); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/ProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/ProcessFactory.java index 2b599ce1b1329..d61c6f0e8bf18 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/ProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/ProcessFactory.java @@ -27,6 +27,7 @@ import io.airbyte.workers.WorkerException; import java.nio.file.Path; import java.util.List; +import java.util.Map; public interface ProcessFactory { @@ -37,23 +38,34 @@ public interface ProcessFactory { * @param attempt attempt Id * @param jobPath Workspace directory to run the process from * @param imageName Docker image name to start the process from + * @param files file name to contents map that will be written into the working dir of the process + * prior to execution * @param entrypoint If not null, the default entrypoint program of the docker image can be changed * by this argument * @param args arguments to pass to the docker image being run in the new process * @return the ProcessBuilder object to run the process * @throws WorkerException */ - Process create(String jobId, int attempt, final Path jobPath, final String imageName, final String entrypoint, final String... args) + Process create(String jobId, + int attempt, + final Path jobPath, + final String imageName, + final boolean usesStdin, + final Map files, + final String entrypoint, + final String... args) throws WorkerException; default Process create(String jobId, int attempt, final Path jobPath, final String imageName, + final boolean usesStdin, + final Map files, final String entrypoint, final List args) throws WorkerException { - return create(jobId, attempt, jobPath, imageName, entrypoint, args.toArray(new String[0])); + return create(jobId, attempt, jobPath, imageName, usesStdin, files, entrypoint, args.toArray(new String[0])); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java index a0398a99aff7d..b2fa3e6b02de0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java @@ -74,15 +74,14 @@ public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher, @Override public void start(StandardTargetConfig destinationConfig, Path jobRoot) throws IOException, WorkerException { Preconditions.checkState(destinationProcess == null); - IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, - Jsons.serialize(destinationConfig.getDestinationConnectionConfiguration())); - IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(destinationConfig.getCatalog())); LOGGER.info("Running destination..."); destinationProcess = integrationLauncher.write( jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, - WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME); + Jsons.serialize(destinationConfig.getDestinationConnectionConfiguration()), + WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, + Jsons.serialize(destinationConfig.getCatalog())); // stdout logs are logged elsewhere since stdout also contains data LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination"); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java index dae70ddbf2f8c..ce2a2bdf7da5b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java @@ -78,16 +78,13 @@ public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) { public void start(StandardTapConfig input, Path jobRoot) throws Exception { Preconditions.checkState(sourceProcess == null); - IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(input.getSourceConnectionConfiguration())); - IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CATALOG_JSON_FILENAME, Jsons.serialize(input.getCatalog())); - if (input.getState() != null) { - IOs.writeFile(jobRoot, WorkerConstants.INPUT_STATE_JSON_FILENAME, Jsons.serialize(input.getState().getState())); - } - sourceProcess = integrationLauncher.read(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, + Jsons.serialize(input.getSourceConnectionConfiguration()), WorkerConstants.SOURCE_CATALOG_JSON_FILENAME, - input.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME); + Jsons.serialize(input.getCatalog()), + input.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME, + input.getState() == null ? null : Jsons.serialize(input.getState().getState())); // stdout logs are logged elsewhere since stdout also contains data LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source"); diff --git a/airbyte-workers/src/main/resources/kube_runner_template.yaml b/airbyte-workers/src/main/resources/kube_runner_template.yaml deleted file mode 100644 index dd2b0ba04d6e6..0000000000000 --- a/airbyte-workers/src/main/resources/kube_runner_template.yaml +++ /dev/null @@ -1,30 +0,0 @@ -apiVersion: v1 -kind: Pod -metadata: - name: airbyte-worker-JOBID-ATTEMPTID-SUFFIX -spec: - affinity: - podAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - - labelSelector: - matchExpressions: - - key: airbyte - operator: In - values: - - scheduler - topologyKey: kubernetes.io/hostname - restartPolicy: Never - containers: - - name: worker - image: IMAGE - workingDir: WORKDIR - args: ARGS - stdin: true - stdinOnce: true - volumeMounts: - - name: airbyte-volume-workspace - mountPath: /workspace - volumes: - - name: airbyte-volume-workspace - persistentVolumeClaim: - claimName: airbyte-volume-workspace diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultCheckConnectionWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultCheckConnectionWorkerTest.java index 7fbfda6d717b3..d8ce34885a8b9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultCheckConnectionWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultCheckConnectionWorkerTest.java @@ -74,7 +74,7 @@ public void setup() throws IOException, WorkerException { integrationLauncher = mock(IntegrationLauncher.class, RETURNS_DEEP_STUBS); process = mock(Process.class); - when(integrationLauncher.check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME)).thenReturn(process); + when(integrationLauncher.check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDS))).thenReturn(process); final InputStream inputStream = mock(InputStream.class); when(process.getInputStream()).thenReturn(inputStream); when(process.getErrorStream()).thenReturn(new ByteArrayInputStream(new byte[0])); @@ -124,7 +124,7 @@ public void testProcessFail() { @Test public void testExceptionThrownInRun() throws WorkerException { - doThrow(new RuntimeException()).when(integrationLauncher).check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME); + doThrow(new RuntimeException()).when(integrationLauncher).check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDS)); final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(integrationLauncher, failureStreamFactory); assertThrows(WorkerException.class, () -> worker.run(input, jobRoot)); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultDiscoverCatalogWorkerTest.java index f5e3f4484ee1d..40fb64b257b30 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultDiscoverCatalogWorkerTest.java @@ -83,7 +83,7 @@ public void setup() throws Exception { integrationLauncher = mock(IntegrationLauncher.class, RETURNS_DEEP_STUBS); process = mock(Process.class); - when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME)).thenReturn(process); + when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDENTIALS))).thenReturn(process); final InputStream inputStream = mock(InputStream.class); when(process.getInputStream()).thenReturn(inputStream); when(process.getErrorStream()).thenReturn(new ByteArrayInputStream(new byte[0])); @@ -101,11 +101,6 @@ public void testDiscoverSchema() throws Exception { assertEquals(CATALOG, output); - // test that config is written to correct location on disk. - assertEquals( - Jsons.jsonNode(INPUT.getConnectionConfiguration()), - Jsons.deserialize(IOs.readFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME))); - Assertions.assertTimeout(Duration.ofSeconds(5), () -> { while (process.getErrorStream().available() != 0) { Thread.sleep(50); @@ -134,7 +129,7 @@ public void testDiscoverSchemaProcessFail() throws Exception { @Test public void testDiscoverSchemaException() throws WorkerException { - when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME)) + when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDENTIALS))) .thenThrow(new RuntimeException()); final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(integrationLauncher, streamFactory); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java index fa5dce79172e1..3c998a542b35f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java @@ -31,6 +31,8 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.WorkerException; @@ -40,6 +42,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,7 +66,11 @@ void setup() throws IOException, WorkerException { config = mock(JsonNode.class); catalog = mock(ConfiguredAirbyteCatalog.class); - when(processFactory.create(JOB_ID, JOB_ATTEMPT, jobRoot, DefaultNormalizationRunner.NORMALIZATION_IMAGE_NAME, null, "run", + final Map files = ImmutableMap.of( + WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config), + WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog)); + + when(processFactory.create(JOB_ID, JOB_ATTEMPT, jobRoot, DefaultNormalizationRunner.NORMALIZATION_IMAGE_NAME, false, files, null, "run", "--integration-type", "bigquery", "--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, "--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME)) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 04bd6fa851b88..78d83a6f9bbdf 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -24,9 +24,12 @@ package io.airbyte.workers.process; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.workers.WorkerException; import java.nio.file.Path; +import java.util.Collections; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -37,6 +40,15 @@ class AirbyteIntegrationLauncherTest { private static final int JOB_ATTEMPT = 0; private static final Path JOB_ROOT = Path.of("abc"); public static final String FAKE_IMAGE = "fake_image"; + private static final Map CONFIG_FILES = ImmutableMap.of( + "config", "{}"); + private static final Map CONFIG_CATALOG_FILES = ImmutableMap.of( + "config", "{}", + "catalog", "{}"); + private static final Map CONFIG_CATALOG_STATE_FILES = ImmutableMap.of( + "config", "{}", + "catalog", "{}", + "state", "{}"); private ProcessFactory processFactory; private AirbyteIntegrationLauncher launcher; @@ -51,32 +63,32 @@ void setUp() { void spec() throws WorkerException { launcher.spec(JOB_ROOT); - Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null, "spec"); + Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, Collections.emptyMap(), null, "spec"); } @Test void check() throws WorkerException { - launcher.check(JOB_ROOT, "config"); + launcher.check(JOB_ROOT, "config", "{}"); - Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null, + Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null, "check", "--config", "config"); } @Test void discover() throws WorkerException { - launcher.discover(JOB_ROOT, "config"); + launcher.discover(JOB_ROOT, "config", "{}"); - Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null, + Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null, "discover", "--config", "config"); } @Test void read() throws WorkerException { - launcher.read(JOB_ROOT, "config", "catalog", "state"); + launcher.read(JOB_ROOT, "config", "{}", "catalog", "{}", "state", "{}"); - Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null, + Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_CATALOG_STATE_FILES, null, Lists.newArrayList( "read", "--config", "config", @@ -86,9 +98,9 @@ void read() throws WorkerException { @Test void write() throws WorkerException { - launcher.write(JOB_ROOT, "config", "catalog"); + launcher.write(JOB_ROOT, "config", "{}", "catalog", "{}"); - Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null, + Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, true, CONFIG_CATALOG_FILES, null, "write", "--config", "config", "--catalog", "catalog"); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/DockerProcessFactoryTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/DockerProcessFactoryTest.java index c1cc081363ee2..ed06d4c051497 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/process/DockerProcessFactoryTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/DockerProcessFactoryTest.java @@ -26,6 +26,9 @@ import static org.junit.jupiter.api.Assertions.*; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; import io.airbyte.workers.WorkerException; import java.io.IOException; import java.nio.file.Files; @@ -54,4 +57,17 @@ public void testImageDoesNotExist() throws IOException, WorkerException { assertFalse(processFactory.checkImageExists("airbyte/fake:0.1.2")); } + @Test + public void testFileWriting() throws IOException, WorkerException { + Path workspaceRoot = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "process_factory"); + Path jobRoot = workspaceRoot.resolve("job"); + + final DockerProcessFactory processFactory = new DockerProcessFactory(workspaceRoot, "", "", ""); + processFactory.create("job_id", 0, jobRoot, "airbyte/scheduler:dev", false, ImmutableMap.of("config.json", "{\"data\": 2}"), "echo hi"); + + assertEquals( + Jsons.jsonNode(ImmutableMap.of("data", 2)), + Jsons.deserialize(IOs.readFile(jobRoot, "config.json"))); + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/KubePodProcessTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/KubePodProcessTest.java new file mode 100644 index 0000000000000..731f5af26c85f --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/KubePodProcessTest.java @@ -0,0 +1,136 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.workers.process; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import io.airbyte.commons.docker.DockerUtils; +import io.airbyte.commons.string.Strings; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.com.google.common.io.Resources; + +// Disabled until we start minikube on the node. +@Disabled +public class KubePodProcessTest { + + private static final KubernetesClient K8s = new DefaultKubernetesClient(); + + private static final String ENTRYPOINT = "sh"; + + private static final String TEST_IMAGE_WITH_VAR_PATH = "Dockerfile.with_var"; + private static final String TEST_IMAGE_WITH_VAR_NAME = "worker-test:with-var"; + + private static final String TEST_IMAGE_NO_VAR_PATH = "Dockerfile.no_var"; + private static final String TEST_IMAGE_NO_VAR_NAME = "worker-test:no-var"; + + @BeforeAll + public static void setup() { + var varDockerfile = Resources.getResource(TEST_IMAGE_WITH_VAR_PATH); + DockerUtils.buildImage(varDockerfile.getPath(), TEST_IMAGE_WITH_VAR_NAME); + + var noVarDockerfile = Resources.getResource(TEST_IMAGE_NO_VAR_PATH); + DockerUtils.buildImage(noVarDockerfile.getPath(), TEST_IMAGE_NO_VAR_NAME); + } + + @Nested + class GetCommand { + + @Test + @DisplayName("Should error if image does not have the right env var set.") + public void testGetCommandFromImageNoCommand() { + assertThrows(RuntimeException.class, () -> KubePodProcess.getCommandFromImage(K8s, TEST_IMAGE_NO_VAR_NAME, "default")); + } + + @Test + @DisplayName("Should error if image does not exists.") + public void testGetCommandFromImageMissingImage() { + assertThrows(RuntimeException.class, () -> KubePodProcess.getCommandFromImage(K8s, "bad_missing_image", "default")); + } + + @Test + @DisplayName("Should retrieve the right command if image has the right env var set.") + public void testGetCommandFromImageCommandPresent() throws IOException, InterruptedException { + var command = KubePodProcess.getCommandFromImage(K8s, TEST_IMAGE_WITH_VAR_NAME, "default"); + assertEquals(ENTRYPOINT, command); + } + + } + + @Nested + class GetPodIp { + + @Test + @DisplayName("Should error when the given pod does not exists.") + public void testGetPodIpNoPod() { + assertThrows(RuntimeException.class, () -> KubePodProcess.getPodIP(K8s, "pod-does-not-exist", "default")); + } + + @Test + @DisplayName("Should return the correct pod ip.") + public void testGetPodIpGoodPod() throws InterruptedException { + var sleep = new ContainerBuilder() + .withImage("busybox") + .withName("sleep") + .withCommand("sleep", "100000") + .build(); + + var podName = Strings.addRandomSuffix("test-get-pod-good-pod", "-", 5); + Pod podDef = new PodBuilder() + .withApiVersion("v1") + .withNewMetadata() + .withName(podName) + .endMetadata() + .withNewSpec() + .withRestartPolicy("Never") + .withRestartPolicy("Never") + .withContainers(sleep) + .endSpec() + .build(); + + String namespace = "default"; + Pod pod = K8s.pods().inNamespace(namespace).createOrReplace(podDef); + K8s.resource(pod).waitUntilReady(20, TimeUnit.SECONDS); + + var ip = KubePodProcess.getPodIP(K8s, podName, namespace); + var exp = K8s.pods().inNamespace(namespace).withName(podName).get().getStatus().getPodIP(); + assertEquals(exp, ip); + K8s.resource(podDef).inNamespace(namespace).delete(); + } + + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java index 96ccedcbda332..697977f46f815 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; import io.airbyte.config.StandardTargetConfig; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.workers.TestConfigHelpers; @@ -87,8 +88,13 @@ public void setup() throws IOException, WorkerException { integrationLauncher = mock(IntegrationLauncher.class, RETURNS_DEEP_STUBS); final InputStream inputStream = mock(InputStream.class); - when(integrationLauncher.write(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME)) - .thenReturn(process); + when(integrationLauncher.write( + jobRoot, + WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, + Jsons.serialize(DESTINATION_CONFIG.getDestinationConnectionConfiguration()), + WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, + Jsons.serialize(DESTINATION_CONFIG.getCatalog()))) + .thenReturn(process); when(process.isAlive()).thenReturn(true); when(process.getInputStream()).thenReturn(inputStream); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java index 2d40c766d8000..a08bc2af946b2 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java @@ -33,9 +33,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.config.StandardTapConfig; import io.airbyte.config.State; @@ -67,17 +67,19 @@ class DefaultAirbyteSourceTest { private static final String STREAM_NAME = "user_preferences"; private static final String FIELD_NAME = "favorite_color"; + private static final JsonNode STATE = Jsons.jsonNode(ImmutableMap.of("checkpoint", "the future.")); + private static final JsonNode CONFIG = Jsons.jsonNode(Map.of( + "apiKey", "123", + "region", "us-east")); private static final ConfiguredAirbyteCatalog CATALOG = CatalogHelpers.createConfiguredAirbyteCatalog( "hudi:latest", NAMESPACE, Field.of(FIELD_NAME, JsonSchemaPrimitive.STRING)); private static final StandardTapConfig SOURCE_CONFIG = new StandardTapConfig() - .withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "the future.")))) - .withSourceConnectionConfiguration(Jsons.jsonNode(Map.of( - "apiKey", "123", - "region", "us-east"))) - .withCatalog(CatalogHelpers.createConfiguredAirbyteCatalog("hudi:latest", NAMESPACE, Field.of(FIELD_NAME, JsonSchemaPrimitive.STRING))); + .withState(new State().withState(STATE)) + .withSourceConnectionConfiguration(CONFIG) + .withCatalog(CATALOG); private static final List MESSAGES = Lists.newArrayList( AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"), @@ -100,8 +102,11 @@ public void setup() throws IOException, WorkerException { when(integrationLauncher.read( jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, + Jsons.serialize(CONFIG), WorkerConstants.SOURCE_CATALOG_JSON_FILENAME, - WorkerConstants.INPUT_STATE_JSON_FILENAME)).thenReturn(process); + Jsons.serialize(CATALOG), + WorkerConstants.INPUT_STATE_JSON_FILENAME, + Jsons.serialize(STATE))).thenReturn(process); when(process.isAlive()).thenReturn(true); when(process.getInputStream()).thenReturn(inputStream); when(process.getErrorStream()).thenReturn(new ByteArrayInputStream("qwer".getBytes(StandardCharsets.UTF_8))); @@ -131,16 +136,6 @@ public void testSuccessfulLifecycle() throws Exception { source.close(); - assertEquals( - Jsons.jsonNode(SOURCE_CONFIG.getSourceConnectionConfiguration()), - Jsons.deserialize(IOs.readFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME))); - assertEquals( - Jsons.jsonNode(SOURCE_CONFIG.getState().getState()), - Jsons.deserialize(IOs.readFile(jobRoot, WorkerConstants.INPUT_STATE_JSON_FILENAME))); - assertEquals( - Jsons.jsonNode(CATALOG), - Jsons.deserialize(IOs.readFile(jobRoot, WorkerConstants.SOURCE_CATALOG_JSON_FILENAME))); - assertEquals(MESSAGES, messages); Assertions.assertTimeout(Duration.ofSeconds(5), () -> { diff --git a/airbyte-workers/src/test/resources/Dockerfile.no_var b/airbyte-workers/src/test/resources/Dockerfile.no_var new file mode 100644 index 0000000000000..85bf94d1bad3c --- /dev/null +++ b/airbyte-workers/src/test/resources/Dockerfile.no_var @@ -0,0 +1,3 @@ +FROM alpine:3 + +ENTRYPOINT "sh" diff --git a/airbyte-workers/src/test/resources/Dockerfile.with_var b/airbyte-workers/src/test/resources/Dockerfile.with_var new file mode 100644 index 0000000000000..0104279c9ae0c --- /dev/null +++ b/airbyte-workers/src/test/resources/Dockerfile.with_var @@ -0,0 +1,5 @@ +FROM alpine:3 + +ENV AIRBYTE_ENTRYPOINT="sh" + +ENTRYPOINT "sh" diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index 7f1b06dc19b01..7972de9de95eb 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -17,3 +17,4 @@ PAPERCUPS_STORYTIME=disabled IS_DEMO=false TEMPORAL_HOST=airbyte-temporal-svc:7233 INTERNAL_API_HOST=airbyte-server-svc:8001 +TEMPORAL_WORKER_PORTS=9000,9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029 diff --git a/kube/overlays/dev/kustomization.yaml b/kube/overlays/dev/kustomization.yaml index ae958026d24b8..bfcfd90f80a4a 100644 --- a/kube/overlays/dev/kustomization.yaml +++ b/kube/overlays/dev/kustomization.yaml @@ -20,5 +20,4 @@ images: configMapGenerator: - name: airbyte-env - envs: - - .env + env: .env diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index da7c0cf4565ec..eb49ddb12e5c7 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -17,3 +17,4 @@ PAPERCUPS_STORYTIME=enabled IS_DEMO=false TEMPORAL_HOST=airbyte-temporal-svc:7233 INTERNAL_API_HOST=airbyte-server-svc:8001 +TEMPORAL_WORKER_PORTS=9000,9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029 diff --git a/kube/overlays/stable/kustomization.yaml b/kube/overlays/stable/kustomization.yaml index f60589c6546b6..1033286b54fe6 100644 --- a/kube/overlays/stable/kustomization.yaml +++ b/kube/overlays/stable/kustomization.yaml @@ -20,5 +20,4 @@ images: configMapGenerator: - name: airbyte-env - envs: - - .env + env: .env diff --git a/kube/resources/scheduler.yaml b/kube/resources/scheduler.yaml index 3dd8c3ab18dfe..35a161b6bb251 100644 --- a/kube/resources/scheduler.yaml +++ b/kube/resources/scheduler.yaml @@ -81,8 +81,42 @@ spec: configMapKeyRef: name: airbyte-env key: TEMPORAL_HOST + - name: TEMPORAL_WORKER_PORTS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TEMPORAL_WORKER_PORTS ports: - - containerPort: 8001 + - containerPort: 9000 + - containerPort: 9001 + - containerPort: 9002 + - containerPort: 9003 + - containerPort: 9004 + - containerPort: 9005 + - containerPort: 9006 + - containerPort: 9007 + - containerPort: 9008 + - containerPort: 9009 + - containerPort: 9010 + - containerPort: 9011 + - containerPort: 9012 + - containerPort: 9013 + - containerPort: 9014 + - containerPort: 9015 + - containerPort: 9016 + - containerPort: 9017 + - containerPort: 9018 + - containerPort: 9019 + - containerPort: 9020 + - containerPort: 9021 + - containerPort: 9022 + - containerPort: 9023 + - containerPort: 9024 + - containerPort: 9025 + - containerPort: 9026 + - containerPort: 9027 + - containerPort: 9028 + - containerPort: 9029 volumeMounts: - name: airbyte-volume-configs mountPath: /configs diff --git a/settings.gradle b/settings.gradle index f781d0d65417c..5c759460eb717 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,6 +18,7 @@ include ':airbyte-api' include ':airbyte-cli' include ':airbyte-cdk:python' include ':airbyte-commons' +include ':airbyte-commons-docker' include ':airbyte-config:models' include ':airbyte-config:init' include ':airbyte-config:persistence' diff --git a/tools/bin/acceptance_test_kube.sh b/tools/bin/acceptance_test_kube.sh index 4e0c0f8486d65..b4c795a7b9f26 100755 --- a/tools/bin/acceptance_test_kube.sh +++ b/tools/bin/acceptance_test_kube.sh @@ -14,7 +14,14 @@ kubectl apply -k kube/overlays/dev kubectl wait --for=condition=Available deployment/airbyte-server --timeout=300s || (kubectl describe pods && exit 1) kubectl wait --for=condition=Available deployment/airbyte-scheduler --timeout=300s || (kubectl describe pods && exit 1) -sleep 20s +# allocates a lot of time to start kube. takes a while for postgres+temporal to work things out +sleep 120s + +server_logs () { echo "server logs:" && kubectl logs deployment.apps/airbyte-server; } +scheduler_logs () { echo "scheduler logs:" && kubectl logs deployment.apps/airbyte-scheduler; } +print_all_logs () { server_logs; scheduler_logs; } + +trap "echo 'kube logs:' && print_all_logs" EXIT kubectl port-forward svc/airbyte-server-svc 8001:8001 &