[source-postgres] : Provide option to advance LSN #34781

Merged
akashkulk merged 9 commits into master from akash/pg-advance-lsn
Feb 8, 2024

@akashkulk akashkulk commented Feb 2, 2024

Closes #32946

This PR exposes an option in the setup config to set the Debezium property heartbeat.action.query.

Setup instructions for users:

  1. Create a heartbeat table, within the database and schema being captured, that Airbyte has write access to.
  2. Add this table to Airbyte's existing publication.
  3. Set a query that periodically updates that table so as to advance the LSN.
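
The three setup steps above can be sketched as the SQL a user might run. This is only an illustration, not part of the PR: the class and method names are hypothetical, the publication name passed in is an assumption, and the table layout (a serial `id` plus a `text` column) is assumed so that it matches the `INSERT` statement quoted later in the review thread.

```java
import java.util.List;

/** Hypothetical helper (not part of the connector): renders SQL for the three setup steps. */
final class HeartbeatSetupSketch {

  private HeartbeatSetupSketch() {}

  // Quote a PostgreSQL identifier: wrap in double quotes and double any embedded quote.
  static String quoteIdent(final String ident) {
    return "\"" + ident.replace("\"", "\"\"") + "\"";
  }

  // Returns one statement per setup step, in order.
  static List<String> setupStatements(final String schema, final String table, final String publication) {
    final String qualified = quoteIdent(schema) + "." + quoteIdent(table);
    return List.of(
        // Step 1: create the heartbeat table (column layout is an assumption).
        "CREATE TABLE IF NOT EXISTS " + qualified + " (id SERIAL PRIMARY KEY, text TEXT)",
        // Step 2: add the table to the existing publication.
        "ALTER PUBLICATION " + quoteIdent(publication) + " ADD TABLE " + qualified,
        // Step 3: the query to configure as heartbeat_action_query.
        "INSERT INTO " + qualified + " (text) VALUES ('heartbeat')");
  }
}
```

For example, `HeartbeatSetupSketch.setupStatements("public", "airbyte_heartbeat", "airbyte_publication")` yields the three statements in order; the last one is the value a user would paste into the new config field. The Airbyte database user also needs write access to the table, which this sketch does not grant.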

Tested locally. Some notes:

  1. By default, this property is NOT set, so no change in behavior is expected.
  2. If a user sets a bad query, or the Airbyte user doesn't have permission to execute the given query, the Postgres connector will not fail and will continue syncing.

github-actions bot commented Feb 2, 2024

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file with any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

github-actions bot commented Feb 2, 2024

Coverage report for source-postgres

File                          Coverage [46.28%]
PostgresCdcProperties.java    46.28%
Total Project Coverage        69.26% 🍏

@octavia-squidington-iii octavia-squidington-iii added the area/documentation Improvements or additions to documentation label Feb 5, 2024
@akashkulk akashkulk marked this pull request as ready for review February 5, 2024 18:42
@akashkulk akashkulk requested a review from a team as a code owner February 5, 2024 18:42
@@ -68,6 +68,11 @@ private static Properties commonProperties(final JdbcDatabase database) {
        : HEARTBEAT_INTERVAL;
    props.setProperty("heartbeat.interval.ms", Long.toString(heartbeatInterval.toMillis()));

    if (sourceConfig.get("replication_method").has("heartbeat_action_query")
        && !sourceConfig.get("replication_method").get("heartbeat_action_query").asText().isEmpty()) {
      props.setProperty("heartbeat.action.query", sourceConfig.get("replication_method").get("heartbeat_action_query").asText());
Contributor

This seems straightforward: it configures an existing Debezium property.
I'm just concerned that we're opening a door to all sorts of direct SQL manipulation.
I'm not sure how, but it would be safer to solve this in some templated way so that users or a bad actor cannot inject something.

Contributor

If most customers will end up configuring
INSERT INTO airbyte_heartbeat (text) VALUES ('heartbeat')

can we maybe change the field to accept something like [schema].[table] and have our code build the query?
There could be reasons not to.
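
The templated alternative suggested above could look roughly like the following. This is a sketch of the reviewer's idea, not what the PR merged (the PR passes the user's query through unchanged). The class and method names are hypothetical, the accepted identifier syntax is deliberately restrictive, and the `text` column is assumed from the example query above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: build the heartbeat query from a "schema.table" input. */
final class HeartbeatQueryBuilderSketch {

  // Accept only plain identifiers (letters, digits, underscore) on both sides of the dot.
  private static final Pattern QUALIFIED_NAME =
      Pattern.compile("([A-Za-z_][A-Za-z0-9_]*)\\.([A-Za-z_][A-Za-z0-9_]*)");

  private HeartbeatQueryBuilderSketch() {}

  static String buildHeartbeatQuery(final String qualifiedTable) {
    final Matcher m = QUALIFIED_NAME.matcher(qualifiedTable);
    if (!m.matches()) {
      // Anything that is not exactly schema.table is rejected, so no SQL can be injected.
      throw new IllegalArgumentException("Expected <schema>.<table>, got: " + qualifiedTable);
    }
    // Double-quoting makes the identifiers case-sensitive in PostgreSQL.
    return "INSERT INTO \"" + m.group(1) + "\".\"" + m.group(2) + "\" (text) VALUES ('heartbeat')";
  }
}
```

The trade-off raised in the reply below still applies: a builder like this fixes the shape of the table, whereas a free-form query lets users reuse a table they already have.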

Contributor Author

Discussed offline. I'd like to not make things harder than they are by forcing users to create a specific table. The thinking here is that they could reuse a table they already have instead of creating one. WDYT?

@@ -68,6 +68,11 @@ private static Properties commonProperties(final JdbcDatabase database) {
        : HEARTBEAT_INTERVAL;
    props.setProperty("heartbeat.interval.ms", Long.toString(heartbeatInterval.toMillis()));

    if (sourceConfig.get("replication_method").has("heartbeat_action_query")
Contributor

Can we add heartbeat_action_query to a test, even if only by configuring it in an existing test, to make sure nothing is broken?

@@ -289,6 +289,13 @@
      ],
      "default": "After loading Data in the destination",
      "order": 7
    },
    "heartbeat_action_query": {
Contributor

Checking: because this field is optional, existing connections lacking it would just keep working,
i.e. this is a non-breaking change.

Contributor Author

Yes, this property is only set if it's populated; otherwise, I've verified that the behavior is unchanged. The default value is empty.
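
The "only set if populated" behavior described here can be sketched in isolation as follows. To keep the example self-contained, it stands in a plain `Map` for the connector's Jackson `JsonNode` config; that substitution and the class and method names are assumptions for illustration, while the config key and the Debezium property name are taken from the diff above.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical sketch of the conditional property wiring, using a Map instead of JsonNode. */
final class HeartbeatPropertySketch {

  private HeartbeatPropertySketch() {}

  static Properties debeziumProperties(final Map<String, String> replicationMethod) {
    final Properties props = new Properties();
    final String query = replicationMethod.get("heartbeat_action_query");
    // Forward the query to Debezium only when the field is present and non-empty.
    if (query != null && !query.isEmpty()) {
      props.setProperty("heartbeat.action.query", query);
    }
    return props;
  }
}
```

With an empty map or an empty string for `heartbeat_action_query`, the returned `Properties` contains no `heartbeat.action.query` entry, which is why existing connections are unaffected. The same three cases (absent, empty, populated) would also be a reasonable shape for the test requested earlier in the review.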

@akashkulk akashkulk merged commit e6fff38 into master Feb 8, 2024
25 checks passed
@akashkulk akashkulk deleted the akash/pg-advance-lsn branch February 8, 2024 20:13
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 21, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
jatinyadav-cc pushed a commit to ollionorg/datapipes-airbyte that referenced this pull request Feb 26, 2024
Labels
area/connectors: Connector related issues
area/documentation: Improvements or additions to documentation
connectors/source/postgres
Development

Successfully merging this pull request may close these issues.

Source Postgres : Investigate debezium not returning records in allotted time
3 participants