
Adding support for unsigned tinyint values #152

Closed
wants to merge 1 commit into from

Conversation

vamossagar12

Based on the official JDBC documentation, I have added a condition to allow both Short and Byte as valid classes for the INT8 type in ConnectSchema.

Here's the doc:

8.3.5 SMALLINT

The JDBC type SMALLINT represents a 16-bit signed integer value between -32768 and 32767.

The corresponding SQL type, SMALLINT, is defined in SQL-92 and is supported by all the major databases. The SQL-92 standard leaves the precision of SMALLINT up to the implementation, but in practice, all the major databases support at least 16 bits.

The recommended Java mapping for the JDBC SMALLINT type is as a Java short.
This would allow kafka-connect-jdbc to handle both signed and unsigned values. Currently it fails for unsigned values (i.e. values > 127). In this PR, if the TINYINT value received from the DB is greater than 127, it is converted to a short; otherwise it stays a byte. Corresponding changes have been made in the kafka and schema-registry projects.
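The condition described above can be sketched as a small helper (hypothetical name, not the actual PR diff). The caller is assumed to have read the column with resultSet.getShort(col), which avoids the MySQL driver's range check on getByte():

```java
// Sketch of the described logic (assumed helper name, not the exact PR code):
// widen an unsigned TINYINT read as a short; narrow back to a byte only when
// the value fits in the signed byte range.
class TinyIntWidening {
    static Object widen(short value) {
        if (value > Byte.MAX_VALUE) {
            return value;       // autoboxed to Short: unsigned 128..255
        }
        return (byte) value;    // autoboxed to Byte: fits the signed range
    }
}
```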

@ConfluentJenkins
Collaborator

Can one of the admins verify this patch?

@ghost

ghost commented Oct 19, 2016

It looks like @vamossagar12 hasn't signed our Contributor License Agreement, yet.

Appreciation of efforts,

clabot

@vamossagar12
Author

I can't see the Contributor License Agreement link.

@ewencp
Contributor

ewencp commented Oct 19, 2016

Sorry about that, you can sign it at http://clabot.confluent.io/cla

@vamossagar12
Author

I have signed it twice; it says "Thanks for signing". Would that be sufficient?

@ewencp
Contributor

ewencp commented Oct 19, 2016

Yes, that's fine

@vamossagar12
Author

Hi, any updates on this?

@shikhar
Contributor

shikhar commented Oct 25, 2016

The information you provided in the description is for SMALLINT; for TINYINT, https://docs.oracle.com/javase/6/docs/technotes/guides/jdbc/getstart/mapping.html notes:

The recommended Java mapping for the JDBC TINYINT type is as either a Java byte or a Java short. The 8-bit Java byte type represents a signed value from -128 to 127, so it may not always be appropriate for larger TINYINT values, whereas the 16-bit Java short will always be able to hold all TINYINT values.

While a case can be made for using Schema.Type.INT16 (Java short) rather than Schema.Type.INT8 (Java byte), it's not that the data won't fit in a byte; rather, Java does not provide unsigned bytes, so you may need to mask with x & 0xFF.

Currently it fails for unsigned values (i.e. values > 127).

What JDBC driver is that with? I'd expect ResultSet.getByte() to simply return a 'negative' byte in that case.
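For illustration, the "negative byte" behavior shikhar expects looks like this in plain Java (a self-contained demo, not connector code):

```java
// Java bytes are signed: the bit pattern for unsigned 220 (0xDC) reads back
// as -36, and masking with & 0xFF recovers the unsigned value as an int.
public class SignedByteDemo {
    public static void main(String[] args) {
        byte b = (byte) 220;          // the driver could hand back this bit pattern
        System.out.println(b);        // -36
        System.out.println(b & 0xFF); // 220
    }
}
```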

@shikhar
Contributor

shikhar commented Oct 26, 2016

Linking #98

@vamossagar12
Author

@shikhar Ah, ok, that was an oversight, sorry about that. But yes, I wanted to post about TINYINT.

Basically, what I have done is check whether the value is greater than Byte.MAX_VALUE; if it isn't, I pass along a byte, otherwise a short.

I have copied mysql-connector-java-5.1.21.jar into the classpath.

Also, without this change, I get the following exception:

com.mysql.jdbc.exceptions.jdbc4.MySQLDataException: '220' in column '20' is outside valid range for the datatype TINYINT.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.Util.getInstance(Util.java:387)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:925)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:896)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:885)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:860)
at com.mysql.jdbc.ResultSetImpl.throwRangeException(ResultSetImpl.java:7086)
at com.mysql.jdbc.ResultSetImpl.getNativeByte(ResultSetImpl.java:3090)
at com.mysql.jdbc.ResultSetImpl.getNativeByte(ResultSetImpl.java:3048)
at com.mysql.jdbc.ResultSetImpl.getByte(ResultSetImpl.java:1666)
at io.confluent.connect.jdbc.DataConverter.convertFieldValue(DataConverter.java:287)
at io.confluent.connect.jdbc.DataConverter.convertRecord(DataConverter.java:68)
at io.confluent.connect.jdbc.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:179)
at io.confluent.connect.jdbc.JdbcSourceTask.poll(JdbcSourceTask.java:217)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

which ultimately comes from ResultSet.getByte().

What I have tried to do is broaden the handling of 8-bit values so that a Short is used only when the value is greater than Byte.MAX_VALUE; otherwise a Byte goes through.

Since this has dependencies on ConnectSchema and Schema Registry, I have made changes and raised separate PRs for those as well, for your reference:

https://github.com/apache/kafka/pull/2044 -> raised against apache kafka. The Jenkins build failed here, and I have been told those failures are known and being addressed.

https://github.com/confluentinc/schema-registry/pull/432 -> raised against schema-registry. It does the opposite of the above.

Please let me know in case of any issues/concerns.

@vamossagar12
Author

Hey @shikhar, did you get a chance to look at this?

@shikhar
Contributor

shikhar commented Nov 1, 2016

I think we should change the code to (byte) rs.getShort(), that should prevent the error from the MySQL JDBC driver. A change to Schema.Type.SHORT seems unnecessary.

We are thinking along the lines of supporting lightweight transformations in Kafka Connect, and certain integer-widening operations seem reasonable to me. So one could apply x & 0xFF as a byte -> short transformation.
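A minimal sketch of that widening, written as a plain static method (hypothetical name) rather than against the then-unreleased Connect transformation API:

```java
// Treat the stored INT8 as unsigned and widen it to INT16.
// The mask promotes to int, so the result must be cast back down to short.
public class ByteWidening {
    static short toUnsignedShort(byte b) {
        return (short) (b & 0xFF);  // e.g. bit pattern 0xDC (-36) becomes 220
    }
}
```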

@vamossagar12
Author

vamossagar12 commented Nov 2, 2016

@shikhar So I commented out the code that I had in this PR and used what you suggested:

      // 8 bits int
      case Types.TINYINT: {
        /*
        if (resultSet.getShort(col) > Byte.MAX_VALUE)
          colValue = resultSet.getShort(col);
        else
          colValue = resultSet.getByte(col);
        break;
        */
        colValue = ((byte) resultSet.getShort(col)) & 0xFF;
        break;
      }

As you said, the JDBC error got resolved, but when I ran the code again on a newly downloaded Confluent package (with none of the other changes that I had made in the kafka and schema-registry projects), I got the following exception:

org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT8: class java.lang.Integer
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:231)
at org.apache.kafka.connect.data.Struct.put(Struct.java:215)
at org.apache.kafka.connect.data.Struct.put(Struct.java:204)
at io.confluent.connect.jdbc.source.DataConverter.convertFieldValue(DataConverter.java:439)
at io.confluent.connect.jdbc.source.DataConverter.convertRecord(DataConverter.java:73)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:184)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:212)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

This error is at the main Kafka project level, inside ConnectSchema. While converting, it checks whether the type of the object passed matches the schema type. The expression x & 0xFF converts the value into an int (I ran it locally to confirm), while the schema passed is still INT8. The only permissible class for INT8 is Byte.class:

SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class));

and hence this fails.
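The promotion described above is standard Java behavior: the binary & operator promotes its byte operand to int, so the autoboxed result is an Integer, never a Byte. A small self-contained demo:

```java
// Shows why ((byte) resultSet.getShort(col)) & 0xFF fails INT8 validation:
// & promotes the byte operand to int, and boxing an int yields an Integer.
public class PromotionDemo {
    public static void main(String[] args) {
        byte b = (byte) 220;
        Object boxed = b & 0xFF;              // int expression, boxed to Integer
        System.out.println(boxed.getClass()); // class java.lang.Integer
    }
}
```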

On a side note, I looked at the way Debezium handles TINYINT values, and they seem to handle both signed and unsigned values using just a Short:

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java#L89

So, this could be another option to consider (as you mentioned in one of your previous comments).

@vamossagar12
Author

@shikhar Gwen mentioned in one of the threads on the Confluent Google group that there's a plan to add single-message transforms to the Connect layer to resolve the kinds of issues we discussed in this PR.

So I guess whatever is being done in this PR wouldn't be needed anymore? Is there anything I/we can start looking at, or maybe even contribute to?

@shikhar
Contributor

shikhar commented Nov 16, 2016

I think the solution @andybryant proposed in #165 seems reasonable, since even with single-message transforms the eventual state is, e.g., for an unsigned TINYINT to map to INT16 rather than INT8, or an unsigned INTEGER to INT64 rather than INT32. I didn't know that signed/unsigned can be detected from the column metadata.

@shikhar shikhar closed this Nov 16, 2016

4 participants