### Импорт необходимых библиотек

In [1]:
import urllib3
import pandas as pd
import numpy as np
import random

from keycloak import KeycloakOpenID
from openai import AsyncClient
from catboost import CatBoostClassifier
import umap.umap_ as umap
from hdbscan import HDBSCAN
from collections import defaultdict
from tqdm import tqdm


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

### Настройка доступов, аутентификация и инициализация клиента для эмбеддингов


In [2]:
LLM_ENDPOINT = ""
AUTH_USERNAME = ""
AUTH_PASSWORD = ""
API_TOKEN = ""


client = AsyncClient(
    base_url=LLM_ENDPOINT,
    api_key=API_TOKEN
)

keycloak_openid = KeycloakOpenID(
    server_url="",
    client_id="",
    realm_name="",
    verify=False
)


def get_api_token_from_keycloack() -> str:
    token = keycloak_openid.token(username=AUTH_USERNAME, password=AUTH_PASSWORD)
    return token["access_token"]

access_token = get_api_token_from_keycloack()

### Загрузка данных и предобработка

In [3]:
df = pd.read_csv('lemma_data.csv')
df['clean_purpose'] = df['purpose'].str.lower()
df['clean_purpose'] = df['clean_purpose'].str.replace('ндс не облагается', '', regex=False)


### Загрузка модели и предсказание классов

In [4]:
model = CatBoostClassifier()
model.load_model('model_multiclass.cb')

<catboost.core.CatBoostClassifier at 0x7f6a7088ded0>

In [5]:
test_set = df.sample(100, random_state=69).clean_purpose.values


In [6]:
preds = model.predict(test_set.reshape(len(test_set), -1))


In [7]:
other_indexes = []
for idx, p in enumerate(preds):
    if p[0] == 'other':
        other_indexes.append(idx)

other_texts = test_set[other_indexes]

### Генерация эмбеддингов для текстов категории "other"


In [8]:
client.api_key = get_api_token_from_keycloack()

all_embeds = []

for i in tqdm(range(0, len(other_indexes), 100), desc="Embeddings generation"):
    for _ in range(2):
        try:
            e5_embeddings = await client.embeddings.create(
                input=other_texts[i:i+100],
                model="e5"
            )
            embeddings = [item.embeddings for item in e5_embeddings.data]
            break
        except:
            client.api_key = get_api_token_from_keycloack()
    all_embeds.append(embeddings)

Embeddings generation: 100%|██████████| 1/1 [00:00<00:00,  1.08it/s]


In [9]:
big_data = []
for batch_embeddings in all_embeds:
    big_data.extend(batch_embeddings)

### Уменьшение размерности с помощью UMAP

In [10]:
umap_model = umap.UMAP(
    n_components=5,
    min_dist=0.0,
    metric='cosine',
    random_state=42
)

reduced_embeddings = umap_model.fit_transform(big_data)

  warn(


In [11]:
embedding_dict = {other_texts[i]: reduced_embeddings[i] for i in range(len(other_texts))}


### Кластеризация на большие кластеры

In [12]:
hdbscan_model = HDBSCAN(
    min_cluster_size=50,
    metric="euclidean",
    cluster_selection_method="eom"
)

clusters = hdbscan_model.fit_predict(reduced_embeddings)



In [13]:
cluster_texts = defaultdict(list)
for text, cluster in zip(other_texts, clusters):
    cluster_texts[cluster].append(text)

cluster_texts = dict(cluster_texts)


### Идея: из каждого кластера берём по немного из ближней, средней и дальней зон от центроида, чтобы получить разнообразные примеры

In [14]:
def compute_centroid(vectors: np.ndarray) -> np.ndarray:
    return vectors.mean(axis=0)


def sample_zone(zone, k):
    if len(zone) <= k:
        return zone
    else:
        return random.sample(zone, k)

In [15]:
SAMPLES_PER_ZONE = 40
sample_for_annotation = []


for cluster_id, texts in cluster_texts.items():
    if not texts:
        continue

    cluster_vectors = np.array([embedding_dict[t] for t in texts])
    centroid = compute_centroid(cluster_vectors)
    distances = np.linalg.norm(cluster_vectors - centroid, axis=1)
    text_dist_pairs = list(zip(texts, distances))
    text_dist_pairs.sort(key=lambda x: x[1])
    
    n = len(text_dist_pairs)
    near_boundary = n // 3
    mid_boundary = 2 * n // 3
    
    near_zone = text_dist_pairs[:near_boundary]
    mid_zone  = text_dist_pairs[near_boundary:mid_boundary]
    far_zone  = text_dist_pairs[mid_boundary:]
    
    near_samples = sample_zone(near_zone, SAMPLES_PER_ZONE)
    mid_samples = sample_zone(mid_zone, SAMPLES_PER_ZONE)
    far_samples = sample_zone(far_zone, SAMPLES_PER_ZONE)
    
    for (txt, dist) in (near_samples + mid_samples + far_samples):
        sample_for_annotation.append((cluster_id, txt, dist))

final_texts_for_annotation = [item[1] for item in sample_for_annotation]

print(f"Всего выбрано {len(final_texts_for_annotation)} текстов для разметки.")

Всего выбрано 88 текстов для разметки.


### Удаление «почти дублей»  с помощью кластеризации на мини-кластеры


In [16]:
def remove_near_duplicates_hdbscan(selected_texts, embedding_dict, min_cluster_size=2):
    """
    Убирает «почти дубли» с помощью HDBSCAN.
    
    :param selected_texts: список текстов, в которых ищем дубликаты
    :param embedding_dict: словарь {текст: вектор np.ndarray}
    :param min_cluster_size: минимальный размер кластера. Если выставить 2,
                            то объединяются только тексты с близкими эмбеддингами.
    :return: список текстов без почти дублей
    """
    X = np.array([embedding_dict[text] for text in selected_texts])
    
    clusterer = HDBSCAN(min_cluster_size=min_cluster_size, metric="euclidean")
    labels = clusterer.fit_predict(X)

    unique_labels = np.unique(labels)
    deduped_texts = []
    
    for label in unique_labels:
        cluster_indices = np.where(labels == label)[0]
        representative_idx = cluster_indices[0]
        deduped_texts.append(selected_texts[representative_idx])

    return deduped_texts

In [17]:
clean_texts = remove_near_duplicates_hdbscan(
    selected_texts=final_texts_for_annotation,
    embedding_dict=embedding_dict,
    min_cluster_size=2
)

print("До удаления почти дублей:", len(final_texts_for_annotation))
print("После удаления почти дублей:", len(clean_texts))

До удаления почти дублей: 88
После удаления почти дублей: 17


