# Загрузка напрямую из документации GigaChat API

In [33]:
from dotenv import load_dotenv, find_dotenv
# Load environment variables from the .env file located by find_dotenv().
# Later cells read PINECONE_API_KEY and PINECONE_INDEX_NAME from os.environ.
load_dotenv(find_dotenv())

True

In [36]:
import nest_asyncio
# Apply the nest_asyncio patch before any loader below is run.
# NOTE(review): presumably needed because SitemapLoader fetches pages
# asynchronously while the notebook already runs an event loop — confirm.
nest_asyncio.apply()

# Загрузка документации GigaChat API с developers.sber.ru

In [52]:
import re
from typing import Optional
from bs4 import BeautifulSoup, SoupStrainer, Tag

from langchain_community.document_loaders.sitemap import SitemapLoader
from typing import Generator
from bs4.element import Doctype, NavigableString

def langchain_docs_extractor(soup: BeautifulSoup) -> str:
    """Convert a parsed documentation page into Markdown-flavoured text.

    Navigation, footer, aside, script and style elements are removed from
    ``soup`` in place (the caller's tree is mutated), then the remaining tree
    is walked and rendered: headings become ``#`` lines, links/images/bold/
    italic get their Markdown form, ``<pre><code>`` becomes a fenced block,
    lists and tables are flattened, buttons are dropped, and any other tag is
    descended into.

    Args:
        soup: Parsed page to extract text from.

    Returns:
        The rendered text with runs of blank lines collapsed to a single
        blank line and surrounding whitespace stripped.
    """
    # Function-scope import: Comment is needed only by this extractor.
    from bs4.element import Comment

    # Remove all the tags that are not meaningful for the extraction.
    SCAPE_TAGS = ["nav", "footer", "aside", "script", "style"]
    for tag in soup.find_all(SCAPE_TAGS):
        tag.decompose()

    def get_text(tag: Tag) -> Generator[str, None, None]:
        """Yield Markdown fragments for every child of ``tag``, in order."""
        for child in tag.children:
            # Doctype and Comment are both NavigableString subclasses, so they
            # must be filtered out before the generic text branch below;
            # otherwise markup-only content would leak into the output.
            if isinstance(child, (Doctype, Comment)):
                continue

            if isinstance(child, NavigableString):
                yield child
            elif isinstance(child, Tag):
                if child.name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
                    # The digit in the tag name is the heading level.
                    yield f"{'#' * int(child.name[1:])} {child.get_text()}\n\n"
                elif child.name == "a":
                    yield f"[{child.get_text(strip=False)}]({child.get('href')})"
                elif child.name == "img":
                    yield f"![{child.get('alt', '')}]({child.get('src')})"
                elif child.name in ["strong", "b"]:
                    yield f"**{child.get_text(strip=False)}**"
                elif child.name in ["em", "i"]:
                    yield f"_{child.get_text(strip=False)}_"
                elif child.name == "br":
                    yield "\n"
                elif child.name == "code":
                    parent = child.find_parent()
                    if parent is not None and parent.name == "pre":
                        classes = parent.attrs.get("class", [])
                        # The class attribute may be a plain string when the
                        # parser does not split multi-valued attributes.
                        if isinstance(classes, str):
                            classes = classes.split()

                        language = next(
                            (
                                name
                                for name in classes
                                if re.match(r"language-\w+", name)
                            ),
                            None,
                        )
                        if language is None:
                            language = ""
                        else:
                            # Keep everything after the prefix so hyphenated
                            # names (e.g. "objective-c") are not truncated.
                            language = language.removeprefix("language-")

                        lines: list[str] = [
                            "".join(
                                token.get_text() for token in span.find_all("span")
                            )
                            for span in child.find_all("span", class_="token-line")
                        ]

                        if lines:
                            code_content = "\n".join(lines)
                        else:
                            # No per-line highlighting markup: use the raw
                            # text instead of emitting an empty code fence.
                            code_content = child.get_text().rstrip("\n")
                        yield f"```{language}\n{code_content}\n```\n\n"
                    else:
                        yield f"`{child.get_text(strip=False)}`"

                elif child.name == "p":
                    yield from get_text(child)
                    yield "\n\n"
                elif child.name == "ul":
                    for li in child.find_all("li", recursive=False):
                        yield "- "
                        yield from get_text(li)
                        yield "\n\n"
                elif child.name == "ol":
                    for i, li in enumerate(child.find_all("li", recursive=False)):
                        yield f"{i + 1}. "
                        yield from get_text(li)
                        yield "\n\n"
                elif child.name == "div" and "tabs-container" in child.attrs.get(
                    "class", [""]
                ):
                    # Pair each tab label with its panel, in document order.
                    tabs = child.find_all("li", {"role": "tab"})
                    tab_panels = child.find_all("div", {"role": "tabpanel"})
                    for tab, tab_panel in zip(tabs, tab_panels):
                        tab_name = tab.get_text(strip=True)
                        yield f"{tab_name}\n"
                        yield from get_text(tab_panel)
                elif child.name == "table":
                    thead = child.find("thead")
                    header_exists = isinstance(thead, Tag)
                    if header_exists:
                        headers = thead.find_all("th")
                        if headers:
                            yield "| "
                            yield " | ".join(header.get_text() for header in headers)
                            yield " |\n"
                            yield "| "
                            yield " | ".join("----" for _ in headers)
                            yield " |\n"

                    tbody = child.find("tbody")
                    tbody_exists = isinstance(tbody, Tag)
                    if tbody_exists:
                        for row in tbody.find_all("tr"):
                            yield "| "
                            yield " | ".join(
                                cell.get_text(strip=True) for cell in row.find_all("td")
                            )
                            yield " |\n"

                    yield "\n\n"
                elif child.name in ["button"]:
                    continue
                else:
                    yield from get_text(child)

    joined = "".join(get_text(soup))
    return re.sub(r"\n\n+", "\n\n", joined).strip()


