# On lance l'API

In [1]:
import requests

# Configuration
API_KEY = '68a331bbd87eefddf393ce4d32db7509'  # Clé API
CITIES = ['Paris', 'London', 'Tokyo']  # Villes cibles


# Fonction pour récupérer les données météo
def get_weather_data(city):
    weather = []
    for i in city:
        url = f"http://api.openweathermap.org/data/2.5/weather?q={i}&appid={API_KEY}&units=metric"
        response = requests.get(url)
        weather.append(response.json() if response.status_code == 200 else None)
    return weather

get_weather_data(CITIES)


[{'coord': {'lon': 2.3488, 'lat': 48.8534},
  'weather': [{'id': 804,
    'main': 'Clouds',
    'description': 'overcast clouds',
    'icon': '04d'}],
  'base': 'stations',
  'main': {'temp': 9.47,
   'feels_like': 8.15,
   'temp_min': 8.88,
   'temp_max': 10.1,
   'pressure': 1028,
   'humidity': 79,
   'sea_level': 1028,
   'grnd_level': 1018},
  'visibility': 10000,
  'wind': {'speed': 2.57, 'deg': 50},
  'clouds': {'all': 100},
  'dt': 1731662848,
  'sys': {'type': 2,
   'id': 2012208,
   'country': 'FR',
   'sunrise': 1731654011,
   'sunset': 1731687034},
  'timezone': 3600,
  'id': 2988507,
  'name': 'Paris',
  'cod': 200},
 {'coord': {'lon': -0.1257, 'lat': 51.5085},
  'weather': [{'id': 804,
    'main': 'Clouds',
    'description': 'overcast clouds',
    'icon': '04d'}],
  'base': 'stations',
  'main': {'temp': 8.37,
   'feels_like': 6.84,
   'temp_min': 7.49,
   'temp_max': 9.24,
   'pressure': 1029,
   'humidity': 79,
   'sea_level': 1029,
   'grnd_level': 1024},
  'visibilit

# On lance le producer

Il faut lancer le consumer et le producer dans deux fichiers différents

In [2]:
# Exemple pour collecter des données dans un topic Kafka
from kafka import KafkaProducer
import json

# Configuration
KAFKA_SERVER = 'localhost:9092'

# Initialisation du producteur Kafka
producer = KafkaProducer(
    bootstrap_servers=[KAFKA_SERVER],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

data = get_weather_data(CITIES)

# Envoi des données en continu

producer.send("tp-meteo",  value=data)

<kafka.producer.future.FutureRecordMetadata at 0x77d0377bfc70>

# On lance le consumer

In [9]:
from kafka import KafkaConsumer
import psycopg2
import json
from datetime import datetime

# Configuration de Kafka
KAFKA_TOPIC = 'tp-meteo'
KAFKA_SERVER = 'localhost:9092'
PASSWORD_DB = "rTZCy1iXwU2O"

# Configuration de la base de données Neon PostgreSQL
db_connection = psycopg2.connect("postgresql://meteodb_owner:rTZCy1iXwU2O@ep-still-star-a2pop1yt.eu-central-1.aws.neon.tech/meteodb?sslmode=require")
cursor = db_connection.cursor()

# Initialisation du consumer Kafka
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("En attente de messages...")

# Consommer les messages et insérer dans la base de données
for message in consumer:
    
    data_list = message.value  # Assume data_list is a list of weather records

    for data in data_list:
        # Extraire les informations nécessaires pour chaque ville
        ville = data.get('name')
        latitude = data['coord']['lat']
        longitude = data['coord']['lon']
        temperature = data['main']['temp']
        humidite = data['main']['humidity']
        vent = data['wind']['speed']
        
        # Récupérer la date et l'heure actuelles
        date = datetime.now()

        # Insérer les données dans la table meteo
        try:
            cursor.execute(
                """
                INSERT INTO meteo (ville, latitude, longitude, temperature, humidite, vent, date)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                """,
                (ville, latitude, longitude, temperature, humidite, vent, date)
            )
            db_connection.commit()
            print(f"Données insérées pour {ville}: Temp={temperature}°C, Humidité={humidite}%, Vent={vent} m/s, Date={date}")
        except Exception as e:
            print(f"Erreur lors de l'insertion pour {ville}: {e}")
            db_connection.rollback()

# Fermer la connexion à la base de données à la fin du traitement
cursor.close()
db_connection.close()


En attente de messages...
Données insérées pour Paris: Temp=9.47°C, Humidité=79%, Vent=2.57 m/s, Date=2024-11-15 09:43:25.012177
Données insérées pour London: Temp=8.37°C, Humidité=79%, Vent=2.57 m/s, Date=2024-11-15 09:43:25.070533
Données insérées pour Tokyo: Temp=17.23°C, Humidité=83%, Vent=4.12 m/s, Date=2024-11-15 09:43:25.122939
Données insérées pour Paris: Temp=9.54°C, Humidité=78%, Vent=2.57 m/s, Date=2024-11-15 09:43:25.174895
Données insérées pour London: Temp=8.42°C, Humidité=79%, Vent=2.57 m/s, Date=2024-11-15 09:43:25.226410
Données insérées pour Tokyo: Temp=17.23°C, Humidité=83%, Vent=4.12 m/s, Date=2024-11-15 09:43:25.278021
Données insérées pour Paris: Temp=9.54°C, Humidité=78%, Vent=2.57 m/s, Date=2024-11-15 09:44:09.876745
Données insérées pour London: Temp=8.42°C, Humidité=79%, Vent=2.57 m/s, Date=2024-11-15 09:44:09.928695
Données insérées pour Tokyo: Temp=17.23°C, Humidité=83%, Vent=4.12 m/s, Date=2024-11-15 09:44:09.980559
Données insérées pour Paris: Temp=9.54°C,

KeyboardInterrupt: 