In [None]:
# Load the IPython extension shipped with the `rich` package (presumably for nicer output rendering).
%load_ext rich

In [None]:
import os
import json

import requests
from tqdm import tqdm

API_URL = "http://localhost:8999"  # Base URL of the API (local debugger instance); change it to your own

## Sample document

In [None]:
# Path of the sample .docx file that is uploaded to the extraction endpoint.
doc_path = "/resources/data/sample/document-01.docx"

## /document-extract endpoint output

In [None]:
def extract_document(file_path: str) -> dict:
    """Upload a file to the ``/document-extract`` endpoint and return its reply.

    Args:
        file_path: Path of the file to upload; it is read in binary mode.

    Returns:
        The JSON body of the response, as decoded by ``response.json()``.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``response.raise_for_status()``).
    """
    endpoint = f"{API_URL}/document-extract"
    with open(file_path, "rb") as handle:
        # The request must be sent while the file handle is still open.
        response = requests.post(endpoint, files={"file": handle})
    response.raise_for_status()
    return response.json()

In [None]:
# Send the sample document to the /document-extract endpoint and display the parsed response.
extracted_document = extract_document(doc_path)
extracted_document

In [None]:
# Number of entries stored under the "document" key of the extraction result.
len(extracted_document["document"])

## Inference

In [None]:
def get_predictions(sample: str) -> dict:
    """Send one text to the ``/anonymizer/predict`` endpoint and return its reply.

    Args:
        sample: Text to analyse; it is posted as ``{"text": sample}``.

    Returns:
        The JSON body of the response, as decoded by ``response.json()``.

    Raises:
        requests.HTTPError: If the server answers with an error status
            (raised by ``response.raise_for_status()``).
    """
    endpoint = f"{API_URL}/anonymizer/predict"
    payload = {"text": sample}
    response = requests.post(endpoint, json=payload)
    response.raise_for_status()
    return response.json()

In [None]:
# Run the anonymizer on each extracted paragraph (one request per paragraph),
# keeping the responses in document order.
predictions = []
for paragraph in tqdm(extracted_document["document"]):
    predictions.append(get_predictions(paragraph))
predictions

In [None]:
# Inspect the prediction returned for the first paragraph.
predictions[0]

In [None]:
from itertools import groupby


def get_entities(prediction):
    """Return whatever a single prediction stores under its ``"labels"`` key.

    Raises:
        KeyError: If the prediction has no ``"labels"`` key.
    """
    labels = prediction["labels"]
    return labels


# entities = [entity for prediction in predictions for entity in get_entities(prediction)]
# entities = sorted(entities, key=lambda x: x["attrs"]["aymurai_label"])

def _label_of(indexed_entity):
    """Return the ``aymurai_label`` attribute of an ``(index, entity)`` pair."""
    return indexed_entity[1]["attrs"]["aymurai_label"]


# Flatten the entities of every prediction, pair each one with its position in
# the flattened list, then sort by label so that `groupby` sees each label as
# one contiguous run.
entities = list(
    enumerate(
        entity for prediction in predictions for entity in get_entities(prediction)
    )
)
entities = sorted(entities, key=_label_of)


# label -> list of (index, entity) pairs carrying that label
groups = {label: list(members) for label, members in groupby(entities, key=_label_of)}
groups

In [None]:
import jiwer

# Compare the first entity of the "PER" group against every other entity of the
# same group using three jiwer error rates.
group = groups["PER"]
example = group[0]
reference_text = example[1]["text"]
other = [entity["text"] for _, entity in group[1:]]

display(example)

# metric name -> one value per entry of `other` (reference text passed first)
scores = {
    name: [metric(reference_text, text) for text in other]
    for name, metric in (("cer", jiwer.cer), ("wer", jiwer.wer), ("mer", jiwer.mer))
}


In [None]:
import numpy as np
import regex
import unicodedata
from jarowinkler import jarowinkler_similarity


def normalize_text(text):
    """Return a canonical form of ``text`` for fuzzy comparison.

    Steps, in order:
      1. NFKD-decompose and drop combining marks (Unicode category ``Mn``),
         which removes accents/tildes.
      2. Remove every punctuation character (``\\p{P}``).
      3. Collapse whitespace runs into single spaces and trim both ends.
      4. Lowercase.

    Punctuation is removed *before* the whitespace cleanup so that a gap left
    behind by a removed character (e.g. ``"a - b"``) does not survive as a
    double space.

    Args:
        text: Input string.

    Returns:
        The normalized string.
    """
    # Strip accents: decompose, then discard the combining marks.
    decomposed = unicodedata.normalize("NFKD", text)
    text = "".join(char for char in decomposed if unicodedata.category(char) != "Mn")

    # Remove punctuation first, then tidy up the whitespace that is left.
    text = regex.sub(r"\p{P}", "", text)
    text = regex.sub(r"\s+", " ", text).strip()

    return text.lower()