def metadata_extractor(
    meta: dict, soup: BeautifulSoup, title_suffix: Optional[str] = None
) -> dict:
    """Assemble the metadata record for a single page.

    Args:
        meta: Sitemap entry for the page; must contain a ``"loc"`` key.
        soup: Parsed page that is searched for ``<title>``,
            ``<meta name="description">`` and ``<html>``.
        title_suffix: Optional text appended to the page title.

    Returns:
        A dict with ``source``, ``title``, ``description`` and ``language``,
        followed by every entry of ``meta`` (entries of ``meta`` win when a
        key is present in both).
    """
    title_tag = soup.find("title")
    description_tag = soup.find("meta", attrs={"name": "description"})
    html_tag = soup.find("html")

    if title_tag:
        page_title = title_tag.get_text()
    else:
        page_title = ""
    if title_suffix is not None:
        page_title += title_suffix

    record = {"source": meta["loc"], "title": page_title}

    # Missing elements fall back to an empty string.
    if description_tag:
        record["description"] = description_tag.get("content", "")
    else:
        record["description"] = ""

    if html_tag:
        record["language"] = html_tag.get("lang", "")
    else:
        record["language"] = ""

    # Sitemap fields are merged last so they override the derived values.
    record.update(meta)
    return record

def simple_extractor(html: str | BeautifulSoup) -> str:
    """Return the plain text of a page with blank-line runs collapsed.

    Args:
        html: Either an HTML string (parsed here with the ``lxml`` parser) or
            an already parsed ``BeautifulSoup`` object.

    Returns:
        The document text, stripped, with two or more consecutive newlines
        reduced to exactly two.

    Raises:
        ValueError: If ``html`` is neither a string nor a ``BeautifulSoup``.
    """
    if isinstance(html, BeautifulSoup):
        parsed = html
    elif isinstance(html, str):
        parsed = BeautifulSoup(html, "lxml")
    else:
        raise ValueError(
            "Input should be either BeautifulSoup object or an HTML string"
        )

    collapsed = re.sub(r"\n\n+", "\n\n", parsed.text)
    return collapsed.strip()

def load_gigachat_docs():
    """Load the GigaChat pages listed in the developers.sber.ru sitemap.

    Only ``<article>`` and ``<title>`` elements are parsed, page text comes
    from ``simple_extractor``, and titles get the ``" | GigaChat"`` suffix.

    Returns:
        Whatever ``SitemapLoader.load()`` returns for the matching pages.
    """

    def _page_metadata(meta, soup):
        # Tag every title so the origin of a document stays recognisable.
        return metadata_extractor(meta, soup, title_suffix=" | GigaChat")

    loader = SitemapLoader(
        "https://developers.sber.ru/docs/sitemap.xml",
        filter_urls=[r".*/gigachat/.*"],
        parsing_function=simple_extractor,
        default_parser="lxml",
        bs_kwargs={"parse_only": SoupStrainer(name=("article", "title"))},
        meta_function=_page_metadata,
    )
    return loader.load()


def load_langchain_gigachat_docs():
    """Load the GigaChain pages listed in the developers.sber.ru sitemap.

    Only ``<article>`` and ``<title>`` elements are parsed, page text comes
    from ``simple_extractor``, and titles get the
    ``" | langchain-gigachat"`` suffix.

    Returns:
        Whatever ``SitemapLoader.load()`` returns for the matching pages.
    """

    def _page_metadata(meta, soup):
        # Tag every title so the origin of a document stays recognisable.
        return metadata_extractor(meta, soup, title_suffix=" | langchain-gigachat")

    loader = SitemapLoader(
        "https://developers.sber.ru/docs/sitemap.xml",
        filter_urls=[r".*/gigachain/.*"],
        parsing_function=simple_extractor,
        default_parser="lxml",
        bs_kwargs={"parse_only": SoupStrainer(name=("article", "title"))},
        meta_function=_page_metadata,
    )
    return loader.load()
    
