diff --git a/conf.c b/conf.c index 1257ea4a..26455b9a 100644 --- a/conf.c +++ b/conf.c @@ -52,6 +52,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs TSRMLS_DC) /* {{{ */ kafka_conf_callback_dtor(cbs->rebalance TSRMLS_CC); kafka_conf_callback_dtor(cbs->dr_msg TSRMLS_CC); kafka_conf_callback_dtor(cbs->stats TSRMLS_CC); + kafka_conf_callback_dtor(cbs->consume TSRMLS_CC); + kafka_conf_callback_dtor(cbs->offset_commit TSRMLS_CC); } /* }}} */ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from TSRMLS_DC) /* {{{ */ @@ -73,6 +75,8 @@ void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *f kafka_conf_callback_copy(&to->rebalance, from->rebalance TSRMLS_CC); kafka_conf_callback_copy(&to->dr_msg, from->dr_msg TSRMLS_CC); kafka_conf_callback_copy(&to->stats, from->stats TSRMLS_CC); + kafka_conf_callback_copy(&to->consume, from->consume TSRMLS_CC); + kafka_conf_callback_copy(&to->offset_commit, from->offset_commit TSRMLS_CC); } /* }}} */ static void kafka_conf_free(zend_object *object TSRMLS_DC) /* {{{ */ @@ -244,6 +248,65 @@ static void kafka_conf_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_ } #endif /* HAVE_NEW_KAFKA_CONSUMER */ +#ifdef HAVE_NEW_KAFKA_CONSUMER +static void kafka_conf_consume_cb(rd_kafka_message_t *msg, void *opaque) +{ + kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque; + zeval args[2]; + TSRMLS_FETCH(); + + if (!opaque) { + return; + } + + if (!cbs->consume) { + return; + } + + MAKE_STD_ZEVAL(args[0]); + MAKE_STD_ZEVAL(args[1]); + + KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0); + kafka_message_new(P_ZEVAL(args[1]), msg TSRMLS_CC); + + rdkafka_call_function(&cbs->consume->fci, &cbs->consume->fcc, NULL, 2, args TSRMLS_CC); + + zval_ptr_dtor(&args[0]); + zval_ptr_dtor(&args[1]); +} +#endif /* HAVE_NEW_KAFKA_CONSUMER */ + +#ifdef HAVE_NEW_KAFKA_CONSUMER +static void kafka_conf_offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) +{ + kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque; + zeval args[3]; + TSRMLS_FETCH(); + + if (!opaque) { + return; + } + + if (!cbs->offset_commit) { + return; + } + + MAKE_STD_ZEVAL(args[0]); + MAKE_STD_ZEVAL(args[1]); + MAKE_STD_ZEVAL(args[2]); + + KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0); + ZVAL_LONG(P_ZEVAL(args[1]), err); + kafka_topic_partition_list_to_array(P_ZEVAL(args[2]), partitions TSRMLS_CC); + + rdkafka_call_function(&cbs->offset_commit->fci, &cbs->offset_commit->fcc, NULL, 3, args TSRMLS_CC); + + zval_ptr_dtor(&args[0]); + zval_ptr_dtor(&args[1]); + zval_ptr_dtor(&args[2]); +} +#endif /* HAVE_NEW_KAFKA_CONSUMER */ + /* {{{ proto RdKafka\Conf::__construct() */ ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_conf___construct, 0, 0, 0) @@ -550,6 +613,84 @@ PHP_METHOD(RdKafka__Conf, setRebalanceCb) /* }}} */ #endif /* HAVE_NEW_KAFKA_CONSUMER */ +#ifdef HAVE_NEW_KAFKA_CONSUMER +/* {{{ proto void RdKafka\Conf::setConsumeCb(callable $callback) + Set consume callback to use with poll */ + +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_conf_set_consume_cb, 0, 0, 1) + ZEND_ARG_INFO(0, callback) +ZEND_END_ARG_INFO() + +PHP_METHOD(RdKafka__Conf, setConsumeCb) +{ + zend_fcall_info fci; + zend_fcall_info_cache fcc; + kafka_conf_object *intern; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", &fci, &fcc) == FAILURE) { + return; + } + + intern = get_kafka_conf_object(getThis() TSRMLS_CC); + if (!intern) { + return; + } + + Z_ADDREF_P(P_ZEVAL(fci.function_name)); + + if (intern->cbs.consume) { + zval_ptr_dtor(&intern->cbs.consume->fci.function_name); + } else { + intern->cbs.consume = ecalloc(1, sizeof(*intern->cbs.consume)); + } + + intern->cbs.consume->fci = fci; + intern->cbs.consume->fcc = fcc; + + rd_kafka_conf_set_consume_cb(intern->u.conf, kafka_conf_consume_cb); +} +/* }}} */ +#endif /* HAVE_NEW_KAFKA_CONSUMER */ + +#ifdef HAVE_NEW_KAFKA_CONSUMER +/* {{{ proto void RdKafka\Conf::setOffsetCommitCb(mixed $callback) + Set offset commit callback for use with consumer groups */ + +ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_conf_set_offset_commit_cb, 0, 0, 1) + ZEND_ARG_INFO(0, callback) +ZEND_END_ARG_INFO() + +PHP_METHOD(RdKafka__Conf, setOffsetCommitCb) +{ + zend_fcall_info fci; + zend_fcall_info_cache fcc; + kafka_conf_object *intern; + + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", &fci, &fcc) == FAILURE) { + return; + } + + intern = get_kafka_conf_object(getThis() TSRMLS_CC); + if (!intern) { + return; + } + + Z_ADDREF_P(P_ZEVAL(fci.function_name)); + + if (intern->cbs.offset_commit) { + zval_ptr_dtor(&intern->cbs.offset_commit->fci.function_name); + } else { + intern->cbs.offset_commit = ecalloc(1, sizeof(*intern->cbs.offset_commit)); + } + + intern->cbs.offset_commit->fci = fci; + intern->cbs.offset_commit->fcc = fcc; + + rd_kafka_conf_set_offset_commit_cb(intern->u.conf, kafka_conf_offset_commit_cb); +} +/* }}} */ +#endif /* HAVE_NEW_KAFKA_CONSUMER */ + /* {{{ proto RdKafka\TopicConf::__construct() */ PHP_METHOD(RdKafka__TopicConf, __construct) { @@ -630,7 +771,9 @@ static const zend_function_entry kafka_conf_fe[] = { PHP_ME(RdKafka__Conf, setStatsCb, arginfo_kafka_conf_set_stats_cb, ZEND_ACC_PUBLIC) #ifdef HAVE_NEW_KAFKA_CONSUMER PHP_ME(RdKafka__Conf, setRebalanceCb, arginfo_kafka_conf_set_rebalance_cb, ZEND_ACC_PUBLIC) -#endif /* HAVE_NEW_KAFKA_CONSUMER */ + PHP_ME(RdKafka__Conf, setConsumeCb, arginfo_kafka_conf_set_consume_cb, ZEND_ACC_PUBLIC) + PHP_ME(RdKafka__Conf, setOffsetCommitCb, arginfo_kafka_conf_set_offset_commit_cb, ZEND_ACC_PUBLIC) + #endif /* HAVE_NEW_KAFKA_CONSUMER */ PHP_FE_END }; diff --git a/conf.h b/conf.h index 0373fbea..746599b6 100644 --- a/conf.h +++ b/conf.h @@ -42,6 +42,8 @@ typedef struct _kafka_conf_callbacks { kafka_conf_callback *rebalance; kafka_conf_callback *dr_msg; kafka_conf_callback *stats; + kafka_conf_callback *consume; + kafka_conf_callback *offset_commit; } kafka_conf_callbacks; typedef struct _kafka_conf_object { diff --git a/tests/conf.phpt b/tests/conf.phpt index cc39760b..43a93e8a 100644 --- a/tests/conf.phpt +++ b/tests/conf.phpt @@ -43,6 +43,16 @@ $conf->setStatsCb(function () { }); $dump = $conf->dump(); var_dump(isset($dump["stats_cb"])); +echo "Setting consume callback\n"; +$conf->setConsumeCb(function () { }); +$dump = $conf->dump(); +var_dump(isset($dump["consume_cb"])); + +echo "Setting offset_commit callback\n"; +$conf->setOffsetCommitCb(function () { }); +$dump = $conf->dump(); +var_dump(isset($dump["offset_commit_cb"])); + echo "Dumping conf\n"; var_dump(array_intersect_key($conf->dump(), array( "client.id" => true, @@ -64,6 +74,10 @@ Setting dr_msg callback bool(true) Setting stats callback bool(true) +Setting consume callback +bool(true) +Setting offset_commit callback +bool(true) Dumping conf array(3) { ["client.id"]=>