In [None]:
from confluent_kafka import Consumer, KafkaError
import psycopg2
import json
import random
import time

# Configuration du consommateur Kafka
conf = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest',
}

# Création d'une instance de consommateur Kafka
print("Création du consommateur Kafka...")
consumer = Consumer(conf)

# Inscription au topic
topic = 'Mostodon_topic_stream'
print(f"Inscription au topic: {topic}...")
consumer.subscribe([topic])

# Fonction pour établir une connexion à PostgreSQL
def connect_to_postgres():
    print("Connexion à PostgreSQL...")
    while True:
        try:
            conn = psycopg2.connect(
                host="postgres",
                database="DB_Mastodon",
                user="fadi",
                password="fadi"
            )
            print("Connexion à PostgreSQL réussie.")
            return conn
        except Exception as e:
            print(f"Erreur de connexion à PostgreSQL: {e}. Tentative de reconnexion dans 5 secondes...")
            time.sleep(5)

conn = connect_to_postgres()
cursor = conn.cursor()

# Création de la table si elle n'existe pas déjà (avec tags)
create_table_query = '''
CREATE TABLE IF NOT EXISTS Mostodon_BRONZ_bis (
    id BIGINT PRIMARY KEY,
    username TEXT NOT NULL,
    display_name TEXT NOT NULL,
    content TEXT NOT NULL,
    favourites_count INT NOT NULL,
    reblogs_count INT NOT NULL,
    replies_count INT NOT NULL,
    tags TEXT  
);
'''

try:
    cursor.execute(create_table_query)
    conn.commit()
    print("Table Mostodon_BRONZ_bis créée ou déjà existante.")
except Exception as e:
    print(f"Erreur lors de la création de la table: {e}")
    raise

try:
    while True:
        # Lire les messages du topic
        msg = consumer.poll(1.0)  # Timeout de 1 seconde
        
        if msg is None:
            print("Aucun message reçu, en attente...")
            continue  # Pas de message reçu

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print("Atteint la fin de la partition, pas de nouveaux messages.")
                continue
            else:
                print(f"Erreur Kafka: {msg.error()}")
                continue
        
        # Traiter le message
        message_value = msg.value().decode('utf-8')
        try:
            print(f"Message brut reçu : {message_value}")
            message_dict = json.loads(message_value)
            print(f"Message décodé : {message_dict}")
            
            # Afficher les données récupérées
            print("Données récupérées :")
            print(f"ID: {message_dict['id']}")
            print(f"Username: {message_dict['username']}")
            print(f"Display Name: {message_dict['display_name']}")
            print(f"Content: {message_dict['content']}")
            print(f"Favourites Count: {message_dict.get('favourites_count', 'Non spécifié')}")
            print(f"Reblogs Count: {message_dict.get('reblogs_count', 'Non spécifié')}")
            print(f"Replies Count: {message_dict.get('replies_count', 'Non spécifié')}")
            print(f"Tags: {message_dict.get('tags', [])}")  # Debug : afficher les tags

            # Vérifier que les champs nécessaires sont présents
            required_fields = ['id', 'username', 'display_name', 'content']
            if not all(field in message_dict for field in required_fields):
                print("Message manquant des champs nécessaires, ignoré.")
                print(f"Champs trouvés : {message_dict.keys()}")
                continue

            # Remplacer 0 par une valeur aléatoire si les champs existent mais valent 0
            favourites_count = message_dict.get('favourites_count', random.randint(5, 100))
            favourites_count = favourites_count if favourites_count > 0 else random.randint(5, 100)

            reblogs_count = message_dict.get('reblogs_count', random.randint(5, 100))
            reblogs_count = reblogs_count if reblogs_count > 0 else random.randint(5, 100)

            replies_count = message_dict.get('replies_count', random.randint(5, 100))
            replies_count = replies_count if replies_count > 0 else random.randint(5, 100)

            # Récupération des tags (sous forme de liste de chaînes)
            tags = message_dict.get('tags', [])
            tags_str = ','.join(tags)  # Convertir la liste de tags en une chaîne séparée par des virgules

            print(f"Insertion du toot : ID={message_dict['id']}, Username={message_dict['username']}, "
                  f"Display Name={message_dict['display_name']}, Content={message_dict['content']}, "
                  f"Favourites Count={favourites_count}, Reblogs Count={reblogs_count}, Replies Count={replies_count}, Tags={tags_str}")

            # Requête d'insertion avec gestion du conflit sur l'ID (avec tags)
            insert_query = '''INSERT INTO Mostodon_BRONZ_bis (id, username, display_name, content, favourites_count, reblogs_count, replies_count, tags) 
                              VALUES (%s, %s, %s, %s, %s, %s, %s, %s) 
                              ON CONFLICT (id) DO NOTHING;'''
            cursor.execute(insert_query, (
                message_dict['id'],
                message_dict['username'],
                message_dict['display_name'],
                message_dict['content'],
                favourites_count,
                reblogs_count,
                replies_count,
                tags_str  # Insérer les tags sous forme de texte
            ))
            conn.commit()
            print(f"Toot inséré : {message_dict['id']} - {message_dict['username']}")

        except json.JSONDecodeError:
            print("Erreur lors du décodage JSON, message ignoré.")
            continue
        except Exception as e:
            print(f"Erreur lors de l'insertion : {e}")
            conn.rollback()  # Annuler la transaction en cas d'erreur

except KeyboardInterrupt:
    print("Arrêt du consommateur par l'utilisateur.")
except Exception as e:
    print(f"Erreur dans le consommateur: {e}")
finally:
    # Compter le nombre total d'entrées dans la table 'Mostodon_BRONZ_bis'
    print("Calcul du nombre total d'entrées dans la table 'Mostodon_BRONZ_bis'...")
    cursor.execute("SELECT COUNT(*) FROM Mostodon_BRONZ_bis;")
    count = cursor.fetchone()[0]
    print(f"Nombre total d'entrées dans la table 'Mostodon_BRONZ_bis' : {count}")

    # Fermer le consommateur et la connexion à PostgreSQL
    consumer.close()
    cursor.close()
    conn.close()


Création du consommateur Kafka...
Inscription au topic: Mostodon_topic_stream...
Connexion à PostgreSQL...
Connexion à PostgreSQL réussie.
Table Mostodon_BRONZ_bis créée ou déjà existante.
Message brut reçu : {"id": 113242718134571033, "username": "arXiv_qbioQM_bot", "display_name": "arXiv q-bio.QM bot", "content": "<p>Regional End-Systolic Circumferential Strain Demonstrates Reduced Function in Remote Myocardium after Anterior STEMI</p><p>Steve W. Leung, Kanjit Leungsuwan, Ahmed Abdel-Latif, Jonathan F. Wenk<br><a href=\"https://arxiv.org/abs/2410.01075\" rel=\"nofollow noopener noreferrer\" translate=\"no\" target=\"_blank\"><span class=\"invisible\">https://</span><span class=\"\">arxiv.org/abs/2410.01075</span><span class=\"invisible\"></span></a> <a href=\"https://arxiv.org/pdf/2410.01075\" rel=\"nofollow noopener noreferrer\" translate=\"no\" target=\"_blank\"><span class=\"invisible\">https://</span><span class=\"\">arxiv.org/pdf/2410.01075</span><span class=\"invisible\"></span