Skip to content

Commit

Permalink
✨ Migrate OSS to temporal scheduler (#12757)
Browse files Browse the repository at this point in the history
* Migrate OSS to temporal scheduler

* add comment about migration being performed in server

* add comments about removing migration logic

* formatting and add tests for migration logic

* rm duplicated test

* remove more duplicated build task

* remove retry

* disable acceptance tests that call temporal directly when on kube

* set NEW_SCHEDULER and CONTAINER_ORCHESTRATOR_ENABLED env vars to true to be consistent

* set default value of container orchestrator enabled to true

* Revert "set default value of container orchestrator enabled to true"

This reverts commit 21b3670.

* Revert "set NEW_SCHEDULER and CONTAINER_ORCHESTRATOR_ENABLED env vars to true to be consistent"

This reverts commit 6dd2ec0.

* Revert "Revert "set NEW_SCHEDULER and CONTAINER_ORCHESTRATOR_ENABLED env vars to true to be consistent""

This reverts commit 2f40f9d.

* Revert "Revert "set default value of container orchestrator enabled to true""

This reverts commit 26068d5.

* fix sync workflow test

* remove defunct cancellation tests due to internal temporal error

* format - remove unused imports

* revert changes that set container orchestrator enabled to true everywhere

* remove NEW_SCHEDULER feature flag from .env files, and set CONTAINER_ORCHESTRATOR_ENABLED flag to true for kube .env files

Co-authored-by: Benoit Moriceau <benoit@airbyte.io>
  • Loading branch information
lmossman and benmoriceau committed May 19, 2022
1 parent a5f1366 commit 26ed385
Show file tree
Hide file tree
Showing 14 changed files with 153 additions and 291 deletions.
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ MAX_DISCOVER_WORKERS=5


### FEATURE FLAGS ###
NEW_SCHEDULER=false
AUTO_DISABLE_FAILING_CONNECTIONS=false
EXPOSE_SECRETS_IN_EXPORT=false
FORCE_MIGRATE_SECRET_STORE=false
281 changes: 0 additions & 281 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -518,283 +518,6 @@ jobs:
label: ${{ needs.start-platform-build-runner.outputs.label }}
ec2-instance-id: ${{ needs.start-platform-build-runner.outputs.ec2-instance-id }}

# Scheduler V2
# In case of self-hosted EC2 errors, remove this block.
start-platform-new-scheduler-acceptance-runner:
name: "Platform: Start Docker w/ Scheduler v2 Test Runner"
needs:
- changes
- find_valid_pat
# Scheduled builds on master require us to skip the changes job, so always() is used to force this job to run on master.
if: needs.changes.outputs.backend == 'true' || needs.changes.outputs.build == 'true' || (always() && github.ref == 'refs/heads/master')
timeout-minutes: 10
runs-on: ubuntu-latest
outputs:
label: ${{ steps.start-ec2-runner.outputs.label }}
ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }}
steps:
- name: Checkout Airbyte
uses: actions/checkout@v2
- name: Start AWS Runner
id: start-ec2-runner
uses: ./.github/actions/start-aws-runner
with:
github-token: ${{ needs.find_valid_pat.outputs.pat }}
aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
platform-new-scheduler-acceptance:
# In case of self-hosted EC2 errors, remove the next two lines and uncomment the currently commented out `runs-on` line.
needs: start-platform-new-scheduler-acceptance-runner # required to start the main job when the runner is ready
runs-on: ${{ needs.start-platform-new-scheduler-acceptance-runner.outputs.label }} # run the job on the newly created runner
name: "Platform: Docker w/ Scheduler v2 Acceptance Tests"
timeout-minutes: 90
steps:
- name: Checkout Airbyte
uses: actions/checkout@v2

- name: Npm Caching
uses: actions/cache@v2
with:
path: |
~/.npm
key: ${{ secrets.CACHE_VERSION }}-npm-${{ runner.os }}-${{ hashFiles('**/package-lock.json') }}
restore-keys: |
${{ secrets.CACHE_VERSION }}-npm-${{ runner.os }}-
# this intentionally does not use restore-keys so we don't mess with gradle caching
- name: Gradle Caching
uses: actions/cache@v2
with:
path: |
~/.gradle/caches
~/.gradle/wrapper
**/.venv
key: ${{ secrets.CACHE_VERSION }}-${{ runner.os }}-${{ hashFiles('**/*.gradle*') }}-${{ hashFiles('**/package-lock.json') }}

