Found last value Cursor{primaryCursor='null', secondaryCursor='null'} java.lang.NullPointerException #49

Open
nmousouros opened this issue Jul 1, 2021 · 7 comments

@nmousouros

We managed to connect to Elasticsearch, but we are getting a NullPointerException. What could be causing that? Below are our Kafka (Strimzi) connector configuration, a sample document from the index, and the error message.

class: com.github.dariobalinzo.ElasticSourceConnector
config:
  poll.interval.ms: 1000
  es.port: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:port}
  es.scheme: https
  fieldname_converter: nop
  es.tls.truststore.location: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:truststore-location}
  es.host: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:host}
  es.user: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:user}
  elastic.security.protocol: SSL
  es.tls.truststore.password: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:truststore-password}
  index.prefix: ca-dict-sanctions-stg
  errors.log.enable: true
  topic.prefix: ''
  incrementing.field.name: status.currentUpdatedOn
  es.password: >-
    ${file:/opt/kafka/external-configuration/elasticsearch-creds/elasticsearch-creds:password}
tasksMax: 1

"_source" : {
  "field" : "Country",
  "operator" : "Equals",
  "value" : "CYP",
  "action" : "Block",
  "replace" : "",
  "carrierBank" : [ ],
  "status" : {
    "currentStatus" : "Active",
    "currentUpdatedOn" : "2021-07-01T12:20:04.453+0300",
    "currentNote" : null,
    "previousStatus" : null,
    "previousUpdatedOn" : null,
    "previousNote" : null,
    "daysSinceLastUpdate" : 0
  },
  "metadata" : {
    "creationDate" : "2021-07-01T12:20:04.453+0300",
    "createdBy" : "Userlevel4",
    "lastEditedBy" : "Userlevel4",
    "lastEditedDate" : "2021-07-01T12:20:04.454+0300"
  }
}

2021-07-01 11:12:57,262 INFO fetching from ca-dict-sanctions-stg (com.github.dariobalinzo.task.ElasticSourceTask) [task-thread-ca-dict-sanctions-stg-0]
2021-07-01 11:12:57,795 INFO found last value Cursor{primaryCursor='null', secondaryCursor='null'} (com.github.dariobalinzo.task.ElasticSourceTask) [task-thread-ca-dict-sanctions-stg-0]
2021-07-01 11:12:57,797 ERROR error (com.github.dariobalinzo.task.ElasticSourceTask) [task-thread-ca-dict-sanctions-stg-0]
java.lang.NullPointerException
    at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:97)
    at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

@DarioBalinzo
Owner

I've looked at the code, and incrementing.field.name is not yet compatible with nested fields. At this time it works only with root-level fields.
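The difference between a root-level lookup and a nested one can be sketched as follows. This is only an illustration, assuming the document source is available as a parsed Map and that the field name is used as a single top-level key; the thread does not show the connector's actual code, and `NestedFieldLookup`, `rootLevel`, and `byPath` are hypothetical names, not connector API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the connector: contrasts a plain
// top-level key lookup with a lookup that follows a dotted path.
final class NestedFieldLookup {

    // Top-level lookup: the whole string is treated as one key.
    static Object rootLevel(Map<String, Object> source, String field) {
        return source.get(field);
    }

    // Path-aware lookup: descends one map level per dot-separated segment.
    static Object byPath(Map<String, Object> source, String path) {
        Object current = source;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // the path goes deeper than the document does
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        // Reduced version of the sample document from this issue.
        Map<String, Object> status = new LinkedHashMap<>();
        status.put("currentUpdatedOn", "2021-07-01T12:20:04.453+0300");
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("field", "Country");
        source.put("status", status);

        // No top-level key is literally named "status.currentUpdatedOn".
        System.out.println(rootLevel(source, "status.currentUpdatedOn")); // null
        // Following the path reaches the nested value.
        System.out.println(byPath(source, "status.currentUpdatedOn"));
    }
}
```

Under that assumption, a null result for the dotted name would be consistent with the `Cursor{primaryCursor='null', secondaryCursor='null'}` line in the log, although the exact cause inside the connector is not confirmed here.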

I hope you can find a workaround or use a different incrementing field. In the meantime, I'm planning to add nested-field support for cursors as well when I finalize the SSL config branch.
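One possible shape for such a workaround, assuming you control the code that writes documents to the index, is to copy the nested value into a root-level field before indexing. The sketch below is not from the connector; `PromoteNestedField`, `promote`, and the root-level field name `statusUpdatedOn` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical producer-side helper: duplicates a nested value at the
// root of the document so a root-level incrementing field can be used.
final class PromoteNestedField {

    // Returns a shallow copy of the document in which source[parent][child]
    // is also stored under rootName. The input map is left unchanged.
    static Map<String, Object> promote(Map<String, Object> source,
                                       String parent, String child, String rootName) {
        Map<String, Object> copy = new LinkedHashMap<>(source);
        Object nested = source.get(parent);
        if (nested instanceof Map) {
            Object value = ((Map<?, ?>) nested).get(child);
            if (value != null) {
                copy.put(rootName, value);
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> status = new LinkedHashMap<>();
        status.put("currentUpdatedOn", "2021-07-01T12:20:04.453+0300");
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("field", "Country");
        source.put("status", status);

        Map<String, Object> toIndex =
                promote(source, "status", "currentUpdatedOn", "statusUpdatedOn");
        System.out.println(toIndex.get("statusUpdatedOn"));
    }
}
```

With a field like this in place, incrementing.field.name could point at statusUpdatedOn instead of status.currentUpdatedOn. Documents already in the index would not have the new field unless they are rewritten.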

@DarioBalinzo DarioBalinzo self-assigned this Jul 4, 2021
@DarioBalinzo DarioBalinzo added the enhancement New feature or request label Jul 4, 2021
@DarioBalinzo DarioBalinzo added this to the 1.4.3 milestone Jul 4, 2021
@armaankohli91

Hi

Any update on this?

On all versions above 1.4, I am facing this issue even when my incrementing.field.name is not a nested field. Here is the connector configuration:

{
  "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
  "tasks.max": "1",
  "es.scheme": "http",
  "es.host": "somehost",
  "es.port": "someport",
  "es.user": "username",
  "mode": "incrementing",
  "es.password": "password",
  "index.prefix": "index",
  "topic.prefix": "topic",
  "incrementing.field.name": "received_on",
  "poll.interval.ms": 10000
}

@DarioBalinzo
Owner

Hi @armaankohli91, sorry for the delay. I will work on it in the next few days and post an update here in the issue.
Can you provide a sample document JSON, especially the incrementing field that is failing?

Is it an array? Arrays are not supported either.

Thanks,
Dario
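
The two unsupported cases mentioned in this thread (nested fields and arrays) can be checked against a sample document before configuring the connector. This is a minimal sketch, assuming the document has been parsed into a Map; `IncrementingFieldCheck` and `describe` are hypothetical names and not part of the connector, and the sample values are illustrative.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical pre-flight check: classifies the value stored under a
// root-level key of a parsed sample document.
final class IncrementingFieldCheck {

    static String describe(Map<String, Object> source, String field) {
        if (!source.containsKey(field)) {
            return "missing"; // also the result for dotted names of nested fields
        }
        Object value = source.get(field);
        if (value == null) {
            return "null";
        }
        if (value instanceof Collection || value.getClass().isArray()) {
            return "array";
        }
        if (value instanceof Map) {
            return "object";
        }
        return "scalar";
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("received_on", 1L);                 // plain number
        source.put("tags", Arrays.asList("a", "b"));   // array
        source.put("status", new LinkedHashMap<String, Object>()); // nested object

        System.out.println(describe(source, "received_on"));             // scalar
        System.out.println(describe(source, "tags"));                    // array
        System.out.println(describe(source, "status"));                  // object
        System.out.println(describe(source, "status.currentUpdatedOn")); // missing
    }
}
```

Only the "scalar" result matches what the maintainer describes as supported; a "scalar" result does not by itself rule out other causes of the exception.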

@armaankohli91

armaankohli91 commented Feb 17, 2022

Hi Dario

Thanks for the response.

No, it is not an array. It's a plain number that simply increments with each document.

Sorry, I misunderstood what you meant by a sample document. I've edited this reply to include one here:

{
  "da_name": "Rihanna",
  "da_firstName": "Rihanna",
  "da_lastName": "Rihanna",
  "da_address": "TX",
  "ra_f1": "12",
  "ra_H1": "11",
  "created_on": "2022-02-02T16:38:53.0870292",
  "received_on": 97568911026700
}

So, received_on is a field that stores the current time in milliseconds when the document is saved.

Hope it helps.

-Armaan

@dahal4

dahal4 commented May 10, 2022

Hi there, I am facing the same problem. Did anyone find a solution?

@siva-vunet

Hey, is this issue still open? I am facing this exact same error when I try to use a timestamp keyword field as the incrementing field in the Kafka connector configuration.

@hyperbaba

It looks like nested fields for incrementing.field.name are still not supported. This is a serious limitation for this great piece of software.
