Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sybase ASE Sink Connector - Not able to transfer data(TEXT datatype) from debezium postgres source to Sybase ase sink(jconn). #1363

Open
hotpotato0 opened this issue Sep 23, 2023 · 0 comments

Comments

@hotpotato0
Copy link

hotpotato0 commented Sep 23, 2023

I trying to transfer data from debezium postgres source to Sybase ASE.
But in the target Sybase ASE(using jconn4 *sybase ase jdbc driver), TEXT data types are not worked.

Environment:
Kafka : docker.io/bitnami/kafka:2.7.0-debian-10-r68
Zookeeper : docker.io/bitnami/zookeeper:3.8.0-debian-10-r20
Debezium Postgresql source connector : https://debezium.io/documentation/reference/stable/connectors/postgresql.html (Version 2.2.1.Final)
JdbcSinkConnector (10.7.3) / Sink DBMS : Sybase ASE 16 / jdbc : jconn4-7.07-SP110

Error message
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.sybase.jdbc4.jdbc.SybLob (java.lang.String is in module java.base of loader 'bootstrap'; com.sybase.jdbc4.jdbc.SybLob is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @6be7bf6d) at com.sybase.jdbc4.tds.TdsParam.prepareForSend(TdsParam.java:230) at com.sybase.jdbc4.jdbc.ParamManager.checkParams(ParamManager.java:1180) at com.sybase.jdbc4.tds.Tds.sendDynamicExecuteParams(Tds.java:1500) at com.sybase.jdbc4.tds.Tds.dynamicExecute(Tds.java:1404) at com.sybase.jdbc4.jdbc.SybPreparedStatement.sendQuery(SybPreparedStatement.java:2883) at com.sybase.jdbc4.jdbc.SybStatement.sendBatch(SybStatement.java:1923) at com.sybase.jdbc4.jdbc.SybStatement.executeBatch(SybStatement.java:1882) at com.sybase.jdbc4.jdbc.SybStatement.executeBatch(SybStatement.java:1800) at com.sybase.jdbc4.jdbc.SybPreparedStatement.executeBatch(SybPreparedStatement.java:1832) at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196) at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186) at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80) at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586) ... 10 more

jdbc sink connecter config(jtds)

{
	"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
	"dialect.name": "SybaseDatabaseDialect",
	"table.name.format": "tbl_tst_cdc_05_text_sink",
	"connection.password": "******",
	"tasks.max": "1",
	"topics": "debezium_source.connect_dev.tbl_tst_cdc_05_text",
	"delete.enabled": "true",
	"connection.user": "sd",
	"name": "sybase_jdbc_sink_tbl_tst_cdc_05_text",
	"connection.url": "jdbc:jtds:sybase://localhost:3000/kis_temp",
	"value.converter": "org.apache.kafka.connect.json.JsonConverter",
	"insert.mode": "upsert",
	"pk.mode": "record_key",
	"key.converter": "org.apache.kafka.connect.json.JsonConverter",
	"pk.fields": "seq"
}

-> It's work. But this jdbc driver have a performance issues.(less than 30%, jconn4 jdbc driver)

jdbc sink connecter config(jconn4)

{
	"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
	"dialect.name": "SybaseDatabaseDialect",
	"table.name.format": "tbl_tst_cdc_05_text_sink",
	"connection.password": "******",
	"tasks.max": "1",
	"topics": "debezium_source.connect_dev.tbl_tst_cdc_05_text",
	"delete.enabled": "true",
	"connection.user": "sd",
	"name": "sybase_jdbc_sink_tbl_tst_cdc_05_text",
	"connection.url": "jdbc:sybase:Tds:localhost:3000/sink",
	"value.converter": "org.apache.kafka.connect.json.JsonConverter",
	"insert.mode": "upsert",
	"pk.mode": "record_key",
	"key.converter": "org.apache.kafka.connect.json.JsonConverter",
	"pk.fields": "seq"
}

-> not worked.

Topic(debezium_source.connect_dev.tbl_tst_cdc_05_text) message

{
	"schema": {
		"type": "struct",
		"fields": [
			{
				"type": "int32",
				"optional": false,
				"default": 0,
				"field": "seq"
			},
			{
				"type": "string",
				"optional": true,
				"field": "pg_text_to_ase_text"
			},
			{
				"type": "string",
				"optional": true,
				"field": "pg_text_to_ase_unitext"
			}
		],
		"optional": false,
		"name": "debezium_source.connect_dev.tbl_tst_cdc_05_text.Value"
	},
	"payload": {
		"seq": 5,
		"pg_text_to_ase_text": "67e9f30be4b4e85fb3b60c6997f8f740",
		"pg_text_to_ase_unitext": null
	}
}


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant