# Kafka advanced
First, let's create a topic that all the data will be streamed to. Here we create a topic with 4 partitions to parallelize reads and writes.
any-node:~$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic price_data_part4 --partitions 4 --replication-factor 2
We can check that the topic exists with the following:
any-node:~$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Let's now describe the topic and see which broker takes care of which partition:
any-node:~$ /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:price_data_part4  PartitionCount:4  ReplicationFactor:2  Configs:
  Topic: price_data_part4  Partition: 0  Leader: 0  Replicas: 0,3  Isr: 0,3
  Topic: price_data_part4  Partition: 1  Leader: 1  Replicas: 1,0  Isr: 1,0
  Topic: price_data_part4  Partition: 2  Leader: 2  Replicas: 2,1  Isr: 2,1
  Topic: price_data_part4  Partition: 3  Leader: 3  Replicas: 3,2  Isr: 3,2
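If you prefer to verify the partition layout programmatically, here is a minimal sketch using the kafka-python client; the localhost:9092 broker address is an assumption, so substitute one of your own brokers.

# Sketch: list the partition ids Kafka reports for the topic.
# Assumes a broker is reachable at localhost:9092 (adjust as needed).
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.partitions_for_topic('price_data_part4'))  # e.g. {0, 1, 2, 3}
consumer.close()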
## Create a Kafka Producer on your local machine

Let's first install the Kafka Python package:
localhost:~$ sudo pip install kafka-python
Next, let's create a file named kafka_producer.py and paste the following into it. The script simulates a ticker whose price performs a random walk, and tags each message with a key identifying its source.
import random
import sys
from datetime import datetime

from kafka import KafkaProducer


class Producer(object):

    def __init__(self, addr):
        self.producer = KafkaProducer(bootstrap_servers=addr)

    def produce_msgs(self, source_symbol):
        price_field = random.randint(800, 1400)
        msg_cnt = 0
        while True:
            time_field = datetime.now().strftime("%Y%m%d %H%M%S")
            price_field += random.randint(-10, 10) / 10.0  # random walk step
            volume_field = random.randint(1, 1000)
            str_fmt = "{};{};{};{}"
            message_info = str_fmt.format(source_symbol,
                                          time_field,
                                          price_field,
                                          volume_field)
            print(message_info)
            # Key each message by its source so all records from one
            # source hash to the same partition.
            self.producer.send('price_data_part4',
                               key=source_symbol.encode('utf-8'),
                               value=message_info.encode('utf-8'))
            msg_cnt += 1


if __name__ == "__main__":
    args = sys.argv
    ip_addr = str(args[1])
    partition_key = str(args[2])
    prod = Producer(ip_addr)
    prod.produce_msgs(partition_key)
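To test the producer on its own before spawning many instances, you can run it directly; the trailing argument is the message key, and 1 here is just an example value.

localhost:~$ python kafka_producer.py <KAFKA_BROKER_PUBLIC_IP>:9092 1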
The streaming data is simulated price data from several data sources. Each record contains a source, a date, the last price at that time, and the number of contracts (or volume) traded at that price. The date is in the format %Y%m%d %H%M%S.

Schema: source;date;price;volume
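For example, a record produced by source 4 might look like the following (the values are illustrative):

4;20150708 143205;1050.3;764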
Let's now spawn 8 producers from your machine, all in parallel, to simulate 8 different data sources. This can be done with tmux. The following script, spawn_kafka_streams.sh, will help you perform the task; feel free to tailor it to your use case. It creates a new session containing N windows, where N is the argument you pass into the script.
#!/bin/bash
# Spawns one Kafka producer per tmux window.
# Usage: bash spawn_kafka_streams.sh <BROKER_IP:PORT> <NUM_SPAWNS> <SESSION_NAME>
IP_ADDR=$1
NUM_SPAWNS=$2
SESSION=$3
tmux new-session -s "$SESSION" -n bash -d
for ID in $(seq 1 "$NUM_SPAWNS"); do
    echo "$ID"
    tmux new-window -t "$SESSION:$ID"
    tmux send-keys -t "$SESSION:$ID" "python kafka_producer.py $IP_ADDR $ID" C-m
done
Call the script as follows:

localhost:~$ bash spawn_kafka_streams.sh <KAFKA_BROKER_PUBLIC_IP>:9092 <NUM_SPAWNS> <SESSION_NAME>

e.g.

localhost:~$ bash spawn_kafka_streams.sh 52.26.67.30:9092 8 k1
If you want to see your existing sessions, you can type the following:
localhost:~$ tmux ls
You can attach to a tmux session with the following command:

localhost:~$ tmux a -t <SESSION_NAME>:<WINDOW_NAME>

e.g.

localhost:~$ tmux a -t k1
Toggling between windows is as simple as using the following commands:

Ctrl-b n moves to the next window (the current window is marked with an * at the bottom)
Ctrl-b p moves to the previous window
Detaching from a session can be done with the following:

Ctrl-b d detaches from the session
If you wish to kill all producers spawned in the tmux session, you can call the following:

localhost:~$ tmux kill-session -t <SESSION_NAME>
We can now monitor the keyed messages on all 4 nodes:
node1:~$ /usr/local/kafka/bin/kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic price_data_part4 --partition 0
node2:~$ /usr/local/kafka/bin/kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic price_data_part4 --partition 1
…
You will notice that when we produce messages to the Kafka broker in a keyed fashion, each node takes care of a partition of the specified topic due to the hashing of the keys. This also reduces the write workload on each node since each node only takes care of a fraction of the incoming data.
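If you would like to inspect the key-to-partition assignment from the client side, here is a minimal consumer sketch using kafka-python; the localhost:9092 broker address is an assumption, so point it at one of your brokers.

# Sketch: print which partition each keyed message landed on.
# Assumes a broker at localhost:9092 (adjust to your cluster).
from kafka import KafkaConsumer

consumer = KafkaConsumer('price_data_part4',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
for message in consumer:
    print("partition=%d key=%s value=%s"
          % (message.partition, message.key, message.value))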