# Live sentiment analysis - Demo day

This notebook is the main entry point to run the live sentiment analysis by using the Twitter API and the dashboarding tools selected so far, as well as a messages broker to process the incoming Twitter data.

The diagram below shows the overall architecture of the services to run.

![Architecture diagram](./imgs/architecture.png "General Architecture")

[Link to original diagram (read only)](https://drive.google.com/file/d/1Y2t8Xp8DRvXdSBzT_klcOaPPNTXBQkDd/view?usp=sharing)

Major services will be hosted in the cloud with managed alternatives:

* Kafka: Hosted by Confluent cloud under the free tier.
* Elasticsearch: Hosted by Elastic under the free tier.

Both hosted providers make the development easier by giving guidance on how to connect and setup clients for their respective platforms.

## Python producer

A Python producer was created with the [onboarding steps](https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/python.html?utm_source=github&utm_medium=demo&utm_campaign=ch.examples_type.community_content.clients-ccloud) obtained in Confluent Cloud.

> Special steps were followed to install the `confluent-kafka` python library in an M1 machine, see [this issue's comment](https://github.com/confluentinc/confluent-kafka-python/issues/1190#issuecomment-1195952767).

Once the steps were followed in the above linked resources, the first messages were seen in Confluent cloud, these messages were not related to Twitter's data, but to simple integer messages.

![Messages produced via CLI](./imgs/cli-sample-producer.png)

The image above shows the CLI sample producer obtained from the Confluent Cloud setup page, it shows the application working properly with the installed dependencies. This base code will be employed to build the actual producer to send Tweet data into the topic.

![Messages in Confluent](./imgs/confluent-messages.png "Messages in confluent cloud")

The above image shows the produces messages in the Confluent Cloud UI.


### Producing Tweets

By using the Twitter client seen during the Project update N. 2 ([notebook](./hugging-face.ipynb)) the code below retrieves sample tweet data and produces it into the Kafka client.

The code below expects a configuration file to provide the Kafka provider with the right parameters to connect to the Confluent cluster. The `ccloud_lib` utilities file was borrowed from the original sample code from Confluent.

In [1]:
from confluent_kafka import Producer, KafkaError
import ccloud_lib
import json

In [2]:
class BaseProducer:
    """Defines the basic connectivity to reach Kafka instance hosted in Confluent Cloud"""
    def __init__(self, config_file, topic):
        """Creates a BaedProducer with the provided configuration and topic
        """

        conf = ccloud_lib.read_ccloud_config(config_file)

        producer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)
        self.producer = Producer(producer_conf)

        self.topic = topic
        ccloud_lib.create_topic(conf, topic)

        self.delivered_records = 0

    def acked(self, err, msg):
        """Delivery report handler called on
        successful or failed delivery of message
        """
        if err is not None:
            print("Failed to deliver message: {}".format(err))
        else:
            self.delivered_records += 1
            print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))

    def produce(self, values):
        for v in values:
            record_key = str(v['id'])
            record_value = json.dumps(v)
            
            print("Producing record: {}: {}".format(record_key, record_value))
            
            self.producer.produce(self.topic, key=record_key, value=record_value, on_delivery=self.acked)
            # p.poll() serves delivery reports (on_delivery)
            # from previous produce() calls.
            self.producer.poll(0)

        self.producer.flush()

        print("{} messages were produced to topic {}!".format(self.delivered_records, self.topic))


Before continuing, the `BaseProducer` needs to be somehow tested, so that it can be fixed in case we are not able to produce data. Let's start by loading some of the available sample data we have.

In [3]:
import pandas as pd

In [4]:
def sample_data(airline, filename, sample_size=10):
    aircanada_df = pd.read_csv(filename)
    aircanada_sample_df = aircanada_df.sample(sample_size)

    for i in range(aircanada_sample_df.iloc[:, 1:].shape[0]):
        row_json = aircanada_df.iloc[i, 1:].to_dict()
        row_json['airline'] = airline
        yield row_json

In [5]:
gen = sample_data('aircanada', './fresh_data/aircanada_sample_07072022_210411.csv')

In [6]:
producer = BaseProducer('./config.properties', 'twitter-data-test')

%4|1658885792.059|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
%4|1658885792.063|CONFWARN|rdkafka#producer-2| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance


