Skip to content

Commit

Permalink
Fix #151 - Callbacks for offset_commit and consume
Browse files Browse the repository at this point in the history
  • Loading branch information
tPl0ch committed Mar 11, 2018
1 parent d9929b6 commit 163eebb
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 1 deletion.
145 changes: 144 additions & 1 deletion conf.c
Expand Up @@ -52,6 +52,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs TSRMLS_DC) /* {{{ */
kafka_conf_callback_dtor(cbs->rebalance TSRMLS_CC);
kafka_conf_callback_dtor(cbs->dr_msg TSRMLS_CC);
kafka_conf_callback_dtor(cbs->stats TSRMLS_CC);
kafka_conf_callback_dtor(cbs->consume TSRMLS_CC);
kafka_conf_callback_dtor(cbs->offset_commit TSRMLS_CC);
} /* }}} */

static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from TSRMLS_DC) /* {{{ */
Expand All @@ -73,6 +75,8 @@ void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *f
kafka_conf_callback_copy(&to->rebalance, from->rebalance TSRMLS_CC);
kafka_conf_callback_copy(&to->dr_msg, from->dr_msg TSRMLS_CC);
kafka_conf_callback_copy(&to->stats, from->stats TSRMLS_CC);
kafka_conf_callback_copy(&to->consume, from->consume TSRMLS_CC);
kafka_conf_callback_copy(&to->offset_commit, from->offset_commit TSRMLS_CC);
} /* }}} */

static void kafka_conf_free(zend_object *object TSRMLS_DC) /* {{{ */
Expand Down Expand Up @@ -244,6 +248,65 @@ static void kafka_conf_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_
}
#endif /* HAVE_NEW_KAFKA_CONSUMER */

#ifdef HAVE_NEW_KAFKA_CONSUMER
static void kafka_conf_consume_cb(rd_kafka_message_t *msg, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zeval args[2];
TSRMLS_FETCH();

if (!opaque) {
return;
}

if (!cbs->consume) {
return;
}

MAKE_STD_ZEVAL(args[0]);
MAKE_STD_ZEVAL(args[1]);

KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0);
kafka_message_new(P_ZEVAL(args[1]), msg TSRMLS_CC);

rdkafka_call_function(&cbs->consume->fci, &cbs->consume->fcc, NULL, 2, args TSRMLS_CC);

zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
}
#endif /* HAVE_NEW_KAFKA_CONSUMER */

#ifdef HAVE_NEW_KAFKA_CONSUMER
static void kafka_conf_offset_commit_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
{
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
zeval args[3];
TSRMLS_FETCH();

if (!opaque) {
return;
}

if (!cbs->offset_commit) {
return;
}

MAKE_STD_ZEVAL(args[0]);
MAKE_STD_ZEVAL(args[1]);
MAKE_STD_ZEVAL(args[2]);

KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0);
ZVAL_LONG(P_ZEVAL(args[1]), err);
kafka_topic_partition_list_to_array(P_ZEVAL(args[2]), partitions TSRMLS_CC);

rdkafka_call_function(&cbs->offset_commit->fci, &cbs->offset_commit->fcc, NULL, 3, args TSRMLS_CC);

zval_ptr_dtor(&args[0]);
zval_ptr_dtor(&args[1]);
zval_ptr_dtor(&args[2]);
}
#endif /* HAVE_NEW_KAFKA_CONSUMER */

/* {{{ proto RdKafka\Conf::__construct() */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_conf___construct, 0, 0, 0)
Expand Down Expand Up @@ -550,6 +613,84 @@ PHP_METHOD(RdKafka__Conf, setRebalanceCb)
/* }}} */
#endif /* HAVE_NEW_KAFKA_CONSUMER */

#ifdef HAVE_NEW_KAFKA_CONSUMER
/* {{{ proto void RdKafka\Conf::setConsumeCb(callable $callback)
Set consume callback to use with poll */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_conf_set_consume_cb, 0, 0, 1)
ZEND_ARG_INFO(0, callback)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Conf, setConsumeCb)
{
zend_fcall_info fci;
zend_fcall_info_cache fcc;
kafka_conf_object *intern;

if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", &fci, &fcc) == FAILURE) {
return;
}

intern = get_kafka_conf_object(getThis() TSRMLS_CC);
if (!intern) {
return;
}

