In [6]:
from confluent_kafka import Producer
import json
import time
import random

# Configuration de Kafka
BOOTSTRAP_SERVERS = 'localhost:9092'
TOPIC_INPUT = 'transactions-input'

# Configuration pour 2000 transactions en 120 secondes
TOTAL_TRANSACTIONS = 5000
DURATION_SECONDS = 300
INTERVAL = DURATION_SECONDS / TOTAL_TRANSACTIONS  # Intervalle entre les envois

# Fonction pour produire des transactions
def delivery_report(err, msg):
    """
    Callback pour signaler le succès ou l'échec de la livraison d'un message.
    """
    if err is not None:
        print(f"Échec de l'envoi du message : {err}")
    else:
        print(f"Message envoyé avec succès au topic {msg.topic()} [partition {msg.partition()}]")

def produce_transactions():
    # Configuration du Producer Kafka
    producer_config = {
        'bootstrap.servers': BOOTSTRAP_SERVERS,
        'linger.ms': 10,  # Réduire la latence d'envoi
    }
    producer = Producer(producer_config)

    print(f"Envoi de {TOTAL_TRANSACTIONS} transactions au topic '{TOPIC_INPUT}' en {DURATION_SECONDS} secondes...")

    user_ids = ['UserId1', 'UserId2', 'UserId3', 'UserId4', 'UserId5']
    for i in range(TOTAL_TRANSACTIONS):
        user_id = random.choice(user_ids)
        amount = random.randint(1000, 20000)  # Montant entre 1000 et 20000
        timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        transaction = {
            "userId": user_id,
            "amount": amount,
            "timestamp": timestamp
        }

        # Envoi au topic Kafka
        try:
            producer.produce(
                topic=TOPIC_INPUT,
                key=str(user_id),
                value=json.dumps(transaction),
                callback=delivery_report
            )
        except Exception as e:
            print(f"Erreur lors de l'envoi : {e}")

        # Pause pour respecter l'intervalle
        time.sleep(INTERVAL)

    # Assurez-vous que tous les messages ont été transmis avant de fermer
    producer.flush()

    print("Toutes les transactions ont été envoyées avec succès.")

if __name__ == '__main__':
    produce_transactions()


Envoi de 5000 transactions au topic 'transactions-input' en 300 secondes...
Message envoyé avec succès au topic transactions-input [partition 2]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 2]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès au topic transactions-input [partition 1]
Message envoyé avec succès au topic transactions-input [partition 0]
Message envoyé avec succès 