In [82]:
# %pip (unlike !pip) installs into the environment of the running kernel.
# html5lib is pinned to the version this notebook was last run with.
%pip install beautifulsoup4 html5lib==1.1

Collecting html5lib
  Downloading html5lib-1.1-py2.py3-none-any.whl (112 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m112.2/112.2 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Installing collected packages: html5lib
Successfully installed html5lib-1.1


In [84]:
from typing import List, Any, Union
from urllib.parse import urljoin, urlparse, urlunparse
import requests
import html5lib
from bs4 import BeautifulSoup
import asyncio
import backoff
import aiohttp
from aiohttp import ClientSession, ClientError, ClientPayloadError, ServerDisconnectedError
import time
import csv
import numpy as np

In [54]:
def _build_metadata(soup: Any, url: str) -> dict:
    """Build metadata from BeautifulSoup output."""
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", None)
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", None)
    return metadata

def get_all_urls(url: str, domain: bool = True, timeout: float = 30.0):
    """Return the absolute URLs of all ``<a href>`` links found on a page.

    Args:
        url: Page to download; relative links are resolved against it.
        domain: When True, keep only links whose host equals the host of
            ``url``. The comparison is made on the *resolved* link, so
            same-site absolute links are kept while protocol-relative links
            to other hosts and ``mailto:``/``tel:``/``javascript:`` links
            are dropped.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A sorted list of unique, normalized URLs. An empty list is returned
        when the request fails (the error is printed, not raised).
    """
    # Use a session for connection pooling and reusing TCP connections
    with requests.Session() as session:
        try:
            # Without a timeout a stalled server would block this call forever.
            response = session.get(url, timeout=timeout)
            response.raise_for_status()  # Raises HTTPError on an unsuccessful status code
        except requests.RequestException as e:
            print(f"Request failed: {e}")
            return []

    # Parse the HTML content
    soup = BeautifulSoup(response.content, 'html.parser')
    hrefs = {link.get('href') for link in soup.find_all('a') if link.get('href')}

    base_host = urlparse(url).netloc.lower()
    main_links = set()
    for href in hrefs:
        try:
            parsed_link = urlparse(urljoin(url, href))
        except ValueError:
            # Malformed href (e.g. a broken IPv6 literal): skip it instead of
            # losing every other link on the page.
            continue

        if domain and parsed_link.netloc.lower() != base_host:
            continue

        # Collapse every run of slashes in the path; a single replace() pass
        # would turn '///' into '//' and leave the URL un-normalized.
        path = parsed_link.path
        while '//' in path:
            path = path.replace('//', '/')
        main_links.add(urlunparse(parsed_link._replace(path=path)))

    # Sorted so that repeated runs produce the same order.
    return sorted(main_links)

class CustomWebBaseLoader(WebBaseLoader):
    """Web loader that extracts FAQ-style text and throttles its requests.

    Extends ``WebBaseLoader`` with a browser-like ``User-Agent`` header, a
    semaphore that caps concurrent asynchronous fetches, and a ``parse_soup``
    step that turns accordion question/answer markup into ``Q:``/``A:`` text.

    NOTE(review): ``WebBaseLoader`` and ``Document`` are not imported in any
    visible cell; they must already be in scope when this cell runs.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``WebBaseLoader`` and set request defaults."""
        super().__init__(*args, **kwargs)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
        }
        # Caps the number of requests that `fetch` runs at the same time.
        self.semaphore = asyncio.Semaphore(2)  # Adjust the number as appropriate

    def parse_soup(self, soup: BeautifulSoup, url: str) -> Document:
        """Extract text and metadata from ``soup`` and return a ``Document``.

        Pages containing ``.accordion`` elements are rendered as
        ``Q: ...\\nA: ...`` pairs; any other page falls back to the text of
        its paragraphs, headings and list items, one per line.

        Args:
            soup: Parsed page.
            url: Address of the page; recorded as the metadata ``source``.
        """
        # First, check for accordion-like structure
        accordions = soup.select('.accordion')  # Replace with the actual selector for the accordion
        if accordions:
            qa_chunks = []
            for section in accordions:
                questions = section.select('.question')  # Replace with the actual selector for questions
                answers = section.select('.answer')  # Replace with the actual selector for answers
                # zip() pairs questions and answers by position and silently
                # drops any unpaired trailing item.
                for q, a in zip(questions, answers):
                    question_text = q.get_text(strip=True)
                    answer_text = a.get_text(strip=True)
                    qa_chunks.append(f"Q: {question_text}\nA: {answer_text}\n\n")
            text = "".join(qa_chunks)
        else:
            # If no accordion structure, fall back to regular text extraction
            candidates = soup.select('p, h1, h2, h3, h4, h5, h6, li')  # Grab common text-holding elements
            text = '\n'.join(
                content
                for element in candidates
                if (content := element.get_text(strip=True))
            )

        metadata = _build_metadata(soup, url)
        return Document(page_content=text, metadata=metadata)

    def load(self) -> List[Document]:
        """Synchronously load text from the url(s) in ``web_paths``.

        Pages that fail to scrape are reported and skipped, so the result
        may contain fewer documents than there are paths.
        """
        docs = []
        for path in self.web_paths:
            try:
                soup = self._scrape(path)
            except Exception as e:
                # Skip this page: parsing here would either hit an unbound
                # `soup` or re-use the previous page's soup under this URL.
                print(f"Couldn't scrape: {path}, error occured: {e}")
            else:
                docs.append(self.parse_soup(soup, path))
            time.sleep(1)  # Sleep for 1 second between requests
        return docs

    async def aload(self) -> List[Document]:
        """Asynchronously load the urls in ``web_paths`` into Documents.

        URLs whose fetch failed (``fetch`` returned ``None``) are omitted.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in self.web_paths]
            soups = await asyncio.gather(*tasks)
        return [
            self.parse_soup(soup, url)
            for soup, url in zip(soups, self.web_paths)
            if soup is not None
        ]

    async def scrape_all(self, urls: List[str]) -> List[BeautifulSoup]:
        """Fetch all urls asynchronously and return the soups that loaded.

        Failed fetches are dropped, so the result can be shorter than
        ``urls`` and is not index-aligned with it.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        # `fetch` already returns parsed soups (None on failure), and
        # gather(return_exceptions=True) may hand back exception objects;
        # keep only the real soups.
        return [result for result in results if isinstance(result, BeautifulSoup)]

    async def fetch(self, session: ClientSession, url: str) -> Union[BeautifulSoup, None]:
        """Fetch and parse a single URL, throttled by the shared semaphore.

        Args:
            session: Open aiohttp session used for the request.
            url: Address to download; URLs ending in ``.xml`` are parsed with
                the ``"xml"`` parser, everything else with
                ``self.default_parser``.

        Returns:
            The parsed page, or ``None`` when the request failed (the error
            is printed, not raised).
        """
        try:
            async with self.semaphore:  # Acquire semaphore
                async with session.get(url, headers=self.headers) as response:
                    response.raise_for_status()
                    text = await response.text()
                # Throttle while still holding the semaphore slot, but after
                # the response has been released back to the session.
                await asyncio.sleep(1)  # Sleep for 1 second between requests
            return BeautifulSoup(text, self.default_parser if not url.endswith(".xml") else "xml")
        # asyncio.TimeoutError is only an alias of the builtin TimeoutError
        # from Python 3.11 on, so both are listed.
        except (ClientError, asyncio.TimeoutError, TimeoutError,
                ServerDisconnectedError, ClientPayloadError) as e:
            print(f"Request to {url} failed: {e}")
            return None  # Return None on failure

