fix(data-warehouse): use inclusive cursor on date-typed incremental sync#59033
Merged
Conversation
Contributor
|
Reviews (1): Last reviewed commit: "fix(data-warehouse): use inclusive curso..." | Re-trigger Greptile |
Contributor
There was a problem hiding this comment.
Pull request overview
This PR fixes a correctness bug in incremental syncs for SQL data warehouse sources when the incremental cursor is a Date (day-granularity) by switching the lower-bound predicate to be inclusive (>=) for Date cursors while keeping the existing exclusive behavior (>) for higher-resolution cursor types.
Changes:
- Introduces
incremental_type_to_operator()to centralize the cursor lower-bound operator choice (>=forIncrementalFieldType.Date, otherwise>). - Updates multiple SQL source query builders (Postgres, Redshift, MySQL, MSSQL, BigQuery, Snowflake) to use the helper for incremental predicates.
- Adds/extends tests covering the helper mapping and Postgres query builders, including a regression test ensuring Postgres windowed mode keeps an exclusive lower bound for Date.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| posthog/temporal/data_imports/pipelines/helpers.py | Adds incremental_type_to_operator() helper to choose >= for Date cursors and > otherwise. |
| posthog/temporal/data_imports/pipelines/test_helpers.py | Adds unit test for incremental_type_to_operator() across all IncrementalFieldType members. |
| posthog/temporal/data_imports/sources/postgres/postgres.py | Uses the helper for single-shot incremental scans and count queries; keeps > in windowed mode. |
| posthog/temporal/data_imports/sources/postgres/partitioned_tables.py | Uses the helper in per-child partition incremental queries. |
| posthog/temporal/data_imports/sources/postgres/test_postgres.py | Adds tests asserting operator selection in main, count, partition queries, plus windowed-mode regression coverage. |
| posthog/temporal/data_imports/sources/redshift/redshift.py | Uses the helper for sampled and non-sampled incremental predicates. |
| posthog/temporal/data_imports/sources/mysql/mysql.py | Uses the helper in MySQL incremental WHERE clause. |
| posthog/temporal/data_imports/sources/mssql/mssql.py | Uses the helper in MSSQL incremental WHERE clause. |
| posthog/temporal/data_imports/sources/bigquery/bigquery.py | Uses the helper in BigQuery incremental query construction. |
| posthog/temporal/data_imports/sources/snowflake/snowflake.py | Uses the helper in Snowflake incremental query construction. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
MarconLP
approved these changes
May 19, 2026
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
SQL data warehouse sources (Postgres, Redshift, MySQL, MSSQL, BigQuery, Snowflake) advance their incremental cursor with
WHERE incremental_field > saved_cursor. For DateTime / Timestamp / Integer / Numeric / ObjectID columns the cursor carries enough resolution that>is safe — two distinct rows essentially never share the boundary value.For
IncrementalFieldType.Date, the cursor only carries day granularity. If sync A completes mid-day and saves cursor2026-05-13, sync B's> '2026-05-13'skips every row that lands on2026-05-13after sync A advanced the cursor — those rows are silently dropped from the warehouse on every subsequent sync. The bug is invisible: sync B "succeeds" with zero new rows for the boundary day.Changes
incremental_type_to_operator(field_type)inposthog/temporal/data_imports/pipelines/helpers.pyreturns">="forDateand">"for every otherIncrementalFieldType. Centralizing the decision keeps the six SQL sources in sync.postgres/postgres.py(main scan + count query)postgres/partitioned_tables.py(build_partition_query, called per child partition)redshift/redshift.py(sampled + non-sampled scans)mysql/mysql.pymssql/mssql.pybigquery/bigquery.pysnowflake/snowflake.py_build_querywithupper_bound_inclusiveset, used byiterate_date_windows) deliberately keeps>even forDate. Consecutive windows pass the previous window'shias the next window'slo, so>=would re-fetch every boundary row inside a single run and produce duplicates between windows. A regression test (test_windowed_mode_keeps_exclusive_lower_bound_for_date) locks this behavior in.Trade-off this PR accepts: for
APPENDsync_type on aDate-typed cursor, the next sync after a cursor advance will re-ship rows at the boundary day. Without primary-key dedup those land as duplicates. This is a smaller harm than the current silent data loss, andINCREMENTALsync_type (the much more common case) absorbs the re-shipped rows via primary-key dedup.How did you test this code?
I am an agent. Only automated tests were run — no manual sync verification.
posthog/temporal/data_imports/pipelines/test_helpers.pycovering everyIncrementalFieldTypemember.posthog/temporal/data_imports/sources/postgres/test_postgres.py:TestBuildQuery.test_operator_matches_field_type(Date,DateTime,Timestamp,Integer)TestBuildQuery.test_count_query_operator_matches_field_typeTestBuildPartitionQuery.test_operator_matches_field_typeTestBuildQuery.test_windowed_mode_keeps_exclusive_lower_bound_for_date— regression guard for the windowed pathTestIterateDateWindowsRealDb::test_yields_all_rows_over_partitioned_tablethat caught the earlier draft over-applying>=to windowed mode.Publish to changelog?
no
🤖 Agent context
>=switch onsync_type == INCREMENTALso APPEND keeps>. Rejected because most SQL sources don't havesync_typeavailable where the query is built, and the bug is severe enough (silent boundary-day loss across all syncs) that the small APPEND duplication is preferable to threading a new parameter through every source.Datetypes so>still captures the boundary day. Rejected as more invasive (every source's cursor advancement logic would need to know about it) and harder to reason about — the operator-side fix keeps the cursor's stored value honest and isolates the change to query construction.iterate_date_windows) still has the same boundary-day loss between sync runs because the first window'slois the saved cursor and uses>. Fixing that requiresiterate_date_windowsto step the initialloback by one unit forDatetypes — left for a follow-up since it has its own correctness considerations around partition snap-forward (_step_back_one) and is a distinct codepath from the single-shot scan the customer-facing bug surfaces in.