Skip to content

Commit

Permalink
Arginfo stubs for rdkafka.c
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaud-lb committed Nov 15, 2021
1 parent e67e129 commit 03ce29d
Show file tree
Hide file tree
Showing 10 changed files with 506 additions and 187 deletions.
2 changes: 1 addition & 1 deletion php_rdkafka.h
Expand Up @@ -38,7 +38,7 @@ typedef struct _kafka_object {
zend_object std;
} kafka_object;

PHP_METHOD(RdKafka, __construct);
PHP_METHOD(RdKafka_RdKafka, __construct);

extern zend_module_entry rdkafka_module_entry;
#define phpext_rdkafka_ptr &rdkafka_module_entry
Expand Down
2 changes: 1 addition & 1 deletion queue.c
Expand Up @@ -122,7 +122,7 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka___private_construct, 0, 0, 0)
ZEND_END_ARG_INFO()

static const zend_function_entry kafka_queue_fe[] = {
PHP_ME(RdKafka, __construct, arginfo_kafka___private_construct, ZEND_ACC_PRIVATE)
PHP_ME(RdKafka_RdKafka, __construct, arginfo_kafka___private_construct, ZEND_ACC_PRIVATE)
PHP_ME(RdKafka__Queue, consume, arginfo_kafka_queue_consume, ZEND_ACC_PUBLIC)
PHP_FE_END
};
Expand Down
190 changes: 29 additions & 161 deletions rdkafka.c
Expand Up @@ -42,6 +42,11 @@
#include "kafka_consumer.h"
#include "topic_partition.h"
#include "fun.h"
#if PHP_VERSION_ID < 80000
#include "rdkafka_legacy_arginfo.h"
#else
#include "rdkafka_arginfo.h"
#endif

#if RD_KAFKA_VERSION < 0x000b0000
# error librdkafka version 0.11.0 or greater required
Expand Down Expand Up @@ -252,20 +257,15 @@ int is_consuming_toppar(kafka_object * intern, rd_kafka_topic_t * rkt, int32_t p
}

/* {{{ private constructor */
PHP_METHOD(RdKafka, __construct)
PHP_METHOD(RdKafka_RdKafka, __construct)
{
zend_throw_exception(NULL, "Private constructor", 0);
return;
}
/* }}} */

/* {{{ proto RdKafka\Consumer::__construct([RdKafka\Conf $conf]) */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_consumer___construct, 0, 0, 0)
ZEND_ARG_INFO(0, conf)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Consumer, __construct)
PHP_METHOD(RdKafka_Consumer, __construct)
{
zval *zconf = NULL;
zend_error_handling error_handling;
Expand All @@ -285,11 +285,7 @@ PHP_METHOD(RdKafka__Consumer, __construct)

/* {{{ proto RdKafka\Queue RdKafka\Consumer::newQueue()
Returns a RdKafka\Queue object */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_new_queue, 0, 0, 0)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Consumer, newQueue)
PHP_METHOD(RdKafka_Consumer, newQueue)
{
rd_kafka_queue_t *rkqu;
kafka_object *intern;
Expand Down Expand Up @@ -332,20 +328,9 @@ PHP_METHOD(RdKafka__Consumer, newQueue)
}
/* }}} */

static const zend_function_entry kafka_consumer_fe[] = {
PHP_ME(RdKafka__Consumer, __construct, arginfo_kafka_consumer___construct, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Consumer, newQueue, arginfo_kafka_new_queue, ZEND_ACC_PUBLIC)
PHP_FE_END
};

