Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NPE in JDBC Source using query #560

Open
rmoff opened this issue Jan 4, 2019 · 2 comments

Comments

@rmoff
Copy link

@rmoff rmoff commented Jan 4, 2019

Source object

mysql> describe transactions;
+---------------+--------------+------+-----+---------+-------+
| Field         | Type         | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+-------+
| txn_id        | int(11)      | YES  |     | NULL    |       |
| customer_id   | int(11)      | YES  |     | NULL    |       |
| amount        | decimal(5,2) | YES  |     | NULL    |       |
| currency      | varchar(50)  | YES  |     | NULL    |       |
| txn_timestamp | varchar(50)  | YES  |     | NULL    |       |
+---------------+--------------+------+-----+---------+-------+
5 rows in set (0.01 sec)

Config:

[2019-01-04 17:09:40,225] INFO JdbcSourceTaskConfig values:
 batch.max.rows = 100
 catalog.pattern = null
 connection.attempts = 3
 connection.backoff.ms = 10000
 connection.password = [hidden]
 connection.url = jdbc:mysql://mysql:3306/demo
 connection.user = connect_user
 dialect.name =
 incrementing.column.name = txn_id
 mode = incrementing
 numeric.mapping = null
 numeric.precision.mapping = false
 poll.interval.ms = 5000
 query = SELECT customer_id, amount, currency, txn_timestamp FROM demo.transactions
 schema.pattern = null
 table.blacklist = []
 table.poll.interval.ms = 60000
 table.types = [TABLE]
 table.whitelist = []
 tables = []
 timestamp.column.name = []
 timestamp.delay.interval.ms = 0
 topic.prefix = mysql-xx
 validate.non.null = false
 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)

Error:

[2019-01-04 17:09:40,625] ERROR WorkerSourceTask{id=jdbc_source_mysql_xx-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
 at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractOffsetIncrementedId(TimestampIncrementingCriteria.java:231)
 at io.confluent.connect.jdbc.source.TimestampIncrementingCriteria.extractValues(TimestampIncrementingCriteria.java:189)
 at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:185)
 at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:309)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
@rmoff

This comment has been minimized.

Copy link
Author

@rmoff rmoff commented Jan 4, 2019

This was caused by omitting the incrementing.column.name (txn_id) from the SELECT list. With this added back in, the connector worked:

[2019-01-04 17:12:26,496] INFO JdbcSourceTaskConfig values:
 batch.max.rows = 100
 catalog.pattern = null
 connection.attempts = 3
 connection.backoff.ms = 10000
 connection.password = [hidden]
 connection.url = jdbc:mysql://mysql:3306/demo
 connection.user = connect_user
 dialect.name =
 incrementing.column.name = txn_id
 mode = incrementing
 numeric.mapping = null
 numeric.precision.mapping = false
 poll.interval.ms = 5000
 query = SELECT txn_id, customer_id, amount, currency, txn_timestamp FROM demo.transactions
 schema.pattern = null
 table.blacklist = []
 table.poll.interval.ms = 60000
 table.types = [TABLE]
 table.whitelist = []
 tables = []
 timestamp.column.name = []
 timestamp.delay.interval.ms = 0
 topic.prefix = mysql-xx
 validate.non.null = false
 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)
@rmoff

This comment has been minimized.

Copy link
Author

@rmoff rmoff commented Jan 4, 2019

Related: #561

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
1 participant
You can’t perform that action at this time.