<a href="https://colab.research.google.com/github/IgnasiOliveras/anonimitzar/blob/main/DEF_v1_BERT_BBDD_FAE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install langdetect deep_translator faker tqdm transformers torch

import sqlite3
import pandas as pd
import re
import time
from langdetect import detect, DetectorFactory, LangDetectException
from deep_translator import GoogleTranslator
from faker import Faker
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification

# Initial configuration: seed langdetect's DetectorFactory and create the
# Faker instance for the "es_ES" locale that supplies the replacement values.
DetectorFactory.seed = 0
fake_es = Faker("es_ES")

# Load the Spanish BERT NER pipeline once at module level. The code below
# reads the `entity_group`, `start` and `end` fields of each result it returns.
ner_model = pipeline("ner", model="mrm8488/bert-spanish-cased-finetuned-ner", aggregation_strategy="simple")


def generar_nombre_con_palabras(original_name):
    """Generate a fake name with as many words as ``original_name``.

    Args:
        original_name: The name to replace. Only its whitespace-separated
            word count is used.

    Returns:
        A fake name with the same number of words as ``original_name``.
        If ``original_name`` contains no words (empty or whitespace only)
        it is returned unchanged.
    """
    num_palabras = len(original_name.split())

    # Nothing to replace. Without this guard no candidate could ever have
    # zero words and the retry loop would never finish.
    if num_palabras == 0:
        return original_name

    # One first name plus one surname per remaining word, so any word count
    # can be reached (not just 1, 2 or 3).
    max_intentos = 20
    for _ in range(max_intentos):
        partes = [fake_es.first_name()]
        partes.extend(fake_es.last_name() for _ in range(num_palabras - 1))
        nombre = " ".join(partes)
        # A generated part may itself contain several words, hence the check.
        if len(nombre.split()) == num_palabras:
            return nombre

    # Bounded fallback: the last candidate is expected to have at least
    # num_palabras words (one or more per part), so trim it to size instead
    # of retrying forever.
    return " ".join(nombre.split()[:num_palabras])

def detectar_entidades(texto):
    """Detect person (PER) entities in ``texto`` using the BERT NER pipeline.

    Each detected span is widened to whole-word boundaries so that a partially
    tagged word is replaced completely, without swallowing neighbouring
    punctuation, newlines or tabs.

    Args:
        texto: The text to analyse.

    Returns:
        A list of ``(start, end, original)`` tuples sorted by ``start``, where
        ``texto[start:end]`` is the widened span and ``original`` is that
        span's stripped text. Spans never overlap, and spans without any text
        are omitted.
    """
    tramos = []
    for ent in ner_model(texto):
        if ent['entity_group'] != 'PER':
            continue

        start = ent['start']
        end = ent['end']

        # Widen to the enclosing word: stop at any non-alphanumeric character
        # (space, newline, comma, period, ...) rather than only at ' '.
        while start > 0 and texto[start - 1].isalnum():
            start -= 1
        while end < len(texto) and texto[end].isalnum():
            end += 1

        tramos.append((start, end))

    entidades = []
    for start, end in sorted(tramos):
        if entidades and start < entidades[-1][1]:
            # Two detections landed on the same word(s): merge them so the
            # caller never replaces overlapping ranges.
            inicio_previo, fin_previo, _ = entidades[-1]
            fin_unido = max(end, fin_previo)
            entidades[-1] = (inicio_previo, fin_unido, texto[inicio_previo:fin_unido].strip())
            continue

        original = texto[start:end].strip()
        if original:
            entidades.append((start, end, original))

    return entidades

