Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-6967: Handle bytea target field in in Postgres dialect #36

Merged
merged 1 commit into from
Oct 4, 2023

Conversation

bpaquet
Copy link
Contributor

@bpaquet bpaquet commented Sep 26, 2023

I'm not sure this is the good way to fix it, but it works :)

With this fix, the connector is able to write into bytea field. Without, it failed with the following stack:

[2023-09-25 20:53:19,470] TRACE [main|task-0] Bind field 'bin' at position 6 with type io.debezium.connector.jdbc.type.connect.ConnectBytesType: java.nio.HeapByteBuffer[pos=0 lim=4 cap=4] (io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect:372)
Hibernate: INSERT INTO original_tables.test_cdc (id,updated_at,created_at,name,uuid,bin) VALUES (?,?,?,?,cast(? as uuid),?) ON CONFLICT (id) DO UPDATE SET updated_at=EXCLUDED.updated_at,created_at=EXCLUDED.created_at,name=EXCLUDED.name,uuid=EXCLUDED.uuid,bin=EXCLUDED.bin
[2023-09-25 20:53:19,473] DEBUG [main|task-0] Rewinding topic dbz.cluster-1.data.public.test_cdc offset to 17. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:193)
[2023-09-25 20:53:19,473] ERROR [main|task-0] Failed to process record: Failed to process a sink record (io.debezium.connector.jdbc.JdbcSinkConnectorTask:101)
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
	at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:72)
	at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:93)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "org.hibernate.metamodel.mapping.JdbcMapping.getJdbcValueBinder()" because "jdbcMapping" is null
	at org.hibernate.sql.exec.internal.AbstractJdbcParameter.bindParameterValue(AbstractJdbcParameter.java:108)
	at org.hibernate.sql.exec.internal.AbstractJdbcParameter.bindParameterValue(AbstractJdbcParameter.java:98)
	at org.hibernate.sql.exec.internal.StandardJdbcMutationExecutor.execute(StandardJdbcMutationExecutor.java:74)
	at org.hibernate.query.sql.internal.NativeNonSelectQueryPlanImpl.executeUpdate(NativeNonSelectQueryPlanImpl.java:78)
	at org.hibernate.query.sql.internal.NativeQueryImpl.doExecuteUpdate(NativeQueryImpl.java:820)
	at org.hibernate.query.spi.AbstractQuery.executeUpdate(AbstractQuery.java:643)
	at io.debezium.connector.jdbc.JdbcChangeEventSink.writeUpsert(JdbcChangeEventSink.java:258)
	at io.debezium.connector.jdbc.JdbcChangeEventSink.write(JdbcChangeEventSink.java:217)
	at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:69)
	... 12 more

@mfvitale
Copy link
Member

Hi @bpaquet Thanks for the contribution. Maybe there is a better way to resolve it. Please have a look to BitType and see if it is possible to manage the bytea in that class or maybe create a dedicated type class for bytea.

@harveyyue
Copy link
Contributor

harveyyue commented Sep 26, 2023

Hi @bpaquet Thanks for the contribution. Maybe there is a better way to resolve it. Please have a look to BitType and see if it is possible to manage the bytea in that class or maybe create a dedicated type class for bytea.

Hi @mfvitale @bpaquet
Avro format treat bytes as ByteBuffer type, maybe apply these change to AbstractType.bind(...) will fix all type of databases

@bpaquet
Copy link
Contributor Author

bpaquet commented Sep 26, 2023

thx @harveyyue and @mfvitale . I do not have a strong opinion on patch location. I put it in PG Dialect to avoid side effect I can not test. If we want to do this for all dialect, the best place is probably to override bind in ConnectBytesType. Are you okay for this?

@mfvitale
Copy link
Member

thx @harveyyue and @mfvitale . I do not have a strong opinion on patch location. I put it in PG Dialect to avoid side effect I can not test. If we want to do this for all dialect, the best place is probably to override bind in ConnectBytesType. Are you okay for this?

Yes, ConnectBytesType can be a better places. I suggest to have a look to tests an try to reproduce the issue.

@bpaquet
Copy link
Contributor Author

bpaquet commented Sep 28, 2023

@mfvitale done. I also added an Intergation test

getSink().execute(String.format(sql, destinationTable));

