In [1]:
from confluent_kafka import Consumer, KafkaError
import psycopg2
import json
import os

# Configuration du consommateur Kafka
conf = {
    'bootstrap.servers': 'host.docker.internal:9092',  # Adresse broker Kafka
    'group.id': 'my_group',                           # Identifiant du groupe de consommateurs
    'auto.offset.reset': 'earliest'                   # Li depuis le début si aucun offset n'existe
}

# Création du consommateur Kafka
consumer = Consumer(conf)

# Connexion à PostgreSQL
pg_conf = {
    'host': 'postgres',
    'port': '5432',
    'database': os.getenv('DB_NAME'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD')
}

try:
    # Établir une connexion à PostgreSQL
    print("Connexion à PostgreSQL...")
    conn = psycopg2.connect(**pg_conf)
    cursor = conn.cursor()

    # Crée une table pour les données brutes (Bronze)
    print("Création de la table bronze_mastodon...")
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS bronze_mastodon (
        id BIGINT PRIMARY KEY,
        content TEXT,
        username TEXT,
        reblogs_count BIGINT,
        favourites_count BIGINT,
        replies_count BIGINT,
        language TEXT,
        url TEXT,
        media_url TEXT,
        media_preview_url TEXT,
        hashtags TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """)
    conn.commit()
    print("Table bronze_mastodon prête.")

    # Souscription au topic Kafka
    consumer.subscribe(['mastodon_stream'])
    print("Souscription au topic 'mastodon_stream' réussie.")

    # Variables pour gérer le batch
    batch_size = 100  # Nombre de messages avant de committer la transaction
    counter = 0       # Compteur de messages traités

    while True:
        # Poll pour les nouveaux contenus
        print("Polling Kafka pour de nouveaux contenus...")
        msg = consumer.poll(1.0)  # Attendre jusqu'à 1 seconde pour un contenu

        if msg is None:
            continue
        if msg.error():
            print(f"Erreur Kafka: {msg.error()}")
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Erreur irrécupérable: {msg.error()}")
                break

        try:
            # Décode le contenu reçu de Kafka
            content_value = msg.value().decode('utf-8')
            print(f"Contenu reçu de Kafka : {content_value}")
            content_json = json.loads(content_value)

            # Récupére les données importantes du contenu
            content_id = content_json.get('id')
            content_created_at = content_json.get('created_at')
            content = content_json.get('content')
            username = content_json.get('username')
            reblogs_count = content_json.get('reblogs_count', 0)
            favourites_count = content_json.get('favourites_count', 0)
            replies_count = content_json.get('replies_count', 0)
            language = content_json.get('language')
            url = content_json.get('url')

            # Récupére les médias s'ils sont présents
            media_url = None
            media_preview_url = None
            if 'media_attachments' in content_json and content_json['media_attachments']:
                media_url = content_json['media_attachments'][0].get('url')
                media_preview_url = content_json['media_attachments'][0].get('preview_url')

            # Récupére les hashtags s'ils sont présents (sous forme de chaîne)
            hashtags = content_json.get('hashtags', '')

            # Affiche le contenu reçu
            print(f"ID: {content_id}, Content: {content}, Hashtags: {hashtags}")

            # Vérifie si l'identifiant existe déjà dans la base de données
            cursor.execute("""
            SELECT 1 FROM bronze_mastodon WHERE id = %s
            """, (content_id,))
            exists = cursor.fetchone()

            if not exists:
                # Insére le contenu dans la table bronze
                cursor.execute("""
                INSERT INTO bronze_mastodon (id, created_at, content, username, reblogs_count, favourites_count, replies_count, language, url, media_url, media_preview_url, hashtags) 
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                """, (content_id, content_created_at, content, username, reblogs_count, favourites_count, replies_count, language, url, media_url, media_preview_url, hashtags))
                print(f"Contenu inséré dans la base de données : {content}")
            else:
                # Met à jour les colonnes counts et autres informations si le contenu existe déjà
                cursor.execute("""
                UPDATE bronze_mastodon 
                SET created_at = %s, content = %s, username = %s, reblogs_count = %s, favourites_count = %s, replies_count = %s, language = %s, url = %s, media_url = %s, media_preview_url = %s, hashtags = %s
                WHERE id = %s;
                """, (content_created_at, content, username, reblogs_count, favourites_count, replies_count, language, url, media_url, media_preview_url, hashtags, content_id))
                print(f"Contenu mis à jour dans la base de données : {content}")

            # Incrémente le compteur de messages
            counter += 1

            # Si le batch est atteint, on commit
            if counter >= batch_size:
                conn.commit()  # Commite toutes les modifications en une seule fois
                print(f"Transaction commitée pour {batch_size} messages.")
                counter = 0  # Réinitialise le compteur

        except json.JSONDecodeError as e:
            print(f"Erreur lors du décodage du contenu JSON : {e}")
        except Exception as e:
            print(f"Erreur inattendue : {e}")

except KeyboardInterrupt:
    print("Interruption par l'utilisateur.")

except Exception as e:
    print(f"Erreur générale : {e}")

finally:
    # Commite les messages restants si le batch n'était pas complet
    if counter > 0:
        conn.commit()
        print(f"Transaction commitée pour les {counter} messages restants.")

    # Ferme les connexions
    print("Fermeture des connexions...")
    consumer.close()
    if cursor:
        cursor.close()
    if conn:
        conn.close()
    print("Connexions fermées.")

Connexion à PostgreSQL...
Création de la table bronze_mastodon si elle n'existe pas déjà...
Table bronze_mastodon prête.
Souscription au topic 'mastodon_stream' réussie.
Polling Kafka pour de nouveaux contenus...
Contenu reçu de Kafka : {"id": 113284290045021541, "created_at": "2024-10-10 17:24:44+00:00", "content": "<p>I'm going to start charging me money every time someone engaged in a heterosexual relationship hits me up for advice...also like... Not sure why I seem like a good candidate for advice lol I'm a divorced gay</p>", "username": "telmii_poo", "replies_count": 13, "reblogs_count": 100, "favourites_count": 35, "hashtags": "", "language": "en", "url": "https://jorts.horse/@telmii_poo/113284289983008913", "media_attachments": []}
ID: 113284290045021541, Content: <p>I'm going to start charging me money every time someone engaged in a heterosexual relationship hits me up for advice...also like... Not sure why I seem like a good candidate for advice lol I'm a divorced gay</p>, Ha