
🐛 Source Postgres (CDC): Sync fails when records in Postgres are deleted for tables with non null constraint on columns #8557

Closed
malcolmtivelius opened this issue Dec 6, 2021 · 9 comments · Fixed by #13016

Comments

malcolmtivelius commented Dec 6, 2021

Environment

  • Airbyte version: v0.30.39 (upgraded to v0.32.6)
  • OS Version / Instance: Debian / GCP e2-medium
  • Deployment: Docker
  • Source Connector and version: Postgres 0.3.13 (upgraded to 0.3.17)
  • Destination Connector and version: BigQuery 0.5.0
  • Severity: High
  • Step where error happened: Sync job

Current Behavior

Trying to set up CDC with Postgres 12 (CloudSQL hosted) -> BigQuery, and I keep getting errors when data is deleted. I think it has something to do with the Postgres schema having non-null constraints on certain columns. The deletes are picked up as a row [id, null, null, ...], and since some of the columns have non-null constraints, this produces an error. I have tried with both wal2json and pgoutput.

Expected Behavior

The expected behavior is that the delete events are picked up by Debezium without throwing errors, since the non-null constraints are only relevant when writing to the Postgres table, not when reading from it.

Logs

LOG

source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - 2021-11-12 14:15:01 ERROR i.d.r.TableSchemaBuilder(lambda$createValueGenerator$5):269 - {} - Failed to properly convert data value for 'public.countries.code' of type varchar for row [090add40-0a14-11eb-b226-6f1fbcfccacc, null, null, null, null, null]:
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 - org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "code", schema type: STRING
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220) ~[connect-api-2.6.1.jar:?]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at org.apache.kafka.connect.data.Struct.put(Struct.java:216) ~[connect-api-2.6.1.jar:?]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:265) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.relational.RelationalChangeRecordEmitter.emitDeleteRecord(RelationalChangeRecordEmitter.java:130) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:54) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:88) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:198) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:226) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder.processNotEmptyMessage(NonStreamingWal2JsonMessageDecoder.java:78) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:42) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:482) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:474) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:185) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:147) ~[debezium-connector-postgres-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113) ~[debezium-core-1.4.2.Final.jar:1.4.2.Final]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) [?:?]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) [?:?]
source - 2021-11-12 14:15:01 INFO () DefaultAirbyteStreamFactory(lambda$create$0):61 -     at java.lang.Thread.run(Thread.java:832) [?:?]

Steps to Reproduce

  1. Set up CDC replication sync from postgres 12 to bigquery
  2. Run a sync
  3. Delete a row from the postgres source table
  4. Run another sync
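
For illustration, a minimal schema like the following reproduces the failure (the table and values are hypothetical, modeled on the log above):

-- Hypothetical repro table: a non-key column with a NOT NULL constraint
CREATE TABLE public.countries (
    id   uuid PRIMARY KEY,
    code varchar NOT NULL  -- this constraint is what breaks the DELETE event
);
INSERT INTO public.countries VALUES ('090add40-0a14-11eb-b226-6f1fbcfccacc', 'SE');
-- After the first sync completes, delete the row and sync again:
DELETE FROM public.countries WHERE code = 'SE';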

Are you willing to submit a PR?

I don't know how to solve this; it seems like an issue with Debezium.

@malcolmtivelius malcolmtivelius added needs-triage type/bug Something isn't working labels Dec 6, 2021
@alafanechere alafanechere added area/connectors Connector related issues cdc and removed needs-triage labels Dec 7, 2021
@alafanechere
Contributor

@malcolmtivelius could you upgrade your Postgres connector from 0.3.13 to 0.3.17 and tell us if the error persists?

@alafanechere alafanechere changed the title Postgres to Bigquery logical replication (CDC) sync fails when records in postgres are deleted for tables with non null constraint on columns 🐛 Source Postgres (CDC): Logical replication (CDC) sync fails when records in Postgres are deleted for tables with non null constraint on columns Dec 7, 2021
@alafanechere alafanechere changed the title 🐛 Source Postgres (CDC): Logical replication (CDC) sync fails when records in Postgres are deleted for tables with non null constraint on columns 🐛 Source Postgres (CDC): Sync fails when records in Postgres are deleted for tables with non null constraint on columns Dec 7, 2021
@malcolmtivelius
Author

Hey, I just upgraded Airbyte to v0.32.6 and the Postgres connector to 0.3.17. Still running into the same issue when deleting a record! Let me know if you need anything else.

@alafanechere
Contributor

@sherifnada I'll let you prioritize 😄

@shrodingers
Contributor

shrodingers commented Dec 9, 2021

Hi! Just stumbled upon the same issue. It seems to happen because of the way data is formatted when it leaves the source: for deletes, if REPLICA IDENTITY is set to DEFAULT on the tables, we have no information about the original row data. A workaround I've found is to use REPLICA IDENTITY FULL, which keeps the previous row data on updates and deletes and avoids the errors on delete, even if the best fix would be to not run schema validation on these source events.
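
Concretely, for the table from the log above (adjust the table name to your own schema):

-- Make Postgres log the complete old row on UPDATE/DELETE,
-- so delete events carry real values instead of nulls
ALTER TABLE public.countries REPLICA IDENTITY FULL;

-- Verify the setting: 'f' = full, 'd' = default
SELECT relname, relreplident FROM pg_class WHERE relname = 'countries';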

In the meantime, I tried to understand what was happening, since it seems to come from Debezium's internal behaviour with Kafka Connect, and I wondered if this could be because of the way DELETE tombstone events are stripped here. I'm really new to this stuff, but it seems to affect the same case as this issue (and it would make sense that, if there is a method to indicate that the key should be dereferenced, stripping the tombstone makes it mistake the nature of the input).

@malcolmtivelius
Author

Thanks for letting me know about this workaround, @shrodingers! I just tried it out and my issue was resolved. But as you say, it would still be beneficial to only send the primary key (and not the entire row) when a record is deleted. Will follow this issue for further updates.

@yurii-bidiuk
Contributor

yurii-bidiuk commented May 12, 2022

Hello @malcolmtivelius
Could you please update your source connector to the latest version (0.4.14) and verify if this bug is still reproducible for you?
I tried to repeat all the steps (run a sync, delete one row, run sync again), but my syncs are successful and no issues occur.
Thank you in advance

@lucienfregosibodyguard
Contributor

lucienfregosibodyguard commented May 12, 2022

Hi @yurii-bidiuk
I'm using the latest version of the Postgres connector (0.4.14) and I get exactly the same behavior when I delete a row and run a sync:

2022-05-12 13:46:52 source > 2022-05-12 13:46:52 ERROR i.d.r.TableSchemaBuilder(lambda$createValueGenerator$5):269 - Failed to properly convert data value for 'public.users.email' of type text for row [2f6b7eee-4378-4925-9648-55189c7b0f16, null, null, null, null, null, null, null, null]:
2022-05-12 13:46:52 source > org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "email", schema type: STRING

I fixed it with REPLICA IDENTITY FULL, as suggested by @shrodingers.

@shrodingers
Contributor

@yurii-bidiuk, I think the column needs to be non-nullable in the source for the error to be reproducible (because it fails the JSON schema validation, I guess). So the DELETE event, which has everything but the primary key set to null, fails validation (if the columns are nullable, validation succeeds, I think, since null is allowed). Let me know if I can help on this issue :)
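
If it helps with reproduction, this is a quick way to check which columns of a table carry NOT NULL constraints (the table name here is a placeholder):

-- List nullability per column; 'NO' means the column is NOT NULL
SELECT column_name, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name = 'countries';  -- replace with your table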

@yurii-bidiuk
Contributor

@lucienfregosibodyguard @shrodingers thank you, guys. Previously I used a table with 2 columns (id and testData), but after increasing the number of columns to 5, the bug started reproducing for me as well.
