# Anonymization Pipeline
 - Suppression (masking)
 - Perturbation (généralisation / bruit)
 - Tokenisation (pseudonymisation stable)

    Classe qui encapsule plusieurs stratégies d'anonymisation sémantique
    pour des entités PII détectées dans du texte.
    
    Techniques couvertes :
    - Suppression : remplacement par un token générique
    - Perturbation : âge en tranche, montant arrondi
    - Tokenisation : pseudonymes stables par entité (PERSONNE_1, ORG_2, ...)

In [None]:
import snowflake.snowpark as snp
from snowflake.snowpark.context import get_active_session
import pandas as pd
from collections import defaultdict
import re

# Récupère la session Snowpark active dans le notebook Snowflake
session = get_active_session()

class SemanticAnonymizer:
    
    def __init__(self):
        # mapping : (entity_type, original_value) -> pseudonym
        # permet de garantir que la même valeur reçoit toujours le même pseudonyme
        self.mapping = {}
        
        # counters : compteur par type d'entité (PER, ORG, LOC, ...)
        # sert à générer PERSONNE_1, PERSONNE_2, ORG_1, etc.
        self.counters = defaultdict(int)
    
    # 1. Suppression (masking)
    def suppress(self, value, entity_type):
        """
        Suppression totale : remplace la valeur par un token générique
        ex : "Jean Dupont" (PER) -> [PER_SUPPRIMÉ]
        """
        return f"[{entity_type.upper()}_SUPPRIMÉ]"
        
    # 2. Perturbation d'âge (généralisation / bucketing)
    def perturb_age(self, age_str):
        """
        Perturbation d'âge : remplace un âge précis par une tranche d'âges.
        Technique = généralisation / bucketing pour réduire la précision
        """
        match = re.search(r'(\d+)', age_str)
        if match:
            age = int(match.group(1))
            start = (age // 10) * 10
            end = start + 10
            return f"[{start}-{end} ans]"
        # Si aucun nombre trouvé, on renvoie la chaîne originale
        return age_str
    
    # 3. Perturbation de montant (arrondi / bruit contrôlé)
    def perturb_amount(self, amount_str):
        """
        Perturbation de montant : arrondit le montant à ~1000€ près.
        Technique = perturbation / généralisation pour garder l'ordre de grandeur.
        """
        match = re.search(r'(\d+)', amount_str)
        if match:
            amount = int(match.group(1))
            rounded = round(amount / 1000) * 1000
            # Utilisation de séparateur de milliers pour lisibilité
            return f"~{rounded:,}€"
        return amount_str
    
    # 4. Tokenisation / pseudonymisation stable
    def tokenize(self, value, entity_type):
        """
     Tokenisation / pseudonymisation : remplace la valeur par un pseudonyme lisible
        - garantit que la même valeur reçoit toujours le même pseudonyme,
          indépendamment du nombre d'occurrences.
        """
        
        # Clé = couple (type, valeur) pour distinguer mêmes chaînes de types différents
        key = (entity_type, value)
        
        # Si on n'a jamais vu cette valeur pour ce type, on crée un nouveau pseudo
        if key not in self.mapping:
            # Incrémente le compteur pour ce type d'entité
            self.counters[entity_type] += 1
            count = self.counters[entity_type]
            
            # Construit un pseudonyme lisible selon le type
            if entity_type == 'PER':
                pseudo = f'PERSONNE_{count}'
            elif entity_type == 'ORG':
                pseudo = f'ORG_{count}'
            elif entity_type == 'LOC':
                pseudo = f'LOC_{count}'
            else:
                #  générique pour les autres types (SIRET, EMAIL, IBAN, etc.)
                pseudo = f'{entity_type}_{count}'
            
            # Enregistre le mapping pour pouvoir réutiliser le même pseudo
            self.mapping[key] = pseudo
        
        # Retourne toujours le même pseudonyme pour ce (type, valeur)
        return self.mapping[key]




    
    # 5. Export du mapping sous forme de table
    def get_mapping_table(self):
        return [
            {'pseudonym': pseudo, 'original_value': orig, 'entity_type': etype}
            for (etype, orig), pseudo in self.mapping.items()
        ]


anon_global = SemanticAnonymizer()
print("Classe SemanticAnonymizer créée et prête à l'emploi")


Retourne le mapping sous forme de liste de dictionnaires,
exploitable directement comme DataFrame / table Snowflake :
        
        [
          {'pseudonym': 'PERSONNE_1', 'original_value': 'Jean Brunel', 'entity_type': 'PER'},
          {'pseudonym': 'ORG_1',      'original_value': 'ACME SAS',   'entity_type': 'ORG'},
          ...
        ]
        
Cela permet de stocker ce mapping dans PROCESSED_DATA.PII_MAPPING pour audit et éventuelle désanonymisation contrôlée.


    Applique une stratégie d'anonymisation pour chaque entité PII détectée
    dans le texte, en s'appuyant sur l'instance SemanticAnonymizer.
    
    Paramètres
    text : str
        Texte original contenant des PII.
    pii_list : list[dict]
        Liste d'entités détectées, chaque élément doit au minimum contenir :
        - 'type' : type d'entité (PER, ORG, LOC, AGE, MONTANT, SIRET, etc.)
        - 'text' : valeur exacte trouvée dans le texte.
    anon : SemanticAnonymizer
        Instance de la classe gérant les stratégies d'anonymisation.

    Retourne
    anon_text : str
        Texte anonymisé.
    replacements : list[dict]
        Liste des remplacements effectués (original, type, replacement).
    mapping_table : list[dict]
        Mapping global pseudonymes ↔ valeurs originales.


In [None]:
#  appliquer les stratégies d'anonymisation
# sur une liste d'entités PII détectées dans un texte.

def anonymize_with_strategy(text, pii_list, anon: SemanticAnonymizer):
    # Texte à anonymiser progressivement
    anon_text = text
    # Journal des remplacements
    replacements = []

    # 1. Trier les entités par position décroissante dans le texte
    #    pour éviter que des remplacements courts cassent des entités plus longues.
    sorted_pii = sorted(
        pii_list,
        key=lambda x: text.find(x["text"]),
        reverse=True
    )

    # 2. Première passe : IBAN en priorité 
    for pii in sorted_pii:
        entity_type = pii["type"]
        original = pii["text"]

        if entity_type != "IBAN":
            continue

        # Stratégie dédiée : suppression ou masquage total de l'IBAN
        replacement = anon.suppress(original, entity_type)

        if original in anon_text:
            anon_text = anon_text.replace(original, replacement)

        replacements.append({
            "original": original,
            "type": entity_type,
            "replacement": replacement
        })

    # 3. Deuxième passe : toutes les autres entités (dont TEL)
    for pii in sorted_pii:
        entity_type = pii["type"]
        original = pii["text"]

        # IBAN déjà traité à l'étape 2
        if entity_type == "IBAN":
            continue

        # Stratégie par type d'entité
        if entity_type in ["PER", "ORG", "LOC"]:
            # Pseudonymisation des acteurs (personnes, organisations, lieux)
            replacement = anon.tokenize(original, entity_type)

        elif entity_type == "AGE":
            # Généralisation de l'âge en tranche (40-50 ans, etc.)
            replacement = anon.perturb_age(original)

        elif entity_type == "MONTANT":
            # Arrondi / perturbation des montants
            replacement = anon.perturb_amount(original)

        elif entity_type in [
            "SIRET", "SIREN", "ADRESSE", "CODE_POSTAL",
            "DATE", "DATE_NAISSANCE", "EMAIL", "TEL", "NIR"
        ]:
            # Identifiants / coordonnées très sensibles : suppression ou masquage total
            replacement = anon.suppress(original, entity_type)

        else:
            # Par défaut : pseudonymisation générique
            replacement = anon.tokenize(original, entity_type)

        # Remplacer dans le texte anonymisé
        if original in anon_text:
            anon_text = anon_text.replace(original, replacement)

        # Journaliser ce remplacement (utile pour audit / debug)
        replacements.append({
            "original": original,
            "type": entity_type,
            "replacement": replacement
        })

    # 4. Retourner le texte anonymisé, la liste des remplacements, et la table de mapping globale
    return anon_text, replacements, anon.get_mapping_table()


print("Fonction d'anonymisation créée")


In [None]:
# Boucle principale : appliquer l'anonymisation à tous les docs
# et charger le résultat dans PROCESSED_DATA.FISCAL_DOCUMENTS_ANON

from datetime import datetime
import pandas as pd

print("Lancement de l'anonymisation...")

# 1. Charger les documents bruts + PII détectées (CamemBERT+regex)
docs_with_pii = session.sql("""
SELECT 
    r.doc_id,
    r.document_text,
    p.pii_detected
FROM raw_data.fiscal_documents_raw r
LEFT JOIN processed_data.pii_detection_temp p 
    ON r.doc_id = p.doc_id
ORDER BY r.doc_id
""").collect()

all_results = []

# 2. Boucle sur chaque document : construire la liste PII,
#    appliquer la stratégie d'anonymisation.

for i, doc in enumerate(docs_with_pii):
    doc_id = doc['DOC_ID']
    original_text = doc['DOCUMENT_TEXT']
    pii_variant = doc['PII_DETECTED']  # colonne VARIANT -> objet Python (liste/dict) dans Snowpark
    
    # 2.1 Normaliser la liste des PII
    #     On veut une liste de dictionnaire avec au minimum la clé 'text'.
    if pii_variant is not None:
        # Garantir qu'on a bien une liste
        raw_list = list(pii_variant) if not isinstance(pii_variant, list) else pii_variant
        # Ne garder que les entrées exploitables par anonymize_with_strategy
        pii_list = [
            x for x in raw_list
            if isinstance(x, dict) and 'text' in x
        ]
    else:
        pii_list = []

    # 2.2 Appliquer la fonction d'anonymisation
    anon_text, replacements, mappings = anonymize_with_strategy(
        original_text,
        pii_list,
        anon_global
    )

    # 2.3 Construire l'enregistrement de sortie pour ce document

    all_results.append({
        'DOC_ID': doc_id,
        'ORIGINAL_TEXT': original_text,
        'ANONYMIZED_TEXT': anon_text,
        'PII_DETECTED': pii_list,          
        'PII_COUNT': len(pii_list),
        'STRATEGY_USED': replacements,     
        'PROCESSED_AT': datetime.utcnow()  
    })

    # 2.4 Progression
    if (i + 1) % 10 == 0:
        print(f"  ✓ {i+1}/{len(docs_with_pii)} documents anonymisés")

print(f"\n Anonymisation complétée")

# 3. Charger les résultats dans PROCESSED_DATA.FISCAL_DOCUMENTS_ANON

df_anon = pd.DataFrame(all_results)
sdf_anon = session.create_dataframe(df_anon)

# mode("append") : on ajoute les résultats à la table existante
sdf_anon.write.mode("append").save_as_table("processed_data.fiscal_documents_anon")

print(f" {len(all_results)} documents chargés dans processed_data.fiscal_documents_anon")


In [None]:
doc_id_test = 1000  # adapte si besoin

row = (
    session.sql(f"""
        SELECT 
            r.DOC_ID,
            r.DOCUMENT_TEXT,
            p.PII_DETECTED
        FROM RAW_DATA.FISCAL_DOCUMENTS_RAW r
        LEFT JOIN PROCESSED_DATA.PII_DETECTION_TEMP p
            ON r.DOC_ID = p.DOC_ID
        WHERE r.DOC_ID = {doc_id_test}
    """)
    .collect()[0]
)

original_text = row["DOCUMENT_TEXT"]
pii_variant = row["PII_DETECTED"]

print("DOC_ID:", row["DOC_ID"])
print("\n--- TEXTE ORIGINAL (début) ---\n")
print(original_text[:500])
print("\n--- PII_DETECTED ---\n")
print(pii_variant)


In [None]:
SELECT DOC_ID, PII_COUNT, PII_DETECTED
FROM PROCESSED_DATA.PII_DETECTION_TEMP
ORDER BY DOC_ID
LIMIT 10;

In [None]:
import pandas as pd

df_join = (
    session.sql("""
        SELECT 
            r.DOC_ID,
            r.DOCUMENT_TEXT,
            a.ANONYMIZED_TEXT,
            a.PII_COUNT
        FROM RAW_DATA.FISCAL_DOCUMENTS_RAW r
        JOIN PROCESSED_DATA.FISCAL_DOCUMENTS_ANON a
            ON r.DOC_ID = a.DOC_ID
        ORDER BY r.DOC_ID
        LIMIT 3
    """)
    .to_pandas()
)

for idx in range(len(df_join)):
    doc_id = df_join.loc[idx, "DOC_ID"]
    original_text = df_join.loc[idx, "DOCUMENT_TEXT"]
    anon_text = df_join.loc[idx, "ANONYMIZED_TEXT"]
    pii_count = df_join.loc[idx, "PII_COUNT"]

    print("\n" + "=" * 80)
    print(f"DOC_ID : {doc_id}  |  PII détectées : {pii_count}")
    print("=" * 80 + "\n")
    
    print(" AVANT (texte brut) :\n")
    print(original_text)
    
    print("\n" + "-" * 80 + "\n")
    
    print(" APRÈS (texte anonymisé) :\n")
    print(anon_text)
    
    print("\n" + "=" * 80 + "\n")


In [None]:
from datetime import datetime
import pandas as pd
import json

print("Lancement de l'anonymisation...")

docs_with_pii = session.sql("""
SELECT 
    r.doc_id,
    r.document_text,
    p.pii_detected
FROM raw_data.fiscal_documents_raw r
LEFT JOIN processed_data.pii_detection_temp p 
    ON r.doc_id = p.doc_id
ORDER BY r.doc_id
""").collect()

all_results = []

for i, doc in enumerate(docs_with_pii):
    doc_id = doc['DOC_ID']
    original_text = doc['DOCUMENT_TEXT']
    pii_variant = doc['PII_DETECTED']

    # Normalisation corrigée
    if pii_variant:
        if isinstance(pii_variant, str):
            try:
                raw_list = json.loads(pii_variant)
            except Exception:
                raw_list = []
        else:
            raw_list = list(pii_variant) if not isinstance(pii_variant, list) else pii_variant

        pii_list = [
            x for x in raw_list
            if isinstance(x, dict) and 'text' in x
        ]
    else:
        pii_list = []

    anon_text, replacements, mappings = anonymize_with_strategy(
        original_text,
        pii_list,
        anon_global
    )

    all_results.append({
        'DOC_ID': doc_id,
        'ORIGINAL_TEXT': original_text,
        'ANONYMIZED_TEXT': anon_text,
        'PII_DETECTED': pii_list,
        'PII_COUNT': len(pii_list),
        'STRATEGY_USED': replacements,
        'PROCESSED_AT': datetime.utcnow()
    })

    if (i + 1) % 10 == 0:
        print(f"  ✓ {i+1}/{len(docs_with_pii)} documents anonymisés")

print(f"\n✅ Anonymisation complétée")

df_anon = pd.DataFrame(all_results)
sdf_anon = session.create_dataframe(df_anon)
sdf_anon.write.mode("overwrite").save_as_table("PROCESSED_DATA.FISCAL_DOCUMENTS_ANON")

print(f"✅ {len(all_results)} documents chargés dans PROCESSED_DATA.FISCAL_DOCUMENTS_ANON")


In [None]:
doc_id_test = 1000

row = (
    session.sql(f"""
        SELECT 
            r.DOC_ID,
            r.DOCUMENT_TEXT,
            p.PII_DETECTED
        FROM RAW_DATA.FISCAL_DOCUMENTS_RAW r
        LEFT JOIN PROCESSED_DATA.PII_DETECTION_TEMP p
            ON r.DOC_ID = p.DOC_ID
        WHERE r.DOC_ID = {doc_id_test}
    """)
    .collect()[0]
)

original_text = row["DOCUMENT_TEXT"]
pii_variant = row["PII_DETECTED"]

print("DOC_ID:", row["DOC_ID"])
print("\n--- TEXTE ORIGINAL (début) ---\n")
print(original_text[:500])
print("\n--- PII_DETECTED (brut) ---\n")
print(pii_variant)

# Normalisation corrigée
if pii_variant:
    if isinstance(pii_variant, str):
        raw_list = json.loads(pii_variant)
    else:
        raw_list = list(pii_variant) if not isinstance(pii_variant, list) else pii_variant
    pii_list = [x for x in raw_list if isinstance(x, dict) and 'text' in x]
else:
    pii_list = []

print("\n--- PII_LIST ---")
for e in pii_list:
    print(e["type"], "=>", repr(e["text"]), "| trouvé dans texte ?", e["text"] in original_text)

# Appliquer anonymisation
anon_text, replacements, mappings = anonymize_with_strategy(
    original_text,
    pii_list,
    anon_global
)

print("\n--- TEXTE ANONYMISÉ (début) ---\n")
print(anon_text[:500])

print("\n--- REMPLACEMENTS ---")
for r in replacements:
    print(r)


TEST 

In [None]:
import snowflake.snowpark as snp
from snowflake.snowpark.context import get_active_session
import pandas as pd
from collections import defaultdict
import re

# Récupère la session Snowpark active dans le notebook Snowflake
session = get_active_session()

class SemanticAnonymizer:
    
    def __init__(self):
        # mapping : (entity_type, original_value) -> pseudonym
        # permet de garantir que la même valeur reçoit toujours le même pseudonyme
        self.mapping = {}
        self.counters = defaultdict(int)
        # counters : compteur par type d'entité (PER, ORG, LOC, ...)

        # 1. Suppression (masking)
    def suppress(self, value, entity_type):
        """Suppression totale : remplace par un token générique"""
        return f"[{entity_type.upper()}_SUPPRIMÉ]"

        # 2. Perturbation d'âge (généralisation / bucketing)
    def perturb_age(self, age_str):
        """Perturbation d'âge : remplace un âge précis par une tranche d'âges."""
        match = re.search(r'(\d+)', age_str)
        if match:
            age = int(match.group(1))
            start = (age // 10) * 10
            end = start + 10
            return f"[{start}-{end} ans]"
        return age_str
        # 3. Perturbation de montant (arrondi / bruit contrôlé)
    def perturb_amount(self, amount_str):
        """Arrondi à ~1000€ prèS """
        # Extraire TOUS les chiffres (enlever espaces, virgules, €)
        amount_clean = re.sub(r'[^\d]', '', amount_str)
        if amount_clean:
            amount = int(amount_clean)
            rounded = round(amount / 1000) * 1000
            return f"~{rounded:,}€".replace(',', ' ')
        return amount_str
        #4 Tokenisation / pseudonymisation : remplace la valeur par un pseudonyme lisible

    def tokenize(self, value, entity_type):
        """Pseudonymisation stable, garantit que la même valeur reçoit toujours le même pseudonyme,
"""
        key = (entity_type, value)
         # Clé = couple (type, valeur) pour distinguer mêmes chaînes de types différents
         # Si on n'a jamais vu cette valeur pour ce type, on crée un nouveau pseudo
        if key not in self.mapping:
            self.counters[entity_type] += 1
            count = self.counters[entity_type]
            
            # Construit un pseudonyme lisible selon le type
            if entity_type == 'PER':
                pseudo = f'PERSONNE_{count}'
            elif entity_type == 'ORG':
                pseudo = f'ORG_{count}'
            elif entity_type == 'LOC':
                pseudo = f'LOC_{count}'
            else:
                pseudo = f'{entity_type}_{count}'
         #  générique pour les autres types (SIRET, EMAIL, IBAN, etc.)
            self.mapping[key] = pseudo
        
        return self.mapping[key]
         # Retourne toujours le même pseudonyme pour ce (type, valeur)

    def get_mapping_table(self):
        """Export du mapping pour audit"""
        return [
            {'pseudonym': pseudo, 'original_value': orig, 'entity_type': etype}
            for (etype, orig), pseudo in self.mapping.items()
        ]

print(" Classe SemanticAnonymizer créée")


In [None]:
import json

def anonymize_with_strategy(text, pii_list, anon: SemanticAnonymizer):
    """
    Applique les stratégies d'anonymisation sur les PII détectées.
    
    - Filtre les entités chevauchantes (garde la plus longue)
    - Tri par longueur décroissante puis position
    - Remplacements de droite à gauche
    """
    anon_text = text
    replacements = []
    
    # 1. FILTRER les entités chevauchantes (garder la plus longue)
    filtered_pii = []
    sorted_by_length = sorted(pii_list, key=lambda x: len(x["text"]), reverse=True)
    
    for pii in sorted_by_length:
        # Vérifier si cette entité est contenue dans une entité déjà retenue
        is_overlapping = False
        for kept_pii in filtered_pii:
            if pii["text"] in kept_pii["text"]:
                is_overlapping = True
                break
        
        if not is_overlapping:
            filtered_pii.append(pii)
    
    # 2. TRIER par position décroissante (remplacer de droite à gauche)
    sorted_pii = sorted(
        filtered_pii,
        key=lambda x: text.find(x["text"]),
        reverse=True
    )
    
    # 3. TRAITER IBAN en priorité (avant les numéros de téléphone)
    for pii in sorted_pii:
        entity_type = pii["type"]
        original = pii["text"]
        
        if entity_type != "IBAN":
            continue
        
        replacement = anon.suppress(original, entity_type)
        
        if original in anon_text:
            anon_text = anon_text.replace(original, replacement, 1)
        
        replacements.append({
            "original": original,
            "type": entity_type,
            "replacement": replacement
        })
    
    # 4. TRAITER les autres entités
    for pii in sorted_pii:
        entity_type = pii["type"]
        original = pii["text"]
        
        if entity_type == "IBAN":
            continue
        
        # Stratégie par type
        if entity_type in ["PER", "ORG", "LOC"]:
            replacement = anon.tokenize(original, entity_type)
        elif entity_type == "AGE":
            replacement = anon.perturb_age(original)
        elif entity_type == "MONTANT":
            replacement = anon.perturb_amount(original)
        elif entity_type in ["SIRET", "SIREN", "ADRESSE", "CODE_POSTAL",
                             "DATE", "DATE_NAISSANCE", "EMAIL", "TEL", "NIR"]:
            replacement = anon.suppress(original, entity_type)
        else:
            replacement = anon.tokenize(original, entity_type)
        
        if original in anon_text:
            anon_text = anon_text.replace(original, replacement, 1)
        
        replacements.append({
            "original": original,
            "type": entity_type,
            "replacement": replacement
        })
    
    return anon_text, replacements, anon.get_mapping_table()

print("Fonction d'anonymisation créée")


In [None]:
from datetime import datetime
import pandas as pd
import json

# RÉINITIALISER l'anonymizer pour repartir des compteurs à 0
anon_global = SemanticAnonymizer()

print("Script de l'anonymisation")

# 1. Charger les documents + PII
docs_with_pii = session.sql("""
SELECT 
    r.doc_id,
    r.document_text,
    p.pii_detected
FROM raw_data.fiscal_documents_raw r
LEFT JOIN processed_data.pii_detection_temp p 
    ON r.doc_id = p.doc_id
ORDER BY r.doc_id
""").collect()

all_results = []

# 2. Boucle sur chaque document
for i, doc in enumerate(docs_with_pii):
    doc_id = doc['DOC_ID']
    original_text = doc['DOCUMENT_TEXT']
    pii_variant = doc['PII_DETECTED']
    
    # Normalisation robuste (gère str JSON ou liste Python)
    if pii_variant:
        if isinstance(pii_variant, str):
            try:
                raw_list = json.loads(pii_variant)
            except Exception:
                raw_list = []
        else:
            raw_list = list(pii_variant) if not isinstance(pii_variant, list) else pii_variant
        
        pii_list = [
            x for x in raw_list
            if isinstance(x, dict) and 'text' in x
        ]
    else:
        pii_list = []
    
    # Appliquer l'anonymisation
    anon_text, replacements, mappings = anonymize_with_strategy(
        original_text,
        pii_list,
        anon_global
    )
    
    all_results.append({
        'DOC_ID': doc_id,
        'ORIGINAL_TEXT': original_text,
        'ANONYMIZED_TEXT': anon_text,
        'PII_DETECTED': pii_list,
        'PII_COUNT': len(pii_list),
        'STRATEGY_USED': replacements,
        'PROCESSED_AT': datetime.utcnow()
    })
    
    if (i + 1) % 10 == 0:
        print(f"  ✓ {i+1}/{len(docs_with_pii)} documents anonymisés")

print(f"\n Anonymisation complétée")

# 3. Charger avec OVERWRITE pour éviter les doublons
df_anon = pd.DataFrame(all_results)
sdf_anon = session.create_dataframe(df_anon)
sdf_anon.write.mode("overwrite").save_as_table("PROCESSED_DATA.FISCAL_DOCUMENTS_ANON")

print(f" {len(all_results)} documents chargés (table écrasée, pas de doublons)")


In [None]:
doc_id_test = 1000

row = (
    session.sql(f"""
        SELECT 
            r.DOC_ID,
            r.DOCUMENT_TEXT,
            p.PII_DETECTED
        FROM RAW_DATA.FISCAL_DOCUMENTS_RAW r
        LEFT JOIN PROCESSED_DATA.PII_DETECTION_TEMP p
            ON r.DOC_ID = p.DOC_ID
        WHERE r.DOC_ID = {doc_id_test}
    """)
    .collect()[0]
)

original_text = row["DOCUMENT_TEXT"]
pii_variant = row["PII_DETECTED"]

print("DOC_ID:", row["DOC_ID"])
print("\n--- TEXTE ORIGINAL (début) ---\n")
print(original_text[:500])
print("\n--- PII_DETECTED ---\n")
print(pii_variant)


In [None]:
import pandas as pd

df_join = (
    session.sql("""
        SELECT 
            r.DOC_ID,
            r.DOCUMENT_TEXT,
            a.ANONYMIZED_TEXT,
            a.PII_COUNT
        FROM RAW_DATA.FISCAL_DOCUMENTS_RAW r
        JOIN PROCESSED_DATA.FISCAL_DOCUMENTS_ANON a
            ON r.DOC_ID = a.DOC_ID
        ORDER BY r.DOC_ID
        LIMIT 3
    """)
    .to_pandas()
)

for idx in range(len(df_join)):
    doc_id = df_join.loc[idx, "DOC_ID"]
    original_text = df_join.loc[idx, "DOCUMENT_TEXT"]
    anon_text = df_join.loc[idx, "ANONYMIZED_TEXT"]
    pii_count = df_join.loc[idx, "PII_COUNT"]
    
    print("\n" + "=" * 80)
    print(f"DOC_ID : {doc_id}  |  PII détectées : {pii_count}")
    print("=" * 80 + "\n")
    
    print(" AVANT (texte brut) :\n")
    print(original_text[:400])
    
    print("\n" + "-" * 80 + "\n")
    
    print(" APRÈS (texte anonymisé) :\n")
    print(anon_text[:400])
    
    print("\n")


In [None]:
import json

doc_id_test = 1000

row = (
    session.sql(f"""
        SELECT 
            r.DOC_ID,
            r.DOCUMENT_TEXT,
            p.PII_DETECTED
        FROM RAW_DATA.FISCAL_DOCUMENTS_RAW r
        LEFT JOIN PROCESSED_DATA.PII_DETECTION_TEMP p
            ON r.DOC_ID = p.DOC_ID
        WHERE r.DOC_ID = {doc_id_test}
    """)
    .collect()[0]
)

original_text = row["DOCUMENT_TEXT"]
pii_variant = row["PII_DETECTED"]

print("DOC_ID:", row["DOC_ID"])
print("\n--- TEXTE ORIGINAL (début) ---\n")
print(original_text[:500])

# Normalisation
if pii_variant:
    if isinstance(pii_variant, str):
        raw_list = json.loads(pii_variant)
    else:
        raw_list = list(pii_variant) if not isinstance(pii_variant, list) else pii_variant
    pii_list = [x for x in raw_list if isinstance(x, dict) and 'text' in x]
else:
    pii_list = []

print(f"\n--- {len(pii_list)} PII détectées ---")
for e in pii_list:
    print(f"  {e['type']:15} => {repr(e['text']):30} | dans texte: {e['text'] in original_text}")

# Créer un anonymizer temporaire pour ce test
anon_test = SemanticAnonymizer()
anon_text, replacements, mappings = anonymize_with_strategy(
    original_text,
    pii_list,
    anon_test
)

print("\n--- TEXTE ANONYMISÉ (début) ---\n")
print(anon_text[:500])

print(f"\n--- {len(replacements)} REMPLACEMENTS ---")
for r in replacements:
    print(f"  {r['type']:15} | {r['original']:25} → {r['replacement']}")
