# Publishing back to Kafka

## Continuing from last notebook

We will revisit the previous exercise, but this time we will publish the results back to Kafka. The idea is to transform the streaming tweets into something we can use in other parts of the system. In this specific example we will keep only the tweets that contain a specific word and publish those to another topic.

Another application in the system could then subscribe to the new topic and use it, for example, to show the latest tweets on a web page.

Spark streaming requires at least one [output operation](http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#output-operations-on-dstreams) in order to work. We used print judiciously in the last notebook, but will be using [foreachRDD](http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd) in order to send data out to Kafka. This requires a couple of functions to be defined beforehand. We will walk through them further down the notebook.

In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-shell'

### Import dependencies
In addition to the previously used modules, we will install and import the `kafka` module. The command below downloads it from the Python Package Index and installs it on our system.

In [None]:
!pip install kafka

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import KafkaProducer

### Create Spark context
Let's start the Spark context in local mode, using all the available cores.

In [None]:
sc = SparkContext('local[*]')

### Create Streaming Context

Let's create the streaming context as before.

In [None]:
batchInterval = 10
ssc = StreamingContext(sc, batchInterval)

Checkpointing is still required.

In [None]:
ssc.checkpoint('/tmp')

### Connect to Kafka

In [None]:
kafka_host = 'kafka:9092'
topic = 'Twitter.live'
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {'bootstrap.servers': kafka_host})
tweets = kafkaStream.map(lambda kv: json.loads(kv[1]))

### Filter tweets by word

In [None]:
tweets_filtered = tweets.filter(lambda tweet: 'iot' in tweet['text'].lower())
tweets_filtered = tweets_filtered.map(lambda tweet: tweet['text'])
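
The filter predicate above can be tried on ordinary Python data without Spark. The sketch below uses made-up sample tweets (not real data) and shows that the `in` test is a substring match, so a word such as "Patriot" also passes. `contains_word` is a hypothetical helper, not part of this notebook, that illustrates one way to match whole words only.

```python
import re

# Made-up sample records standing in for decoded tweets
# (assumption: each record has a 'text' field, as in the notebook).
sample_tweets = [
    {'text': 'IoT sensors are everywhere'},
    {'text': 'Patriot games tonight'},
    {'text': 'Nothing to see here'},
]

# Same predicate as the notebook's filter: a case-insensitive substring test.
substring_matches = [t['text'] for t in sample_tweets
                     if 'iot' in t['text'].lower()]

def contains_word(text, word):
    """Hypothetical helper: True only when `word` appears as a whole word."""
    pattern = r'\b' + re.escape(word) + r'\b'
    return re.search(pattern, text, re.IGNORECASE) is not None

whole_word_matches = [t['text'] for t in sample_tweets
                      if contains_word(t['text'], 'iot')]

print(substring_matches)
print(whole_word_matches)
```

If whole-word matching is what you want, the same helper could be used inside the `filter` lambda in place of the `in` test.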

### Lazily instantiated Kafka connection
`foreachRDD` will be executed at the [driver](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-architecture.html), once for each RDD sent in. To reuse the connection to the Kafka broker, we will define a function that creates the `KafkaProducer` instance if it does not exist yet and registers it as a global variable. Subsequent calls to the function return the already instantiated producer and therefore avoid re-creating it for each RDD.

In [None]:
def getKafkaProducerInstance(kafka_host):
    # lazily instantiated Kafka Producer instance
    if ('kafkaProducerSingletonInstance' not in globals()):
        globals()['kafkaProducerSingletonInstance'] = KafkaProducer(bootstrap_servers=kafka_host, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    return globals()['kafkaProducerSingletonInstance']
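
To see the lazy instantiation at work without a running broker, here is a minimal sketch of the same pattern. `FakeProducer` and `get_producer_instance` are hypothetical stand-ins introduced only for this illustration; the notebook itself uses the real `KafkaProducer`.

```python
class FakeProducer:
    """Stand-in for KafkaProducer so the sketch runs without a broker (assumption)."""
    instances_created = 0

    def __init__(self, bootstrap_servers):
        # Count constructions so we can check how often the producer is built.
        FakeProducer.instances_created += 1
        self.bootstrap_servers = bootstrap_servers

def get_producer_instance(kafka_host):
    # Create the producer on first use and cache it in the module globals.
    if 'producer_singleton' not in globals():
        globals()['producer_singleton'] = FakeProducer(bootstrap_servers=kafka_host)
    return globals()['producer_singleton']

first = get_producer_instance('kafka:9092')
second = get_producer_instance('kafka:9092')

# Both calls return the same object and the constructor ran only once.
print(first is second, FakeProducer.instances_created)
```

Note that the cached instance keeps the host it was first created with; a later call with a different `kafka_host` would still return the original producer.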

### The output function
The `process` function is executed each time a batch is ready, which in our case is every `batchInterval` seconds. We collect the RDD data, fetch the Kafka producer, and publish one message per tweet to the broker.

In [None]:
import logging

logger = logging.getLogger(__name__)

def process(rdd_time, rdd):
    print("========= %s =========" % str(rdd_time))

    try:
        # Bring the filtered tweets of this batch back to the driver
        data = rdd.collect()
        producer = getKafkaProducerInstance(kafka_host)

        for tweet in data:
            message = {
                'time': rdd_time.isoformat(),
                'tweet': tweet,
                'filter': 'iot',
            }
            producer.send('Twitter.processed', message)
            print('Published: {}'.format(tweet))

        # Block until all buffered messages have been sent
        producer.flush()

    except Exception:
        logger.exception("A problem has occurred")

In [None]:
tweets_filtered.foreachRDD(process)
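
To make the published payload concrete, the sketch below builds one message the way `process` does and runs it through the same JSON serializer that was passed to `KafkaProducer`. `build_message` is a hypothetical helper, and the timestamp and tweet text are made up. It assumes the batch time arrives as a Python `datetime`, which is what the `isoformat()` call in `process` relies on.

```python
import json
from datetime import datetime

def build_message(batch_time, tweet, keyword):
    """Hypothetical helper mirroring the dictionary built inside `process`."""
    return {
        'time': batch_time.isoformat(),
        'tweet': tweet,
        'filter': keyword,
    }

def serialize(value):
    # Same logic as the value_serializer given to KafkaProducer in the notebook.
    return json.dumps(value).encode('utf-8')

batch_time = datetime(2017, 3, 1, 12, 0, 0)  # made-up batch timestamp
payload = serialize(build_message(batch_time, 'IoT is growing', 'iot'))

# A consumer would reverse the two steps to get the dictionary back.
decoded = json.loads(payload.decode('utf-8'))
print(decoded['time'], decoded['filter'])
```

An application subscribing to `Twitter.processed` would need the matching decode step (UTF-8, then JSON) to read these messages.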

## Start the streaming context

Having defined the streaming context, we are now ready to actually start it! When you run this cell, the program will start, and the output of the `print` calls in `process` will appear below the cell. If you're running it outside of Jupyter (via `spark-submit`), you'll see the output on stdout.

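The `timeout` deliberately ends the wait after two minutes, so the cell returns; the streaming context itself keeps running until it is stopped, for example with `ssc.stop()`.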

In [None]:
ssc.start()
ssc.awaitTermination(timeout=120)