# Parsing + Chunking

### Парсинг веб-страниц:

In [None]:
import requests
from bs4 import BeautifulSoup
from langchain.text_splitter import RecursiveCharacterTextSplitter


def parse_url(url):
    """
    Парсит содержимое указанного URL и возвращает текстовое содержимое страницы.

    Args:
        url (str): URL-адрес для парсинга.

    Returns:
        dict: Словарь с ключами 'text' (текстовая информация) и 'description' (краткое описание страницы).
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        # Извлекаем краткое описание (title страницы)
        description = soup.title.string.strip() if soup.title else url

        # Обработка контента
        soup_copy = BeautifulSoup(response.text, "html.parser")
        for tag in soup_copy.find_all(['pre', 'code']):
            tag.replace_with(tag.get_text(separator=" ", strip=True))
        final_text = soup_copy.get_text(separator="\n", strip=True)

        return {'text': final_text, 'description': description}

    except requests.exceptions.RequestException as e:
        print(f"Ошибка при загрузке страницы {url}: {e}")
        return {'text': '', 'description': url}


def filter_text(text, min_words=3):
    """
    Фильтрует текст, удаляя строки с минимальным количеством слов и стоп-фразы.

    Args:
        text (str): Исходный текст.
        min_words (int): Минимальное количество слов в строке для сохранения.

    Returns:
        str: Отфильтрованный текст.
    """
    if not text:
        return ''

    lines = text.split("\n")
    filtered_text = [line.strip() for line in lines if len(line.split()) > min_words]

    # Поиск стоп-фраз
    stop_phrases = [
        "Политика в отношении файлов cookie",
        "Мы используем cookie",
        "Дата обращения:",
        "Использованная литература и источники:"
    ]
    cutoff_index = next((i for i, line in enumerate(filtered_text) if any(phrase in line for phrase in stop_phrases)), None)

    if cutoff_index is not None:
        filtered_text = filtered_text[:cutoff_index]

    return "\n".join(filtered_text)


def save_to_file(filename, content):
    """
    Сохраняет текстовое содержимое в указанный файл.

    Args:
        filename (str): Имя файла для сохранения.
        content (str): Текстовое содержимое для записи.
    """
    with open(filename, "w", encoding="utf-8") as file:
        file.write(content)


def load_from_file(filename):
    """
    Загружает текстовое содержимое из указанного файла.

    Args:
        filename (str): Имя файла для чтения.

    Returns:
        str: Текстовое содержимое файла.
    """
    with open(filename, "r", encoding="utf-8") as file:
        return file.read()


def generate_chunks(loaded_text, url_data, chunk_size=1500, chunk_overlap=0):
    """
    Генерирует чанки из загруженного текста, добавляя описание источника в начало каждого чанка.

    Args:
        loaded_text (str): Загруженный текст из файла.
        url_data (dict): Словарь с описаниями страниц.
        chunk_size (int): Максимальный размер чанка.
        chunk_overlap (int): Перекрытие между чанками.

    Returns:
        list: Список отформатированных чанков.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " ", "."],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len
    )

    all_chunks = []
    chunks = text_splitter.split_text(loaded_text)

    # Распределение чанков по источникам (простая эвристика)
    source_descriptions = list(url_data.values())

    for i, text in enumerate(loaded_text.split('\n\n\n')):
        chunks = text_splitter.split_text(text)
        for chunk in chunks:
            source_description = source_descriptions[i]['description']
            formatted_chunk = f"[Источник: {source_description}]\n{chunk}"
            all_chunks.append(formatted_chunk)

    return all_chunks


