fix(ws): stop v3 sync jobs getting stuck in running#60642
Merged
Conversation
we want to be able to filter by this in the logs and have both producer and consumer
this avoid stale running state to be writen in the DB
Contributor
|
Reviews (1): Last reviewed commit: "make job saves to only updated intended ..." | Re-trigger Greptile |
Gilbert09
approved these changes
May 29, 2026
danielcarletti
approved these changes
May 29, 2026
Contributor
danielcarletti
left a comment
There was a problem hiding this comment.
Love the job_id -> external_data_job_id (and similar) changes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
V3 warehouse-source syncs sometimes finish with a split state: the data loads fine and the
ExternalDataSchemaisCompleted, but theExternalDataJobis stuck inRunningwithfinished_at = NULLforever.Nothing fixes it automatically, so it shows as a phantom "still running" sync and inflates the "running jobs" counts used for billing/usage.
It's a race between two processes that both write the job row:
warehouse-sources-load) marks the jobCompleted.calculate_table_size_activityreads the job while it's stillRunning, does a slow S3 size lookup, then calls an unscopedjob.save()that writes back every column.When that save lands after the consumer's completion, it overwrites the whole row from its stale in-memory copy reverting
statustoRunningand clearingfinished_at(and never touching the schema, hence the split).Changes
workflow_activities/calculate_table_size.py: scoped the job write tojob.save(update_fields=["storage_delta_mib", "updated_at"])so it can no longer overwritestatusorfinished_at. This is the confirmed clobber.pipelines/common/extract.py: applied the same scoping toreset_rows_synced_if_needed's save (update_fields=["rows_synced", "updated_at"]), a latent twin that can fire on an extraction retry. No behavior change for the non-DLT pipeline, sinceRunningis persisted independently by the create-job activity.pipelines/pipeline_v3/postgres_queue/consumer.py: renamed the bound log contextvarsschema_id/source_id/job_idtoexternal_data_schema_id/external_data_source_id/external_data_job_id(both the per-batch bind and the recovery-sweep bind) to match the producer and make trouble shooting easier in logs.pipelines/pipeline_v3/load/processor.py: renamed the same keys on the processor's log calls. Function-call params and Prometheus metric labels were left unchanged.pipelines/pipeline_v3/postgres_queue/test_consumer.py: updated the bound-context assertion for the renamed key.tests/data_imports/test_calculate_table_size.py: new regression test that injects a concurrentCompletedbetween the activity's read and save, then asserts the status survives whilestorage_delta_mibis still written.How did you test this code?
Test run
Automatic notifications
Docs update
NO