In [7]:
producer.produce(gen)

Producing record: 1545212028247220225: {"created_at": "2022-07-08T01:03:50.000Z", "id": 1545212028247220225, "text": "RT @mcdowell_norm: @OmarAlghabra @AirCanada Best thing you can do is RESIGN!", "airline": "aircanada"}
Producing record: 1545212013227479040: {"created_at": "2022-07-08T01:03:47.000Z", "id": 1545212013227479040, "text": "@OmarAlghabra @AirCanada End the masks. End the mandates. End the ArriveCAN app. Problem solved", "airline": "aircanada"}
Producing record: 1545211993564610561: {"created_at": "2022-07-08T01:03:42.000Z", "id": 1545211993564610561, "text": "@OmarAlghabra @AirCanada Remember you recently said out -of-practice travellers  are causing delays at security checkpoints ?  I do", "airline": "aircanada"}
Producing record: 1545211956495130625: {"created_at": "2022-07-08T01:03:33.000Z", "id": 1545211956495130625, "text": "@OmarAlghabra @AirCanada You have to return to the operations that were in place in 2019 and prior. Eliminate all mandates, apps, and restriction

With the above running we can see how the twitter data becomes available inside the Kafka Topic.

![Twitter data Kafka test](./imgs/twitter-data-test.png "Twitter Kafka test")

Now let's look at the consumer code to do the sentiment analysis while reading from the topic.

## Tweets consumer

This section of the notebook focuses on reading the topic and running the sentiment analysis of the data. Remembering how the result of the classification looks, provides an idea on what is the expected output to be inserted into the topic.

In [8]:
aircanada_class_df = pd.read_csv('./classified_data/aircanada_sample_07072022_210411_sa_classified.csv')

In [9]:
aircanada_class_df.iloc[:, 1:].sample(10)

Unnamed: 0,created_at,id,text,classification,score
108,2022-07-08T00:14:38.000Z,1545199643503472640,@OmarAlghabra @AirCanada Resign you vermin,Negative,0.866527
107,2022-07-08T00:14:40.000Z,1545199654597525504,@OmarAlghabra @AirCanada if I sh1t the bed as ...,Negative,0.70969
284,2022-07-07T23:07:07.000Z,1545182652306665472,@OmarAlghabra @AirCanada @OmarAlghabra you nee...,Negative,0.954575
341,2022-07-07T22:37:15.000Z,1545175137321816065,@mnakhleh @gkarstenssmith @AirCanada One name ...,Negative,0.931161
346,2022-07-07T22:36:06.000Z,1545174850582581249,@gkarstenssmith @AirCanada There are worse pla...,Negative,0.87478
15,2022-07-08T00:56:35.000Z,1545210202487631872,RT @AmbArunSahu: Still waiting for update on m...,Neutral,0.79235
338,2022-07-07T22:37:49.000Z,1545175278841995264,Do you let customers take a properly packed pa...,Neutral,0.763792
6,2022-07-08T01:02:43.000Z,1545211747312738304,No progress or updates on my lost baggage stil...,Neutral,0.516521
14,2022-07-08T00:57:31.000Z,1545210437200793600,@OmarAlghabra @AirCanada Canceling flights is ...,Negative,0.714646
386,2022-07-07T22:09:11.000Z,1545168076664193027,@Fernand55591815 @AirCanada @AirCanadaLatAm La...,Negative,0.836714


Let's now add the code for the consumer to retrieve the data from Kafka, it's based on the code provided by Confluent to consume from their managed clusters.

The consumer is executed in a similar fashion to the producer, the `config.properties` file can work for both of them.

![Consumer CLI test](./imgs/cli-sample-consumer.png "Sample Consumer")

The consumer will continue to poll from the topic until a keyboard interruption happens.

The components needed to classify the tweets data are now ready, as the consumer will be able to pull the data and classify it to finally produce it into the new `tweets-data-classified`. The resulting schema will be the same as the original Tweets, but with a new set of properties named: `classification.sentiment` and `classification.score`.

In [10]:
from confluent_kafka import Consumer
from transformers import pipeline

  from .autonotebook import tqdm as notebook_tqdm


In [13]:
class RobertaClassifier:
    def __init__(self):

        model_path = "cardiffnlp/twitter-xlm-roberta-base-sentiment"
        self.classifier = pipeline("sentiment-analysis", model=model_path, tokenizer=model_path)

    def classify(self, text):
        rbt_result = self.classifier(text)[0]

        return rbt_result

class ClassifierConsumer:
    def __init__(self, topic, output_topic, config_file):
        self.topic = topic
        self.output_topic = output_topic

        self.total_count = 0
        
        # Dependency configuration
        self.producer = BaseProducer(config_file, self.output_topic)
        self.classifier = RobertaClassifier()

        # Consumer configuration
        conf = ccloud_lib.read_ccloud_config(config_file)

        consumer_conf = ccloud_lib.pop_schema_registry_params_from_config(conf)

        consumer_conf['group.id'] = 'notebook_classifier_cg'
        consumer_conf['auto.offset.reset'] = 'earliest'

        self.consumer = Consumer(consumer_conf)
        self.consumer.subscribe([self.topic])

    def consume(self):
        try:
            while True:
                msg = self.consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue

                elif msg.error():
                    print('error: {}'.format(msg.error()))

                else:
                    # Check for Kafka message
                    record_key = msg.key()
                    record_value = msg.value()
                    tweet_data = json.loads(record_value)

                    class_result = self.classifier.classify(tweet_data['text'])
                    sentiment = class_result['label']
                    score = class_result['score']

                    tweet_classified = {**tweet_data, 'classification': {'sentiment': sentiment, 'score': score}}
                    
                    self.producer.produce([tweet_classified])

                    print("Consumed record with key {} and ID {}".format(record_key, tweet_data['id']))

        except KeyboardInterrupt:
            pass
        finally:
            # Leave group and commit final offsets
            self.consumer.close()

Time to launch the consumer, start by creating an instance and initialize all of its dependencies.

In [16]:
input_topic = 'twitter-data-test'
output_topic = 'twitter-data-classified'

sentiment_consumer = ClassifierConsumer(input_topic, output_topic, './config.properties')

%4|1658886030.954|CONFWARN|rdkafka#producer-9| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
%4|1658886030.959|CONFWARN|rdkafka#producer-10| [thrd:app]: Configuration property session.timeout.ms is a consumer property and will be ignored by this producer instance
All model checkpoint layers were used when initializing TFXLMRobertaForSequenceClassification.

All the layers of TFXLMRobertaForSequenceClassification were initialized from the model checkpoint at cardiffnlp/twitter-xlm-roberta-base-sentiment.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFXLMRobertaForSequenceClassification for predictions without further training.


Once the consumer and classifier are loaded, consumption can start so that we get the classified data into the output topic.

In [17]:
sentiment_consumer.consume()

Producing record: 1545212028247220225: {"created_at": "2022-07-08T01:03:50.000Z", "id": 1545212028247220225, "text": "RT @mcdowell_norm: @OmarAlghabra @AirCanada Best thing you can do is RESIGN!", "airline": "aircanada", "classification": {"sentiment": "Neutral", "score": 0.4647398293018341}}
Produced record to topic twitter-data-classified partition [0] @ offset 0
1 messages were produced to topic twitter-data-classified!
Consumed record with key b'1545212028247220225' and ID 1545212028247220225
Producing record: 1545212013227479040: {"created_at": "2022-07-08T01:03:47.000Z", "id": 1545212013227479040, "text": "@OmarAlghabra @AirCanada End the masks. End the mandates. End the ArriveCAN app. Problem solved", "airline": "aircanada", "classification": {"sentiment": "Negative", "score": 0.6257174015045166}}
Produced record to topic twitter-data-classified partition [0] @ offset 1
2 messages were produced to topic twitter-data-classified!
Consumed record with key b'1545212013227479040' and

A classification was executed successfully from the data available in the topics. The output from the above run can be seen in the image below for historical purposes (i.e. the output being random as we test this).

![Notebook Classifier Consumer](./imgs/notebook-classifier-consumer.png "Notebook Classifier")

The produced messages can be found also in Confluent Cloud where the classified data is now available in the topic, ready to be ingested by Elastic Search.

![Classified messages in Confluent Cloud](./imgs/confluent-cloud-classified-messages.png "Confluent Cloud Classified")


## Elastic Search ingestion

Now that the data is available in Confluent Cloud, an Elasticsearch connector is also available to send the data there,and start making use of it.