Skip to content
Ronak Nathani edited this page Aug 29, 2016 · 8 revisions

Table of Contents

  1. Introduction
  2. Simple Use Case
  3. Requirements
  4. Dependencies
  5. Install Kafka
  6. Configure Kafka
  7. Check Kafka Configuration
  8. Kafka Manager

Introduction

Kafka is one of the most popular ingestion technologies (quickly becoming an industry standard now). It is a publish-subscribe message passing system in which the elementary unit of transmission is a single message. Messages are the smallest unit of data sent to Kafka. Basic messaging terminology is as follows-

  • Kafka maintains feeds of messages in categories called topics.
  • Processes that publish messages to a Kafka topic are called producers.
  • Processes that subscribe to topics and process the feed of published messages are called consumers.
  • Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

To elaborate, the agents in a Kafka system are split into three roles:

  • A Producer originates messages (data); in other systems this may be called a "source" or "client". Each application that generates log data of analytic interest will be acting as a producer -- web logs, data from various third party APIs like Twitter stream, Facebook, Meetup, Apple watch, Spotify, etc.
  • A Broker (with "Kafka Server" used interchangeably) receives messages from producers, handles message persistence (The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time), and routes them to the appropriate topic queues.
  • A Consumer processes messages from topic queue. Any application which either stores data sent from producers or processes it will be a candidate for a consumer. For example, HDFS, S3, Spark Streaming or Storm are consumers.

[PARTS FROM OFFICIAL DOCUMENTATION]

A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log that looks like this:

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log. The messages in the partitions are each assigned a sequential id number called the offset that uniquely identifies each message within the partition. Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". If a server goes down, another server is elected as a leader for the partitions for which the crashed server was the leader.

The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space.

In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the "offset". This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example a consumer can reset to an older offset to reprocess.

The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism.

Simple use case

Consider that we are using Twitter data and we are interested in two specific hashtags - #stream (for stream processing) and #batch (for batch processing). The producer in this case would be a program that we write that collects tweets with these hashtags from Twitter API. The producer (program) will connect to the Kafka brokers and send the messages (tweets) to a particular topic. Now, say that we want to keep the tweets with the two hashtags independent of each other and that we have different consumers for each. For which, we create two topics in Kafka: topic-stream and topic-batch.

Say we want all the tweets from #stream consumed in the order they arrive from the producer, for which we create only one partition and we will have a stream processing technology as a consumer (Storm or Spark Streaming) to consume messages from topic topic-stream.

For topic topic-batch, the order for messages is not so important and we create multiple partitions. We can also have multiple producers (programs) collecting tweets from Twitter API (considering that every producer collects a non-overlapping subset of tweets - the details about how that could be done are not so important here but assuming that non-overlapping subset of tweets can be collected). If for some reason, non-overlapping subset of tweets can’t be collected then we can have just one producer (program). Not only that, we can have multiple consumers in a consumer group which can consume messages (tweets) from the topic topic-batch. Kafka brokers make sure that each consumer receives a subset of non-overlapping messages from the topic topic-batch. Each partition is consumed by exactly one consumer in the consumer group. This increases the throughput of overall consumption via parallelism.

DISCLAIMER: This is an oversimplified example missing out many details which depend on the specific use case. This is just to make you familiar with basic Kafka functionality.

It is highly recommended that you go through topic 1.1 and 1.2 in [Kafka’s original documentation] (http://kafka.apache.org/documentation.html#introduction).

Requirements

At least 3 AWS Instances Zookeeper

Dependencies

Kafka requires Java. If Java is not installed on all the machines, install with the following command

node:~$ sudo apt-get 
node:~$ sudo apt-get install openjdk-7-jdk

You can check if the installation was successful by typing the following:

node:~$ which java
/usr/bin/java
node:~$ java -version
java version “1.7.0.0_79”
OpenJDK Runtime Environment (IcedTea 2.5.5) (7u79-2.5.5-0ubuntu0.14.04.2)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)

Install Kafka

This installation process must be executed on each machine.

We will grab the kafka 0.8.2.1 version and save it to a Downloads folder. Next we will install it into our /usr/local directory and rename the folder to simply ‘kafka’

node:~$ wget http://apache.claz.org/kafka/0.8.2.1/kafka\_2.9.1-0.8.2.1.tgz -P ~/Downloads
node:~$ sudo tar zxvf ~/Downloads/kafka\_2.9.1-0.8.2.1.tgz -C /usr/local
node:~$ sudo mv /usr/local/kafka_2.9.1-0.8.2.1 /usr/local/kafka

Configure Kafka

The broker ID must be specified in the Kafka configuration file along with the Zookeeper servers.

Change bolded text in /usr/local/kafka/config/server.properties
############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0
increment this number for each node. e.g. If you are on the first node broker.id=0, if you’re on the second node broker.id=1 and so on.
############################# Socket Server Settings ######################### … … … # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=localhost:2181
change this to a list of zookeeper node addresses with the zookeeper port. e.g. <public_dns_1>:2181,<public_dns_1>:2181,<public_dns_1>:2181
# Timeout in ms for connecting to zookeeper … … …

Setup JMX port in /usr/local/kafka/bin/kafka-server-start.sh

node:~$ sudo nano /usr/local/kafka/bin/kafka-server-start.sh 

Add the following on the top of the file after all the comments
# … # … # … export JMX_PORT=${JMX_PORT:-9999} … … …

Now that Kafka is configured on each node, we can start the Kafka server on each machine with the following. This must be executed on each machine. The ampersand symbol at the end of the command is so that the server is executed as a daemon (background process). This allows you to close the SSH connection and still have the Kafka server continue running.

