Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throttle_cb support (#237) #377

Merged
merged 17 commits into from
Jun 2, 2018
Merged

Add throttle_cb support (#237) #377

merged 17 commits into from
Jun 2, 2018

Conversation

rnpridgeon
Copy link
Contributor

No description provided.

@rnpridgeon rnpridgeon requested a review from edenhill May 17, 2018 15:09
docs/index.rst Outdated
@@ -92,6 +92,9 @@ The Python bindings also provide some additional configuration properties:
* ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served by
poll().

* ``throttle_cb({broker_name_str, broker_id_int, throttle_time_ms_int})``: Callback for thorttled request reporting.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a typo here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good, but as discussed it should be rebased on adminapi branch.

Also:

  • change your code to 8 whitespace tabs
  • remove all unrelated whitespace diffs
  • and the other comments :)

@@ -945,6 +1010,10 @@ def print_usage(exitcode, reason=None):
print('=' * 30, 'Verifying Producer performance (without dr_cb)', '=' * 30)
verify_producer_performance(with_dr_cb=False)

# The throttle test is utilizing the producer.
print('=' * 30, 'Verifying throttle_cb', '=' * 30)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the throttle test requires manual setup (zk, whatnot) we can't have it enabled by default in the integration tests.

If you rebase this on the adminapi you can add throttle as a test mode but remove it from the default set, you would then need to do: examples/integration_tests.py --throttle mybroker

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

README.md Outdated
@@ -278,12 +278,19 @@ In order to run full test suite, simply execute:

**Run integration tests:**

To run the integration tests, uncomment the following line from `tox.ini` and add the paths to your Kafka and Confluent Schema Registry instances. You can also run the integration tests outside of `tox` by running this command from the source root.
To run the confluent-kafka-python integration tests you will need access to a running Kafka cluster. In order to execute successfully client will need create, read and write topics. Additionally you will need access to Zookeeper in order to set a request quota for the client `throttled-client`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"client throttled-client."

What is that referring to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's referring to the client the example sets the throttle for. I will remove that as part of moving the the example to the test README

README.md Outdated

examples/integration_test.py <kafka-broker> [<test-topic>] [<schema-registry>]
```
kafka-configs --zookeeper <zookeeper host >:< zookeeper port> \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think we want this leave of detail in the top-level README, instead add a section about integration tests (and throttle) to tests/README.md

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove trailing and heading spaces from <zookeeper host > and < zookeeper port>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

README.md Outdated

Be sure to update Zookeeper, Broker and Schema Registry connection string with appropriate values. You can also run the integration tests outside of `tox` by running this command from the source root.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"source root" -> "top level directory."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

@@ -1131,6 +1131,37 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
CallState_resume(cs);
}

static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker_id,
int throttle_time_ms, void *opaque) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation in the existing code is mixed up, but all new (and modified) C code shall use 8 whitespaces for tabs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

@@ -31,6 +31,7 @@

try:
from progress.bar import Bar

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid unrelated whitespace diffs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

global throttled_requests
throttled_requests += 1

print('Request to broker %s[id=%d] throttled for %d ms' % (t_report.get('broker_name'),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use proper type here and verify that object attributes are set and have the expected type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is handled by Py_BuildValue() in confluent_kafka.c. Now that the arguments are passed as a tuple which is immutable and accessible only though the ThrottleEvent object I'm not sure if this is still needed.

@@ -386,9 +399,9 @@ def verify_producer_performance(with_dr_cb=True):
bar.finish()

print('# producing %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' %
(msgs_produced, bytecnt / (1024*1024), t_produce_spent,
(msgs_produced, bytecnt / (1024 * 1024), t_produce_spent,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid unrelated whitespace diffs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack


print('closing consumer')
c.close()


def verify_throttle_cb():
""" Time how long it takes to produce and delivery X messages """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update docstring

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

'client.id': 'throttled_client',
'throttle_cb': throttle_cb}

p = confluent_kafka.Producer(**conf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use ..Producer(conf) instead, looks neater and we're moving away from the ** notation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

@rnpridgeon rnpridgeon changed the base branch from master to adminapi May 22, 2018 18:53
Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good stuff!

Also needs a two unit tests:

  • specifying throttle_cb in the config, both positive and negative (not a callable) test.
  • instantiate a ThrottleEvent


This class is typically not user instantiated.

:ivar broker_name:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:ivar <name> <type>: <Description>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

:ivar broker_name:
:ivar broker_id:
:ivar throttle_time_ms:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove empty lines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

goto done;
}

ThrottleEvent_type = cfl_PyObject_lookup("confluent_kafka",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent is off from here on.
Use 8 whitespaces for tabs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

ThrottleEvent_type = cfl_PyObject_lookup("confluent_kafka",
"ThrottleEvent");

if(!ThrottleEvent_type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space before paren if (

goto err;
}

args = Py_BuildValue("(sii)", broker_name, broker_id, throttle_time_ms);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is customary to express time as float seconds in Python, thus a float formatter and 1000.0 * throttle_time_ms

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

tests/README.md Outdated

tox -- --<test mode>

The throttle_cb integration test require an additiona step and as such are not included in the default test mode.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"default test modes"

kafka-configs --zookeeper <zookeeper host>:<zookeeper port> \
--alter --add-config 'request_percentage=01' \
--entity-name throttled_client --entity-type clients

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curio, is it possible to do this with alterConfigs now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly, worth a try I suppose. I'll push the other review changes first then revisit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests/README.md Outdated

Once the throttle has been set you can proceed with the following command:

tox -- --throttle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also include how to remove the throttle setting from zk

tox.ini Outdated
@@ -10,7 +10,8 @@ setenv =
commands =
pip install -v .
py.test -v --timeout 20 --ignore=tmp-build --import-mode append {posargs}
#python examples/integration_test.py [yourhost:yourport] confluent-kafka-testing http://yourhost:yourport
# See tests/README.md for addition notes on testing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional


self.broker_name = broker_name
self.broker_id = broker_id
self.throttle_time_ms = throttle_time_ms
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a str implementation

@rnpridgeon
Copy link
Contributor Author

additional unit tests to follow

@rnpridgeon
Copy link
Contributor Author

@edenhill I believe this completes the laundry list, inviting yet another

Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uses a mix of tabs and whitespaces for indention, please use 8 whitespaces only

class ThrottleEvent (object):
"""
ThrottleEvent passed as an argument to throttle_cb.
Contains details about requests throttled by the broker.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It contains information pertaining to one single throttled request, don't know how to formulate that properly.

This class is typically not user instantiated.

:ivar broker_name: The hostname of the broker which throttled the request
:ivar broker_id: The broker's id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "The broker id" is used more often


:ivar broker_name: The hostname of the broker which throttled the request
:ivar broker_id: The broker's id
:ivar throttle_time_ms: The delay associated with the request(seconds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The amount of time the broker throttled (delayed) the request before responding (seconds)"

"""
def __init__(self, broker_name=None,
broker_id=-1,
throttle_time_ms=-1.):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haven't seen that notation before, why not -1.0?

Also, do we really need the default values? I think not

self.throttle_time_ms = throttle_time_ms

def __str__(self):
return "{}:{}/{}".format(self.broker_name, self.broker_id, self.throttle_time_ms)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"host:port" is a common notation which this conflicts with, also
the broker identity is formatted as "host:port/id" in librdkafka (which is what the user will see in log messages), and it isn't really clear what the throttle time is here.
I suggest being more explicit:
"broker/id throttled for X ms"

tests/README.md Outdated

tox -- --producer

The throttle_cb integration test require an additional step and as such are not included in the default test modes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'is not included'? singularis test

tests/README.md Outdated
--alter --add-config 'request_percentage=01' \
--entity-name throttled_client --entity-type clients

To remove the throttle you can execute the following.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

: not .


try:
confluent_kafka.Producer(conf)
except(TypeError) as e:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

except TypeError as e

But you should use:

  pytest.raises(TypeError):
     confluent_kafka.Producer({'throttle_cb': 1})

to have pytest handle this for you

""" Ensure we can configure a proper callback"""

def throttle_cb(throttle_event):
"""NOOP"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be a pass


def test_throttleevent_instantiate():
""" Instantiate a ThrottleEvent """
assert isinstance(confluent_kafka.ThrottleEvent(), confluent_kafka.ThrottleEvent)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather see you using correct parameters, and then verifying those parameters (type and value) and verifying the str:er

@rnpridgeon
Copy link
Contributor Author

19th time is the charm I always say

Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost there!


This class is typically not user instantiated.

:ivar broker_name: The hostname of the broker which throttled the request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add types as so:
:ivar broker_name str: The hostname..

goto done;
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping

docs/index.rst Outdated
``client.poll()`` or ``producer.flush()``.

* ``throttle_cb(confluent_kafka.ThrottleEvent)``: Callback for throttled request reporting.
This callback is served upon calling ``client.poll()`` or ``producer.flush()``.

* ``stats_cb(json_str)``: Callback for statistics data. This callback is triggered by poll()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"by poll() or flush()"

docs/index.rst Outdated

* ``stats_cb(json_str)``: Callback for statistics data. This callback is triggered by poll()
every ``statistics.interval.ms`` (needs to be configured separately).
Function argument ``json_str`` is a str instance of a JSON document containing statistics data.
This callback is served upon calling ``client.poll()`` or ``producer.flush()``.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# validate argument type
assert isinstance(throttle_event.broker_name, str)
assert isinstance(throttle_event.broker_id, int)
assert isinstance(throttle_event.broker_name, float)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must be a a very buoyant broker name that passes that assert

def verify_throttle_cb():
""" Verify throttle_cb is invoked
This test requires client quotas be configured.
See tests/README for more information
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests/README.md

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not fixed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just testing you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you passed

tests/README.md Outdated
@@ -1,8 +1,51 @@
Unit tests
==========

**WARNING**: These tests require an active Kafka cluster and will create new topics.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unit tests wont, the integration tests will, move this.

tests/README.md Outdated
--alter --add-config 'request_percentage=01' \
--entity-name throttled_client --entity-type clients

To remove the throttle you can execute the following
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this below the tox test

confluent_kafka.Producer({'throttle_cb': throttle_cb})


def test_throttle_event_types():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

tests/README.md Outdated
tox -- --throttle


To remove the throttle you can execute the following
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line should not be indented (turns it into monospace).
Also remove extra newline after this line

README.md Outdated

See [tests/README.md](tests/README.md) for instructions on how to run integration tests.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the rendered file and you'll see that 4 space indent gives you monospace, which is not intended here. 0 indent! :)


:ivar broker_name str: The hostname of the broker which throttled the request
:ivar broker_id int: The broker id
:ivar throttle_time float: The amount of time the broker throttled (delayed) the request(seconds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably do: "The amount of time (in seconds) the ..", or at least add a space before the paren: "request (seconds)"

def verify_throttle_cb():
""" Verify throttle_cb is invoked
This test requires client quotas be configured.
See tests/README for more information
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not fixed

testit.sh Outdated
@@ -0,0 +1,15 @@
KAFKA_HOME=$HOME/CPKAFKA
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you really mean to commit this file?

confluent_kafka.Producer({'throttle_cb': throttle_cb})


def test_throttle_event_types():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

@edenhill edenhill left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

tests/README.md Outdated
@@ -15,19 +15,19 @@ Integration tests

To run all of the integration test `modes` uncomment the following line from `tox.ini` and add the addresses to your Kafka and Confluent Schema Registry instances.

#python examples/integration_test.py {posargs} <bootstrap-servers> confluent-kafka-testing [<schema-registry-url>]
#python examples/integration_test.py <bootstrap-servers> confluent-kafka-testing [<schema-registry-url>]

You can also run the integration tests outside of `tox` by running this command from the source root directory

examples/integration_test.py <kafka-broker> [<test-topic>] [<schema-registry>]

To run individual integration test `modes` with `tox` use the following syntax
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"with tox" is no longer correct


def throttle_cb_instantiate_fail():
""" Ensure noncallables raise TypeError"""
with pytest.raises(TypeError):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was told to change input type errors to ValueErrors

@edenhill
Copy link
Contributor

edenhill commented Jun 1, 2018

Suggest squashing this (on merge, or manually) into one commit.

@rnpridgeon rnpridgeon merged commit d52858c into confluentinc:adminapi Jun 2, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants