More advanced NUMERIC data conversion #101

Closed
clumsy opened this issue Jul 26, 2016 · 27 comments
@clumsy

clumsy commented Jul 26, 2016

Hi,
I was trying to use kafka-connect-jdbc to feed from an Oracle DB.
Here are a few blocker issues I was able to identify:

  1. Metadata does not always contain proper information
    For NUMERIC columns that are the result of an aggregation, the metadata reports a scale of 0 even though the result has a fractional part, which leads to an exception when a BigDecimal with scale 0 is created from such a value. This happens because the default rounding policy for BigDecimal is ROUND_UNNECESSARY, which throws the exception (a standalone sketch of this failure is included at the end of this comment). There's already an issue raised for that: BigDecimal has mismatching scale value for given Decimal schema #44.
    Users should be advised to use a CAST function or a similar workaround to tackle such problems.
  2. Oracle does not use the following datatypes (details): BIT, TINYINT, SMALLINT, INTEGER, BIGINT
    Instead it represents them as NUMBER(precision,scale), which is reported as NUMERIC and, in the current DataConverter implementation, handled by the DECIMAL conversion, resulting in BigDecimal values.
    Using BigDecimal is overkill for values that are known to fit in the ranges of the datatypes listed above.
    I suggest either providing a way of specifying a custom type mapper or changing the default one to be like this:
...
      case Types.NUMERIC:
        int precision = metadata.getPrecision(col);
        if (metadata.getScale(col) == 0 && precision < 20) { // integer
          Schema schema;
          if (precision > 10) {
            schema = (optional) ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA;
          } else if (precision > 5) {
            schema = (optional) ? Schema.OPTIONAL_INT32_SCHEMA : Schema.INT32_SCHEMA;
          } else if (precision > 3) {
            schema = (optional) ? Schema.OPTIONAL_INT16_SCHEMA : Schema.INT16_SCHEMA;
          } else {
            schema = (optional) ? Schema.OPTIONAL_INT8_SCHEMA : Schema.INT8_SCHEMA;
          }
          builder.field(fieldName, schema);
          break;
        }
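        // no break here on purpose: columns that don't qualify as small integers
        // (scale != 0 or precision >= 20) fall through to the DECIMAL case below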
      case Types.DECIMAL: {
        SchemaBuilder fieldBuilder = Decimal.builder(metadata.getScale(col));
        if (optional) {
          fieldBuilder.optional();
        }
        builder.field(fieldName, fieldBuilder.build());
        break;
      }
...

This will also require the following modification to the value conversion:

...
      case Types.NUMERIC:
        ResultSetMetaData metadata = resultSet.getMetaData();
        int precision = metadata.getPrecision(col);
        if (metadata.getScale(col) == 0 && precision < 20) { // integer
          if (precision > 10) {
            colValue = resultSet.getLong(col);
          } else if (precision > 5) {
            colValue = resultSet.getInt(col);
          } else if (precision > 3) {
            colValue = resultSet.getShort(col);
          } else {
            colValue = resultSet.getByte(col);
          }
          break;
        }
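        // as in the schema mapping above, fall through to the DECIMAL case when
        // the column does not qualify as a small integer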
      case Types.DECIMAL: {
        BigDecimal bigDecimalValue = resultSet.getBigDecimal(col);
        if (bigDecimalValue != null) {
          colValue = bigDecimalValue.setScale(resultSet.getMetaData().getScale(col));
        } else {
          colValue = null;
        }
        break;
      }
...

I can provide a pull-request if you agree with the proposed change.
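
For reference, a minimal standalone sketch of the failure described in point 1 (illustration only, not connector code): when the driver reports scale 0 for a value that actually has a fractional part, rescaling the BigDecimal throws, because the implied rounding mode is ROUND_UNNECESSARY.

import java.math.BigDecimal;

public class ScaleMismatchDemo {
  public static void main(String[] args) {
    // An aggregation result such as AVG(price) may come back as 12.5
    // while the driver metadata still reports scale = 0.
    BigDecimal value = new BigDecimal("12.5");
    try {
      // setScale(0) uses ROUND_UNNECESSARY semantics and throws here,
      // because dropping the fraction would require rounding.
      value.setScale(0);
    } catch (ArithmeticException e) {
      System.out.println("Rounding needed: " + e.getMessage());
    }
  }
}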

@shikhar
Contributor

shikhar commented Jul 28, 2016

The proposed change makes sense to me. Any thoughts, @ewencp? I know you were looking at #89 before.

@clumsy does this also take care of point 1 you described - JDBC Types.NUMERIC columns that are the result of aggregation - or do you still need CASTs for that? From what you're saying it sounds like the metadata provides the wrong info, so a proper fix would need to be at the JDBC driver level?

@shikhar
Contributor

shikhar commented Jul 28, 2016

I think there are compatibility concerns to take care of since we'd start producing INT* types instead of the logical Decimal type if we make this change. I believe this would only affect Oracle though, not sure if other databases exercise Types.NUMERIC.

If it's just Oracle, the mapping for NUMBER types (https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#i22289) seems to be problematic for lots of users, so we might want to consider a breaking change here?

@clumsy
Author

clumsy commented Jul 29, 2016

Yes, it does cover point 1. One has to use a CAST(<COLUMN> AS NUMBER(<PRECISION>,<SCALE>)) and then the metadata has the details.
As for aggregations, I'm not sure there's a way of knowing the resulting type for certain, so I don't see a fix at the JDBC driver level.
Anyway, stating the type explicitly can only make the result less surprising.

I would not say that the change is breaking, though. A NUMERIC value is not necessarily a DECIMAL, let alone one with arbitrary precision.

I believe there are several ways we can go:

  • check the driver and only act as proposed for Oracle DB
  • make this logic pluggable/extendable
  • turn it off by default and enable it via a flag

But one thing I'm certain of is that I don't want my primitive columns to be stored as BigDecimals

@ewencp
Contributor

ewencp commented Jul 29, 2016

@clumsy Yeah, this all makes sense and this is definitely a common problem folks are running into (with lots of confused "why am I just getting a bunch of bytes" questions). I agree the solution you propose makes sense -- the way it is implemented now is definitely simpler to explain (one JDBC type maps to a single Connect/Java type), but isn't great for some databases.

Re: breaking changes, this definitely is a breaking change, i.e. backwards incompatible, for anyone who is already getting BigDecimals and can handle them, so we definitely need this to be toggleable in some way. I don't like option 1 (always do it for Oracle) because a) it is still backwards incompatible and b) the approach is actually completely general and probably useful for other databases. I think option 3 (a flag) is the best. Rather than just making it pluggable and forcing the user to potentially write code, we just implement the set of options that actually make sense.

The flag is purely for compatibility, as the behavior you describe is probably what almost everyone wants (the only exception I can think of is if you might need to extend precision in the future, in which case there may be schema compatibility concerns even if your precision is low enough to fit into a regular integer). And while we can leave it off by default for upcoming releases, we could eventually swap the default (with sufficient warning for users) so we get the better behavior out of the box. I'm not a fan of adding configs and a bit worried about how many we'll end up with as we fix various issues like this, but in this case it seems warranted.
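
To make the flag idea concrete, a rough sketch of how such an opt-in option could be declared with Kafka's ConfigDef (the option name numeric.precision.mapping, its default, and its description are assumptions for illustration, not the final connector config):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class NumericMappingConfig {
  // Hypothetical option name, used here for illustration only.
  public static final String NUMERIC_PRECISION_MAPPING_CONFIG = "numeric.precision.mapping";

  public static ConfigDef baseConfigDef() {
    return new ConfigDef()
        .define(NUMERIC_PRECISION_MAPPING_CONFIG,
                Type.BOOLEAN,
                false, // off by default to preserve the current Decimal behavior
                Importance.LOW,
                "Whether to map NUMERIC columns with scale 0 and low precision "
                    + "to integer types instead of the Decimal logical type.");
  }
}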

@clumsy
Author

clumsy commented Jul 29, 2016

@ewencp Agree, makes sense. A pull request will follow shortly.

@clumsy
Author

clumsy commented Jul 29, 2016

The PR is done.
One remaining question is whether we should add a mapping of NUMERIC(1,0) to BOOLEAN.

@bdrouvot

bdrouvot commented Aug 4, 2016

Hi,
I tested this approach, but Oracle stores INTEGER columns as NUMBER(38):

SQL> create table bdtk3 (ID integer NOT NULL,lastname varchar(100),CID integer);

Table created.

SQL> desc bdtk3
 Name      Null?    Type
 --------- -------- --------------
 ID        NOT NULL NUMBER(38)
 LASTNAME           VARCHAR2(100)
 CID                NUMBER(38)

Then the check "if (metadata.getScale(col) == 0 && precision < 20)" returns false for such integer columns, and we still hit the error: "Invalid type for incrementing column: BYTES"
Bertrand

@shikhar
Contributor

shikhar commented Aug 4, 2016

@bdrouvot Thank you for trying this out. If the column has the default precision of 38, we can't safely treat that as an int even with the patch. You will have to explicitly create it with a precision that we can map to the int type. e.g.
create table bdtk3 (ID number(9,0) NOT NULL,lastname varchar(100),CID number(9,0));.
Alternatively, you could use a custom query and CAST the column, and that should propagate through (basing the casting approach on comments from @clumsy).

@clumsy
Author

clumsy commented Aug 24, 2016

@shikhar on second thought, what if we replace the casting logic with an additional configuration parameter where a user can specify the desired type to match their expectations?
While casting can help somewhat, it doesn't come for free, plus the precision mapping is not 1-to-1 with Java types...
E.g. query=select column1, column2 from table and schema=column1:int8,column2:string

The idea is that one can supply a list of columns and the types to enforce in the resulting schema. Whenever a type is explicitly specified, the connector will try to convert to the expected value by mirroring the current logic, but the other way around.

The value resolution in this case will be something like:
Type.BOOLEAN => resultSet.getBoolean(col)
Type.INT8 => resultSet.getByte(col)
Type.INT16 => resultSet.getShort(col)
Type.INT32 =>

switch (metadata.getColumnType(col)) {
          case Types.DATE: {
            java.sql.Date date = resultSet.getDate(col, UTC_CALENDAR);
            if (date != null) {
              return Date.fromLogical(Date.SCHEMA, date);
            }
            return null;
          }

          case Types.TIME: {
            java.sql.Time time = resultSet.getTime(col, UTC_CALENDAR);
            if (time != null) {
              return Time.fromLogical(Time.SCHEMA, time);
            }
            return null;
          }

          default:
            return resultSet.getInt(col);
        }

Type.INT64 =>

switch (metadata.getColumnType(col)) {
          case Types.TIMESTAMP: {
            java.sql.Timestamp timestamp = resultSet.getTimestamp(col, UTC_CALENDAR);
            if (timestamp != null) {
              return Timestamp.fromLogical(Timestamp.SCHEMA, timestamp);
            }
            return null;
          }

          default:
            return resultSet.getLong(col);
        }

Type.FLOAT32 => resultSet.getFloat(col)
Type.FLOAT64 => resultSet.getDouble(col)
Type.STRING =>

switch (metadata.getColumnType(col)) {
          case Types.SQLXML: {
            SQLXML sqlxml = resultSet.getSQLXML(col);
            if (sqlxml != null) {
              return sqlxml.toString();
            }
            return null;
          }

          case Types.DATALINK: {
            URL url = resultSet.getURL(col);
            if (url != null) {
              return url.toString();
            }
            return null;
          }

          case Types.NCHAR:
          case Types.NVARCHAR:
          case Types.LONGNVARCHAR:
            return resultSet.getNString(col);

          case Types.CLOB:
            return getString(resultSet.getClob(col));

          case Types.NCLOB:
            return getString(resultSet.getNClob(col));

          default:
            return resultSet.getString(col);
        }

Type.BYTES =>

switch (metadata.getColumnType(col)) {
          case Types.BLOB: {
            Blob blob = resultSet.getBlob(col);
            if (blob == null) {
              return null;
            } else {
              if (blob.length() > Integer.MAX_VALUE) {
                throw new IOException("Can't process BLOBs longer than Integer.MAX_VALUE");
              }
              byte[] value = blob.getBytes(1, (int) blob.length());
              blob.free();
              return value;
            }
          }

          case Types.DECIMAL: {
            BigDecimal bigDecimalValue = resultSet.getBigDecimal(col);
            if (bigDecimalValue != null) {
              int scale = metadata.getScale(col);
              return Decimal.fromLogical(Decimal.schema(scale), bigDecimalValue.setScale(scale)); // TODO: cache schema
            }
            return null;
          }

          default:
            return resultSet.getBytes(col);
        }
...
private static String getString(Clob clob) throws IOException, SQLException {
    if (clob == null) {
      return null;
    }
    if (clob.length() > Integer.MAX_VALUE) {
      throw new IOException("Can't process CLOBs longer than Integer.MAX_VALUE");
    }
    String value = clob.getSubString(1, (int) clob.length());
    clob.free();
    return value;
  }

@shikhar
Contributor

shikhar commented Aug 24, 2016

@clumsy I agree that the automatic precision mapping as in #104 is messy and may not do what the user actually wants.

There are many use-cases where it's tempting to add advanced configuration like you are describing. Instead of doing it on a per-connector basis, we would like to add framework-level support for reusable, configurable transformations. Then you can imagine configuring something like

transformer.typemarshal=org.apache.kafka.connect.transforms.TypeMarshaller
transformer.typemarshal.spec=column1:int8,column2:string
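
For illustration, a rough sketch of what such a transformation might look like against the Transformation interface that Connect later added (org.apache.kafka.connect.transforms.Transformation). The TypeMarshaller name, the spec parsing, and the string-only conversion are assumptions; a real implementation would cover more target types and nested values.

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class TypeMarshaller<R extends ConnectRecord<R>> implements Transformation<R> {

  public static final String SPEC_CONFIG = "spec"; // e.g. "column2:string"

  private String targetField;

  @Override
  public void configure(Map<String, ?> configs) {
    // Only a single "field:string" entry is supported in this sketch.
    String spec = (String) configs.get(SPEC_CONFIG);
    targetField = spec.split(":")[0];
  }

  @Override
  public R apply(R record) {
    if (!(record.value() instanceof Struct)) {
      return record;
    }
    Struct value = (Struct) record.value();

    // Rebuild the value schema with the target field re-typed as an optional string.
    SchemaBuilder builder = SchemaBuilder.struct().name(value.schema().name());
    for (Field field : value.schema().fields()) {
      boolean retype = field.name().equals(targetField);
      builder.field(field.name(), retype ? Schema.OPTIONAL_STRING_SCHEMA : field.schema());
    }
    Schema updatedSchema = builder.build();

    // Copy the values over, converting the target field to its string form.
    Struct updatedValue = new Struct(updatedSchema);
    for (Field field : updatedSchema.fields()) {
      Object original = value.get(field.name());
      boolean retype = field.name().equals(targetField);
      updatedValue.put(field.name(), retype && original != null ? original.toString() : original);
    }

    return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
        record.key(), updatedSchema, updatedValue, record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef().define(SPEC_CONFIG, ConfigDef.Type.STRING,
        ConfigDef.Importance.MEDIUM, "Comma-separated field:type pairs to re-type.");
  }

  @Override
  public void close() {
  }
}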

@clumsy
Author

clumsy commented Aug 24, 2016

@shikhar This is exactly what I was thinking of (please check my previous, updated post).
I've already tested a PoC using a simple Map<String, Type> with the proposed logic for the JDBC connector - it works just fine.
I can contribute a patch but cannot find this TypeMarshaller class.
Is it already available or is it just a proposition?

P.S.
Canceled #104

@shikhar
Contributor

shikhar commented Aug 24, 2016

@clumsy that was just a proposition, we'd need some framework-level support for transformations and a specific transformer implementation like the 'TypeMarshaller'. It would only be able to kick in after the JDBC source connector has handed over records to the framework, and so wouldn't be able to rely on the typed JDBC ResultSet.getDesiredType() methods. That tradeoff aside, I think we should wait on transformation support in Connect rather than adding a feature to explicitly specify types in this connector as you're proposing.

If you need this today, another solution is to use a custom Converter implementation that delegates to whatever base Converter you are currently using.
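
As a rough sketch of that workaround (the class name, the choice of JsonConverter as the base, and the float64 mapping are assumptions; the byte[]-to-Connect direction is left untouched):

import java.math.BigDecimal;
import java.util.Map;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.Converter;

// Wraps a base converter and rewrites Decimal (BigDecimal) struct fields to float64
// before serialization, so downstream consumers see plain numbers instead of bytes.
public class DecimalFlatteningConverter implements Converter {

  private final Converter delegate = new JsonConverter();

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    delegate.configure(configs, isKey);
  }

  @Override
  public byte[] fromConnectData(String topic, Schema schema, Object value) {
    if (schema == null || schema.type() != Schema.Type.STRUCT || !(value instanceof Struct)) {
      return delegate.fromConnectData(topic, schema, value);
    }
    Struct struct = (Struct) value;

    // Build a copy of the schema with Decimal logical fields replaced by float64.
    SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
    for (Field field : schema.fields()) {
      boolean isDecimal = Decimal.LOGICAL_NAME.equals(field.schema().name());
      builder.field(field.name(), isDecimal ? Schema.OPTIONAL_FLOAT64_SCHEMA : field.schema());
    }
    Schema newSchema = builder.build();

    // Copy values, converting BigDecimal payloads to plain doubles.
    Struct newValue = new Struct(newSchema);
    for (Field field : schema.fields()) {
      Object fieldValue = struct.get(field.name());
      if (fieldValue instanceof BigDecimal) {
        fieldValue = ((BigDecimal) fieldValue).doubleValue();
      }
      newValue.put(field.name(), fieldValue);
    }
    return delegate.fromConnectData(topic, newSchema, newValue);
  }

  @Override
  public SchemaAndValue toConnectData(String topic, byte[] value) {
    return delegate.toConnectData(topic, value);
  }
}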

@stewartbryson

@clumsy Is your enhancement available?

@clumsy
Author

clumsy commented Sep 7, 2016

@stewartbryson only as a PoC I use locally. I can share a gist.

@salemn

salemn commented Sep 8, 2016

hi @clumsy, thanks for raising this issue and the fix related to it.
I'm interested in such a fix if you already have it.

My case is a bit different, but it's still interesting to see how you handled this: I'm using Connect to sync two Postgres databases, and all my numeric data gets converted to the bytea type, which is unreadable.

@nylund

nylund commented Sep 22, 2016

@clumsy I am also interested in this. Can you share the gist, or do you already have a PR I could try out?

@clumsy
Author

clumsy commented Sep 23, 2016

OK, due to multiple inquiries, I'm sharing the PoC that I used in my project: clumsy/kafka-connect-jdbc@754c79fc517f8bfe75b7b07e670b5d1a64505dbf

What this PoC does:

  • uses the provided key column for the SourceRecord and thus the Kafka message key (or uses primary key info; the key can only consist of one column)
  • uses explicitly specified schema to construct SourceRecord (it also serves as a projection): e.g. schema=IdColumn:int,TextColumn:string

Please note:

  • This PoC is not of PR quality
  • I had to merge some recent changes from master and might have missed something, but you should get the idea

I do believe that this is the only sane solution for JDBC data sources: there are just too many implementations out there, and unless we tell them what we need, we will always be surprised.
Please let me know if you face any problems with this PoC.

Hope this helps,
Cheers!

@nylund

nylund commented Oct 7, 2016

A note to others who might run into this issue: a NUMBER column with undefined precision in Oracle DB is reported by the driver as having scale=-127. After adding a workaround for this, I got most of our use cases working.
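
A small helper sketch of the check involved (the -127 value follows the observation above; the class and method names are made up for illustration, and what to map such columns to - e.g. float64, or a Decimal with an assumed scale - is left as a per-deployment decision):

import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class OracleNumberHeuristics {

  // Scale reported by the Oracle driver for NUMBER columns declared without
  // precision/scale, per the comment above.
  public static final int ORACLE_UNDEFINED_SCALE = -127;

  // Returns true when precision/scale based integer mapping should not be applied,
  // because the column's precision is effectively undefined.
  public static boolean hasUndefinedPrecision(ResultSetMetaData metadata, int col)
      throws SQLException {
    return metadata.getScale(col) == ORACLE_UNDEFINED_SCALE;
  }
}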

@rmunoz10

Hi @clumsy, thank you for looking into this. I still have the problem reported in #33. Are there any plans to make this part of the JDBC connector product? I can't use the connector against our Oracle DBs otherwise.

Thanks

@johnboy14

When's the ETA for this getting resolved? I still have this problem.

@nik0xFF

nik0xFF commented Mar 24, 2017

Hello, I'm encountering the issue described in #33; the toLogical conversion seems to fail.
Thanks

@boristyukin

@nylund have you resolved this? I have the same issue with type NUMBER (no precision).

@niknyl

niknyl commented Nov 7, 2017

@boristyukin I worked around this specific issue in a local branch but eventually gave up on kafka-connect with Oracle as we faced a bunch of other issues.

@boristyukin

@niknyl thanks man, I ended up not using Connect for Oracle and used StreamSets instead, as I needed to create a quick demo. This is unfortunate, as Oracle is still king in the database world. I wish I could fix this myself, but I have not touched Java in years :) Sqoop had similar issues with our source system, but at least it offered a custom mapping. I did try a Kafka single message transform to convert the data types, but there is another GitHub issue with that one, as it does not support the number type / bytes conversion right now either. Looks like a dead end.

@praveengone

@ewencp, @clumsy is the solution proposed above part of the JDBC connector code? Also, when is it planned to be included?

@wicknicks
Member

The JDBC connector now supports a numeric.mapping configuration option, which should help resolve some of the issues described here. If that is not the case, please re-open this ticket or create a new issue.
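
For example, in the source connector configuration (the connection settings below are placeholders; numeric.mapping=best_fit maps NUMERIC columns to the narrowest matching primitive type where precision and scale allow, instead of the Decimal/bytes logical type):

name=oracle-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@//db-host:1521/ORCLPDB1
numeric.mapping=best_fit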

@wicknicks wicknicks added the bug label Nov 9, 2018
@luisfilipe46

luisfilipe46 commented Nov 29, 2018

Hi.
Recently I set up a JDBC source connector to read data from a Microsoft SQL Server database.
The table I'm reading from has some columns with datatypes decimal(18, 2) and decimal(18, 0).
When Kafka Connect reads from this table, it maps these columns to the bytes datatype, which is not readable and cannot be written to Elasticsearch (using the Elasticsearch sink connector) in a readable and searchable way.

I tried to use numeric.mapping with the best_fit value, but the situation remains the same.
I tried to use SMT transformations in order to cast bytes to int64 or float64 when writing to Elasticsearch, but I'm getting the following exception:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Unexpected type in Cast transformation: BYTES
    at org.apache.kafka.connect.transforms.Cast.convertFieldType(Cast.java:206)
    at org.apache.kafka.connect.transforms.Cast.getOrBuildSchema(Cast.java:168)
    at org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:137)
    at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:107)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 14 more

For my use case, either numeric.mapping or casting the column values to an int or float would work. Unfortunately, I'm not able to do it: neither approach is working.

Could this just be a Microsoft SQL Server situation, or is this happening with the Oracle NUMERIC/DECIMAL datatypes too?

Any help will be appreciated at this point.
I can post more details later if needed/requested.
I'm using Confluent Platform open source version 5.0.0.
