
Decimal field with default value causes exception during deserialization #833

Closed
gunnarmorling opened this issue Jun 14, 2018 · 16 comments

@gunnarmorling

In my Kafka Connect schema I have a field of type Decimal with a default value of 1.23. This value and schema can be serialized using the Avro serializer, but during deserialization (within a KC sink connector), this causes an exception:

connect_1             | org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
connect_1             | 	at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
connect_1             | 	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
connect_1             | 	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1467)
connect_1             | 	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
connect_1             | 	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1467)
connect_1             | 	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
connect_1             | 	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
connect_1             | 	at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
connect_1             | 	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:87)
connect_1             | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
connect_1             | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
connect_1             | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
connect_1             | 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
connect_1             | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
connect_1             | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
connect_1             | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1             | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1             | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect_1             | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect_1             | 	at java.lang.Thread.run(Thread.java:748)
connect_1             | Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type BYTES: class [B for field: "null"
connect_1             | 	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
connect_1             | 	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:212)
connect_1             | 	at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
connect_1             | 	... 19 more

From what I can tell, the reason seems to be that the default value retrieved from the Avro message (a byte array containing the single byte 123) is passed as-is to the Connect schema builder, which only allows BigDecimal as the default value for a Decimal field. Interestingly, this apparently worked with 4.0.0, but I'm seeing this exception as of 4.1.0.
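
To make the mismatch concrete, here is a minimal sketch against the Kafka Connect data API (the class name and literal values are illustrative, mirroring the 1.23 example above; this is not taken from AvroData):

import java.math.BigDecimal;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.SchemaBuilderException;

public class DecimalDefaultSketch {
    public static void main(String[] args) {
        // With scale 2, the decimal 1.23 is stored in Avro as its unscaled value 123,
        // i.e. a byte array containing the single byte 0x7B.
        byte[] avroDefault = new byte[] { 123 };

        // Passing the raw byte[] straight to the schema builder fails, because
        // ConnectSchema only accepts BigDecimal for the Decimal logical type:
        try {
            Decimal.builder(2).defaultValue(avroDefault).build();
        } catch (SchemaBuilderException e) {
            System.out.println(e.getMessage()); // Invalid default value
        }

        // Converting the bytes to the logical representation first succeeds:
        BigDecimal logicalDefault = Decimal.toLogical(Decimal.schema(2), avroDefault); // 1.23
        Schema fixed = Decimal.builder(2).defaultValue(logicalDefault).build();
        System.out.println(fixed.defaultValue()); // 1.23
    }
}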

@gunnarmorling
Author

I'm having the same issue with org.apache.kafka.connect.data.Date. When it's deserialized from an Avro message, the message's int value is set as the default value, while only java.util.Date is allowed per org.apache.kafka.connect.data.ConnectSchema#LOGICAL_TYPE_CLASSES.
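
The same kind of conversion would be needed there; a minimal sketch (the literal 0, standing for 1970-01-01, is just for illustration):

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;

public class DateDefaultSketch {
    public static void main(String[] args) {
        // Avro carries the default of a Connect Date field as an int of days since
        // the epoch; ConnectSchema#LOGICAL_TYPE_CLASSES only accepts java.util.Date,
        // so the int has to be converted before it can be used as a default:
        int daysSinceEpoch = 0; // 1970-01-01
        java.util.Date logicalDefault = Date.toLogical(Date.SCHEMA, daysSinceEpoch);
        Schema schema = Date.builder().defaultValue(logicalDefault).build();
        System.out.println(schema.defaultValue());
    }
}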

@maver1ck

There is also a problem with other logical types.

@lexnjugz

@gunnarmorling did you find a solution for how to handle decimal default values?

@gunnarmorling
Author

Nope, no solution really apart from using JSON instead of Avro :(

@kiakaku

kiakaku commented Oct 13, 2018

Can I change AvroData in schema-registry to fix all logical types?

@jsalmeron

We are encountering the same problem as well.
Our schema was serialized through Debezium's parser.
The schema is as follows: { "subject": "test.test.joel2-value", "version": 3, "id": 3000, "schema": "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"test.test.joel2\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"c\",\"type\":{\"type\":\"string\",\"connect.version\":1,\"connect.default\":\"1970-01-01T00:00:00Z\",\"connect.name\":\"io.debezium.time.ZonedTimestamp\"},\"default\":\"1970-01-01T00:00:00Z\"},{\"name\":\"d\",\"type\":{\"type\":\"bytes\",\"scale\":5,\"precision\":10,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"5\",\"connect.decimal.precision\":\"10\"},\"connect.default\":\"AA\u003d\u003d\",\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"},\"default\":\"AA\u003d\u003d\"}],\"connect.name\":\"test.test.joel2.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.mysql\",\"fields\":[{\"name\":\"version\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"server_id\",\"type\":\"long\"},{\"name\":\"ts_sec\",\"type\":\"long\"},{\"name\":\"gtid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"long\"},{\"name\":\"row\",\"type\":\"int\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"boolean\",\"connect.default\":false},\"null\"],\"default\":false},{\"name\":\"thread\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"db\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"table\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"query\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.mysql.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"test.test.joel2.Envelope\"}", "deleted": false }

Which was generated from:
{ "source" : { "server" : "test" }, "position" : { "file" : "mysql-bin.000299", "pos" : 479748198, "snapshot" : true }, "databaseName" : "test", "ddl" : "CREATE TABLE joel2(\n aint(11) NOT NULL,\n btext,\n ctimestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n d decimal(10,5) NOT NULL DEFAULT '0.00000',\n PRIMARY KEY (a)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8" }

It hits the error when deserializing the message through a KC sink:
{"name": "testSchemaChanges-sink2", "connector": { "state": "RUNNING", "worker_id": "172.16.230.67:8083"}, "tasks": [{ "state": "FAILED", "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value\n\tat org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)\n\tat io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)\n\tat io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)\n\tat io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1467)\n\tat io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)\n\tat io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)\n\tat io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:88)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:513)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type BYTES: class [B for field: \"null\"\n\tat org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:244)\n\tat org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)\n\tat org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)\n\t... 23 more\n", "id": 0, "worker_id": "172.16.230.67:8083"}], "type": "sink"}

@gunnarmorling
Author

gunnarmorling commented Nov 15, 2018

Hey @rhauch, is there any chance of bumping up the priority of this issue? We've received a few reports about it from Debezium users; unfortunately there isn't really anything we can do from the connector side (apart from disabling the propagation of default values to the schema, which might be a connector option we could add). Thanks a lot!

@TheGreatAbyss

TheGreatAbyss commented Nov 29, 2018

Hello Everyone,

We at Shutterstock were having the same issue with Debezium data and Decimal logical types with default values. I created a fix for the problem and have tested and verified it in our CF4 cluster.

The issue boils down to the fact that Avro decimal logical types store values as byte arrays. Debezium sets the default value as a Base64-encoded byte array and record values as big-integer byte arrays.
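
For example, the "AA==" default that Debezium writes into the schema (see the "connect.default" in the schema posted above) decodes like this; a minimal sketch using only the JDK:

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

public class DebeziumDecimalDefaultSketch {
    public static void main(String[] args) {
        // "AA==" is the Base64 encoding of a single zero byte, i.e. the unscaled value 0.
        byte[] unscaledBytes = Base64.getDecoder().decode("AA==");
        // Combined with the scale 5 declared in the schema, that is the decimal
        // 0.00000, matching the MySQL column default of '0.00000'.
        BigDecimal connectDefault = new BigDecimal(new BigInteger(unscaledBytes), 5);
        System.out.println(connectDefault); // 0.00000
    }
}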

The fix requires making changes in both io.confluent.connect.avro.AvroData and org.apache.kafka.connect.data.ConnectSchema to allow the byte array class in the validation hash maps.

I've created a JIRA ticket for this in Apache Kafka: https://issues.apache.org/jira/browse/KAFKA-7688

If I can get GitHub permissions for both this repo and Apache Kafka, I can create a branch and a PR with my changes.

Thanks!

@rhauch
Member

rhauch commented Jan 7, 2019

@TheGreatAbyss, simply fork this repository and create a PR the normal way (pushing to your fork). No privileges are required to create a PR with a suggested change.

@TheGreatAbyss

Hi @rhauch

Here is my PR in my own GitHub repo:

TheGreatAbyss/schema-registry#1.

This, combined with a similar minor change in org.apache.kafka.connect.data.ConnectSchema.java, fixes the problem for Debezium.

@gunnarmorling
Author

Hey @rhauch, as you mentioned the 5.1.1 release in #968, is there any chance of including this one as well? It also affects multiple users of Debezium.

@rhauch
Member

rhauch commented Feb 7, 2019

@gunnarmorling, unfortunately, we're already frozen for the 5.1.1 release. I'm trying to determine whether we can still get this fix in.

When fixing #985 I did find and fix a problem when the default values were set using a ByteBuffer (rather than byte[]), but unfortunately I didn't notice this issue or add a test that used a logical type for a default.

It looks like #1014 may be close to the correct solution, but it's also not quite right since it's using Avro decimal.
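
For context on the Connect-side validation: a plain BYTES default can be supplied as either byte[] or ByteBuffer, while a Decimal-named BYTES schema accepts only BigDecimal, which is the case this issue is about. A minimal sketch (class name and values are illustrative):

import java.nio.ByteBuffer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class BytesDefaultSketch {
    public static void main(String[] args) {
        // For a plain BYTES schema (no logical name), ConnectSchema accepts a
        // default supplied either as byte[] or as ByteBuffer.
        Schema fromArray = SchemaBuilder.bytes().defaultValue(new byte[] { 0x01 }).build();
        Schema fromBuffer = SchemaBuilder.bytes().defaultValue(ByteBuffer.wrap(new byte[] { 0x01 })).build();
        System.out.println(fromArray.defaultValue() + " / " + fromBuffer.defaultValue());
    }
}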

@rhauch
Member

rhauch commented Feb 8, 2019

@gunnarmorling I've done more research, and have confirmed there are problems with default values for any logical type (e.g., Decimal, Time, Date, and Timestamp) and another issue with BYTES; see also #983, #695.

I also have a proposed fix (#1020) for the 5.1.x branch (trying to get it into 5.1.1 but might be too late as we are currently in freeze for that release).

@gunnarmorling
Author

gunnarmorling commented Feb 9, 2019

Thanks for confirming, @rhauch.

If that doesn't get into 5.1.1, what would be the next release window? If it helps with the decision, I can confirm that this has been reported by multiple Debezium users and unfortunately there isn't a workaround (other than using JSON, of course). We've been thinking about offering an option for disabling the export of default values as a temporary measure, but it would of course be nice if that could be avoided. Thanks again for any efforts on this issue, it's much appreciated!

@rhauch
Member

rhauch commented Feb 20, 2019

@gunnarmorling, 5.1.2 has just been released, and this fix is in that version: https://github.com/confluentinc/schema-registry/commits/v5.1.2

rhauch closed this as completed Feb 20, 2019
@gunnarmorling
Author

gunnarmorling commented Feb 20, 2019 via email
