
number of tasks, their state and relation to the connector state #95

Open
konstan opened this issue Apr 20, 2023 · 1 comment

konstan commented Apr 20, 2023

I have "tasks.max": "10" in the connector config. How to interpret the FAILED state of the task below with the connector state RUNNING? Is the FAILED state of the task with ID 0 just an indicative log message and connector as a whole still able to process messages (using other tasks?) or does the connector need to be restarted to make it work? But my impression is that only one task out of 10 gets provisioned/used (see logs below and the full log after deploying the connector can be found here https://gist.github.com/konstan/238f7025c2ce051ebdc81141b49978a4).

Config and status of the connector after a failure in a task.

$ curl localhost:8083/connectors/elastic-source-nuvlabox-status | jq .
{
  "name": "elastic-source-nuvlabox-status",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "topic.prefix": "es_",
    "fieldname_converter": "nop",
    "es.host": "es",
    "tasks.max": "10",
    "incrementing.field.name": "updated",
    "name": "elastic-source-nuvlabox-status",
    "es.port": "9200",
    "index.prefix": "nuvla-nuvlabox-status"
  },
  "tasks": [
    {
      "connector": "elastic-source-nuvlabox-status",
      "task": 0
    }
  ],
  "type": "source"
}
$
$ curl localhost:8083/connectors/elastic-source-nuvlabox-status/status | jq .
{
  "name": "elastic-source-nuvlabox-status",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.0.61:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.0.0.61:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:291)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1048590 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n"
    }
  ],
  "type": "source"
}
$

The stack trace

org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
	at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:291)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1048590 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
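
The root cause is the RecordTooLargeException: a single serialized record exceeds the producer's max.request.size (1048576 bytes by default). As a sketch of one possible fix (not from this thread): Kafka Connect supports per-connector producer overrides via producer.override.* properties, provided the worker permits client config overrides (connector.client.config.override.policy=All). Since PUT /connectors/{name}/config replaces the whole config, all existing keys must be resubmitted; the 5 MiB value below is only an example, and the topic's max.message.bytes may need raising to match.

$ curl -X PUT -H "Content-Type: application/json" \
    localhost:8083/connectors/elastic-source-nuvlabox-status/config \
    -d '{
      "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
      "topic.prefix": "es_",
      "fieldname_converter": "nop",
      "es.host": "es",
      "es.port": "9200",
      "index.prefix": "nuvla-nuvlabox-status",
      "incrementing.field.name": "updated",
      "tasks.max": "10",
      "producer.override.max.request.size": "5242880"
    }'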

Checking the logs to understand the number of tasks provisioned/running/failed, it looks like only one task was provisioned out of the 10 requested.

$ grep 'elastic-source-nuvlabox-status|task-' connectDistributed.out
...
[2023-04-20 08:14:32,100] ERROR [elastic-source-nuvlabox-status|task-0] WorkerSourceTask{id=elastic-source-nuvlabox-status-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
[2023-04-20 08:14:32,102] INFO [elastic-source-nuvlabox-status|task-0] [Producer clientId=connector-producer-elastic-source-nuvlabox-status-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1249)
[2023-04-20 08:14:32,104] INFO [elastic-source-nuvlabox-status|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:659)
[2023-04-20 08:14:32,104] INFO [elastic-source-nuvlabox-status|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:663)
[2023-04-20 08:14:32,104] INFO [elastic-source-nuvlabox-status|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:669)
[2023-04-20 08:14:32,105] INFO [elastic-source-nuvlabox-status|task-0] App info kafka.producer for connector-producer-elastic-source-nuvlabox-status-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
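
As an illustrative check (not in the original thread), the REST API also confirms how many tasks were actually provisioned; the task list endpoint returns one entry per task:

$ curl -s localhost:8083/connectors/elastic-source-nuvlabox-status/tasks | jq length
1

In general, a connector may create fewer tasks than tasks.max if it cannot split the source data any further.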

The log says "Task is being killed and will not recover until manually restarted" - how do I restart a task? Or is there a config param to have the connector restart tasks automatically? Or is it possible to configure the connector not to fail the tasks at all (and just log the failures)?
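
For reference (not part of the original question): a FAILED task can be restarted manually via the Connect REST API, for example:

$ curl -X POST localhost:8083/connectors/elastic-source-nuvlabox-status/tasks/0/restart

The task will likely fail again as soon as it hits the same oversized record, so the max.request.size issue above needs to be addressed first.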

DarioBalinzo (Owner) commented

Hi, for the task error status you can check the Confluent documentation on Kafka connectors. I implemented the standard callbacks (e.g. onStart etc.; the tasks are created by splitting up the available source data), but most of that code is under the Kafka Connect framework's control, not the connector's.
