In [None]:
from confluent_kafka import Consumer, KafkaError
import psycopg2

# Configuration Kafka
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'  # Remplacez par l'adresse de votre serveur Kafka
KAFKA_TOPIC = 'Topic5'  # Remplacez par le nom de votre sujet Kafka

# Configuration PostgreSQL
DB_HOST = 'localhost'
DB_PORT = 5432
DB_NAME = 'WaouhMonde'
DB_USER = 'waouhmonde'
DB_PASSWORD = 'waouhmonde'

# Créez une connexion à PostgreSQL
conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD
)
cursor = conn.cursor()

# Créez un consommateur Kafka
consumer = Consumer({
    'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest'
})

# Abonnez-vous au sujet Kafka
consumer.subscribe([KAFKA_TOPIC])

# Créez la table dans PostgreSQL si elle n'existe pas
create_table_sql = """
CREATE TABLE IF NOT EXISTS waouhmonde01 (
    id SERIAL PRIMARY KEY,
    nom VARCHAR(255),
    adresse VARCHAR(255),
    code_postal VARCHAR(20),
    ville VARCHAR(255),
    numero_telephone VARCHAR(25) UNIQUE,
    tranche_age VARCHAR(50),
    autre_colonne VARCHAR(255)
)
"""
cursor.execute(create_table_sql)
conn.commit()

# Fonction pour insérer les données dans la base de données PostgreSQL
def insert_into_postgresql(data):
    try:
        # Nettoyez les crochets de la chaîne
        data = data[0].strip("['").strip("']")
        # Divisez la chaîne en utilisant le point-virgule comme délimiteur
        elements = data.split(';')
        if len(elements) == 7:
            nom, adresse, code_postal, ville, numero_telephone, tranche_age, autre_colonne = elements

            # Utilisez la clause ON CONFLICT pour éviter les doublons
            cursor.execute("INSERT INTO waouhmonde01 (nom, adresse, code_postal, ville, numero_telephone, tranche_age, autre_colonne) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (numero_telephone) DO NOTHING",
                           (nom, adresse, code_postal, ville, numero_telephone, tranche_age, autre_colonne))
            conn.commit()
            print('Données insérées dans PostgreSQL :', elements)
            print('Ligne insérée avec succès.')
        else:
            print('Le message Kafka ne contient pas le bon nombre d\'éléments.')
    except psycopg2.IntegrityError:
        conn.rollback()  # Ignore duplicates
        print('Donnée déjà existante, ignorée :', elements)

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('Fin du message - pas de message supplémentaire')
            else:
                print('Erreur lors de la réception du message: {}'.format(msg.error()))
        else:
            # Récupérer le message Kafka
            message = msg.value().decode('utf-8')
            data = [message]  # Créez une liste contenant le message pour correspondre au format de données attendu
            print(data)

            # Insérer les données dans PostgreSQL
            insert_into_postgresql(data)

except KeyboardInterrupt:
    pass
finally:
    # Fermer le consommateur Kafka et la connexion PostgreSQL
    consumer.close()
    cursor.close()
    conn.close()