- uses: actions/setup-java@v1
with:
java-version: "17"

- uses: actions/setup-node@v1
with:
node-version: "16.13.0"

- name: Set up CI Gradle Properties
run: |
mkdir -p ~/.gradle/
cat > ~/.gradle/gradle.properties <<EOF
org.gradle.jvmargs=-Xmx8g -Xss4m --add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED \
--add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED \
--add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED \
--add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED \
--add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
org.gradle.workers.max=8
org.gradle.vfs.watch=false
EOF
- name: Build
run: |
SUB_BUILD=PLATFORM ./gradlew build javadoc --scan
- name: Run End-to-End Acceptance Tests with the new scheduler
run: ./tools/bin/acceptance_test_with_new_scheduler.sh

# In case of self-hosted EC2 errors, remove this block.
stop-platform-new-scheduler-acceptance-runner:
name: "Platform: Stop Docker w/ Scheduler v2 Test Runner"
timeout-minutes: 10
needs:
- start-platform-new-scheduler-acceptance-runner # required to get output from the start-runner job
- platform-new-scheduler-acceptance # required to wait when the main job is done
- find_valid_pat
runs-on: ubuntu-latest
# always() is required to stop the runner even if the previous job has errors. However, always() also runs when the previous job is skipped.
# Thus, we check for skipped here.
if: ${{ always() && needs.start-platform-new-scheduler-acceptance-runner.result != 'skipped'}}
steps:
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-2
- name: Stop EC2 runner
uses: supertopher/ec2-github-runner@base64v1.0.10
with:
mode: stop
label: ${{ needs.start-platform-new-scheduler-acceptance-runner.outputs.label }}
github-token: ${{ needs.find_valid_pat.outputs.pat }}
ec2-instance-id: ${{ needs.start-platform-new-scheduler-acceptance-runner.outputs.ec2-instance-id }}

## Kube Acceptance Tests
# Docker acceptance tests run as part of the build job.
# In case of self-hosted EC2 errors, remove this block.
start-kube-acceptance-test-runner:
name: "Platform: Start Kube Acceptance Test EC2 Runner"
needs:
- changes
- find_valid_pat
# Scheduled builds on master require us to skip the changes job, so always() is used to force this job to run on master.
if: needs.changes.outputs.backend == 'true' || needs.changes.outputs.build == 'true' || (always() && github.ref == 'refs/heads/master')
timeout-minutes: 10
runs-on: ubuntu-latest
outputs:
label: ${{ steps.start-ec2-runner.outputs.label }}
ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }}
steps:
- name: Checkout Airbyte
uses: actions/checkout@v2
- name: Start AWS Runner
id: start-ec2-runner
uses: ./.github/actions/start-aws-runner
with:
# github-self-hosted-runner-ubuntu-20-with-150gdisk-docker-20.10.7-and-socat
ec2-image-id: ami-0c1a9bc22624339d8
aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
github-token: ${{ needs.find_valid_pat.outputs.pat }}
kube-acceptance-test:
name: "Platform: Acceptance Tests (Kube)"
# In case of self-hosted EC2 errors, remove the `needs` line and switch back to running on ubuntu-latest.
needs: start-kube-acceptance-test-runner # required to start the main job when the runner is ready
runs-on: ${{ needs.start-kube-acceptance-test-runner.outputs.label }} # run the job on the newly created runner
environment: more-secrets
timeout-minutes: 90
steps:
- name: Checkout Airbyte
uses: actions/checkout@v2

- name: Cache Build Artifacts
uses: ./.github/actions/cache-build-artifacts
with:
cache-key: ${{ secrets.CACHE_VERSION }}
cache-python: "false"

- uses: actions/setup-java@v1
with:
java-version: "17"

- uses: actions/setup-node@v1
with:
node-version: "16.13.0"

