In [3]:
from kafka import KafkaConsumer
import psycopg2
import json

# Configurer le consommateur Kafka
consumer = KafkaConsumer(
    'election_stream',
    bootstrap_servers=['kafka:29092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Connexion à la base PostgreSQL
conn = psycopg2.connect(
    database="us_election_db",
    user="postgres",
    password="postgres",
    host="pg-ds-dellstore",
    port="5432"
)
cursor = conn.cursor()

# Stocker les données agrégées en mémoire pour mise à jour
state_aggregation = {}

# Traitement des messages reçus
for message in consumer:
    data = message.value
    print(f"Reçu : {data}")
    
    # Calculer la différence de score
    score_difference = abs(float(data['trump']) - float(data['harris']))
    
    # Mettre à jour l'agrégation des scores par État
    state = data['state']
    if state not in state_aggregation:
        state_aggregation[state] = {"total_trump": 0, "total_harris": 0}
    
    state_aggregation[state]["total_trump"] += float(data['trump'])
    state_aggregation[state]["total_harris"] += float(data['harris'])
    
    # Déterminer le candidat en tête pour l'État
    leading_candidate = (
        "trump" 
        if state_aggregation[state]["total_trump"] > state_aggregation[state]["total_harris"] 
        else "harris"
    )
    
    # Données transformées à insérer
    transformed_data = {
        'id': data['id'],
        'state': state,
        'county': data['county'],
        'trump': float(data['trump']),
        'harris': float(data['harris']),
        'score_difference': score_difference,
        'total_trump': state_aggregation[state]["total_trump"],
        'total_harris': state_aggregation[state]["total_harris"],
        'leading_candidate': leading_candidate
    }
    
    # Afficher les données transformées avant insertion
    print(f"Données transformées : {transformed_data}")
    
    # Préparer la requête d'insertion pour les données enrichies
    insert_query = """
        INSERT INTO election_data_enriched (
            id, state, county, trump, harris, score_difference, 
            total_trump, total_harris, leading_candidate
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    
    # Exécution de la requête d'insertion
    cursor.execute(insert_query, (
        transformed_data['id'],  # ID unique du comté
        transformed_data['state'],  # État
        transformed_data['county'],  # Comté
        transformed_data['trump'],  # Score Trump
        transformed_data['harris'],  # Score Harris
        transformed_data['score_difference'],  # Différence de score
        transformed_data['total_trump'],  # Total Trump dans l'État
        transformed_data['total_harris'],  # Total Harris dans l'État
        transformed_data['leading_candidate']  # Candidat en tête dans l'État
    ))
    
    # Sauvegarder les modifications dans la base
    conn.commit()  # Sauvegarder les modifications dans la base

# Fermer la connexion à la base
cursor.close()
conn.close()

Reçu : {'id': '1', 'state': 'Alabama', 'county': 'Autauga', 'trump': '72.4', 'harris': '26.3'}
Données transformées : {'id': '1', 'state': 'Alabama', 'county': 'Autauga', 'trump': 72.4, 'harris': 26.3, 'score_difference': 46.10000000000001, 'total_trump': 72.4, 'total_harris': 26.3, 'leading_candidate': 'trump'}
Reçu : {'id': '3', 'state': 'Alabama', 'county': 'Barbour', 'trump': '56.9', 'harris': '42.2'}
Données transformées : {'id': '3', 'state': 'Alabama', 'county': 'Barbour', 'trump': 56.9, 'harris': 42.2, 'score_difference': 14.699999999999996, 'total_trump': 129.3, 'total_harris': 68.5, 'leading_candidate': 'trump'}
Reçu : {'id': '5', 'state': 'Alabama', 'county': 'Blount', 'trump': '90.0', 'harris': '9.1'}
Données transformées : {'id': '5', 'state': 'Alabama', 'county': 'Blount', 'trump': 90.0, 'harris': 9.1, 'score_difference': 80.9, 'total_trump': 219.3, 'total_harris': 77.6, 'leading_candidate': 'trump'}
Reçu : {'id': '7', 'state': 'Alabama', 'county': 'Butler', 'trump': '61.

KeyboardInterrupt: 