Permalink
Browse files

Merge pull request #4 from amorton/doc-changes

Doc changes
  • Loading branch information...
2 parents 71e7fef + e27c3b0 commit b4c0a0b1a923a77b221356d33aaa9bdde756d543 @ormsbee ormsbee committed Jan 11, 2012
Showing with 21 additions and 13 deletions.
  1. +5 −2 README.md
  2. +16 −11 brod/test/test_zk.py
View
@@ -11,10 +11,13 @@ It's named after Max Brod, Franz Kafka's friend and supporter.
You need to have access to your Kafka instance and be able to connect through
TCP. You can obtain a copy and instructions on how to setup kafka at
-https://github.com/kafka-dev/kafka
+http://incubator.apache.org/kafka/
## Installation
-easy_install brod
+
+`easy_install brod`
+
+**Note:** the `zc.zk` package has a dependency on Python Zoo Keeper bindings which are not included during its installation. They can be installed with `easy_install zc-zookeeper-static`; see the `zc.zk` documentation for more information: http://pypi.python.org/pypi/zc.zk/0.5.
## Usage
View
@@ -112,6 +112,11 @@ def total_partitions(self):
ZK_PORT = 2182
ZK_CONNECT_STR = "localhost:{0}".format(ZK_PORT)
+# Messages are not available to clients until they have been flushed.
+# By default it is 1000ms, see log.default.flush.interval.ms in
+# server.properties
+MESSAGE_DELAY_SECS = (1000 * 2) / 1000
+
log = logging.getLogger("brod")
class KafkaServer(object):
@@ -319,7 +324,7 @@ def test_3x5_consumer_rebalancing():
k = Kafka("localhost", kafka_server.kafka_config.port)
for topic in ["t1", "t2", "t3"]:
k.produce(topic, ["bootstrap"], 0)
- time.sleep(1)
+ time.sleep(MESSAGE_DELAY_SECS)
producer = ZKProducer(ZK_CONNECT_STR, "t1")
assert_equals(len(producer.broker_partitions), topology_3x5.total_partitions,
@@ -331,15 +336,15 @@ def test_3x5_consumer_rebalancing():
c2 = ZKConsumer(ZK_CONNECT_STR, "group_3x5", "t1")
assert_equals(len(c2.broker_partitions), (topology_3x5.total_partitions) / 2)
- time.sleep(1)
+ time.sleep(MESSAGE_DELAY_SECS)
assert_equals(len(set(c1.broker_partitions + c2.broker_partitions)),
topology_3x5.total_partitions,
"We should have all broker partitions covered.")
c3 = ZKConsumer(ZK_CONNECT_STR, "group_3x5", "t1")
assert_equals(len(c3.broker_partitions), (topology_3x5.total_partitions) / 3)
- time.sleep(1)
+ time.sleep(MESSAGE_DELAY_SECS)
assert_equals(sum(len(c.broker_partitions) for c in [c1, c2, c3]),
topology_3x5.total_partitions,
"All BrokerPartitions should be accounted for.")
@@ -357,7 +362,7 @@ def test_3x5_consumers():
assert_equals(len(result), 0, "This shouldn't error, but it should be empty")
send_to_all_partitions("topic_3x5_consumers", ["hello"])
- time.sleep(1.5)
+ time.sleep(MESSAGE_DELAY_SECS)
# This should grab "hello" from every partition and every topic
# c1.rebalance()
@@ -380,7 +385,7 @@ def test_3x5_zookeeper_invalid_offset():
autocommit=True)
send_to_all_partitions("topic_3x5_zookeeper_invalid_offset", ["hello"])
- time.sleep(1)
+ time.sleep(MESSAGE_DELAY_SECS)
# The following fetch will also save the ZK offset (autocommit=True)
result = c1.fetch()
@@ -391,7 +396,7 @@ def test_3x5_zookeeper_invalid_offset():
bps_to_fake_offsets = dict((bp, 1000) for bp in c1.broker_partitions)
z1.save_offsets_for(c1.consumer_group, bps_to_fake_offsets)
c1.close()
- time.sleep(1)
+ time.sleep(MESSAGE_DELAY_SECS)
# Now delete c1, and create c2, which will take over all of its partitions
c2 = ZKConsumer(ZK_CONNECT_STR,
@@ -403,7 +408,7 @@ def test_3x5_zookeeper_invalid_offset():
c2.fetch()
send_to_all_partitions("topic_3x5_zookeeper_invalid_offset", ["world"])
- time.sleep(1.5)
+ time.sleep(MESSAGE_DELAY_SECS)
result = c2.fetch()
assert result
@@ -433,7 +438,7 @@ def test_3x5_reconnects():
partitions that might belong to them.
"""
send_to_all_partitions("topic_3x5_reconnects", ["Rusty"])
- time.sleep(2)
+ time.sleep(MESSAGE_DELAY_SECS)
c1 = ZKConsumer(ZK_CONNECT_STR, "group_3x5_reconnects", "topic_3x5_reconnects")
result = c1.fetch()
@@ -443,14 +448,14 @@ def test_3x5_reconnects():
# Now send another round of messages to our broker partitions
send_to_all_partitions("topic_3x5_reconnects", ["Jack"])
- time.sleep(2)
+ time.sleep(MESSAGE_DELAY_SECS)
# Disable rebalancing to force the consumer to read from the broker we're
# going to kill, and then kill it.
c1.disable_rebalance()
fail_server = RunConfig.kafka_servers[0]
fail_server.stop()
- time.sleep(2)
+ time.sleep(MESSAGE_DELAY_SECS)
# A straight fetch will give us a connection failure because it couldn't
# reach the first broker. It won't increment any of the other partitions --
@@ -467,7 +472,7 @@ def test_3x5_reconnects():
# Now we restart the failed Kafka broker, and do another fetch...
fail_server.start()
- time.sleep(2)
+ time.sleep(MESSAGE_DELAY_SECS)
result = c1.fetch()
# This should have MessageSets from all brokers (they're all reachable)

0 comments on commit b4c0a0b

Please sign in to comment.