diff --git a/pyproject.toml b/pyproject.toml index 78116d09..00387984 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "psycopg2-binary==2.9.10", "sqlalchemy==2.0.41", "sshtunnel==0.4.0", - "singer-sdk[faker]~=0.48.0", + "singer-sdk[faker,msgspec]~=0.48.0", ] [project.urls] diff --git a/tap_postgres/client.py b/tap_postgres/client.py index 564ff8b2..fdcda273 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -297,7 +297,7 @@ def _increment_stream_state( def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]: """Return a generator of row-type dictionary objects.""" - status_interval = 5.0 # if no records in 5 seconds the tap can exit + status_interval = 1.0 # if no records in 1 second the tap can exit start_lsn = self.get_starting_replication_key_value(context=context) if start_lsn is None: start_lsn = 0 @@ -323,6 +323,10 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]: "add-tables": self.fully_qualified_name, }, ) + rlist = [logical_replication_cursor] + wlist = [] + xlist = [] + now_ = datetime.datetime.now # Using scaffolding layout from: # https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor @@ -336,19 +340,13 @@ def get_records(self, context: Context | None) -> Iterable[dict[str, t.Any]]: timeout = ( status_interval - ( - datetime.datetime.now() - - logical_replication_cursor.feedback_timestamp + now_() - logical_replication_cursor.feedback_timestamp ).total_seconds() ) try: # If the timeout has passed and the cursor still has no new # messages, the sync has completed. - if ( - select.select( - [logical_replication_cursor], [], [], max(0, timeout) - )[0] - == [] - ): + if not select.select(rlist, wlist, xlist, max(0, timeout))[0]: break except InterruptedError: pass @@ -369,28 +367,30 @@ def consume(self, message, cursor) -> dict | None: row = {} - upsert_actions = {"I", "U"} - delete_actions = {"D"} - truncate_actions = {"T"} - transaction_actions = {"B", "C"} - - if message_payload["action"] in upsert_actions: - for column in message_payload["columns"]: - row.update({column["name"]: self._parse_column_value(column, cursor)}) - row.update({"_sdc_deleted_at": None}) - row.update({"_sdc_lsn": message.data_start}) - elif message_payload["action"] in delete_actions: - for column in message_payload["identity"]: - row.update({column["name"]: self._parse_column_value(column, cursor)}) + if message_payload["action"] in {"I", "U"}: + row.update( + { + column["name"]: self._parse_column_value(column, cursor) + for column in message_payload["columns"] + } + ) + row.update({"_sdc_deleted_at": None, "_sdc_lsn": message.data_start}) + elif message_payload["action"] == "D": + row.update( + { + column["name"]: self._parse_column_value(column, cursor) + for column in message_payload["identity"] + } + ) row.update( { "_sdc_deleted_at": datetime.datetime.utcnow().strftime( r"%Y-%m-%dT%H:%M:%SZ" - ) + ), + "_sdc_lsn": message.data_start, } ) - row.update({"_sdc_lsn": message.data_start}) - elif message_payload["action"] in truncate_actions: + elif message_payload["action"] == "T": self.logger.debug( ( "A message payload of %s (corresponding to a truncate action) " @@ -398,7 +398,7 @@ def consume(self, message, cursor) -> dict | None: ), message.payload, ) - elif message_payload["action"] in transaction_actions: + elif message_payload["action"] in {"B", "C"}: self.logger.debug( ( "A message payload of %s (corresponding to a transaction beginning " diff --git a/tap_postgres/tap.py b/tap_postgres/tap.py index 6c2c81a4..c9f9c269 100644 --- a/tap_postgres/tap.py +++ b/tap_postgres/tap.py @@ -14,6 +14,7 @@ import paramiko from singer_sdk import SQLStream, SQLTap, Stream from singer_sdk import typing as th # JSON schema typing helpers +from singer_sdk.contrib.msgspec import MsgSpecWriter from singer_sdk.singerlib import Catalog, Metadata, Schema from sqlalchemy.engine import URL from sqlalchemy.engine.url import make_url @@ -38,6 +39,7 @@ class TapPostgres(SQLTap): name = "tap-postgres" package_name = "meltanolabs-tap-postgres" default_stream_class = PostgresStream + message_writer_class = MsgSpecWriter def __init__( self, diff --git a/uv.lock b/uv.lock index 5b9c5b58..ac1f3336 100644 --- a/uv.lock +++ b/uv.lock @@ -577,7 +577,7 @@ source = { editable = "." } dependencies = [ { name = "paramiko" }, { name = "psycopg2-binary" }, - { name = "singer-sdk", extra = ["faker"] }, + { name = "singer-sdk", extra = ["faker", "msgspec"] }, { name = "sqlalchemy" }, { name = "sshtunnel" }, ] @@ -616,7 +616,7 @@ typing = [ requires-dist = [ { name = "paramiko", specifier = ">=3,<4" }, { name = "psycopg2-binary", specifier = "==2.9.10" }, - { name = "singer-sdk", extras = ["faker"], specifier = "~=0.48.0" }, + { name = "singer-sdk", extras = ["faker", "msgspec"], specifier = "~=0.48.0" }, { name = "sqlalchemy", specifier = "==2.0.41" }, { name = "sshtunnel", specifier = "==0.4.0" }, ] @@ -649,6 +649,49 @@ typing = [ { name = "types-sqlalchemy", specifier = ">=1.4.53.38" }, ] +[[package]] +name = "msgspec" +version = "0.19.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/cf/9b/95d8ce458462b8b71b8a70fa94563b2498b89933689f3a7b8911edfae3d7/msgspec-0.19.0.tar.gz", hash = "sha256:604037e7cd475345848116e89c553aa9a233259733ab51986ac924ab1b976f8e", size = 216934, upload-time = "2024-12-27T17:40:28.597Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/13/40/817282b42f58399762267b30deb8ac011d8db373f8da0c212c85fbe62b8f/msgspec-0.19.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d8dd848ee7ca7c8153462557655570156c2be94e79acec3561cf379581343259", size = 190019, upload-time = "2024-12-27T17:39:13.803Z" }, + { url = "https://files.pythonhosted.org/packages/92/99/bd7ed738c00f223a8119928661167a89124140792af18af513e6519b0d54/msgspec-0.19.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:0553bbc77662e5708fe66aa75e7bd3e4b0f209709c48b299afd791d711a93c36", size = 183680, upload-time = "2024-12-27T17:39:17.847Z" }, + { url = "https://files.pythonhosted.org/packages/e5/27/322badde18eb234e36d4a14122b89edd4e2973cdbc3da61ca7edf40a1ccd/msgspec-0.19.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fe2c4bf29bf4e89790b3117470dea2c20b59932772483082c468b990d45fb947", size = 209334, upload-time = "2024-12-27T17:39:19.065Z" }, + { url = "https://files.pythonhosted.org/packages/c6/65/080509c5774a1592b2779d902a70b5fe008532759927e011f068145a16cb/msgspec-0.19.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:00e87ecfa9795ee5214861eab8326b0e75475c2e68a384002aa135ea2a27d909", size = 211551, upload-time = "2024-12-27T17:39:21.767Z" }, + { url = "https://files.pythonhosted.org/packages/6f/2e/1c23c6b4ca6f4285c30a39def1054e2bee281389e4b681b5e3711bd5a8c9/msgspec-0.19.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:3c4ec642689da44618f68c90855a10edbc6ac3ff7c1d94395446c65a776e712a", size = 215099, upload-time = "2024-12-27T17:39:24.71Z" }, + { url = "https://files.pythonhosted.org/packages/83/fe/95f9654518879f3359d1e76bc41189113aa9102452170ab7c9a9a4ee52f6/msgspec-0.19.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:2719647625320b60e2d8af06b35f5b12d4f4d281db30a15a1df22adb2295f633", size = 218211, upload-time = "2024-12-27T17:39:27.396Z" }, + { url = "https://files.pythonhosted.org/packages/79/f6/71ca7e87a1fb34dfe5efea8156c9ef59dd55613aeda2ca562f122cd22012/msgspec-0.19.0-cp310-cp310-win_amd64.whl", hash = "sha256:695b832d0091edd86eeb535cd39e45f3919f48d997685f7ac31acb15e0a2ed90", size = 186174, upload-time = "2024-12-27T17:39:29.647Z" }, + { url = "https://files.pythonhosted.org/packages/24/d4/2ec2567ac30dab072cce3e91fb17803c52f0a37aab6b0c24375d2b20a581/msgspec-0.19.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:aa77046904db764b0462036bc63ef71f02b75b8f72e9c9dd4c447d6da1ed8f8e", size = 187939, upload-time = "2024-12-27T17:39:32.347Z" }, + { url = "https://files.pythonhosted.org/packages/2b/c0/18226e4328897f4f19875cb62bb9259fe47e901eade9d9376ab5f251a929/msgspec-0.19.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:047cfa8675eb3bad68722cfe95c60e7afabf84d1bd8938979dd2b92e9e4a9551", size = 182202, upload-time = "2024-12-27T17:39:33.633Z" }, + { url = "https://files.pythonhosted.org/packages/81/25/3a4b24d468203d8af90d1d351b77ea3cffb96b29492855cf83078f16bfe4/msgspec-0.19.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e78f46ff39a427e10b4a61614a2777ad69559cc8d603a7c05681f5a595ea98f7", size = 209029, upload-time = "2024-12-27T17:39:35.023Z" }, + { url = "https://files.pythonhosted.org/packages/85/2e/db7e189b57901955239f7689b5dcd6ae9458637a9c66747326726c650523/msgspec-0.19.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c7adf191e4bd3be0e9231c3b6dc20cf1199ada2af523885efc2ed218eafd011", size = 210682, upload-time = "2024-12-27T17:39:36.384Z" }, + { url = "https://files.pythonhosted.org/packages/03/97/7c8895c9074a97052d7e4a1cc1230b7b6e2ca2486714eb12c3f08bb9d284/msgspec-0.19.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:f04cad4385e20be7c7176bb8ae3dca54a08e9756cfc97bcdb4f18560c3042063", size = 214003, upload-time = "2024-12-27T17:39:39.097Z" }, + { url = "https://files.pythonhosted.org/packages/61/61/e892997bcaa289559b4d5869f066a8021b79f4bf8e955f831b095f47a4cd/msgspec-0.19.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:45c8fb410670b3b7eb884d44a75589377c341ec1392b778311acdbfa55187716", size = 216833, upload-time = "2024-12-27T17:39:41.203Z" }, + { url = "https://files.pythonhosted.org/packages/ce/3d/71b2dffd3a1c743ffe13296ff701ee503feaebc3f04d0e75613b6563c374/msgspec-0.19.0-cp311-cp311-win_amd64.whl", hash = "sha256:70eaef4934b87193a27d802534dc466778ad8d536e296ae2f9334e182ac27b6c", size = 186184, upload-time = "2024-12-27T17:39:43.702Z" }, + { url = "https://files.pythonhosted.org/packages/b2/5f/a70c24f075e3e7af2fae5414c7048b0e11389685b7f717bb55ba282a34a7/msgspec-0.19.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:f98bd8962ad549c27d63845b50af3f53ec468b6318400c9f1adfe8b092d7b62f", size = 190485, upload-time = "2024-12-27T17:39:44.974Z" }, + { url = "https://files.pythonhosted.org/packages/89/b0/1b9763938cfae12acf14b682fcf05c92855974d921a5a985ecc197d1c672/msgspec-0.19.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:43bbb237feab761b815ed9df43b266114203f53596f9b6e6f00ebd79d178cdf2", size = 183910, upload-time = "2024-12-27T17:39:46.401Z" }, + { url = "https://files.pythonhosted.org/packages/87/81/0c8c93f0b92c97e326b279795f9c5b956c5a97af28ca0fbb9fd86c83737a/msgspec-0.19.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4cfc033c02c3e0aec52b71710d7f84cb3ca5eb407ab2ad23d75631153fdb1f12", size = 210633, upload-time = "2024-12-27T17:39:49.099Z" }, + { url = "https://files.pythonhosted.org/packages/d0/ef/c5422ce8af73928d194a6606f8ae36e93a52fd5e8df5abd366903a5ca8da/msgspec-0.19.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d911c442571605e17658ca2b416fd8579c5050ac9adc5e00c2cb3126c97f73bc", size = 213594, upload-time = "2024-12-27T17:39:51.204Z" }, + { url = "https://files.pythonhosted.org/packages/19/2b/4137bc2ed45660444842d042be2cf5b18aa06efd2cda107cff18253b9653/msgspec-0.19.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:757b501fa57e24896cf40a831442b19a864f56d253679f34f260dcb002524a6c", size = 214053, upload-time = "2024-12-27T17:39:52.866Z" }, + { url = "https://files.pythonhosted.org/packages/9d/e6/8ad51bdc806aac1dc501e8fe43f759f9ed7284043d722b53323ea421c360/msgspec-0.19.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:5f0f65f29b45e2816d8bded36e6b837a4bf5fb60ec4bc3c625fa2c6da4124537", size = 219081, upload-time = "2024-12-27T17:39:55.142Z" }, + { url = "https://files.pythonhosted.org/packages/b1/ef/27dd35a7049c9a4f4211c6cd6a8c9db0a50647546f003a5867827ec45391/msgspec-0.19.0-cp312-cp312-win_amd64.whl", hash = "sha256:067f0de1c33cfa0b6a8206562efdf6be5985b988b53dd244a8e06f993f27c8c0", size = 187467, upload-time = "2024-12-27T17:39:56.531Z" }, + { url = "https://files.pythonhosted.org/packages/3c/cb/2842c312bbe618d8fefc8b9cedce37f773cdc8fa453306546dba2c21fd98/msgspec-0.19.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f12d30dd6266557aaaf0aa0f9580a9a8fbeadfa83699c487713e355ec5f0bd86", size = 190498, upload-time = "2024-12-27T17:40:00.427Z" }, + { url = "https://files.pythonhosted.org/packages/58/95/c40b01b93465e1a5f3b6c7d91b10fb574818163740cc3acbe722d1e0e7e4/msgspec-0.19.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:82b2c42c1b9ebc89e822e7e13bbe9d17ede0c23c187469fdd9505afd5a481314", size = 183950, upload-time = "2024-12-27T17:40:04.219Z" }, + { url = "https://files.pythonhosted.org/packages/e8/f0/5b764e066ce9aba4b70d1db8b087ea66098c7c27d59b9dd8a3532774d48f/msgspec-0.19.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:19746b50be214a54239aab822964f2ac81e38b0055cca94808359d779338c10e", size = 210647, upload-time = "2024-12-27T17:40:05.606Z" }, + { url = "https://files.pythonhosted.org/packages/9d/87/bc14f49bc95c4cb0dd0a8c56028a67c014ee7e6818ccdce74a4862af259b/msgspec-0.19.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:60ef4bdb0ec8e4ad62e5a1f95230c08efb1f64f32e6e8dd2ced685bcc73858b5", size = 213563, upload-time = "2024-12-27T17:40:10.516Z" }, + { url = "https://files.pythonhosted.org/packages/53/2f/2b1c2b056894fbaa975f68f81e3014bb447516a8b010f1bed3fb0e016ed7/msgspec-0.19.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ac7f7c377c122b649f7545810c6cd1b47586e3aa3059126ce3516ac7ccc6a6a9", size = 213996, upload-time = "2024-12-27T17:40:12.244Z" }, + { url = "https://files.pythonhosted.org/packages/aa/5a/4cd408d90d1417e8d2ce6a22b98a6853c1b4d7cb7669153e4424d60087f6/msgspec-0.19.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a5bc1472223a643f5ffb5bf46ccdede7f9795078194f14edd69e3aab7020d327", size = 219087, upload-time = "2024-12-27T17:40:14.881Z" }, + { url = "https://files.pythonhosted.org/packages/23/d8/f15b40611c2d5753d1abb0ca0da0c75348daf1252220e5dda2867bd81062/msgspec-0.19.0-cp313-cp313-win_amd64.whl", hash = "sha256:317050bc0f7739cb30d257ff09152ca309bf5a369854bbf1e57dffc310c1f20f", size = 187432, upload-time = "2024-12-27T17:40:16.256Z" }, + { url = "https://files.pythonhosted.org/packages/ea/d0/323f867eaec1f2236ba30adf613777b1c97a7e8698e2e881656b21871fa4/msgspec-0.19.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:15c1e86fff77184c20a2932cd9742bf33fe23125fa3fcf332df9ad2f7d483044", size = 189926, upload-time = "2024-12-27T17:40:18.939Z" }, + { url = "https://files.pythonhosted.org/packages/a8/37/c3e1b39bdae90a7258d77959f5f5e36ad44b40e2be91cff83eea33c54d43/msgspec-0.19.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3b5541b2b3294e5ffabe31a09d604e23a88533ace36ac288fa32a420aa38d229", size = 183873, upload-time = "2024-12-27T17:40:20.214Z" }, + { url = "https://files.pythonhosted.org/packages/cb/a2/48f2c15c7644668e51f4dce99d5f709bd55314e47acb02e90682f5880f35/msgspec-0.19.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0f5c043ace7962ef188746e83b99faaa9e3e699ab857ca3f367b309c8e2c6b12", size = 209272, upload-time = "2024-12-27T17:40:21.534Z" }, + { url = "https://files.pythonhosted.org/packages/25/3c/aa339cf08b990c3f07e67b229a3a8aa31bf129ed974b35e5daa0df7d9d56/msgspec-0.19.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ca06aa08e39bf57e39a258e1996474f84d0dd8130d486c00bec26d797b8c5446", size = 211396, upload-time = "2024-12-27T17:40:22.897Z" }, + { url = "https://files.pythonhosted.org/packages/c7/00/c7fb9d524327c558b2803973cc3f988c5100a1708879970a9e377bdf6f4f/msgspec-0.19.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:e695dad6897896e9384cf5e2687d9ae9feaef50e802f93602d35458e20d1fb19", size = 215002, upload-time = "2024-12-27T17:40:24.341Z" }, + { url = "https://files.pythonhosted.org/packages/3f/bf/d9f9fff026c1248cde84a5ce62b3742e8a63a3c4e811f99f00c8babf7615/msgspec-0.19.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:3be5c02e1fee57b54130316a08fe40cca53af92999a302a6054cd451700ea7db", size = 218132, upload-time = "2024-12-27T17:40:25.744Z" }, + { url = "https://files.pythonhosted.org/packages/00/03/b92011210f79794958167a3a3ea64a71135d9a2034cfb7597b545a42606d/msgspec-0.19.0-cp39-cp39-win_amd64.whl", hash = "sha256:0684573a821be3c749912acf5848cce78af4298345cb2d7a8b8948a0a5a27cfe", size = 186301, upload-time = "2024-12-27T17:40:27.076Z" }, +] + [[package]] name = "mypy" version = "1.17.1" @@ -1278,6 +1321,9 @@ wheels = [ faker = [ { name = "faker" }, ] +msgspec = [ + { name = "msgspec" }, +] testing = [ { name = "pytest" }, ]