# Spark stream processing from Kafka source

I have used the following two Docker images to set up the Kafka stream source and run the Spark Streaming Jupyter notebook:
*  ches/kafka
*  avikdatta/sparkjupyterdockerimage

### 1. Set up Kafka stream source

#### A. Create a docker network
```bash
docker network create kafka-net
```

#### B. Set up Kafka data stream
* Set up ZooKeeper
```bash
docker run \
 -d \
 --name zookeeper \
 --network kafka-net zookeeper:3.4
```
* Create Kafka server
```bash
docker run \
 -d \
 --name kafka \
 --network kafka-net \
 --env ZOOKEEPER_IP=zookeeper \
 ches/kafka
```
* Create Kafka topic
```bash
 docker run \
  --rm \
  --network kafka-net ches/kafka kafka-topics.sh \
  --create --topic test \
  --replication-factor 1 \
  --partitions 1 \
  --zookeeper zookeeper:2181
```

### 2. Start data stream source
Run the following command from a new terminal.
```bash
 docker run \
  --rm --interactive \
  --network kafka-net ches/kafka \
  kafka-console-producer.sh \
  --topic test \
  --broker-list kafka:9092
```
While the producer is running, type new input messages at its prompt:

`<Type new message>`

### 3. Start Spark Jupyter notebook
Run the following commands from another terminal.

#### A. Download Docker image
```bash
docker pull avikdatta/sparkjupyterdockerimage
```
#### B. Start notebook server

```bash
 docker run    \
   --rm  \
   --network kafka-net \
   -d \
   -p 8887:8887 \
   -p 4040:4040 \
   --name spark-client \
   avikdatta/sparkjupyterdockerimage \
   jupyter-notebook \
   --ip=0.0.0.0 \
   --port=8887 \
   --no-browser
```
#### C. Access notebook server
Use the following address to connect to the notebook server:

`http://<DOCKER HOST IP ADDRESS>:8887`

### 4. Process data stream from notebook
Open a new Python notebook and run the following cells.

```python
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# pyspark.streaming.kafka (Kafka 0.8 API) exists in Spark 2.x; it was removed in Spark 3.0
from pyspark.streaming.kafka import KafkaUtils
```

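Set the Kafka package information for Spark. This has to happen before the `SparkContext` is created, because the packages are resolved when the JVM is launched.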

```python
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
```
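The `--packages` value is a Maven coordinate of the form `groupId:artifactId:version`, where the artifact name carries the Scala version suffix (`_2.11`) and the version has to match the Spark build inside the image. The helpers below are hypothetical, a minimal sketch assuming Spark 2.0.2 built for Scala 2.11 as the string above implies; they only assemble the same value so the versions can be changed in one place.

```python
def kafka_package(spark_version: str, scala_version: str = "2.11",
                  kafka_api: str = "0-8") -> str:
    """Hypothetical helper: build groupId:artifactId_scala:version for --packages."""
    artifact = f"spark-streaming-kafka-{kafka_api}_{scala_version}"
    return f"org.apache.spark:{artifact}:{spark_version}"


def submit_args(*packages: str) -> str:
    """Hypothetical helper: join packages with commas and keep the trailing
    'pyspark-shell' token that PySpark expects at the end of PYSPARK_SUBMIT_ARGS."""
    return f"--packages {','.join(packages)} pyspark-shell"


print(submit_args(kafka_package("2.0.2")))
# --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell
```

Several packages can be passed to `--packages` as a comma-separated list, which is what `submit_args` produces when given more than one argument.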

```python
# Run Spark locally with 2 worker threads
sc = SparkContext("local[2]", "KafkaStream")
sc.setLogLevel("WARN")
```

```python
# Batch interval of 20 seconds
ssc = StreamingContext(sc, 20)
```
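`StreamingContext(sc, 20)` sets a 20-second batch interval: Spark Streaming cuts the incoming stream into micro-batches and processes each one as an RDD, so messages typed within the same 20-second window are printed together. The plain-Python sketch below only illustrates that bucketing under a simplifying assumption (a message belongs to the batch window in which it arrived); `assign_batches` is hypothetical and not part of Spark.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

BATCH_INTERVAL = 20  # seconds, the value passed to StreamingContext(sc, 20)


def assign_batches(events: List[Tuple[float, str]],
                   start: float = 0.0,
                   interval: int = BATCH_INTERVAL) -> Dict[int, List[str]]:
    """Group (arrival_time, message) pairs into fixed-width batches.

    Batch k covers arrival times in [start + k*interval, start + (k+1)*interval).
    """
    batches: Dict[int, List[str]] = defaultdict(list)
    for arrival, message in events:
        batch_index = int((arrival - start) // interval)
        batches[batch_index].append(message)
    return dict(batches)


events = [(3.0, "hello there"), (18.5, "second"), (21.0, "message from kafka")]
print(assign_batches(events))
# {0: ['hello there', 'second'], 1: ['message from kafka']}
```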

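```python
# Receiver-less "direct" stream from the "test" topic on the broker
# started in step 1 (reachable as kafka:9092 on the kafka-net network)
kafkaStream = KafkaUtils.createDirectStream(
    ssc,
    ["test"],
    {"metadata.broker.list": "kafka:9092"},
)
```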
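`createDirectStream` uses the receiver-less "direct" approach: at every batch interval the driver asks the broker for the latest offset of each topic partition, and the batch reads the range from where the previous batch stopped up to that offset. The per-batch volume can optionally be capped with the `spark.streaming.kafka.maxRatePerPartition` setting (records per second per partition). The sketch below models that bookkeeping in plain Python; `next_offset_ranges` is a hypothetical helper, not Spark's implementation. Because no starting offsets are given above, the stream begins at the latest offset, so only messages produced after the stream is set up appear; adding `"auto.offset.reset": "smallest"` to the parameter dict would read the topic from the beginning.

```python
from typing import Dict, Optional, Tuple

TopicPartition = Tuple[str, int]  # (topic, partition)


def next_offset_ranges(current: Dict[TopicPartition, int],
                       latest: Dict[TopicPartition, int],
                       batch_seconds: int,
                       max_rate_per_partition: Optional[int] = None
                       ) -> Dict[TopicPartition, Tuple[int, int]]:
    """Return the [from, until) offset range each partition contributes to one batch."""
    ranges = {}
    for tp, from_offset in current.items():
        until = latest[tp]
        if max_rate_per_partition is not None:
            # Cap how many records one partition may add to a single batch
            until = min(until, from_offset + max_rate_per_partition * batch_seconds)
        ranges[tp] = (from_offset, until)
    return ranges


current = {("test", 0): 0}  # where the previous batch stopped
latest = {("test", 0): 5}   # latest offset reported by the broker
print(next_offset_ranges(current, latest, batch_seconds=20))
# {('test', 0): (0, 5)}
```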

Print the stream data. `pprint()` prints the first 10 records of each batch.

```python
kafkaStream.pprint()
```
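`pprint()` is an output operation rather than a transformation. A transformation would be chained before it, for example taking the message text from each `(key, value)` pair and counting words per batch with `map`, `flatMap` and `reduceByKey`. The sketch below shows what such a chain computes for a single batch, written in plain Python so it runs without Spark; `word_counts` and the sample batch are hypothetical.

```python
from collections import Counter
from typing import Iterable, List, Optional, Tuple

# A Kafka record as delivered by the direct stream: (key, value).
# The console producer sends no key, so the key is None.
Record = Tuple[Optional[str], str]

# Rough DStream equivalent (assumed, not run here):
#   kafkaStream.map(lambda kv: kv[1]) \
#       .flatMap(lambda line: line.split()) \
#       .map(lambda w: (w, 1)) \
#       .reduceByKey(lambda a, b: a + b) \
#       .pprint()


def word_counts(batch: Iterable[Record]) -> List[Tuple[str, int]]:
    """Count words in the message values of one micro-batch, sorted by word."""
    counts = Counter()
    for _key, value in batch:
        counts.update(value.split())
    return sorted(counts.items())


batch = [(None, "hello there"), (None, "hello from kafka")]
print(word_counts(batch))
# [('from', 1), ('hello', 2), ('kafka', 1), ('there', 1)]
```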

Start stream processing. `awaitTermination()` blocks the cell until the streaming context is stopped.

```python
ssc.start()
ssc.awaitTermination()
```

The notebook will print the messages typed in the Kafka producer terminal, e.g.:

```text
[(None, 'hello there')]
[(None, 'message from kafka')]
```

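Stop the streaming context when processing is finished. Since `awaitTermination()` blocks, first interrupt the notebook kernel, then run:

```python
# Stops the StreamingContext and the underlying SparkContext
ssc.stop(stopSparkContext=True)
```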