In [None]:
from kafka import KafkaConsumer
from hdfs import InsecureClient
import json

# Consommateur Kafka
consumer = KafkaConsumer(
    'sensor_data',
    bootstrap_servers=['localhost:9093'],  # Utilise l'IP du conteneur Kafka
    group_id='sensor_group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
)

# Initialiser le client HDFS avec l'adresse correcte du conteneur Hadoop
hdfs_client = InsecureClient('http://172.19.0.6:50070', user='hadoop')  # Utiliser 50070 ici pour l'accès WebHDFS

# Consommer les messages de Kafka et les stocker dans HDFS
for message in consumer:
    data = message.value
    try:
        # Afficher les données reçues
        print(f"Received data: {data}")

        # Vérifier si le fichier existe dans HDFS avant d'écrire
        file_path = '/user/iot/sensor_data/sensor_data.json'
        try:
            hdfs_client.status(file_path)  # Vérifie si le fichier existe
            file_exists = True
        except FileNotFoundError:
            file_exists = False
        
        # Si le fichier n'existe pas, crée-le et écrit les données
        if not file_exists:
            with hdfs_client.write(file_path, overwrite=True) as writer:
                writer.write(json.dumps(data) + "\n")
        else:
            # Ajouter les données dans le fichier JSON en mode append
            with hdfs_client.write(file_path, append=True) as writer:
                writer.write(json.dumps(data) + "\n")
        
        # Confirmer l'ajout des données dans la console
        print("Data written to HDFS successfully.")
    except Exception as e:
        # Afficher l'erreur en cas de problème avec HDFS
        print(f"Error writing to HDFS: {e}")
