JDBC sink fails to Postgres database #609

kccheung opened this issue Mar 5, 2019 · 17 comments
@kccheung

kccheung commented Mar 5, 2019

I encounter the following error when I sink a topic to my postgres database:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:116)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:69)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
... 10 more

My sink connector config:

{
    "name": "pg_sink_S_COUNTS_VA_INTERMEDIATE",
    "config": {
        "topics": "S_COUNTS_VA_INTERMEDIATE",
        "tasks.max": 1,
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore": "true",
        "schema.ignore": "true",
        "schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "type.name": "kafka-connect",
        "connection.url": "jdbc:postgresql://*****.com:5433/kafka-sink",
        "connection.user": "****",
        "connection.password": "*****",
        "insert.mode": "upsert",
        "auto.create": true,
        "auto.evolve": true,
        "pk.mode": "kafka",
        "pk.fields": "__connect_topic,__connect_partition,__connect_offset",
        "batch.size": 30
    }
}

ENVIRONMENT variables of my kafka-connect container:

      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_PLUGIN_PATH: /usr/share/java

I have a separate producer which produces JSON data to Kafka without a schema defined.

@rmoff

rmoff commented Mar 28, 2019

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

The JDBC Sink requires a schema for your data. I'm not sure why this is triggering the error you're seeing, but you definitely need to include a schema, either as part of your JSON or using Avro. See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
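For reference, a minimal sketch of what a schema-embedded JSON message looks like for the JsonConverter (the "count" field here is purely illustrative, not taken from the original topic), together with the sink-side converter settings that go with it:

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"

    {
      "schema": {
        "type": "struct",
        "optional": false,
        "fields": [
          { "field": "count", "type": "int64", "optional": true }
        ]
      },
      "payload": {
        "count": 42
      }
    }

With value.converter.schemas.enable=true, the converter reads the "schema"/"payload" envelope and hands the sink a Struct instead of a HashMap, which avoids the ClassCastException above.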

@rhauch

rhauch commented Aug 23, 2019

We'll keep this open so that we catch this earlier and give a much better exception message.

@krishkir

Facing the same issue with MongoDB as the source and MySQL as the sink. Requesting your help:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"

@fabiotc

fabiotc commented Sep 11, 2019

And if I have a schemaless source topic, is it possible to create a separate Avro schema file for the topic data and use it to have a well-defined schema?

@OneCricketeer

@krishkir As explained, the JDBC sink connector requires schemas to be enabled.

@fabiotc You'd have to write a custom Connect SMT or other processor that could parse your topic and return a new record with a schema applied

@fabiotc

fabiotc commented Sep 18, 2019

Thanks @Cricket007

@rmoff

rmoff commented Jan 22, 2020

@ghost

ghost commented Jan 23, 2020

@Cricket007 Thanks for the suggestion on writing a custom SMT. @fabiotc here is an SMT I wrote for appending a schema to a record: https://github.com/yousufdev/kafka-connect-append-schema. Hope it helps.

@fabiotc

fabiotc commented Jan 23, 2020

Thanks @rmoff.
@yousufdev thanks for this! And using your example, is it possible to append a schema to a complex and nested JSON structure?

@ghost

ghost commented Jan 24, 2020

@fabiotc yes, you can append a schema to a complex structure; just pass it in the 'schema' property of the SMT.
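For anyone following along, here is a rough sketch of how an SMT like that could be wired into a sink connector config. The transform alias, the class name placeholder, and the exact format of the schema string below are my assumptions (check the kafka-connect-append-schema README for the real ones); only the 'schema' property name comes from the comment above:

    "transforms": "appendSchema",
    "transforms.appendSchema.type": "<fully qualified transform class from the kafka-connect-append-schema repo>",
    "transforms.appendSchema.schema": "{\"type\":\"struct\",\"fields\":[{\"field\":\"id\",\"type\":\"string\",\"optional\":true}]}"

Per the comment above, a nested structure is handled the same way: the schema you pass can itself describe struct-typed fields.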

@ggabmc

ggabmc commented Oct 8, 2021

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

The JDBC Sink requires a schema for your data. I'm not sure why this is triggering the error you're seeing, but you definitely need to include a schema, either as part of your JSON or using Avro. See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

I have this issue and need some help here, please:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:97)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2021-10-08 04:43:53,220] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2021-10-08 04:43:53,220] ERROR Task sink-postgres-file-distributed-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

This is my source connector config (I'm pulling the string from a txt file):
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
key.converter.schemas.enable=false
file=demo-file.txt
tasks.max=1
value.converter.schemas.enable=false
topic=demo-2-distributed
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

This is my JDBC sink connector configuration (I'm trying to sink to Postgres):
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=postgres
topics=demo-2-distributed
tasks.max=1
key.ignore=true
schema.ignore=true
key.converter.schemas.enable=false
auto.evolve=true
connection.user=postgres
value.converter.schemas.enable=false
auto.create=true
connection.url=jdbc:postgresql://postgres:5432/postgres
value.converter=org.apache.kafka.connect.json.JsonConverter
insert.mode=upsert
key.converter=org.apache.kafka.connect.storage.StringConverter

@OneCricketeer

java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct

JDBC sink requires Structured data, not strings.

FileStream source only writes strings unless you use a HoistField transform, for example

@ggabmc

ggabmc commented Oct 8, 2021

java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct

JDBC sink requires Structured data, not strings.

FileStream source only writes strings unless you use a HoistField transform, for example

Hi, I have added the HoistField transform in the source connector like this:

transforms=HoistField
transforms.HoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistField.field=line

Now I'm able to see the string in JSON format in the topic, but I'm getting this issue in the JDBC sink connector:

java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:97)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2021-10-08 05:52:49,638] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2021-10-08 05:52:49,639] ERROR Task sink-postgres-file-distributed-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTas

@OneCricketeer

OneCricketeer commented Oct 8, 2021

That error has already been answered in this thread

#609 (comment)

How about using the JDBC source connector (or Debezium) from one database to another? The problem isn't the connector itself; it's the data you're sending through Connect, and the FileStream source just isn't a good example to use.
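For illustration, a minimal sketch of what a JDBC source connector between two Postgres databases could look like; the connection details, table name, and incrementing column are placeholders, not values from this thread:

    {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:postgresql://source-host:5432/source_db",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "table.whitelist": "demo_table",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "pg-",
      "tasks.max": "1"
    }

Because the source connector reads rows along with their column types, the records it produces already carry a Connect schema (Struct), so the JDBC sink can consume them without any schema tricks on the topic.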

@ggabmc

ggabmc commented Oct 8, 2021

That error has already been answered in this thread

#609 (comment)

How about using the JDBC source connector (or Debezium) from one database to another? The problem isn't the connector itself; it's the data you're sending through Connect, and the FileStream source just isn't a good example to use.

Thank you so much, I changed the way I was doing it. I'm now sending the message directly to the topic with kafka-console-producer, including the schema and payload, and the sink is able to write those fields to the DB. Thank you for your help; I'm building a proof of concept to present at work. Thanks.

  kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic demo-2-distributed
  {
    "schema": {
      "type": "struct", "optional": false, "version": 1, "fields": [
        { "field": "id", "type": "string", "optional": true },
        { "field": "artist", "type": "string", "optional": true },
        { "field": "song", "type": "string", "optional": true }
      ]
    },
    "payload": {
      "id": "1",
      "artist": "Rick Astley",
      "song": "Never Gonna Give You Up"
    }
  }

Kafka JDBC Sink Connector

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.password": "postgres",
    "topics": "demo-2-distributed",
    "tasks.max": "1",
    "key.converter.schemas.enable": "false",
    "fields.whitelist": "Id,artist,song",
    "auto.evolve": "true",
    "connection.user": "postgres",
    "value.converter.schemas.enable": "true",
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "insert.mode": "upsert",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "pk.mode": "kafka",
    "pk.fields": "__connect_topic,__connect_partition,__connect_offset"
}

@OneCricketeer

Keeping the schema as part of each message isn't recommended. Rather, you can use the JSON Schema, Avro, or Protobuf console producers plus their corresponding converters.

https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
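As a concrete (hedged) sketch of that approach, assuming a Schema Registry running at localhost:8081 and a reasonably recent Confluent Platform (older versions use --broker-list instead of --bootstrap-server), the same record could be produced with the Avro console producer:

    kafka-avro-console-producer --bootstrap-server 127.0.0.1:9092 --topic demo-2-distributed \
      --property schema.registry.url=http://localhost:8081 \
      --property value.schema='{"type":"record","name":"Song","fields":[{"name":"id","type":"string"},{"name":"artist","type":"string"},{"name":"song","type":"string"}]}'
    {"id": "1", "artist": "Rick Astley", "song": "Never Gonna Give You Up"}

and the sink would then use the Avro converter instead of the JSON one:

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"

This keeps the schema in the registry rather than repeating it in every message.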

@yousufdev

yousufdev commented Oct 31, 2021

@Cricket007 Thanks for the suggestion on writing a custom SMT. @fabiotc here is an SMT I wrote for appending a schema to a record: https://github.com/yousufdev/kafka-connect-append-schema. Hope it helps.

@OneCricketeer, @fabiotc can you please star my SMT repo again? My account got compromised and deleted. Thanks.
