Skip to content

Commit

Permalink
Kube Queueing POC (#3464)
Browse files Browse the repository at this point in the history
* Use CDK to generate source that can be configured to emit a certain number of records and always works.

* Checkpoint: socat works from inside the docker container.

* Override the entry point.

* Clean up and add ReadMe.

* Clean up socat.

* Checkpoint: connect to Kube cluster and list all the pods.

* Checkpoint: Sync worker pod is able to send output to the destination pod.

* Checkpoint: Sync worker creates Dest pod if none existed previously. It also waits for the pod to be ready before doing anything else. Sync worker will also remove the pod on termination.

* update readme

* Checkpoint: Dest pod does nott restart after finishing. Comment out delete command in Sync worker.

* working towards named pipes

* named pipes working

* update readme

* WIP named pipe / socat sidecar kube port forwarding (#3518)

* nearly working sources

* update

* stdin example

* move all kube testing yamls into the airbyte-workers directories. sort the airbyte-workers resource folder; place all the poc yamls together.

* Format.

* Put back the original KubeProcessBuilderFactory.

* Fix slight errors.

* Checkpoint: Worker pod knows its own IP. Successfully starts and writes to Dest pod after refactor.

* remove unused file and update readme

* Dest pod loops back into worker pod. However, the right messages do not seem to be passing in.

* Switch back to worker ip.

* SWEET VICTORY!.

* wrap kube pod in process (#3540)

also clean up kubernetes deploys.

* More clean up. (#3586)

The first 6 points of #3464.

The only interesting thing about this PR is the kube pod shutdown. For whatever reason, the OkHttpPool isn't respecting the evictAll call and 1 idle thread remains. So instead of shutting down immediately, the worker pod shuts down after 5 mins when the idle thread id reaped. There isn't an easy way to modify the pool's idle reap configuration now. I do not think this issue is blocking since it's relatively benign, so I vote we create a ticket and come back to this once we do an e2e test.

* Implements redirecting standard error as well. (#3623)

* Clean up before next implementation.

* kube process launching (#3790)

* processes must handle file mounting

* remove comment

* default to base entrypoint

* use process builder factory / select stdin / use a pool of ports

* fix up

* add super hacky copying example

* Checkpoint: Works end to end!

* Checkpoint: Use API to make sure init container is ready instead of blind sleep. Propagate exception in DefaultCheckConnectionWorker.

* Refactor KubePodProcess. Checked to make sure everything still works.

* Format.

* Clean up code. Begin putting this into variables and breaking up long constructor function.

* Add comments to explain what is happening.

* fix normalization test

* increase timeout for initcontainer

Co-authored-by: Davin Chia <davinchia@gmail.com>

* facepalm moment

* clean up kube poc pr (#3834)

* clean up

* remove source-always-works

* create separate commons-docker

* fix test

* enable kube e2e tests (#3866)

* enable kube e2e tests

* use more generally accepted env definition

* use new runners

* use its own runner and install minikube differently

* update name

* use kubectl alias

* use link instead of alias that doesn't propagate

* start minikube

* use driver=none

* go back to using action

* mess with versions

* revert runner

* install socat

* print logs after run

* also try re-runnining tasks

* always wait for file transfer

* use ports

* increase wait timeout for kube

* use different localhost ips and bump normalization to include an entrypoint

* proposed fix

* all working locally

* revert temporary changes

* revert normalization image change that's happening in a separate pr

* readability

* final comment

* Working Kube Cancel. (#3983)

* Port over the basic changes.

* Add logic to return proper exit code in the event of termination. Add comments to explain why.

* revert envs change and merge master to fix kube acceptance tests (#4012)

* use older env format

* fix build

Co-authored-by: jrhizor <me@jaredrhizor.com>
Co-authored-by: Jared Rhizor <jared@dataline.io>
  • Loading branch information
3 people committed Jun 10, 2021
1 parent 36488ef commit b04c080
Show file tree
Hide file tree
Showing 72 changed files with 1,141 additions and 313 deletions.
70 changes: 33 additions & 37 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -286,41 +286,37 @@ jobs:

- name: Run End-to-End Frontend Tests
run: ./tools/bin/e2e_test.sh
test_kube:
runs-on: ubuntu-latest
name: Run Acceptance Tests (Kube)
steps:
- name: Checkout Airbyte
uses: actions/checkout@v2

# DISABLED UNTIL WE HAVE TEMPORAL ON KUBE
# test_kube:
# runs-on: ubuntu-latest
# steps:
# - name: Checkout Airbyte
# uses: actions/checkout@v2
#
# - uses: actions/setup-java@v1
# with:
# java-version: '14'
#
# - uses: actions/setup-node@v1
# with:
# node-version: '14.7'
#
# - uses: actions/setup-python@v2
# with:
# python-version: '3.7'
#
# - name: Setup Minikube
# uses: manusa/actions-setup-minikube@v2.3.0
# with:
# minikube version: 'v1.16.0'
# kubernetes version: 'v1.19.2'
#
# - name: Install socat
# run: sudo apt-get install socat
#
# - name: Build Core Docker Images and Run Tests
# run: CORE_ONLY=true ./gradlew --no-daemon composeBuild test -x :airbyte-webapp:test --scan
# env:
# GIT_REVISION: ${{ github.sha }}
# CORE_ONLY: true
#
# - name: Run Kubernetes End-to-End Acceptance Tests
# run: |
# ./tools/bin/acceptance_test_kube.sh
- uses: actions/setup-java@v1
with:
java-version: '14'

- uses: actions/setup-node@v1
with:
node-version: '14.7'

- uses: actions/setup-python@v2
with:
python-version: '3.7'

- name: Setup Minikube
uses: manusa/actions-setup-minikube@v2.4.1
with:
minikube version: 'v1.21.0-beta.0'
kubernetes version: 'v1.20.7'

- name: Install socat (required for port forwarding)
run: sudo apt-get install socat

- name: Build Core Docker Images and Run Tests
run: CORE_ONLY=true ./gradlew --no-daemon build --scan --rerun-tasks

- name: Run Kubernetes End-to-End Acceptance Tests
run: |
IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh
11 changes: 11 additions & 0 deletions airbyte-commons-docker/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
plugins {
id "java-library"
}

dependencies {
implementation 'org.apache.commons:commons-compress:1.20'
implementation 'com.github.docker-java:docker-java:3.2.8'
implementation 'com.github.docker-java:docker-java-transport-httpclient5:3.2.8'

testImplementation 'org.apache.commons:commons-lang3:3.11'
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,36 @@

package io.airbyte.commons.docker;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.BuildImageResultCallback;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.DockerClientImpl;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import com.github.dockerjava.transport.DockerHttpClient;
import java.io.File;
import java.util.Set;

public class DockerUtils {

private static final DockerClientConfig CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build();
private static final DockerHttpClient HTTP_CLIENT = new ApacheDockerHttpClient.Builder()
.dockerHost(CONFIG.getDockerHost())
.sslConfig(CONFIG.getSSLConfig())
.maxConnections(100)
.build();
private static final DockerClient DOCKER_CLIENT = DockerClientImpl.getInstance(CONFIG, HTTP_CLIENT);

public static String getTaggedImageName(String dockerRepository, String tag) {
return String.join(":", dockerRepository, tag);
}

public static String buildImage(String dockerFilePath, String tag) {
return DOCKER_CLIENT.buildImageCmd()
.withDockerfile(new File(dockerFilePath))
.withTags(Set.of(tag))
.exec(new BuildImageResultCallback())
.awaitImageId();
}

}
5 changes: 2 additions & 3 deletions airbyte-commons/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ plugins {
}

dependencies {
testImplementation 'org.apache.commons:commons-lang3:3.11'

implementation group: 'org.apache.commons', name: 'commons-compress', version: '1.20'
implementation 'org.apache.commons:commons-compress:1.20'
implementation 'org.apache.commons:commons-lang3:3.11'
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.common.collect.Streams;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;

public class Strings {

Expand All @@ -35,4 +36,8 @@ public static String join(Iterable<?> iterable, CharSequence separator) {
.collect(Collectors.joining(separator));
}

public static String addRandomSuffix(String base, String separator, int suffixLength) {
return base + separator + RandomStringUtils.randomAlphabetic(suffixLength).toLowerCase();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.config;

import java.nio.file.Path;
import java.util.Set;

public interface Configs {

Expand Down Expand Up @@ -62,6 +63,8 @@ public interface Configs {

String getTemporalHost();

Set<Integer> getTemporalWorkerPorts();

enum TrackingStrategy {
SEGMENT,
LOGGING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@

import com.google.common.base.Preconditions;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,6 +56,7 @@ public class EnvConfigs implements Configs {
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";

private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
Expand Down Expand Up @@ -166,6 +170,13 @@ public String getTemporalHost() {
return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233");
}

@Override
public Set<Integer> getTemporalWorkerPorts() {
return Arrays.stream(getEnvOrDefault(TEMPORAL_WORKER_PORTS, "").split(","))
.map(Integer::valueOf)
.collect(Collectors.toSet());
}

private String getEnvOrDefault(String key, String defaultValue) {
return getEnvOrDefault(key, defaultValue, Function.identity());
}
Expand Down
4 changes: 2 additions & 2 deletions airbyte-db/src/test/java/io/airbyte/db/PostgresUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.SQLException;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -58,7 +58,7 @@ static void init() {

@BeforeEach
void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final JsonNode config = getConfig(PSQL_DB, dbName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Databases;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -65,7 +65,7 @@ static void init() {

@BeforeEach
void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

config = getConfig(PSQL_DB, dbName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.stream.MoreStreams;
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.math.BigDecimal;
Expand All @@ -48,7 +49,6 @@
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -75,7 +75,7 @@ static void init() {

@BeforeEach
void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final JsonNode config = getConfig(PSQL_DB, dbName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -47,7 +48,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -79,7 +79,7 @@ static void init() {
void setup() throws Exception {
jdbcStreamingQueryConfiguration = mock(JdbcStreamingQueryConfiguration.class);

final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final JsonNode config = getConfig(PSQL_DB, dbName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
Expand All @@ -55,7 +56,6 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -190,7 +190,8 @@ protected void setup(TestDestinationEnv testEnv) throws Exception {
final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
final String datasetLocation = "US";

final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8);
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);

config = Jsons.jsonNode(ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, projectId)
.put(CONFIG_CREDS, credentialsJsonString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -69,7 +70,6 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -136,7 +136,7 @@ void setup(TestInfo info) throws IOException {
.build()
.getService();

final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8);
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
final String datasetLocation = "EU";
MESSAGE_USERS1.getRecord().setNamespace(datasetId);
MESSAGE_USERS2.getRecord().setNamespace(datasetId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -37,7 +38,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -166,7 +166,7 @@ private static Database getDatabase(JsonNode config) {
@Override
protected void setup(TestDestinationEnv testEnv) throws SQLException {
configWithoutDbName = getConfig(db);
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final Database database = getDatabase(configWithoutDbName);
database.query(ctx -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -37,7 +38,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -175,7 +175,7 @@ private static Database getDatabase(JsonNode config) {
@Override
protected void setup(TestDestinationEnv testEnv) throws SQLException {
configWithoutDbName = getConfig(db);
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final Database database = getDatabase(configWithoutDbName);
database.query(ctx -> {
Expand Down
Loading

0 comments on commit b04c080

Please sign in to comment.