In [84]:
# Список URL-адресов
urls = [
    "https://www.eurochem.ru/",
    "https://www.eurochem.ru/global-operations/",
    "https://www.eurochem.ru/about-us/komplaens/",
    "https://www.eurochem.ru/proteh-lab/",
    "https://digtp.com/",
    "https://digtp.com/projects/machine-learning-platforma",
    "https://digtp.com/projects/rekomendatelnye-modeli",
    "https://digtp.com/projects/mobilnoe-prilozenie-mineralogiia",
    "https://digtp.com/contacts",
    "https://www.eurochem-career.com/news/iskusstvennyi-intellekt-v-ximii-gpt-assistenty-v-evroxime",
    "https://otus.ru/instructors/10517",
    "https://ru.wikipedia.org/wiki/ЕвроХим",
    "https://www.eurochem.ru/usolskij-kalijnyj-kombinat/",
    "https://uralmines.ru/evrohim-usolskij-kalijnyj-kombinat/",
    "https://docs.ultralytics.com/tasks/segment",
    "https://docs.ultralytics.com/tasks/detect",
    "https://docs.ultralytics.com/tasks",
    "https://docs.ultralytics.com/modes/",
    "https://docs.ultralytics.com/solutions",
    "https://github.com/Koldim2001",
    "https://github.com/Koldim2001/YOLO-Patch-Based-Inference",
    "https://github.com/Koldim2001/TrafficAnalyzer",
    "https://github.com/Koldim2001/COCO_to_YOLOv8"
]

# Словарь с описаниями и текстами
url_data = {}

# Парсинг и фильтрация
for url in urls:
    print(f"Парсинг {url}...")
    parsed_data = parse_url(url)
    if parsed_data['text']:
        filtered_text = filter_text(parsed_data['text'])
        url_data[url] = {
            'text': filtered_text,
            'description': parsed_data['description']
        }

Парсинг https://www.eurochem.ru/...
Парсинг https://www.eurochem.ru/global-operations/...
Парсинг https://www.eurochem.ru/about-us/komplaens/...
Парсинг https://www.eurochem.ru/proteh-lab/...
Парсинг https://digtp.com/...
Парсинг https://digtp.com/projects/machine-learning-platforma...
Парсинг https://digtp.com/projects/rekomendatelnye-modeli...
Парсинг https://digtp.com/projects/mobilnoe-prilozenie-mineralogiia...
Парсинг https://digtp.com/contacts...
Парсинг https://www.eurochem-career.com/news/iskusstvennyi-intellekt-v-ximii-gpt-assistenty-v-evroxime...
Парсинг https://otus.ru/instructors/10517...
Парсинг https://ru.wikipedia.org/wiki/ЕвроХим...
Парсинг https://www.eurochem.ru/usolskij-kalijnyj-kombinat/...
Парсинг https://uralmines.ru/evrohim-usolskij-kalijnyj-kombinat/...
Парсинг https://docs.ultralytics.com/tasks/segment...
Парсинг https://docs.ultralytics.com/tasks/detect...
Парсинг https://docs.ultralytics.com/tasks...
Парсинг https://docs.ultralytics.com/modes/...
Парсинг http

In [85]:
# Сохранение объединенного текста
combined_text = "\n\n\n".join([data['text'] for data in url_data.values()])
save_to_file("output_parsing.txt", combined_text)
print("Итоговый текст сохранен в файл output_parsing.txt")

Итоговый текст сохранен в файл output_parsing.txt


In [89]:
# Загрузка текста для чанкинга
loaded_text = load_from_file("result_parsing.txt")

# Генерация чанков
all_chunks = generate_chunks(loaded_text, url_data, chunk_size=1500, chunk_overlap=0)

# Сохранение чанков
with open("chunks_output.txt", "w", encoding="utf-8") as file:
    for i, chunk in enumerate(all_chunks):
        file.write(f"Чанк {i+1} ({len(chunk)} символов):\n{chunk}\n{'='*50}\n")
print("Чанки сохранены в файл chunks_output.txt")

Чанки сохранены в файл chunks_output.txt


# Производим сохранение в БД векторов:

In [None]:
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
import numpy as np
from langchain.embeddings.base import Embeddings
import requests
from typing import List

# Подключение к Milvus
def connect_to_milvus(host="localhost", port="19530"):
    """
    Устанавливает соединение с Milvus.

    Args:
        host (str): Хост Milvus.
        port (str): Порт Milvus.

    Returns:
        None
    """
    print(f"Connecting to Milvus at {host}:{port}...")
    connections.connect("default", host=host, port=port)
    print("Connected successfully!")


