diff --git a/.env b/.env index 13d3574388d7e..efd5e5f9da734 100644 --- a/.env +++ b/.env @@ -90,7 +90,6 @@ MAX_DISCOVER_WORKERS=5 ### FEATURE FLAGS ### -NEW_SCHEDULER=false AUTO_DISABLE_FAILING_CONNECTIONS=false EXPOSE_SECRETS_IN_EXPORT=false FORCE_MIGRATE_SECRET_STORE=false diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index 1fa9220fd3205..99b88045d71ec 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -518,283 +518,6 @@ jobs: label: ${{ needs.start-platform-build-runner.outputs.label }} ec2-instance-id: ${{ needs.start-platform-build-runner.outputs.ec2-instance-id }} - # Scheduler V2 - # In case of self-hosted EC2 errors, remove this block. - start-platform-new-scheduler-acceptance-runner: - name: "Platform: Start Docker w/ Scheduler v2 Test Runner" - needs: - - changes - - find_valid_pat - # Because scheduled builds on master require us to skip the changes job. Use always() to force this to run on master. - if: needs.changes.outputs.backend == 'true' || needs.changes.outputs.build == 'true' || (always() && github.ref == 'refs/heads/master') - timeout-minutes: 10 - runs-on: ubuntu-latest - outputs: - label: ${{ steps.start-ec2-runner.outputs.label }} - ec2-instance-id: ${{ steps.start-ec2-runner.outputs.ec2-instance-id }} - steps: - - name: Checkout Airbyte - uses: actions/checkout@v2 - - name: Start AWS Runner - id: start-ec2-runner - uses: ./.github/actions/start-aws-runner - with: - github-token: ${{ needs.find_valid_pat.outputs.pat }} - aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }} - platform-new-scheduler-acceptance: - # In case of self-hosted EC2 errors, remove the next two lines and uncomment the currently commented out `runs-on` line. - needs: start-platform-new-scheduler-acceptance-runner # required to start the main job when the runner is ready - runs-on: ${{ needs.start-platform-new-scheduler-acceptance-runner.outputs.label }} # run the job on the newly created runner - name: "Platform: Docker w/ Scheduler v2 Acceptance Tests" - timeout-minutes: 90 - steps: - - name: Checkout Airbyte - uses: actions/checkout@v2 - - - name: Npm Caching - uses: actions/cache@v2 - with: - path: | - ~/.npm - key: ${{ secrets.CACHE_VERSION }}-npm-${{ runner.os }}-${{ hashFiles('**/package-lock.json') }} - restore-keys: | - ${{ secrets.CACHE_VERSION }}-npm-${{ runner.os }}- - - # this intentionally does not use restore-keys so we don't mess with gradle caching - - name: Gradle Caching - uses: actions/cache@v2 - with: - path: | - ~/.gradle/caches - ~/.gradle/wrapper - **/.venv - key: ${{ secrets.CACHE_VERSION }}-${{ runner.os }}-${{ hashFiles('**/*.gradle*') }}-${{ hashFiles('**/package-lock.json') }} - - - uses: actions/setup-java@v1 - with: - java-version: "17" - - - uses: actions/setup-node@v1 - with: - node-version: "16.13.0" - - - name: Set up CI Gradle Properties - run: | - mkdir -p ~/.gradle/ - cat > ~/.gradle/gradle.properties < ~/.gradle/gradle.properties < /tmp/kind-config.yaml < result = jobDatabase.query(ctx -> ctx.select() + .from(AIRBYTE_METADATA_TABLE) + .where(DSL.field(METADATA_KEY_COL).eq(SCHEDULER_MIGRATION_STATUS)) + .fetch()); + + return result.stream().count() == 1; + } + + @Override + public void setSchedulerMigrationDone() throws IOException { + jobDatabase.query(ctx -> ctx.execute(String.format( + "INSERT INTO %s(%s, %s) VALUES('%s', '%s') ON CONFLICT (%s) DO UPDATE SET %s = '%s'", + AIRBYTE_METADATA_TABLE, + METADATA_KEY_COL, + METADATA_VAL_COL, + SCHEDULER_MIGRATION_STATUS, + true, + METADATA_KEY_COL, + METADATA_VAL_COL, + true))); + } + @Override public Optional getVersion() throws IOException { final Result result = jobDatabase.query(ctx -> ctx.select() diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index b73310ea7cd73..010957fb57985 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -248,4 +248,20 @@ List listJobStatusAndTimestampWithConnection(UUID con */ void setSecretMigrationDone() throws IOException; + /** + * Check if the scheduler has been migrated to temporal. + * + * TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next + * "major" version bump as it will no longer be needed. + */ + boolean isSchedulerMigrated() throws IOException; + + /** + * Set that the scheduler migration has been performed. + * + * TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next + * "major" version bump as it will no longer be needed. + */ + void setSchedulerMigrationDone() throws IOException; + } diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index c85e50f338f3c..2a5e7c625378e 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -512,7 +512,7 @@ void testYamlSchemas() throws IOException { } @Test - void testMigrationMetadata() throws IOException { + void testSecretMigrationMetadata() throws IOException { boolean isMigrated = jobPersistence.isSecretMigrated(); assertFalse(isMigrated); jobPersistence.setSecretMigrationDone(); @@ -520,6 +520,15 @@ void testMigrationMetadata() throws IOException { assertTrue(isMigrated); } + @Test + void testSchedulerMigrationMetadata() throws IOException { + boolean isMigrated = jobPersistence.isSchedulerMigrated(); + assertFalse(isMigrated); + jobPersistence.setSchedulerMigrationDone(); + isMigrated = jobPersistence.isSchedulerMigrated(); + assertTrue(isMigrated); + } + private long createJobAt(final Instant created_at) throws IOException { when(timeSupplier.get()).thenReturn(created_at); return jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index bfc1911cb5e0c..dc2ab226f22ca 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -4,6 +4,7 @@ package io.airbyte.server; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; @@ -233,6 +234,14 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Flyway jobsFlyway = FlywayFactory.create(jobsDataSource, DbMigrationHandler.class.getSimpleName(), JobsDatabaseMigrator.DB_IDENTIFIER, JobsDatabaseMigrator.MIGRATION_FILE_LOCATION); + // It is important that the migration to the temporal scheduler is performed before the server + // accepts any requests. + // This is why this migration is performed here instead of in the bootloader - so that the server + // blocks on this. + // TODO (https://github.com/airbytehq/airbyte/issues/12823): remove this method after the next + // "major" version bump as it will no longer be needed. + migrateExistingConnectionsToTemporalScheduler(configRepository, jobPersistence, eventRunner); + LOGGER.info("Starting server..."); return apiFactory.create( @@ -260,14 +269,24 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, jobsFlyway); } - private static void migrateExistingConnection(final ConfigRepository configRepository, final EventRunner eventRunner) + @VisibleForTesting + static void migrateExistingConnectionsToTemporalScheduler(final ConfigRepository configRepository, + final JobPersistence jobPersistence, + final EventRunner eventRunner) throws JsonValidationException, ConfigNotFoundException, IOException { + // Skip the migration if it was already performed, to save on resources/startup time + if (jobPersistence.isSchedulerMigrated()) { + LOGGER.info("Migration to temporal scheduler has already been performed"); + return; + } + LOGGER.info("Start migration to the new scheduler..."); final Set connectionIds = configRepository.listStandardSyncs().stream() .filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE) .map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet()); eventRunner.migrateSyncIfNeeded(connectionIds); + jobPersistence.setSchedulerMigrationDone(); LOGGER.info("Done migrating to the new scheduler..."); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java b/airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java new file mode 100644 index 0000000000000..491d99b601803 --- /dev/null +++ b/airbyte-server/src/test/java/io/airbyte/server/ServerAppTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.airbyte.config.StandardSync; +import io.airbyte.config.StandardSync.Status; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.scheduler.client.EventRunner; +import io.airbyte.scheduler.persistence.JobPersistence; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ServerAppTest { + + @Mock + private ConfigRepository mConfigRepository; + + @Mock + private JobPersistence mJobPersistence; + + @Mock + private EventRunner mEventRunner; + + @Test + void testMigrationAlreadyPerformed() throws Exception { + when(mJobPersistence.isSchedulerMigrated()).thenReturn(true); + + ServerApp.migrateExistingConnectionsToTemporalScheduler(mConfigRepository, mJobPersistence, mEventRunner); + + verifyNoMoreInteractions(mJobPersistence); + verifyNoMoreInteractions(mConfigRepository); + verifyNoMoreInteractions(mEventRunner); + } + + @Test + void testPerformMigration() throws Exception { + when(mJobPersistence.isSchedulerMigrated()).thenReturn(false); + + final StandardSync activeConnection = new StandardSync().withStatus(Status.ACTIVE).withConnectionId(UUID.randomUUID()); + final StandardSync inactiveConnection = new StandardSync().withStatus(Status.INACTIVE).withConnectionId(UUID.randomUUID()); + final StandardSync deprecatedConnection = new StandardSync().withStatus(Status.DEPRECATED).withConnectionId(UUID.randomUUID()); + when(mConfigRepository.listStandardSyncs()).thenReturn(List.of(activeConnection, inactiveConnection, deprecatedConnection)); + + ServerApp.migrateExistingConnectionsToTemporalScheduler(mConfigRepository, mJobPersistence, mEventRunner); + + verify(mEventRunner).migrateSyncIfNeeded(Set.of(activeConnection.getConnectionId(), inactiveConnection.getConnectionId())); + verify(mJobPersistence).setSchedulerMigrationDone(); + verifyNoMoreInteractions(mJobPersistence); + verifyNoMoreInteractions(mConfigRepository); + verifyNoMoreInteractions(mEventRunner); + } + +} diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 0638c6a58a70e..80a0e97ef2b8c 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -1172,6 +1172,8 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception @RetryingTest(3) @Order(22) + @DisabledIfEnvironmentVariable(named = "KUBE", + matches = "true") public void testDeleteConnection() throws Exception { final String connectionName = "test-connection"; final UUID sourceId = createPostgresSource().getSourceId(); @@ -1232,6 +1234,8 @@ public void testDeleteConnection() throws Exception { @Test @Order(23) + @DisabledIfEnvironmentVariable(named = "KUBE", + matches = "true") public void testUpdateConnectionWhenWorkflowUnreachable() throws Exception { // This test only covers the specific behavior of updating a connection that does not have an // underlying temporal workflow. @@ -1275,6 +1279,8 @@ public void testUpdateConnectionWhenWorkflowUnreachable() throws Exception { @Test @Order(24) + @DisabledIfEnvironmentVariable(named = "KUBE", + matches = "true") public void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Exception { // This test only covers the specific behavior of updating a connection that does not have an // underlying temporal workflow. @@ -1331,6 +1337,8 @@ public void testManualSyncRepairsWorkflowWhenWorkflowUnreachable() throws Except @Test @Order(25) + @DisabledIfEnvironmentVariable(named = "KUBE", + matches = "true") public void testResetConnectionRepairsWorkflowWhenWorkflowUnreachable() throws Exception { // This test only covers the specific behavior of updating a connection that does not have an // underlying temporal workflow. diff --git a/kube/overlays/dev-integration-test-schedulerv2/.env b/kube/overlays/dev-integration-test-schedulerv2/.env index 6764c4fb469d4..6b923016fa5ba 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/.env +++ b/kube/overlays/dev-integration-test-schedulerv2/.env @@ -65,5 +65,4 @@ JOB_KUBE_NODE_SELECTORS= JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps -NEW_SCHEDULER=true CONTAINER_ORCHESTRATOR_ENABLED=true diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index 8d22b46917813..d937f08653d24 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -66,4 +66,4 @@ JOB_KUBE_NODE_SELECTORS= JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps -CONTAINER_ORCHESTRATOR_ENABLED=false +CONTAINER_ORCHESTRATOR_ENABLED=true diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index 9bd324e721f14..5c0caf4aa4787 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -68,4 +68,4 @@ JOB_KUBE_NODE_SELECTORS= JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps -CONTAINER_ORCHESTRATOR_ENABLED=false +CONTAINER_ORCHESTRATOR_ENABLED=true diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index 4ecee24ca21d8..7d2673238c8c2 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -68,6 +68,6 @@ JOB_KUBE_NODE_SELECTORS= JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps -CONTAINER_ORCHESTRATOR_ENABLED=false +CONTAINER_ORCHESTRATOR_ENABLED=true CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED=true diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index ec5cb2642e0b7..8dc4093b0e755 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -68,5 +68,5 @@ JOB_KUBE_NODE_SELECTORS= JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps -CONTAINER_ORCHESTRATOR_ENABLED=false +CONTAINER_ORCHESTRATOR_ENABLED=true