# Data Streaming with Kafka

## Overview 

### What you'll learn

In this section, you will learn: 

1. What Kafka is
2. How to stream data from one process to another

### Prerequisites
Before starting this section, you should have an understanding of: 

1. Basic Python

### Introduction

#### What is Kafka?

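Kafka is a distributed **data streaming platform** for big data. Many sources can write data to it, such as log files or bug reports, and many applications can read that data back. Kafka organizes the data, keeps messages in the order they were written (within each partition of a topic), and makes sure every application that needs the data can get it. Kafka was originally created at LinkedIn and is now maintained by the Apache Software Foundation as Apache Kafka.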

## Kafka Concepts

### Messages

Kafka transmits data in the form of **messages**, which are small records whose contents can be any type of data. Messages are created and transmitted in real time. Some examples of messages include:
- program crash logs
- customer support chat records
- forms received through a webpage
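
A Kafka message (the Kafka documentation also calls it a record) carries a value, an optional key, a timestamp, and optional headers, and Kafka treats the key and value as raw bytes. The sketch below models that shape with a hypothetical `Message` dataclass, which is not part of any Kafka library. It also shows why structured data such as a Python dict has to be serialized to bytes before sending, which is the job of the `value_serializer` in the producer example later in this section.

```python
import json
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Message:
    """Hypothetical model of a Kafka record; the broker itself only stores bytes."""
    value: bytes
    key: Optional[bytes] = None
    timestamp_ms: int = field(default_factory=lambda: int(time.time() * 1000))
    headers: list = field(default_factory=list)  # list of (str, bytes) pairs


def to_message(payload: dict, key: Optional[str] = None) -> Message:
    # Producer side: encode structured data as UTF-8 JSON bytes.
    value = json.dumps(payload).encode("utf-8")
    return Message(value=value, key=key.encode("utf-8") if key else None)


def from_message(msg: Message) -> dict:
    # Consumer side: decode the bytes back into a Python dict.
    return json.loads(msg.value.decode("utf-8"))


crash_log = to_message({"app": "checkout", "error": "NullPointerException"}, key="checkout")
print(crash_log.key, crash_log.value)
print(from_message(crash_log))
```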

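Producers hand these messages to Kafka through API calls. Once Kafka receives a message, it stores the message on its servers (called brokers) and keeps it for a configurable retention period, whether or not anyone has read it yet, so consumers can read it whenever they are ready.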
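
The storage model behind this is an append-only log: each partition of a topic is a sequence of messages, every new message gets the next sequential offset, and each consumer tracks its own offset. Reading a message therefore never removes it, and several consumers can read the same data independently. The toy, in-memory `PartitionLog` and `ToyConsumer` classes below are hypothetical illustrations of that idea, not Kafka's implementation or API.

```python
class PartitionLog:
    """Toy append-only log; a hypothetical stand-in for one Kafka partition."""

    def __init__(self):
        self._records = []

    def append(self, value):
        # A new message always goes to the end and gets the next offset.
        self._records.append(value)
        return len(self._records) - 1

    def read(self, offset, max_records=10):
        # Reading never removes anything; it returns a slice starting at `offset`.
        return self._records[offset:offset + max_records]


class ToyConsumer:
    """Each consumer remembers its own position (offset) in the log."""

    def __init__(self, log):
        self.log = log
        self.position = 0

    def poll(self):
        batch = self.log.read(self.position)
        self.position += len(batch)
        return batch


log = PartitionLog()
for n in range(3):
    log.append({"number": n})

fast, slow = ToyConsumer(log), ToyConsumer(log)
print(fast.poll())  # the first consumer reads all three messages
print(slow.poll())  # a second consumer still sees the same three messages
print(fast.poll())  # nothing new yet, so the first consumer gets an empty batch
```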

### Producers and Consumers

Programs or users that create and send data are called **producers**, and programs or users that read data are called **consumers**. The kafka-python library used below has separate APIs for [producers](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html) and [consumers](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html), and its [usage guide](https://kafka-python.readthedocs.io/en/master/usage.html) shows examples of both.

### Kafka Topics

Producers write to Kafka **topics** and consumers subscribe to them; topics are the structure Kafka uses for **data organization**. A topic is a named stream of related messages, kept separate from every other topic. Organizations can control who is allowed to read or write each topic, which helps keep data secure by limiting access to only the people and programs that need it.
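
Internally, each topic is split into one or more partitions; the `--partitions` option used when creating the topic later in this section sets how many. When a message has a key, Kafka's default partitioner hashes the key to choose a partition, so all messages with the same key land in the same partition and stay in order relative to each other. The sketch below imitates that routing using `zlib.crc32` as a stand-in hash. Kafka's Java client and kafka-python use a murmur2 hash instead, so the partition numbers here will not match a real cluster; `choose_partition` and `route` are hypothetical helpers.

```python
import zlib
from collections import defaultdict


def choose_partition(key: str, num_partitions: int) -> int:
    # Stand-in for keyed partitioning: hash the key, then take it modulo the
    # partition count. (Real Kafka clients use murmur2, not CRC32.)
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(messages, num_partitions):
    """Group (key, value) pairs into per-partition lists, preserving send order."""
    partitions = defaultdict(list)
    for key, value in messages:
        partitions[choose_partition(key, num_partitions)].append((key, value))
    return dict(partitions)


chat_events = [
    ("alice", "hi"), ("bob", "hello"), ("alice", "how are you?"),
    ("bob", "good"), ("alice", "bye"),
]
for partition, events in sorted(route(chat_events, num_partitions=3).items()):
    print(partition, events)
```

Because ordering is only guaranteed within a partition, choosing a key such as a user or chat-room ID is what keeps that user's or room's messages in order.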

## Creating A Simple Kafka Program

### Setup
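#### Install ZooKeeper and Kafka

```
# Standalone ZooKeeper download (optional: the Kafka archive below bundles its own
# ZooKeeper, and that bundled copy is what the next cell starts)
!wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
!tar -xzf apache-zookeeper-3.5.5-bin.tar.gz
# Apache's archive keeps old releases such as Kafka 2.3.0 (built for Scala 2.12)
!wget https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
!tar -xzf kafka_2.12-2.3.0.tgz
# Remove the downloaded archives after extracting them
!rm *.gz *.tgz
```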

#### Start ZooKeeper and Kafka

```
# Start ZooKeeper first (port 2181 by default), then the Kafka broker (port 9092),
# both in the background
!kafka_2.12-2.3.0/bin/zookeeper-server-start.sh -daemon kafka_2.12-2.3.0/config/zookeeper.properties
!kafka_2.12-2.3.0/bin/kafka-server-start.sh -daemon kafka_2.12-2.3.0/config/server.properties
```

#### Create New Kafka Topic

```
!kafka_2.12-2.3.0/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic MyHackBUTopic
```

#### Verify Topic Was Created

```
!kafka_2.12-2.3.0/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

#### Install the Kafka Python Library

```
!pip3 install kafka-python
```

### Creating a (very simple) Kafka Producer/Consumer

The example below is a minimal Kafka producer/consumer setup. Because a Google Colab notebook runs one cell at a time, the producer and consumer cannot run in parallel here, so the producer sends all of its messages first and the consumer reads them afterward. In a normal environment, this code would be split into two files, producer.py and consumer.py, running in parallel.

As the producer pushes data to the topic, consumers subscribed to that topic pull the new data almost as soon as it arrives. The producer and consumers don't even need to be on the same computer; in fact, they usually aren't. This makes it possible to stream data efficiently from one point to another in real time, which is useful for applications such as chat services and real-time social media analysis.
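
More precisely, Kafka delivers every message to each consumer *group* subscribed to the topic, and the consumers inside one group (identified by `group_id`, as in the example below) divide the topic's partitions among themselves so that each partition is read by only one member of that group. The round-robin function below is a simplified, hypothetical illustration of that division. Kafka's real assignment is coordinated between the brokers and the consumer clients and uses a configurable assignment strategy.

```python
def assign_round_robin(partitions, consumers):
    """Toy assignment: deal partitions out to a group's consumers in turn."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


topic_partitions = [0, 1, 2, 3]

# Two independent groups each get every partition, so each group sees every message.
analytics = assign_round_robin(topic_partitions, ["analytics-1", "analytics-2"])
archiver = assign_round_robin(topic_partitions, ["archiver-1"])

print(analytics)  # the two analytics consumers split the four partitions
print(archiver)   # the single archiver consumer reads all four
```

If a group has more consumers than the topic has partitions, the extra consumers receive nothing, which is why the partition count caps how many consumers in one group can read in parallel.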


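```python
import json

import kafka

# producer.py: serialize each dict to JSON bytes and send it to the topic
producer = kafka.KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'))

for e in range(10):
    data = {'number': e}
    producer.send('MyHackBUTopic', value=data)

# send() is asynchronous; flush() blocks until all buffered messages are delivered
producer.flush()

# consumer.py: read the topic from the beginning and decode each message
consumer = kafka.KafkaConsumer(
    'MyHackBUTopic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    # Stop iterating after 10 seconds without new messages so the cell finishes
    consumer_timeout_ms=10000)

for message in consumer:
    print(message.value)

consumer.close()
```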
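
A minimal sketch of the parallel setup described earlier, written as one hypothetical script, `stream_demo.py`, with a `producer` mode and a `consumer` mode so both halves share the same settings. Start the consumer in one terminal with `python stream_demo.py consumer`, then the producer in another with `python stream_demo.py producer`; the consumer prints each message as it arrives until you stop it with Ctrl+C. The script name, mode names, and helper functions (`serialize`, `deserialize`, `run_producer`, `run_consumer`) are illustrative assumptions; the Kafka calls are the same kafka-python calls used in the cell above.

```python
"""stream_demo.py (hypothetical name): run `python stream_demo.py consumer` in one
terminal and `python stream_demo.py producer` in another."""
import json
import sys

TOPIC = "MyHackBUTopic"
BOOTSTRAP = ["localhost:9092"]


def serialize(value) -> bytes:
    # Same encoding as the value_serializer in the notebook cell.
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes):
    # Same decoding as the value_deserializer in the notebook cell.
    return json.loads(raw.decode("utf-8"))


def run_producer(count=10):
    # Imported here so the JSON helpers above work without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP, value_serializer=serialize)
    for n in range(count):
        producer.send(TOPIC, value={"number": n})
    producer.flush()  # send() is asynchronous; wait until everything is delivered
    producer.close()


def run_consumer():
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=BOOTSTRAP,
        auto_offset_reset="earliest",
        enable_auto_commit=True,
        group_id="my-group",
        value_deserializer=deserialize,
    )
    try:
        for message in consumer:  # blocks, printing messages as they arrive
            print(message.value)
    except KeyboardInterrupt:
        pass  # Ctrl+C stops the consumer
    finally:
        consumer.close()


if __name__ == "__main__":
    mode = sys.argv[1] if len(sys.argv) > 1 else ""
    if mode == "producer":
        run_producer()
    elif mode == "consumer":
        run_consumer()
    else:
        print("usage: python stream_demo.py [producer|consumer]")
```

Keeping both roles in one file is just a convenience for sharing `TOPIC`, `BOOTSTRAP`, and the serializers; moving `run_producer` and `run_consumer` into separate producer.py and consumer.py files works the same way.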