- name: Fix EC-2 Runner
run: |
mkdir -p /home/runner
- name: Set up CI Gradle Properties
run: |
mkdir -p ~/.gradle/
cat > ~/.gradle/gradle.properties <<EOF
org.gradle.jvmargs=-Xmx8g -Xss4m --add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED \
--add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED \
--add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED \
--add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED \
--add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
org.gradle.workers.max=8
org.gradle.vfs.watch=false
EOF
- name: Create cluster config file
run: |
cat > /tmp/kind-config.yaml <<EOF
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
EOF
- name: Check Docker System Info
run: docker system info

- name: KIND Kubernetes Cluster Setup
uses: helm/kind-action@v1.2.0
with:
node_image: kindest/node:v1.21.2
config: /tmp/kind-config.yaml
# In case of self-hosted EC2 errors, remove this env block.
env:
USER: root
HOME: /home/runner
CHANGE_MINIKUBE_NONE_USER: true

- name: Describe kube nodes
run: kubectl describe nodes
env:
USER: root
HOME: /home/runner

- name: Build Platform Docker Images
run: SUB_BUILD=PLATFORM ./gradlew assemble -x test --scan

- name: Run Logging Tests
run: ./tools/bin/cloud_storage_logging_test.sh
env:
# AWS_S3_INTEGRATION_TEST_CREDS can be found in LastPass as AWS_S3_INTEGRATION_TEST_CREDS
AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }}
# GOOGLE_CLOUD_STORAGE_TEST_CREDS can be found in LastPass as "google cloud storage ( gcs ) test creds"
GOOGLE_CLOUD_STORAGE_TEST_CREDS: ${{ secrets.GOOGLE_CLOUD_STORAGE_TEST_CREDS }}

- name: Run Kubernetes End-to-End Acceptance Tests
env:
USER: root
HOME: /home/runner
# AWS_S3_INTEGRATION_TEST_CREDS can be found in LastPass as AWS_S3_INTEGRATION_TEST_CREDS
AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }}
SECRET_STORE_GCP_CREDENTIALS: ${{ secrets.SECRET_STORE_GCP_CREDENTIALS }}
SECRET_STORE_GCP_PROJECT_ID: ${{ secrets.SECRET_STORE_GCP_PROJECT_ID }}
run: |
CI=true IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh
- uses: actions/upload-artifact@v2
if: failure()
with:
name: Kubernetes Logs
path: /tmp/kubernetes_logs/*

- name: Show Disk Usage
run: |
df -h
docker system df
- name: Run AWS/GCP Cloud Integration Tests
env:
USER: root
HOME: /home/runner
# AWS_S3_INTEGRATION_TEST_CREDS can be found in LastPass as AWS_S3_INTEGRATION_TEST_CREDS
AWS_S3_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_S3_INTEGRATION_TEST_CREDS }}
SECRET_STORE_GCP_CREDENTIALS: ${{ secrets.SECRET_STORE_GCP_CREDENTIALS }}
SECRET_STORE_GCP_PROJECT_ID: ${{ secrets.SECRET_STORE_GCP_PROJECT_ID }}
run: |
CI=true ./tools/bin/cloud_integration_tests.sh
# In case of self-hosted EC2 errors, remove this block.
stop-kube-acceptance-test-runner:
name: "Platform: Stop Kube Acceptance Test EC2 Runner"
timeout-minutes: 10
needs:
- start-kube-acceptance-test-runner # required to get output from the start-runner job
- kube-acceptance-test # required to wait when the main job is done
- find_valid_pat
runs-on: ubuntu-latest
# always() is required to stop the runner even if the previous job has errors. However, always() also runs when the previous job is skipped.
# Thus, we check for skipped here.
if: ${{ always() && needs.start-kube-acceptance-test-runner.result != 'skipped'}}
steps:
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-2
- name: Stop EC2 runner
uses: supertopher/ec2-github-runner@base64v1.0.10
with:
mode: stop
github-token: ${{ needs.find_valid_pat.outputs.pat }}
label: ${{ needs.start-kube-acceptance-test-runner.outputs.label }}
ec2-instance-id: ${{ needs.start-kube-acceptance-test-runner.outputs.ec2-instance-id }}

## Kube Acceptance Tests (with scheduler v2 - both temporal changes and container orchestrator)
# Docker acceptance tests run as part of the build job.
# In case of self-hosted EC2 errors, remove this block.
Expand Down Expand Up @@ -939,8 +662,6 @@ jobs:
- frontend-build
- octavia-cli-build
- platform-build
- platform-new-scheduler-acceptance
- kube-acceptance-test
- kube-acceptance-test-v2
if: ${{ failure() && github.ref == 'refs/heads/master' }}
steps:
Expand All @@ -965,8 +686,6 @@ jobs:
- frontend-build
- octavia-cli-build
- platform-build
- platform-new-scheduler-acceptance
- kube-acceptance-test
- kube-acceptance-test-v2
if: success()
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ public class EnvVariableFeatureFlags implements FeatureFlags {

/**
 * Reports whether the new (temporal) scheduler is in use.
 *
 * NOTE(review): this excerpt appears to come from a rendered diff whose +/- markers were lost, so it
 * seems to interleave the pre-change lines (the two reads of the NEW_SCHEDULER environment variable)
 * with the post-change lines (the fixed "true" log message and {@code return true}). Confirm against
 * the real file; as written, the second return statement is unreachable.
 */
