In [1]:
import os
import threading
from langchain_aws import ChatBedrock
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import re
import warnings
import spacy
import numpy as np 
from colorama import init, Fore

In [2]:
init(autoreset=True)

In [3]:
MODELS = [
    "meta.llama3-70b-instruct-v1:0", #Big Llama
    "anthropic.claude-3-haiku-20240307-v1:0", #Big Claude
    "anthropic.claude-3-5-sonnet-20240620-v1:0", #Last Claude
    "anthropic.claude-3-sonnet-20240229-v1:0", #Last middle Claude
    "mistral.mixtral-8x7b-instruct-v0:1", #Big Mistral
    "meta.llama3-8b-instruct-v1:0" #Small Llama
]

In [4]:
prompt1 = """
Devuelve el mismo texto sin eliminar nada, pero con la información personal que has decidido etiquetar dentro de claudátores, junto al nombre de la etiqueta correspondiente, por ejemplo: [**arquitecto|PROFESION**]
No comentes nada más
"""

# Poner -> Etiquetas 

In [5]:
meddocan = """
Anota todos los datos de información personal médica que encuentres en este informe utilizando las guias meddocan a continuación:
Nombres
Datos geográficos
Todos los elementos de las fechas
Números telefónicos
Números FAX
Correos electrónicos
Números de Seguridad Social
Números de registros médicos
Números de beneficiarios del plan de salud
Números de cuenta11.Certificado / números de licencia
Identificadores de vehículos y números de serie, incluidas placas
Identificadores de dispositivo y números de serie
URL web
Direcciones de protocolo de Interne
Identificadores biométricos (es decir, escaneo retiniano, huellas dactilar
Fotos de cara completa e imágenes comparable
Cualquier número de identificación único, característica o código
"""

In [6]:
prompt_template = ChatPromptTemplate.from_messages([
    ("system", "You are an anonimization tool in identifying attributes in texts that can identify or quasi-identify a user."),
    ("user",    """
                    {prompt1} 
                    {text} 
                    {meddocan}
                """)
])

In [7]:
parser = StrOutputParser()

In [8]:
def get_text_and_masked_carmen(name):
    filename = f'./data/processed/txt/{name}'
    filename_result = f'./data/processed/masked/{name}'
    with open(filename, 'r') as archivo:
        text = archivo.read()

    with open(filename_result, 'r') as archivo:
        text_masked = archivo.read()

    return [text, text_masked]

In [9]:
list_data  = []
counter = 0

In [10]:
import threading
llm_small_llama =   ChatBedrock(  
                            model_id=MODELS[5],
                            region_name='eu-west-2',
                            model_kwargs=dict(temperature=0.1),
                    )
llm_big_llama   =   ChatBedrock(    
                            model_id=MODELS[0],
                            region_name='eu-west-2',
                            model_kwargs=dict(temperature=0.1),
                    )
llm_haiku       =   ChatBedrock(    
                            model_id=MODELS[1],
                            region_name='eu-west-3',
                            model_kwargs=dict(temperature=0.1),
                    )
llm_sonet       =   ChatBedrock(    
                            model_id=MODELS[3],
                            region_name='eu-west-3',
                            model_kwargs=dict(temperature=0.1),
                    )
llm_mistral     =   ChatBedrock(
                        model_id=MODELS[4],
                        region_name='eu-west-2',
                        model_kwargs=dict(temperature=0.1)
                    )

In [11]:
def save_file(filename, text):
    with open(filename, 'w') as archivo:
        archivo.write(text)

In [12]:
def create_folder(name):
    try:
        os.mkdir(name)
        print(f"Folder '{name}' created successfully")
    except FileExistsError:
        pass
    except Exception as e:
        print(f"Error creating the folder '{name}': {e}")


# Metrics

In [13]:
import re
import warnings
import spacy

warnings.filterwarnings("ignore", message="\[W008\] Evaluating Doc.similarity based on empty vectors")
# Cargar el modelo de lenguaje en español mediano

nlp = spacy.load("es_core_news_md")
# Función de similitud de embeddings
def embedding_similarity(str1, str2, threshold=0.8):
    doc1 = nlp(str1)
    doc2 = nlp(str2)
    similarity = doc1.similarity(doc2)
    return similarity >= threshold

def eliminar_adverbios_preposiciones_determinantes(texto):
    doc = nlp(texto)
    # Eliminar preposiciones (ADP) y determinantes (DET)
    tokens_filtrados = [token.text for token in doc if token.pos_ not in ('ADP', 'DET')]
    return ' '.join(tokens_filtrados)

## Cosine similarity

In [14]:
def get_cos_sim(text_hoped, text_generated):
    vectorizer = TfidfVectorizer(token_pattern=r"(?u)\b\w[\w\-/]*\b")
    tfidf_matrix = vectorizer.fit_transform([text_hoped, text_generated])

    try:
        cosine_sim = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
    except: 
        return 0.0
    return cosine_sim[0][0]

## Levenshtein distance

In [15]:
from tqdm import tqdm

