# Utility Scripts

Ce notebook contient l'int√©gralit√© du code des scripts utilitaires pour le projet dataset insectes.

## tree.py

In [None]:
from pathlib import Path

def tree(
    path: str | Path,
    indent: int = 0,
    tree_file: str | Path = "tree.txt",
    _file=None,
) -> int:
    """
    Parcourt r√©cursivement l'arborescence du dossier pass√© en argument.
    Utile pour v√©rifier le bon transfert des fichiers, en utilisant un programme
    permettant la comparaison de l'arborescence export√©e avec celle ex√©cut√©e
    localement avant l'upload du dataset.
    
    Note : Ce programme n'a pas pour but de v√©rifier l'int√©grit√© des images,
    ce qui se r√©v√©lera √™tre un vrai probl√®me par la suite. En effet, avec les
    interruptions d'Internet, certaines images ont √©t√© corrompues durant le
    transfert, et il m'a fallu longtemps avant de m'en rendre compte.
    
    Args:
        path: Chemin du dossier √† analyser
        indent: Indentation pour l'affichage (interne)
        tree_file: Fichier o√π √©crire l'arborescence
        _file: Handle de fichier (interne)
    
    Returns:
        Nombre total de fichiers dans l'arborescence
    """
    
    path = Path(path)
    file_count = 0
    subfolder_counts = []

    close_file = False
    if _file is None:
        _file = open(tree_file, "w", encoding="utf-8")
        close_file = True

    try:
        entries = list(path.iterdir())
    except FileNotFoundError:
        raise FileNotFoundError(f"Chemin introuvable : {path}")
    except PermissionError:
        return 0

    files = [e for e in entries if e.is_file()]
    dirs = [e for e in entries if e.is_dir()]

    file_count = len(files)

    for d in sorted(dirs, key=lambda p: p.name.lower()):
        sub_count = tree(
            d,
            indent=indent + 4,
            tree_file=tree_file,
            _file=_file,
        )
        subfolder_counts.append((d, sub_count))

    line = " " * indent + f"{path.name}/ [{file_count} fichiers]\n"
    if _file is not None:
        _file.write(line)

    for subfolder, sub_count in subfolder_counts:
        sub_line = " " * (indent + 2) + f"{subfolder.name}/ [{sub_count} fichiers]\n"
        if _file is not None:
            _file.write(sub_line)

    total_files = file_count + sum(count for _, count in subfolder_counts)

    if close_file and _file is not None:
        _file.close()

    return total_files

## taxonomy.py

In [None]:
import os
import re
from collections import defaultdict

def parse_taxonomy(filepath):
    """
    Extrait les informations taxonomiques du chemin d'une image.
    
    Cette fonction extrait le nom du dossier parent de l'image, car le nom de
    l'image en elle-m√™me ne contient pas d'informations taxonomiques.
    
    Exemple de chemin :
    train/train/00980_Animalia_Arthropoda_Insecta_Lepidoptera_Erebidae_Arctia_virginalis/464f3a34-4c04-4eb3-afa2-6cb7444c3fa3.jpg
    
    Taxon folder: 00980_Animalia_Arthropoda_Insecta_Lepidoptera_Erebidae_Arctia_virginalis
    
    Les informations sont s√©par√©es par '_'. Les √©l√©ments 0 (ID), 1, 2, 3 (constantes)
    ne nous int√©ressent pas. Nous cr√©ons un dictionnaire avec :
    - ordre : √©l√©ment 4 (ex: Lepidoptera)
    - famille : √©l√©ment 5 (ex: Erebidae)
    - genre : √©l√©ment 6 (ex: Arctia)
    - espece : √©l√©ment 7 (ex: virginalis)
    
    Args:
        filepath: Chemin complet de l'image
        
    Returns:
        Dictionnaire avec les cl√©s 'ordre', 'famille', 'genre', 'espece' ou None si √©chec
    """
    match = re.search(r'([^/]+)/[^/]+\.jpg$', filepath)
    if not match:
        print(f"Regex ne correspond pas : {filepath}")
        return None
    folder = match.group(1)
    taxonomy_path = folder.split('_')
    if len(taxonomy_path) >= 7:
        return {
            'ordre': taxonomy_path[4],  
            'famille': taxonomy_path[5],
            'genre': taxonomy_path[6],
            'espece': taxonomy_path[7]
        }
    return None

