# Model Service Testing
In this notebook, we will set up a Kafka consumer and producer, along with a few pre-trained models that will process our streamed data.

This use case illustrates how we can process transcripts of live calls in real time. We will use simple sentiment analysis and part-of-speech (POS) tagging to categorize which calls are going well and which are going poorly.

In [None]:
from kafka import KafkaConsumer
from kafka import KafkaProducer
import nltk
import flair
import json

## Kafka Setup
This notebook needs to both receive and send data, so we need two Kafka topics. Messages on our `from_topic` arrive as small snippets of transcribed text (JSON objects containing a call `id` and a `sentence`). We produce our model output to the `to_topic` as a JSON object.

We define our Kafka producer and consumer below.

In [None]:
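# Address of the Kafka bootstrap server inside the cluster
brokers = 'odh-message-bus-kafka-bootstrap.opf-kafka.svc:9092'
# Topic we read decoded speech from, and topic we write model output to
from_topic = 'audio-decoder.decoded-speech'
to_topic = 'audio-decoder.sentiment-text'

# Consumers that share a group_id split the topic's partitions between them
consumer = KafkaConsumer(from_topic, bootstrap_servers=brokers, group_id="default")
producer = KafkaProducer(bootstrap_servers=brokers)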
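Both topics carry UTF-8 encoded JSON, and the processing loop further down encodes and decodes it by hand with `json.dumps`/`json.loads`. As a minimal sketch, assuming you prefer to centralize that logic, the two helper functions below (`serialize` and `deserialize` are hypothetical names, not part of this notebook) do the same conversion, and the sample message is made up for illustration.

```python
import json
from typing import Optional


def serialize(obj: dict) -> bytes:
    """Encode a dict as UTF-8 JSON bytes, as producer.send() expects."""
    return json.dumps(obj).encode("utf-8")


def deserialize(raw: Optional[bytes]) -> Optional[dict]:
    """Decode UTF-8 JSON bytes from a Kafka message; pass None through."""
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))


# Round trip with a made-up message in the shape the loop reads.
incoming = serialize({"id": "call-42", "sentence": "thanks for your help"})
print(deserialize(incoming))

# kafka-python can apply these automatically (not executed here):
#   KafkaConsumer(from_topic, bootstrap_servers=brokers, group_id="default",
#                 value_deserializer=deserialize)
#   KafkaProducer(bootstrap_servers=brokers, value_serializer=serialize)
```

If the functions are passed as `value_deserializer` and `value_serializer`, `msg.value` would already be a dict and `producer.send(to_topic, data)` would accept the dict directly, so the loop would no longer need its own encode/decode calls.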

## Model Setup
This is where we would build, import, or (in our case) download our models. We will use a pre-trained model from [Flair](https://github.com/flairNLP/flair) for sentiment analysis and [NLTK](http://www.nltk.org/) for POS tagging.

In [None]:
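# Download Flair's pre-trained English sentiment classifier
flair_sentiment = flair.models.TextClassifier.load('en-sentiment')
# Data used by nltk.word_tokenize and nltk.pos_tag
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')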
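NLTK's default `pos_tag` labels each word with a Penn Treebank tag, and every Penn Treebank noun tag (`NN`, `NNS`, `NNP`, `NNPS`) starts with `NN`; that is why the processing loop keeps words whose tag begins with `NN`. The sketch below shows the same filter as a standalone function (`extract_nouns` is a hypothetical helper). The `(word, tag)` pairs are hand-written stand-ins for `nltk.pos_tag` output, so the example runs without downloading any NLTK data.

```python
from typing import Iterable, List, Tuple

# Penn Treebank noun tags, as produced by nltk.pos_tag's default tagger.
NOUN_TAGS = {"NN", "NNS", "NNP", "NNPS"}


def extract_nouns(tagged: Iterable[Tuple[str, str]]) -> List[str]:
    """Keep the words whose POS tag is one of the Penn Treebank noun tags."""
    return [word for word, tag in tagged if tag in NOUN_TAGS]


# Hand-written (word, tag) pairs standing in for nltk.pos_tag output.
sample = [("My", "PRP$"), ("order", "NN"), ("from", "IN"),
          ("Boston", "NNP"), ("never", "RB"), ("arrived", "VBD")]
print(extract_nouns(sample))
```

For Penn Treebank tags, the explicit set and the loop's `NN` prefix check select exactly the same words; the set simply makes the intent easier to read.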

## Consuming the Data
Now we can start consuming and processing the data. To do this, we iterate over the consumer and process each message individually. Each message contains a call id and the decoded text; the id distinguishes between different calls.
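Because every message carries a call id, results can be grouped per call to decide which calls are going well and which are going poorly. This is a minimal sketch of one way to do that, assuming the Flair `en-sentiment` model emits `POSITIVE`/`NEGATIVE` labels (the value stored as `quality`). The `CallSentimentTracker` class and its "more negative than positive snippets" rule are hypothetical illustrations, not part of this notebook or the web application.

```python
from collections import Counter, defaultdict
from typing import Dict


class CallSentimentTracker:
    """Hypothetical per-call tally of sentiment labels, keyed by call id."""

    def __init__(self) -> None:
        self.counts: Dict[str, Counter] = defaultdict(Counter)

    def add(self, call_id: str, label: str) -> None:
        # Record one labelled snippet for the given call.
        self.counts[call_id][label] += 1

    def status(self, call_id: str) -> str:
        # Assumed rule: a call is "poor" when NEGATIVE snippets outnumber
        # POSITIVE ones; unknown calls and ties count as "good".
        c = self.counts.get(call_id, Counter())
        return "poor" if c["NEGATIVE"] > c["POSITIVE"] else "good"


tracker = CallSentimentTracker()
for call_id, label in [("call-1", "POSITIVE"), ("call-2", "NEGATIVE"),
                       ("call-2", "NEGATIVE"), ("call-1", "NEGATIVE")]:
    tracker.add(call_id, label)
print(tracker.status("call-1"), tracker.status("call-2"))
```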

You can also specify a `consumer_id` to identify which call chunks your Jupyter notebook processed. A key benefit of using Kafka to consume, process, and produce data is that multiple consumers (notebook instances) sharing the same `group_id` can process the incoming data together: Kafka automatically divides the topic's partitions among them.
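Work is shared at the granularity of partitions, so a topic with fewer partitions than consumers leaves some consumers idle. The function below is a simplified illustration of one common strategy, range assignment, for a single topic. It is not Kafka's or kafka-python's actual code; `range_assign` and the notebook names are hypothetical.

```python
from typing import Dict, List


def range_assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified range-style assignment for one topic: each consumer gets a
    contiguous block of partitions, and the first
    len(partitions) % len(consumers) consumers get one extra."""
    parts = sorted(partitions)
    members = sorted(consumers)
    per_member, extra = divmod(len(parts), len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(members):
        size = per_member + (1 if i < extra else 0)
        assignment[member] = parts[start:start + size]
        start += size
    return assignment


# Five partitions over two notebooks, then two partitions over three notebooks.
print(range_assign([0, 1, 2, 3, 4], ["notebook-a", "notebook-b"]))
print(range_assign([0, 1], ["notebook-a", "notebook-b", "notebook-c"]))
```

In the second case, `notebook-c` receives no partitions, which is why adding notebook instances only speeds things up while there are partitions left to hand out.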

In [None]:
# Set consumer_id to any string that identifies this notebook
consumer_id = "DEFAULT"
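When several notebook instances run at once, a fixed `"DEFAULT"` makes their output indistinguishable. A small sketch, assuming a hypothetical `CONSUMER_ID` environment variable as an override, falls back to the machine's hostname, which on many container platforms is the pod or container name:

```python
import os
import socket


def default_consumer_id() -> str:
    """Hypothetical helper: use the CONSUMER_ID environment variable if set,
    otherwise fall back to this machine's hostname."""
    return os.environ.get("CONSUMER_ID") or socket.gethostname()


consumer_id = default_consumer_id()
print(consumer_id)
```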

In [None]:
for msg in consumer:
    if msg.value is not None:
        # first we will load in the json object
        obj_in = json.loads(msg.value.decode('utf-8'))
        
        if obj_in["sentence"] == "":
            continue
        
        # Using flair, we create a sentence and predict its sentiment.
        s = flair.data.Sentence(obj_in["sentence"])
        flair_sentiment.predict(s)
        
        # Using NLTK, we tokenize the sentence and extract only the nouns
        text = nltk.word_tokenize(obj_in["sentence"])
        tokens = nltk.pos_tag(text)
        nouns = [word for word, tag in tokens if tag.startswith('NN')]
        
        # We compile our model outputs into an object with an ID.
        # The ID tracks which call this text came from.
        data = {
            "sentence": obj_in["sentence"],
            "quality": s.labels[0].value,
            "nouns": nouns,
            "id": obj_in["id"],
            "consumer": consumer_id
        }
        
        print(data["id"])
        print(data["sentence"])
        print(s.labels[0])
        print(data["nouns"])
        print("\n")
        
        # Now we can send this data out to our to_topic, so it
        # can be received by our web application
        producer.send(to_topic, json.dumps(data).encode('utf-8'))

print('exiting')
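The loop body mixes Kafka I/O with model calls, which makes it hard to test without a running broker. A minimal sketch, assuming you want to separate the two, moves the per-message logic into a hypothetical `process_message` function that receives the models as plain callables. Unlike the loop, it also skips whitespace-only sentences. The lambdas in the usage example are stubs, not real model output.

```python
import json
from typing import Callable, List, Optional, Tuple

# Hypothetical signatures for the two model calls:
#   classify(sentence) -> sentiment label, e.g. "POSITIVE"
#   tag(sentence)      -> list of (word, Penn Treebank tag) pairs
Classifier = Callable[[str], str]
Tagger = Callable[[str], List[Tuple[str, str]]]


def process_message(raw: Optional[bytes], classify: Classifier,
                    tag: Tagger, consumer_id: str) -> Optional[bytes]:
    """Turn one raw input message into the bytes to send to to_topic,
    or None if the message should be skipped."""
    if raw is None:
        return None
    obj_in = json.loads(raw.decode("utf-8"))
    sentence = obj_in.get("sentence", "")
    if not sentence.strip():
        return None
    data = {
        "sentence": sentence,
        "quality": classify(sentence),
        "nouns": [word for word, pos in tag(sentence) if pos.startswith("NN")],
        "id": obj_in["id"],
        "consumer": consumer_id,
    }
    return json.dumps(data).encode("utf-8")


# Stubbed models so this runs without Kafka, Flair, or NLTK.
out = process_message(b'{"id": "call-7", "sentence": "Refund not received"}',
                      classify=lambda s: "NEGATIVE",
                      tag=lambda s: [("Refund", "NN"), ("not", "RB"), ("received", "VBN")],
                      consumer_id="DEFAULT")
print(json.loads(out))
```

In the notebook, `tag` could be `lambda s: nltk.pos_tag(nltk.word_tokenize(s))`, `classify` could wrap `flair_sentiment.predict` and return `labels[0].value`, and the loop body would reduce to calling `process_message(msg.value, ...)` and sending the result with `producer.send` whenever it is not `None`.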
