postgres-source: complete implementation for ctid and xmin sync #27302

Merged: 37 commits, Jun 22, 2023

subodh1810
Contributor

@subodh1810 subodh1810 commented Jun 13, 2023

Issues: #26724, #26735, and everything else related to ctid + xmin syncs.

The ctid iterator comes from PR #27229, but that was a test PR and I messed up while pushing a commit, so I am closing it in favor of this PR. This PR contains the end-to-end features for xmin and ctid compatibility syncs.

I have also tested this PR in my local Airbyte instance. I set up a connector, ran it through xmin sync mode (I had to edit the spec to do that), ran a couple of syncs, and verified that it works as expected. The first sync went through ctid sync mode and the second sync went through xmin. Here are the logs from the two syncs:
d801b200_62f7_478c_b955_c041315cc10a_logs_30_txt.txt
d801b200_62f7_478c_b955_c041315cc10a_logs_29_txt.txt

Snippet from state being saved in the database

airbyte=# select * from state where connection_id = '3e43590d-a92c-457f-9317-ed1d950be2d5';
-[ RECORD 1 ]-+--------------------------------------------------------------------------------------------------------
id            | da06520c-0727-42d2-87da-598c4263412e
connection_id | 3e43590d-a92c-457f-9317-ed1d950be2d5
state         | {"version": 2, "state_type": "xmin", "num_wraparound": 0, "xmin_raw_value": 495, "xmin_xid_value": 495}
created_at    | 2023-06-19 09:06:40.983739+00
updated_at    | 2023-06-19 09:06:40.983739+00
stream_name   | id_and_name2
namespace     | public
type          | STREAM
-[ RECORD 2 ]-+--------------------------------------------------------------------------------------------------------
id            | 396ee0f3-07f4-4831-a134-5a01d67c65e1
connection_id | 3e43590d-a92c-457f-9317-ed1d950be2d5
state         | {"version": 2, "state_type": "xmin", "num_wraparound": 0, "xmin_raw_value": 495, "xmin_xid_value": 495}
created_at    | 2023-06-19 09:06:41.201327+00
updated_at    | 2023-06-19 09:06:41.201327+00
stream_name   | names
namespace     | public
type          | STREAM
-[ RECORD 3 ]-+--------------------------------------------------------------------------------------------------------
id            | b7812b6d-830d-4613-94e7-91a5e3a81545
connection_id | 3e43590d-a92c-457f-9317-ed1d950be2d5
state         | {"version": 2, "state_type": "xmin", "num_wraparound": 0, "xmin_raw_value": 495, "xmin_xid_value": 495}
created_at    | 2023-06-19 09:06:41.206329+00
updated_at    | 2023-06-19 09:06:41.206329+00
stream_name   | id_and_name
namespace     | public
type          | STREAM

airbyte=# select * from state where connection_id = '3e43590d-a92c-457f-9317-ed1d950be2d5';
-[ RECORD 1 ]-+--------------------------------------------------------------------------------------------------------
id            | da06520c-0727-42d2-87da-598c4263412e
connection_id | 3e43590d-a92c-457f-9317-ed1d950be2d5
state         | {"version": 2, "state_type": "xmin", "num_wraparound": 0, "xmin_raw_value": 497, "xmin_xid_value": 497}
created_at    | 2023-06-19 09:06:40.983739+00
updated_at    | 2023-06-19 09:10:24.376563+00
stream_name   | id_and_name2
namespace     | public
type          | STREAM
-[ RECORD 2 ]-+--------------------------------------------------------------------------------------------------------
id            | 396ee0f3-07f4-4831-a134-5a01d67c65e1
connection_id | 3e43590d-a92c-457f-9317-ed1d950be2d5
state         | {"version": 2, "state_type": "xmin", "num_wraparound": 0, "xmin_raw_value": 497, "xmin_xid_value": 497}
created_at    | 2023-06-19 09:06:41.201327+00
updated_at    | 2023-06-19 09:10:24.655649+00
stream_name   | names
namespace     | public
type          | STREAM
-[ RECORD 3 ]-+--------------------------------------------------------------------------------------------------------
id            | b7812b6d-830d-4613-94e7-91a5e3a81545
connection_id | 3e43590d-a92c-457f-9317-ed1d950be2d5
state         | {"version": 2, "state_type": "xmin", "num_wraparound": 0, "xmin_raw_value": 497, "xmin_xid_value": 497}
created_at    | 2023-06-19 09:06:41.206329+00
updated_at    | 2023-06-19 09:10:24.657942+00
stream_name   | id_and_name
namespace     | public
type          | STREAM
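
In both snapshots above, xmin_raw_value equals xmin_xid_value and num_wraparound is 0. One plausible reading, which is an assumption and not something stated in this PR, is that the raw value is a 64-bit, epoch-extended transaction ID: the high 32 bits count wraparounds and the low 32 bits are the xid. A minimal Java sketch of that arithmetic follows; the class and method names are hypothetical and do not come from the connector.

```java
/** Hypothetical helper for illustration only; not part of the connector's code. */
final class XminDecomposition {

  private XminDecomposition() {}

  // Assumption: xmin_raw_value is a 64-bit, epoch-extended transaction ID.
  // Under that assumption the high 32 bits are the wraparound count.
  static long numWraparound(final long xminRawValue) {
    return xminRawValue >>> 32;
  }

  // Under the same assumption the low 32 bits are the 32-bit xid.
  static long xminXidValue(final long xminRawValue) {
    return xminRawValue & 0xFFFFFFFFL;
  }

  public static void main(final String[] args) {
    final long raw = 497L; // value taken from the second state snapshot
    System.out.println("num_wraparound=" + numWraparound(raw)
        + ", xmin_xid_value=" + xminXidValue(raw));
  }
}
```

With the values logged above (495 and 497), the raw value is far below 2^32, so this reading is at least consistent with a wraparound count of 0 and an xid equal to the raw value.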

@github-actions
Contributor

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan and you've followed all steps in the Breaking Changes Checklist
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • You, or an Airbyter, have run /test successfully on this PR - or on a non-forked branch
  • You've updated the connector's metadata.yaml file (new!)
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

@github-actions
Contributor

github-actions bot commented Jun 13, 2023

Affected Connector Report

NOTE ⚠️ Changes in this PR affect the following connectors. Make sure to do the following as needed:

  • Run integration tests
  • Bump connector or module version
  • Add changelog
  • Publish the new version

✅ Sources (3)

Connector | Version | Changelog | Publish
source-alloydb | 2.0.28 | |
source-alloydb-strict-encrypt | 2.0.28 | 🔵 (ignored) | 🔵 (ignored)
source-postgres-strict-encrypt | 2.0.34 | 🔵 (ignored) | 🔵 (ignored)
  • See "Actionable Items" below for how to resolve warnings and errors.

✅ Destinations (0)

Connector Version Changelog Publish
  • See "Actionable Items" below for how to resolve warnings and errors.

✅ Other Modules (0)

Actionable Items

(click to expand)

Category | Status | Actionable Item
Version | mismatch | The version of the connector is different from its normal variant. Please bump the version of the connector.
Version | doc not found | The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug.
Changelog | doc not found | The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug.
Changelog | changelog missing | There is no changelog for the current version of the connector. If you are the author of the current version, please add a changelog.
Publish | not in seed | The connector is not in the cloud or oss registry, so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that you have added a metadata.yaml file and the expected registries are enabled.

@github-actions
Contributor

github-actions bot commented Jun 13, 2023

Coverage report for source-postgres

File Coverage [88.48%] 🍏
XminStateManager.java 100% 🍏
AirbyteMessageWithCtid.java 100% 🍏
CtidStateIterator.java 96.39% 🍏
CtidPostgresSourceOperations.java 94.29% 🍏
XminCtidUtils.java 93.03% 🍏
PostgresCtidHandler.java 90.63% 🍏
CtidStateManager.java 88.24% 🍏
PostgresSource.java 86.15% 🍏
PostgresQueryUtils.java 81.89% 🍏
Total Project Coverage 67.8% 🍏

@subodh1810 subodh1810 changed the title from "add version and state type to xmin status" to "postgres-source: logic for ctid and xmin switching + state changes" Jun 13, 2023
@subodh1810
Contributor Author

@rodireich @akashkulk please take a look at this PR. I pulled in changes from Rodi's PR #27229, but I realised it lacked a few things I needed to build the logic, so I made those changes to unblock myself. This is still in draft because it lacks tests, but if you are both happy with the changes, I will add the tests and mark it as ready for review.

@subodh1810 subodh1810 changed the base branch from master to 26486-initial-sync-using-ctid June 13, 2023 11:59
@@ -89,7 +90,9 @@ public static XminStatus getXminStatus(final JdbcDatabase database) throws SQLEx
       return new XminStatus()
           .withNumWraparound(result.get(NUM_WRAPAROUND_COL).asLong())
           .withXminXidValue(result.get(XMIN_XID_VALUE_COL).asLong())
-          .withXminRawValue(result.get(XMIN_RAW_VALUE_COL).asLong());
+          .withXminRawValue(result.get(XMIN_RAW_VALUE_COL).asLong())
+          .withVersion(2L)
Contributor

Can this version be explicitly defined in a common class somewhere?

Contributor

Friendly ping - I think we should explicitly define a common version constant somewhere (maybe in a version utility file)
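
One way the suggestion above could look is a small holder class for the state version, so that call sites reference a named constant instead of the literal 2L. This is only a sketch: the class name XminStateVersions and the helper isCurrent are hypothetical, and where such a constant should actually live is left to the PR author.

```java
/** Hypothetical constants holder; the name and location are assumptions. */
final class XminStateVersions {

  /** Version written into newly saved state (2 in the diff under review). */
  static final long CURRENT_VERSION = 2L;

  private XminStateVersions() {}

  /** Returns true when a saved state carries the current version; null means no version was saved. */
  static boolean isCurrent(final Long savedVersion) {
    return savedVersion != null && savedVersion == CURRENT_VERSION;
  }
}
```

The builder call in the diff would then read `.withVersion(XminStateVersions.CURRENT_VERSION)`, and any later check of saved state could reuse the same constant.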

final PostgresXminHandler handler = new PostgresXminHandler(database, sourceOperations, getQuoteString(), xminStatus, xminStateManager);
return handler.getIncrementalIterators(catalog, tableNameToTable, emittedAt);
final List<AirbyteStateMessage> rawStateMessages = stateManager.getRawStateMessages();

Contributor

@akashkulk akashkulk Jun 13, 2023

As I understand it, at a very high level we are:

  1. Identifying streams to sync via ctid (these include newly added streams and in-progress streams)
  2. Identifying streams to sync via xmin
  3. Building those iterators
  4. Concatenating them

If so, can we add a quick comment indicating that this is what we're doing?
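
The last two steps in this summary (build the iterators, then concatenate them) can be sketched in plain Java as follows. This is an illustration under assumptions, not the connector's code: IteratorConcatSketch, concatenate and drain are hypothetical names, and the element type is left generic where the real code would use its own iterator types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of "build the iterators, then concatenate them". */
final class IteratorConcatSketch {

  private IteratorConcatSketch() {}

  /** Puts all ctid iterators ahead of all xmin iterators, keeping the order inside each group. */
  static <T> List<Iterator<T>> concatenate(final List<Iterator<T>> ctidIterators,
                                           final List<Iterator<T>> xminIterators) {
    return Stream.of(ctidIterators, xminIterators)
        .flatMap(List::stream)
        .collect(Collectors.toList());
  }

  /** Reads every iterator to the end, in list order, to show the resulting record order. */
  static <T> List<T> drain(final List<Iterator<T>> iterators) {
    final List<T> records = new ArrayList<>();
    for (final Iterator<T> iterator : iterators) {
      iterator.forEachRemaining(records::add);
    }
    return records;
  }
}
```

Draining the concatenated list yields every record from the ctid group before any record from the xmin group, which is the ordering the summary describes.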

final List<AirbyteStateMessage> statesFromCtidSync = new ArrayList<>();
final List<AirbyteStateMessage> statesFromXminSync = new ArrayList<>();

final Set<AirbyteStreamNameNamespacePair> alreadySeenStreams = new HashSet<>();
Contributor

Can we add helper methods to calculate streamsStillInCtidSync, streamsForXminSync, etc.?
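
A rough sketch of what such helpers might look like, written against plain collections so it stays self-contained. Everything here is an assumption made for illustration: streams are represented by name only, saved state is reduced to a map from stream name to state_type, and a stream counts as ready for xmin only when its saved state_type is "xmin" (so streams with no saved state, or with any other state type, fall back to ctid).

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helpers; the names mirror the variables mentioned in the review comment. */
final class StreamPartitionSketch {

  private static final String XMIN = "xmin";

  private StreamPartitionSketch() {}

  /** Streams whose saved state_type is "xmin" continue with an xmin sync. */
  static List<String> streamsForXminSync(final List<String> configuredStreams,
                                         final Map<String, String> stateTypeByStream) {
    return configuredStreams.stream()
        .filter(stream -> XMIN.equals(stateTypeByStream.get(stream)))
        .collect(Collectors.toList());
  }

  /** Everything else (new streams and streams still mid-ctid) goes through ctid. */
  static List<String> streamsForCtidSync(final List<String> configuredStreams,
                                         final Map<String, String> stateTypeByStream) {
    return configuredStreams.stream()
        .filter(stream -> !XMIN.equals(stateTypeByStream.get(stream)))
        .collect(Collectors.toList());
  }
}
```

Keeping the two predicates as exact complements means every configured stream lands in exactly one of the two lists.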

@@ -73,6 +73,14 @@ public XminStatus getXminStatus(final AirbyteStreamNameNamespacePair pair) {
* @return AirbyteMessage which includes information on state of records read so far
*/
public static AirbyteMessage createStateMessage(final AirbyteStreamNameNamespacePair pair, final XminStatus xminStatus) {
final AirbyteStateMessage stateMessage = getAirbyteStateMessage(pair, xminStatus);
Contributor

Is this just a refactor?

if (!streamsForCtidSync.isEmpty()) {
final CtidStateManager ctidStateManager = new CtidStateManager(statesFromCtidSync);
final PostgresCtidHandler ctidHandler = new PostgresCtidHandler(database, sourceOperations, getQuoteString(), ctidStateManager,
x -> new ObjectMapper().valueToTree(xminStatus),
Contributor

nit: IMO it's super confusing to have lambdas as function arguments here and above. Can we calculate them separately and pass them in?
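
To illustrate the nit, here is a minimal sketch of naming the function before passing it in. It deliberately avoids the real classes: Handler is a hypothetical stand-in for a handler that takes a function argument, and the state is built as a plain string where the code under review uses an ObjectMapper.

```java
import java.util.function.Function;

/** Hypothetical sketch of assigning a lambda to a named variable before passing it on. */
final class NamedLambdaSketch {

  private NamedLambdaSketch() {}

  /** Stand-in for a handler whose constructor takes a function argument. */
  static final class Handler {
    private final Function<String, String> finalStateForStream;

    Handler(final Function<String, String> finalStateForStream) {
      this.finalStateForStream = finalStateForStream;
    }

    String finalStateFor(final String streamName) {
      return finalStateForStream.apply(streamName);
    }
  }

  static Handler buildHandler(final long xminXidValue) {
    // Named up front, so the constructor call below stays short and the intent is explicit.
    final Function<String, String> finalStateForStream =
        streamName -> "{\"state_type\": \"xmin\", \"xmin_xid_value\": " + xminXidValue + "}";
    return new Handler(finalStateForStream);
  }
}
```

The behavior is unchanged compared with an inline lambda; the only difference is that the argument now has a name a reader can search for.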

new ConfiguredAirbyteCatalog().withStreams(streamsForXminSync), tableNameToTable, emittedAt));
}

return Stream
Contributor

nit: This is more of an extension of the discussion we had in the AM.

Maybe not for this PR, but this is the issue I was talking about: https://github.com/airbytehq/airbyte/issues/27115, where ideally we want to kick off an incremental xmin sync after the initial successful ctid sync.

One way to do this is to slightly augment what you have here. Even for streams that belong to streamsForCtidSync, we can build iterators for the xmin query and merge them so that the xmin iterator runs after the ctid iterator.

When that happens, we'll have to make sure that the handlers can merge streams based on stream name.

@octavia-squidington-iii
Collaborator

source-postgres test report (commit 52eb61de16) - ❌

⏲️ Total pipeline duration: 1590 seconds

Step Result
Validate airbyte-integrations/connectors/source-postgres/metadata.yaml
Connector version semver check.
Connector version increment check.
QA checks
Build connector tar
Build source-postgres docker image for platform linux/x86_64
Unit tests
Integration tests
Acceptance tests

🔗 View the logs here

Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command

airbyte-ci connectors --name=source-postgres test

@subodh1810
Contributor Author

subodh1810 commented Jun 21, 2023

/legacy-test connector=connectors/source-postgres

🕑 connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/5337467369
❌ connectors/source-postgres https://github.com/airbytehq/airbyte/actions/runs/5337467369
🐛 https://gradle.com/s/bbiaufos6ljss

Build Failed

Test summary info:

=========================== short test summary info ============================
FAILED test_core.py::TestConnection::test_check[inputs0] - AssertionError: as...
FAILED test_core.py::TestConnection::test_check[inputs1] - AssertionError: as...
FAILED test_core.py::TestDiscovery::test_discover[inputs0] - docker.errors.Co...
FAILED test_core.py::TestDiscovery::test_discover[inputs1] - docker.errors.Co...
ERROR test_core.py::TestDiscovery::test_defined_cursors_exist_in_schema[inputs0]
ERROR test_core.py::TestDiscovery::test_defined_cursors_exist_in_schema[inputs1]
ERROR test_core.py::TestDiscovery::test_defined_refs_exist_in_schema[inputs0]
ERROR test_core.py::TestDiscovery::test_defined_refs_exist_in_schema[inputs1]
ERROR test_core.py::TestDiscovery::test_defined_keyword_exist_in_schema[inputs0-allOf]
ERROR test_core.py::TestDiscovery::test_defined_keyword_exist_in_schema[inputs0-not]
ERROR test_core.py::TestDiscovery::test_defined_keyword_exist_in_schema[inputs1-allOf]
ERROR test_core.py::TestDiscovery::test_defined_keyword_exist_in_schema[inputs1-not]
ERROR test_core.py::TestDiscovery::test_primary_keys_exist_in_schema[inputs0]
ERROR test_core.py::TestDiscovery::test_primary_keys_exist_in_schema[inputs1]
ERROR test_core.py::TestDiscovery::test_streams_has_sync_modes[inputs0] - doc...
ERROR test_core.py::TestDiscovery::test_streams_has_sync_modes[inputs1] - doc...
ERROR test_core.py::TestDiscovery::test_additional_properties_is_true[inputs0]
ERROR test_core.py::TestDiscovery::test_additional_properties_is_true[inputs1]
ERROR test_core.py::TestDiscovery::test_backward_compatibility[inputs0] - doc...
ERROR test_core.py::TestDiscovery::test_backward_compatibility[inputs1] - doc...
ERROR test_core.py::TestBasicRead::test_read[inputs0] - docker.errors.Contain...
ERROR test_core.py::TestBasicRead::test_read[inputs1] - docker.errors.Contain...
ERROR test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0]
ERROR test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs1]
SKIPPED [1] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/plugin.py:63: Skipping TestIncremental.test_two_sequential_reads: not found in the config.
SKIPPED [2] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/tests/test_core.py:100: The previous and actual specifications are identical.
SKIPPED [2] ../usr/local/lib/python3.9/site-packages/connector_acceptance_test/tests/test_core.py:695: This tests currently leads to too much failures. We need to fix the connectors at scale first.
======== 4 failed, 46 passed, 5 skipped, 20 errors in 304.82s (0:05:04) ========

@alafanechere
Contributor

@subodh1810 I believe the automated test workflow failed due to a memory problem.
I'm re-running manually here and will share results

* Initial logic for xmin wraparound

* Add tests

* Address comments

* add xmin-wraparound check

* address review comments

* 🤖 Auto format source-postgres code [skip ci]

* missed this

* 🤖 Auto format source-postgres code [skip ci]

---------

Co-authored-by: Akash Kulkarni <akash@airbyte.io>
Co-authored-by: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com>
Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com>
@subodh1810
Contributor Author

/approve-and-merge reason="The CI seems to have an issue + change is not going to impact any current running connector cause its for xmin which is a completely new sync mode."

@octavia-approvington
Contributor

This looks fine!
Merged!
Imagine it being fine

@octavia-approvington octavia-approvington merged commit 8c8f041 into master Jun 22, 2023
6 checks passed
@octavia-approvington octavia-approvington deleted the state-structure-change-xmin-ctid branch June 22, 2023 08:02
@alafanechere
Contributor

@rodireich
Contributor

@alafanechere, @bnchrch As part of this work to sync large tables in a new way, we're also changing the structure of our saved state. This will most likely need a matching change in CAT.
I filed https://github.com/airbytehq/airbyte-internal-issues/issues/1812 to track that work.

7 participants