# Создание коллекции в Milvus (с проверкой существования)
def create_milvus_collection(collection_name, dim):
    """
    Создает новую коллекцию в Milvus, если она еще не существует.

    Args:
        collection_name (str): Имя коллекции.
        dim (int): Размерность векторов.

    Returns:
        Collection: Экземпляр коллекции.
    """
    # Проверяем, существует ли коллекция
    if utility.has_collection(collection_name) and False:
        print(f"Collection '{collection_name}' already exists. Loading the collection...")
        collection = Collection(name=collection_name)
    else:
        # Определение полей коллекции
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="chunk_length", dtype=DataType.INT64)
        ]
        schema = CollectionSchema(fields, description="Collection for text chunks")
        collection = Collection(name=collection_name, schema=schema)
        print(f"Collection '{collection_name}' created successfully.")
    
    return collection


# Генерация векторов для чанков
def generate_embeddings(chunks, embedder):
    """
    Генерирует векторные представления для списка чанков.

    Args:
        chunks (list): Список текстовых чанков.
        embedder (callable): Функция или модель для генерации эмбеддингов.

    Returns:
        list: Список векторных представлений.
    """
    embeddings = []
    for chunk in chunks:
        embedding = embedder(chunk)  # Предполагается, что embedder принимает строку и возвращает вектор
        embeddings.append(embedding)
    return embeddings


