Query with WHERE clause cannot be used with incrementing/timestamp mode #566

Open
rmoff opened this issue Jan 8, 2019 · 1 comment

@rmoff
commented Jan 8, 2019

The docker-compose.yml for the environment setup is here.

Normal query, no predicate:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_01",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "postgres-01",
                "mode":"incrementing",
                "query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id",
                "incrementing.column.name": "txn_id",
                "validate.non.null": false
                }
        }'

Works as expected.
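For context on the failures below: in incrementing mode the connector does not run "query" verbatim. Judging from the DEBUG lines later in this issue, it appends its own filter and ordering, of the form WHERE "<column>" > ? ORDER BY "<column>" ASC, and binds the current incrementing offset to the ? (-1 on the first poll, per the log). The sketch below only mimics that observed suffix; quote_identifier and build_incrementing_query are hypothetical helpers, not the connector's code.

```python
def quote_identifier(name: str) -> str:
    """Double-quote an identifier as seen in the DEBUG log ("txn_id").

    Embedded double quotes are doubled, per standard SQL quoting rules.
    """
    return '"' + name.replace('"', '""') + '"'


def build_incrementing_query(query: str, column: str) -> str:
    """Hypothetical sketch: append an incrementing-mode clause to a user query.

    Mirrors the suffix visible in the connector's DEBUG output; this is NOT
    the connector's real implementation.
    """
    col = quote_identifier(column)
    return f"{query} WHERE {col} > ? ORDER BY {col} ASC"


base = (
    "SELECT t.txn_id, t.amount, t.currency "
    "FROM demo.transactions t "
    "LEFT OUTER JOIN demo.customers c on t.customer_id = c.id"
)
# With no predicate in the user query, the appended WHERE is the only one.
print(build_incrementing_query(base, "txn_id"))
```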

Now we want to apply a predicate in the database, e.g. on the currency column. This is a valid Postgres query:

postgres=> SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR';
 txn_id | customer_id | amount | currency |    txn_timestamp     | first_name | last_name |           email            | gender |                       comments
--------+-------------+--------+----------+----------------------+------------+-----------+----------------------------+--------+------------------------------------------------------
      3 |           2 |  17.13 | EUR      | 2018-04-30T21:30:39Z | Auberon    | Sulland   | asulland1@slideshare.net   | Male   | Organized context-sensitive Graphical User Interface
     12 |           4 | -92.57 | EUR      | 2018-03-11T07:33:19Z | Nolana     | Yeeles    | nyeeles3@drupal.org        | Female | Adaptive real-time archive

Create the connector with a WHERE clause, but no variable placeholders:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_02",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "postgres-02",
                "mode":"incrementing",
                "query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='\''EUR'\''",
                "incrementing.column.name": "txn_id",
                "validate.non.null": false
                }
        }'
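Getting the single quotes around 'EUR' through both the shell's single-quoted -d '...' argument and JSON is fiddly. One way to sidestep quoting entirely is to build the payload as data and let a JSON encoder handle escaping. A sketch using only Python's standard library (json, urllib.request), targeting the same Connect REST endpoint as the curl commands; it changes only how the request is built, so the connector still generates the same SQL.

```python
import json
import urllib.request

# Same connector config as the curl example; Python string literals carry the
# single quotes around 'EUR' and json.dumps produces valid JSON escaping.
config = {
    "name": "jdbc_source_postgres_02",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://postgres:5432/postgres",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "topic.prefix": "postgres-02",
        "mode": "incrementing",
        "query": (
            "SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, "
            "c.first_name, c.last_name, c.email, c.gender, c.comments "
            "FROM demo.transactions t LEFT OUTER JOIN demo.customers c "
            "on t.customer_id = c.id WHERE t.currency='EUR'"
        ),
        "incrementing.column.name": "txn_id",
        "validate.non.null": False,  # serialized as JSON false
    },
}

body = json.dumps(config).encode("utf-8")
req = urllib.request.Request(
    "http://localhost:8083/connectors",  # Connect REST endpoint from the curl examples
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
# Not sent here; with a running Connect worker: urllib.request.urlopen(req)
```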

Creating this connector fails because the generated query is invalid, with two WHERE clauses:

SELECT          t.txn_id, 
                t.customer_id, 
                t.amount, 
                t.currency, 
                t.txn_timestamp, 
                c.first_name, 
                c.last_name, 
                c.email, 
                c.gender, 
                c.comments 
FROM            demo.transactions t 
LEFT OUTER JOIN demo.customers c 
ON              t.customer_id = c.id 
WHERE           t.currency='EUR' 
where           "txn_id" > ? 
ORDER BY        "txn_id" ASC
 [2019-01-08 16:34:06,979] DEBUG TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR'', topicPrefix='postgres-02', incrementingColumn='txn_id', timestampColumns=[]} prepared SQL query: SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' WHERE "txn_id" > ? ORDER BY "txn_id" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
 [2019-01-08 16:34:06,995] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR'', topicPrefix='postgres-02', incrementingColumn='txn_id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask)
 org.postgresql.util.PSQLException: ERROR: syntax error at or near "WHERE"
   Position: 234
  at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2182)
  at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1911)
  at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:173)
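The error follows from plain concatenation: the suffix is appended after the user's own WHERE t.currency='EUR', so the statement has two WHERE clauses at the same level and Postgres rejects the second one (syntax error at or near "WHERE"). A minimal reproduction of the string-building step, assuming the suffix format shown in the DEBUG log; no database is involved.

```python
import re

# Suffix format copied from the DEBUG log (assumption: the connector appends it
# unconditionally, whatever the user query already contains).
SUFFIX = ' WHERE "{col}" > ? ORDER BY "{col}" ASC'

user_query = (
    "SELECT t.txn_id, t.currency FROM demo.transactions t "
    "LEFT OUTER JOIN demo.customers c on t.customer_id = c.id "
    "WHERE t.currency='EUR'"
)
generated = user_query + SUFFIX.format(col="txn_id")

# Count whole-word WHERE keywords, case-insensitively. This naive count knows
# nothing about subqueries or string literals, which is fine for this query.
where_count = len(re.findall(r"\bWHERE\b", generated, flags=re.IGNORECASE))
print(generated)
print("WHERE keywords:", where_count)
```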

Build the variable placeholders into the query:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_03",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "postgres-03",
                "mode":"incrementing",
                "query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='\''EUR'\'' AND t.txn_id > ? ORDER BY t.txn_id ASC",
                "incrementing.column.name": "t.txn_id",
                "validate.non.null": false
                }
        }'

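This also fails, because the connector still appends its own incrementing clause. The statement now has duplicated WHERE and ORDER BY clauses and two ? placeholders, of which only the first is bound: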

SELECT          t.txn_id, 
                t.customer_id, 
                t.amount, 
                t.currency, 
                t.txn_timestamp, 
                c.first_name, 
                c.last_name, 
                c.email, 
                c.gender, 
                c.comments 
FROM            demo.transactions t 
LEFT OUTER JOIN demo.customers c 
ON              t.customer_id = c.id 
WHERE           t.currency='EUR' 
AND             t.txn_id > ? 
ORDER BY        t.txn_id ASC 
where           "t.txn_id" > ? 
ORDER BY        "t.txn_id" ASC
[2019-01-08 16:42:54,794] DEBUG TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC', topicPrefix='postgres-03', incrementingColumn='t.txn_id', timestampColumns=[]} prepared SQL query: SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC WHERE "t.txn_id" > ? ORDER BY "t.txn_id" ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
[2019-01-08 16:42:54,794] DEBUG Executing prepared statement with incrementing value = -1 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria)
[2019-01-08 16:42:54,794] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC', topicPrefix='postgres-03', incrementingColumn='t.txn_id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask)
org.postgresql.util.PSQLException: No value specified for parameter 2.
 at org.postgresql.core.v3.SimpleParameterList.checkAllParametersSet(SimpleParameterList.java:228)
 at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:163)
 at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:645)
 at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:495)
 at org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:380)
 at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:170)
 at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:86)
 at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:58)
 at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:304)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
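The second error comes down to counting bind parameters. The hand-written query already contains one ?, the appended clause adds another, but the log shows only the incrementing value being bound (Executing prepared statement with incrementing value = -1), so the driver reports No value specified for parameter 2 before the statement even reaches the server. Even with both bound, the duplicated WHERE/ORDER BY would still be a syntax error. A sketch of the count; count_placeholders is a hypothetical helper that skips single-quoted literals but not comments, double-quoted identifiers or Postgres dollar-quoting.

```python
def count_placeholders(sql: str) -> int:
    """Count '?' bind placeholders outside single-quoted string literals."""
    count = 0
    in_string = False
    for ch in sql:
        if ch == "'":
            # An escaped '' inside a literal toggles twice, so it stays balanced.
            in_string = not in_string
        elif ch == "?" and not in_string:
            count += 1
    return count


user_query = (
    "SELECT t.txn_id FROM demo.transactions t "
    "WHERE t.currency='EUR' AND t.txn_id > ? ORDER BY t.txn_id ASC"
)
# Suffix as it appears in the DEBUG log for postgres-03.
generated = user_query + ' WHERE "t.txn_id" > ? ORDER BY "t.txn_id" ASC'

bound_values = [-1]  # per the log, only the incrementing value is bound
placeholders = count_placeholders(generated)
print(f"placeholders={placeholders}, bound={len(bound_values)}")
if placeholders > len(bound_values):
    # Mimics the driver's complaint; the first unbound index is reported.
    print(f"No value specified for parameter {len(bound_values) + 1}")
```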

The docs currently say:

If you use a WHERE clause, it must handle incremental queries itself.

Is this possible with the JDBC connector? Or should the docs state clearly that the incrementing/timestamp modes cannot be combined with a WHERE clause in the query?

@aliasbadwolf
commented Mar 6, 2019

Have you tried using your query, with its predicate, as an inner query? Try this:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_03",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:postgresql://postgres:5432/postgres",
                "connection.user": "connect_user",
                "connection.password": "asgard",
                "topic.prefix": "postgres-03",
                "mode":"incrementing",
                "query":"select * from (SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id WHERE t.currency='\''EUR'\'') o",
                "incrementing.column.name": "txn_id",
                "validate.non.null": false
                }
        }'
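Wrapping works because the appended clause then filters the derived table "o" instead of colliding with the inner WHERE, which is now inside parentheses. The outer clause can only see the subquery's output columns, so the incrementing column has to be the unqualified txn_id: the alias t is not in scope outside the parentheses, and the DEBUG log for postgres-03 shows the connector quoting a configured "t.txn_id" as one identifier. A sketch under the same assumed suffix; wrap_as_subquery and top_level_where_count are hypothetical helpers, and the latter is a simplified scanner, not a SQL parser.

```python
def wrap_as_subquery(query: str, alias: str = "o") -> str:
    """Wrap a user query so anything appended applies to the derived table."""
    return f"SELECT * FROM ({query}) {alias}"


def is_word_char(ch: str) -> bool:
    return ch.isalnum() or ch == "_"


def top_level_where_count(sql: str) -> int:
    """Count WHERE keywords at parenthesis depth 0, outside single-quoted literals."""
    count, depth, in_string, i = 0, 0, False, 0
    upper = sql.upper()
    while i < len(sql):
        ch = sql[i]
        if ch == "'":
            in_string = not in_string
        elif not in_string:
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
            elif depth == 0 and upper.startswith("WHERE", i):
                before = sql[i - 1] if i > 0 else " "
                after = sql[i + 5] if i + 5 < len(sql) else " "
                # Only count WHERE as a whole word, not inside e.g. "nowhere".
                if not is_word_char(before) and not is_word_char(after):
                    count += 1
                    i += 5
                    continue
        i += 1
    return count


inner = (
    "SELECT t.txn_id, t.currency FROM demo.transactions t "
    "LEFT OUTER JOIN demo.customers c on t.customer_id = c.id "
    "WHERE t.currency='EUR'"
)
suffix = ' WHERE "txn_id" > ? ORDER BY "txn_id" ASC'

unwrapped = inner + suffix
wrapped = wrap_as_subquery(inner) + suffix

print(top_level_where_count(unwrapped))  # 2: invalid, as in the failing connector
print(top_level_where_count(wrapped))    # 1: the inner WHERE sits inside the parentheses
```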