In [None]:
import json
import os
from datetime import datetime

import pandas as pd
import psycopg2
from confluent_kafka import Consumer, KafkaError
from joblib import load
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.multiclass import OneVsRestClassifier
from sklearn.svm import SVC

# Kafka topic this script reads Reddit posts from.
topic = 'NELSI'

# Consumer settings: local broker, a fixed consumer group, and start from the
# oldest available offset when the group has no committed position yet.
# `enable.partition.eof` is not set, so librdkafka's default applies.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(conf)
consumer.subscribe([topic])

# PostgreSQL connection used to store classified posts.
# The password is taken from NELSI_DB_PASSWORD when that variable is set; the
# literal fallback keeps existing setups working.
# TODO: drop the hard-coded fallback and rotate this password — it is
# committed in source.
conn = psycopg2.connect(
    host="localhost",
    database="NELSI",
    user="postgres",
    password=os.environ.get("NELSI_DB_PASSWORD", "Simon09@09"),
)

cur = conn.cursor()

# Create the destination table on first run.
cur.execute("""
    CREATE TABLE IF NOT EXISTS Reddit (
        id SERIAL PRIMARY KEY,
        subreddit VARCHAR(500),
        topic VARCHAR(500),
        title VARCHAR(500),
        content TEXT,
        label TEXT,
        created_at TIMESTAMP
    )
""")
# psycopg2 implicitly opens a transaction on the first execute(). Commit now so
# the table persists even if no row is ever inserted; closing the connection
# without a commit would roll the DDL back.
conn.commit()

MAX_LENGTH = 150  # Maximum stored length for the text columns, ellipsis included


def _truncate(text, max_length=MAX_LENGTH):
    """Return *text* cut to at most *max_length* characters.

    A shortened value ends with '...' so readers can tell it was truncated.
    ``None`` is passed through unchanged so missing fields are stored as NULL.
    """
    if text is None or len(text) <= max_length:
        return text
    return text[:max_length - 3] + '...'


# Load the trained classifier and its vectorizer once, not once per message.
# joblib.load unpickles these files: only load artifacts from a trusted source.
model = load('model.joblib')
vectorizer = load('vectorizer.joblib')

INSERT_SQL = (
    "INSERT INTO Reddit (subreddit, topic, title, content, label, created_at) "
    "VALUES (%s, %s, %s, %s, %s, %s)"
)

# Consume messages until interrupted (Ctrl+C) or an end-of-partition event.
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        err = msg.error()
        if err:
            if err.code() == KafkaError._PARTITION_EOF:
                # End of the topic (only emitted when enable.partition.eof is on).
                break
            # Consumption error: report it and keep polling.
            print('Erreur de consommation : {}'.format(err.str()))
            continue

        value = msg.value()
        if value is None:
            # No payload: nothing to classify or store.
            print('Message sans contenu ignoré')
            continue

        # Decode and parse the post; skip malformed messages instead of
        # crashing the whole consumer.
        try:
            post_data = json.loads(value.decode('utf-8'))
            subreddit = post_data['subreddit']
            title = post_data['title']
            content = post_data['content']
        except (ValueError, KeyError, TypeError) as parse_err:
            print('Message invalide ignoré : {}'.format(parse_err))
            continue

        # Classify on the full, untruncated text with the same vectorizer
        # used at training time.
        combined_text = f"{title} {content}"
        X_pred = vectorizer.transform([combined_text])
        # str() lets psycopg2 adapt numpy scalar labels (e.g. numpy.int64,
        # which it cannot adapt) for the TEXT column. Assumes predict()
        # returns one scalar label per row — confirm for the deployed model.
        label = str(model.predict(X_pred)[0])

        row = (
            _truncate(subreddit),
            _truncate(msg.topic()),
            _truncate(title),
            _truncate(content),
            label,
            datetime.now(),
        )

        try:
            cur.execute(INSERT_SQL, row)
            conn.commit()
        except psycopg2.Error as db_err:
            # A failed statement aborts the transaction; roll back so the
            # following inserts are not rejected as well.
            conn.rollback()
            print("Erreur d'insertion : {}".format(db_err))

except KeyboardInterrupt:
    pass
finally:
    # Always release the database connection and leave the consumer group
    # cleanly, whatever ended the loop.
    cur.close()
    conn.close()
    consumer.close()