def load_langchain_docs():
    """Load the pages listed in the python.langchain.com sitemap.

    Pages are rendered with ``langchain_docs_extractor`` and their metadata is
    built by ``metadata_extractor`` without a title suffix.

    Returns:
        Whatever ``SitemapLoader.load()`` returns for the matching pages.
    """
    # NOTE(review): "lang" and "content" look like attribute names rather
    # than tag names; kept unchanged — confirm whether they are intentional.
    strainer = SoupStrainer(name=("article", "title", "html", "lang", "content"))

    loader = SitemapLoader(
        "https://python.langchain.com/sitemap.xml",
        filter_urls=["https://python.langchain.com/"],
        parsing_function=langchain_docs_extractor,
        default_parser="lxml",
        bs_kwargs={"parse_only": strainer},
        meta_function=metadata_extractor,
    )
    return loader.load()

def load_langgraph_docs():
    """Load every page listed in the LangGraph documentation sitemap.

    Only ``<article>`` and ``<title>`` elements are parsed, page text comes
    from ``simple_extractor``, and titles get the ``" | 🦜🕸️LangGraph"``
    suffix. No URL filter is applied.

    Returns:
        Whatever ``SitemapLoader.load()`` returns for the sitemap.
    """

    def _page_metadata(meta, soup):
        # Tag every title so the origin of a document stays recognisable.
        return metadata_extractor(meta, soup, title_suffix=" | 🦜🕸️LangGraph")

    loader = SitemapLoader(
        "https://langchain-ai.github.io/langgraph/sitemap.xml",
        parsing_function=simple_extractor,
        default_parser="lxml",
        bs_kwargs={"parse_only": SoupStrainer(name=("article", "title"))},
        meta_function=_page_metadata,
    )
    return loader.load()

In [41]:
# Fetch the GigaChat documentation pages; the trailing expression makes the
# notebook display how many documents were loaded.
gigachat_docs = load_gigachat_docs()
len(gigachat_docs)

Fetching pages: 100%|##########| 58/58 [00:14<00:00,  4.07it/s]


58

In [40]:
# Fetch the GigaChain documentation pages; the trailing expression makes the
# notebook display how many documents were loaded.
gigachain_docs = load_langchain_gigachat_docs()
len(gigachain_docs)

Fetching pages: 100%|##########| 22/22 [00:05<00:00,  4.34it/s]


22

In [47]:
# Fetch the LangGraph documentation pages and display their count.
# NOTE(review): the variable name is spelled with three "g"s; it is kept
# because the cell that builds `all_docs` refers to this exact name.
langggraph_docs = load_langgraph_docs()
len(langggraph_docs)

Fetching pages: 100%|##########| 173/173 [00:32<00:00,  5.25it/s]


173

In [53]:
# Fetch the LangChain documentation pages; the trailing expression makes the
# notebook display how many documents were loaded.
langchain_docs = load_langchain_docs()
len(langchain_docs)

Fetching pages: 100%|##########| 1421/1421 [03:39<00:00,  6.48it/s]


1421

In [54]:
# Merge the four corpora into one list, in the order GigaChat, GigaChain,
# LangGraph, LangChain; the trailing expression displays the total count.
all_docs = [
    *gigachat_docs,
    *gigachain_docs,
    *langggraph_docs,
    *langchain_docs,
]
len(all_docs)

1674

In [55]:
from langchain.text_splitter import MarkdownTextSplitter

# Split every loaded document into overlapping chunks before embedding.
# NOTE(review): chunk_size / chunk_overlap are presumably measured in
# characters (the splitter's default length function) — confirm.
text_splitter = MarkdownTextSplitter(chunk_size=2000, chunk_overlap=500)
doc_splits = text_splitter.split_documents(all_docs)
print(f"Documents splited. Count: {len(doc_splits)}")

Documents splited. Count: 24279


In [56]:
import os
import tqdm
from pinecone import Pinecone, ServerlessSpec

# The API key comes from the environment; it is None when the variable is
# not set, in which case the client is constructed with api_key=None.
pinecone_api_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_api_key)

In [71]:
import time

# The index name can be overridden through the environment.
index_name = os.environ.get("PINECONE_INDEX_NAME", "gigachain-test-gigar-newdb-giant")

# Create the index only when it does not exist yet, so that re-running this
# cell does not fail on an already existing index.
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=3072,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

# Poll once per second until the index reports that it is ready.
while not pc.describe_index(index_name).status["ready"]:
    time.sleep(1)

index = pc.Index(index_name)

In [72]:
# Alternative embedding backend, currently disabled:
# from langchain_community.embeddings.gigachat import GigaChatEmbeddings
# embeddings = GigaChatEmbeddings(model="EmbeddingsGigaR")
from langchain_openai.embeddings import OpenAIEmbeddings
# NOTE(review): the Pinecone index is created with dimension=3072 — confirm
# that this embedding model produces vectors of that size.
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

In [73]:
from langchain_pinecone import PineconeVectorStore
# Bind the Pinecone index to the embedding model defined in the previous cell.
vector_store = PineconeVectorStore(index=index, embedding=embeddings)

## Загрузка

In [74]:
from uuid import uuid4

# Give each chunk its own random UUID, then upload all chunks to the store.
uuids = [str(uuid4()) for _ in doc_splits]
vector_store.add_documents(documents=doc_splits, ids=uuids)

# Expose the populated store as a retriever with its default settings.
retriever = vector_store.as_retriever()
print("OK")

OK
