### Preliminary note:
This notebook assumes that you are running it on the cluster. You can also run it on a local machine, but you will then need to modify the addresses of ZooKeeper and the brokers.


# Kafka

**Kafka** is a distributed publish-subscribe messaging system focused on high throughput. All messages ingested by Kafka are persisted on disk and replicated within the cluster to guarantee fault tolerance and thus prevent data loss. Kafka relies on another service, **ZooKeeper**, which provides the synchronisation information Kafka needs to run properly in a distributed way.

The benefits of Kafka are thus:

- *Reliability* as it is distributed, partitioned, replicated and fault tolerant.
- *Scalability* as we can increase or decrease the size of a Kafka cluster on demand and without downtime to best fit the actual load.
- *Durability* as all messages are persisted on disk.
- *Performance* as it is designed for high throughput.

What can Kafka be used for?

- *Log aggregation* even though other systems may be more appropriate for that purpose now.
- *Metrics* as it can ingest vast amounts of data. Collecting metrics from multiple servers was a strong point for monitoring systems even though, again, other systems now exist for such scenarios.
- *Stream processing* Now we are talking. Frameworks like Storm and Spark Streaming can use Kafka to process information in a streaming fashion, which offers different ways of analysing data. In MapReduce you can only do batch processing: you cannot handle a continuous influx of information without investing a lot of time in automating your job submissions. In Spark Streaming, by contrast, an application can run permanently and analyse data as it arrives.

# Kafka concepts

To use Kafka, you have to be familiar with its core concepts, such as *topics*, *brokers*, *producers* and *consumers*.

![alt text](https://www.tutorialspoint.com/apache_kafka/images/fundamentals.jpg "Kafka Architecture with a replication factor of 3")

- **Topic** A stream of messages belonging to a particular category is called a topic. Data is stored in topics.
- **Partition** Topics are split into partitions. For each topic, Kafka keeps a minimum of one partition. Each partition contains messages in an immutable, ordered sequence.
- **Partition offset** Each message in a partition has a unique sequence id called its offset.
- **Replicas** Replicas are backups of a partition. They are used to prevent data loss.
- **Broker** Brokers are worker processes responsible for maintaining the published data (accepting data from producers, serving it to consumers). Each broker may have zero or more partitions per topic.
- **Kafka cluster** The set of all Kafka brokers. A Kafka cluster can be expanded without downtime. These clusters are used to manage the persistence and replication of message data.
- **Producers** Producers publish messages to one or more topics.
- **Consumers** Consumers subscribe to one or more topics and read the messages published on them.
- **Leader** The node responsible for all reads and writes for a given partition. Each partition has one broker acting as leader.
- **Follower** A node that follows the **Leader**'s instructions. A Follower may become the Leader if the node currently holding that role fails. In practice, a Follower acts just like a consumer, consuming data from its Leader to maintain its own data store.
- **ZooKeeper** ZooKeeper is used for managing and coordinating Kafka brokers. The ZooKeeper service is mainly used to notify producers and consumers about the presence of a new broker in the Kafka system or the failure of an existing one. Based on these notifications, producers and consumers decide how to proceed and start coordinating their work with another broker.
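
To make the relation between topics, partitions, offsets and keys more concrete, here is a minimal in-memory sketch. It is not how Kafka is implemented: `ToyTopic` is a hypothetical class, CRC32 stands in for Kafka's real key hash, and keyless messages are simply spread round-robin. It only illustrates that each partition is an append-only log, that an offset is a position in that log, and that messages with the same key land in the same partition.

```python
import zlib


class ToyTopic:
    """In-memory stand-in for a Kafka topic: a list of append-only partition logs."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]
        self._next = 0  # round-robin cursor for messages without a key

    def partition_for(self, key):
        """Pick the partition for a message key (bytes or None)."""
        if key is None:
            p = self._next
            self._next = (self._next + 1) % len(self.partitions)
            return p
        # Assumption: CRC32 is only a placeholder for Kafka's actual key hash.
        return zlib.crc32(key) % len(self.partitions)

    def send(self, value, key=None):
        """Append a message and return its (partition, offset)."""
        p = self.partition_for(key)
        self.partitions[p].append(value)
        return p, len(self.partitions[p]) - 1


topic = ToyTopic(num_partitions=5)
first = topic.send(b"line 1", key=b"book-a")
second = topic.send(b"line 2", key=b"book-a")
print(first, second)  # same partition, consecutive offsets
```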


# Using Kafka from Command Line

## Starting Kafka

On your computer or on the VM, you need to start ZooKeeper and Kafka as explained in the instruction file. Since Jupyter notebooks do not support background processes, you have to run them from a terminal:


In [None]:
# $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties 
# $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties 
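
Before running the cells below, it can help to check that both services are actually listening. The following sketch assumes the default ports used throughout this notebook (2181 for ZooKeeper, 9092 for the broker); `port_open` is a hypothetical helper, and a successful TCP connection only shows that something is listening on the port, not that the service is healthy.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports assumed from the commands used in this notebook.
for name, port in [("ZooKeeper", 2181), ("Kafka broker", 9092)]:
    status = "reachable" if port_open("localhost", port) else "NOT reachable"
    print(name, "on port", port, "is", status)
```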



## Listing Topics

In [1]:
# The kafka-topics command can be used to list available topics, create new ones, and delete existing ones.
# We have to specify the host and port where ZooKeeper is running (since ZooKeeper keeps the metadata of all topics).
# Note: we redirect stderr to /dev/null, otherwise you'll get lots of INFO log messages
# (when Kafka is installed on your own machine this is normally not necessary)
!kafka-topics.sh --list --zookeeper localhost:2181 2>/dev/null

## Creating a topic

In [2]:
# Let's create a new topic.
#
# Here we create a topic <username>.test1, which is split into 5 partitions,
# with each partition stored once (replication factor 1)
#
# If you want to increase the replication factor, you have to start more than one Kafka broker instance, as
# explained in the instruction file.
!kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 5 --topic "$USER.test1" 2>/dev/null

Created topic bigdata.test1.


In [3]:
!kafka-topics.sh --list --zookeeper localhost:2181 2>/dev/null

bigdata.test1


## Removing a topic

In [4]:
!kafka-topics.sh --delete --zookeeper localhost:2181 --topic "$USER.test1" 2>/dev/null
!kafka-topics.sh --list --zookeeper localhost:2181 2>/dev/null 

Topic bigdata.test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
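
The `kafka-topics.sh` invocations in this notebook differ only in their action flag and a few options. As a rough sketch, the hypothetical helper below (`kafka_topics_args` is not part of Kafka) builds the corresponding argument list in Python, using only the flags that appear in this notebook.

```python
def kafka_topics_args(action, topic=None, zookeeper="localhost:2181",
                      partitions=1, replication_factor=1):
    """Build the argument list for kafka-topics.sh (hypothetical helper)."""
    if action not in ("list", "create", "delete", "describe"):
        raise ValueError("unsupported action: %r" % (action,))
    args = ["kafka-topics.sh", "--" + action, "--zookeeper", zookeeper]
    if action == "create":
        # Partition count and replication factor only apply when creating a topic.
        args += ["--replication-factor", str(replication_factor),
                 "--partitions", str(partitions)]
    if topic is not None:
        args += ["--topic", topic]
    return args


# "bigdata.test1" is the topic name that appears in the outputs above.
print(" ".join(kafka_topics_args("create", "bigdata.test1", partitions=5)))
```

The resulting list can be passed to `subprocess.run(args, stderr=subprocess.DEVNULL)`, which mirrors the `2>/dev/null` redirection used in the shell cells.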


## Starting a producer - With file input

**Attention:** the following examples assume that you have a folder "INFOH515/books" in your current working directory.
We created this folder and put example books in it in the 1st lab session. Only execute the following commands if you have deleted this folder and its files in the meantime.

In [None]:
!mkdir ./INFOH515
!mkdir ./INFOH515/books

!wget --quiet http://www.gutenberg.org/cache/epub/20417/pg20417.txt -O ./INFOH515/books/pg20417.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20418/pg20418.txt -O ./INFOH515/books/pg20418.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20419/pg20419.txt -O ./INFOH515/books/pg20419.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20420/pg20420.txt -O ./INFOH515/books/pg20420.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20421/pg20421.txt -O ./INFOH515/books/pg20421.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20422/pg20422.txt -O ./INFOH515/books/pg20422.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20423/pg20423.txt -O ./INFOH515/books/pg20423.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20424/pg20424.txt -O ./INFOH515/books/pg20424.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20425/pg20425.txt -O ./INFOH515/books/pg20425.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20426/pg20426.txt -O ./INFOH515/books/pg20426.txt
!echo "Books downloaded in ./INFOH515/books" 

In [None]:
# First re-create the topic
!kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic "$USER.test1" 2>/dev/null

# Now publish messages on this topic.
# kafka-console-producer is a shell command that reads from stdin and publishes every line read
# on the specified topic as a separate message.
# To use it, we need to specify the address of at least one Kafka broker in our cluster
# (in our case: localhost at port 9092, see the $KAFKA_HOME/config/server.properties file)
!kafka-console-producer.sh --broker-list localhost:9092 --topic "$USER.test1" < ./INFOH515/books/pg20417.txt 2>/dev/null

## Starting a consumer - From the beginning

In [None]:
# Now let's read all messages published on the topic, starting from the beginning.
# kafka-console-consumer reads from the topic and prints every message on stdout.
# Again, we need to specify the address of at least one Kafka broker in our cluster
# (in this notebook we are using only one broker).
# The --from-beginning flag means that we start reading from the beginning of the stream,
# instead of reading from the end (waiting for new items to arrive).
#
# NOTE: this command will keep waiting for new messages to appear on the Kafka topic and will
#       hence never terminate.
#       You'll need to cancel it (click on "Kernel" in the menu bar, and then on "Interrupt")
#       before going to the next one.
#
# NOTE: If you look at the text that is output, you will see that some sentences may appear in an incorrect order.
#       This is due to the fact that we split the topic into 5 partitions and we read from all 5 of
#       these partitions in parallel. While the order is preserved inside a partition, it is not preserved
#       across partitions.
!kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$USER.test1" --from-beginning 2>/dev/null
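
The ordering effect described in the note above can be reproduced without a broker. The sketch below is a simulation under two assumptions: keyless lines are spread round-robin over the 5 partitions, and the consumer drains the partitions one after another. Real producers and consumers may distribute and interleave messages differently, but the conclusion is the same: order holds within a partition, not across partitions.

```python
lines = ["line %d" % i for i in range(10)]
num_partitions = 5

# Assumption: keyless messages are assigned to partitions round-robin.
partitions = [lines[p::num_partitions] for p in range(num_partitions)]

# Assumption: the consumer reads partition 0 completely, then partition 1, and so on.
consumed = [line for partition in partitions for line in partition]

print(consumed)
# ['line 0', 'line 5', 'line 1', 'line 6', 'line 2', 'line 7',
#  'line 3', 'line 8', 'line 4', 'line 9']
```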

## Describing a topic

In [None]:
!kafka-topics.sh --describe --zookeeper localhost:2181 --topic "$USER.test1" 2>/dev/null

### Consuming a topic from a given offset
We can also start consuming a topic at a specific offset. If you run the following command, you'll see that the first 10 messages of partition 0 are skipped.

In [None]:
# Interrupt the execution of this cell if it takes too long!
!kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $USER.test1 --partition 0 --offset 10 2>/dev/null
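
The same idea can be expressed with kafka-python by assigning a single partition to a consumer and seeking to an offset. This is a sketch rather than a reference implementation: `read_from_offset` is a hypothetical helper that takes an already constructed consumer, and the commented usage lines assume the broker address and topic name used in this notebook.

```python
def read_from_offset(consumer, topic_partition, offset, max_messages=5):
    """Return up to max_messages values from one partition, starting at offset."""
    consumer.assign([topic_partition])      # manual assignment to a single partition
    consumer.seek(topic_partition, offset)  # the next fetch starts at this offset
    values = []
    for message in consumer:
        values.append(message.value)
        if len(values) >= max_messages:
            break
    return values


# Usage against a running broker (not executed here):
# from kafka import KafkaConsumer, TopicPartition
# consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"],
#                          consumer_timeout_ms=2000)  # stop iterating after 2 s of silence
# print(read_from_offset(consumer, TopicPartition("bigdata.test1", 0), offset=10))
# consumer.close()
```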

## Producing through Python

In [None]:
!kafka-topics.sh --delete --zookeeper localhost:2181 --topic "$USER.test1" 2>/dev/null
!kafka-topics.sh --list --zookeeper localhost:2181 2>/dev/null 
!kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic "$USER.test1" 2>/dev/null

In [None]:

import os
import pwd
from kafka import KafkaProducer

topic = pwd.getpwuid( os.getuid() )[ 0 ] + ".test1"
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])

producer.send(topic, b'Hello world !!!')
producer.send(topic, b'Ok, go check the consumer !')
producer.send(topic, b'Bye!')
print("Message sent to topic: " + topic)

# you can also send messages as key/value pairs. 
# It is ensured that all messages with the same key (if it is not None) will end up in the same partition.
# The key needs to be of type bytes or bytearray 
producer.send(topic, key=b"some_key_1", value=b'Hello world with a key!')
producer.flush()
producer.close()
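
To see where a message actually ended up, you can wait on the future returned by `send`. The sketch below assumes kafka-python's behaviour that `future.get()` returns metadata exposing `partition` and `offset`; `send_and_locate` is a hypothetical helper that takes an existing producer, and the commented usage lines assume the broker address used in this notebook.

```python
def send_and_locate(producer, topic, value, key=None, timeout=10):
    """Send one message and return the (partition, offset) assigned to it."""
    future = producer.send(topic, key=key, value=value)
    metadata = future.get(timeout=timeout)  # blocks until the broker acknowledges
    return metadata.partition, metadata.offset


# Usage against a running broker (not executed here):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
# print(send_and_locate(producer, "bigdata.test1", b"first", key=b"some_key_1"))
# print(send_and_locate(producer, "bigdata.test1", b"second", key=b"some_key_1"))
# producer.close()
```

Sending two messages with the same key this way should report the same partition with increasing offsets.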

## Consuming through Python

In [None]:
# Note : this script keeps running waiting for new messages. Interrupt the kernel to abort it!
import os
import pwd
from kafka import KafkaConsumer

topic = pwd.getpwuid( os.getuid() )[ 0 ] + ".test1"

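# auto_offset_reset='earliest' starts from the oldest available message when there is no committed offset;
# the default ('latest') only delivers messages published after the consumer has started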
consumer = KafkaConsumer(topic, bootstrap_servers=["localhost:9092"], auto_offset_reset='earliest')

print("Waiting for data to consume from topic %s..." % topic)

for message in consumer:
    print(message.topic, message.partition, message.key, message.value)
    
# If your messages are JSON, it can be more practical to let the consumer decode them (requires "import json"):
# KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))      

# If you want to consume all topics whose names match a pattern:
# consumer = KafkaConsumer()
# consumer.subscribe(pattern='^awesome.*')
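
The JSON option mentioned in the comments above can be sketched as a pair of functions. `encode_json` and `decode_json` are hypothetical names, and UTF-8 is used here instead of ASCII as an assumption (it is a superset, so ASCII-only payloads behave the same).

```python
import json


def encode_json(value):
    """Serialize a Python object to JSON bytes, suitable as a Kafka message value."""
    return json.dumps(value).encode("utf-8")


def decode_json(raw):
    """Deserialize JSON bytes read from Kafka back into a Python object."""
    return json.loads(raw.decode("utf-8"))


payload = encode_json({"book": "pg20417", "line": 1})
print(payload, decode_json(payload))
```

These functions can be passed as `value_serializer=encode_json` when creating a `KafkaProducer` and as `value_deserializer=decode_json` when creating a `KafkaConsumer`, so that `send` accepts Python objects and `message.value` is already decoded.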

# Exercises

**Note**: The Kafka server is not secured with per-user access rules. To avoid collisions or interference between users, we advise you to systematically prefix your topic names with your NETID!

1. Create a new Kafka topic named `<userid>.books`, where `<userid>` is your netid.
2. Write a Python script that reads all the books downloaded previously and stores them in the Kafka topic `<userid>.books`, where `<userid>` is your netid. Make sure that all lines of the same book are put in the same topic partition. (Hint: use the fact that messages with the same key are put in the same partition.)
3. Create a new Kafka topic named `<userid>.book-count`, where `<userid>` is your netid.
4. Write a Python script that reads from the Kafka topic `<userid>.books` and counts the number of lines in each book. Whenever a count is updated, the script publishes a message `(bookname, current_count)` to `<userid>.book-count`.

*Important*: for the last Python script, pass the options `auto_offset_reset='earliest'` and `enable_auto_commit=False` when creating the KafkaConsumer object. This ensures that you can re-read the stream from the beginning each time you re-execute the script.