In [None]:
import pandas as pd
# Importation des bibliothèques nécessaires
from pyspark.sql import SparkSession
import os
import sys

from dotenv import load_dotenv
from mistralai import Mistral


In [None]:
# Variables
# Paramètres de connexion MySQL
db_url = "jdbc:mysql://mysql-container:3306/wildlens?serverTimezone=UTC"
db_properties = { "user": "root", "password": "root", "driver": "com.mysql.cj.jdbc.Driver" }

In [None]:
# Initialisation de SparkSession
spark = SparkSession.builder \
    .appName("WildLens ETL - MSPR 24-25") \
    .config("spark.jars", "/installation/mysql-connector-j-9.1.0.jar") \
    .getOrCreate()

# Gestion des métadonnées des espèces

 Dans un premier temps, nous scannons les dossiers disponibles afin d'en faire un dataframe et réutiliser ces informations.
 Puis nous récupérons les métadonnées depuis l'API Mistral grâce à un prompt optimisé (optimisation du grounding, du prompt engineering)
 un sleep de 3s a été ajouté afin d'éviter de trop spam l'API

In [None]:
# Chargement des informations sur les espèces
species_info_path = './data/csv/infos_especes.csv'
species_info_df = spark.read.csv(species_info_path, sep=";",header=True, inferSchema=True)

folder_all_animals = [d for d in os.listdir("data/OpenAnimalTracks/raw_imgs") if os.path.isdir(os.path.join("data/OpenAnimalTracks/raw_imgs", d))]
df_all_animals = pd.DataFrame(folder_all_animals, columns=["Nom du dossier"])
display(df_all_animals)

In [None]:
import time

# Charger les variables d'environnement
load_dotenv()

# Récupérer la clé API
api_key = os.environ.get("API_KEY")
if not api_key:
    raise ValueError("La clé API n'est pas définie dans les variables d'environnement.")

# Modèle utilisé
model = "open-mistral-nemo"

# Initialiser le client Mistral
client = Mistral(api_key=api_key)

# Définition du fichier CSV
fichier_csv = "infos_animaux.csv"

# Supprimer le fichier s'il existe déjà
if os.path.exists(fichier_csv):
    os.remove(fichier_csv)

# Liste pour stocker les informations des animaux
donnees_animaux = []

# Récupérer les informations pour chaque animal avec un délai entre les requêtes
for animal in df_all_animals["Nom du dossier"]:
    print(f"🔍 Recherche des informations pour {animal}...")

    # Génération du prompt
    prompt = f"""
    En français, donne-moi les informations suivantes sur {animal} :
    - le nom de l'espèce,
    - la famille,
    - le nom latin,
    - la population estimée (uniquement un nombre, sans texte, sans unité, sans ponctuation sauf le point ou la virgule pour les milliers),
    - la localisation (uniquement le ou les pays, séparés par un espace).
    - la description, une courte phrase décrivant l'animal.

    Attention :
    - Ne mets pas d'explication ou de phrase, uniquement les valeurs demandées.
    - Pour la population, écris uniquement un nombre sans texte. Par exemple : 1000000 au lieu de '1 million d'espèces environ'.
    - Pour la localisation, écris uniquement le ou les pays séparés par un espace.
    - Pour la Description, je souhaite 30 mots grand maximum.
    - Le nom de l'espèce sera systématiquement traduit en français

    Présente les informations sous ce format exact :
    Espèce : <nom de l'espèce>
    Famille : <famille>
    Nom latin : <nom latin>
    Description: <description>
    Population estimée : <population estimée>
    Localisation : <localisation>
    """

    # Appel à l'API Mistral
    try:
        chat_response = client.chat.complete(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )

        # Extraire la réponse
        reponse = chat_response.choices[0].message.content

        # Parser la réponse pour extraire les valeurs
        informations = {}
        for ligne in reponse.split("\n"):
            if ":" in ligne:
                cle, valeur = ligne.split(":", 1)
                informations[cle.strip()] = valeur.strip()

        # Ajouter les informations à la liste
        donnees_animaux.append(informations)

    except Exception as e:
        print(f"⚠️ Erreur lors de la récupération des infos pour {animal} : {e}")

    # Pause pour éviter d'être banni
    print("⏳ Attente de 3 secondes avant la prochaine requête...")
    time.sleep(3)

# Création du DataFrame
df_animaux = pd.DataFrame(donnees_animaux)

display(df_animaux)

# Sauvegarde dans un CSV
df_animaux.to_csv(fichier_csv, index=False)

print(f"✅ Les informations des animaux ont été enregistrées dans {fichier_csv}.")


# Gestion des images
Dans un premier temps, nous allons faire une première analyse des images: leurs nombre par espèces (donc, par dossier), leurs tailles moyenne, leurs poid moyen, etc...

In [None]:
# Sauvegarde au format CSV
cleaned_df.write.csv("./output/data/nettoye.csv", header=True, mode="overwrite")

# Sauvegarde au format Parquet
cleaned_df.write.parquet("./output/data/nettoye.parquet", mode="overwrite")


In [None]:
# Écrire les données dans la table MySQL
cleaned_df.write \
    .jdbc(url=db_url, table="Animaux", mode="overwrite", properties=db_properties)
