In [4]:
import pandas as pd
import numpy as np
from pathlib import Path
import sys

def charger_siren_valides(fichier_stock_ul):
    """
    Charge le fichier de référence contenant tous les SIREN valides
    """
    print("Chargement du fichier de référence des SIREN...")

    # Colonnes à garder
    colonnes = [
        'siren', 'nicSiegeUniteLegale', 'denominationUniteLegale',
        'denominationUsuelle1UniteLegale', 'denominationUsuelle2UniteLegale',
        'denominationUsuelle3UniteLegale', 'dateCreationUniteLegale',
        'activitePrincipaleUniteLegale', 'categorieEntreprise',
        'trancheEffectifsUniteLegale'
    ]

    # Lire le fichier CSV
    stock_ul = pd.read_csv(fichier_stock_ul, usecols=colonnes, dtype={'siren': str, 'nicSiegeUniteLegale': str})

    # Formater les SIREN avec des zéros à gauche si nécessaire
    stock_ul['siren'] = stock_ul['siren'].str.zfill(9)

    # Créer un set des SIREN valides pour une recherche rapide
    siren_valides = set(stock_ul['siren'].unique())

    print(f"Nombre de SIREN valides chargés : {len(siren_valides)}")

    return siren_valides, stock_ul

def charger_fichier_a_verifier(fichier_path):
    """
    Charge le fichier contenant les SIREN à vérifier
    """
    print(f"Chargement du fichier à vérifier : {fichier_path}")

    # Déterminer le type de fichier
    fichier_path = Path(fichier_path)
    
    try:
        if fichier_path.suffix.lower() in ['.xlsx', '.xls']:
            # Lire le fichier Excel
            df = pd.read_excel(fichier_path, dtype={
                'siren_declarant': str,
                'siren_deposant': str,
                'mere_final': str
            })
        elif fichier_path.suffix.lower() == '.csv':
            # Lire le fichier CSV
            df = pd.read_csv(fichier_path, dtype={
                'siren_declarant': str,
                'siren_deposant': str,
                'mere_final': str
            }, sep=',', on_bad_lines='warn')
        else:
            raise ValueError(f"Format de fichier non supporté : {fichier_path.suffix}")
            
    except pd.errors.ParserError as e:
        print(f"Erreur de parsing du fichier: {e}")
        print("Veuillez vérifier le format du fichier, notamment les délimiteurs et la cohérence du nombre de colonnes par ligne.")
        sys.exit(1)
    except Exception as e:
        print(f"Erreur lors du chargement du fichier: {e}")
        sys.exit(1)

    # Formater les SIREN avec des zéros à gauche si nécessaire
    for col in ['siren_declarant', 'siren_deposant', 'mere_final']:
        if col in df.columns:
            df[col] = df[col].astype(str).str.replace('\.0', '', regex=True)
            df[col] = df[col].str.zfill(9)

    print(f"Nombre de lignes dans le fichier : {len(df)}")
    print(f"Colonnes disponibles : {list(df.columns)}")

    return df

def verifier_siren(df, siren_valides):
    """
    Vérifie la validité des SIREN dans le dataframe
    """
    rapport = {}

    colonnes_siren = ['siren_declarant', 'siren_deposant', 'mere_final']

    for col in colonnes_siren:
        if col in df.columns:
            print(f"\nVérification des {col}...")

            # Obtenir les SIREN uniques dans cette colonne
            siren_uniques = df[col].dropna().unique()

            # Vérifier chaque SIREN
            siren_invalides = []
            for siren in siren_uniques:
                if str(siren) not in siren_valides and str(siren) not in ['nan', '000000000']:
                    siren_invalides.append(siren)

            # Créer un rapport pour cette colonne
            rapport[col] = {
                'total_siren_uniques': len(siren_uniques),
                'siren_invalides': siren_invalides,
                'nombre_invalides': len(siren_invalides),
                'pourcentage_invalides': (len(siren_invalides) / len(siren_uniques) * 100) if len(siren_uniques) > 0 else 0
            }

            # Marquer les lignes avec des SIREN invalides
            df[f'{col}_valide'] = df[col].apply(
                lambda x: False if pd.notna(x) and str(x) in [str(s) for s in siren_invalides] else True
            )

            print(f"  - SIREN uniques : {len(siren_uniques)}")
            print(f"  - SIREN invalides : {len(siren_invalides)}")
            print(f"  - Pourcentage invalides : {rapport[col]['pourcentage_invalides']:.2f}%")

        else:
            print(f"\nAttention : La colonne '{col}' n'existe pas dans le fichier")

    return rapport, df

def generer_rapport(rapport, df, fichier_sortie='rapport_validation_siren.txt'):
    """
    Génère un rapport de validation
    """
    with open(fichier_sortie, 'w', encoding='utf-8') as f:
        f.write("RAPPORT DE VALIDATION DES SIREN\n")
        f.write("=" * 50 + "\n\n")

        for col, stats in rapport.items():
            f.write(f"\n{col.upper()}\n")
            f.write("-" * 30 + "\n")
            f.write(f"SIREN uniques trouvés : {stats['total_siren_uniques']}\n")
            f.write(f"SIREN invalides : {stats['nombre_invalides']}\n")
            f.write(f"Pourcentage d'invalides : {stats['pourcentage_invalides']:.2f}%\n")

            if stats['siren_invalides']:
                f.write("\nListe des SIREN invalides :\n")
                for siren in sorted(stats['siren_invalides']):
                    f.write(f"  - {siren}\n")

        # Résumé global
        f.write("\n\nRÉSUMÉ GLOBAL\n")
        f.write("=" * 50 + "\n")
        total_invalides = sum(stats['nombre_invalides'] for stats in rapport.values())
        f.write(f"Total de SIREN invalides (toutes colonnes) : {total_invalides}\n")

        # Lignes avec au moins un SIREN invalide
        colonnes_validation = [f'{col}_valide' for col in rapport.keys() if f'{col}_valide' in df.columns]
        if colonnes_validation:
            lignes_avec_erreur = df[~df[colonnes_validation].all(axis=1)]
            f.write(f"Nombre de lignes avec au moins un SIREN invalide : {len(lignes_avec_erreur)}\n")

    print(f"\nRapport généré : {fichier_sortie}")