class CustomEmbedder(Embeddings):
    def __init__(self, embedder_url="http://localhost:8080/embed"):
        self.embedder_url = embedder_url

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Получает эмбеддинги для списка текстов."""
        response = requests.post(
            self.embedder_url,
            json={"inputs": texts}
        )
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Ошибка: {response.status_code}, {response.text}")

    def embed_query(self, text: str) -> List[float]:
        """Получает эмбеддинг для одного текста."""
        return self.embed_documents([text])[0]


# Загрузка данных в Milvus
def insert_data_into_milvus(collection, chunks, embedder):
    """
    Вставляет чанки текста и их векторные представления в Milvus.

    Args:
        collection (Collection): Коллекция Milvus.
        chunks (list): Список текстовых чанков.
        embedder (callable): Функция или модель для генерации эмбеддингов.

    Returns:
        None
    """
    
    # Получаем эмбеддинги для списка текстов
    embeddings = embedder.embed_documents(chunks)
    print(np.array(embeddings).shape)
    texts = [chunk for chunk in chunks]
    lengths = [len(chunk) for chunk in chunks]

    # Преобразование данных в формат, приемлемый для Milvus
    data = [
        embeddings,  # Векторы
        texts,       # Тексты чанков
        lengths      # Длины чанков
    ]
    # Вставка данных в коллекцию
    collection.insert(data)
    
    print("Data inserted into Milvus successfully.")


def create_index(collection_name, field_name="embedding", index_params=None):
    """
    Создает индекс на указанном поле в коллекции Milvus.

    Args:
        collection_name (str): Имя коллекции.
        field_name (str): Поле, на котором создается индекс (по умолчанию "embedding").
        index_params (dict): Параметры индекса (по умолчанию IVF_FLAT).

    Returns:
        None
    """
    # Проверяем, существует ли коллекция
    if not utility.has_collection(collection_name):
        print(f"Collection '{collection_name}' does not exist.")
        return

    # Загружаем коллекцию
    collection = Collection(name=collection_name)

    # Проверяем, создан ли уже индекс
    if collection.has_index():
        print(f"Index already exists for collection '{collection_name}'.")
        return

    # Устанавливаем параметры индекса (по умолчанию используем IVF_FLAT)
    if index_params is None:
        index_params = {
            "index_type": "IVF_FLAT",  # Тип индекса
            "params": {"nlist": 128},  # Количество кластеров
            "metric_type": "IP"        # Метрика (внутреннее произведение для косинусной близости)
        }

    # Создаем индекс
    collection.create_index(field_name=field_name, index_params=index_params)
    print(f"Index created on field '{field_name}' for collection '{collection_name}'.")

    # Выгружаем и заново загружаем коллекцию для применения индекса
    collection.load()

In [212]:
def display_milvus_collection(collection_name, limit=10):
    """
    Выводит содержимое указанной коллекции Milvus.

    Args:
        collection_name (str): Имя коллекции.
        limit (int): Количество записей для вывода (по умолчанию 10).

    Returns:
        None
    """
    # Проверяем, существует ли коллекция
    if not utility.has_collection(collection_name):
        print(f"Collection '{collection_name}' does not exist.")
        return

    # Загружаем коллекцию
    collection = Collection(name=collection_name)

    # Проверяем, есть ли данные в коллекции
    if collection.num_entities == 0:
        print(f"Collection '{collection_name}' is empty.")
        return

    # Выполняем запрос для получения данных
    collection.load()  # Загружаем данные в память (если они ещё не загружены)
    results = collection.query(expr="id >= 0", output_fields=["id", "text", "chunk_length"], limit=limit)

    # Выводим результаты
    print(f"Displaying {len(results)} records from collection '{collection_name}':")
    for record in results:
        print(f"ID: {record['id']}, Text: {record['text'][:50]}..., Length: {record['chunk_length']}")

In [213]:
# Создаем экземпляр эмбеддера
embedder = CustomEmbedder(embedder_url="http://localhost:8080/embed")

# Подключение к Milvus
connect_to_milvus(host="localhost", port="19530")

# Создание коллекции (если она еще не существует)
collection_name = "text_chunks"
dim = 1024  # Размерность векторов (замените на реальную размерность вашего эмбеддера)

collection = create_milvus_collection(collection_name, dim)
insert_data_into_milvus(collection, all_chunks, embedder)

Connecting to Milvus at localhost:19530...
Connected successfully!
Collection 'text_chunks' created successfully.
(123, 1024)
Data inserted into Milvus successfully.


In [214]:
display_milvus_collection(collection_name)

Displaying 10 records from collection 'text_chunks':
ID: 456077840787178679, Text: [Источник: АО «Минерально-химическая компания Евро..., Length: 770
ID: 456077840787178680, Text: [Источник: Наши активы - добыча, производство, про..., Length: 1412
ID: 456077840787178681, Text: [Источник: Наши активы - добыча, производство, про..., Length: 1491
ID: 456077840787178682, Text: [Источник: Комплаенс]
Удобрения и кормовые продукт..., Length: 1482
ID: 456077840787178683, Text: [Источник: Комплаенс]
Основная роль в противодейст..., Length: 1417
ID: 456077840787178684, Text: [Источник: Комплаенс]
создана в ЕвроХим как один и..., Length: 1410
ID: 456077840787178685, Text: [Источник: Комплаенс]
как часть системы комплаенс-..., Length: 948
ID: 456077840787178686, Text: [Источник: ПроТех Лаб]
Удобрения и кормовые продук..., Length: 1503
ID: 456077840787178687, Text: [Источник: ПроТех Лаб]
НИЦ ПроТехИнжиниринг (г. Са..., Length: 1460
ID: 456077840787178688, Text: [Источник: ПроТех Лаб]
4. Внедрение и

In [215]:
def search_similar_chunks(collection_name, query_embedding, top_k=15):
    """
    Выполняет поиск top_k самых ближайших чанков к заданному запросу в коллекции Milvus.

    Args:
        collection_name (str): Имя коллекции.
        query_embedding (list): Векторный запрос (эмбеддинг).
        top_k (int): Количество ближайших чанков для поиска.

    Returns:
        list: Список кортежей (текст чанка, расстояние до запроса).
    """
    # Проверяем, существует ли коллекция
    if not utility.has_collection(collection_name):
        print(f"Collection '{collection_name}' does not exist.")
        return []

    # Загружаем коллекцию
    collection = Collection(name=collection_name)

    # Если индекс не создан, создаём его
    if not collection.has_index():
        create_index(collection_name, field_name="embedding")

    # Если данные еще не загружены, выполняем загрузку
    collection.load()

    # Выполняем поиск
    search_params = {
        "metric_type": "IP",  # Используем внутреннее произведение для косинусной близости
        "params": {"nprobe": 16}  # Параметр для оптимизации поиска
    }
    results = collection.search(
        data=[query_embedding],  # Список запросов (векторов)
        anns_field="embedding",  # Поле для поиска (векторное представление)
        param=search_params,
        limit=top_k,  # Количество результатов
        output_fields=["text", "chunk_length"]  # Дополнительные поля для вывода
    )


    # Обработка результатов
    similar_chunks = []
    seen_text = set()  # Множество для отслеживания уникальных векторов

    for result in results[0]:  # results[0] содержит результаты для первого запроса
        entity = result.entity
        distance = result.distance
        text = entity.get("text")
        chunk_length = entity.get("chunk_length")

        # Проверяем, не встречался ли этот вектор ранее
        if text not in seen_text:
            seen_text.add(text)  # Добавляем вектор в множество
            similar_chunks.append((text, distance, chunk_length))

    return similar_chunks

In [218]:
# Имя коллекции
collection_name = "text_chunks"

# Создание эмбеддинга для запроса
query = "Кто такой Дмитрий Колесников и какие у него проекты?"
query_embedding = embedder.embed_query(query)  # Предполагается, что embedder уже определен
print("Эмбеддинг для запроса:", query_embedding)

# Поиск похожих чанков
similar_chunks = search_similar_chunks(collection_name, query_embedding, top_k=5)

# Вывод результатов
print(f"\nНайдено {len(similar_chunks)} похожих чанков:")
for i, (text, distance, length) in enumerate(similar_chunks, start=1):
    print(f"{i}. Расстояние: {distance:.4f}, Длина: {length}, Текст: {text[:150]}...")

Эмбеддинг для запроса: [-0.01427019, -0.019570012, -0.047941845, -0.04419639, 0.025899833, 4.6123252e-05, 0.0010616028, 0.06569531, 0.041949116, -0.01678901, 0.027622743, 0.03910257, -0.029682744, -0.03736093, -0.021461466, -0.03314729, -0.0647964, 0.027154561, -0.02164874, -0.010197006, 0.040301114, -0.013642826, -0.027604016, -0.031349473, 0.021199284, -0.03702384, -0.04048839, -0.03516984, -0.022360377, -0.015899464, -0.02998238, 0.019083101, -0.010534097, -0.03895275, -0.04232366, 0.012069734, 0.056069486, 0.0326042, -0.02661147, 0.045057844, -0.013914372, 0.05741785, -0.0014267849, -0.024776196, -0.021349104, 0.014803918, 0.026049651, 0.005988049, 0.0019687058, 0.02619947, 0.02275365, 0.016189737, -0.026873652, -0.010037824, -0.05460876, 0.052473847, -0.014354463, -0.00012004482, -0.053110577, 0.035282202, -0.026499106, -0.030375654, 0.06532077, -0.022004558, -0.02054383, 0.073186226, 0.034926385, 0.02370874, -0.055957124, 0.030562926, -0.036967658, 0.028634017, -0.0091435965, -0.

In [177]:
def clear_milvus_collection(collection_name):
    """
    Очищает все данные из указанной коллекции Milvus, оставляя саму коллекцию intact.

    Args:
        collection_name (str): Имя коллекции.

    Returns:
        None
    """
    # Проверяем, существует ли коллекция
    if not utility.has_collection(collection_name):
        print(f"Collection '{collection_name}' does not exist.")
        return

    # Загружаем коллекцию
    collection = Collection(name=collection_name)

    # Проверяем, есть ли данные в коллекции
    if collection.num_entities == 0:
        print(f"Collection '{collection_name}' is already empty.")
        return

    # Очищаем данные
    collection.delete(expr="id > 0")  # Удаляем все записи
    print(f"All data from collection '{collection_name}' has been cleared.")

In [211]:
# Очистка коллекции
clear_milvus_collection(collection_name)

All data from collection 'text_chunks' has been cleared.