def traducir_y_anonimizar(texto):
    """Translate ``texto`` to Spanish if needed and anonymize personal data.

    Steps, in order:
      1. Translate to Spanish when langdetect reports another language.
      2. Replace the name following "Me llamo" with a fake name.
      3. Replace 9-digit numbers with a fake phone number and
         8-digits-plus-letter tokens with a fake ``ssn()`` value.
      4. Replace every PER entity found by the NER model with a fake name
         of the same word count.

    Args:
        texto: The message body. Non-string values (e.g. ``None`` or the NaN
            pandas produces for empty cells) and blank strings are returned
            unchanged.

    Returns:
        The anonymized text, or ``texto`` itself when there is nothing to do.
    """
    # Empty spreadsheet cells arrive as NaN/None; calling .strip() on them
    # would raise AttributeError and abort the whole batch.
    if not isinstance(texto, str) or not texto.strip():
        return texto

    # Translation
    try:
        if len(texto) > 3 and detect(texto) != "es":
            traducido = GoogleTranslator(source="auto", target="es").translate(texto)
            # Keep the original text if the translator gives nothing usable.
            if isinstance(traducido, str) and traducido.strip():
                texto = traducido
    except LangDetectException:
        # Best effort: undetectable language, continue with the text as is.
        pass

    # Anonymize names introduced by "Me llamo ..."
    # NOTE(review): with IGNORECASE the name group matches every following
    # run of letters-only words, not just capitalized ones - confirm intended.
    texto = re.sub(
        r"(Me llamo\s+)([A-ZÁÉÍÓÚÑa-záéíóúñ]+(?:\s+[A-ZÁÉÍÓÚÑa-záéíóúñ]+)*)",
        lambda match: match.group(1) + generar_nombre_con_palabras(match.group(2)),
        texto,
        flags=re.IGNORECASE
    )

    # Anonymize numbers
    texto = re.sub(r"\b\d{9}\b", lambda _: fake_es.phone_number(), texto)
    texto = re.sub(r"\b\d{8}[A-Za-z]\b", lambda _: fake_es.ssn(), texto)

    # Entity detection with BERT
    entidades = detectar_entidades(texto)

    # Apply replacements from the end of the text backwards so earlier
    # offsets stay valid. `limite` is the start of the last replaced span;
    # anything reaching past it overlaps already-rewritten text and is skipped.
    limite = len(texto)
    for start, end, original in sorted(entidades, key=lambda e: e[0], reverse=True):
        if start >= end or end > limite or not original.strip():
            continue
        texto = texto[:start] + generar_nombre_con_palabras(original) + texto[end:]
        limite = start

    return texto




def procesar_fila(row):
    """Translate and anonymize the "body" field of one row.

    The row is modified in place and the same object is returned.
    """
    cuerpo_anonimizado = traducir_y_anonimizar(row["body"])
    row["body"] = cuerpo_anonimizado
    return row