def exporter_lignes_invalides(df, rapport, fichier_sortie='lignes_siren_invalides.csv'):
    """
    Exporte les lignes contenant des SIREN invalides
    """
    colonnes_validation = [f'{col}_valide' for col in rapport.keys() if f'{col}_valide' in df.columns]
    
    if colonnes_validation:
        lignes_invalides = df[~df[colonnes_validation].all(axis=1)]

        if len(lignes_invalides) > 0:
            lignes_invalides.to_csv(fichier_sortie, index=False, encoding='utf-8')
            print(f"Lignes avec SIREN invalides exportées : {fichier_sortie}")
            return len(lignes_invalides)
        else:
            print("Aucune ligne avec SIREN invalide trouvée.")
            return 0
    else:
        print("Aucune colonne de validation trouvée.")
        return 0

def main():
    # Chemins des fichiers - à adapter selon votre configuration
    fichier_stock_ul = "C://Users//msamb//Documents//StockUniteLegale_utf8.csv"
    fichier_a_verifier = "C://Users//msamb//Documents//new_millesime_CIR_2021-suite.xlsx"

    try:
        # 1. Charger les SIREN valides
        siren_valides, stock_ul_df = charger_siren_valides(fichier_stock_ul)

        # 2. Charger le fichier à vérifier
        df = charger_fichier_a_verifier(fichier_a_verifier)

        # 3. Vérifier les SIREN
        rapport, df_verifie = verifier_siren(df, siren_valides)

        # 4. Générer le rapport
        generer_rapport(rapport, df_verifie)

        # 5. Exporter les lignes avec des SIREN invalides

        nb_lignes_invalides = exporter_lignes_invalides(df_verifie, rapport)

        # 6. Résumé final
        print(f"\n{'='*50}")
        print("RÉSUMÉ FINAL")
        print(f"{'='*50}")
        print(f"Total de lignes traitées : {len(df)}")
        print(f"Lignes avec SIREN invalides : {nb_lignes_invalides}")
        print(f"Pourcentage de lignes valides : {((len(df) - nb_lignes_invalides) / len(df) * 100):.2f}%")
        print("\n✓ Vérification terminée avec succès!")

    except FileNotFoundError as e:
        print(f"Erreur : Fichier non trouvé - {e}")
        print("Vérifiez que les chemins des fichiers sont corrects.")
        sys.exit(1)
    except Exception as e:
        print(f"Erreur inattendue : {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)

if __name__ == "__main__":
    main()

  df[col] = df[col].astype(str).str.replace('\.0', '', regex=True)


Chargement du fichier de référence des SIREN...


  stock_ul = pd.read_csv(fichier_stock_ul, usecols=colonnes, dtype={'siren': str, 'nicSiegeUniteLegale': str})


Nombre de SIREN valides chargés : 28236992
Chargement du fichier à vérifier : C://Users//msamb//Documents//new_millesime_CIR_2021-suite.xlsx
Nombre de lignes dans le fichier : 4554
Colonnes disponibles : ['ANNEE', 'DESIGN', 'COMPLT_DESIGN', 'FJ', 'siren_declarant', 'siren_deposant', 'siren_tete_groupe', 'ADR1', 'ADR2', 'ADR3', 'ADR4', 'ADR5', 'ADR6', 'APE', 'ACTIVITE', 'FIRST_DRD_N_1', 'ESE_NVEL', 'DATE_DBT_ACTI', 'PME', 'NATUR_ESE', 'CAHT', 'NBR_SAL', 'GRP', 'NBR_SCT_GRP', 'NON_IS', 'JEI', 'MT_CR_IMP_GRP', 'MT_CIR_GRP', 'NBR_CHERCH_TECH', 'NB_JD', 'DOT_AMORT_IMMO', 'DOT_AMORT_IMMO_SINISTR', 'DEP_CHERCH_TECH', 'REM_SAL_INV', 'DEP_JD', 'OTR_DEP_FONCT', 'MT_DEP_FONCT_TOT', 'FRAIS_BREV_COV', 'DEP_MAINT_BREV_COV', 'DOT_AMORT_BREV', 'DEP_NORMALI', 'PRIM_COTIZ', 'DEP_VEIL_TECHNO', 'MT_TOT_RD_1', 'DEP_EXT_OPR_NON_LIE_FR', 'DEP_EXT_OPR_TOT', 'DEP_EXT_LIE_FR', 'DEP_EXT_LIE_ETR', 'DEP_EXT_NON_LIE_FR', 'DEP_EXT_NON_LIE_ETR', 'MT_TOT_DEP_EXT_ORG_AGREE', 'PLAF_OP_EXT', 'MT_TOT_OP_SOUS_TRAIT', 'PLAF

In [2]:
def verifier_siren_luhn(siren):
    """
    Vérifie la validité d'un SIREN en utilisant l'algorithme de Luhn
    """
    # Convertir en chaîne et s'assurer qu'il a 9 chiffres
    siren_str = str(siren).strip()
    
    # Vérifier que c'est bien un nombre de 9 chiffres
    if len(siren_str) != 9 or not siren_str.isdigit():
        return False, "Le SIREN doit contenir exactement 9 chiffres"
    
    # Appliquer l'algorithme de Luhn
    somme = 0
    
    # Parcourir les chiffres de droite à gauche
    for i in range(9):
        chiffre = int(siren_str[8-i])
        position = i + 1
        
        # Si position paire (2, 4, 6, 8), multiplier par 2
        if position % 2 == 0:
            resultat = chiffre * 2
            # Si résultat > 9, soustraire 9
            if resultat > 9:
                resultat -= 9
        else:
            resultat = chiffre
        
        somme += resultat
        
        print(f"Position {position}: chiffre {chiffre} → {resultat}")
    
    print(f"\nSomme totale: {somme}")
    print(f"Modulo 10: {somme % 10}")
    
    # Le SIREN est valide si la somme est divisible par 10
    est_valide = (somme % 10) == 0
    
    return est_valide, f"Le SIREN {'est valide' if est_valide else 'est invalide'}"

# Test avec le SIREN fourni
siren_test = "590255741"
print(f"Vérification du SIREN: {siren_test}")
print("="*40)

# Affichage détaillé des positions
print("Positions (de droite à gauche):")
print("Position: 9 8 7 6 5 4 3 2 1")
print(f"Chiffres: {' '.join(siren_test)}")
print("\nCalcul détaillé:")

valide, message = verifier_siren_luhn(siren_test)
print(f"\n{message}")

# Vérification manuelle pour comparaison
print("\n" + "="*40)
print("Vérification manuelle détaillée:")
siren = "590255741"
positions = list(range(9, 0, -1))
calculs = []

for i, (pos, chiffre) in enumerate(zip(positions, siren)):
    c = int(chiffre)
    if (9-i) % 2 == 0:  # positions paires (2, 4, 6, 8)
        result = c * 2
        if result > 9:
            result -= 9
        calculs.append(f"Pos {pos}: {c} × 2 = {c*2} → {result}")
    else:
        result = c
        calculs.append(f"Pos {pos}: {c} → {result}")
    
for calc in calculs:
    print(calc)
    
# Test avec d'autres SIREN pour comparaison
print("\n" + "="*40)
print("Tests supplémentaires:")

autres_sirens = [
    "481132744",  # SIREN valide d'exemple
    "590255741",  # SIREN invalide
    "074544107",  # SIREN valide d'exemple
]

for siren in autres_sirens:
    valide, msg = verifier_siren_luhn(siren)
    print(f"\nSIREN {siren}: {msg}")

Vérification du SIREN: 590255741
Positions (de droite à gauche):
Position: 9 8 7 6 5 4 3 2 1
Chiffres: 5 9 0 2 5 5 7 4 1

Calcul détaillé:
Position 1: chiffre 1 → 1
Position 2: chiffre 4 → 8
Position 3: chiffre 7 → 7
Position 4: chiffre 5 → 1
Position 5: chiffre 5 → 5
Position 6: chiffre 2 → 4
Position 7: chiffre 0 → 0
Position 8: chiffre 9 → 9
Position 9: chiffre 5 → 5

Somme totale: 40
Modulo 10: 0

Le SIREN est valide

Vérification manuelle détaillée:
Pos 9: 5 → 5
Pos 8: 9 × 2 = 18 → 9
Pos 7: 0 → 0
Pos 6: 2 × 2 = 4 → 4
Pos 5: 5 → 5
Pos 4: 5 × 2 = 10 → 1
Pos 3: 7 → 7
Pos 2: 4 × 2 = 8 → 8
Pos 1: 1 → 1

Tests supplémentaires:
Position 1: chiffre 4 → 4
Position 2: chiffre 4 → 8
Position 3: chiffre 7 → 7
Position 4: chiffre 2 → 4
Position 5: chiffre 3 → 3
Position 6: chiffre 1 → 2
Position 7: chiffre 1 → 1
Position 8: chiffre 8 → 7
Position 9: chiffre 4 → 4

Somme totale: 40
Modulo 10: 0

SIREN 481132744: Le SIREN est valide
Position 1: chiffre 1 → 1
Position 2: chiffre 4 → 8
Position 3:

In [5]:
import pandas as pd

# Lire le fichier Excel
df = pd.read_excel('C://Users//msamb//Documents//new_millesime_CIR_2021-suite.xlsx')

# Définir les colonnes clés pour identifier les doublons
colonnes_clés = ['ANNEE', 'siren_declarant', 'CAHT', 'MT_TOT_CIR_CI_COLL_CII_DOM','mere_final']

# Supprimer les doublons en gardant la première occurrence
df_sans_doublons = df.drop_duplicates(subset=colonnes_clés, keep='first')

# Ou si vous voulez garder la dernière occurrence :
# df_sans_doublons = df.drop_duplicates(subset=colonnes_clés, keep='last')

# Sauvegarder le résultat dans un nouveau fichier
df_sans_doublons.to_excel('C://Users//msamb//Documents//new_millesime_CIR_2021-suite_sans_doublon.xlsx', index=False)

# Afficher quelques statistiques
print(f"Nombre de lignes original : {len(df)}")
print(f"Nombre de lignes après suppression des doublons : {len(df_sans_doublons)}")
print(f"Nombre de doublons supprimés : {len(df) - len(df_sans_doublons)}")

Nombre de lignes original : 4554
Nombre de lignes après suppression des doublons : 4501
Nombre de doublons supprimés : 53


In [6]:
import pandas as pd

def eliminer_doublons_specifiques(chemin_fichier_excel, chemin_sortie=None):
    """
    Élimine les doublons selon les critères spécifiés :
    - Si un siren_declarant apparaît 2 fois avec le même MT_TOT_CIR_CI_COLL_CII
    - Vérifie si deposant == declarant et type est 'Fille' ou 'IND'
    - Supprime ces lignes
    """
    
    # Lire le fichier Excel
    df = pd.read_excel(chemin_fichier_excel)
    
    # Afficher les informations initiales
    print(f"Nombre de lignes initial : {len(df)}")
    
    # Identifier les doublons potentiels
    # Grouper par siren_declarant et MT_TOT_CIR_CI_COLL_CII
    duplicates = df.groupby(['siren_declarant', 'MT_TOT_CIR_CI_COLL_CII']).size()
    duplicates = duplicates[duplicates >= 2]
    
    # Créer une liste des lignes à supprimer
    lignes_a_supprimer = []
    
    # Pour chaque groupe de doublons
    for (siren_dec, montant), count in duplicates.items():
        # Filtrer les lignes correspondantes
        mask = (df['siren_declarant'] == siren_dec) & (df['MT_TOT_CIR_CI_COLL_CII'] == montant)
        lignes_concernees = df[mask]
        
        # Pour chaque ligne du groupe
        for index, ligne in lignes_concernees.iterrows():
            # Vérifier si deposant == declarant et type est 'Fille' ou 'IND' (insensible à la casse)
            if (ligne['siren_deposant'] == ligne['siren_declarant'] and 
                str(ligne['type_final']).upper() in ['FILLE', 'IND']):
                lignes_a_supprimer.append(index)
    
    # Supprimer les lignes identifiées
    df_nettoye = df.drop(lignes_a_supprimer)
    
    # Afficher les résultats
    print(f"Nombre de lignes supprimées : {len(lignes_a_supprimer)}")
    print(f"Nombre de lignes final : {len(df_nettoye)}")
    
    # Sauvegarder le résultat
    if chemin_sortie is None:
        chemin_sortie = chemin_fichier_excel.replace('.xlsx', '_nettoye.xlsx')
    
    df_nettoye.to_excel(chemin_sortie, index=False)
    print(f"Fichier sauvegardé : {chemin_sortie}")
    
    return df_nettoye, lignes_a_supprimer

# Fonction utilitaire pour visualiser les doublons avant suppression
def analyser_doublons(chemin_fichier_excel):
    """
    Analyse et affiche les doublons potentiels avant suppression
    """
    df = pd.read_excel(chemin_fichier_excel)
    
    # Identifier les doublons
    duplicates = df.groupby(['siren_declarant', 'MT_TOT_CIR_CI_COLL_CII']).size()
    duplicates = duplicates[duplicates >= 2]
    
    print("Analyse des doublons potentiels :")
    print("-" * 50)
    
    for (siren_dec, montant), count in duplicates.items():
        mask = (df['siren_declarant'] == siren_dec) & (df['MT_TOT_CIR_CI_COLL_CII'] == montant)
        lignes = df[mask]
        
        print(f"\nSIREN Déclarant : {siren_dec}, Montant : {montant}")
        print(f"Nombre d'occurrences : {count}")
        
        for index, ligne in lignes.iterrows():
            print(f"  - Index : {index}")
            print(f"    Déposant : {ligne['siren_deposant']}")
            print(f"    Type : {ligne['type_final']}")
            print(f"    Déposant == Déclarant : {ligne['siren_deposant'] == ligne['siren_declarant']}")
            print(f"    À supprimer : {ligne['siren_deposant'] == ligne['siren_declarant'] and str(ligne['type_final']).upper() in ['FILLE', 'IND']}")

# Utilisation
if __name__ == "__main__":
    # Remplacer par le chemin de votre fichier
    chemin_fichier = "C://Users//msamb//Documents//new_millesime_CIR_2021-suite_sans_doublon.xlsx"
    
    # Analyser les doublons avant suppression
    analyser_doublons(chemin_fichier)
    
    # Éliminer les doublons
    df_nettoye, lignes_supprimees = eliminer_doublons_specifiques(chemin_fichier)
    
    # Optionnel : afficher quelques statistiques
    print("\nStatistiques après nettoyage :")
    print(f"Types restants : {df_nettoye['type_final'].value_counts()}")

Analyse des doublons potentiels :
--------------------------------------------------

SIREN Déclarant : 332690908, Montant : 546781.0
Nombre d'occurrences : 2
  - Index : 271
    Déposant : 332690908
    Type : IND
    Déposant == Déclarant : True
    À supprimer : True
  - Index : 1237
    Déposant : 500625835
    Type : FILLE
    Déposant == Déclarant : False
    À supprimer : False

SIREN Déclarant : 352684377, Montant : 170146.0
Nombre d'occurrences : 2
  - Index : 442
    Déposant : 352684377
    Type : FILLE
    Déposant == Déclarant : True
    À supprimer : True
  - Index : 4159
    Déposant : 888479748
    Type : FILLE
    Déposant == Déclarant : False
    À supprimer : False

SIREN Déclarant : 414227918, Montant : 0.0
Nombre d'occurrences : 7
  - Index : 753
    Déposant : 414227918
    Type : IND
    Déposant == Déclarant : True
    À supprimer : True
  - Index : 893
    Déposant : 438224982
    Type : FILLE
    Déposant == Déclarant : False
    À supprimer : False
  - Index 

In [7]:


def eliminer_doublons_montants_proches(chemin_fichier_excel, chemin_sortie=None, tolerance=2):
    """
    Élimine les doublons où le siren_declarant apparaît plusieurs fois avec des montants
    MT_TOT_CIR_CI_COLL_CII très proches (différence <= tolerance).
    Supprime les lignes où deposant == declarant et type est 'Fille' ou 'IND'.
    """
    df = pd.read_excel(chemin_fichier_excel)
    print(f"Nombre de lignes initial : {len(df)}")
    
    # Trier pour faciliter la comparaison
    df_sorted = df.sort_values(['siren_declarant', 'MT_TOT_CIR_CI_COLL_CII']).reset_index(drop=True)
    lignes_a_supprimer = set()
    
    # Grouper par siren_declarant
    for siren, group in df_sorted.groupby('siren_declarant'):
        montants = group['MT_TOT_CIR_CI_COLL_CII'].values
        indices = group.index.values
        for i in range(len(montants)):
            for j in range(i+1, len(montants)):
                if abs(montants[i] - montants[j]) <= tolerance:
                    # Vérifier les conditions pour les deux lignes
                    for idx in [indices[i], indices[j]]:
                        ligne = df.loc[idx]
                        if (ligne['siren_deposant'] == ligne['siren_declarant'] and 
                            str(ligne['type_final']).upper() in ['FILLE', 'IND']):
                            lignes_a_supprimer.add(idx)
    
    df_nettoye = df.drop(list(lignes_a_supprimer))
    print(f"Nombre de lignes supprimées (montants proches) : {len(lignes_a_supprimer)}")
    print(f"Nombre de lignes final : {len(df_nettoye)}")
    
    if chemin_sortie is None:
        chemin_sortie = chemin_fichier_excel.replace('.xlsx', '_nettoye_montants_proches.xlsx')
    df_nettoye.to_excel(chemin_sortie, index=False)
    print(f"Fichier sauvegardé : {chemin_sortie}")
    return df_nettoye, lignes_a_supprimer

# Exemple d'utilisation :
df_nettoye, lignes_supprimees = eliminer_doublons_montants_proches("C://Users//msamb//Documents//new_millesime_CIR_2021-suite_sans_doublon_nettoye.xlsx")

Nombre de lignes initial : 4496
Nombre de lignes supprimées (montants proches) : 8
Nombre de lignes final : 4488
Fichier sauvegardé : C://Users//msamb//Documents//new_millesime_CIR_2021-suite_sans_doublon_nettoye_nettoye_montants_proches.xlsx


In [8]:
import numpy as np
import pandas as pd

def corriger_siren_similaires(df, col, valeur_cible='057501256'):
    """
    Corrige les SIREN similaires pour uniformiser sur une valeur cible
    """
    # Si la colonne n'existe pas, sortir
    if col not in df.columns:
        return df
    
    # Normaliser la valeur cible
    valeur_cible_norm = format_siren(valeur_cible)
    base_digits = valeur_cible_norm.lstrip('0')  # Chiffres de base sans les zéros
    
    # Stocker les valeurs à corriger
    to_correct = []
    
    # Chercher des patterns similaires (avec des chiffres ajoutés/manquants)
    for val in df[col].unique():
        if pd.isna(val) or val == '':
            continue
            
        val_str = str(val).strip()
        
        # Cas 1: SIREN identique mais formatage différent (zéros devant)
        if format_siren(val_str) == valeur_cible_norm and val_str != valeur_cible_norm:
            to_correct.append(val_str)
        
        # Cas 2: SIREN avec des chiffres supplémentaires à la fin
        elif val_str.startswith(base_digits) and len(val_str) > len(base_digits):
            to_correct.append(val_str)
        
        # Cas 3: SIREN avec des zéros supplémentaires au début
        elif val_str.endswith(base_digits) and val_str.startswith('0'):
            to_correct.append(val_str)
        
        # Cas 4: Le cas spécifique 575012560 (chiffres similaires mais 0 en trop à la fin)
        elif val_str == '575012560' and valeur_cible_norm == '057501256':
            to_correct.append(val_str)
    
    # Appliquer les corrections
    for val in to_correct:
        df.loc[df[col] == val, col] = valeur_cible_norm
        print(f"    Corrigé: {val} -> {valeur_cible_norm}")
    


def format_siren(siren):
    """
    Formate un SIREN sur 9 caractères, prend les 9 premiers si plus long, ajoute des zéros devant si plus court
    Traite également les cas spécifiques comme '575012560'
    """
    if pd.isna(siren) or siren == '' or str(siren).lower() == 'nan':
        return ''
    
    # Convertir en string et nettoyer
    siren_str = str(siren).strip()
    # Garder seulement les chiffres
    siren_str = ''.join(filter(str.isdigit, siren_str))
    
    if not siren_str:
        return ''
    
    # Cas spécifique: si c'est '575012560', le corriger en '057501256'
    if siren_str == '575012560':
        return '057501256'
    
    # Si plus de 9 chiffres (cas SIRET), prendre les 9 premiers
    if len(siren_str) > 9:
        siren_str = siren_str[:9]
    
    # Toujours formater sur 9 caractères
    return siren_str.zfill(9)

def diagnostiquer_siren_problematiques(df):
    """
    Diagnostique les SIREN problématiques dans le dataframe
    """
    print("\n=== Diagnostic des SIREN problématiques ===")
    
    siren_cols = ['siren_declarant', 'siren_deposant', 'siren_tete_groupe', 'mere_final']
    
    for col in siren_cols:
        if col in df.columns:
            print(f"\n--- Colonne {col} ---")
            
            # Valeurs uniques
            unique_vals = df[col].unique()
            print(f"Nombre de valeurs uniques: {len(unique_vals)}")
            
            # Recherche de patterns similaires
            patterns = {}
            for val in unique_vals:
                if pd.notna(val) and val != '':
                    # Enlever les zéros du début pour trouver le pattern
                    pattern = str(val).lstrip('0')
                    if pattern not in patterns:
                        patterns[pattern] = []
                    patterns[pattern].append(str(val))
            
            # Afficher les patterns avec plusieurs variantes
            for pattern, values in patterns.items():
                if len(values) > 1:
                    print(f"  Pattern '{pattern}' a {len(values)} variantes: {values}")
            
            # Afficher quelques exemples
            print(f"  Exemples: {list(df[col].value_counts().head(5).index)}")


# Modifier la fonction principale pour ajouter le diagnostic
def corriger_doublons_cir(file_cir_path, file_millesime_path, output_path):
    """
    Corrige les doublons dans le fichier CIR en utilisant les informations du fichier millesime
    
    Parameters:
    - file_cir_path: chemin vers le fichier new_millesime_cir_2023.xlsx
    - file_millesime_path: chemin vers le fichier millesime_2023.xlsx
    - output_path: chemin pour sauvegarder le fichier corrigé
    """
    
    print("Chargement des fichiers Excel...")
    # Chargement des fichiers Excel
    df_cir = pd.read_excel(file_cir_path, engine='openpyxl')
    df_millesime = pd.read_excel(file_millesime_path, engine='openpyxl')
    
    print(f"Fichier CIR chargé: {len(df_cir)} lignes")
    print(f"Fichier millesime chargé: {len(df_millesime)} lignes")
    
    # Création d'une copie pour les modifications
    df_cir_corrected = df_cir.copy()
    
    # Conversion de toutes les colonnes siren en string
    siren_cols_cir = ['siren_declarant', 'siren_deposant', 'siren_tete_groupe', 'mere_final']
    for col in siren_cols_cir:
        if col in df_cir_corrected.columns:
            df_cir_corrected[col] = df_cir_corrected[col].astype(str)
    
    # Formatage initial des SIREN du fichier millesime
    print("Formatage initial des SIREN...")
    df_millesime['mere_siren'] = df_millesime['mere_siren'].astype(str).apply(format_siren)
    df_millesime['siren_societe'] = df_millesime['siren_societe'].astype(str).apply(format_siren)
    
    # Formatage initial des SIREN du fichier CIR
    for col in siren_cols_cir:
        if col in df_cir_corrected.columns:
            df_cir_corrected[col] = df_cir_corrected[col].apply(format_siren)
    
    # Identification des doublons sur siren_declarant
    print("\nRecherche des doublons sur siren_declarant...")
    duplicates = df_cir_corrected[df_cir_corrected.duplicated(subset=['siren_declarant'], keep=False)]
    unique_siren_doublons = duplicates['siren_declarant'].unique()
    
    print(f"Nombre de siren_declarant en doublon: {len(unique_siren_doublons)}")
    
    corrections_count = 0
    corrections_ignorees = 0
    
    # Pour chaque groupe de doublons
    for siren in unique_siren_doublons:
        # Récupération du groupe de doublons
        groupe_doublons = df_cir_corrected[df_cir_corrected['siren_declarant'] == siren]
        
        # Recherche dans millesime_2023 des sirens ayant ce siren comme mère
        filiales = df_millesime[df_millesime['mere_siren'] == siren]
        
        if len(filiales) == 0:
            continue
            
        # Pour chaque ligne du groupe de doublons
        for idx, row in groupe_doublons.iterrows():
            montant_cir = row['MT_TOT_CIR_CI_COLL_CII']
            
            # Si le montant est NaN, on le remplace par 0
            if pd.isna(montant_cir):
                montant_cir = 0
            
            # Recherche des filiales avec un montant_credit_impot proche (tolérance de 1)
            for _, filiale in filiales.iterrows():
                montant_filiale = filiale['montant_credit_impot']
                
                # Si le montant est NaN, on le remplace par 0
                if pd.isna(montant_filiale):
                    montant_filiale = 0
                
                # Vérification avec tolérance de 1
                if abs(montant_cir - montant_filiale) <= 1:
                    # Vérifier que siren_societe n'est pas NaN ou vide
                    if pd.notna(filiale['siren_societe']) and str(filiale['siren_societe']).lower() != 'nan':
                        # Remplacement des valeurs - Appliquer format_siren immédiatement
                        df_cir_corrected.loc[idx, 'siren_declarant'] = format_siren(filiale['siren_societe'])
                        df_cir_corrected.loc[idx, 'siren_deposant'] = format_siren(filiale['mere_siren'])
                        df_cir_corrected.loc[idx, 'mere_final'] = format_siren(filiale['mere_siren'])
                        corrections_count += 1
                        
                        print(f"Correction appliquée: {siren} -> siren_declarant: {format_siren(filiale['siren_societe'])}, siren_deposant: {format_siren(filiale['mere_siren'])}")
                        break
                    else:
                        print(f"Correction ignorée: {siren} -> NaN (valeur NaN détectée)")
                        corrections_ignorees += 1
    
    print(f"\nNombre total de corrections: {corrections_count}")
    print(f"Nombre de corrections ignorées: {corrections_ignorees}")
    
    # Vérification des doublons restants
    duplicates_after = df_cir_corrected[df_cir_corrected.duplicated(subset=['siren_declarant'], keep=False)]
    unique_siren_doublons_after = duplicates_after['siren_declarant'].unique()
    
    print(f"\nNombre de siren_declarant encore en doublon après correction: {len(unique_siren_doublons_after)}")
    
    # Formatage de tous les SIREN en 9 caractères
    print("\nFormatage des SIREN...")
    siren_columns = ['siren_declarant', 'siren_deposant', 'siren_tete_groupe', 'mere_final']
    
    for col in siren_columns:
        if col in df_cir_corrected.columns:
            # Appliquer la fonction format_siren
            df_cir_corrected[col] = df_cir_corrected[col].apply(format_siren)
            print(f"  Colonne {col} formatée")
    
    # Diagnostic des SIREN problématiques
    diagnostiquer_siren_problematiques(df_cir_corrected)
    
    # Vérification après formatage
    print("\nVérification après formatage des SIREN:")
    # Vérifier qu'il n'y a plus de '575012560'
    for col in ['mere_final', 'siren_deposant']:
        if col in df_cir_corrected.columns:
            problematic = df_cir_corrected[df_cir_corrected[col] == '575012560']
            if len(problematic) > 0:
                print(f"  ATTENTION: {len(problematic)} lignes avec '575012560' encore dans {col}")
            else:
                print(f"  ✓ Plus de '575012560' dans {col}")
    
    # Création/mise à jour de la colonne type_final
    print("\nCréation de la colonne type_final...")
    df_cir_corrected['type_final'] = df_cir_corrected.apply(
        lambda row: 'MERE' if row['mere_final'] == row['siren_declarant'] and row['mere_final'] != '' 
        else 'IND' if row['mere_final'] == '' 
        else 'FILLE', axis=1
    )
    
    # Affichage d'un résumé des types
    type_counts = df_cir_corrected['type_final'].value_counts()
    print("\nRépartition des types:")
    for type_val, count in type_counts.items():
        print(f"  {type_val}: {count}")
    
    # Vérification finale des SIREN
    print("\nVérification finale des SIREN...")
    for col in siren_columns:
        if col in df_cir_corrected.columns:
            non_empty = df_cir_corrected[df_cir_corrected[col] != '']
            if len(non_empty) > 0:
                invalid_siren = non_empty[non_empty[col].str.len() != 9]
                if len(invalid_siren) > 0:
                    print(f"  ATTENTION: {len(invalid_siren)} SIREN invalides dans {col}")
                    # Afficher quelques exemples
                    print(f"    Exemples: {list(invalid_siren[col].head(3).values)}")
                else:
                    print(f"  ✓ Tous les SIREN de {col} sont valides (9 caractères)")
    
    # Sauvegarde du fichier Excel corrigé
    print(f"\nSauvegarde du fichier corrigé: {output_path}")
    df_cir_corrected.to_excel(output_path, index=False, engine='openpyxl')
    
    print(f"\nTraitement terminé. Fichier sauvegardé: {output_path}")
    
    return df_cir_corrected


def analyser_doublons(df_cir, df_millesime):
    """
    Fonction pour analyser les doublons et les correspondances possibles
    """
    print("Analyse des doublons...")
    
    # Identification des doublons
    duplicates = df_cir[df_cir.duplicated(subset=['siren_declarant'], keep=False)]
    unique_siren_doublons = duplicates['siren_declarant'].unique()
    
    for siren in unique_siren_doublons[:10]:  # Analyse des 10 premiers pour exemple
        groupe_doublons = df_cir[df_cir['siren_declarant'] == str(siren)]
        filiales = df_millesime[df_millesime['mere_siren'] == str(siren)]
        
        print(f"\n--- SIREN {siren} ---")
        print(f"Nombre de doublons: {len(groupe_doublons)}")
        print(f"Nombre de filiales trouvées: {len(filiales)}")
        
        if len(filiales) > 0:
            print("\nMontants CIR du groupe:")
            for idx, row in groupe_doublons.iterrows():
                print(f"  MT_TOT_CIR_CI_COLL_CII: {row['MT_TOT_CIR_CI_COLL_CII']}")
            
            print("\nMontants des filiales:")
            for _, filiale in filiales.iterrows():
                siren_filiale = filiale['siren_societe']
                montant_filiale = filiale['montant_credit_impot']
                if pd.isna(siren_filiale) or str(siren_filiale).lower() == 'nan':
                    print(f"  Filiale NaN: {montant_filiale}")
                else:
                    print(f"  Filiale {siren_filiale}: {montant_filiale}")


def verifier_colonnes(file_cir_path, file_millesime_path):
    """
    Vérifie que les colonnes nécessaires existent dans les fichiers
    """
    print("Vérification des colonnes...")
    
    # Chargement des premières lignes pour vérifier les colonnes
    df_cir = pd.read_excel(file_cir_path, nrows=5, engine='openpyxl')
    df_millesime = pd.read_excel(file_millesime_path, nrows=5, engine='openpyxl')
    
    # Colonnes nécessaires CIR
    colonnes_cir_necessaires = ['siren_declarant', 'siren_deposant', 'mere_final', 'MT_TOT_CIR_CI_COLL_CII']
    colonnes_cir_manquantes = [col for col in colonnes_cir_necessaires if col not in df_cir.columns]
    
    # Colonnes nécessaires millesime
    colonnes_millesime_necessaires = ['mere_siren', 'siren_societe', 'montant_credit_impot']
    colonnes_millesime_manquantes = [col for col in colonnes_millesime_necessaires if col not in df_millesime.columns]
    
    if colonnes_cir_manquantes:
        print(f"ATTENTION: Colonnes manquantes dans le fichier CIR: {colonnes_cir_manquantes}")
    else:
        print("✓ Toutes les colonnes nécessaires sont présentes dans le fichier CIR")
        
    if colonnes_millesime_manquantes:
        print(f"ATTENTION: Colonnes manquantes dans le fichier millesime: {colonnes_millesime_manquantes}")
    else:
        print("✓ Toutes les colonnes nécessaires sont présentes dans le fichier millesime")
    
    print(f"\nColonnes fichier CIR: {list(df_cir.columns)[:10]}...")
    print(f"Colonnes fichier millesime: {list(df_millesime.columns)[:10]}...")
    
    # Vérifier quelques exemples de SIREN
    print("\nExemples de SIREN dans CIR:")
    for col in ['siren_declarant', 'siren_deposant', 'mere_final']:
        if col in df_cir.columns:
            exemples = df_cir[col].dropna().head(3)
            print(f"  {col}: {list(exemples)}")
    
    print("\nExemples de SIREN dans millesime:")
    for col in ['mere_siren', 'siren_societe']:
        if col in df_millesime.columns:
            exemples = df_millesime[col].dropna().head(3)
            print(f"  {col}: {list(exemples)}")


# Exemple d'utilisation
if __name__ == "__main__":
    # Remplacez ces chemins par vos fichiers réels
    file_cir = "C://Users//msamb//Documents//new_millesime_CIR_2021-suite_sans_doublon_nettoye.xlsx"
    file_millesime = "C://Users//msamb//Documents//2058CG_millesime_21.xlsx"
    output_file = "C://Users//msamb//Documents//new_millesime_CIR_2021-suite_corrected.xlsx"
    
    # Vérification des colonnes (recommandé)
    verifier_colonnes(file_cir, file_millesime)
    
    # Correction des doublons
    df_corrected = corriger_doublons_cir(file_cir, file_millesime, output_file)
    
    # Analyse détaillée des doublons (optionnel)
    # df_cir = pd.read_excel(file_cir, engine='openpyxl')
    # df_millesime = pd.read_excel(file_millesime, engine='openpyxl')
    # analyser_doublons(df_cir, df_millesime)

Vérification des colonnes...
✓ Toutes les colonnes nécessaires sont présentes dans le fichier CIR
✓ Toutes les colonnes nécessaires sont présentes dans le fichier millesime

Colonnes fichier CIR: ['ANNEE', 'DESIGN', 'COMPLT_DESIGN', 'FJ', 'siren_declarant', 'siren_deposant', 'siren_tete_groupe', 'ADR1', 'ADR2', 'ADR3']...
Colonnes fichier millesime: ['mere_siren', 'id_depot', 'filiale_index', 'dateDebPer', 'dateFinPer', 'dateEnregistrement', 'nombre_filiales', 'nombre_filiales_renseignees', 'siren_societe', 'denomination_societe']...

Exemples de SIREN dans CIR:
  siren_declarant: [6720049, 356000000, 340012392]
  siren_deposant: [6720049, 356000000, 356000000]
  mere_final: [404435521, 356000000, 356000000]

Exemples de SIREN dans millesime:
  mere_siren: [479135311, 851618884, 851618884]
  siren_societe: ['NS', '383474046', '519011878']
Chargement des fichiers Excel...
Fichier CIR chargé: 4496 lignes
Fichier millesime chargé: 380 lignes
Formatage initial des SIREN...

Recherche des d

Note il y'a 12 siren declarant vide (car dans la 2058cg il était vide) que j'ai du corriger à la main à l'aide de la designation

In [1]:
import pandas as pd

# Lire le fichier Excel
df = pd.read_excel('C://Users//msamb//Documents//new_millesime_CIR_2021-suite_corrected.xlsx')

# Définir les colonnes clés pour identifier les doublons
#colonnes_clés = ['ANNEE', 'siren_declarant', 'CAHT', 'MT_TOT_CIR_CI_COLL_CII_DOM','mere_final']
colonnes_clés = ['siren_declarant']

# Supprimer les doublons en gardant la première occurrence
df_sans_doublons = df.drop_duplicates(subset=colonnes_clés, keep='first')

# Ou si vous voulez garder la dernière occurrence :
# df_sans_doublons = df.drop_duplicates(subset=colonnes_clés, keep='last')

# Sauvegarder le résultat dans un nouveau fichier
df_sans_doublons.to_excel('C://Users//msamb//Documents//new_millesime_CIR_2021-suite_corrected_sans_doublon.xlsx', index=False)

# Afficher quelques statistiques
print(f"Nombre de lignes original : {len(df)}")
print(f"Nombre de lignes après suppression des doublons : {len(df_sans_doublons)}")
print(f"Nombre de doublons supprimés : {len(df) - len(df_sans_doublons)}")

Nombre de lignes original : 4496
Nombre de lignes après suppression des doublons : 4473
Nombre de doublons supprimés : 23