def parse_taxonomy_folders(folder_path):
    """
    Parse la taxonomie depuis les noms des dossiers dans un r√©pertoire.
    
    Parcourt r√©cursivement le dossier donn√© et applique parse_taxonomy √† chaque
    sous-dossier (en simulant un fichier image.jpg virtuel pour extraire la taxonomie).
    Regroupe les esp√®ces rencontr√©es et liste les dossiers non parsables.
    
    Args:
        folder_path: Chemin du dossier racine √† analyser
        
    Returns:
        Tuple (species_encountered, unparsed):
        - species_encountered: dict[str, list] o√π cl√© = nom d'esp√®ce, valeur = liste de (dossier, hi√©rarchie)
        - unparsed: list des chemins de dossiers non parsables
    """
    unparsed = []
    species_encountered = defaultdict(list)
    for root, dirs_, files in os.walk(folder_path):
        for d in dirs_:
            hier = parse_taxonomy(d + "/image.jpg")
            if hier:
                species_name = hier['espece']
                species_encountered[species_name].append((d, hier))
            else:
                unparsed.append(os.path.join(root, d))
    return species_encountered, unparsed

## mapping.py

In [None]:
import os
import json
from collections import defaultdict

def build_taxa_maps(species_encountered, annotated_images, train_mini_folder):
    """
    Construit les mappings taxonomiques et g√©ographiques complets.
    
    √Ä partir des esp√®ces rencontr√©es et des annotations g√©ographiques,
    cr√©e un mapping de taxons (cl√© = tuple (ordre, famille, genre, esp√®ce))
    vers la hi√©rarchie, et une base de donn√©es g√©ographique associant
    chaque taxon √† ses coordonn√©es (lat, lon) issues des images annot√©es.
    
    Args:
        species_encountered: dict[str, list] des esp√®ces (de parse_taxonomy_folders)
        annotated_images: dict[str, tuple] chemin relatif -> (lat, lon)
        train_mini_folder: chemin du dossier train_mini
        
    Returns:
        Tuple (full_taxa_map, full_geo_db):
        - full_taxa_map: dict[tuple, dict] taxon -> hi√©rarchie
        - full_geo_db: dict[tuple, list] taxon -> liste de [lat, lon]
    """
    full_taxa_map = {}
    full_geo_db = defaultdict(list)
    
    for species_name, occurrences in species_encountered.items():
        for d, hier in occurrences:
            taxon_key = (hier['ordre'], hier['famille'], hier['genre'], species_name)
            if taxon_key not in full_taxa_map:
                full_taxa_map[taxon_key] = hier
            
            taxon_folder = os.path.join(train_mini_folder, d)
            if os.path.exists(taxon_folder):
                _, _, files = next(os.walk(taxon_folder))
                seen_rel_paths = set()
                for f in files:
                    rel_path = f"train_mini/{d}/{f}"
                    if rel_path in annotated_images and rel_path not in seen_rel_paths:
                        lat, lon = annotated_images[rel_path]
                        if lat != 0.0 and lon != 0.0:
                            full_geo_db[taxon_key].append([lat, lon])
                            seen_rel_paths.add(rel_path)
    
    return full_taxa_map, full_geo_db

def save_hierarchy_map(full_taxa_map, full_geo_db, stats, output_file):
    """
    Sauvegarde la hi√©rarchie taxonomique et les donn√©es g√©ographiques dans un fichier JSON.
    
    S√©rialise les mappings et statistiques pour une utilisation ult√©rieure,
    par exemple dans les datasets hi√©rarchiques.
    
    Args:
        full_taxa_map: dict[tuple, dict] des taxons
        full_geo_db: dict[tuple, list] des coordonn√©es
        stats: dict des statistiques
        output_file: chemin du fichier JSON de sortie
    """
    full_geo_serializable = {str(k): [[float(c[0]), float(c[1])] for c in v] 
                            for k, v in full_geo_db.items()}
    
    with open(output_file, 'w') as f:
        json.dump({
            'full_taxa_map': {str(k): v for k, v in full_taxa_map.items()},
            'full_geo_db': full_geo_serializable,
            'stats': stats
        }, f, indent=2)
    print(f"üíæ Sauvegard√©: {len(full_taxa_map)} taxons dans {output_file}")