In [94]:
# Selenium: scrape the Alfa-Bank FAQ pages in a real browser, clicking each question to reveal its answer

In [101]:
import json
import time
from functools import reduce

from selenium import webdriver
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

In [100]:
# %pip (unlike !pip) installs into the environment of the running kernel.
# Pinned to the version this notebook was last run with.
%pip install chromedriver_autoinstaller==0.6.2

Collecting chromedriver_autoinstaller
  Downloading chromedriver_autoinstaller-0.6.2-py3-none-any.whl (7.4 kB)
Installing collected packages: chromedriver_autoinstaller
Successfully installed chromedriver_autoinstaller-0.6.2


In [102]:
import chromedriver_autoinstaller
from selenium import webdriver

# Smoke test: make sure a matching chromedriver is installed and Chrome starts.
opt = webdriver.ChromeOptions()
opt.add_argument("--start-maximized")

chromedriver_autoinstaller.install()
driver = webdriver.Chrome(options=opt)
try:
    driver.get('https://stackoverflow.com/')
finally:
    # The next cell rebinds `driver` to a new browser, so this one has to be
    # closed here or it stays open as an orphaned process.
    driver.quit()



In [111]:
# Open the FAQ landing page and collect the links to the category pages.
# `driver` is intentionally left open: the next cell keeps using it.
FAQ_URL = "https://alfabank.ru/help/faq/"
CATEGORY_LINK_SELECTOR = 'a.button__component_1mgd7.button__xs_1mgd7.button__component_zsrtz.button__secondary_zsrtz.button__withRightAddons_1mgd7.a3SCh.c3SCh'
CATEGORY_LINKS_TIMEOUT_S = 10  # upper bound on waiting for the links to appear