@Override
public boolean usesNewScheduler() {
log.info("New Scheduler: " + Boolean.parseBoolean(System.getenv("NEW_SCHEDULER")));
// TODO: sweep this method along with the scheduler
log.info("New Scheduler: true (post-migration)");

return Boolean.parseBoolean(System.getenv("NEW_SCHEDULER"));
// After migrating all OSS users onto the new temporal scheduler, this should always return true.
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,32 @@ public void setSecretMigrationDone() throws IOException {
true)));
}

/** Key of the metadata row that records whether the scheduler migration has been performed. */
private static final String SCHEDULER_MIGRATION_STATUS = "schedulerMigration";

/**
 * Checks whether the scheduler migration has been recorded in the metadata table.
 *
 * @return {@code true} when exactly one metadata row exists for the scheduler-migration key
 * @throws IOException propagated from the job database query
 */
@Override
public boolean isSchedulerMigrated() throws IOException {
  final Result<Record> result = jobDatabase.query(ctx -> ctx.select()
      .from(AIRBYTE_METADATA_TABLE)
      .where(DSL.field(METADATA_KEY_COL).eq(SCHEDULER_MIGRATION_STATUS))
      .fetch());

  // The fetched result is a List, so size() answers the question without streaming over the rows.
  return result.size() == 1;
}

/**
 * Records in the metadata table that the scheduler migration has been performed: inserts the status
 * row, or overwrites its value when a row with the same key is already present.
 *
 * @throws IOException propagated from the job database query
 */
@Override
public void setSchedulerMigrationDone() throws IOException {
  // Build the upsert once up front; every argument is a constant of this class or the literal true.
  final String upsertStatement = String.format(
      "INSERT INTO %s(%s, %s) VALUES('%s', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'",
      AIRBYTE_METADATA_TABLE, METADATA_KEY_COL, METADATA_VAL_COL,
      SCHEDULER_MIGRATION_STATUS, true,
      METADATA_KEY_COL,
      METADATA_VAL_COL, true);

  jobDatabase.query(ctx -> ctx.execute(upsertStatement));
}

@Override
public Optional<String> getVersion() throws IOException {
final Result<Record> result = jobDatabase.query(ctx -> ctx.select()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,20 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con
*/
void setSecretMigrationDone() throws IOException;

/**
 * Check if the scheduler has been migrated to temporal.
 *
 * TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next
 * "major" version bump as it will no longer be needed.
 *
 * @return {@code true} if the scheduler migration has been recorded as performed, {@code false}
 *         otherwise
 * @throws IOException declared for failures while reading the migration status (NOTE(review):
 *         inferred from the signature; confirm against the implementations)
 */
boolean isSchedulerMigrated() throws IOException;

/**
 * Set that the scheduler migration has been performed.
 *
 * TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next
 * "major" version bump as it will no longer be needed.
 *
 * @throws IOException declared for failures while persisting the migration status (NOTE(review):
 *         inferred from the signature; confirm against the implementations)
 */
void setSchedulerMigrationDone() throws IOException;

}

0 comments on commit 26ed385

Please sign in to comment.