/* {{{ proto int RdKafka\Kafka::addBrokers(string $brokerList)
Returns the number of brokers successfully added */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_add_brokers, 0, 0, 1)
ZEND_ARG_INFO(0, broker_list)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, addBrokers)
PHP_METHOD(RdKafka_RdKafka, addBrokers)
{
char *broker_list;
size_t broker_list_len;
Expand All @@ -366,13 +351,7 @@ PHP_METHOD(RdKafka__Kafka, addBrokers)

/* {{{ proto RdKafka\Metadata::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
Request Metadata from broker */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_get_metadata, 0, 0, 1)
ZEND_ARG_INFO(0, all_topics)
ZEND_ARG_INFO(0, only_topic)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, getMetadata)
PHP_METHOD(RdKafka_RdKafka, getMetadata)
{
zend_bool all_topics;
zval *only_zrkt;
Expand Down Expand Up @@ -411,12 +390,7 @@ PHP_METHOD(RdKafka__Kafka, getMetadata)

/* {{{ proto void RdKafka\Kafka::setLogLevel(int $level)
Specifies the maximum logging level produced by internal kafka logging and debugging */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_set_log_level, 0, 0, 1)
ZEND_ARG_INFO(0, level)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, setLogLevel)
PHP_METHOD(RdKafka_RdKafka, setLogLevel)
{
kafka_object *intern;
zend_long level;
Expand All @@ -436,13 +410,7 @@ PHP_METHOD(RdKafka__Kafka, setLogLevel)

/* {{{ proto RdKafka\Topic RdKafka\Kafka::newTopic(string $topic)
Returns an RdKafka\Topic object */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_new_topic, 0, 0, 1)
ZEND_ARG_INFO(0, topic_name)
ZEND_ARG_INFO(0, topic_conf)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, newTopic)
PHP_METHOD(RdKafka_RdKafka, newTopic)
{
char *topic;
size_t topic_len;
Expand Down Expand Up @@ -507,11 +475,7 @@ PHP_METHOD(RdKafka__Kafka, newTopic)

/* {{{ proto int RdKafka\Kafka::getOutQLen()
Returns the current out queue length */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_get_outq_len, 0, 0, 0)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, getOutQLen)
PHP_METHOD(RdKafka_RdKafka, getOutQLen)
{
kafka_object *intern;

Expand All @@ -530,12 +494,7 @@ PHP_METHOD(RdKafka__Kafka, getOutQLen)

/* {{{ proto int RdKafka\Kafka::poll(int $timeout_ms)
Polls the provided kafka handle for events */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_poll, 0, 0, 1)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, poll)
PHP_METHOD(RdKafka_RdKafka, poll)
{
kafka_object *intern;
zend_long timeout;
Expand All @@ -555,12 +514,7 @@ PHP_METHOD(RdKafka__Kafka, poll)

/* {{{ proto int RdKafka\Kafka::flush(int $timeout_ms)
Wait until all outstanding produce requests, et.al, are completed. */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_flush, 0, 0, 1)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, flush)
PHP_METHOD(RdKafka_RdKafka, flush)
{
kafka_object *intern;
zend_long timeout;
Expand All @@ -581,12 +535,7 @@ PHP_METHOD(RdKafka__Kafka, flush)
#ifdef HAS_RD_KAFKA_PURGE
/* {{{ proto int RdKafka\Kafka::purge(int $purge_flags)
Purge messages that are in queue or in flight */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_purge, 0, 0, 1)
ZEND_ARG_INFO(0, purge_flags)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, purge)
PHP_METHOD(RdKafka_RdKafka, purge)
{
kafka_object *intern;
zend_long purge_flags;
Expand All @@ -607,16 +556,7 @@ PHP_METHOD(RdKafka__Kafka, purge)

/* {{{ proto void RdKafka\Kafka::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms)
Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_query_watermark_offsets, 0, 0, 1)
ZEND_ARG_INFO(0, topic)
ZEND_ARG_INFO(0, partition)
ZEND_ARG_INFO(1, low)
ZEND_ARG_INFO(1, high)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, queryWatermarkOffsets)
PHP_METHOD(RdKafka_RdKafka, queryWatermarkOffsets)
{
kafka_object *intern;
char *topic;
Expand Down Expand Up @@ -652,11 +592,7 @@ PHP_METHOD(RdKafka__Kafka, queryWatermarkOffsets)

/* {{{ proto void RdKafka\Kafka::offsetsForTimes(array $topicPartitions, int $timeout_ms)
Look up the offsets for the given partitions by timestamp. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_offsets_for_times, 0, 0, 2)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()
PHP_METHOD(RdKafka__Kafka, offsetsForTimes)
PHP_METHOD(RdKafka_RdKafka, offsetsForTimes)
{
HashTable *htopars = NULL;
kafka_object *intern;
Expand Down Expand Up @@ -692,12 +628,7 @@ PHP_METHOD(RdKafka__Kafka, offsetsForTimes)

/* {{{ proto void RdKafka::setLogger(mixed $logger)
Sets the log callback */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_set_logger, 0, 0, 1)
ZEND_ARG_INFO(0, logger)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, setLogger)
PHP_METHOD(RdKafka_RdKafka, setLogger)
{
kafka_object *intern;
zend_long id;
Expand Down Expand Up @@ -735,11 +666,7 @@ PHP_METHOD(RdKafka__Kafka, setLogger)

/* {{{ proto RdKafka\TopicPartition[] RdKafka\Kafka::pausePatitions(RdKafka\TopicPartition[] $topicPartitions)
Pause producing or consumption for the provided list of partitions. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_pause_partitions, 0, 0, 1)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, pausePartitions)
PHP_METHOD(RdKafka_RdKafka, pausePartitions)
{
HashTable *htopars;
rd_kafka_topic_partition_list_t *topars;
Expand Down Expand Up @@ -775,11 +702,7 @@ PHP_METHOD(RdKafka__Kafka, pausePartitions)

/* {{{ proto RdKafka\TopicPartition[] RdKafka\Kafka::resumePatitions(RdKafka\TopicPartition[] $topicPartitions)
Resume producing consumption for the provided list of partitions. */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_resume_partitions, 0, 0, 1)
ZEND_ARG_INFO(0, topic_partitions)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Kafka, resumePartitions)
PHP_METHOD(RdKafka_RdKafka, resumePartitions)
{
HashTable *htopars;
rd_kafka_topic_partition_list_t *topars;
Expand Down Expand Up @@ -813,34 +736,8 @@ PHP_METHOD(RdKafka__Kafka, resumePartitions)
}
/* }}} */

