[![Build Status](https://travis-ci.org/mumrah/kafka-python.svg?branch=master)](https://travis-ci.org/mumrah/kafka-python)

[Full documentation available on ReadTheDocs](http://kafka-python.readthedocs.org/en/latest/).

This module provides low-level protocol support for Apache Kafka as well as
high-level consumer and producer classes. Request batching is supported by the
protocol as well as broker-aware request routing. Gzip and Snappy compression
is also supported for message sets.
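
For example, compression can be enabled per producer. A minimal sketch, assuming your version exposes a `codec` keyword on `SimpleProducer` and the `CODEC_GZIP` constant in `kafka.protocol`:

```python
from kafka import KafkaClient, SimpleProducer
from kafka.protocol import CODEC_GZIP  # assumed constant; check your version

kafka = KafkaClient("localhost:9092")

# Messages are gzip-compressed client-side into a single message set;
# consumers decompress transparently on fetch.
producer = SimpleProducer(kafka, codec=CODEC_GZIP)
producer.send_messages("my-topic", "compressed message")
```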

# Python versions
- 2.7 (tested on 2.7.8)
- pypy (tested on pypy 2.3.1 / python 2.7.6)
- (Python 3.3 and 3.4 support has been added to trunk and will be available in the next release)

# Usage

## High level

```python
from kafka import KafkaClient, SimpleProducer, SimpleConsumer

# To send messages synchronously
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)

# Note that the application is responsible for encoding messages to type str
producer.send_messages("my-topic", "some message")
producer.send_messages("my-topic", "this method", "is variadic")

# Send a unicode message
producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))

# To send messages asynchronously
# WARNING: the current implementation does not guarantee message delivery on
# failure! Messages can get dropped! Use at your own risk! Or help us improve
# it with a PR!
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")

# To wait for acknowledgements:
# ACK_AFTER_LOCAL_WRITE: server will wait until the data is written to
#                        a local log before sending a response
# ACK_AFTER_CLUSTER_COMMIT: server will block until the message is committed
#                           by all in-sync replicas before sending a response
producer = SimpleProducer(kafka, async=False,
                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                          ack_timeout=2000)

response = producer.send_messages("my-topic", "another message")

if response:
    print(response[0].error)
    print(response[0].offset)

# To send messages in batches, you can use any of the available
# producers. The following producer collects messages in a batch and
# sends them to Kafka after 20 messages are collected or every
# 60 seconds, whichever comes first.
# Notes:
# * If the producer dies before the messages are sent, they will be lost
# * Call producer.stop() to send the pending messages and clean up
producer = SimpleProducer(kafka, batch_send=True,
                          batch_send_every_n=20,
                          batch_send_every_t=60)

# To consume messages
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
    # message is a raw byte string -- decode if necessary!
    # e.g., for unicode: `message.decode('utf-8')`
    print(message)

kafka.close()
```
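
Beyond plain iteration, `SimpleConsumer` also supports offset control and non-blocking fetches. A sketch, assuming the `seek()` and `get_messages()` methods of this API generation:

```python
from kafka import KafkaClient, SimpleConsumer

kafka = KafkaClient("localhost:9092")
consumer = SimpleConsumer(kafka, "my-group", "my-topic")

consumer.seek(0, 0)  # rewind to the beginning of the topic
consumer.seek(0, 2)  # or skip to the end and consume only new messages

# Fetch up to 10 messages without blocking
for message in consumer.get_messages(count=10, block=False):
    print(message)

kafka.close()
```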

## Keyed messages

```python
from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# HashedPartitioner is the default
producer = KeyedProducer(kafka)
producer.send("my-topic", "key1", "some message")
producer.send("my-topic", "key2", "another message")

# Use a round-robin partitioner instead
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
```
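
A partitioner maps a message key to one of the topic's partitions. If the built-in partitioners don't fit, a custom one can be supplied. A hypothetical sketch, assuming the base `Partitioner` class lives in `kafka.partitioner` and exposes a `partition(key, partitions)` method:

```python
from kafka import KafkaClient, KeyedProducer
from kafka.partitioner import Partitioner  # assumed base class location

class LengthPartitioner(Partitioner):
    # Hypothetical strategy: route messages by the length of their key
    def partition(self, key, partitions):
        return partitions[len(key) % len(partitions)]

kafka = KafkaClient("localhost:9092")
producer = KeyedProducer(kafka, partitioner=LengthPartitioner)
producer.send("my-topic", "key1", "routed by key length")
```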

## Multiprocess consumer

```python
from kafka import KafkaClient, MultiProcessConsumer

kafka = KafkaClient("localhost:9092")

# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

# This will spawn processes such that each handles at most 2 partitions
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                partitions_per_proc=2)

for message in consumer:
    print(message)

for message in consumer.get_messages(count=5, block=True, timeout=4):
    print(message)
```

## Low level

```python
from kafka import KafkaClient, create_message
from kafka.common import ProduceRequest

kafka = KafkaClient("localhost:9092")

req = ProduceRequest(topic="my-topic", partition=1,
                     messages=[create_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # "my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request
```
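
The consuming side looks symmetric at this level: build fetch request payloads and hand them to the client. A companion sketch, assuming `FetchRequest` in `kafka.common` and a `send_fetch_request` method on the client, mirroring the produce example above:

```python
from kafka import KafkaClient
from kafka.common import FetchRequest  # assumed, mirroring ProduceRequest

kafka = KafkaClient("localhost:9092")

req = FetchRequest(topic="my-topic", partition=1,
                   offset=0, max_bytes=4096)
resps = kafka.send_fetch_request(payloads=[req], fail_on_error=True)
kafka.close()

# Each response carries the fetched message set for its topic/partition
for offset_and_message in resps[0].messages:
    print(offset_and_message.offset)
    print(offset_and_message.message.value)
```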

# Install

Install with your favorite package manager.

## Latest Release

Pip:

```shell
pip install kafka-python
```

Releases are also listed at https://github.com/mumrah/kafka-python/releases

## Bleeding-Edge

Pip:

```shell
git clone https://github.com/mumrah/kafka-python
pip install ./kafka-python
```

Setuptools:

```shell
git clone https://github.com/mumrah/kafka-python
easy_install ./kafka-python
```

Using `setup.py` directly:

```shell
git clone https://github.com/mumrah/kafka-python
cd kafka-python
python setup.py install
```

## Optional Snappy install

### Install Development Libraries

Download and build Snappy from http://code.google.com/p/snappy/downloads/list

Ubuntu:

```shell
apt-get install libsnappy-dev
```

OSX:

```shell
brew install snappy
```

From Source:

```shell
wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz
tar xzvf snappy-1.0.5.tar.gz
cd snappy-1.0.5
./configure
make
sudo make install
```

### Install Python Module

Install the `python-snappy` module:

```shell
pip install python-snappy
```
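
A quick way to confirm the bindings work from a Python shell (python-snappy exposes `compress` and `uncompress`):

```python
import snappy

# Round-trip a payload through the C bindings
assert snappy.uncompress(snappy.compress(b"hello snappy")) == b"hello snappy"
```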

# Tests

## Run the unit tests

```shell
tox
```

## Run a subset of unit tests

```shell
# run protocol tests only
tox -- -v test.test_protocol
```

```shell
# test with pypy only
tox -e pypy
```

```shell
# Run only one test, using python 2.7
tox -e py27 -- -v --with-id --collect-only
# pick a test number from the list, like #102
tox -e py27 -- -v --with-id 102
```

## Run the integration tests

The integration tests will actually start up a real local Zookeeper
instance and real Kafka brokers, and send messages in using the client.

First, get the kafka binaries for integration testing:

```shell
./build_integration.sh
```

By default, the build_integration.sh script will download binary
distributions for all supported Kafka versions.
To test against the latest source build, set KAFKA_VERSION=trunk
and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended):

```shell
SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
```

Then run the tests against the supported Kafka versions by setting the
`KAFKA_VERSION` environment variable to the server build you want to
test against:

```shell
KAFKA_VERSION=0.8.0 tox
KAFKA_VERSION=0.8.1 tox
KAFKA_VERSION=0.8.1.1 tox
KAFKA_VERSION=trunk tox
```