Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deprecate default.topic.configuration #446

Merged
merged 8 commits into from
Sep 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ matrix:
python: "2.7"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.11.5
before_install:
- pip install -U pip && pip install virtualenv
- brew update && brew upgrade pyenv
- pyenv install -f 2.7.15
- virtualenv -p ~/.pyenv/versions/2.7.15/bin/python ./env
Expand All @@ -26,6 +27,7 @@ matrix:
python: "3.6"
env: LD_LIBRARY_PATH="$PWD/tmp-build/lib" LIBRDKAFKA_VERSION=v0.11.5
before_install:
- pip install -U pip && pip install virtualenv
- brew update && brew upgrade pyenv
- pyenv install -f 3.6.5
- virtualenv -p ~/.pyenv/versions/3.6.5/bin/python ./env
Expand All @@ -43,8 +45,8 @@ matrix:
services: docker

install:
- pip install -U pip && pip install virtualenv
- if [[ $TRAVIS_OS_NAME == "osx" ]]; then python -m ensurepip && virtualenv /tmp/venv && source /tmp/venv/bin/activate ; fi
- pip install -U pip
- if [[ -z $CIBW_BEFORE_BUILD ]]; then pip install pytest-timeout flake8 ; fi
- if [[ -z $CIBW_BEFORE_BUILD ]]; then rm -rf tmp-build ; tools/bootstrap-librdkafka.sh --require-ssl ${LIBRDKAFKA_VERSION} tmp-build ; fi
- if [[ -n $TRAVIS_TAG && -n $CIBW_BEFORE_BUILD ]]; then pip install cibuildwheel; fi
Expand Down
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': 'mybroker',
'group.id': 'mygroup',
'default.topic.config': {
'auto.offset.reset': 'smallest'
}
'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])
Expand Down
122 changes: 21 additions & 101 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1360,79 +1360,12 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg) {
return 0;
}



/**
* Populate topic conf from provided dict.
*
* Will raise an exception on error and return -1, or returns 0 on success.
*/
static int populate_topic_conf (rd_kafka_topic_conf_t *tconf, const char *what,
PyObject *dict) {
Py_ssize_t pos = 0;
PyObject *ko, *vo;

if (!PyDict_Check(dict)) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s: requires a dict", what);
return -1;
}

while (PyDict_Next(dict, &pos, &ko, &vo)) {
PyObject *ks, *ks8;
PyObject *vs, *vs8;
const char *k;
const char *v;
char errstr[256];

if (!(ks = cfl_PyObject_Unistr(ko))) {
PyErr_SetString(PyExc_TypeError,
"expected configuration property "
"value as type unicode string");
return -1;
}

if (!(vs = cfl_PyObject_Unistr(vo))) {
PyErr_SetString(PyExc_TypeError,
"expected configuration property "
"value as type unicode string");
Py_DECREF(ks);
return -1;
}

k = cfl_PyUnistr_AsUTF8(ks, &ks8);
v = cfl_PyUnistr_AsUTF8(vs, &vs8);

if (rd_kafka_topic_conf_set(tconf, k, v,
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s: %s", what, errstr);
Py_XDECREF(ks8);
Py_XDECREF(vs8);
Py_DECREF(ks);
Py_DECREF(vs);
return -1;
}

Py_XDECREF(ks8);
Py_XDECREF(vs8);
Py_DECREF(ks);
Py_DECREF(vs);
}

return 0;
}



