Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kube Queueing POC #3464

Merged
merged 47 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
a934b68
Use CDK to generate source that can be configured to emit a certain n…
davinchia May 18, 2021
d2ab458
Checkpoint: socat works from inside the docker container.
davinchia May 18, 2021
1ee5c5a
Override the entry point.
davinchia May 18, 2021
001b4a7
Clean up and add ReadMe.
davinchia May 18, 2021
b90ec71
Clean up socat.
davinchia May 18, 2021
4386d49
Merge remote-tracking branch 'origin' into davinchia/kube-queueing-poc
davinchia May 19, 2021
1fb1b84
Checkpoint: connect to Kube cluster and list all the pods.
davinchia May 19, 2021
bd3f17f
Checkpoint: Sync worker pod is able to send output to the destination…
davinchia May 19, 2021
f9a6c9b
Checkpoint: Sync worker creates Dest pod if none existed previously. …
davinchia May 19, 2021
921af61
update readme
jrhizor May 19, 2021
de79c79
Merge branch 'davinchia/kube-queueing-poc' of github.com:airbytehq/ai…
jrhizor May 19, 2021
c01a386
Checkpoint: Dest pod does nott restart after finishing. Comment out d…
davinchia May 19, 2021
5a53726
Merge branch 'davinchia/kube-queueing-poc' of github.com:airbytehq/ai…
davinchia May 19, 2021
435e9db
working towards named pipes
jrhizor May 19, 2021
39707d4
Merge branch 'davinchia/kube-queueing-poc' of github.com:airbytehq/ai…
jrhizor May 19, 2021
b813830
named pipes working
jrhizor May 20, 2021
6f13510
update readme
jrhizor May 20, 2021
0a59f84
WIP named pipe / socat sidecar kube port forwarding (#3518)
jrhizor May 21, 2021
c4d976b
move all kube testing yamls into the airbyte-workers directories. sor…
davinchia May 21, 2021
16fca70
Format.
davinchia May 21, 2021
01be8ef
Put back the original KubeProcessBuilderFactory.
davinchia May 21, 2021
e8849f7
Fix slight errors.
davinchia May 21, 2021
2cdb8f5
Checkpoint: Worker pod knows its own IP. Successfully starts and writ…
davinchia May 21, 2021
a9bb131
remove unused file and update readme
jrhizor May 21, 2021
fc19aff
Dest pod loops back into worker pod. However, the right messages do n…
davinchia May 21, 2021
6180099
Switch back to worker ip.
davinchia May 21, 2021
4a3666c
Merge branch 'davinchia/kube-queueing-poc' of github.com:airbytehq/ai…
davinchia May 21, 2021
4109ef7
SWEET VICTORY!.
davinchia May 21, 2021
b447fe8
wrap kube pod in process (#3540)
jrhizor May 25, 2021
d605c74
More clean up. (#3586)
davinchia May 26, 2021
b5bbf16
Merge remote-tracking branch 'origin' into davinchia/kube-queueing-poc
davinchia May 26, 2021
be11548
Merge remote-tracking branch 'origin' into davinchia/kube-queueing-poc
davinchia May 26, 2021
e66190e
Merge remote-tracking branch 'origin' into davinchia/kube-queueing-poc
davinchia May 26, 2021
618a835
Implements redirecting standard error as well. (#3623)
davinchia May 29, 2021
2cbd150
Merge remote-tracking branch 'origin' into davinchia/kube-queueing-poc
davinchia May 31, 2021
fd6653f
Clean up before next implementation.
davinchia May 31, 2021
0c77fa7
Merge branch 'master' into davinchia/kube-queueing-poc
jrhizor Jun 1, 2021
4fd01f3
kube process launching (#3790)
jrhizor Jun 2, 2021
87e4a1f
Merge branch 'master' into davinchia/kube-queueing-poc
jrhizor Jun 2, 2021
a2e8555
facepalm moment
jrhizor Jun 2, 2021
5a1a3c2
clean up kube poc pr (#3834)
jrhizor Jun 3, 2021
b479910
Merge remote-tracking branch 'origin' into davinchia/kube-queueing-poc
davinchia Jun 4, 2021
e619191
Merge remote-tracking branch 'origin' into davinchia/kube-queueing-poc
davinchia Jun 7, 2021
03d9ed9
enable kube e2e tests (#3866)
jrhizor Jun 9, 2021
c7db626
Working Kube Cancel. (#3983)
davinchia Jun 9, 2021
f6061b4
Merge branch 'master' into davinchia/kube-queueing-poc
jrhizor Jun 9, 2021
b058f2b
revert envs change and merge master to fix kube acceptance tests (#4012)
jrhizor Jun 10, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 33 additions & 37 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -286,41 +286,37 @@ jobs:

- name: Run End-to-End Frontend Tests
run: ./tools/bin/e2e_test.sh
test_kube:
runs-on: ubuntu-latest
name: Run Acceptance Tests (Kube)
steps:
- name: Checkout Airbyte
uses: actions/checkout@v2

# DISABLED UNTIL WE HAVE TEMPORAL ON KUBE
# test_kube:
# runs-on: ubuntu-latest
# steps:
# - name: Checkout Airbyte
# uses: actions/checkout@v2
#
# - uses: actions/setup-java@v1
# with:
# java-version: '14'
#
# - uses: actions/setup-node@v1
# with:
# node-version: '14.7'
#
# - uses: actions/setup-python@v2
# with:
# python-version: '3.7'
#
# - name: Setup Minikube
# uses: manusa/actions-setup-minikube@v2.3.0
# with:
# minikube version: 'v1.16.0'
# kubernetes version: 'v1.19.2'
#
# - name: Install socat
# run: sudo apt-get install socat
#
# - name: Build Core Docker Images and Run Tests
# run: CORE_ONLY=true ./gradlew --no-daemon composeBuild test -x :airbyte-webapp:test --scan
# env:
# GIT_REVISION: ${{ github.sha }}
# CORE_ONLY: true
#
# - name: Run Kubernetes End-to-End Acceptance Tests
# run: |
# ./tools/bin/acceptance_test_kube.sh
- uses: actions/setup-java@v1
with:
java-version: '14'

- uses: actions/setup-node@v1
with:
node-version: '14.7'

- uses: actions/setup-python@v2
with:
python-version: '3.7'

- name: Setup Minikube
uses: manusa/actions-setup-minikube@v2.4.1
with:
minikube version: 'v1.21.0-beta.0'
kubernetes version: 'v1.20.7'

- name: Install socat (required for port forwarding)
run: sudo apt-get install socat

- name: Build Core Docker Images and Run Tests
run: CORE_ONLY=true ./gradlew --no-daemon build --scan --rerun-tasks

- name: Run Kubernetes End-to-End Acceptance Tests
run: |
IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh
11 changes: 11 additions & 0 deletions airbyte-commons-docker/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
plugins {
id "java-library"
}

dependencies {
implementation 'org.apache.commons:commons-compress:1.20'
implementation 'com.github.docker-java:docker-java:3.2.8'
implementation 'com.github.docker-java:docker-java-transport-httpclient5:3.2.8'

testImplementation 'org.apache.commons:commons-lang3:3.11'
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,36 @@

package io.airbyte.commons.docker;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.BuildImageResultCallback;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.DockerClientImpl;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import com.github.dockerjava.transport.DockerHttpClient;
import java.io.File;
import java.util.Set;

public class DockerUtils {

private static final DockerClientConfig CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build();
private static final DockerHttpClient HTTP_CLIENT = new ApacheDockerHttpClient.Builder()
.dockerHost(CONFIG.getDockerHost())
.sslConfig(CONFIG.getSSLConfig())
.maxConnections(100)
.build();
private static final DockerClient DOCKER_CLIENT = DockerClientImpl.getInstance(CONFIG, HTTP_CLIENT);

public static String getTaggedImageName(String dockerRepository, String tag) {
return String.join(":", dockerRepository, tag);
}

public static String buildImage(String dockerFilePath, String tag) {
return DOCKER_CLIENT.buildImageCmd()
.withDockerfile(new File(dockerFilePath))
.withTags(Set.of(tag))
.exec(new BuildImageResultCallback())
.awaitImageId();
}

}
5 changes: 2 additions & 3 deletions airbyte-commons/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ plugins {
}

dependencies {
testImplementation 'org.apache.commons:commons-lang3:3.11'

implementation group: 'org.apache.commons', name: 'commons-compress', version: '1.20'
implementation 'org.apache.commons:commons-compress:1.20'
implementation 'org.apache.commons:commons-lang3:3.11'
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.common.collect.Streams;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;

public class Strings {

Expand All @@ -35,4 +36,8 @@ public static String join(Iterable<?> iterable, CharSequence separator) {
.collect(Collectors.joining(separator));
}

public static String addRandomSuffix(String base, String separator, int suffixLength) {
return base + separator + RandomStringUtils.randomAlphabetic(suffixLength).toLowerCase();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.config;

import java.nio.file.Path;
import java.util.Set;

public interface Configs {

Expand Down Expand Up @@ -62,6 +63,8 @@ public interface Configs {

String getTemporalHost();

Set<Integer> getTemporalWorkerPorts();

enum TrackingStrategy {
SEGMENT,
LOGGING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@

import com.google.common.base.Preconditions;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,6 +56,7 @@ public class EnvConfigs implements Configs {
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";

private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
Expand Down Expand Up @@ -166,6 +170,13 @@ public String getTemporalHost() {
return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233");
}

@Override
public Set<Integer> getTemporalWorkerPorts() {
return Arrays.stream(getEnvOrDefault(TEMPORAL_WORKER_PORTS, "").split(","))
.map(Integer::valueOf)
.collect(Collectors.toSet());
}

private String getEnvOrDefault(String key, String defaultValue) {
return getEnvOrDefault(key, defaultValue, Function.identity());
}
Expand Down
4 changes: 2 additions & 2 deletions airbyte-db/src/test/java/io/airbyte/db/PostgresUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.SQLException;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -58,7 +58,7 @@ static void init() {

@BeforeEach
void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final JsonNode config = getConfig(PSQL_DB, dbName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Databases;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -65,7 +65,7 @@ static void init() {

@BeforeEach
void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

config = getConfig(PSQL_DB, dbName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.stream.MoreStreams;
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.math.BigDecimal;
Expand All @@ -48,7 +49,6 @@
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -75,7 +75,7 @@ static void init() {

@BeforeEach
void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final JsonNode config = getConfig(PSQL_DB, dbName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.Connection;
import java.sql.PreparedStatement;
Expand All @@ -47,7 +48,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -79,7 +79,7 @@ static void init() {
void setup() throws Exception {
jdbcStreamingQueryConfiguration = mock(JdbcStreamingQueryConfiguration.class);

final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final JsonNode config = getConfig(PSQL_DB, dbName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
Expand All @@ -55,7 +56,6 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -190,7 +190,8 @@ protected void setup(TestDestinationEnv testEnv) throws Exception {
final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
final String datasetLocation = "US";

final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8);
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);

config = Jsons.jsonNode(ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, projectId)
.put(CONFIG_CREDS, credentialsJsonString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -69,7 +70,6 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -136,7 +136,7 @@ void setup(TestInfo info) throws IOException {
.build()
.getService();

final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8);
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
final String datasetLocation = "EU";
MESSAGE_USERS1.getRecord().setNamespace(datasetId);
MESSAGE_USERS2.getRecord().setNamespace(datasetId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -37,7 +38,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -166,7 +166,7 @@ private static Database getDatabase(JsonNode config) {
@Override
protected void setup(TestDestinationEnv testEnv) throws SQLException {
configWithoutDbName = getConfig(db);
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final Database database = getDatabase(configWithoutDbName);
database.query(ctx -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -37,7 +38,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -175,7 +175,7 @@ private static Database getDatabase(JsonNode config) {
@Override
protected void setup(TestDestinationEnv testEnv) throws SQLException {
configWithoutDbName = getConfig(db);
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);

final Database database = getDatabase(configWithoutDbName);
database.query(ctx -> {
Expand Down
Loading