def levenshtein_distance(s1, s2, show_progress=True):
    """
    Calcula la distancia de Levenshtein entre dos cadenas.

    La distancia de Levenshtein es el número mínimo de operaciones de edición 
    (inserción, eliminación o sustitución de un carácter) necesarias para 
    transformar una cadena en otra.

    Parámetros:
        s1 (str): Primera cadena
        s2 (str): Segunda cadena
        show_progress (bool): Si es True, muestra una barra de progreso. 
                              Por defecto es False.
    Retorna:
        int: La distancia de Levenshtein entre s1 y s2
    """
    # Usar tqdm solo si show_progress es True
    iterable = tqdm(s1) if show_progress else s1

    if len(s1) < len(s2):
        s1, s2 = s2, s1
    if len(s2) == 0:
        return len(s1)

    previous_row = list(range(len(s2) + 1))
    for i, c1 in enumerate(iterable):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row
    return previous_row[-1]



## Precision and Recall

In [16]:
def calc_metrics(ground_truth, predictions):
    # Convertir arrays de ground_truth y predictions a listas de str
    ground_truth_processed = np.array([eliminar_adverbios_preposiciones_determinantes(str(item)) for item in ground_truth])
    predictions_processed = np.array([eliminar_adverbios_preposiciones_determinantes(str(item)) for item in predictions])

    # Crear matrices de similitud de coseno y embedding 
    get_cos_sim_vectorized = np.vectorize(lambda gt, pred: get_cos_sim(str(gt), str(pred)))
    embedding_similarity_vectorized = np.vectorize(lambda gt, pred: embedding_similarity(str(gt), str(pred)))

    cosine_results = get_cos_sim_vectorized(ground_truth_processed[:, None], predictions_processed[None, :])
    embedding_results = embedding_similarity_vectorized(ground_truth_processed[:, None], predictions_processed[None, :])

    # Promediar las similitudes
    avg_similarities = (cosine_results + embedding_results) / 2

    # Determinar verdaderos positivos
    matches = avg_similarities > 0.5
    true_positives = np.sum(np.any(matches, axis=1))

    # Determinar falsos negativos
    false_negatives = len(ground_truth) - true_positives

    # Determinar falsos positivos
    predicted_matches = np.any(matches, axis=0)
    false_positives = len(predictions) - np.sum(predicted_matches)

    # Cálculo de métricas
    precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
    recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    return precision, recall, f1

def evaluate(masked, generated):
    """ 
    Input: 
        - masked (str): Ground_truth text
        - generated(str): Text to be evaluated

    Output:
        - Precision, Recall and F1 (float)
    """
    ground_truth = re.findall(r'\[\*\*(.*?)\*\*\]', masked)
    predictions = re.findall(r'\[\*\*(.*?)\*\*\]', generated)
    labels = [ground_truth, predictions]
    
    return [calc_metrics(ground_truth, predictions), labels]

# Loop

In [17]:
def anonimized(llm=None, name_model=""):
    counter = 0
    path = './data/processed/txt'
    chain = prompt_template | llm | parser
    for filename in os.listdir(path):
        [text, text_hoped] = get_text_and_masked_carmen(filename)
        text_generated = chain.invoke({"prompt1":prompt1, "text": text, "meddocan": meddocan})
        create_folder(f'data/anon/raw/{name_model}')
        save_file(f'data/anon/raw/{name_model}/{filename}', text_generated)
        [cal_met, labels] = evaluate(text_hoped, text_generated)    
        cosine_sim = get_cos_sim(text_hoped, text_generated)
        text_generated = text_generated.replace('[**', '').replace('**]', '')
        text_hoped = text_hoped.replace('[**', '').replace('**]', '')
        result = levenshtein_distance(text_generated, text_hoped[:len(text_generated)], show_progress=False)
        metrics_data = {}
        metrics_data["filename"] = filename
        metrics_data["precision"] = cal_met[0]
        metrics_data["recall"] = cal_met[1]
        metrics_data["f1"] = cal_met[2]
        metrics_data["cos"] = cosine_sim
        metrics_data["levenshtein"] = result
        metrics_data["labels hoped"] = labels[0]
        metrics_data["labels generated"] = labels[1]
        list_data.append(metrics_data)
        counter += 1
        if counter > 4:
            break

In [18]:
thread_small_llama = threading.Thread(target=anonimized, args=(llm_small_llama, "small_llama" ))
thread_big_llama = threading.Thread(target=anonimized, args=(llm_big_llama, "big_llama" ))
thread_haiku = threading.Thread(target=anonimized, args=(llm_haiku, "haiku" ))
thread_sonet = threading.Thread(target=anonimized, args=(llm_sonet, "sonet" ))
thread_mistral = threading.Thread(target=anonimized, args=(llm_mistral, "mistral" ))

In [None]:
thread_small_llama.start()
thread_big_llama.start()
thread_haiku.start()
thread_sonet.start()

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/zq/7jhg214n1211qtx7dtxxljvc0000gn/T/ipykernel_53336/2501132566.py", line 10, in anonimized
  File "/var/folders/zq/7jhg214n1211qtx7dtxxljvc0000gn/T/ipykernel_53336/3886728708.py", line 47, in evaluate
  File "/var/folders/zq/7jhg214n1211qtx7dtxxljvc0000gn/T/ipykernel_53336/3886728708.py", line 10, in calc_metrics
  File "/Users/petteraxcell/Library/Python/3.9/lib/python/site-packages/numpy/lib/function_base.py", line 2328, in __call__
    return self._vectorize_call(func=func, args=vargs)
  File "/Users/petteraxcell/Library/Python/3.9/lib/p