# Instructions d'Installation et de Configuration de l'Environnement pour le Projet Kafka

Ce document fournit un guide pour installer Python 3.10, configurer un environnement virtuel, et installer les bibliothèques nécessaires pour le projet de streaming de données avec Kafka.

---

## Installation de Python 3.10

1. **Ajouter le dépôt de Python** :

   ```bash
   sudo apt update
   sudo apt install -y software-properties-common
   sudo add-apt-repository ppa:deadsnakes/ppa
   sudo apt update


2. **Installer Python 3.10 :** :

   ```bash
   sudo apt install python3.10

3. **Installer le module venv pour Python 3.10 :** :

   ```bash
   sudo apt install python3.10-venv


## Création et Activation d'un Environnement Virtuel

4. **Créer l’environnement virtuel :** :

   ```bash
   python3.10 -m venv kafka_env

5. **Activer l’environnement virtuel :** :

   ```bash
   source kafka_env/bin/activate

## Installation des Bibliothèques Requises
Dans l’environnement virtuel, installez les bibliothèques suivantes :

5. **kafka-python : pour interagir avec Kafka en tant que producteur et consumer.** :

   ```bash
   pip install kafka-python
5. **requests : pour récupérer les données depuis l'API** :

   ```bash
   pip install requests




In [3]:
import requests

# Configuration
API_KEY = '6365575828f48fe0263c2d56aa2e323c'  # Clé API
CITIES = ['Paris']  # Villes cibles


# Fonction pour récupérer les données météo
def get_weather_data(city):
    url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric"
    response = requests.get(url)
    return response.json() if response.status_code == 200 else None

get_weather_data("Paris")

{'coord': {'lon': 2.3488, 'lat': 48.8534},
 'weather': [{'id': 803,
   'main': 'Clouds',
   'description': 'broken clouds',
   'icon': '04d'}],
 'base': 'stations',
 'main': {'temp': 12.66,
  'feels_like': 11.89,
  'temp_min': 11.88,
  'temp_max': 13.31,
  'pressure': 1029,
  'humidity': 73,
  'sea_level': 1029,
  'grnd_level': 1019},
 'visibility': 10000,
 'wind': {'speed': 3.09, 'deg': 10},
 'clouds': {'all': 75},
 'dt': 1731588447,
 'sys': {'type': 2,
  'id': 2012208,
  'country': 'FR',
  'sunrise': 1731567517,
  'sunset': 1731600706},
 'timezone': 3600,
 'id': 2988507,
 'name': 'Paris',
 'cod': 200}

sudo apt install python3.10-venv


In [5]:
# Exemple pour collecter des données dans un topic Kafka
from kafka import KafkaProducer
import json

# Configuration
KAFKA_SERVER = 'localhost:9092'

# Initialisation du producteur Kafka
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_SERVER],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

data =  {'temp': 11.32,
  'feels_like': 10.67,
  'temp_min': 10.21,
  'temp_max': 12.36,
  'pressure': 1030,
  'humidity': 83,
  'sea_level': 1030,
  'grnd_level': 1020}

# Envoi des données en continu

producer.send("tp-meteo",  value=data)



<kafka.producer.future.FutureRecordMetadata at 0x774bf204c8b0>

In [6]:
from kafka import KafkaConsumer

# Configuration de Kafka
KAFKA_TOPIC = 'tp-meteo'
KAFKA_SERVER = 'localhost:9092'

# Initialisation du consumer Kafka
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Consommer et afficher les messages
for message in consumer:
    data = message.value       
    print(data)

{'temp': 11.32, 'feels_like': 10.67, 'temp_min': 10.21, 'temp_max': 12.36, 'pressure': 1030, 'humidity': 83, 'sea_level': 1030, 'grnd_level': 1020}


KeyboardInterrupt: 

# TP : Pipeline de Données Météorologiques en Temps Réel avec Kafka

**Objectif** : Dans ce TP, vous allez utiliser Kafka pour transmettre des données météorologiques en temps réel d’un producteur (qui récupère les données) vers un consumer (qui les affiche). Vous configurerez un producteur pour récupérer les données météo de plusieurs villes via une API, puis un consumer pour consommer et afficher ces données en continu.

---

## Partie 1 : Producteur Kafka – Récupération et Envoi des Données Météorologiques

Le producteur Kafka va récupérer les données météo pour trois villes cibles et les envoyer dans un topic Kafka nommé `tp-meteo`. 

### Étapes :

1. **Configuration de l'API et des Villes** :
   - Utilisez une clé API pour accéder à OpenWeatherMap (une API de données météo).
   - Définissez trois villes cibles : `Paris`, `London`, et `Tokyo`.

2. **Initialisation du Producteur Kafka** :
   - Configurez Kafka pour qu’il envoie des messages au serveur `localhost:9092` dans le topic `tp-meteo`.
   - Kafka sera utilisé pour envoyer les données météorologiques formatées en JSON.

3. **Fonction de Récupération des Données** :
   - Écrivez une fonction `get_weather_data` qui récupère les informations météo pour une ville donnée.
   - La fonction envoie une requête à l’API et retourne les données si la requête est réussie.

4. **Boucle d’Envoi en Continu** :
   - Utilisez une boucle pour envoyer les données de chaque ville au topic Kafka toutes les minutes.
   - Affichez un message de confirmation dans la console pour chaque envoi réussi.



In [26]:
from kafka import KafkaProducer
import requests
import json
import time


ImportError: cannot import name 'KafkaProducer' from 'kafka' (unknown location)

In [13]:
API_KEY = "6365575828f48fe0263c2d56aa2e323c"  

cities = ["Paris", "London", "Tokyo"] 


In [14]:
def get_weather_data(city):
    url = f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric"
    response = requests.get(url)
    if response.status_code == 200: #200 veut dire que la requete a reussit
        return response.json()  # Retourne les données sous forme de dictionnaire
    else:
        print(f"Erreur lors de la récupération des données pour {city}")
        return None


In [15]:
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # Connexion à Kafka
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Sérialisation des données en JSON
)


NameError: name 'KafkaProducer' is not defined

In [1]:
def send_weather_data_to_kafka():
    while True:
        for city in cities:
            # Récupérer les données météo
            weather_data = get_weather_data(city)
            if weather_data:
                # Envoi des données dans le topic Kafka
                producer.send('tp-meteo', value=weather_data)
                print(f"Données météo envoyées pour {city}: {weather_data['main']['temp']}°C")
        
        # Attendre 1 minute avant de renvoyer les données
        time.sleep(60)

# Lancer l'envoi des données
send_weather_data_to_kafka()


NameError: name 'cities' is not defined