# Analyse Mastodon social media posts

### Install dependencies

In [None]:
!pip install confluent-kafka

### Import libraries

In [None]:
from confluent_kafka import Producer, Consumer, KafkaError # to produce and consume data from Apache Kafka topics
import boto3 # to programmatically create, configure, and manage AWS resources
import json # to work with social media messages that are represented as JSON objects
import re # for helper functionality to clean HTML tags from social media messages


## Prepare models 

### Add a function to invoke the endpoints that we have deployed


In [None]:
# SageMaker runtime client, created on first use and then shared by every call.
_sagemaker_runtime_client = None


def invoke_model_endpoint(endpoint_name, text):
    """Send ``text`` to a deployed model endpoint and return the parsed reply.

    The text is wrapped as ``{"inputs": text}``, posted as JSON to the
    endpoint called ``endpoint_name``, and the JSON response body is decoded
    into Python objects.

    Args:
        endpoint_name: Name of the deployed endpoint to invoke.
        text: Input text to run inference on.

    Returns:
        The decoded JSON response body.

    Raises:
        Whatever boto3 raises when the call fails, and ``json.JSONDecodeError``
        if the response body is not valid JSON.
    """
    global _sagemaker_runtime_client
    if _sagemaker_runtime_client is None:
        # Build the client once instead of once per invocation: this function
        # is called for every message and every model in the processing loop.
        _sagemaker_runtime_client = boto3.client('runtime.sagemaker')

    input_data = json.dumps({"inputs": text})
    query_response = _sagemaker_runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=input_data,
        Accept='application/json',
    )
    return json.loads(query_response['Body'].read())

# usage:
# print(invoke_model_endpoint('roberta-base-sentiment','cat'));
# print(invoke_model_endpoint('twitter-roberta-base-offensive-endpoint','cat'));


## Apache Kafka workflow

### Define Apache Kafka connection properties

In [None]:
# Topics: raw posts in, enriched posts out, plus a side channel for failures
apache_kafka_input_topic_name = "mastodon_posts"
apache_kafka_enriched_output_topic_name = "mastodon_posts_enriched"
apache_kafka_processing_errors_topic_name = "mastodon_posts_processing_errors"

# TODO: Set URI for Apache Kafka
apache_kafka_uri = ""

# TLS settings passed to both the consumer and the producer.
# TODO: Load Apache Kafka certificates into certificates folder
apache_kafka_ssl_config = {
    "ssl.ca.location": "certificates/ca.pem",
    "ssl.certificate.location": "certificates/service.cert",
    "ssl.key.location": "certificates/service.key",
    "security.protocol": "ssl",
}


### Create Apache Kafka Consumer

In [None]:
# Consumer for the input topic. 'earliest' means a consumer group with no
# committed offsets starts from the beginning of the topic.
consumer = Consumer(
    {
        'bootstrap.servers': apache_kafka_uri,
        'group.id': 'mygroup10',
        'auto.offset.reset': 'earliest',
        **apache_kafka_ssl_config,
    }
)
consumer.subscribe(
    [apache_kafka_input_topic_name],
)

# Matches a single HTML tag; non-greedy so adjacent tags are matched one by one.
CLEANR = re.compile('<.*?>')


def get_json_body(message):
    """Decode a Kafka message payload into a Python object.

    Args:
        message: Object whose ``value()`` returns the payload as UTF-8 bytes.

    Returns:
        The payload parsed as JSON.

    Raises:
        UnicodeDecodeError: If the payload is not valid UTF-8.
        json.JSONDecodeError: If the decoded text is not valid JSON.
    """
    decoded_message = message.value().decode('utf-8')  # Decode from binary
    return json.loads(decoded_message)  # Parse JSON message


def get_clean_content(json_object):
    """Return the post's ``content`` field with HTML tags removed.

    Args:
        json_object: Mapping for a parsed post.

    Returns:
        The tag-free text, or ``""`` when ``content`` is missing, empty, or
        ``None``.
    """
    # `.get("content", "")` alone still returns None for an explicit JSON
    # null, which would make the regex substitution raise TypeError.
    content = json_object.get("content") or ""
    return CLEANR.sub('', content)


### Create Apache Kafka Producer

In [None]:
# Producer used for both the enriched-output and processing-error topics;
# it uses the same broker URI and TLS settings as the rest of the notebook.
producer = Producer({'bootstrap.servers': apache_kafka_uri, **apache_kafka_ssl_config})

