Error propagation differences between the callback and event API #4493

Open
3 of 7 tasks
scanterog opened this issue Nov 3, 2023 · 2 comments

Comments

@scanterog

scanterog commented Nov 3, 2023

Description

I'm trying to migrate rust-rdkafka from the callback-based API to the event-based one.

If we consume messages via the callback API (i.e., rd_kafka_consumer_poll), RD_KAFKA_OP_ERR errors are not passed back to the application, while RD_KAFKA_OP_CONSUMER_ERR errors are. This is visible here:

librdkafka/src/rdkafka.c

Lines 3846 to 3869 in 95a542c

        case RD_KAFKA_OP_CONSUMER_ERR:
                /* rd_kafka_consumer_poll() (_Q_CB_CONSUMER):
                 *   Consumer errors are returned to the application
                 *   as rkmessages, not error callbacks.
                 *
                 * rd_kafka_poll() (_Q_CB_GLOBAL):
                 *   convert to ERR op (fallthru)
                 */
                if (cb_type == RD_KAFKA_Q_CB_RETURN ||
                    cb_type == RD_KAFKA_Q_CB_FORCE_RETURN) {
                        /* return as message_t to application */
                        return RD_KAFKA_OP_RES_PASS;
                }
                /* FALLTHRU */
        case RD_KAFKA_OP_ERR:
                if (rk->rk_conf.error_cb)
                        rk->rk_conf.error_cb(rk, rko->rko_err,
                                             rko->rko_u.err.errstr,
                                             rk->rk_conf.opaque);
                else
                        rd_kafka_log(rk, LOG_ERR, "ERROR", "%s: %s",
                                     rk->rk_name, rko->rko_u.err.errstr);
                break;
Here, RD_KAFKA_OP_CONSUMER_ERR returns RD_KAFKA_OP_RES_PASS, while RD_KAFKA_OP_ERR hands the error to the registered error_cb (or just logs it if none is set) and returns the default result value, RD_KAFKA_OP_RES_HANDLED.

Later on, rd_kafka_consume0 gets this op result and, because it is not RD_KAFKA_OP_RES_PASS, skips the op instead of returning a message carrying the RD_KAFKA_OP_ERR error.

librdkafka/src/rdkafka.c

Lines 3134 to 3150 in 95a542c

                res =
                    rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_RETURN, NULL);
                if (res == RD_KAFKA_OP_RES_PASS)
                        break;
                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                             rd_kafka_yield_thread)) {
                        /* Callback called rd_kafka_yield(), we must
                         * stop dispatching the queue and return. */
                        rd_kafka_set_last_error(RD_KAFKA_RESP_ERR__INTR, EINTR);
                        rd_kafka_app_polled(rk);
                        return NULL;
                }
                /* Message was handled by callback. */
                continue;

If we consume messages via the Event API (i.e., rd_kafka_queue_poll on the consumer queue after enabling the events via rd_kafka_conf_set_events), rd_kafka_op2event maps both RD_KAFKA_OP_ERR and RD_KAFKA_OP_CONSUMER_ERR to RD_KAFKA_EVENT_ERROR.

[RD_KAFKA_OP_ERR] = RD_KAFKA_EVENT_ERROR,
[RD_KAFKA_OP_CONSUMER_ERR] = RD_KAFKA_EVENT_ERROR,

This results in the consumer now passing back all RD_KAFKA_OP_ERR errors to the application, rather than only RD_KAFKA_OP_CONSUMER_ERR errors as with the callback API. For example, RD_KAFKA_RESP_ERR__TRANSPORT errors are now passed back to the app.
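
To make the difference concrete, here is a toy model of the two paths in plain C. It does not use librdkafka at all: op_type_t and both function names are hypothetical stand-ins for the internal op types and dispatch logic quoted above, so treat it as a sketch of the observed behaviour rather than the library's actual code.

```c
#include <stdbool.h>

/* Hypothetical stand-ins for the internal op types
 * RD_KAFKA_OP_ERR and RD_KAFKA_OP_CONSUMER_ERR. */
typedef enum { OP_ERR, OP_CONSUMER_ERR } op_type_t;

/* Callback API (rd_kafka_consumer_poll): only consumer errors are
 * passed through (RD_KAFKA_OP_RES_PASS) and returned as messages.
 * Plain errors go to error_cb or the log and are then skipped. */
static bool callback_api_returns_to_app(op_type_t op) {
    return op == OP_CONSUMER_ERR;
}

/* Event API (rd_kafka_queue_poll): both op types are mapped to
 * RD_KAFKA_EVENT_ERROR, so both reach the application. */
static bool event_api_returns_to_app(op_type_t op) {
    return op == OP_ERR || op == OP_CONSUMER_ERR;
}
```

The only input for which the two functions disagree is OP_ERR, which is exactly the class of errors (such as transport errors) that starts showing up after the migration.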

Is this difference accepted and expected behaviour, or is it a bug? I do see that confluent-kafka-go has a special case for PartitionEOF but otherwise passes all errors back to the application (with a flag for fatal ones).

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): 2.3.0
  • Apache Kafka version: 2.x and 3.x
  • librdkafka client configuration: defaults.
  • Operating system: <REPLACE with e.g., Centos 5 (x64)>
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@scanterog
Author

@emasab I know this is not a high priority for you, but is there an easy way to get the list of transient errors (assuming that is the difference between RD_KAFKA_OP_ERR and RD_KAFKA_OP_CONSUMER_ERR) so we can filter them in rust-rdkafka when using the event API? We're getting some user reports and we want to filter those errors (i.e., stop bubbling transient errors up to the users) to rule out some issues. Any pointers would be highly appreciated.

@scanterog
Author

Looking at the code, it seems RD_KAFKA_OP_CONSUMER_ERR errors (i.e., errors that are returned to the app) are built by calling rd_kafka_consumer_err. By looking at where this is called, I've collected the following errors as the ones that need to be bubbled up to the users via poll. This is not 100% accurate, as I might have missed some nested calls, so it is fragile. I'm looking for a list like this. The issue described in this ticket only affects the consumer; it does not affect the producer.

  • RD_KAFKA_RESP_ERR__BAD_MSG
  • RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED
  • RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE
  • RD_KAFKA_RESP_ERR__BAD_COMPRESSION
  • RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE
  • RD_KAFKA_RESP_ERR__AUTO_OFFSET_RESET
  • RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
  • RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC
  • RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE
  • RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED
  • RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED
  • RD_KAFKA_RESP_ERR__PARTITION_EOF
  • RD_KAFKA_RESP_ERR__FATAL
  • RD_KAFKA_RESP_ERR__NO_OFFSET
  • RD_KAFKA_RESP_ERR__IN_PROGRESS
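
As a rough sketch of how a binding could apply such a list when draining error events, the allow-list check might look like the following. To keep it compilable without librdkafka, it uses a stand-in enum: sketch_err_t, the SKETCH_ERR_* names, and should_surface_to_app are all hypothetical, and their numeric values are placeholders. Real code would compare against the rd_kafka_resp_err_t constants of the same names from rdkafka.h. The list itself is the unverified one above, not an authoritative set.

```c
#include <stdbool.h>
#include <stddef.h>

/* Stand-in error codes (placeholder values) mirroring the names of the
 * RD_KAFKA_RESP_ERR_* constants listed above, plus __TRANSPORT as an
 * example of an error that should be filtered out. */
typedef enum {
    SKETCH_ERR__BAD_MSG,
    SKETCH_ERR__NOT_IMPLEMENTED,
    SKETCH_ERR_MSG_SIZE_TOO_LARGE,
    SKETCH_ERR__BAD_COMPRESSION,
    SKETCH_ERR__CRIT_SYS_RESOURCE,
    SKETCH_ERR__AUTO_OFFSET_RESET,
    SKETCH_ERR__UNKNOWN_PARTITION,
    SKETCH_ERR__UNKNOWN_TOPIC,
    SKETCH_ERR_OFFSET_OUT_OF_RANGE,
    SKETCH_ERR_TOPIC_AUTHORIZATION_FAILED,
    SKETCH_ERR_CLUSTER_AUTHORIZATION_FAILED,
    SKETCH_ERR__PARTITION_EOF,
    SKETCH_ERR__FATAL,
    SKETCH_ERR__NO_OFFSET,
    SKETCH_ERR__IN_PROGRESS,
    SKETCH_ERR__TRANSPORT /* not in the allow-list */
} sketch_err_t;

/* Errors believed to be raised via rd_kafka_consumer_err, i.e. the ones
 * the callback API would have returned to the application. */
static const sketch_err_t surfaced_errors[] = {
    SKETCH_ERR__BAD_MSG,
    SKETCH_ERR__NOT_IMPLEMENTED,
    SKETCH_ERR_MSG_SIZE_TOO_LARGE,
    SKETCH_ERR__BAD_COMPRESSION,
    SKETCH_ERR__CRIT_SYS_RESOURCE,
    SKETCH_ERR__AUTO_OFFSET_RESET,
    SKETCH_ERR__UNKNOWN_PARTITION,
    SKETCH_ERR__UNKNOWN_TOPIC,
    SKETCH_ERR_OFFSET_OUT_OF_RANGE,
    SKETCH_ERR_TOPIC_AUTHORIZATION_FAILED,
    SKETCH_ERR_CLUSTER_AUTHORIZATION_FAILED,
    SKETCH_ERR__PARTITION_EOF,
    SKETCH_ERR__FATAL,
    SKETCH_ERR__NO_OFFSET,
    SKETCH_ERR__IN_PROGRESS,
};

/* Returns true if an error taken from an error event should be bubbled
 * up to the user, false if it should be dropped (or only logged). */
static bool should_surface_to_app(sketch_err_t err) {
    size_t n = sizeof(surfaced_errors) / sizeof(surfaced_errors[0]);
    for (size_t i = 0; i < n; i++) {
        if (surfaced_errors[i] == err)
            return true;
    }
    return false;
}
```

With this in place, a transport error would be dropped while a partition EOF would still reach the user. An allow-list is used rather than a deny-list so that any error not known to come from rd_kafka_consumer_err is filtered by default, which matches the callback API's behaviour more closely if the list is incomplete in the other direction.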
