# Instructions d'Installation et de Configuration de l'Environnement pour le Projet Kafka

Ce document fournit un guide pour installer Python 3.10, configurer un environnement virtuel, et installer les bibliothèques nécessaires pour le projet de streaming de données avec Kafka.

---

## Installation de Python 3.10

1. **Ajouter le dépôt de Python** :

   ```bash
   sudo apt update
   sudo apt install -y software-properties-common
   sudo add-apt-repository ppa:deadsnakes/ppa
   sudo apt update


2. **Installer Python 3.10 :** :

   ```bash
   sudo apt install python3.10

3. **Installer le module venv pour Python 3.10 :** :

   ```bash
   sudo apt install python3.10-venv


## Création et Activation d'un Environnement Virtuel

4. **Créer l’environnement virtuel :** :

   ```bash
   python3.10 -m venv kafka_env

5. **Activer l’environnement virtuel :** :

   ```bash
   source kafka_env/bin/activate

## Installation des Bibliothèques Requises
Dans l’environnement virtuel, installez les bibliothèques suivantes :

5. **kafka-python : pour interagir avec Kafka en tant que producteur et consumer.** :

   ```bash
   pip install kafka-python
5. **requests : pour récupérer les données depuis l'API** :

   ```bash
   pip install requests




In [None]:
import requests

# Configuration
API_KEY = '5cd0c68e3773c0513b41138ade44163b'  # Clé API
CITIES = ['Paris']  # Villes cibles


# Fonction pour récupérer les données météo
def get_weather_data(city):
    url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric"
    response = requests.get(url)
    return response.json() if response.status_code == 200 else None

get_weather_data("Lyon")

In [1]:
import subprocess
import time

# Paths to Zookeeper and Kafka start scripts and configuration files
ZOOKEEPER_START_CMD = "./kafka_2.12-2.6.0/bin/zookeeper-server-start.sh"
ZOOKEEPER_CONFIG = "./kafka_2.12-2.6.0/config/zookeeper.properties"
KAFKA_START_CMD = "./kafka_2.12-2.6.0/bin/kafka-server-start.sh"
KAFKA_CONFIG = "./kafka_2.12-2.6.0/config/server.properties"

# Start Zookeeper
zookeeper_process = subprocess.Popen([ZOOKEEPER_START_CMD, ZOOKEEPER_CONFIG])
print("Starting Zookeeper...")
time.sleep(5)  # Wait for Zookeeper to initialize

# Start Kafka
kafka_process = subprocess.Popen([KAFKA_START_CMD, KAFKA_CONFIG])
print("Starting Kafka...")
time.sleep(5)  # Wait for Kafka to initialize

# Kafka and Zookeeper are now running in the background.
# You can proceed with other operations here, like producing and consuming messages.

# To stop the servers after usage
def stop_servers():
    zookeeper_process.terminate()
    kafka_process.terminate()
    print("Zookeeper and Kafka have been stopped.")

# Example usage of stopping the servers
# stop_servers()


Starting Zookeeper...
[2024-11-14 16:38:49,013] INFO Reading configuration from: ./kafka_2.12-2.6.0/config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-11-14 16:38:49,017] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-11-14 16:38:49,017] INFO secureClientPort is not set (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2024-11-14 16:38:49,019] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2024-11-14 16:38:49,020] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2024-11-14 16:38:49,020] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2024-11-14 16:38:49,021] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2024-11-14 16:38:49,026] INFO Log4j 1.2 jmx support found and enabled. (org.apa

[2024-11-14 16:39:05,898] WARN [SocketServer brokerId=0] Unexpected error from /0:0:0:0:0:0:0:1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
	at kafka.network.Processor.poll(SocketServer.scala:913)
	at kafka.network.Processor.run(SocketServer.scala:816)
	at java.base/java.lang.Thread.run(Thread.java:1583)
[2024-11-14 16:39:05,916] WARN [SocketServer brokerId=0] Unexpected error from /0:0:0:0:0:0

sudo apt install python3.10-venv


In [7]:
# Exemple pour collecter des données dans un topic Kafka
from kafka import KafkaProducer
import json

# Configuration
KAFKA_SERVER = 'localhost:9092'

# Initialisation du producteur Kafka
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_SERVER],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

data =  {'temp': 11.32,
  'feels_like': 10.67,
  'temp_min': 10.21,
  'temp_max': 12.36,
  'pressure': 1030,
  'humidity': 83,
  'sea_level': 1030,
  'grnd_level': 1020}

# Envoi des données en continu

producer.send("tp-meteo",  value=data)



<kafka.producer.future.FutureRecordMetadata at 0x73fa1029a2c0>

In [None]:
from kafka import KafkaConsumer

# Configuration de Kafka
KAFKA_TOPIC = 'tp-meteo'
KAFKA_SERVER = 'localhost:9092'

# Initialisation du consumer Kafka
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Consommer et afficher les messages
for message in consumer:
    data = message.value       
    print(data)

# TP : Pipeline de Données Météorologiques en Temps Réel avec Kafka

**Objectif** : Dans ce TP, vous allez utiliser Kafka pour transmettre des données météorologiques en temps réel d’un producteur (qui récupère les données) vers un consumer (qui les affiche). Vous configurerez un producteur pour récupérer les données météo de plusieurs villes via une API, puis un consumer pour consommer et afficher ces données en continu.

---

## Partie 1 : Producteur Kafka – Récupération et Envoi des Données Météorologiques

Le producteur Kafka va récupérer les données météo pour trois villes cibles et les envoyer dans un topic Kafka nommé `tp-meteo`. 

### Étapes :

1. **Configuration de l'API et des Villes** :
   - Utilisez une clé API pour accéder à OpenWeatherMap (une API de données météo).
   - Définissez trois villes cibles : `Paris`, `London`, et `Tokyo`.

2. **Initialisation du Producteur Kafka** :
   - Configurez Kafka pour qu’il envoie des messages au serveur `localhost:9092` dans le topic `tp-meteo`.
   - Kafka sera utilisé pour envoyer les données météorologiques formatées en JSON.

3. **Fonction de Récupération des Données** :
   - Écrivez une fonction `get_weather_data` qui récupère les informations météo pour une ville donnée.
   - La fonction envoie une requête à l’API et retourne les données si la requête est réussie.

4. **Boucle d’Envoi en Continu** :
   - Utilisez une boucle pour envoyer les données de chaque ville au topic Kafka toutes les minutes.
   - Affichez un message de confirmation dans la console pour chaque envoi réussi.



In [12]:
import requests
import json
import time
from kafka import KafkaProducer, KafkaConsumer

In [13]:
# Configuration de l'API et Kafka
API_KEY = '5cd0c68e3773c0513b41138ade44163b'  # Remplacez par votre clé API
CITIES = ['Paris', 'London', 'Tokyo']  # Villes cibles
KAFKA_SERVER = 'localhost:9092'  # Serveur Kafka
KAFKA_TOPIC = 'tp-meteo'  # Nom du topic Kafka


In [14]:
# Initialisation du producteur Kafka
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_SERVER],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
print("Producteur Kafka initialisé.")


Producteur Kafka initialisé.


In [15]:
# Fonction pour récupérer les données météo d'une ville
def get_weather_data(city):
    url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Échec de la récupération des données pour {city}, statut : {response.status_code}")
        return None


In [None]:
# Boucle d'envoi en continu des données météo au topic Kafka
while True:
    for city in CITIES:
        data = get_weather_data(city)
        if data:
            # Extraction des données météo pertinentes
            weather_info = {
                'city': city,
                'temp': data['main']['temp'],
                'feels_like': data['main']['feels_like'],
                'temp_min': data['main']['temp_min'],
                'temp_max': data['main']['temp_max'],
                'pressure': data['main']['pressure'],
                'humidity': data['main']['humidity']
            }
            # Envoi des données dans le topic Kafka
            producer.send(KAFKA_TOPIC, value=weather_info)
            print(f"Données météo envoyées pour {city} dans le topic Kafka.")
    # Attente de 60 secondes avant de récupérer à nouveau les données
    time.sleep(60)


Données météo envoyées pour Paris dans le topic Kafka.
Données météo envoyées pour London dans le topic Kafka.
Données météo envoyées pour Tokyo dans le topic Kafka.
Données météo envoyées pour Paris dans le topic Kafka.
Données météo envoyées pour London dans le topic Kafka.
Données météo envoyées pour Tokyo dans le topic Kafka.
Données météo envoyées pour Paris dans le topic Kafka.
Données météo envoyées pour London dans le topic Kafka.
Données météo envoyées pour Tokyo dans le topic Kafka.
Données météo envoyées pour Paris dans le topic Kafka.
Données météo envoyées pour London dans le topic Kafka.
Données météo envoyées pour Tokyo dans le topic Kafka.
Données météo envoyées pour Paris dans le topic Kafka.
Données météo envoyées pour London dans le topic Kafka.
Données météo envoyées pour Tokyo dans le topic Kafka.
Données météo envoyées pour Paris dans le topic Kafka.
Données météo envoyées pour London dans le topic Kafka.
Données météo envoyées pour Tokyo dans le topic Kafka.
Donn

In [None]:
# Initialisation du consumer Kafka
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
print("Consumer Kafka initialisé et prêt à recevoir les données.")


Consumer Kafka initialisé et prêt à recevoir les données.