static const zend_function_entry kafka_fe[] = {
PHP_ME(RdKafka__Kafka, addBrokers, arginfo_kafka_add_brokers, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, getMetadata, arginfo_kafka_get_metadata, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, getOutQLen, arginfo_kafka_get_outq_len, ZEND_ACC_PUBLIC)
PHP_MALIAS(RdKafka__Kafka, metadata, getMetadata, arginfo_kafka_get_metadata, ZEND_ACC_PUBLIC | ZEND_ACC_DEPRECATED)
PHP_ME(RdKafka__Kafka, setLogLevel, arginfo_kafka_set_log_level, ZEND_ACC_PUBLIC | ZEND_ACC_DEPRECATED)
PHP_ME(RdKafka__Kafka, newTopic, arginfo_kafka_new_topic, ZEND_ACC_PUBLIC)
PHP_MALIAS(RdKafka__Kafka, outqLen, getOutQLen, arginfo_kafka_get_outq_len, ZEND_ACC_PUBLIC | ZEND_ACC_DEPRECATED)
PHP_ME(RdKafka__Kafka, poll, arginfo_kafka_poll, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, flush, arginfo_kafka_flush, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_PURGE
PHP_ME(RdKafka__Kafka, purge, arginfo_kafka_purge, ZEND_ACC_PUBLIC)
#endif
PHP_ME(RdKafka__Kafka, setLogger, arginfo_kafka_set_logger, ZEND_ACC_PUBLIC | ZEND_ACC_DEPRECATED)
PHP_ME(RdKafka__Kafka, queryWatermarkOffsets, arginfo_kafka_query_watermark_offsets, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, offsetsForTimes, arginfo_kafka_offsets_for_times, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, pausePartitions, arginfo_kafka_kafka_pause_partitions, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Kafka, resumePartitions, arginfo_kafka_kafka_resume_partitions, ZEND_ACC_PUBLIC)
PHP_FE_END
};

/* {{{ proto RdKafka\Producer::__construct([RdKafka\Conf $conf]) */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_producer___construct, 0, 0, 0)
ZEND_ARG_INFO(0, conf)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Producer, __construct)
PHP_METHOD(RdKafka_Producer, __construct)
{
zval *zconf = NULL;
zend_error_handling error_handling;
Expand All @@ -861,11 +758,7 @@ PHP_METHOD(RdKafka__Producer, __construct)
#ifdef HAS_RD_KAFKA_TRANSACTIONS
/* {{{ proto int RdKafka\Producer::initTransactions(int timeout_ms)
Initializes transactions, needs to be done before producing and starting a transaction */
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_init_transactions, 0, 0, 1)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Producer, initTransactions)
PHP_METHOD(RdKafka_Producer, initTransactions)
{
kafka_object *intern;
zend_long timeout_ms;
Expand Down Expand Up @@ -893,11 +786,7 @@ PHP_METHOD(RdKafka__Producer, initTransactions)

/* {{{ proto int RdKafka\Producer::beginTransaction()
Start a transaction */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_begin_transaction, 0, 0, 0)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Producer, beginTransaction)
PHP_METHOD(RdKafka_Producer, beginTransaction)
{
kafka_object *intern;
const rd_kafka_error_t *error;
Expand All @@ -920,12 +809,7 @@ PHP_METHOD(RdKafka__Producer, beginTransaction)

/* {{{ proto int RdKafka\Producer::commitTransaction(int timeout_ms)
Commit a transaction */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_commit_transaction, 0, 0, 1)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Producer, commitTransaction)
PHP_METHOD(RdKafka_Producer, commitTransaction)
{
kafka_object *intern;
zend_long timeout_ms;
Expand Down Expand Up @@ -953,12 +837,7 @@ PHP_METHOD(RdKafka__Producer, commitTransaction)

/* {{{ proto int RdKafka\Producer::abortTransaction(int timeout_ms)
Commit a transaction */

ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_abort_transaction, 0, 0, 1)
ZEND_ARG_INFO(0, timeout_ms)
ZEND_END_ARG_INFO()

