From e39ea1f02636fa3dacc54e60f73249665ec704ba Mon Sep 17 00:00:00 2001 From: Milind L Date: Wed, 1 Feb 2023 11:51:12 +0530 Subject: [PATCH 1/3] Add set_sasl_credentials bindings across clients This binding calls internally rd_kafka_sasl_set_credentials. This binding is needed across consumers, producers, and adminclient. --- src/confluent_kafka/admin/__init__.py | 22 ++++++++++++ src/confluent_kafka/src/Admin.c | 4 +++ src/confluent_kafka/src/Consumer.c | 3 ++ src/confluent_kafka/src/Producer.c | 3 ++ src/confluent_kafka/src/confluent_kafka.c | 42 +++++++++++++++++++++++ src/confluent_kafka/src/confluent_kafka.h | 3 +- tests/test_misc.py | 17 +++++++++ 7 files changed, 93 insertions(+), 1 deletion(-) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index c15bf0e25..9970f6f07 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -787,3 +787,25 @@ def alter_consumer_group_offsets(self, alter_consumer_group_offsets_request, **k super(AdminClient, self).alter_consumer_group_offsets(alter_consumer_group_offsets_request, f, **kwargs) return futmap + + def set_sasl_credentials(self, username, password): + """ + Sets the SASL credentials used for this client. + These credentials will overwrite the old ones, and will be used the + next time the client needs to authenticate. + This method will not disconnect existing broker connections that + have been established with the old credentials. + This method is applicable only to SASL PLAIN and SCRAM mechanisms. + + :param str username: The username to set. + :param str password: The password to set. + + :returns: A dict of futures for each group, keyed by the group id. + The future result() method returns :class:`ConsumerGroupTopicPartitions`. + + :rtype: None + + :raises KafkaException: Operation failed locally or on broker. + :raises TypeException: Invalid input. + """ + super(AdminClient, self).set_sasl_credentials(username, password) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 210a011c0..27f209143 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -2152,6 +2152,10 @@ static PyMethodDef Admin_methods[] = { Admin_delete_acls_doc }, + { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, + set_sasl_credentials_doc + }, + { NULL } }; diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 66f5b7540..6bbadb9ca 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -1477,6 +1477,9 @@ static PyMethodDef Consumer_methods[] = { "send_offsets_to_transaction() API.\n" "\n" }, + { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, + set_sasl_credentials_doc + }, { NULL } diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index caa0c6084..b6a51f510 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -811,6 +811,9 @@ static PyMethodDef Producer_methods[] = { " Treat any other error as a fatal error.\n" "\n" }, + { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, + set_sasl_credentials_doc + }, { NULL } }; diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 16ff496a3..9a7f2584c 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -2578,6 +2578,48 @@ PyObject *cfl_int32_array_to_py_list (const int32_t *arr, size_t cnt) { } +/**************************************************************************** + * + * + * Methods common across all types of clients. + * + * + * + * + ****************************************************************************/ + +const char set_sasl_credentials_doc[] = PyDoc_STR( + ".. py:function:: set_sasl_credentials(username, password)\n" + "\n" + " Sets the SASL credentials used for this client.\n" + " These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate.\n" + " This method will not disconnect existing broker connections that have been established with the old credentials.\n" + " This method is applicable only to SASL PLAIN and SCRAM mechanisms.\n"); + + +PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs) { + const char *username = NULL; + const char *password = NULL; + rd_kafka_error_t* error; + static char *kws[] = {"username", "password", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "ss", kws, + &username, &password)) { + return NULL; + } + + error = rd_kafka_sasl_set_credentials(self->rk, username, password); + + if (error) { + cfl_PyErr_from_error_destroy(error); + return NULL; + } + + Py_RETURN_NONE; +} + + + /**************************************************************************** * * diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index f2768c3f3..4392b0e69 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -386,10 +386,11 @@ PyObject *c_Node_to_py(const rd_kafka_Node_t *c_node); rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist); PyObject *list_topics (Handle *self, PyObject *args, PyObject *kwargs); PyObject *list_groups (Handle *self, PyObject *args, PyObject *kwargs); - +PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs); extern const char list_topics_doc[]; extern const char list_groups_doc[]; +extern const char set_sasl_credentials_doc[]; #ifdef RD_KAFKA_V_HEADERS diff --git a/tests/test_misc.py b/tests/test_misc.py index ae016a3a9..b11d09247 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -262,3 +262,20 @@ def on_delivery(err, msg): if "CI" in os.environ: pytest.xfail("Timeout exceeded") pytest.fail("Timeout exceeded") + +def test_set_sasl_credentials_api(): + clients = [ + AdminClient({}), + confluent_kafka.Consumer({"group.id": "dummy"}), + confluent_kafka.Producer({})] + + for c in clients: + c.set_sasl_credentials('username', 'password') + + c.set_sasl_credentials('override', 'override') + + with pytest.raises(TypeError): + c.set_sasl_credentials(None, 'password') + + with pytest.raises(TypeError): + c.set_sasl_credentials('username', None) From b0f0633c2d924124fd6755c3603d5d2b9b71fe02 Mon Sep 17 00:00:00 2001 From: Milind L Date: Wed, 1 Feb 2023 11:58:05 +0530 Subject: [PATCH 2/3] Add changelog entry --- CHANGELOG.md | 5 +++++ src/confluent_kafka/admin/__init__.py | 3 --- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6554d4ce7..45d6c5104 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Confluent's Python client for Apache Kafka +## vNext + +- Added `set_sasl_credentials`. This new method (on the Producer, Consumer, and AdminClient) allows modifying the stored + SASL PLAIN/SCRAM credentials that will be used for subsequent (new) connections to a broker (#1511). + ## v2.0.2 diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 9970f6f07..8b12358ee 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -800,9 +800,6 @@ def set_sasl_credentials(self, username, password): :param str username: The username to set. :param str password: The password to set. - :returns: A dict of futures for each group, keyed by the group id. - The future result() method returns :class:`ConsumerGroupTopicPartitions`. - :rtype: None :raises KafkaException: Operation failed locally or on broker. From c526c2f0c85c4d0dc0b8871acccc87dcc2558eff Mon Sep 17 00:00:00 2001 From: Milind L Date: Wed, 1 Feb 2023 15:03:09 +0530 Subject: [PATCH 3/3] Address review comments --- src/confluent_kafka/src/confluent_kafka.c | 9 ++++++++- src/confluent_kafka/src/confluent_kafka.h | 1 + tests/test_misc.py | 1 + 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 9a7f2584c..2fab961b0 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -2601,6 +2601,7 @@ PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs) { const char *username = NULL; const char *password = NULL; rd_kafka_error_t* error; + CallState cs; static char *kws[] = {"username", "password", NULL}; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "ss", kws, @@ -2608,8 +2609,15 @@ PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs) { return NULL; } + CallState_begin(self, &cs); error = rd_kafka_sasl_set_credentials(self->rk, username, password); + if (!CallState_end(self, &cs)) { + if (error) /* Ignore error in favour of callstate exception */ + rd_kafka_error_destroy(error); + return NULL; + } + if (error) { cfl_PyErr_from_error_destroy(error); return NULL; @@ -2619,7 +2627,6 @@ PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs) { } - /**************************************************************************** * * diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index 4392b0e69..23fbdb0c1 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -388,6 +388,7 @@ PyObject *list_topics (Handle *self, PyObject *args, PyObject *kwargs); PyObject *list_groups (Handle *self, PyObject *args, PyObject *kwargs); PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs); + extern const char list_topics_doc[]; extern const char list_groups_doc[]; extern const char set_sasl_credentials_doc[]; diff --git a/tests/test_misc.py b/tests/test_misc.py index b11d09247..aca7b5a4f 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -263,6 +263,7 @@ def on_delivery(err, msg): pytest.xfail("Timeout exceeded") pytest.fail("Timeout exceeded") + def test_set_sasl_credentials_api(): clients = [ AdminClient({}),