# Connect to the database. The connection is closed explicitly in `finally`:
# sqlite3's connection context manager only commits or rolls back, it does
# not close the connection.
conn = sqlite3.connect("mi_base_de_datos.db")
try:
    cursor = conn.cursor()

    # Create the table if it does not exist.
    # NOTE(review): both to_sql(..., if_exists="replace") calls below drop and
    # recreate "mi_tabla", so this schema does not survive - confirm it is
    # still wanted.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS mi_tabla (
            id INTEGER PRIMARY KEY,
            body TEXT,
            secret TEXT,
            direction TEXT,
            createdAt TEXT,
            OpenchannelAccountId INTEGER,
            OpenchannelInteractionId INTEGER,
            UserId INTEGER,
            ContactId INTEGER,
            AttachmentId INTEGER,
            sentBy TEXT
        );
    """)
    conn.commit()

    # Load the data from Excel and store the raw rows.
    df = pd.read_excel("MOSTRA_1.xlsx")
    df.to_sql("mi_tabla", conn, if_exists="replace", index=False)

    # Keep only the relevant columns.
    df_body = df[["id", "body", "direction","createdAt","UserId", "ContactId"]].copy()

    # Process every row in a worker pool; tqdm shows progress.
    start_time = time.time()
    with Pool(cpu_count()) as pool:
        result = list(tqdm(pool.imap(procesar_fila, df_body.to_dict(orient="records")), total=len(df_body)))

    # Turn the list of dicts back into a DataFrame.
    df_body = pd.DataFrame(result)

    # Write the processed rows back.
    # NOTE(review): this replaces "mi_tabla" with only the six columns selected
    # above; the other columns loaded from Excel are no longer in the table.
    df_body.to_sql("mi_tabla", conn, if_exists="replace", index=False)

    elapsed_time = time.time() - start_time
    print(f"Procesamiento completado en {elapsed_time:.2f} segundos.")

    # Commit on success; on an exception the connection is closed without
    # committing, which discards any uncommitted changes.
    conn.commit()
finally:
    conn.close()






Some weights of the model checkpoint at mrm8488/bert-spanish-cased-finetuned-ner were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cpu
  0%|          | 0/6943 [00:00<?, ?it/s]Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length.

Procesamiento completado en 878.13 segundos.


In [None]:
!pip install langdetect deep_translator faker tqdm transformers torch

import sqlite3
import pandas as pd
import re
import time
from langdetect import detect, DetectorFactory, LangDetectException
from deep_translator import GoogleTranslator
from faker import Faker
from multiprocessing import Pool, cpu_count
from tqdm import tqdm
from transformers import pipeline

# Initial configuration: seed langdetect's DetectorFactory and create the
# Faker instance for the "es_ES" locale that supplies the replacement values.
DetectorFactory.seed = 0
fake_es = Faker("es_ES")

# Load the Spanish BERT NER pipeline once at module level. The code below
# reads the `entity_group`, `start` and `end` fields of each result it returns.
ner_model = pipeline("ner", model="mrm8488/bert-spanish-cased-finetuned-ner", aggregation_strategy="simple")


def detectar_genero(nombre):
    """Classify a first name as male or female using Faker's es_ES name lists.

    Args:
        nombre: A single first name. Surrounding whitespace is ignored and a
            capitalized variant is also tried, so "juan" matches "Juan".

    Returns:
        "male" if the name is in the male list, "female" if it is in the
        female list, otherwise "neutral". A name present in both lists is
        reported as "male".
    """
    # Look the name up in the provider's full name lists. Calling
    # fake_es.first_name_male() returns ONE random name, so `nombre in ...`
    # on it is only a substring test against that random string.
    from faker.providers.person.es_ES import Provider as ProveedorPersonaES

    candidato = nombre.strip()
    variantes = (candidato, candidato.capitalize())

    if any(v in ProveedorPersonaES.first_names_male for v in variantes):
        return "male"
    if any(v in ProveedorPersonaES.first_names_female for v in variantes):
        return "female"
    return "neutral"


def generar_nombre_con_palabras(original_name):
    """Generate a fake name with the word count and gender of ``original_name``.

    The gender is taken from the first word via ``detectar_genero``. A
    "neutral" result uses Faker's generic first-name generator instead of
    forcing a female name.

    Args:
        original_name: The name to replace.

    Returns:
        A fake name with the same number of whitespace-separated words. If
        ``original_name`` contains no words (empty or whitespace only) it is
        returned unchanged.
    """
    palabras = original_name.split()
    num_palabras = len(palabras)

    # Nothing to replace; also avoids indexing palabras[0] on an empty list.
    if num_palabras == 0:
        return original_name

    # Pick the first-name generator matching the gender of the first word.
    genero = detectar_genero(palabras[0])
    if genero == "male":
        generar_nombre_pila = fake_es.first_name_male
    elif genero == "female":
        generar_nombre_pila = fake_es.first_name_female
    else:
        generar_nombre_pila = fake_es.first_name

    # One first name plus one surname per remaining word, so any word count
    # can be reached (not just 1, 2 or 3).
    max_intentos = 20
    for _ in range(max_intentos):
        partes = [generar_nombre_pila()]
        partes.extend(fake_es.last_name() for _ in range(num_palabras - 1))
        nombre = " ".join(partes)
        # A generated part may itself contain several words, hence the check.
        if len(nombre.split()) == num_palabras:
            return nombre

    # Bounded fallback: the last candidate is expected to have at least
    # num_palabras words (one or more per part), so trim it to size instead
    # of retrying forever.
    return " ".join(nombre.split()[:num_palabras])


def detectar_entidades(texto):
    """Detect person (PER) entities in ``texto`` using the BERT NER pipeline.

    Each detected span is widened to whole-word boundaries so that a partially
    tagged word is replaced completely, without swallowing neighbouring
    punctuation, newlines or tabs.

    Args:
        texto: The text to analyse.

    Returns:
        A list of ``(start, end, original)`` tuples sorted by ``start``, where
        ``texto[start:end]`` is the widened span and ``original`` is that
        span's stripped text. Spans never overlap, and spans without any text
        are omitted.
    """
    tramos = []
    for ent in ner_model(texto):
        if ent['entity_group'] != 'PER':
            continue

        start = ent['start']
        end = ent['end']

        # Widen to the enclosing word: stop at any non-alphanumeric character
        # (space, newline, comma, period, ...) rather than only at ' '.
        while start > 0 and texto[start - 1].isalnum():
            start -= 1
        while end < len(texto) and texto[end].isalnum():
            end += 1

        tramos.append((start, end))

    entidades = []
    for start, end in sorted(tramos):
        if entidades and start < entidades[-1][1]:
            # Two detections landed on the same word(s): merge them so the
            # caller never replaces overlapping ranges.
            inicio_previo, fin_previo, _ = entidades[-1]
            fin_unido = max(end, fin_previo)
            entidades[-1] = (inicio_previo, fin_unido, texto[inicio_previo:fin_unido].strip())
            continue

        original = texto[start:end].strip()
        if original:
            entidades.append((start, end, original))

    return entidades


def traducir_y_anonimizar(texto):
    """Translate ``texto`` to Spanish if needed and anonymize personal data.

    Steps, in order:
      1. Translate to Spanish when langdetect reports another language.
      2. Replace the name following "Me llamo" with a fake name.
      3. Replace 9-digit numbers with a fake phone number and
         8-digits-plus-letter tokens with a fake ``ssn()`` value.
      4. Replace every PER entity found by the NER model with a fake name
         of the same word count and gender.

    Args:
        texto: The message body. Non-string values (e.g. ``None`` or the NaN
            pandas produces for empty cells) and blank strings are returned
            unchanged.

    Returns:
        The anonymized text, or ``texto`` itself when there is nothing to do.
    """
    # Empty spreadsheet cells arrive as NaN/None; calling .strip() on them
    # would raise AttributeError and abort the whole batch.
    if not isinstance(texto, str) or not texto.strip():
        return texto

    # Translation
    try:
        if len(texto) > 3 and detect(texto) != "es":
            traducido = GoogleTranslator(source="auto", target="es").translate(texto)
            # Keep the original text if the translator gives nothing usable.
            if isinstance(traducido, str) and traducido.strip():
                texto = traducido
    except LangDetectException:
        # Best effort: undetectable language, continue with the text as is.
        pass

    # Anonymize names introduced by "Me llamo ..."
    # NOTE(review): with IGNORECASE the name group matches every following
    # run of letters-only words, not just capitalized ones - confirm intended.
    texto = re.sub(
        r"(Me llamo\s+)([A-ZÁÉÍÓÚÑa-záéíóúñ]+(?:\s+[A-ZÁÉÍÓÚÑa-záéíóúñ]+)*)",
        lambda match: match.group(1) + generar_nombre_con_palabras(match.group(2)),
        texto,
        flags=re.IGNORECASE
    )

    # Anonymize numbers
    texto = re.sub(r"\b\d{9}\b", lambda _: fake_es.phone_number(), texto)
    texto = re.sub(r"\b\d{8}[A-Za-z]\b", lambda _: fake_es.ssn(), texto)

    # Entity detection with BERT
    entidades = detectar_entidades(texto)

    # Apply replacements from the end of the text backwards so earlier
    # offsets stay valid. `limite` is the start of the last replaced span;
    # anything reaching past it overlaps already-rewritten text and is skipped.
    limite = len(texto)
    for start, end, original in sorted(entidades, key=lambda e: e[0], reverse=True):
        if start >= end or end > limite or not original.strip():
            continue
        texto = texto[:start] + generar_nombre_con_palabras(original) + texto[end:]
        limite = start

    return texto





def procesar_fila(row):
    """Translate and anonymize the "body" field of one row.

    The row is modified in place and the same object is returned.
    """
    cuerpo_anonimizado = traducir_y_anonimizar(row["body"])
    row["body"] = cuerpo_anonimizado
    return row

# Connect to the database. The connection is closed explicitly in `finally`:
# sqlite3's connection context manager only commits or rolls back, it does
# not close the connection.
conn = sqlite3.connect("mi_base_de_datos.db")
try:
    cursor = conn.cursor()

    # Create the table if it does not exist.
    # NOTE(review): both to_sql(..., if_exists="replace") calls below drop and
    # recreate "mi_tabla", so this schema does not survive - confirm it is
    # still wanted.
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS mi_tabla (
            id INTEGER PRIMARY KEY,
            body TEXT,
            secret TEXT,
            direction TEXT,
            createdAt TEXT,
            OpenchannelAccountId INTEGER,
            OpenchannelInteractionId INTEGER,
            UserId INTEGER,
            ContactId INTEGER,
            AttachmentId INTEGER,
            sentBy TEXT
        );
    """)
    conn.commit()

    # Load the data from Excel and store the raw rows.
    df = pd.read_excel("MOSTRA_1.xlsx")
    df.to_sql("mi_tabla", conn, if_exists="replace", index=False)

    # Keep only the relevant columns.
    df_body = df[["id", "body", "direction","createdAt","UserId", "ContactId"]].copy()

    # Process every row in a worker pool; tqdm shows progress.
    start_time = time.time()
    with Pool(cpu_count()) as pool:
        result = list(tqdm(pool.imap(procesar_fila, df_body.to_dict(orient="records")), total=len(df_body)))

    # Turn the list of dicts back into a DataFrame.
    df_body = pd.DataFrame(result)

    # Write the processed rows back.
    # NOTE(review): this replaces "mi_tabla" with only the six columns selected
    # above; the other columns loaded from Excel are no longer in the table.
    df_body.to_sql("mi_tabla", conn, if_exists="replace", index=False)

    elapsed_time = time.time() - start_time
    print(f"Procesamiento completado en {elapsed_time:.2f} segundos.")

    # Commit on success; on an exception the connection is closed without
    # committing, which discards any uncommitted changes.
    conn.commit()
finally:
    conn.close()






Some weights of the model checkpoint at mrm8488/bert-spanish-cased-finetuned-ner were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cpu
  0%|          | 0/6943 [00:00<?, ?it/s]Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
  0%|          | 1/6943 [00:02<4:10:07,  2.16s/it]Asking to truncate to max_length but no maximum length is provid