consume(createRecord);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should assert that the type is correctly store on table. Have a look to other tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@mfvitale
Copy link
Member

@mfvitale done. I also added an Intergation test

Should the test be valid also for other database? Maybe you can add it to AbstractJdbcSinkTest

@harveyyue
Copy link
Contributor

harveyyue commented Sep 29, 2023

@mfvitale @bpaquet Guys, jdbc sink will use BytesType instead of ConnectBytesType in mysql dialect register type phase, so need to apply this change to base Type class.

[2023-09-29 10:38:17,776] INFO [debezium_avro_jdbc_sink|task-0] Type replaced [BYTES]: io.debezium.connector.jdbc.type.connect.ConnectBytesType -> io.debezium.connector.jdbc.dialect.mysql.BytesType (io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect:609)

@github-actions
Copy link

github-actions bot commented Oct 2, 2023

Hi @bpaquet, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key.

@bpaquet
Copy link
Contributor Author

bpaquet commented Oct 2, 2023

@mfvitale done. I also added an Intergation test

Should the test be valid also for other database? Maybe you can add it to AbstractJdbcSinkTest

There is no test at the moment in this file. Do you think I should move it here?

@bpaquet
Copy link
Contributor Author

bpaquet commented Oct 2, 2023

@mfvitale @bpaquet Guys, jdbc sink will use BytesType instead of ConnectBytesType in mysql dialect register type phase, so need to apply this change to base Type class.

[2023-09-29 10:38:17,776] INFO [debezium_avro_jdbc_sink|task-0] Type replaced [BYTES]: io.debezium.connector.jdbc.type.connect.ConnectBytesType -> io.debezium.connector.jdbc.dialect.mysql.BytesType (io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect:609)

That's precisely why I originialy added the fix in PG Dialect. Waiting for @mfvitale opinion here :)

@mfvitale
Copy link
Member

mfvitale commented Oct 2, 2023

@mfvitale @bpaquet Guys, jdbc sink will use BytesType instead of ConnectBytesType in mysql dialect register type phase, so need to apply this change to base Type class.
[2023-09-29 10:38:17,776] INFO [debezium_avro_jdbc_sink|task-0] Type replaced [BYTES]: io.debezium.connector.jdbc.type.connect.ConnectBytesType -> io.debezium.connector.jdbc.dialect.mysql.BytesType (io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect:609)

That's precisely why I originialy added the fix in PG Dialect. Waiting for @mfvitale opinion here :)

As a general advice, if the behavior is common to other databases the fix should be applied also to the others. This means that also the tests should run for the databases impacted by the issue. Since this is an insert problem you can move the test to AbstractJdbcSinkInsertModeTest.

image

For others databases there is a dedicated BytesType so maybe adding the one specific for Postgres can be the the best way mostly if the behavior is specific to Postgres. If not maybe adding an AbstractBytesType can be good to share the behavior with other database. @Naros WDYT?

@bpaquet
Copy link
Contributor Author

bpaquet commented Oct 2, 2023

In fact this test is specific to postgres because the bytea column type. So we can consider leaving it here.

@mfvitale
Copy link
Member

mfvitale commented Oct 2, 2023

In fact this test is specific to postgres because the bytea column type. So we can consider leaving it here.

In this case the test is in the right place. Instead for the code change I think it should go in a dedicated BytesType for Postgres.

@bpaquet
Copy link
Contributor Author

bpaquet commented Oct 2, 2023

In fact this test is specific to postgres because the bytea column type. So we can consider leaving it here.

In this case the test is in the right place. Instead for the code change I think it should go in a dedicated BytesType for Postgres.

Done @mfvitale


import java.nio.ByteBuffer;

public class ConnectBytesTypePostgres extends ConnectBytesType {
Copy link
Member

@mfvitale mfvitale Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll suggest to rename it to BytesType and extend AbstractType to be uniform of what already done for others database (i.e MySQL).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bpaquet
Copy link
Contributor Author

bpaquet commented Oct 4, 2023

@mfvitale are we good to go ? :)

@mfvitale
Copy link
Member

mfvitale commented Oct 4, 2023

@mfvitale are we good to go ? :)

@bpaquet Applied, thanks.

@mfvitale mfvitale merged commit 75bddb0 into debezium:main Oct 4, 2023
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants