In [30]:
import itertools
import re
from typing import Any, Callable, Generator, Iterable, List, Optional
from bs4 import BeautifulSoup, SoupStrainer
from lxml import etree

import logging
import os
import re

"""Abstract interface for document loader implementations."""
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterator, List, Optional, Sequence, Union
from bs4 import BeautifulSoup, Doctype, NavigableString, Tag
from typing import Generator

from __future__ import annotations
from abc import ABC, abstractmethod
from functools import partial
from langchain.load.serializable import Serializable
from langchain.pydantic_v1 import Field

from langchain.load.serializable import Serializable
from langchain.pydantic_v1 import Field
from typing import Any, Literal, Sequence
from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter
from langchain.document_loaders.blob_loaders import Blob

from langchain.utils.html import PREFIXES_TO_IGNORE_REGEX, SUFFIXES_TO_IGNORE_REGEX

In [2]:
logger = logging.getLogger(__name__)

WEAVIATE_URL = os.environ["WEAVIATE_URL"]
WEAVIATE_API_KEY = os.environ["WEAVIATE_API_KEY"]
RECORD_MANAGER_DB_URL = os.environ["RECORD_MANAGER_DB_URL"]

### Classe Document

In [3]:
class Document(Serializable):
    """Class for storing a piece of text and associated metadata."""

    page_content: str
    """String text."""
    metadata: dict = Field(default_factory=dict)
    """Arbitrary metadata about the page content (e.g., source, relationships to other
        documents, etc.).
    """
    type: Literal["Document"] = "Document"

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return whether this class is serializable."""
        return True


class BaseDocumentTransformer(ABC):
    """Abstract base class for document transformation systems.

    A document transformation system takes a sequence of Documents and returns a
    sequence of transformed Documents.

    Example:
        .. code-block:: python

            class EmbeddingsRedundantFilter(BaseDocumentTransformer, BaseModel):
                embeddings: Embeddings
                similarity_fn: Callable = cosine_similarity
                similarity_threshold: float = 0.95

                class Config:
                    arbitrary_types_allowed = True

                def transform_documents(
                    self, documents: Sequence[Document], **kwargs: Any
                ) -> Sequence[Document]:
                    stateful_documents = get_stateful_documents(documents)
                    embedded_documents = _get_embeddings_from_stateful_docs(
                        self.embeddings, stateful_documents
                    )
                    included_idxs = _filter_similar_embeddings(
                        embedded_documents, self.similarity_fn, self.similarity_threshold
                    )
                    return [stateful_documents[i] for i in sorted(included_idxs)]

                async def atransform_documents(
                    self, documents: Sequence[Document], **kwargs: Any
                ) -> Sequence[Document]:
                    raise NotImplementedError

    """  # noqa: E501

    @abstractmethod
    def transform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        """Transform a list of documents.

        Args:
            documents: A sequence of Documents to be transformed.

        Returns:
            A list of transformed Documents.
        """
def atransform_documents(self, documents: Sequence[Document], **kwargs: Any) -> Sequence[Document]:
    """Transform a list of documents.

    Args:
        documents: A sequence of Documents to be transformed.

    Returns:
        A list of transformed Documents.
    """
    return self.transform_documents(documents, **kwargs)



"""    async def atransform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
      Asynchronously transform a list of documents.

        Args:
            documents: A sequence of Documents to be transformed.

        Returns:
            A list of transformed Documents.
    
        return await asyncio.get_running_loop().run_in_executor(
            None, partial(self.transform_documents, **kwargs), documents
        )"""

'    async def atransform_documents(\n        self, documents: Sequence[Document], **kwargs: Any\n    ) -> Sequence[Document]:\n      Asynchronously transform a list of documents.\n\n        Args:\n            documents: A sequence of Documents to be transformed.\n\n        Returns:\n            A list of transformed Documents.\n    \n        return await asyncio.get_running_loop().run_in_executor(\n            None, partial(self.transform_documents, **kwargs), documents\n        )'

### Classe BaseLoader

In [32]:
class BaseLoader(ABC):
    """Interface for Document Loader.

    Implementations should implement the lazy-loading method using generators
    to avoid loading all Documents into memory at once.

    The `load` method will remain as is for backwards compatibility, but its
    implementation should be just `list(self.lazy_load())`.
    """

    # Sub-classes should implement this method
    # as return list(self.lazy_load()).
    # This method returns a List which is materialized in memory.
    @abstractmethod
    def load(self) -> List[Document]:
        """Load data into Document objects."""

    def load_and_split(
        self, text_splitter: Optional[TextSplitter] = None
    ) -> List[Document]:
        """Load Documents and split into chunks. Chunks are returned as Documents.

        Args:
            text_splitter: TextSplitter instance to use for splitting documents.
              Defaults to RecursiveCharacterTextSplitter.

        Returns:
            List of Documents.
        """
        if text_splitter is None:
            _text_splitter: TextSplitter = RecursiveCharacterTextSplitter()
            print(f'\nBaseLoader - _text_splitter:\n{_text_splitter}\n\n')
        else:
            _text_splitter = text_splitter
        docs = self.load()
        return _text_splitter.split_documents(docs)

    # Attention: This method will be upgraded into an abstractmethod once it's
    #            implemented in all the existing subclasses.
    def lazy_load(
        self,
    ) -> Iterator[Document]:
        """A lazy loader for Documents."""
        raise NotImplementedError(
            f"{self.__class__.__name__} does not implement lazy_load()"
        )


class BaseBlobParser(ABC):
    """Abstract interface for blob parsers.

    A blob parser provides a way to parse raw data stored in a blob into one
    or more documents.

    The parser can be composed with blob loaders, making it easy to reuse
    a parser independent of how the blob was originally loaded.
    """

    @abstractmethod
    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazy parsing interface.

        Subclasses are required to implement this method.

        Args:
            blob: Blob instance

        Returns:
            Generator of documents
        """

    def parse(self, blob: Blob) -> List[Document]:
        """Eagerly parse the blob into a document or documents.

        This is a convenience method for interactive development environment.

        Production applications should favor the lazy_parse method instead.

        Subclasses should generally not over-ride this parse method.

        Args:
            blob: Blob instance

        Returns:
            List of documents
        """
        return list(self.lazy_parse(blob))

### Classe WebBaseLoader

In [44]:
# Classe WebBaseLoader sem o Async

#Replace async def with def for methods that are asynchronous.
#Replace await with synchronous calls for things like HTTP requests.
# Remove asyncio specific constructs like asyncio.Semaphore, asyncio.gather, etc.


import requests
from typing import List, Any


default_header_template = {
    "User-Agent": "",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*"
    ";q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Referer": "https://www.google.com/",
    "DNT": "1",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}


def _build_metadata(soup: Any, url: str) -> dict:
    """Build metadata from BeautifulSoup output."""
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
        print(f'\nWebBaseLoader - title:\n{metadata["title"]}\n\n')
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", "No description found.")
        print(f'\nWebBaseLoader - description:\n{metadata["description"]}\n\n')
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", "No language found.")
    return metadata



class WebBaseLoader(BaseLoader):
    """Load HTML pages using `urllib` and parse them with `BeautifulSoup'."""

    def __init__(
        self,
        web_path: Union[str, Sequence[str]] = "",
        header_template: Optional[dict] = None,
        verify_ssl: bool = True,
        proxies: Optional[dict] = None,
        continue_on_failure: bool = False,
        autoset_encoding: bool = True,
        encoding: Optional[str] = None,
        web_paths: Sequence[str] = (),
        requests_per_second: int = 2,
        default_parser: str = "html.parser",
        requests_kwargs: Optional[Dict[str, Any]] = None,
        raise_for_status: bool = False,
        bs_get_text_kwargs: Optional[Dict[str, Any]] = None,
        bs_kwargs: Optional[Dict[str, Any]] = None,
        session: Any = None,
    ) -> None:
        """Initialize loader.

        Args:
            web_paths: Web paths to load from.
            requests_per_second: Max number of concurrent requests to make.
            default_parser: Default parser to use for BeautifulSoup.
            requests_kwargs: kwargs for requests
            raise_for_status: Raise an exception if http status code denotes an error.
            bs_get_text_kwargs: kwargs for beatifulsoup4 get_text
            bs_kwargs: kwargs for beatifulsoup4 web page parsing
        """
        # web_path kept for backwards-compatibility.
        if web_path and web_paths:
            raise ValueError(
                "Received web_path and web_paths. Only one can be specified. "
                "web_path is deprecated, web_paths should be used."
            )
        if web_paths:
            self.web_paths = list(web_paths)
        elif isinstance(web_path, str):
            self.web_paths = [web_path]
        elif isinstance(web_path, Sequence):
            self.web_paths = list(web_path)
        else:
            raise TypeError(
                f"web_path must be str or Sequence[str] got ({type(web_path)}) or"
                f" web_paths must be Sequence[str] got ({type(web_paths)})"
            )
        self.requests_per_second = requests_per_second
        self.default_parser = default_parser
        self.requests_kwargs = requests_kwargs or {}
        self.raise_for_status = raise_for_status
        self.bs_get_text_kwargs = bs_get_text_kwargs or {}
        self.bs_kwargs = bs_kwargs or {}
        if session:
            self.session = session
        else:
            session = requests.Session()
            header_template = header_template or default_header_template.copy()
            if not header_template.get("User-Agent"):
                try:
                    from fake_useragent import UserAgent

                    header_template["User-Agent"] = UserAgent().random
                except ImportError:
                    logger.info(
                        "fake_useragent not found, using default user agent."
                        "To get a realistic header for requests, "
                        "`pip install fake_useragent`."
                    )
            session.headers = dict(header_template)
            session.verify = verify_ssl
            if proxies:
                session.proxies.update(proxies)
            self.session = session
        self.continue_on_failure = continue_on_failure
        self.autoset_encoding = autoset_encoding
        self.encoding = encoding

    @property
    def web_path(self) -> str:
        if len(self.web_paths) > 1:
            raise ValueError("Multiple webpaths found.")
        return self.web_paths[0]

    def _fetch(self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5) -> str:
        for i in range(retries):
            try:
                response = requests.get(url, headers=self.session.headers, verify=self.session.verify)
                # print(f'\nWebBaseLoader - responde:\n{response}\n\n')
                if response.status_code == 200:
                    return response.text
            except requests.exceptions.RequestException as e:
                if i == retries - 1:
                    raise
                else:
                    logger.warning(
                        f"Error fetching {url} with attempt "
                        f"{i + 1}/{retries}: {e}. Retrying..."
                    )
                    time.sleep(cooldown * backoff**i)
        raise ValueError("retry count exceeded")


    def fetch_all(self, urls: List[str], max_pages: int = None) -> Any:
        results = []
        url_count = 0  # Contador para manter o controle do número de URLs processadas
        
        for url in urls:
            if max_pages is not None and url_count >= max_pages:
                break  # Sai do loop se o número máximo de páginas for alcançado
                
            try:
                page_content = self._fetch(url)
                print(f'\nWebBaseLoader - page_content:\n{page_content}\n\n')
                results.append(page_content)
                url_count += 1  # Incrementa o contador
            except Exception as e:
                if self.continue_on_failure:
                    logger.warning(f"Error fetching {url}, skipping due to continue_on_failure=True")
                else:
                    raise e
                    
        return results




    @staticmethod
    def _check_parser(parser: str) -> None:
        """Check that parser is valid for bs4."""
        valid_parsers = ["html.parser", "lxml", "xml", "lxml-xml", "html5lib"]
        if parser not in valid_parsers:
            raise ValueError(
                "`parser` must be one of " + ", ".join(valid_parsers) + "."
            )    

    def _scrape(
        self,
        url: str,
        parser: Union[str, None] = None,
        bs_kwargs: Optional[dict] = None,
    ) -> Any:
        from bs4 import BeautifulSoup

        if parser is None:
            if url.endswith(".xml"):
                parser = "xml"
            else:
                parser = self.default_parser

        self._check_parser(parser)

        html_doc = self.session.get(url, **self.requests_kwargs)
        if self.raise_for_status:
            html_doc.raise_for_status()

        if self.encoding is not None:
            html_doc.encoding = self.encoding
        elif self.autoset_encoding:
            html_doc.encoding = html_doc.apparent_encoding
            
        print(f'antes do return do _scrape: {BeautifulSoup(html_doc.text, parser, **(bs_kwargs or {}))}') ##### Bastante conteudo
        return BeautifulSoup(html_doc.text, parser, **(bs_kwargs or {}))
    

    def scrape(self, parser: Union[str, None] = None) -> Any:
        """Scrape data from webpage and return it in BeautifulSoup format."""

        if parser is None:
            parser = self.default_parser

        return self._scrape(self.web_path, parser=parser, bs_kwargs=self.bs_kwargs)




    def scrape_all(self, urls: List[str], parser: Union[str, None] = None) -> List[Any]:
        """Fetch all urls, then return soups for all results."""
        from bs4 import BeautifulSoup

        results = self.fetch_all(urls, max_pages=2)
        final_results = []
        for i, result in enumerate(results):
            url = urls[i]
            if parser is None:
                if url.endswith(".xml"):
                    parser = "xml"
                else:
                    parser = self.default_parser
                self._check_parser(parser)
            final_results.append(BeautifulSoup(result, parser, **self.bs_kwargs))
        
        print(f'\nWebBaseLoader - final_results:\n{final_results}\n\n')
        return final_results





    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path."""
        for path in self.web_paths:
            soup = self._scrape(path)
            text = soup.get_text(**self.bs_get_text_kwargs)
            metadata = _build_metadata(soup, path)
            yield Document(page_content=text, metadata=metadata)

    def load(self) -> List[Document]:
        """Load text from the url(s) in web_path."""
        return list(self.lazy_load())


    def aload(self) -> List[Document]:
        """Load text from the urls in web_path async into Documents."""
        results = self.scrape_all(self.web_paths)
        docs = []
        for path, soup in zip(self.web_paths, results):
            text = soup.get_text(**self.bs_get_text_kwargs)
            metadata = _build_metadata(soup, path)
            docs.append(Document(page_content=text, metadata=metadata))
            
        print(f'\nWebBaseLoader - docs:\n{docs}\n\n')
        return docs

### Classe SitemapLoader

In [45]:
class SitemapLoader(WebBaseLoader):
    """Load a sitemap and its URLs."""

    def __init__(
        self,
        web_path: str,
        filter_urls: Optional[List[str]] = None,
        parsing_function: Optional[Callable] = None,
        blocksize: Optional[int] = None,
        blocknum: int = 0,
        meta_function: Optional[Callable] = None,
        is_local: bool = False,
        continue_on_failure: bool = False,
        **kwargs: Any,
    ):
        """Initialize with webpage path and optional filter URLs.

        Args:
            web_path: url of the sitemap. can also be a local path
            filter_urls: list of strings or regexes that will be applied to filter the
                urls that are parsed and loaded
            parsing_function: Function to parse bs4.Soup output
            blocksize: number of sitemap locations per block
            blocknum: the number of the block that should be loaded - zero indexed.
                Default: 0
            meta_function: Function to parse bs4.Soup output for metadata
                remember when setting this method to also copy metadata["loc"]
                to metadata["source"] if you are using this field
            is_local: whether the sitemap is a local file. Default: False
            continue_on_failure: whether to continue loading the sitemap if an error
                occurs loading a url, emitting a warning instead of raising an
                exception. Setting this to True makes the loader more robust, but also
                may result in missing data. Default: False
        """

        if blocksize is not None and blocksize < 1:
            raise ValueError("Sitemap blocksize should be at least 1")

        if blocknum < 0:
            raise ValueError("Sitemap blocknum can not be lower then 0")

        try:
            import lxml  # noqa:F401
        except ImportError:
            raise ImportError(
                "lxml package not found, please install it with " "`pip install lxml`"
            )

        super().__init__(web_paths=[web_path], **kwargs)

        self.filter_urls = filter_urls
        self.parsing_function = parsing_function or _default_parsing_function
        self.meta_function = meta_function or _default_meta_function
        self.blocksize = blocksize
        self.blocknum = blocknum
        self.is_local = is_local
        self.continue_on_failure = continue_on_failure

    def parse_sitemap(self, soup: Any) -> List[dict]:
        """Parse sitemap xml and load into a list of dicts.

        Args:
            soup: BeautifulSoup object.

        Returns:
            List of dicts.
        """
        els = []
        for url in soup.find_all("url"):
            loc = url.find("loc")
            if not loc:
                continue

            # Strip leading and trailing whitespace and newlines
            loc_text = loc.text.strip()

            if self.filter_urls and not any(
                re.match(r, loc_text) for r in self.filter_urls
            ):
                continue

            els.append(
                {
                    tag: prop.text
                    for tag in ["loc", "lastmod", "changefreq", "priority"]
                    if (prop := url.find(tag))
                }
            )

        for sitemap in soup.find_all("sitemap"):
            loc = sitemap.find("loc")
            print(f'\n- Dentro de Sitemap - loc:\n{loc}\n\n')
            if not loc:
                continue
            soup_child = self.scrape_all([loc.text], "xml")[0]

            els.extend(self.parse_sitemap(soup_child))
        return els

    def load(self) -> List[Document]:
        """Load sitemap."""
        if self.is_local:
            try:
                import bs4
            except ImportError:
                raise ImportError(
                    "beautifulsoup4 package not found, please install it"
                    " with `pip install beautifulsoup4`"
                )
            fp = open(self.web_path)
            soup = bs4.BeautifulSoup(fp, "xml")
        else:
            soup = self._scrape(self.web_path, parser="xml")

        els = self.parse_sitemap(soup)

        if self.blocksize is not None:
            elblocks = list(_batch_block(els, self.blocksize))
            blockcount = len(elblocks)
            if blockcount - 1 < self.blocknum:
                raise ValueError(
                    "Selected sitemap does not contain enough blocks for given blocknum"
                )
            else:
                els = elblocks[self.blocknum]

        results = self.scrape_all([el["loc"].strip() for el in els if "loc" in el])

        return [
            Document(
                page_content=self.parsing_function(results[i]),
                metadata=self.meta_function(els[i], results[i]),
            )
            for i in range(len(results))
        ]

In [16]:
def _default_parsing_function(content: Any) -> str:
    return str(content.get_text())


def _default_meta_function(meta: dict, _content: Any) -> dict:
    return {"source": meta["loc"], **meta}


def _batch_block(iterable: Iterable, size: int) -> Generator[List[dict], None, None]:
    it = iter(iterable)
    while item := list(itertools.islice(it, size)):
        yield item

<h3><mark> langchain_docs_extractor </mark></h3>

In [None]:
def langchain_docs_extractor(soup: BeautifulSoup) -> str:
    # Remove all the tags that are not meaningful for the extraction.
    SCAPE_TAGS = ["nav", "footer", "aside", "script", "style"]
    [tag.decompose() for tag in soup.find_all(SCAPE_TAGS)]

    def get_text(tag: Tag) -> Generator[str, None, None]:
        for child in tag.children:
            if isinstance(child, Doctype):
                continue

            if isinstance(child, NavigableString):
                yield child
            elif isinstance(child, Tag):
                if child.name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
                    yield f"{'#' * int(child.name[1:])} {child.get_text()}\n\n"
                elif child.name == "a":
                    yield f"[{child.get_text(strip=False)}]({child.get('href')})"
                elif child.name == "img":
                    yield f"![{child.get('alt', '')}]({child.get('src')})"
                elif child.name in ["strong", "b"]:
                    yield f"**{child.get_text(strip=False)}**"
                elif child.name in ["em", "i"]:
                    yield f"_{child.get_text(strip=False)}_"
                elif child.name == "br":
                    yield "\n"
                elif child.name == "code":
                    parent = child.find_parent()
                    if parent is not None and parent.name == "pre":
                        classes = parent.attrs.get("class", "")

                        language = next(
                            filter(lambda x: re.match(r"language-\w+", x), classes),
                            None,
                        )
                        if language is None:
                            language = ""
                        else:
                            language = language.split("-")[1]

                        lines: list[str] = []
                        for span in child.find_all("span", class_="token-line"):
                            print(f'\nDentro de doc_extractor - span:\n{span}\n\n')
                            line_content = "".join(
                                token.get_text() for token in span.find_all("span")
                            )
                            lines.append(line_content)

                        code_content = "\n".join(lines)
                        yield f"```{language}\n{code_content}\n```\n\n"
                    else:
                        yield f"`{child.get_text(strip=False)}`"

                elif child.name == "p":
                    yield from get_text(child)
                    yield "\n\n"
                elif child.name == "ul":
                    for li in child.find_all("li", recursive=False):
                        yield "- "
                        yield from get_text(li)
                        yield "\n\n"
                elif child.name == "ol":
                    for i, li in enumerate(child.find_all("li", recursive=False)):
                        yield f"{i + 1}. "
                        yield from get_text(li)
                        yield "\n\n"
                elif child.name == "div" and "tabs-container" in child.attrs.get(
                    "class", [""]
                ):
                    tabs = child.find_all("li", {"role": "tab"})
                    tab_panels = child.find_all("div", {"role": "tabpanel"})
                    for tab, tab_panel in zip(tabs, tab_panels):
                        tab_name = tab.get_text(strip=True)
                        yield f"{tab_name}\n"
                        yield from get_text(tab_panel)
                elif child.name == "table":
                    thead = child.find("thead")
                    header_exists = isinstance(thead, Tag)
                    if header_exists:
                        headers = thead.find_all("th")
                        if headers:
                            yield "| "
                            yield " | ".join(header.get_text() for header in headers)
                            yield " |\n"
                            yield "| "
                            yield " | ".join("----" for _ in headers)
                            yield " |\n"

                    tbody = child.find("tbody")
                    tbody_exists = isinstance(tbody, Tag)
                    if tbody_exists:
                        for row in tbody.find_all("tr"):
                            yield "| "
                            yield " | ".join(
                                cell.get_text(strip=True) for cell in row.find_all("td")
                                
                            )
                            yield " |\n"

                    yield "\n\n"
                elif child.name in ["button"]:
                    continue
                else:
                    yield from get_text(child)

    joined = "".join(get_text(soup))
    print("Dentro da funçao langchain_docs_extractor:")
    
    print(re.sub(r"\n\n+", "\n\n", joined).strip())
    print()
    return re.sub(r"\n\n+", "\n\n", joined).strip()

In [63]:
logger = logging.getLogger(__name__)


def _metadata_extractor(raw_html: str, url: str) -> dict:
    """Extract metadata from raw html using BeautifulSoup."""
    metadata = {"source": url}

    try:
        from bs4 import BeautifulSoup
    except ImportError:
        logger.warning(
            "The bs4 package is required for default metadata extraction. "
            "Please install it with `pip install bs4`."
        )
        return metadata
    soup = BeautifulSoup(raw_html, "html.parser")
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", None)
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", None)
    return metadata

### <mark>Classe RecursiveURLLoader</mark>

In [62]:
from langchain.utils.html import extract_sub_links

class RecursiveUrlLoader(BaseLoader):
    """Load all child links from a URL page."""

    def __init__(
        self,
        url: str,
        max_depth: Optional[int] = 2,
        extractor: Optional[Callable[[str], str]] = None,
        metadata_extractor: Optional[Callable[[str, str], str]] = None,
        exclude_dirs: Optional[Sequence[str]] = (),
        timeout: Optional[int] = 10,
        prevent_outside: Optional[bool] = True,
        link_regex: Union[str, re.Pattern, None] = None,
        headers: Optional[dict] = None,
        check_response_status: bool = False,
    ) -> None:

        self.url = url
        self.max_depth = max_depth if max_depth is not None else 2
        self.extractor = extractor if extractor is not None else lambda x: x
        self.metadata_extractor = (
            metadata_extractor
            if metadata_extractor is not None
            else _metadata_extractor
        )
        self.exclude_dirs = exclude_dirs if exclude_dirs is not None else ()
        self.timeout = timeout
        self.prevent_outside = prevent_outside if prevent_outside is not None else True
        self.link_regex = link_regex
        self.headers = headers
        self.check_response_status = check_response_status

    def _get_child_links_recursive(
        self, url: str, visited: Set[str], *, depth: int = 0
    ) -> Iterator[Document]:
          
        if depth >= self.max_depth:
            return

        # Get all links that can be accessed from the current URL
        visited.add(url)
        try:
            response = requests.get(url, timeout=self.timeout, headers=self.headers)
            if self.check_response_status and 400 <= response.status_code <= 599:
                raise ValueError(f"Received HTTP status {response.status_code}")
        except Exception as e:
            logger.warning(
                f"Unable to load from {url}. Received error {e} of type "
                f"{e.__class__.__name__}"
            )
            return
        content = self.extractor(response.text)
        if content:
            yield Document(
                page_content=content,
                metadata=self.metadata_extractor(response.text, url),
            )

        # Store the visited links and recursively visit the children
        sub_links = extract_sub_links(
            response.text,
            url,
            base_url=self.url,
            pattern=self.link_regex,
            prevent_outside=self.prevent_outside,
            exclude_prefixes=self.exclude_dirs,
        )
        max_link = 5
        url_link_count = 0
        for link in sub_links:
            print(f'\nurl_link_count: {url_link_count}\n\n')
            #if url_link_count >= max_link:
                #break  # Sai do loop
            # Check all unvisited links
            if link not in visited:
                yield from self._get_child_links_recursive(
                    link, visited, depth=depth + 1
                )
            url_link_count += 1    

    def lazy_load(self) -> Iterator[Document]:
        visited: Set[str] = set()
        return self._get_child_links_recursive(self.url, visited)

    def load(self) -> List[Document]:
        """Load web pages."""
        return list(self.lazy_load())


### <mark> Ingest </mark>

In [71]:
def metadata_extractor(meta: dict, soup: BeautifulSoup) -> dict:
    title = soup.find("title")
    print(f'\nIngest - metadata_extractor - title:\n{title}\n\n')
    description = soup.find("meta", attrs={"name": "description"})
    print(f'\nIngest - metadata_extractor - description:\n{description}\n\n')
    html = soup.find("html")
    return {
        "source": meta["loc"],
        "title": title.get_text() if title else "",
        "description": description.get("content", "") if description else "",
        "language": html.get("lang", "") if html else "",
        **meta,
    }



# https://python.langchain.com/sitemap.xml   / https://python.langchain.com/

def load_langchain_docs():
    return SitemapLoader(
        "https://transparencia.mesquita.rj.gov.br/",
        filter_urls=["https://transparencia.mesquita.rj.gov.br/"],
        parsing_function=langchain_docs_extractor,
        default_parser="lxml",
        bs_kwargs={
            "parse_only": SoupStrainer(
                name=("article", "title", "html", "lang", "content")
            ),
        },
        meta_function=metadata_extractor,
    ).load()

def simple_extractor(html: str) -> str:
    soup = BeautifulSoup(html, "lxml")
    return re.sub(r"\n\n+", "\n\n", soup.text).strip()

In [65]:
def load_api_docs():
    return RecursiveUrlLoader(
        url="https://api.python.langchain.com/en/latest/",
        max_depth=8,
        extractor=simple_extractor,
        prevent_outside=True,
        timeout=600,
        # Drop trailing / to avoid duplicate pages.
        link_regex=(
            f"href=[\"']{PREFIXES_TO_IGNORE_REGEX}((?:{SUFFIXES_TO_IGNORE_REGEX}.)*?)"
            r"(?:[\#'\"]|\/[\#'\"])"
        ),
        check_response_status=True,
        exclude_dirs=(
            "https://api.python.langchain.com/en/latest/_sources",
            "https://api.python.langchain.com/en/latest/_modules",
        ),
    ).load()

In [67]:
def ingest_docs_jupyter():
    docs_from_documentation = load_langchain_docs()
    logger.info(f"Loaded {len(docs_from_documentation)} docs from documentation")
    # docs_from_api = load_api_docs()
    # logger.info(f"Loaded {len(docs_from_api)} docs from API")

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=200)
    docs_transformed = text_splitter.split_documents(
        docs_from_documentation
    )
    
    #docs_from_documentation + docs_from_api

    # We try to return 'source' and 'title' metadata when querying vector store and
    # Weaviate will error at query time if one of the attributes is missing from a
    # retrieved document.
    num_docs = 0
    max_doc = 10
    for doc in docs_transformed:
        if num_docs >= max_doc:
            break
        print(f'\ndoc.metadata["source"]: {doc.metadata["source"]}')
        print(f'\ndoc.metadata["title"]: {doc.metadata["title"]}')
        if "source" not in doc.metadata:
            doc.metadata["source"] = ""
        if "title" not in doc.metadata:
            doc.metadata["title"] = ""
        num_docs += 1
    # client = weaviate.Client(
    #     url=WEAVIATE_URL,
    #     auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY),
    # )
    # embedding = OpenAIEmbeddings(
    #     chunk_size=200,
    # )  # rate limit
    # vectorstore = Weaviate(
    #     client=client,
    #     index_name=WEAVIATE_DOCS_INDEX_NAME,
    #     text_key="text",
    #     embedding=embedding,
    #     by_text=False,
    #     attributes=["source", "title"],
    # )

    # record_manager = SQLRecordManager(
    #     f"weaviate/{WEAVIATE_DOCS_INDEX_NAME}", db_url=RECORD_MANAGER_DB_URL
    # )
    # record_manager.create_schema()

    # indexing_stats = index(
    #     docs_transformed,
    #     record_manager,
    #     vectorstore,
    #     cleanup="full",
    #     source_id_key="source",
    # )

    # logger.info("Indexing stats: ", indexing_stats)
    # logger.info(
    #     "LangChain now has this many vectors: ",
    #     client.query.aggregate(WEAVIATE_DOCS_INDEX_NAME).with_meta_count().do(),
    # )

In [72]:
ingest_docs_jupyter()

antes do return do _scrape: <?xml version="1.0" encoding="utf-8"?>


WebBaseLoader - final_results:
[]