def compute_norm_cer(x, y):
    """Character error rate between the normalized forms of ``x`` and ``y``.

    Both strings go through ``normalize_text`` first; ``x`` is passed to
    ``jiwer.cer`` as the reference and ``y`` as the hypothesis.

    Args:
        x: Reference text.
        y: Hypothesis text.

    Returns:
        The value returned by ``jiwer.cer`` for the normalized pair.
    """
    reference = normalize_text(x)
    hypothesis = normalize_text(y)
    # jiwer.cer is already a rate relative to the reference length, so it must
    # not be divided by len(reference) a second time.
    # NOTE(review): behaviour for a reference that normalizes to "" depends on
    # the installed jiwer version - confirm if such entities can occur.
    return jiwer.cer(reference, hypothesis)


def compute_jaro_winkler(x, y):
    """Jaro-Winkler similarity between the word sequences of ``x`` and ``y``.

    Both inputs are normalized with ``normalize_text`` and split on whitespace;
    the resulting word lists (not the raw character strings) are what gets
    passed to ``jarowinkler_similarity``.
    """
    tokens_x = normalize_text(x).split()
    tokens_y = normalize_text(y).split()
    return jarowinkler_similarity(tokens_x, tokens_y)


def compute_word_subset(x, y):
    """Tell whether ``x`` and ``y`` have at least one word in common.

    Both inputs are normalized with ``normalize_text`` and split on whitespace
    before their word sets are compared.

    Returns:
        ``True`` if the two word sets intersect, ``False`` otherwise.
    """
    words_x = set(normalize_text(x).split())
    words_y = set(normalize_text(y).split())
    return not words_x.isdisjoint(words_y)


def _pairwise_matrix(metric):
    """Evaluate ``metric(a, b)`` for every ordered pair of entity texts in ``group``.

    Row ``i`` / column ``j`` holds the value for the i-th and j-th entity.
    """
    texts = [sample[1]["text"] for sample in group]
    return np.array([[metric(first, second) for second in texts] for first in texts])


# Pairwise comparison matrices over the entities of the current group.
# "word_subset" is turned into a distance: 0 when two entities share a word,
# 1 when they do not.
scores = {
    "cer": _pairwise_matrix(compute_norm_cer),
    "jaro_winkler": _pairwise_matrix(compute_jaro_winkler),
    "word_subset": 1 - _pairwise_matrix(compute_word_subset),
}

scores["jaro_winkler"].shape, scores["word_subset"].shape

In [None]:
from sklearn.cluster import DBSCAN

# `scores["word_subset"]` is a square pairwise matrix (0 = the two entities
# share at least one word, 1 = they do not). It therefore has to be passed as a
# precomputed distance matrix; with the default euclidean metric DBSCAN would
# treat each row as a feature vector instead.
dbscan = DBSCAN(eps=0.01, min_samples=2, metric="precomputed")

clusters = dbscan.fit_predict(scores["word_subset"].astype(float))

# Cluster ids in ascending order, noise (-1) excluded. Sorting makes the
# position of each centroid deterministic (a bare set has no defined order).
labels = sorted(set(clusters.tolist()) - {-1})

# One "centroid" per cluster: the normalized texts of its members joined together.
centroids = [
    " ".join(
        normalize_text(group[i][1]["text"]) for i in np.where(clusters == label)[0]
    )
    for label in labels
]

if centroids:
    # centroids_adj[a, b] is True when centroids a and b share at least one word.
    centroids_adj = np.array(
        [[compute_word_subset(c1, c2) for c2 in centroids] for c1 in centroids]
    )
    # argmax over a boolean row yields the first True entry: every cluster is
    # relabeled to the first centroid position it shares a word with.
    new_labels = np.argmax(centroids_adj, axis=-1)
else:
    # Everything was classified as noise; np.argmax would raise on an empty array.
    centroids_adj = np.zeros((0, 0), dtype=bool)
    new_labels = np.array([], dtype=int)

# Explicit old-id -> merged-id mapping; noise (-1) is not in the map and stays -1.
relabel = {label: int(new_labels[position]) for position, label in enumerate(labels)}
clusters = np.array([relabel.get(int(label), -1) for label in clusters])
clusters

In [None]:
import pandas as pd

# One row per entity of the current group: raw text, normalized text, position
# of the entity in the flattened entity list, and the assigned cluster id.
results = pd.DataFrame(
    {
        "text": [entity["text"] for _, entity in group],
        "norm_text": [normalize_text(entity["text"]) for _, entity in group],
        "index": [index for index, _ in group],
        "cluster": clusters,
    }
)

results = results.sort_values("cluster")

# cluster id -> list of normalized texts in that cluster.
# The column is selected before `apply`, so the function never runs on the
# grouping column (that form of DataFrameGroupBy.apply is deprecated in recent pandas).
results.groupby("cluster")["norm_text"].apply(list).to_dict()

In [None]:
# Display the per-entity table (sorted by cluster id).
results