def build_hierarchy_labels(data_dir, hierarchy_map_file):
    """
    Construit le mapping des indices ImageFolder vers les labels hi√©rarchiques.
    
    √Ä partir des classes ImageFolder et de la hi√©rarchie sauvegard√©e,
    cr√©e un dictionnaire associant chaque index de classe √† une liste
    [ordre_id, famille_id, genre_id, espece_id] pour l'entra√Ænement hi√©rarchique.
    
    Args:
        data_dir: r√©pertoire des donn√©es (contient train_mini)
        hierarchy_map_file: fichier JSON de la hi√©rarchie
        
    Returns:
        dict[int, list]: mapping index -> labels hi√©rarchiques
    """
    
    # 1. Scan dossiers ‚Üí classes ImageFolder
    train_path = os.path.join(data_dir, 'train_mini/train_mini')
    class_names = sorted([d for d in os.listdir(train_path) 
                         if os.path.isdir(os.path.join(train_path, d))])
    class_to_idx = {name: i for i, name in enumerate(class_names)}
    print(f"Classes: {len(class_to_idx)} (scan {train_path})")
    
    # 2. Hi√©rarchie depuis JSON
    with open(hierarchy_map_file) as f:
        data = json.load(f)
    
    full_taxa_map_str = data['full_taxa_map']
    unique_ordres = set()
    unique_familles = set()
    unique_genres = set()
    
    for taxon_str, hier in full_taxa_map_str.items():
        parts = taxon_str.strip("('").strip("')").split("', '")
        if len(parts) == 4:
            unique_ordres.add(parts[0])
            unique_familles.add(parts[1])
            unique_genres.add(parts[2])
    
    ordre_to_id = {name: i for i, name in enumerate(sorted(unique_ordres))}
    famille_to_id = {name: i for i, name in enumerate(sorted(unique_familles))}
    genre_to_id = {name: i for i, name in enumerate(sorted(unique_genres))}
    
    print(f"Hi√©rarchie: {len(ordre_to_id)} ordres, {len(famille_to_id)} fam., {len(genre_to_id)} genres")
    
    # 3. Mapping
    final_hierarchy = {}
    mapped = 0
    
    for class_name, class_idx in class_to_idx.items():
        parts = class_name.split('_')
        if len(parts) >= 4:
            ordre, famille, genre, espece = parts[-4:]
            
            taxon_key_str = f"('{ordre}', '{famille}', '{genre}', '{espece}')"
            
            if taxon_key_str in full_taxa_map_str:
                hier = full_taxa_map_str[taxon_key_str]
                final_hierarchy[class_idx] = [
                    ordre_to_id[hier['ordre']],
                    famille_to_id[hier['famille']],
                    genre_to_id[hier['genre']],
                    class_idx
                ]
                mapped += 1
            else:
                if mapped == 0:
                    print(f"DEBUG: cl√© g√©n√©r√©e '{taxon_key_str}' non trouv√©e.")
                    print(f"Exemple cl√© JSON: {list(full_taxa_map_str.keys())[0]}")
    
    print(f"‚úÖ {mapped}/{len(class_to_idx)} mapp√©es")
    
    # Sauvegarde
    with open('hierarchy_labels.json', 'w') as f:
        json.dump({
            'class_to_idx': class_to_idx,
            'final_hierarchy': final_hierarchy,  # {0: [2, 45, 123, 0], 1: [3, 46, 124, 1], ...}
            'id_to_name': {
                'ordre': {i: name for name, i in ordre_to_id.items()},
                'famille': {i: name for name, i in famille_to_id.items()},
                'genre': {i: name for name, i in genre_to_id.items()}
            },
            'stats': {
                'ordres': len(ordre_to_id),
                'familles': len(famille_to_id),
                'genres': len(genre_to_id),
                'total_classes': len(class_to_idx),
                'mapped': mapped
            }
        }, f, indent=2)
    
    print("üíæ hierarchy_labels.json pr√™t pour training")
    return final_hierarchy

# === USAGE ===
data_dir = '/kaggle/input/inaturalist-insects/'
hierarchy_map_file = '/kaggle/working/hierarchy_map.json'
final_hierarchy = build_hierarchy_labels(data_dir, hierarchy_map_file)

print("\nExemples:")
for idx in range(25):
    labels = final_hierarchy.get(idx)
    print(f"Class {idx}: {labels}") # [ordre_id, famille_id, genre_id, espece_id]

## corruption_scan.py