driver = webdriver.Chrome()
try:
    driver.set_window_size(1000, 900)
    driver.get(FAQ_URL)

    # Wait until the links are actually present instead of sleeping for a
    # fixed time, which is too short on a slow connection and wasted on a
    # fast one.
    try:
        elements = WebDriverWait(driver, CATEGORY_LINKS_TIMEOUT_S).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, CATEGORY_LINK_SELECTOR))
        )
    except TimeoutException:
        elements = []

    # get_attribute() returns None for anchors without an href; drop those
    # (and duplicates, keeping first-seen order) so that the scraping loop
    # never calls driver.get(None) or visits the same page twice.
    hrefs = list(dict.fromkeys(
        href for element in elements if (href := element.get_attribute('href'))
    ))
except Exception:
    # Do not leave a browser behind if the page could not be opened.
    driver.quit()
    raise

In [112]:
# Visit every category page, click each question to expand it and record the
# question/answer text. The result maps category URL -> list of "Q: ..." and
# "A: ..." strings in page order.
MAX_CLICK_ATTEMPTS = 5  # give up on a question after this many failed clicks
QUESTION_SELECTOR = 'div.c3IDK.n3IDK'
ANSWER_SELECTOR = 'div.o3IDK p.a1jIK'
FALLBACK_ANSWER_SELECTOR = 'div.b2Tco.k2Tco.n2Tco p.a1jIK.mG2mw.GG2mw.__G2mw.aiG2mw'

href_texts = {}

try:
    for href in hrefs:
        if not href:
            continue
        driver.get(href)
        time.sleep(1)
        question_elements = driver.find_elements(By.CSS_SELECTOR, QUESTION_SELECTOR)

        texts = []
        for element in question_elements:
            # A click sometimes does not register, so it is retried, but only
            # a bounded number of times: an element that can never be clicked
            # must not hang the whole run.
            for _attempt in range(MAX_CLICK_ATTEMPTS):
                try:
                    element.click()
                    texts.append(f"Q: {element.text} - ")
                    print(element.text)
                    break
                except WebDriverException:
                    time.sleep(0.2)
            else:
                print(f"Could not click a question on {href}; skipping it")
                continue
            time.sleep(1)

            # NOTE(review): the selector is page-wide, so if earlier answers
            # stay expanded their paragraphs are presumably included again;
            # confirm against the live page.
            try:
                paragraphs = [
                    p.text for p in driver.find_elements(By.CSS_SELECTOR, ANSWER_SELECTOR)
                ]
            except WebDriverException as e:
                print(e)
                # Use the fallback selector's result so that the question
                # recorded above is not left without an answer.
                paragraphs = [
                    p.text
                    for p in driver.find_elements(By.CSS_SELECTOR, FALLBACK_ANSWER_SELECTOR)
                ]

            if len(paragraphs) == 1:
                answer = paragraphs[0]
            else:
                answer = ''.join(paragraph + '\n' for paragraph in paragraphs)
            texts.append(f"A: {answer}\n\n")

        href_texts[href] = texts
finally:
    # Close the browser even when a page fails to load or the run is interrupted.
    driver.quit()

print(href_texts)

Что такое кредитная карта
Где можно платить кредитной картой
Как получить карту
У меня есть карта, что дальше
Что такое льготный период
Сколько дней длится льготный период
Как узнать, что льготный период заканчивается
Когда обновится льготный период после внесения денег
Если льготный период закончился
Что такое обязательный платёж
Как бесплатно снимать наличные с кредитной карты
Как пополнить кредитную карту
Как переводить деньги с кредитной карты
Есть ли комиссия за обслуживание кредитной карты
Как получить справку по кредитной карте
Как закрыть кредитную карту
На каких условиях можно закрыть кредитную карту
Сколько дней закрывается кредитный договор
Будет ли списываться комиссия за обслуживание карты, пока расторгается кредитный договор?
Что будет с текущими счетами при расторжении договора по кредитной карте
Что такое дебетовая карта
Чем дебетовая карта отличается от кредитной
Для чего нужна дебетовая карта
Где можно платить дебетовой картой
Как заказать карту
Как активировать карту

In [115]:
# Persist the scraped FAQ. ensure_ascii=False keeps the Cyrillic text readable
# in the file instead of \uXXXX escapes, which requires writing the file as
# UTF-8 explicitly rather than in the platform default encoding.
with open("alphaFAQ.json", "w", encoding="utf-8") as file:
    json.dump(href_texts, file, ensure_ascii=False, indent=2)

In [None]:
def _build_metadata(soup: Any, url: str) -> dict:
    """Build metadata from BeautifulSoup output."""
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", None)
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", None)
    return metadata

# NOTE(review): stray expression. `text` and `metadata` are not defined in
# this cell and no visible cell imports `Document`, so on a fresh kernel this
# line is expected to raise NameError. Presumably a leftover from editing
# `parse_soup`; confirm and remove.
Document(page_content=text, metadata=metadata)