/**
* @brief Set single special producer config value.
*
* @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
*/
static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf,
const char *name, PyObject *valobj) {

if (!strcasecmp(name, "on_delivery")) {
Expand Down Expand Up @@ -1474,7 +1407,6 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
* @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
*/
static int consumer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf,
const char *name, PyObject *valobj) {

if (!strcasecmp(name, "on_commit")) {
Expand Down Expand Up @@ -1507,7 +1439,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
PyObject *args,
PyObject *kwargs) {
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *tconf;
Py_ssize_t pos = 0;
PyObject *ko, *vo;
PyObject *confdict = NULL;
Expand Down Expand Up @@ -1560,14 +1491,13 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
}

conf = rd_kafka_conf_new();
tconf = rd_kafka_topic_conf_new();

/*
* Default config (overridable by user)
*/

/* Enable valid offsets in delivery reports */
rd_kafka_topic_conf_set(tconf, "produce.offset.report", "true", NULL, 0);
rd_kafka_conf_set(conf, "produce.offset.report", "true", NULL, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is soon being enabled by default (1.0) :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to wait? I can fix the examples in the meantime

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the default.topic.config fix needs to go in v0.11.6.


/*
* Plugins must be configured prior to handling any of their configuration properties.
Expand All @@ -1583,7 +1513,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
PyErr_SetString(PyExc_TypeError,
"expected configuration property name "
"as type unicode string");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Expand All @@ -1597,7 +1526,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s", errstr);

rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Expand All @@ -1613,6 +1541,20 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
PyDict_DelItemString(confdict, "plugin.library.paths");
}

if ((vo = PyDict_GetItemString(confdict, "default.topic.config"))) {
/* TODO: uncomment for 1.0 release
PyErr_Warn(PyExc_DeprecationWarning,
"default.topic.config has being deprecated, "
"set default topic configuration values in the global dict");
*/
if (PyDict_Update(confdict, vo) == -1) {
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);
return NULL;
}
PyDict_DelItemString(confdict, "default.topic.config");
}

/* Convert config dict to config key-value pairs. */
while (PyDict_Next(confdict, &pos, &ko, &vo)) {
PyObject *ks, *ks8;
Expand All @@ -1623,35 +1565,21 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
int r = 0;

if (!(ks = cfl_PyObject_Unistr(ko))) {
PyErr_SetString(PyExc_TypeError,
"expected configuration property name "
"as type unicode string");
rd_kafka_topic_conf_destroy(tconf);
PyErr_SetString(PyExc_TypeError,
"expected configuration property name "
"as type unicode string");
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

return NULL;
}

k = cfl_PyUnistr_AsUTF8(ks, &ks8);
if (!strcmp(k, "default.topic.config")) {
if (populate_topic_conf(tconf, k, vo) == -1) {
Py_DECREF(ks);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);
return NULL;
}
Py_XDECREF(ks8);
Py_DECREF(ks);
continue;

} else if (!strcmp(k, "error_cb")) {
if (!strcmp(k, "error_cb")) {
if (!PyCallable_Check(vo)) {
PyErr_SetString(PyExc_TypeError,
"expected error_cb property "
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Expand All @@ -1676,7 +1604,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
PyErr_SetString(PyExc_ValueError,
"expected throttle_cb property "
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Expand All @@ -1701,7 +1628,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
PyErr_SetString(PyExc_TypeError,
"expected stats_cb property "
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Expand Down Expand Up @@ -1739,14 +1665,13 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,

/* Special handling for certain config keys. */
if (ktype == RD_KAFKA_PRODUCER)
r = producer_conf_set_special(h, conf, tconf, k, vo);
r = producer_conf_set_special(h, conf, k, vo);
else if (ktype == RD_KAFKA_CONSUMER)
r = consumer_conf_set_special(h, conf, tconf, k, vo);
r = consumer_conf_set_special(h, conf, k, vo);
if (r == -1) {
/* Error */
Py_XDECREF(ks8);
Py_DECREF(ks);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Expand All @@ -1769,7 +1694,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
"expected configuration "
"property value as type "
"unicode string");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Expand All @@ -1785,7 +1709,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
RD_KAFKA_CONF_OK) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
"%s", errstr);
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(confdict);

Expand Down Expand Up @@ -1821,9 +1744,6 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
rd_kafka_conf_set_log_cb(conf, log_cb);
}

rd_kafka_topic_conf_set_opaque(tconf, h);
rd_kafka_conf_set_default_topic_conf(conf, tconf);

rd_kafka_conf_set_opaque(conf, h);

#ifdef WITH_PY_TSS
Expand Down
5 changes: 3 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ providing a dict of configuration properties to the instance constructor, e.g.::
conf = {'bootstrap.servers': 'mybroker.com',
'group.id': 'mygroup', 'session.timeout.ms': 6000,
'on_commit': my_commit_callback,
'default.topic.config': {'auto.offset.reset': 'smallest'}}
'auto.offset.reset': 'earliest'}
consumer = confluent_kafka.Consumer(conf)

The supported configuration values are dictated by the underlying
Expand All @@ -111,7 +111,8 @@ https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
The Python bindings also provide some additional configuration properties:

* ``default.topic.config``: value is a dict of client topic-level configuration
properties that are applied to all used topics for the instance.
properties that are applied to all used topics for the instance. **DEPRECATED: **
topic configuration should now be specified in the global top-level configuration.

* ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served upon calling
``client.poll()`` or ``producer.flush()``.
Expand Down
2 changes: 1 addition & 1 deletion examples/confluent_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def acked(err, msg):
'sasl.username': '<ccloud key>',
'sasl.password': '<ccloud secret>',
'group.id': str(uuid.uuid1()), # this will create a new consumer group on each invocation.
'default.topic.config': {'auto.offset.reset': 'smallest'}
'auto.offset.reset': 'earliest'
})

c.subscribe(['python-test-topic'])
Expand Down
2 changes: 1 addition & 1 deletion examples/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def print_usage_and_exit(program_name):
# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}}
'auto.offset.reset': 'earliest'}

# Check to see if -T option exists
for opt in optlist:
Expand Down