def send_message(message, topic_name):
    """Publish ``message`` to the Kafka topic ``topic_name``.

    The message is serialised to JSON and encoded as UTF-8 before being
    handed to the producer, which is flushed after every message.
    """
    payload = json.dumps(message).encode('utf-8')
    producer.produce(topic_name, payload)
    producer.flush()
    
def send_enriched_data(message, offensive_score, is_offensive, sentiment_score, sentiment_label):
    """Attach model results to ``message`` and publish it to the enriched topic.

    ``message`` is modified in place: the four result fields are added to it
    (overwriting any existing values under the same keys) before it is sent.

    Args:
        message: Parsed post (a dict) to enrich.
        offensive_score: Score returned by the offensive-language model.
        is_offensive: Offensive-language verdict; the processing loop passes
            the model's label here.
        sentiment_score: Score returned by the sentiment model.
        sentiment_label: Label returned by the sentiment model.
    """
    message['offensive_score'] = offensive_score
    # Key was previously misspelled as 'is_offensice'.
    message['is_offensive'] = is_offensive
    message['sentiment_score'] = sentiment_score
    message['sentiment_label'] = sentiment_label
    send_message(message, apache_kafka_enriched_output_topic_name)
    
def report_processing_error(message, error_code, error_message):
    """Tag ``message`` with error details and publish it to the errors topic.

    The error code and error message are written into ``message`` in place
    before it is sent.
    """
    error_fields = (
        ('processing_error_code', error_code),
        ('processing_error_message', error_message),
    )
    for field_name, field_value in error_fields:
        message[field_name] = field_value
    send_message(message, apache_kafka_processing_errors_topic_name)
    


### Read messages from Apache Kafka **input topic** and push processed data back to **output topic**

In [None]:
# Consume posts from the input topic, score each one with both models, and
# publish either the enriched post or a processing-error record.
# The loop runs until interrupted; the consumer is always closed on the way out.
print("Processing messages")
try:
    while True:
        message = consumer.poll(1.0)  # Poll for messages, with a timeout of 1 second
        if message is None:
            continue

        if message.error():
            if message.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print(f"Reached end of partition for topic {message.topic()} [{message.partition()}]")
            else:
                print(f"Error while consuming message: {message.error()}")
            continue

        # Parse defensively: one malformed payload (bad UTF-8, invalid JSON,
        # empty value, unexpected shape) must not stop the whole loop.
        # UnicodeDecodeError and json.JSONDecodeError are ValueError subclasses.
        try:
            json_body = get_json_body(message)
            content_property = get_clean_content(json_body)
        except (AttributeError, TypeError, ValueError) as e:
            print(f"Skipping message that could not be parsed: {e}")
            continue

        if not content_property:
            # Nothing to analyse once the HTML tags are stripped
            continue

        try:
            # Get offensive probability
            offensive_result = invoke_model_endpoint('twitter-roberta-base-offensive-endpoint', content_property)[0]
            print(offensive_result)
            offensive_score = offensive_result['score']
            offensive_label = offensive_result['label']

            # Get sentiment probability
            sentiment_result = invoke_model_endpoint('roberta-base-sentiment', content_property)[0]
            sentiment_score = sentiment_result['score']
            sentiment_label = sentiment_result['label']

            print('Inference:')
            print(f"Input text: '{content_property}'")
            print(f"Offensive score: {offensive_score}")
            print(f"The message is: {offensive_label}")
            print(f"Sentiment score: {sentiment_score}")
            print(f"The message is: {sentiment_label}")

            send_enriched_data(json_body, offensive_score, offensive_label, sentiment_score, sentiment_label)

        except Exception as e:
            print(f"An error occurred: {e}")
            # Only exceptions carrying a dict-shaped `response` have structured
            # error details; anything else is reported as "Unknown".
            response = getattr(e, "response", None)
            error_details = response.get("Error", {}) if isinstance(response, dict) else {}
            error_code = error_details.get("Code", "Unknown")
            error_message = error_details.get("Message", "Unknown")
            report_processing_error(json_body, error_code, error_message)
except KeyboardInterrupt:
    # Interrupting the kernel / pressing Ctrl+C is the normal way to stop
    print("Stopping message processing")
finally:
    # Close the consumer
    consumer.close()