In [None]:
import os
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import Image

def verify_image_validity(full_path):
    """
    V√©rifie l'int√©grit√© d'une image.
    
    Cette fonction a √©t√© ajout√©e apr√®s un premier entra√Ænement complet, car nous
    exp√©rimentions des erreurs difficiles √† trouver : la corruption d'images,
    qui ayant servi √† entra√Æner le mod√®le, introduisait des erreurs dans les
    pr√©dictions (par exemple, ordre et famille corrects, mais genre et esp√®ce
    n'existant pas sous cette famille).
    
    Nous ne pouvons pas √©liminer ces images du dataset sur Kaggle car le dossier
    d'input est en lecture seule. Pour contourner le probl√®me, nous √©crivons
    dans un fichier la liste des fichiers corrompus √† l'avance, puis lors de
    l'entra√Ænement final, nous ajoutons une v√©rification pour ne pas traiter
    les images dont le chemin appara√Æt dans cette liste.
    
    Args:
        full_path: Chemin complet de l'image √† v√©rifier
        
    Returns:
        Tuple (bool, str): True si valide, False sinon, avec message d'erreur
    """
    try:
        if not os.path.exists(full_path):
            return False, "FILE_MISSING"
        
        with Image.open(full_path) as img:
            img.verify()
        
        img = Image.open(full_path).convert('RGB')
        img.thumbnail((64, 64))
        img = img.resize((224, 224))
        
        return True, None
    except Exception as e:
        return False, str(type(e).__name__) + ": " + str(e)[:50]
    
def scan_corrupted_images(root_folder, max_workers=4):
    """
    Scanne un dossier pour d√©tecter les images corrompues.
    
    Parcourt r√©cursivement le dossier, v√©rifie chaque image JPG/JPEG
    avec verify_image_validity, et sauvegarde la liste des fichiers
    corrompus dans un fichier texte. Utilise un ThreadPool pour
    parall√©liser les v√©rifications.
    
    Args:
        root_folder: dossier racine √† scanner
        max_workers: nombre de threads pour la parall√©lisation
        
    Returns:
        Tuple (corrupted, log_file):
        - corrupted: list des chemins relatifs des images corrompues
        - log_file: chemin du fichier log des erreurs
    """
    start_time = time.time()
    
    image_paths = []
    for root, _, files in os.walk(root_folder):
        for f in files:
            if f.lower().endswith(('.jpg', '.jpeg')):  # JPG seulement
                full_path = os.path.join(root, f)
                rel_path = os.path.relpath(full_path, root_folder)
                image_paths.append(rel_path)
    
    total_files = len(image_paths)
    print(f"Scan {total_files} JPG dans {root_folder}")
    
    if total_files == 0:
        return [], f"/kaggle/working/corrupted_{os.path.basename(root_folder)}.txt"
    
    corrupted = []
    error_types = defaultdict(int)
    
    # ThreadPool (fichiers I/O)
    full_paths = [os.path.join(root_folder, p) for p in image_paths]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(verify_image_validity, fp): fp for fp in full_paths}
        
        for future in as_completed(futures):
            fp = futures[future]
            is_valid, error = future.result()
            
            if not is_valid:
                rel_path = os.path.relpath(fp, root_folder)
                corrupted.append(rel_path)
                error_types[error] += 1
                if len(corrupted) < 10:  # 10 premiers
                    print(f"‚ùå {rel_path[:50]}: {error}")
    
    # Stats erreurs
    print("\nTYPES D'ERREURS:")
    for err, count in sorted(error_types.items(), key=lambda x: x[1], reverse=True):
        print(f"  {err}: {count}")
    
    # Sauvegarde
    output_file = f"/kaggle/working/corrupted_{os.path.basename(root_folder)}.txt"
    rate = len(corrupted) / total_files * 100
    
    with open(output_file, 'w') as f:
        f.write(f"# Corrompus: {len(corrupted)}/{total_files} ({rate:.1f}%)\n")
        f.write("# Erreurs:\n")
        for err, count in error_types.items():
            f.write(f"# {err}: {count}\n")
        f.write("\n")
        for path in corrupted:
            f.write(path + '\n')
    
    elapsed = time.time() - start_time
    print(f"‚úÖ {len(corrupted)}/{total_files} ({rate:.1f}%) en {elapsed:.1f}s")
    
    return corrupted, output_file