numeric.mapping doesn't work for DECIMAL fields #563

Open
rmoff opened this issue Jan 7, 2019 · 11 comments

@rmoff rmoff commented Jan 7, 2019

I can't get numeric.mapping to work with MySQL and Confluent Platform 5.1. Steps to reproduce below.

Create MySQL table:

use demo;

create table transactions (
	txn_id INT,
	customer_id INT,
	amount DECIMAL(5,2),
	currency VARCHAR(50),
	txn_timestamp VARCHAR(50)
);

insert into transactions (txn_id, customer_id, amount, currency, txn_timestamp) values (3, 2, 17.13, 'EUR', '2018-04-30T21:30:39Z');

Inspect table:

mysql> describe transactions;
+---------------+--------------+------+-----+---------+-------+
| Field         | Type         | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+-------+
| txn_id        | int(11)      | YES  |     | NULL    |       |
| customer_id   | int(11)      | YES  |     | NULL    |       |
| amount        | decimal(5,2) | YES  |     | NULL    |       |
| currency      | varchar(50)  | YES  |     | NULL    |       |
| txn_timestamp | varchar(50)  | YES  |     | NULL    |       |
+---------------+--------------+------+-----+---------+-------+
5 rows in set (0.00 sec)

Create connector

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
          "name": "jdbc_source_mysql_12a",
          "config": {
                  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                  "connection.url": "jdbc:mysql://mysql:3306/demo",
                  "connection.user": "connect_user",
                  "connection.password": "asgard",
                  "topic.prefix": "mysql-12a-",
                  "numeric.mapping": "best_fit",
                  "table.whitelist" : "demo.transactions",
                  "mode":"bulk",
                  "poll.interval.ms" : 3600000
                  }
          }'

Even though "numeric.mapping": "best_fit", Kafka Connect stores the DECIMAL(5,2) as a Decimal, serialised to bytes in Avro:

$ curl -s "http://localhost:8081/subjects/mysql-12a-transactions-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name == "amount")'
{
  "name": "amount",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}

Connect Worker log excerpt:


INFO Kafka version : 2.1.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
…
INFO JdbcSourceTaskConfig values:
 batch.max.rows = 100
 catalog.pattern = null
 connection.attempts = 3
 connection.backoff.ms = 10000
 connection.password = [hidden]
 connection.url = jdbc:mysql://mysql:3306/demo
 connection.user = connect_user
 dialect.name =
 incrementing.column.name =
 mode = bulk
 numeric.mapping = best_fit
 numeric.precision.mapping = false
 poll.interval.ms = 3600000
 query =
 schema.pattern = null
 table.blacklist = []
 table.poll.interval.ms = 60000
 table.types = [TABLE]
 table.whitelist = [demo.transactions]
 tables = [`demo`.`transactions`]
 timestamp.column.name = []
 timestamp.delay.interval.ms = 0
 topic.prefix = mysql-12a-
 validate.non.null = true
 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)
…
DEBUG Checking for next block of results from BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
DEBUG BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} prepared SQL query: SELECT * FROM `demo`.`transactions` (io.confluent.connect.jdbc.source.BulkTableQuerier)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG Returning 100 records for BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
…
kafka-connect_1_8eb73e80dda1 | [2019-01-07 13:37:50,920] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"transactions\",\"fields\":[{\"name\":\"txn_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":2,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"txn_timestamp\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"transactions\"}"} to http://schema-registry:8081/subjects/mysql-12a-transactions-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService)

I've tried this with three different settings; each still results in the amount field being serialised to bytes in Avro:

  • "numeric.mapping": "best_fit"
  • "numeric.mapping": "precision_only"
  • "numeric.precision.mapping": true

Per the docs, I am expecting to see the decimal(5,2) serialised to Avro as a FLOAT64 (I think; but at least, not bytes).

@rmoff rmoff commented Jan 7, 2019

Does numeric.mapping only apply to NUMERIC types, and not DECIMAL?

case Types.NUMERIC:
  if (mapNumerics == NumericMapping.PRECISION_ONLY) {
    int precision = defn.precision();
    int scale = defn.scale();
    log.trace("NUMERIC with precision: '{}' and scale: '{}'", precision, scale);
    if (scale == 0 && precision < 19) { // integer
      if (precision > 9) {
        return rs -> rs.getLong(col);
      } else if (precision > 4) {
        return rs -> rs.getInt(col);
      } else if (precision > 2) {
        return rs -> rs.getShort(col);
      } else {
        return rs -> rs.getByte(col);
      }
    }
  } else if (mapNumerics == NumericMapping.BEST_FIT) {
    int precision = defn.precision();
    int scale = defn.scale();
    log.trace("NUMERIC with precision: '{}' and scale: '{}'", precision, scale);
    if (precision < 19) { // fits in primitive data types.
      if (scale < 1 && scale >= NUMERIC_TYPE_SCALE_LOW) { // integer
        if (precision > 9) {
          return rs -> rs.getLong(col);
        } else if (precision > 4) {
          return rs -> rs.getInt(col);
        } else if (precision > 2) {
          return rs -> rs.getShort(col);
        } else {
          return rs -> rs.getByte(col);
        }
      } else if (scale > 0) { // floating point - use double in all cases
        return rs -> rs.getDouble(col);
      }
    }
  }
  // fallthrough
case Types.DECIMAL: {
  final int precision = defn.precision();
  log.debug("DECIMAL with precision: '{}' and scale: '{}'", precision, defn.scale());
  final int scale = decimalScale(defn);
  return rs -> rs.getBigDecimal(col, scale);
}
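
To make the branch logic in the excerpt above easier to check against a given column definition, here is a minimal standalone sketch in Java of the BEST_FIT decision only. It is not the connector's code: the class name BestFitSketch, the method bestFit and the returned type labels are made up for illustration, and the value used for NUMERIC_TYPE_SCALE_LOW (-84) is an assumption, since the excerpt doesn't show it.

```java
final class BestFitSketch {
    // Assumption: the excerpt does not show this constant's value; -84 is a guess.
    static final int NUMERIC_TYPE_SCALE_LOW = -84;

    /** Returns an illustrative label for the type BEST_FIT would pick for a NUMERIC column. */
    static String bestFit(int precision, int scale) {
        if (precision < 19) { // fits in a primitive type
            if (scale < 1 && scale >= NUMERIC_TYPE_SCALE_LOW) { // integer
                if (precision > 9) return "INT64";   // rs.getLong in the excerpt
                if (precision > 4) return "INT32";   // rs.getInt
                if (precision > 2) return "INT16";   // rs.getShort
                return "INT8";                       // rs.getByte
            } else if (scale > 0) {
                return "FLOAT64";                    // rs.getDouble
            }
        }
        return "Decimal (bytes)"; // falls through to the DECIMAL case
    }

    public static void main(String[] args) {
        System.out.println(bestFit(5, 2));  // prints FLOAT64
        System.out.println(bestFit(5, 0));  // prints INT32
        System.out.println(bestFit(20, 2)); // prints Decimal (bytes)
    }
}
```

Under these rules a NUMERIC(5,2) column would come out as a double. A column that the driver reports as Types.DECIMAL never reaches this logic at all, because its case label sits below the NUMERIC branch, so it always ends up as a BigDecimal.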

@rmoff rmoff commented Jan 7, 2019

https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html

In MySQL, NUMERIC is implemented as DECIMAL

So the following DDL:

CREATE TABLE NUM_TEST (
	TXN_ID INT,
	CUSTOMER_ID INT,
	AMOUNT_01 DECIMAL(5,2),
	AMOUNT_02 NUMERIC(5,2), 
	AMOUNT_03 DECIMAL(5),
	AMOUNT_04 DECIMAL
);

Creates a table like this - note that AMOUNT_02 whilst declared as NUMERIC is created as a DECIMAL:

mysql> DESCRIBE NUM_TEST;
+-------------+---------------+------+-----+---------+-------+
| Field       | Type          | Null | Key | Default | Extra |
+-------------+---------------+------+-----+---------+-------+
| TXN_ID      | int(11)       | YES  |     | NULL    |       |
| CUSTOMER_ID | int(11)       | YES  |     | NULL    |       |
| AMOUNT_01   | decimal(5,2)  | YES  |     | NULL    |       |
| AMOUNT_02   | decimal(5,2)  | YES  |     | NULL    |       |
| AMOUNT_03   | decimal(5,0)  | YES  |     | NULL    |       |
| AMOUNT_04   | decimal(10,0) | YES  |     | NULL    |       |
+-------------+---------------+------+-----+---------+-------+
6 rows in set (0.01 sec)

(MySQL Server version: 8.0.13)

@rmoff rmoff commented Jan 7, 2019

Contrast to Postgres:

CREATE TABLE NUM_TEST (
	TXN_ID INT,
	CUSTOMER_ID INT,
	AMOUNT_01 DECIMAL(5,2),
	AMOUNT_02 NUMERIC(5,2), 
	AMOUNT_03 DECIMAL(5),
	AMOUNT_04 DECIMAL
);

All columns are stored as NUMERIC:

demo=# \d num_test
                   Table "public.num_test"
   Column    |     Type     | Collation | Nullable | Default
-------------+--------------+-----------+----------+---------
 txn_id      | integer      |           |          |
 customer_id | integer      |           |          |
 amount_01   | numeric(5,2) |           |          |
 amount_02   | numeric(5,2) |           |          |
 amount_03   | numeric(5,0) |           |          |
 amount_04   | numeric      |           |          |

                                                   | col1          | col2          | col3       | col4
---------------------------------------------------+---------------+---------------+------------+---------
 Postgres column definition                        | DECIMAL(5,2)  | NUMERIC(5,2)  | DECIMAL(5) | DECIMAL
 Source data in Postgres                           | 100.01        | 100.02        | 100        | 100
 numeric.mapping = none (same as leaving it unset) | Bytes '\u0011 | Bytes Øî      | Bytes d    | Bytes d
 numeric.mapping = best_fit                        | Double 100.01 | Double 100.02 | Int 100    | Int 100
 numeric.mapping = precision_only                  | Bytes '\u0011 | Bytes Øî      | Int 100    | Int 100

(Postgres 11.1)

Postgres notes: https://gist.github.com/rmoff/7bb46a0b6d27982a5fb7a103bb7c95b9

@rmoff rmoff changed the title from "numeric.mapping doesn't appear to work with Confluent Platform 5.1 and MySQL" to "numeric.mapping not supported for MySQL" Jan 7, 2019

@rmoff rmoff commented Jan 8, 2019

MS SQL notes : https://gist.github.com/rmoff/7bb46a0b6d27982a5fb7a103bb7c95b9#testing-numericmapping-in-ms-sql-server-2017

                                                   | col1          | col2          | col3       | col4
---------------------------------------------------+---------------+---------------+------------+---------
 MSSQL column definition                           | DECIMAL(5,2)  | NUMERIC(5,2)  | DECIMAL(5) | DECIMAL
 MSSQL created column: type                        | decimal       | numeric       | decimal    | decimal
 MSSQL created column: length                      | 5             | 5             | 5          | 9
 MSSQL created column: precision                   | 5             | 5             | 5          | 18
 MSSQL created column: scale                       | 2             | 2             | 0          | 0
 Source data in MSSQL                              | 100.01        | 100.02        | 100        | 100
 numeric.mapping = none (same as leaving it unset) | Bytes '\u0011 | Bytes Øî      | Bytes d    | Bytes d
 numeric.mapping = best_fit                        | Bytes '\u0011 | Double 100.02 | Bytes d    | Bytes d
 numeric.mapping = best_fit (*)                    | Double 100.01 | Double 100.02 | Int 100    | Int 100
 numeric.mapping = precision_only                  | Bytes '\u0011 | Bytes Øî      | Int 100    | Int 100

(*) query used to CAST all DECIMAL fields to NUMERIC

The same problem exists with DECIMAL fields being ignored. Since MS SQL accepts both DECIMAL and NUMERIC as native data types, use NUMERIC so that Kafka Connect correctly ingests the values when using numeric.mapping=best_fit. If changing the source schema isn't an option, you can use query mode, demonstrated here.
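
For the query-mode workaround on MS SQL, the SELECT has to cast each DECIMAL column to NUMERIC by hand. A rough sketch of generating such a statement in Java follows. CastQueryBuilder, Column and buildQuery are hypothetical helper names, not part of Kafka Connect, and the column metadata is assumed to be supplied by the caller. The resulting string would go in the connector's query setting.

```java
import java.util.ArrayList;
import java.util.List;

final class CastQueryBuilder {
    /** Hypothetical column description: name plus precision and scale for DECIMAL columns. */
    static final class Column {
        final String name;
        final boolean isDecimal;
        final int precision;
        final int scale;

        Column(String name, boolean isDecimal, int precision, int scale) {
            this.name = name;
            this.isDecimal = isDecimal;
            this.precision = precision;
            this.scale = scale;
        }
    }

    /** Builds a SELECT that casts every DECIMAL column to NUMERIC and keeps its name. */
    static String buildQuery(String table, List<Column> columns) {
        List<String> parts = new ArrayList<>();
        for (Column c : columns) {
            if (c.isDecimal) {
                parts.add("CAST(" + c.name + " AS NUMERIC(" + c.precision + "," + c.scale + ")) AS " + c.name);
            } else {
                parts.add(c.name);
            }
        }
        return "SELECT " + String.join(", ", parts) + " FROM " + table;
    }

    public static void main(String[] args) {
        List<Column> columns = new ArrayList<>();
        columns.add(new Column("TXN_ID", false, 0, 0));
        columns.add(new Column("AMOUNT_01", true, 5, 2));
        columns.add(new Column("AMOUNT_03", true, 5, 0));
        System.out.println(buildQuery("NUM_TEST", columns));
    }
}
```

This only helps where DECIMAL and NUMERIC are distinct types, as in MS SQL. In MySQL, where NUMERIC is implemented as DECIMAL, the cast would not be expected to change what the connector sees.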

@rmoff rmoff changed the title from "numeric.mapping not supported for MySQL" to "numeric.mapping not supported for MySQL / doesn't work for DECIMAL fields" Jan 8, 2019
@rmoff rmoff changed the title from "numeric.mapping not supported for MySQL / doesn't work for DECIMAL fields" to "numeric.mapping doesn't work for DECIMAL fields" Jan 8, 2019
@rmoff rmoff added the mssql label Jan 8, 2019

@anssip anssip commented Feb 6, 2019

I am facing a problem with MySQL and decimal data types. The values end up as corrupt strings in the Kafka topic. Without schemas, the values look like this when listed with console-consumer:

"revenue":"AfQ="

I tried registering an Avro schema to see if that would help. I set the type of this revenue field to float in the schema and created a JDBC source connector to fill the topic. But this connector fails with the following:

org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: 
...
{\"name\":\"revenue\",\"type\":{\"type\":\"bytes\",\"scale\":2,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}}
...

Seems like it tries to register a new schema that is incompatible with my previously created schema. It tries to use type bytes for this revenue field (and for other decimal fields).

My table in MySQL looks like this:

mysql> describe v_game_transaction;
+-------------------+---------------+------+-----+---------------------+-------+
| Field             | Type          | Null | Key | Default             | Extra |
+-------------------+---------------+------+-----+---------------------+-------+
| id                | bigint(20)    | NO   |     | 0                   |       |
| revenue           | decimal(10,2) | NO   |     | NULL                |       |
...

Is there some way to work around this issue now?

@rmoff rmoff commented Feb 6, 2019

DECIMAL isn't supported for numeric.mapping. There isn't a way to work around this that I'm aware of. The data isn't "corrupt", it's just a BigDecimal.

For more details see https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector#bytes-decimals-numerics
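
As an illustration, the "AfQ=" value from the earlier comment can be decoded by hand. The sketch below is in Java; ConnectDecimalDecoder and decode are illustrative names, not a Connect API. It assumes the string is the base64 form of the big-endian unscaled value, and that the scale of 2 is taken from the schema shown in the error message.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

final class ConnectDecimalDecoder {
    /** Rebuilds a BigDecimal from the base64 text of its unscaled value and a known scale. */
    static BigDecimal decode(String base64, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(base64); // two's-complement, big-endian bytes
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        // "AfQ=" is the bytes 0x01 0xF4, i.e. the unscaled value 500
        System.out.println(decode("AfQ=", 2)); // prints 5.00
    }
}
```

The scale is not carried in the bytes themselves, so the consumer has to read it from the schema (here the "scale": 2 parameter).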

@anssip anssip commented Feb 7, 2019

I tried to work around this issue by using an SMT cast. I changed the type of the column to varchar in the DB view I'm using here, and then cast it with:

      "transforms": "Cast",
      "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
      "transforms.Cast.spec": "revenue:float64"

But now the connector fails with: [{"state":"FAILED","trace":"org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.sql.Timestamp for field: \"started\

Adding this cast transform causes it to fail with datetime fields! I found this issue relating to cast transforms.

My second attempt was to fix the connector code.

I changed that line to include both DECIMAL and NUMERIC:

      case Types.DECIMAL:
      case Types.NUMERIC: {
        if (mapNumerics == NumericMapping.PRECISION_ONLY) {
.....

With this hacked-up connector, a slightly different kind of bytes data ends up in the topic. But that doesn't seem to be the correct way to fix the issue :-)

@aliasbadwolf aliasbadwolf commented Mar 6, 2019

@anssip Here is a small code snippet that can help you get the data back in the correct form (written in Scala; you can change it to Java if you want). Essentially, the data is not corrupt; it is just a base64-encoded string of the "unscaled" value of a BigDecimal. Kafka Connect converts NUMERIC types that have a precision and scale to BigDecimal internally (and timestamps to long/epoch) when using AVRO (since it's essentially JSON).

As you already have the schema available, just take the precision and scale from there and pass them along when recreating the BigDecimal. Once you have the final BigDecimal, you can get longValue, intValue or doubleValue from it.

Hope it helps!

import java.math.{BigDecimal, BigInteger}
import java.util.Base64

// a BigDecimal with precision 4 and scale 3
val bd = new BigDecimal("1.234")
println(bd)
println(bd.precision()) // prints 4
println(bd.scale) // prints 3
// encode only the unscaled value (1234); the scale is not part of the bytes
val encoded = Base64.getEncoder.encodeToString(bd.unscaledValue().toByteArray())
println(encoded) // prints "BNI="

val decoded = Base64.getDecoder.decode(encoded)
val bi = new BigInteger(decoded)
println(bi) // prints 1234
// reapply the scale to recover the original value
val bd2 = new BigDecimal(bi, bd.scale)
println(bd2) // prints 1.234

@anssip anssip commented Mar 18, 2019

Thanks, @aliasbadwolf, for that tip. I am actually able to convert it to a valid number. I am now doing it with JavaScript, as I'm doing the stream processing in Node.js.

But my goal here was to streamline my data pipeline and not use any stream processing at all. I'd like to stream the data directly to Elasticsearch without doing any processing (and number conversion). Just one JDBC source connector pushing the data into a topic and from there one sink to push it to Elasticsearch.

@kinghuang kinghuang commented May 17, 2019

Is there anything being done about this issue? I'm working with a large existing Oracle database, where the primary keys are all declared as NUMBER (no precision/scale). As documented by @rmoff in https://gist.github.com/rmoff/7bb46a0b6d27982a5fb7a103bb7c95b9#file-oracle-md, these end up as bytes no matter what numeric.mapping is set to.

While specifying a custom query is a workaround, it's an absolute pain since there are a few hundred tables. SMTs do seem to work in limited testing.

@syedabdulkather syedabdulkather commented Oct 16, 2019

Hi guys,

Is there any workaround for the above issues?