node:~$ sudo /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

Check Kafka Configuration

We can check if Kafka is running correctly by SSH-ing into each node and test out the built in Kafka producer and consumer example. We will first create a topic on one of the Kafka brokers.

node-0:~$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic my-topic

Created topic “my-topic”

This will create a topic that is replicated 3 times across the cluster with a 2 partitions. We can check and see if this topic is seen by other nodes by describing them on another node.

node-1:~$ /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic

Topic:my-topic   PartitionCount:2   ReplicationFactor:3    Configs:
  Topic: my-topic   Partition: 0   Leader: 1   Replicas: 1,0,2   Isr: 1,0,2
  Topic: my-topic   Partition: 1   Leader: 2   Replicas: 2,1,0   Isr: 2,1,0

The output contains the following information:

  • “leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions

  • “replicas” is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.

  • “isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader

Next let’s publish some messages on a different node.

node-2:~$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

[2015-05-19 23:45:37,947] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
…
Now enter the following text and press enter
This is my first message!

We can consume this message from any node. Let’s go back to our first node and view the message.

node-0:~$ /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-topic

This is my first message!

If you continue publishing messages on node-2 you should also see the messages consumed on node-0.

Let’s now test for fault-tolerance by killing Broker 1 on node-2 which is the leader of Partition 0 for my-topic.

node-1:~$ ps aux | grep server.properties

root     10120  0.0  0.0  67896  2200 ?        S    22:51   0:00 sudo /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
root     10121  0.6  6.5 2480612 263988 ?      Sl   22:51   0:25 java -Xmx1G -Xms1G -server -XX:+...2.10*.jar kafka.Kafka /usr/local/kafka/config/server.properties
ubuntu   10372  0.0  0.0  10460   952 pts/0    S+   23:59   0:00 grep --color=auto server.properties

node-1:~$ sudo kill -9 10121

If we go to either of the remaining nodes and describe the topic again, we’ll see a re-election of the leader for Partition 0. We should still be able to consume the message as well

node-0:~$ /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-topic

Topic:my-topic   PartitionCount:2   ReplicationFactor:3    Configs:
  Topic: my-topic   Partition: 0   Leader: 0   Replicas: 1,0,2   Isr: 0,2
  Topic: my-topic   Partition: 1   Leader: 2   Replicas: 2,1,0   Isr: 2,0

node-0:~$ /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-topic

This is my first message!

IMPORTANT

Kafka clients are available in various languages. You can find links to each of them here. You will be writing your producer/consumer codes using these clients - choose the client based on your language of preference.

For Kafka -> HDFS consumer, check out Camus dev.

Kafka Manager

Kafka Manager is a tool built by Yahoo to monitor/manage multiple Kafka clusters. It allows easy inspection of cluster state - topics, brokers, replica distribution, partition distribution.

You can configure it on any of the Kafka nodes.

Requirements

SBT Git Java 8

node:~$ wget https://dl.bintray.com/sbt/debian/sbt-0.13.7.deb -P ~/Downloads
node:~$ sudo dpkg -i ~/Downloads/sbt-0.13.7.deb
node:~$ sudo apt-get install sbt
node:~$ sudo apt-get install git
node:~$ sudo add-apt-repository ppa:webupd8team/java -y 
node:~$ sudo apt-get update 
node:~$ sudo apt-get install oracle-java8-installer
node:~$ git clone https://github.com/yahoo/kafka-manager.git

Change the conf file in kafka-manager/conf/application.conf
node:~$ nano ./kafka-manager/conf/application.conf
Modify the bolded text
… … … # Secret key # ~~~~~ # The secret key is used to secure cryptographics functions. # If you deploy your application to several instances be sure to use the same key! application.secret="changeme" application.secret=${?APPLICATION_SECRET} # The application languages # ~~~~~ application.langs="en" … … … logger.application=DEBUG kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
change this to a list of zookeeper node addresses with the zookeeper port. e.g. <public_dns_1>:2181,<public_dns_1>:2181,<public_dns_1>:2181
kafka-manager.zkhosts=${?ZK_HOSTS} pinned-dispatcher.type="PinnedDispatcher" pinned-dispatcher.executor="thread-pool-executor" application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"] node:~$ cd ./kafka-manager node:~$ sbt clean dist

This will generate a zip file which will be in /home/ubuntu/kafka-manager/target/universal/

node:~$ cd /home/ubuntu/kafka-manager/target/universal/
node:~$ sudo mv kafka-manager-1.3.0.4.zip /usr/local/
Install zip if the system doesn’t have zip installed
node:~$ sudo apt-get install zip node:~$ unzip /usr/local/kafka-manager-1.3.0.4.zip -d . node:~$ sudo mv ./kafka-manager-* /usr/local/kafka-manager node:~$ nano ~/.profile
Add this at the bottom
… … export KAFKA_MANAGER_HOME=/usr/local/kafka-manager node:~$ . ~/.profile node:~$ $KAFKA_MANAGER_HOME/bin/kafka-manager -Dhttp.port=9001
Note that we start on port 9001 because the default port is likely already bound by one of the other technologies we've setup.

You can check the UI on <public-dns>:9001 It will look as follows:

Click on Cluster in the navigation pane -> then click on Add Cluster

Name your cluster and type in the zookeeper hosts as localhost:2181 -> select the correct Kafka version -> Select both the checkboxes (Enable JMX Polling & Enable Logkafka) -> save. Now, click on clusters and you will see your cluster there with the number of topics and brokers.

Click on those numbers and you will see more details.

Now your Kafka Manager is all set to monitor your Kafka cluster and topics.

Clone this wiki locally