Z_ADDREF_P(P_ZEVAL(fci.function_name));

if (intern->cbs.consume) {
zval_ptr_dtor(&intern->cbs.consume->fci.function_name);
} else {
intern->cbs.consume = ecalloc(1, sizeof(*intern->cbs.consume));
}

intern->cbs.consume->fci = fci;
intern->cbs.consume->fcc = fcc;

rd_kafka_conf_set_consume_cb(intern->u.conf, kafka_conf_consume_cb);
}
/* }}} */
#endif /* HAVE_NEW_KAFKA_CONSUMER */

#ifdef HAVE_NEW_KAFKA_CONSUMER
/* {{{ proto void RdKafka\Conf::setOffsetCommitCb(mixed $callback)
Set offset commit callback for use with consumer groups */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_conf_set_offset_commit_cb, 0, 0, 1)
ZEND_ARG_INFO(0, callback)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Conf, setOffsetCommitCb)
{
zend_fcall_info fci;
zend_fcall_info_cache fcc;
kafka_conf_object *intern;

if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", &fci, &fcc) == FAILURE) {
return;
}

intern = get_kafka_conf_object(getThis() TSRMLS_CC);
if (!intern) {
return;
}

Z_ADDREF_P(P_ZEVAL(fci.function_name));

if (intern->cbs.offset_commit) {
zval_ptr_dtor(&intern->cbs.offset_commit->fci.function_name);
} else {
intern->cbs.offset_commit = ecalloc(1, sizeof(*intern->cbs.offset_commit));
}

intern->cbs.offset_commit->fci = fci;
intern->cbs.offset_commit->fcc = fcc;

rd_kafka_conf_set_offset_commit_cb(intern->u.conf, kafka_conf_offset_commit_cb);
}
/* }}} */
#endif /* HAVE_NEW_KAFKA_CONSUMER */

/* {{{ proto RdKafka\TopicConf::__construct() */
PHP_METHOD(RdKafka__TopicConf, __construct)
{
Expand Down Expand Up @@ -630,7 +771,9 @@ static const zend_function_entry kafka_conf_fe[] = {
PHP_ME(RdKafka__Conf, setStatsCb, arginfo_kafka_conf_set_stats_cb, ZEND_ACC_PUBLIC)
#ifdef HAVE_NEW_KAFKA_CONSUMER
PHP_ME(RdKafka__Conf, setRebalanceCb, arginfo_kafka_conf_set_rebalance_cb, ZEND_ACC_PUBLIC)
#endif /* HAVE_NEW_KAFKA_CONSUMER */
PHP_ME(RdKafka__Conf, setConsumeCb, arginfo_kafka_conf_set_consume_cb, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Conf, setOffsetCommitCb, arginfo_kafka_conf_set_offset_commit_cb, ZEND_ACC_PUBLIC)
#endif /* HAVE_NEW_KAFKA_CONSUMER */
PHP_FE_END
};

Expand Down
2 changes: 2 additions & 0 deletions conf.h
Expand Up @@ -42,6 +42,8 @@ typedef struct _kafka_conf_callbacks {
kafka_conf_callback *rebalance;
kafka_conf_callback *dr_msg;
kafka_conf_callback *stats;
kafka_conf_callback *consume;
kafka_conf_callback *offset_commit;
} kafka_conf_callbacks;

typedef struct _kafka_conf_object {
Expand Down
14 changes: 14 additions & 0 deletions tests/conf.phpt
Expand Up @@ -43,6 +43,16 @@ $conf->setStatsCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["stats_cb"]));

echo "Setting consume callback\n";
$conf->setConsumeCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["consume_cb"]));

echo "Setting offset_commit callback\n";
$conf->setOffsetCommitCb(function () { });
$dump = $conf->dump();
var_dump(isset($dump["offset_commit_cb"]));

echo "Dumping conf\n";
var_dump(array_intersect_key($conf->dump(), array(
"client.id" => true,
Expand All @@ -64,6 +74,10 @@ Setting dr_msg callback
bool(true)
Setting stats callback
bool(true)
Setting consume callback
bool(true)
Setting offset_commit callback
bool(true)
Dumping conf
array(3) {
["client.id"]=>
Expand Down

0 comments on commit 163eebb

Please sign in to comment.