# Using Kafka Command-line Tools

### Login into Kafka Container 

Find the container id for docker
>docker ps -a

Log into the docker container with the id 
>sudo docker exec -it 2f2187cb69eb /bin/bash

Check Kafka version
>cat /build/.info



## Kafka Topics

List all Kafka Topics
>kafka-topics --list --zookeeper localhost:2181

Create Kafka Topic
> kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic hello_world


# Glossary
### Consumer Configs
#### 1. bootstrap.servers
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

# Kafka Producer

> kafka-console-producer --broker-list localhost:9092 --topic test_topics

- If you don't enter the topic name, it will create error for the first entry but than will create the topic. 
- Also note, instead of Zookeeper, you use broker list here which has a different port  (9092)
- “metadata.broker.list” defines where the Producer can find a one or more Brokers to determine the Leader for each topic. This does not need to be the full set of Brokers in your cluster but should include at least two in case the first Broker is not available. No need to worry about figuring out which Broker is the leader for the topic (and partition), the Producer knows how to connect to the Broker and ask for the meta data then connect to the correct Broker.

In [None]:
#conda install -c conda-forge librdkafka
#conda install -c conda-forge python-confluent-kafka=0.9.4
#!pip install confluent_kafka
import logging
logging.basicConfig(level=logging.DEBUG)
from confluent_kafka import Producer

some_data_source = {
    "1":"Line1",
    "2":"Line2",
    "3":"Line3",
    "4":"Line4",
    "5":"Line5"
}


p = Producer({'bootstrap.servers': 'localhost:9092,', 'api.version.request':'true'})
for key in some_data_source:
    print(key)
    p.produce('test_topics', some_data_source[key].encode('utf-8'))
p.flush()

In [None]:
#import logging
#logging.basicConfig(level=logging.DEBUG)
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,10))
for _ in range(10):
    producer.send('test_topics', b'some_message_bytes')