PHP_METHOD(RdKafka__Producer, abortTransaction)
PHP_METHOD(RdKafka_Producer, abortTransaction)
{
kafka_object *intern;
zend_long timeout_ms;
Expand All @@ -985,17 +864,6 @@ PHP_METHOD(RdKafka__Producer, abortTransaction)
/* }}} */
#endif

static const zend_function_entry kafka_producer_fe[] = {
PHP_ME(RdKafka__Producer, __construct, arginfo_kafka_producer___construct, ZEND_ACC_PUBLIC)
#ifdef HAS_RD_KAFKA_TRANSACTIONS
PHP_ME(RdKafka__Producer, initTransactions, arginfo_kafka_init_transactions, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Producer, beginTransaction, arginfo_kafka_begin_transaction, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Producer, commitTransaction, arginfo_kafka_commit_transaction, ZEND_ACC_PUBLIC)
PHP_ME(RdKafka__Producer, abortTransaction, arginfo_kafka_abort_transaction, ZEND_ACC_PUBLIC)
#endif
PHP_FE_END
};

#define COPY_CONSTANT(name) \
REGISTER_LONG_CONSTANT(#name, name, CONST_CS | CONST_PERSISTENT)

Expand Down Expand Up @@ -1070,18 +938,18 @@ PHP_MINIT_FUNCTION(rdkafka)
kafka_object_handlers.free_obj = kafka_free;
kafka_object_handlers.offset = XtOffsetOf(kafka_object, std);

INIT_CLASS_ENTRY(ce, "RdKafka", kafka_fe);
INIT_CLASS_ENTRY(ce, "RdKafka", class_RdKafka_RdKafka_methods);
ce_kafka = zend_register_internal_class(&ce);
ce_kafka->ce_flags |= ZEND_ACC_EXPLICIT_ABSTRACT_CLASS;
ce_kafka->create_object = kafka_new;

zend_declare_property_null(ce_kafka, ZEND_STRL("error_cb"), ZEND_ACC_PRIVATE);
zend_declare_property_null(ce_kafka, ZEND_STRL("dr_cb"), ZEND_ACC_PRIVATE);

INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Consumer", kafka_consumer_fe);
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Consumer", class_RdKafka_Consumer_methods);
ce_kafka_consumer = zend_register_internal_class_ex(&ce, ce_kafka);

INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", kafka_producer_fe);
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Producer", class_RdKafka_Producer_methods);
ce_kafka_producer = zend_register_internal_class_ex(&ce, ce_kafka);

INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Exception", NULL);
Expand Down

0 comments on commit 03ce29d

Please sign in to comment.