🎉 Migrate OSS to temporal scheduler #12757
Conversation
@@ -233,6 +233,10 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
    final Flyway jobsFlyway = FlywayFactory.create(jobsDataSource, DbMigrationHandler.class.getSimpleName(), JobsDatabaseMigrator.DB_IDENTIFIER,
        JobsDatabaseMigrator.MIGRATION_FILE_LOCATION);

    // It is important that the migration to the temporal scheduler is performed before the server accepts any requests.
    // This is why this migration is performed here instead of in the bootloader - so that the server blocks on this.
    migrateExistingConnectionsToTemporalScheduler(configRepository, jobPersistence, eventRunner);
Recall that, as I described in this comment, the logic to clean up non-terminal jobs is only executed just before a new job is created by the temporal workflow. One consequence of this is that if a user upgrades while a job is running, that job will keep its RUNNING status until the next time the connection is synced. So in the worst case, 24 hours could pass before that zombie job is marked as FAILED and a new job is created.
This isn't an ideal situation, but I think it is tolerable as a one-time occurrence for this migration.
Just to note, this was fixed in that other PR by also cleaning up jobs at the beginning of the temporal workflow, so this should no longer be an issue.
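For readers following along, a minimal sketch of the shape of that fix; the real change lives in the temporal workflow code, and the `JobCleaner` interface below is a hypothetical stand-in for the actual cleanup activity:

```java
import java.util.UUID;

// Illustrative sketch only - not the real ConnectionManagerWorkflow. It shows
// the shape of the fix: failing leftover non-terminal (zombie) jobs at the
// start of the workflow run, in addition to just before a new job is created.
public class ConnectionWorkflowSketch {

  // Hypothetical stand-in for the activity that fails zombie jobs.
  public interface JobCleaner {
    void failNonTerminalJobs(UUID connectionId);
  }

  private final JobCleaner jobCleaner;

  public ConnectionWorkflowSketch(final JobCleaner jobCleaner) {
    this.jobCleaner = jobCleaner;
  }

  public void run(final UUID connectionId) {
    // Mark any jobs left RUNNING by a previous deployment as FAILED up front,
    // so a zombie job cannot linger until the next scheduled sync.
    jobCleaner.failNonTerminalJobs(connectionId);
    // ... wait for the schedule, create the next job, run the sync ...
  }
}
```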
Follow-up question: is there a plan or scheduled work to come back and (a) remove the environment variable that represented the feature flag, and (b) remove/clean up the code so that there is no mention of a "new" scheduler, now that there is only one?
@lmossman Looks like some files in the PR need formatting.
private final String SCHEDULER_MIGRATION_STATUS = "schedulerMigration";

@Override
public boolean isSchedulerMigrated() throws IOException {
Is this something that we can remove the next time we do a major version bump? I don't think we should do it just for this, but if we can remove it when we do, it would be good to make sure we track or label it somehow.
Yeah, we should be able to remove this on the next major version bump. I will add a comment in the code and also make a ticket for tracking.
@lmossman Do you have a ticket for the tracking? I have some items I want to add to the list of things we want to do for the next major bump.
Yes, here is the ticket: #12823
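As context for the snippet above, here is a minimal sketch of how a metadata-backed flag like this behaves; a plain `Map` stands in for the jobs database metadata table, and `setSchedulerMigrationDone` is an assumed counterpart that the diff does not show:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the metadata-backed migration flag. A Map stands in for
// the jobs database metadata table used by the real DefaultJobPersistence;
// setSchedulerMigrationDone is an assumed counterpart, not shown in the diff.
public class SchedulerMigrationFlag {

  private static final String SCHEDULER_MIGRATION_STATUS = "schedulerMigration";

  private final Map<String, String> metadata = new ConcurrentHashMap<>();

  public boolean isSchedulerMigrated() {
    return Optional.ofNullable(metadata.get(SCHEDULER_MIGRATION_STATUS))
        .map(Boolean::parseBoolean)
        .orElse(false);
  }

  public void setSchedulerMigrationDone() {
    metadata.put(SCHEDULER_MIGRATION_STATUS, Boolean.TRUE.toString());
  }
}
```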
LOGGER.info("Migration to temporal scheduler has already been performed"); | ||
return; | ||
} | ||
|
||
LOGGER.info("Start migration to the new scheduler..."); | ||
final Set<UUID> connectionIds = | ||
configRepository.listStandardSyncs().stream() | ||
.filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE) | ||
.map(standardSync -> standardSync.getConnectionId()).collect(Collectors.toSet()); | ||
eventRunner.migrateSyncIfNeeded(connectionIds); |
Should we add a test that checks that we don't run anything if jobPersistence returns true for isSchedulerMigrated?
Yeah, I can add that test.
I have added tests for both cases.
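For reference, a hedged sketch of what those two tests could look like with JUnit 5 and Mockito; class and method names mirror the diffs above, while test-side details (visibility of the migration method, mock setup) are assumptions:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.junit.jupiter.api.Test;

// Hedged sketch of the two cases discussed above. Project imports for
// ConfigRepository, JobPersistence, EventRunner, StandardSync, Status, and
// ServerApp are omitted; test visibility of the migration method is assumed.
class SchedulerMigrationTest {

  private final ConfigRepository configRepository = mock(ConfigRepository.class);
  private final JobPersistence jobPersistence = mock(JobPersistence.class);
  private final EventRunner eventRunner = mock(EventRunner.class);

  @Test
  void doesNothingWhenAlreadyMigrated() throws Exception {
    when(jobPersistence.isSchedulerMigrated()).thenReturn(true);

    ServerApp.migrateExistingConnectionsToTemporalScheduler(configRepository, jobPersistence, eventRunner);

    // The guard should short-circuit before any migration work happens.
    verifyNoInteractions(eventRunner);
  }

  @Test
  void migratesWhenNotYetMigrated() throws Exception {
    final UUID connectionId = UUID.randomUUID();
    final StandardSync sync = mock(StandardSync.class);
    when(sync.getStatus()).thenReturn(Status.ACTIVE);
    when(sync.getConnectionId()).thenReturn(connectionId);
    when(jobPersistence.isSchedulerMigrated()).thenReturn(false);
    when(configRepository.listStandardSyncs()).thenReturn(List.of(sync));

    ServerApp.migrateExistingConnectionsToTemporalScheduler(configRepository, jobPersistence, eventRunner);

    // Active (and inactive) connections should be handed to the event runner.
    verify(eventRunner).migrateSyncIfNeeded(Set.of(connectionId));
  }
}
```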
@jdpgrailsdev Yes, this work is planned; we briefly discussed it this morning during sprint planning. This is the ticket: #8445 (it still needs to be fleshed out, but it is basically what you described).
I merged the base branch of this PR into master without realizing that would auto-close this PR. Reopening this and rebasing it back onto master.
Force-pushed from 2ffe5f8 to e452515: …ORCHESTRATOR_ENABLED flag to true for kube .env files
@lmossman Is there any impact for users? What improvements can OSS expect from the new scheduler? Asking because this could be a good feature release.
If it is a user-facing feature, can you add the 🎉 emoji to the PR title so I will know to add it to the changelog?
@marcosmarxm @Amruta-Ranade I described the user impact in the PR description. The migration to the temporal scheduler will happen without any user action required, so users won't necessarily see any change in terms of the user interface. But now that the scheduling is handled by temporal, OSS deployments should be able to handle a larger number of active connections. And for OSS deployments on kubernetes, after upgrading to the next version that includes this change, users should be able to do high-availability upgrades of Airbyte without causing job failures. I.e. in future upgrades, kube users can just

I updated the PR title with the correct emoji, but that won't change the commit history on master since this PR was already merged in.
* Migrate OSS to temporal scheduler
* add comment about migration being performed in server
* add comments about removing migration logic
* formatting and add tests for migration logic
* rm duplicated test
* remove more duplicated build task
* remove retry
* disable acceptance tests that call temporal directly when on kube
* set NEW_SCHEDULER and CONTAINER_ORCHESTRATOR_ENABLED env vars to true to be consistent
* set default value of container orchestrator enabled to true
* Revert "set default value of container orchestrator enabled to true" (reverts commit 21b3670)
* Revert "set NEW_SCHEDULER and CONTAINER_ORCHESTRATOR_ENABLED env vars to true to be consistent" (reverts commit 6dd2ec0)
* Revert "Revert "set NEW_SCHEDULER and CONTAINER_ORCHESTRATOR_ENABLED env vars to true to be consistent"" (reverts commit 2f40f9d)
* Revert "Revert "set default value of container orchestrator enabled to true"" (reverts commit 26068d5)
* fix sync workflow test
* remove defunct cancellation tests due to internal temporal error
* format - remove unused imports
* revert changes that set container orchestrator enabled to true everywhere
* remove NEW_SCHEDULER feature flag from .env files, and set CONTAINER_ORCHESTRATOR_ENABLED flag to true for kube .env files

Co-authored-by: Benoit Moriceau <benoit@airbyte.io>
What
See issue comment for context: #10021 (comment)
Resolves #10021
Migrates OSS to the new scheduler.
How
This PR sets the `usesNewScheduler` feature flag to `true` for everyone, regardless of the environment variable value.

I also added logic to set a database metadata flag to `true` after the migration is performed, and to skip the migration if the flag is already `true`, in order to prevent us from needing to execute this server logic on every startup. I borrowed this idea from Benoit's PR here.

I have tested this out locally and it seems to properly migrate my deployment from the old scheduler to the new one.
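Putting the diff fragments in this PR together, the guarded migration flow looks roughly like the sketch below; it is not the exact merged code, `setSchedulerMigrationDone` is an assumed counterpart to `isSchedulerMigrated`, and checked exceptions are simplified:

```java
// Sketch assembled from the diff fragments shown in this PR; not the exact
// merged code. setSchedulerMigrationDone is assumed to write the metadata flag.
private static void migrateExistingConnectionsToTemporalScheduler(final ConfigRepository configRepository,
                                                                  final JobPersistence jobPersistence,
                                                                  final EventRunner eventRunner)
    throws IOException {
  // Skip the migration entirely if the metadata flag says it already ran.
  if (jobPersistence.isSchedulerMigrated()) {
    LOGGER.info("Migration to temporal scheduler has already been performed");
    return;
  }

  LOGGER.info("Start migration to the new scheduler...");
  final Set<UUID> connectionIds =
      configRepository.listStandardSyncs().stream()
          .filter(standardSync -> standardSync.getStatus() == Status.ACTIVE || standardSync.getStatus() == Status.INACTIVE)
          .map(StandardSync::getConnectionId)
          .collect(Collectors.toSet());
  eventRunner.migrateSyncIfNeeded(connectionIds);
  jobPersistence.setSchedulerMigrationDone(); // assumed: records the "schedulerMigration" metadata flag
  LOGGER.info("Done migrating to the new scheduler");
}
```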
Recommended reading order
1. `ServerApp.java`
2. `EnvVariableFeatureFlags.java`
3. `DefaultJobPersistence.java`
🚨 User Impact 🚨
The impact to the user is that they will be migrated to the new temporal-based scheduler upon upgrade, which should improve OSS Airbyte deployments' ability to handle a large number of connections.
A smooth migration requires users to spin down their existing deployment and spin it up on the upgraded version, as described in our docs here, which should prevent the old scheduler from running during the migration.
If any jobs were actively running at the time of the upgrade, then the next time those connections perform a sync, those zombie jobs will be marked as failed.