In [4]:
# pip install selenium webdriver-manager beautifulsoup4 lxml

from __future__ import annotations
import csv, time
from dataclasses import dataclass, asdict
from typing import List, Optional

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

# ==========================
# ВХОДНЫЕ ДАННЫЕ
# ==========================
# Search filters for the case search form. fill_filters() only types values
# that are truthy, so an empty string (or None) leaves that field untouched.
SEARCH_PARAMS = {
    "inn": "6506007604",   # participant INN (or None)
    "org_name": "",      # organisation name (or None); used only when "inn" is empty

    "court_name": "",    # court name (or None)

    "judge": "",         # judge (or None)



    "case_number": "",   # case number (or None)
}

START_URL = "https://kad.arbitr.ru/"  # page opened first by main()
OUTPUT_CSV = "kad_results.csv"  # file written by parse_table()


# ==========================
# ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
# ==========================
def build_driver(headless: bool = False) -> webdriver.Chrome:
    """Start a Chrome session configured to look less like an automated browser.

    Args:
        headless: When true, ``--headless=new`` is added to the Chrome arguments.

    Returns:
        The created ``webdriver.Chrome`` instance; the caller is responsible
        for quitting it.
    """
    chrome_args = ["--headless=new"] if headless else []
    chrome_args += [
        "--window-size=1600,1000",
        "--no-sandbox",
        "--disable-gpu",
        "--disable-dev-shm-usage",
        "--lang=ru,en",
        "--disable-blink-features=AutomationControlled",
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
    ]

    chrome_options = Options()
    for arg in chrome_args:
        chrome_options.add_argument(arg)
    chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    chrome_options.add_experimental_option("useAutomationExtension", False)

    browser = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=chrome_options,
    )
    try:
        # Make navigator.webdriver read as undefined in every new document.
        browser.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
        })
    except Exception:
        # Best effort only: the session is still returned if the CDP call fails.
        pass
    return browser


def wait_clickable(driver, by, sel, t=15):
    """Wait until the element located by ``(by, sel)`` is clickable and return it.

    Args:
        driver: Selenium driver used for the lookup.
        by: Locator strategy.
        sel: Selector string for that strategy.
        t: Timeout handed to ``WebDriverWait``.

    Any exception raised by ``WebDriverWait.until`` propagates to the caller.
    """
    locator = (by, sel)
    waiter = WebDriverWait(driver, t)
    return waiter.until(EC.element_to_be_clickable(locator))


def try_click(driver, selectors: List[tuple], timeout_each=3) -> bool:
    """Click the first element from ``selectors`` that becomes clickable.

    Args:
        driver: Selenium driver.
        selectors: ``(by, selector)`` pairs, tried in order.
        timeout_each: Timeout handed to ``WebDriverWait`` for each pair.

    Returns:
        True after the first successful click; False when every pair failed
        (any exception for a pair just moves on to the next one).
    """
    for how, what in selectors:
        try:
            waiter = WebDriverWait(driver, timeout_each)
            target = waiter.until(EC.element_to_be_clickable((how, what)))
            driver.execute_script("arguments[0].scrollIntoView({block:'center'});", target)
            mouse = ActionChains(driver)
            mouse.move_to_element(target).pause(0.05).click(target).perform()
        except Exception:
            continue
        else:
            return True
    return False


def safe_type(driver, selectors: List[tuple], text: str, press_enter: bool = False, timeout=12) -> bool:
    """Type ``text`` into the first field from ``selectors`` that can be handled.

    Each ``(by, selector)`` pair is tried in order. Any exception raised while
    handling a pair is swallowed and the next pair is tried.

    Args:
        driver: Selenium driver.
        selectors: ``(by, selector)`` pairs locating candidate input fields.
        text: Text to type.
        press_enter: When true, ENTER is sent right after the text.
        timeout: Timeout passed to ``wait_clickable`` for each pair.

    Returns:
        True once a pair was processed without an exception, otherwise False.
    """
    for by, sel in selectors:
        try:
            inp = wait_clickable(driver, by, sel, t=timeout)
            # Centre the field in the viewport, then click it with the mouse.
            driver.execute_script("arguments[0].scrollIntoView({block:'center'});", inp)
            ActionChains(driver).move_to_element(inp).pause(0.05).click(inp).perform()
            inp.clear()
            inp.send_keys(text)
            if press_enter:
                inp.send_keys(Keys.ENTER)
            # Dispatch bubbling 'input' and 'change' events on the field
            # (presumably so page scripts notice the new value — confirm).
            driver.execute_script("""
                const el = arguments[0];
                const fire = (t)=>el.dispatchEvent(new Event(t, {bubbles:true}));
                fire('input'); fire('change');
            """, inp)
            # TAB is sent last; an exception here (or above) makes this
            # selector count as failed even if the text was already typed.
            inp.send_keys(Keys.TAB)
            return True
        except Exception:
            continue
    return False


def close_possible_modals(driver):
    """Try to dismiss pop-ups: first the promo notification, then a consent button.

    Both attempts are best effort; the result of ``try_click`` is ignored.
    """
    promo_close = [
        (By.CSS_SELECTOR, ".b-promo_notification-popup-close.js-promo_notification-popup-close"),
    ]
    consent_buttons = [
        (By.XPATH, "//button[contains(., 'Принять')]"),
        (By.XPATH, "//button[contains(., 'Ок') or contains(., 'OK')]"),
    ]
    for locator_group in (promo_close, consent_buttons):
        try_click(driver, locator_group, timeout_each=2)


def ensure_cases_tab(driver):
    """Click the 'Дела' tab if one of the known locators finds it.

    The locators are tried from the most specific to the most generic; the
    first one that can be clicked wins. Nothing happens (and nothing is
    raised) when none of them matches.
    """
    tab_locators = [
        (By.XPATH, "//a[contains(@class,'tab') and contains(., 'Дела')]"),
        (By.XPATH, "//*[@role='tab' and contains(.,'Дела')]"),
        (By.XPATH, "//*[self::a or self::button][contains(.,'Дела')]"),
    ]
    # try_click performs the same wait -> scroll -> mouse click sequence and
    # stops at the first locator that works.
    try_click(driver, tab_locators, timeout_each=3)


# ==========================
# ВВОД ПОЛЕЙ ФИЛЬТРА
# ==========================
def fill_filters(driver, params: dict):
    """Fill the search form from ``params``; falsy values are skipped.

    Args:
        driver: Selenium driver with the search page open.
        params: Mapping with the optional keys ``case_number``, ``inn``,
            ``org_name``, ``court_name`` and ``judge``. ``inn`` takes
            precedence over ``org_name`` for the participant field.
    """
    participant = params.get("inn") or params.get("org_name")
    # (value, CSS selector of the field, press ENTER after typing?)
    plan = [
        (params.get("case_number"), "input[placeholder*='Номер дела']", False),
        (participant, "textarea[placeholder*='название, ИНН или ОГРН']", True),
        (params.get("court_name"), "input[placeholder*='название суда']", True),
        (params.get("judge"), "input[placeholder*='фамилия судьи']", True),
    ]
    for value, css, submit in plan:
        if not value:
            continue
        safe_type(driver, [(By.CSS_SELECTOR, css)], value, press_enter=submit)


# ==========================
# ПОИСК И ОЖИДАНИЕ #table
# ==========================
def click_search_and_wait_table(driver, timeout=60):
    """Press the 'Найти' button and wait until the ``#table`` element changes.

    The current ``innerHTML`` of ``#table`` (empty string if the element is
    absent) is remembered before clicking; afterwards the function polls
    until the element exists with different content.

    Args:
        driver: Selenium driver with the filled-in search form.
        timeout: Timeout handed to ``WebDriverWait`` for the table update.

    Raises:
        Whatever ``WebDriverWait.until`` raises when the table does not change
        in time. A failed button click is not an error by itself: the fallback
        click result is ignored and the table wait decides the outcome.
    """
    # Local import so the block does not depend on the file's import header.
    from selenium.common.exceptions import StaleElementReferenceException

    def table_html(d):
        return d.find_element(By.ID, "table").get_attribute("innerHTML")

    # Snapshot of the table before the search is triggered.
    try:
        old_html = table_html(driver)
    except Exception:
        old_html = ""

    # Primary locator for the search button.
    try:
        button = WebDriverWait(driver, 15).until(EC.element_to_be_clickable((
            By.XPATH,
            "//div[contains(@class,'b-button-container') and contains(@class,'pushed')]"
            "//*[(@alt='Найти' or normalize-space(text())='Найти')]"
        )))
        driver.execute_script("arguments[0].scrollIntoView({block:'center'});", button)
        ActionChains(driver).move_to_element(button).pause(0.05).click(button).perform()
    except Exception:
        # Fallback locators.
        try_click(driver, [
            (By.XPATH, "//button[@alt='Найти' or normalize-space(.)='Найти']"),
            (By.XPATH, "//span[normalize-space(.)='Найти']/ancestor::button"),
            (By.CSS_SELECTOR, "button[type='submit']"),
        ], timeout_each=5)

    # The page may replace #table between find_element() and get_attribute();
    # treat that as "not ready yet" and keep polling instead of aborting.
    WebDriverWait(
        driver, timeout, ignored_exceptions=[StaleElementReferenceException]
    ).until(lambda d: table_html(d) != old_html)
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "table"))
    )
    # Short grace period for late rendering.
    time.sleep(1)


# ==========================
# ПАРСИНГ #table
# ==========================
@dataclass
class TableRow:
    """One row of the results table."""
    cols: List[str]  # cell texts of the row, in the order the cells were selected

def parse_table(driver) -> List[TableRow]:
    """Parse ``#table`` from the current page, write it to CSV and return the rows.

    Header texts come from ``thead th``; each ``tbody tr`` becomes one
    ``TableRow``. When the table has rows but no headers, ``col_1``,
    ``col_2``, ... are generated from the width of the first row. The result
    is written to ``OUTPUT_CSV`` (header line first).

    Returns:
        The parsed rows, or an empty list when ``#table`` is missing (in that
        case no file is written).
    """
    soup = BeautifulSoup(driver.page_source, "lxml")
    table = soup.select_one("#table")
    if not table:
        print("⚠️ Элемент #table не найден.")
        return []

    headers = [th.get_text(strip=True) for th in table.select("thead th")]
    rows_data = [
        TableRow([cell.get_text(" ", strip=True) for cell in tr.select("th, td")])
        for tr in table.select("tbody tr")
    ]

    # Synthesize column names when the page provides none.
    if rows_data and not headers:
        width = len(rows_data[0].cols)
        headers = [f"col_{number}" for number in range(1, width + 1)]

    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        writer.writerows(row.cols for row in rows_data)

    print(f"✅ Таблица сохранена: {OUTPUT_CSV} ({len(rows_data)} строк)")
    return rows_data


# ==========================
# MAIN
# ==========================
def main():
    """Run the whole flow: open the site, fill the filters, search, save the table.

    The browser is always closed, even when a step raises.
    """
    # Any element mentioning one of the form labels signals that the form is there.
    form_marker = (By.XPATH, "//*[contains(., 'Номер дела') or contains(., 'Участник')]")

    browser = build_driver(headless=False)
    try:
        browser.get(START_URL)
        close_possible_modals(browser)
        ensure_cases_tab(browser)

        WebDriverWait(browser, 20).until(EC.presence_of_element_located(form_marker))

        fill_filters(browser, SEARCH_PARAMS)
        click_search_and_wait_table(browser, timeout=60)
        parse_table(browser)
    finally:
        browser.quit()


if __name__ == "__main__":
    main()



✅ Таблица сохранена: kad_results.csv (5 строк)


In [65]:
# pip install selenium webdriver-manager beautifulsoup4 lxml

from __future__ import annotations
import csv, time, re
from dataclasses import dataclass, asdict
from typing import List
from urllib.parse import quote_plus, urljoin

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager


# ======================
# НАСТРОЙКИ
# ======================
QUERY = "сбер"  # search phrase; URL-encoded into START_URL below
BASE = "https://tass.ru"
START_URL = f"{BASE}/search?search={quote_plus(QUERY)}"
OUTPUT_CSV = "tass_links.csv"
HEADLESS = False
TOTAL_TIME_LIMIT_SEC = 200   # overall limit for the run
SCROLL_TIME_LIMIT_SEC = 8    # time allowed for collecting links on the search page
MAX_NEWS = 200               # maximum number of news pages to visit


# ======================
# ДРАЙВЕР
# ======================
def build_driver(headless: bool = False) -> webdriver.Chrome:
    """Create the Chrome driver used by the scraper.

    Args:
        headless: Adds ``--headless=new`` to the Chrome arguments when true.

    Returns:
        A started ``webdriver.Chrome``; the caller must quit it.
    """
    chrome_options = Options()

    switches = (
        "--window-size=1600,1000",
        "--no-sandbox",
        "--disable-gpu",
        "--disable-dev-shm-usage",
        "--lang=ru,en",
        "--disable-blink-features=AutomationControlled",
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
    )
    if headless:
        switches = ("--headless=new",) + switches
    for switch in switches:
        chrome_options.add_argument(switch)

    for option_name, option_value in (
        ("excludeSwitches", ["enable-automation"]),
        ("useAutomationExtension", False),
    ):
        chrome_options.add_experimental_option(option_name, option_value)

    chromedriver = Service(ChromeDriverManager().install())
    browser = webdriver.Chrome(service=chromedriver, options=chrome_options)

    hide_webdriver_flag = {
        "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
    }
    try:
        browser.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", hide_webdriver_flag)
    except Exception:
        # Optional tweak: a failure here must not prevent returning the driver.
        pass
    return browser


def accept_cookies_if_any(driver):
    """Click the first cookie/consent button that becomes clickable, if any.

    Each locator gets a short ``WebDriverWait``; failures are ignored and the
    function simply returns when nothing could be clicked.
    """
    consent_locators = (
        (By.XPATH, "//button[contains(., 'Принять')]"),
        (By.XPATH, "//button[contains(., 'Согласен') or contains(., 'Хорошо') or contains(., 'OK') or contains(., 'Ок')]"),
        (By.CSS_SELECTOR, "button.cookie, .cookie button, .cookies-accept, .cookies__btn-accept"),
    )
    for how, what in consent_locators:
        try:
            button = WebDriverWait(driver, 2).until(EC.element_to_be_clickable((how, what)))
            button.click()
        except Exception:
            continue
        # One successful click is enough.
        return


# ======================
# ДАННЫЕ
# ======================
@dataclass
class Row:
    """One CSV record for a visited news page."""
    title: str    # headline text of the page
    lead_50: str  # leading words of the lead paragraph (main() keeps the first 50)
    url: str      # driver.current_url at the time the page was parsed


def first_words(text: str, n: int = 50) -> str:
    """Return the first ``n`` whitespace-separated words of ``text`` joined by single spaces.

    ``None`` or an empty string yields an empty string.
    """
    if not text:
        return ""
    tokens = text.split()
    return " ".join(tokens[:n])


# ======================
# СБОР ССЫЛОК
# ======================
def collect_news_urls(driver, time_limit_sec: int, max_items: int) -> List[str]:
    """Gather unique absolute news URLs from the search results page.

    Links are read from anchors whose class contains ``EnhancedLink_box__``.
    The page is scrolled step by step; collection stops when the time budget
    is spent, ``max_items`` links are known, or a scroll step leaves the total
    unchanged.

    Args:
        driver: Selenium driver with the search results open.
        time_limit_sec: Budget in seconds. The clock starts on entry, so the
            initial wait for the first links counts against it.
        max_items: Upper bound on the number of returned URLs.

    Returns:
        At most ``max_items`` URLs in first-seen order.
    """
    started_at = time.time()
    known = set()

    def fresh_links() -> List[str]:
        """Return links currently on the page that were not returned before."""
        js = r"""
        const as = Array.from(document.querySelectorAll("a[class*='EnhancedLink_box__'][href]"));
        return as.map(a => a.href || a.getAttribute('href'));
        """
        fresh = []
        for href in driver.execute_script(js) or []:
            if not href:
                continue
            absolute = href if href.startswith("http") else urljoin(BASE, href)
            if absolute in known:
                continue
            known.add(absolute)
            fresh.append(absolute)
        return fresh

    # Wait for the first result links before harvesting.
    first_links_present = EC.presence_of_all_elements_located(
        (By.CSS_SELECTOR, "a[class*='EnhancedLink_box__']")
    )
    WebDriverWait(driver, 15).until(first_links_present)

    links: List[str] = fresh_links()

    # The baseline starts at 0 (not at the initial harvest size), so the
    # "no growth" check can only fire from the second scroll step onwards,
    # unless nothing has been collected at all.
    previous_total = 0
    while True:
        out_of_time = time.time() - started_at >= time_limit_sec
        if out_of_time or len(links) >= max_items:
            break
        driver.execute_script("window.scrollBy(0, Math.max(600, window.innerHeight));")
        # Small pause for lazy loading.
        time.sleep(0.25)
        links.extend(fresh_links())
        if len(links) == previous_total:
            # Nothing new appeared after this scroll step.
            break
        previous_total = len(links)

    return links[:max_items]


# ======================
# ОСНОВА
# ======================
def main():
    """Search the site, visit every found news page and save title/lead/URL to CSV.

    Steps: open ``START_URL``, dismiss a consent banner, collect news links,
    visit them one by one until ``TOTAL_TIME_LIMIT_SEC`` is exceeded, then
    write ``OUTPUT_CSV``.

    A page that fails to open is skipped, and the CSV is written with whatever
    was collected even if the crawl loop is interrupted by an error. When no
    links are found at all, nothing is written. The browser is always closed.
    """
    # Local import so this block carries its own dependency.
    from selenium.common.exceptions import WebDriverException

    driver = build_driver(headless=HEADLESS)
    wait = WebDriverWait(driver, 10)
    start_ts = time.time()
    rows: List[Row] = []

    try:
        # 1) search
        driver.get(START_URL)
        accept_cookies_if_any(driver)

        # 2) collect all news links from the results page
        urls = collect_news_urls(driver, SCROLL_TIME_LIMIT_SEC, MAX_NEWS)
        if not urls:
            print("⚠️ Не нашли ни одной новости по селектору EnhancedLink_box__*")
            return
        print(f"Найдено ссылок: {len(urls)}")

        # 3) visit each URL and parse the page
        title_xpath = "//*[contains(@class,'MaterialPageTitle_text__')]"
        lead_xpath = "//*[contains(@class,'PageLead_text__')]"

        try:
            for url in urls:
                if time.time() - start_ts >= TOTAL_TIME_LIMIT_SEC:
                    break

                try:
                    driver.get(url)
                except WebDriverException as exc:
                    # One broken page must not discard everything gathered so far.
                    print(f"⚠️ Не удалось открыть {url}: {type(exc).__name__}")
                    continue

                # Wait for the title; the lead may be missing, which is fine.
                try:
                    title_el = wait.until(EC.presence_of_element_located((By.XPATH, title_xpath)))
                    title_text = (title_el.text or "").strip()
                except Exception:
                    title_text = ""

                lead_text = ""
                try:
                    lead_el = WebDriverWait(driver, 4).until(
                        EC.presence_of_element_located((By.XPATH, lead_xpath))
                    )
                    lead_text = (lead_el.text or "").strip()
                except Exception:
                    pass

                # Pages without a title are not recorded.
                if title_text:
                    rows.append(Row(
                        title=title_text,
                        lead_50=first_words(lead_text, 50),
                        url=driver.current_url
                    ))
        finally:
            # 4) CSV — written even when the loop above was interrupted.
            with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
                w = csv.DictWriter(f, fieldnames=["title", "lead_50", "url"])
                w.writeheader()
                for r in rows:
                    w.writerow(asdict(r))

            print(f"✅ Готово: {len(rows)} строк в {OUTPUT_CSV}")

    finally:
        driver.quit()


if __name__ == "__main__":
    main()



Найдено ссылок: 32
✅ Готово: 5 строк в tass_links.csv


In [63]:
# pip install selenium webdriver-manager

from __future__ import annotations
import csv, time, re
from dataclasses import dataclass, asdict
from typing import List
from urllib.parse import quote_plus, urljoin

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager


# ======================
# НАСТРОЙКИ
# ======================
QUERY = "сбер"  # search phrase; URL-encoded into START_URL below
BASE = "https://www.rusprofile.ru"
START_URL = f"{BASE}/search?query={quote_plus(QUERY)}&type=ul"
OUTPUT_CSV = "rusprofile_company_info.csv"
HEADLESS = False
TOTAL_TIME_LIMIT_SEC = 120  # main() stops visiting company pages after this many seconds
MAX_COMPANIES = 30  # main() visits at most this many company links


# ======================
# ДРАЙВЕР
# ======================
def build_driver(headless: bool = False) -> webdriver.Chrome:
    """Launch Chrome with the scraper's standard option set.

    Args:
        headless: When true, Chrome gets the ``--headless=new`` argument.

    Returns:
        The running ``webdriver.Chrome`` instance (caller quits it).
    """
    def configured_options() -> Options:
        cfg = Options()
        if headless:
            cfg.add_argument("--headless=new")
        for flag in (
            "--window-size=1600,1000",
            "--no-sandbox",
            "--disable-gpu",
            "--disable-dev-shm-usage",
            "--lang=ru,en",
        ):
            cfg.add_argument(flag)
        cfg.add_experimental_option("excludeSwitches", ["enable-automation"])
        cfg.add_experimental_option("useAutomationExtension", False)
        cfg.add_argument("--disable-blink-features=AutomationControlled")
        cfg.add_argument(
            "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36"
        )
        return cfg

    chrome_options = configured_options()
    browser = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=chrome_options,
    )
    try:
        # Script evaluated in each new document: navigator.webdriver -> undefined.
        browser.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
        })
    except Exception:
        # Ignored on purpose; the driver is usable without this tweak.
        pass
    return browser


def accept_cookies_if_any(driver):
    """Dismiss a cookie/consent banner when one of the known buttons is clickable.

    Locators are tried in order with a short wait each; the first successful
    click ends the search. Errors are swallowed.
    """
    xpath_accept = "//button[contains(., 'Принять')]"
    xpath_agree = "//button[contains(., 'Согласен') or contains(., 'Хорошо') or contains(., 'OK') or contains(., 'Ок')]"
    css_cookie = "button.cookie, .cookie button, .cookies-accept, .cookies__btn-accept"

    candidates = [
        (By.XPATH, xpath_accept),
        (By.XPATH, xpath_agree),
        (By.CSS_SELECTOR, css_cookie),
    ]
    for how, what in candidates:
        try:
            clickable = EC.element_to_be_clickable((how, what))
            WebDriverWait(driver, 2).until(clickable).click()
        except Exception:
            continue
        else:
            break


# ======================
# ДАННЫЕ
# ======================
@dataclass
class Row:
    """One CSV record: a single titled section of a company page."""
    company_name: str   # company name as resolved by parse_company_info()
    company_url: str    # URL of the company page the section came from
    section_title: str  # text of a .company-info__title element
    section_texts: str  # texts of the section's .company-info__text elements, joined with ", "


# ======================
# ВСПОМОГАТЕЛЬНЫЕ
# ======================
def clean_spaces(s: str) -> str:
    """Trim ``s`` and collapse every whitespace run into one space.

    ``None`` and the empty string both give an empty string.
    """
    if not s:
        return ""
    return " ".join(s.split())


def collect_company_links(driver) -> List[str]:
    """Return unique absolute company-page URLs found on the search results page.

    Only ``a.list-element__title`` anchors whose ``href`` starts with ``/id/``
    are considered. Order of first appearance is kept.
    """
    js = r"""
    const as = Array.from(document.querySelectorAll("a.list-element__title[href^='/id/']"));
    return as.map(a => a.href || (location.origin + a.getAttribute('href')));
    """
    # A dict keeps insertion order, which gives ordered de-duplication.
    ordered = {}
    for href in driver.execute_script(js) or []:
        if not href:
            continue
        if href.startswith("http"):
            absolute = href
        else:
            absolute = urljoin("https://www.rusprofile.ru", href)
        ordered.setdefault(absolute, None)
    return list(ordered)


def parse_company_info(driver) -> List[Row]:
    """Collect the titled info sections of the company page currently open.

    For every ``.company-info__title`` element, the following sibling
    ``.company-info__text`` elements up to the next title are gathered.

    Returns:
        One ``Row`` per section that has a title or at least one text; every
        row carries the company name and the current page URL.
    """
    # Company name: first selector that yields non-empty text wins.
    name = ""
    for sel in ["h1[itemprop='name']", ".company-header__name", ".company-name", "title"]:
        try:
            el = driver.find_element(By.CSS_SELECTOR, sel)
            # Fall back to textContent when the rendered text is empty.
            name = clean_spaces(el.text or el.get_attribute("textContent"))
            if name:
                break
        except Exception:
            continue
    if not name:
        # Last resort: the document title.
        name = clean_spaces(driver.title)

    url = driver.current_url

    # Runs in the page: returns [{title, texts}, ...], one entry per title element.
    js = r"""
    const blocks = [];
    const titles = Array.from(document.querySelectorAll('.company-info__title'));
    for (const t of titles) {
        const titleText = (t.innerText || '').trim();
        const texts = [];
        let n = t.nextElementSibling;
        while (n && !n.classList.contains('company-info__title')) {
            if (n.classList && n.classList.contains('company-info__text')) {
                const txt = (n.innerText || '').trim();
                if (txt) texts.push(txt);
            }
            n = n.nextElementSibling;
        }
        blocks.push({title: titleText, texts});
    }
    return blocks;
    """
    blocks = driver.execute_script(js) or []
    out: List[Row] = []
    for b in blocks:
        title = clean_spaces(b.get("title", ""))
        # Normalise whitespace and drop texts that end up empty.
        texts = [clean_spaces(x) for x in (b.get("texts") or []) if clean_spaces(x)]
        if not title and not texts:
            continue
        out.append(Row(
            company_name=name,
            company_url=url,
            section_title=title,
            section_texts=", ".join(texts),
        ))
    return out


# ======================
# MAIN
# ======================
def main():
    """Search for companies, visit each result page and save its info sections to CSV.

    Steps: open ``START_URL``, dismiss a consent banner, wait for the result
    links, collect up to ``MAX_COMPANIES`` company URLs, visit them until
    ``TOTAL_TIME_LIMIT_SEC`` is exceeded, then write ``OUTPUT_CSV``.

    A company page that fails to open is skipped, and the CSV is written with
    whatever was collected even if the crawl loop is interrupted by an error.
    When no links are found, nothing is written. The browser is always closed.
    """
    # Local import so this block carries its own dependency.
    from selenium.common.exceptions import WebDriverException

    driver = build_driver(headless=HEADLESS)
    wait = WebDriverWait(driver, 12)
    start = time.time()
    rows: List[Row] = []

    try:
        driver.get(START_URL)
        accept_cookies_if_any(driver)

        # Wait for the search results.
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "a.list-element__title")))

        # Scroll a little in case results are loaded lazily.
        for _ in range(2):
            driver.execute_script("window.scrollBy(0, window.innerHeight);")
            time.sleep(0.2)

        # Collect the company URLs up front (no clicking on the results page).
        urls = collect_company_links(driver)
        if not urls:
            print("⚠️ Не нашли ни одной ссылки .list-element__title")
            return

        # Cap the amount of work.
        urls = urls[:MAX_COMPANIES]

        try:
            # Visit the company pages by direct navigation.
            for url in urls:
                if time.time() - start > TOTAL_TIME_LIMIT_SEC:
                    break

                try:
                    driver.get(url)
                except WebDriverException as exc:
                    # One broken page must not discard everything gathered so far.
                    print(f"⚠️ Не удалось открыть {url}: {type(exc).__name__}")
                    continue

                try:
                    # Wait for the company info structure to appear.
                    WebDriverWait(driver, 10).until(
                        EC.presence_of_element_located(
                            (By.CSS_SELECTOR, ".company-info__title, .company-info__text")
                        )
                    )
                except Exception:
                    # Parse anyway; the content may already be there.
                    pass

                rows.extend(parse_company_info(driver))
        finally:
            # CSV — written even when the loop above was interrupted.
            with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
                w = csv.DictWriter(
                    f, fieldnames=["company_name", "company_url", "section_title", "section_texts"]
                )
                w.writeheader()
                for r in rows:
                    w.writerow(asdict(r))

            print(f"✅ Готово: {len(rows)} строк в {OUTPUT_CSV}")

    finally:
        driver.quit()


if __name__ == "__main__":
    main()


✅ Готово: 582 строк в rusprofile_company_info.csv


In [1]:
# pip install selenium webdriver-manager

from __future__ import annotations
import csv, time, re
from dataclasses import dataclass, asdict
from typing import List, Tuple
from urllib.parse import quote_plus

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager


# ======================
# НАСТРОЙКИ
# ======================
SEARCH = "инкриз"         # search string
LIMIT = 15                # items per page
MAX_PAGES = 1             # number of list pages to visit (offset = page * LIMIT)
HEADLESS = False

PAGE_READY_TIMEOUT    = 40   # wait for the list page to finish loading
FIND_LINKS_TIMEOUT    = 30   # wait for the "Вся информация" links to appear
DETAIL_READY_TIMEOUT  = 30   # wait for the first sign of a detail card
DETAIL_STABILIZE_MAX  = 20   # max seconds spent waiting for card content to stabilise
DETAIL_STABLE_HITS    = 3    # consecutive identical measurements that count as stable
DETAIL_POLL_INTERVAL  = 0.4  # sleep between stabilisation polls, in seconds
TOTAL_TIME_LIMIT      = 240  # overall limit for the run, in seconds

BASE = "https://bankrot.fedresurs.ru"
# Template for the list page URL; filled in with q, offset and limit.
LIST_URL_TPL = BASE + "/bankrupts?searchString={q}&regionId=all&isActiveLegalCase=null&offset={offset}&limit={limit}"
OUTPUT_CSV = "fedresurs_bankrupts_listname.csv"


# ======================
# ДРАЙВЕР
# ======================
def build_driver(headless=False):
    """Create the Chrome driver used for the scrape.

    Args:
        headless: When true, ``--headless=new`` is passed to Chrome.

    Returns:
        The started Chrome driver; the caller is expected to quit it.
    """
    switches = [
        "--window-size=1600,1000",
        "--no-sandbox",
        "--disable-gpu",
        "--disable-dev-shm-usage",
        "--lang=ru,en",
        "--disable-blink-features=AutomationControlled",
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
    ]
    if headless:
        switches.insert(0, "--headless=new")

    chrome_options = Options()
    for switch in switches:
        chrome_options.add_argument(switch)
    chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    chrome_options.add_experimental_option("useAutomationExtension", False)
    # NOTE(review): "normal" is presumably Selenium's default page-load strategy
    # (wait for the load event), so this likely changes nothing — confirm.
    chrome_options.page_load_strategy = "normal"

    chromedriver = Service(ChromeDriverManager().install())
    browser = webdriver.Chrome(service=chromedriver, options=chrome_options)
    try:
        browser.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
        })
    except Exception:
        # Best effort; the driver is returned regardless.
        pass
    return browser


def wait_page_fully_loaded(driver, timeout=PAGE_READY_TIMEOUT):
    """Wait until ``document.readyState`` is ``complete``, then pause two more seconds.

    Args:
        driver: Selenium driver.
        timeout: Timeout handed to ``WebDriverWait``.
    """
    def document_complete(d):
        return d.execute_script("return document.readyState") == "complete"

    WebDriverWait(driver, timeout).until(document_complete)
    # Extra pause so the client-side app can finish rendering.
    time.sleep(2.0)


# ======================
# ДАННЫЕ
# ======================
@dataclass
class Row:
    """One output record. NOTE(review): field usage is outside the visible code; meanings below are presumed."""
    list_name: str    # presumably the entity name read from the list page — confirm
    field_name: str   # presumably the label of a detail-card field — confirm
    field_value: str  # presumably the value of that field — confirm
    entity_url: str   # presumably the URL of the detail card — confirm


def clean(s: str) -> str:
    """Strip ``s`` and squeeze each run of whitespace into a single space.

    A falsy argument (``None`` or ``""``) yields ``""``.
    """
    text = s or ""
    words = text.split()
    return " ".join(words)


# ======================
# ВСПОМОГАТЕЛЬНЫЕ ДЛЯ КАРТОЧКИ
# ======================
def _collect_pairs_now(driver) -> List[Tuple[str, str]]:
    """Read the (name, value) pairs visible on the card right now, without waiting.

    First every known container element is inspected for a name and a value
    child. If that yields nothing, all name elements and all value elements
    on the page are paired by position instead.
    """
    def text_of(node) -> str:
        return clean(node.get_attribute("textContent"))

    def child_text(parent, css: str) -> str:
        # A missing child (or any other failure) counts as empty text.
        try:
            return text_of(parent.find_element(By.CSS_SELECTOR, css))
        except Exception:
            return ""

    pairs: List[Tuple[str, str]] = []

    # Container-based pass.
    boxes = driver.find_elements(By.CSS_SELECTOR, ".info-item, .info-row, .info__row, .info-item-container")
    for box in boxes:
        label = child_text(box, ".info-item-name")
        value = child_text(box, ".info-item-value")
        if label or value:
            pairs.append((label, value))
    if pairs:
        return pairs

    # Positional fallback: i-th name goes with i-th value; the shorter list is
    # padded with empty strings.
    names = driver.find_elements(By.CSS_SELECTOR, ".info-item-name")
    vals = driver.find_elements(By.CSS_SELECTOR, ".info-item-value")
    for i in range(max(len(names), len(vals))):
        label = text_of(names[i]) if i < len(names) else ""
        value = text_of(vals[i]) if i < len(vals) else ""
        if label or value:
            pairs.append((label, value))
    return pairs


def _text_mass(pairs: List[Tuple[str, str]]) -> int:
    """Грубая оценка 'массы' текста (для детекта прироста)."""
    return sum(len(a) + len(b) for a, b in pairs)


def wait_card_fully_loaded_and_collect(driver) -> List[Tuple[str, str]]:
    """
    Wait for the first sign of the card, then for its content to stabilise.

    The page is scrolled down repeatedly while the pairs are re-collected.
    The loop ends when the number of pairs and their text mass stayed the
    same for DETAIL_STABLE_HITS consecutive polls, or when
    DETAIL_STABILIZE_MAX seconds have passed.

    Returns the snapshot with the largest text mass seen; if that snapshot is
    empty, one more collection is attempted and returned.
    """
    # 1) wait until at least some field is present
    try:
        WebDriverWait(driver, DETAIL_READY_TIMEOUT).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".info-item-name, .info-item-value"))
        )
    except Exception:
        # Nothing appeared in time: pause briefly and go on to polling anyway.
        time.sleep(1.0)

    stable_hits = 0
    best_pairs: List[Tuple[str, str]] = []
    best_mass = -1
    start = time.time()

    # -1 can never equal a real measurement, so the first poll is never "stable".
    last_count = -1
    last_mass = -1

    while time.time() - start < DETAIL_STABILIZE_MAX and stable_hits < DETAIL_STABLE_HITS:
        # scroll down (in case content is loaded lazily)
        driver.execute_script("window.scrollBy(0, Math.max(800, window.innerHeight));")
        time.sleep(0.1)
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(0.15)

        pairs = _collect_pairs_now(driver)
        cnt = len(pairs)
        mass = _text_mass(pairs)

        # keep the richest snapshot
        if mass > best_mass:
            best_mass = mass
            best_pairs = pairs

        # stability check: any change resets the streak
        if cnt == last_count and mass == last_mass:
            stable_hits += 1
        else:
            stable_hits = 0

        last_count, last_mass = cnt, mass
        time.sleep(DETAIL_POLL_INTERVAL)

    return best_pairs or _collect_pairs_now(driver)


# ======================
# MAIN
# ======================
def main():
    """
    Crawl the paginated search list, open every "Вся информация" card and
    write all collected (field name, field value) pairs to OUTPUT_CSV.

    For each of up to MAX_PAGES list pages the function finds the card links,
    clicks them one by one (the card may open in a new tab or in the same
    tab), waits for the card content to stabilize, stores one Row per field
    and returns to the list. The whole run is bounded by TOTAL_TIME_LIMIT.

    Robustness guarantees:
    - a list page without any card link stops pagination instead of raising;
    - a card that fails to load is reported and skipped;
    - the CSV is written in the outer ``finally`` so rows collected before an
      error or an interrupt are still saved.
    """
    driver = build_driver(headless=HEADLESS)
    start_all = time.time()
    out_rows: List[Row] = []

    # Locator of the "Вся информация" link that leads to a card.
    link_xpath = "//a[.//div[normalize-space()='Вся информация'] or contains(normalize-space(.), 'Вся информация')]"
    link_locator = (By.XPATH, link_xpath)

    try:
        for page in range(MAX_PAGES):
            if time.time() - start_all > TOTAL_TIME_LIMIT:
                break

            list_url = LIST_URL_TPL.format(q=quote_plus(SEARCH), offset=page * LIMIT, limit=LIMIT)
            print("[INFO] Открываем список:", list_url)
            driver.get(list_url)
            wait_page_fully_loaded(driver)

            try:
                WebDriverWait(driver, FIND_LINKS_TIMEOUT).until(
                    EC.presence_of_element_located(link_locator)
                )
            except Exception:
                # Scroll a few screens down and retry once.
                for _ in range(5):
                    driver.execute_script("window.scrollBy(0, Math.max(800, window.innerHeight));")
                    time.sleep(0.4)
                try:
                    WebDriverWait(driver, 8).until(
                        EC.presence_of_element_located(link_locator)
                    )
                except Exception:
                    # No cards on this offset: treat it as the end of the
                    # results rather than aborting the run.
                    print("[WARN] На странице нет ссылок «Вся информация» — останавливаем обход списка")
                    break

            idx = 0
            while True:
                if time.time() - start_all > TOTAL_TIME_LIMIT:
                    break
                # Re-query on every iteration: elements found before a
                # navigation are stale afterwards.
                links = driver.find_elements(By.XPATH, link_xpath)
                if idx >= len(links):
                    break

                el = links[idx]

                # ---- entity name taken from the neighbouring block in the list ----
                list_name = ""
                try:
                    name_span = el.find_element(
                        By.XPATH,
                        ".//ancestor::*[contains(@class,'u-card-result')][1]"
                        "//div[contains(@class,'u-card-result__name')]/span"
                    )
                    list_name = clean(name_span.get_attribute("textContent"))
                except Exception:
                    try:
                        name_span = el.find_element(
                            By.XPATH,
                            "./ancestor::*[1]//div[contains(@class,'u-card-result__name')]/span"
                        )
                        list_name = clean(name_span.get_attribute("textContent"))
                    except Exception:
                        list_name = ""

                # Bring the link into view (best effort).
                try:
                    driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el)
                    ActionChains(driver).move_to_element(el).pause(0.05).perform()
                except Exception:
                    pass

                # --- remember the state so we can tell how the card opened ---
                current = driver.current_window_handle
                before_handles = set(driver.window_handles)
                url_before_click = driver.current_url

                # Try several click strategies until one does not raise.
                clicked = False
                for click_try in (
                    lambda: el.click(),
                    lambda: driver.execute_script("arguments[0].click();", el),
                    lambda: ActionChains(driver).move_to_element(el).click().perform(),
                    lambda: el.send_keys(Keys.ENTER),
                ):
                    try:
                        click_try()
                        clicked = True
                        break
                    except Exception:
                        continue

                if not clicked:
                    idx += 1
                    continue

                # Wait for a new tab or for navigation in the same tab. The
                # current window handle never changes without an explicit
                # switch, so same-tab navigation is detected by the URL.
                try:
                    WebDriverWait(driver, 12).until(
                        lambda d: len(d.window_handles) > len(before_handles)
                        or d.current_url != url_before_click
                    )
                except Exception:
                    pass

                new_handles = list(set(driver.window_handles) - before_handles)
                switched = False

                # ======= wait for stable content first, parse afterwards =======
                try:
                    if new_handles:
                        driver.switch_to.window(new_handles[0])
                        switched = True

                    pairs = wait_card_fully_loaded_and_collect(driver)
                    entity_url = driver.current_url

                    out_rows.extend(
                        Row(
                            list_name=list_name,
                            field_name=fname,
                            field_value=fvalue,
                            entity_url=entity_url,
                        )
                        for fname, fvalue in pairs
                    )
                except Exception as exc:
                    # One broken card must not cost the rows already collected.
                    print(f"[WARN] Не удалось прочитать карточку «{list_name}»: {exc!r}")

                # Return to the list page; give up on this page if that fails.
                try:
                    if switched:
                        # Only close when we really are on the card tab,
                        # otherwise we would close the list window itself.
                        driver.close()
                        driver.switch_to.window(current)
                        WebDriverWait(driver, 15).until(
                            EC.presence_of_element_located(link_locator)
                        )
                    elif not new_handles:
                        driver.back()
                        wait_page_fully_loaded(driver)
                        WebDriverWait(driver, 10).until(
                            EC.presence_of_element_located(link_locator)
                        )
                    time.sleep(0.3)
                except Exception:
                    break

                idx += 1

    finally:
        try:
            driver.quit()
        except Exception:
            pass

        # ===== CSV =====
        # Written here so partial results survive errors and interrupts.
        with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=["list_name", "field_name", "field_value", "entity_url"])
            w.writeheader()
            w.writerows(asdict(r) for r in out_rows)

        print(f"✅ Готово: {len(out_rows)} строк в {OUTPUT_CSV}")


if __name__ == "__main__":
    main()


[INFO] Открываем список: https://bankrot.fedresurs.ru/bankrupts?searchString=%D0%B8%D0%BD%D0%BA%D1%80%D0%B8%D0%B7&regionId=all&isActiveLegalCase=null&offset=0&limit=15
✅ Готово: 50 строк в fedresurs_bankrupts_listname.csv


In [12]:
# pip install selenium webdriver-manager

from __future__ import annotations
import csv, time
from dataclasses import dataclass, asdict
from typing import List
from urllib.parse import urlencode, urlparse, parse_qs, urlunparse

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager


# ======================
# SETTINGS
# ======================
BASE_URL = "https://www.dvnovosti.ru/find/"  # search page; make_page_url adds the query parameters
QUERY = "фургал"  # search phrase sent as the `q` parameter
PAGES = 5  # how many result pages to visit
HEADLESS = True  # passed to build_driver; True adds --headless=new
OUTPUT_CSV = "dvnovosti_results.csv"  # file that receives the collected items


# ======================
# ДРАЙВЕР
# ======================
def build_driver(headless: bool = True) -> webdriver.Chrome:
    """Start Chrome through webdriver-manager with reduced automation traces.

    Args:
        headless: when true, Chrome is started with ``--headless=new``.

    Returns:
        The started Chrome driver. A script that makes ``navigator.webdriver``
        report ``undefined`` is registered for every new document; if the CDP
        call fails the driver is returned without it.
    """
    chrome_options = Options()

    cli_flags = ["--headless=new"] if headless else []
    cli_flags += [
        "--window-size=1400,900",
        "--no-sandbox",
        "--disable-gpu",
        "--disable-dev-shm-usage",
        "--lang=ru,en",
        "--disable-blink-features=AutomationControlled",
    ]
    for flag in cli_flags:
        chrome_options.add_argument(flag)

    chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    chrome_options.add_experimental_option("useAutomationExtension", False)

    browser = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=chrome_options,
    )

    hide_webdriver_js = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
    try:
        browser.execute_cdp_cmd(
            "Page.addScriptToEvaluateOnNewDocument",
            {"source": hide_webdriver_js},
        )
    except Exception:
        # Best effort: the patch is optional.
        pass
    return browser


# ======================
# УТИЛИТЫ
# ======================
def make_page_url(base: str, q: str, page: int) -> str:
    """
    Build a search URL of the form /find/?q=...&page=N.

    Query parameters already present in *base* are kept, ``q`` is set to the
    given phrase, and ``page`` is set only for pages after the first (for
    page 1 and below any existing ``page`` parameter is removed). URL
    params and the fragment of *base* are dropped.
    """
    parts = urlparse(base)
    params = parse_qs(parts.query, keep_blank_values=True)

    params["q"] = [q]
    if page > 1:
        params["page"] = [str(page)]
    else:
        params.pop("page", None)

    # doseq=True emits one key=value pair per list element.
    encoded = urlencode(params, doseq=True)
    return urlunparse((parts.scheme, parts.netloc, parts.path, "", encoded, ""))


@dataclass
class Item:
    """One search result collected from a results page (one CSV row)."""
    title: str  # text of the title link
    description: str  # text of the description link
    url: str  # href of the title link ("" when absent)


# ======================
# MAIN
# ======================
def main():
    """Visit PAGES search result pages for QUERY and save the results to OUTPUT_CSV.

    On every page the title links and description links are collected and
    paired by position; each pair becomes one Item (title, description, url).
    Pages where no title link appears within 15 seconds are skipped with a
    warning. The browser is always closed at the end.

    NOTE(review): titles and descriptions are matched purely by index, so a
    story without a description would shift the pairing — confirm against
    the page markup.
    """
    title_css = "a.story__title-link"
    descr_css = "a.story__description-link"

    def text_of(element) -> str:
        # Prefer the rendered text, fall back to the raw textContent.
        return (element.text or element.get_attribute("textContent") or "").strip()

    driver = build_driver(headless=HEADLESS)
    wait = WebDriverWait(driver, 15)
    items: List[Item] = []

    try:
        for page in range(1, PAGES + 1):
            url = make_page_url(BASE_URL, QUERY, page)
            print(f"[INFO] Открываю страницу {page}: {url}")
            driver.get(url)

            try:
                wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, title_css)))
            except Exception:
                print(f"⚠️  Не найдено новостей на странице {page}")
                continue

            titles = driver.find_elements(By.CSS_SELECTOR, title_css)
            descriptions = driver.find_elements(By.CSS_SELECTOR, descr_css)

            # zip stops at the shorter list.
            for title_el, descr_el in zip(titles, descriptions):
                items.append(
                    Item(
                        title=text_of(title_el),
                        description=text_of(descr_el),
                        url=title_el.get_attribute("href") or "",
                    )
                )

            time.sleep(0.2)

        with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["title", "description", "url"])
            writer.writeheader()
            writer.writerows(asdict(it) for it in items)

        print(f"✅ Готово: {len(items)} записей сохранено в {OUTPUT_CSV}")

    finally:
        driver.quit()


if __name__ == "__main__":
    main()


[INFO] Открываю страницу 1: https://www.dvnovosti.ru/find/?q=%D1%84%D1%83%D1%80%D0%B3%D0%B0%D0%BB
[INFO] Открываю страницу 2: https://www.dvnovosti.ru/find/?q=%D1%84%D1%83%D1%80%D0%B3%D0%B0%D0%BB&page=2
[INFO] Открываю страницу 3: https://www.dvnovosti.ru/find/?q=%D1%84%D1%83%D1%80%D0%B3%D0%B0%D0%BB&page=3
[INFO] Открываю страницу 4: https://www.dvnovosti.ru/find/?q=%D1%84%D1%83%D1%80%D0%B3%D0%B0%D0%BB&page=4
[INFO] Открываю страницу 5: https://www.dvnovosti.ru/find/?q=%D1%84%D1%83%D1%80%D0%B3%D0%B0%D0%BB&page=5
✅ Готово: 50 записей сохранено в dvnovosti_results.csv


In [3]:
# -*- coding: utf-8 -*-
# pip install selenium webdriver-manager

from __future__ import annotations
import asyncio, csv, time, re, os, pathlib
from dataclasses import dataclass, asdict
from typing import List, Set, Tuple

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

import threading
# ======================
# SETTINGS
# ======================
# Channel pages to monitor; both t.me/<handle> and t.me/s/<handle> forms are listed.
CHANNEL_URLS = [
    "https://t.me/mash",
    "https://t.me/rian_ru",
    "https://t.me/s/rbc_news",
    "https://t.me/s/dvhab_novosti",
]
KEYWORD        = "но"          # word/phrase to search for (case-insensitive)
DURATION_SEC   = 120               # how many seconds to monitor each URL
HEADLESS       = False             # passed to build_driver; True adds --headless=new
CONCURRENCY    = min(4, len(CHANNEL_URLS))  # number of drivers running at the same time

PAGE_READY_TIMEOUT  = 30  # seconds to wait for document.readyState == "complete"
MSG_APPEAR_TIMEOUT  = 20  # seconds to wait for the first message node
SCROLL_INTERVAL_SEC = 0.35  # pause between two scroll-up steps while monitoring
OUTPUT_DIR          = "tg_results"  # directory that receives one CSV per channel


# ======================
# ДРАЙВЕР (с антиперехватом "Open in app")
# ======================
def build_driver(headless: bool=False) -> webdriver.Chrome:
    """Start Chrome configured to stay on the web page instead of handing off to an app.

    The options add anti-automation tweaks, disable notifications, register
    excluded protocol schemes in the profile prefs and set a desktop
    User-Agent. After start-up a script is registered (via CDP) to run on
    every new document; it hides the "Open in app" buttons, blocks
    navigation to tg:/intent:/whatsapp:/line:/viber: URLs and hides
    ``navigator.webdriver``. If the CDP registration fails the driver is
    returned without the script.

    Args:
        headless: when true, Chrome is started with ``--headless=new``.

    Returns:
        The started Chrome driver.
    """
    o = Options()
    if headless:
        o.add_argument("--headless=new")
    o.add_argument("--window-size=1500,1000")
    o.add_argument("--no-sandbox")
    o.add_argument("--disable-gpu")
    o.add_argument("--disable-dev-shm-usage")
    o.add_argument("--lang=ru,en")

    # Fewer automation traces
    o.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
    o.add_experimental_option("useAutomationExtension", False)
    o.add_argument("--disable-blink-features=AutomationControlled")

    # Key flag: intended to turn off Chrome's Intent Picker ("Open in app")
    o.add_argument("--disable-features=IntentPickerBubble")
    # Silence pop-ups
    # NOTE(review): --disable-popup-blocking switches the popup blocker OFF,
    # which looks opposite to "silencing" — confirm this is intended.
    o.add_argument("--disable-notifications")
    o.add_argument("--disable-popup-blocking")

    # Strict content policy: block notifications and protocol handlers
    prefs = {
        "profile.default_content_setting_values.notifications": 2,
        "profile.default_content_setting_values.protocol_handlers": 2,
        "protocol_handler.excluded_schemes": {
            "tg": True, "intent": True, "whatsapp": True, "line": True, "viber": True,
        },
    }
    o.add_experimental_option("prefs", prefs)

    # If alerts show up, dismiss them right away
    o.set_capability("unhandledPromptBehavior", "dismiss")

    # Desktop User-Agent
    o.add_argument(
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36"
    )

    d = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=o)

    # Patches applied BEFORE pages load.
    # NOTE(review): the first IIFE appends to document.documentElement; if that
    # is still null when the script runs, the throw would presumably skip the
    # rest of the script — verify in the browser.
    try:
        d.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": r"""
// Прячем кнопки "Open in app"
(() => {
  const style = document.createElement('style');
  style.textContent = `
    a.tgme_action_button_new,
    .tgme_action_button,
    .tgme_page_context_btn,
    .tgme_header_mobile_download,
    .tgme_page_widget .tgme_action_button_new {
      display: none !important;
      visibility: hidden !important;
    }
  `;
  document.documentElement.appendChild(style);
})();

// Блокируем tg://, intent:// и др.
(() => {
  const isBlockedScheme = (u) => /^(tg:|intent:|whatsapp:|line:|viber:)/i.test(String(u || ''));
  const origOpen = window.open;
  window.open = function(url, name, specs) {
    if (isBlockedScheme(url)) return null;
    return origOpen ? origOpen.call(window, url, name, specs) : null;
  };
  document.addEventListener('click', (e) => {
    const a = e.target.closest && e.target.closest('a[href]');
    if (!a) return;
    const href = a.getAttribute('href') || '';
    if (isBlockedScheme(href)) {
      e.preventDefault();
      e.stopImmediatePropagation();
      return false;
    }
  }, true);
})();

// Скрываем след вебдрайвера
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
"""
        })
    except Exception:
        # Best effort: continue without the injected script.
        pass

    return d


# ======================
# ДАННЫЕ
# ======================
@dataclass
class Hit:
    """A channel message whose text matched the search keyword (one CSV row)."""
    msg_id: str  # data-post / data-message-id attribute, or the date-link href as a fallback
    date_url: str  # href of the message's date link ("" when missing)
    text: str  # message text with whitespace collapsed


def clean(s: str) -> str:
    """Strip *s* and collapse every whitespace run into one space.

    A falsy argument (``None`` or ``""``) yields an empty string.
    """
    trimmed = (s or "").strip()
    return re.sub(r"\s+", " ", trimmed)


# ======================
# ЛОГИКА СБОРА
# ======================
def click_preview_channel(driver):
    """Click the "Preview channel" link if one becomes clickable within 12 seconds.

    Any failure (no such link, timeout, click error) is ignored; the usual
    reason is presumably that the page already is the preview page.
    """
    preview_link = (
        By.XPATH,
        "//a[contains(@class,'tgme_page_context_link')][contains(., 'Preview channel')]",
    )
    waiter = WebDriverWait(driver, 12)
    try:
        link = waiter.until(EC.element_to_be_clickable(preview_link))
        driver.execute_script("arguments[0].scrollIntoView({block:'center'});", link)
        pointer = ActionChains(driver)
        pointer.move_to_element(link).pause(0.1).click(link).perform()
    except Exception:
        return


def ensure_messages_loaded(driver):
    """Block until a message node is present, then pause for 0.8 seconds.

    Waits up to MSG_APPEAR_TIMEOUT seconds; the wait's timeout error
    propagates to the caller when no message appears.
    """
    message_locator = (
        By.CSS_SELECTOR,
        ".tgme_widget_message_text.js-message_text, .tgme_widget_message",
    )
    waiter = WebDriverWait(driver, MSG_APPEAR_TIMEOUT)
    waiter.until(EC.presence_of_element_located(message_locator))
    time.sleep(0.8)


def get_new_messages(driver, seen_ids: Set[str]) -> List[Tuple[str, str, str]]:
    """Return the messages currently on the page that are not in *seen_ids*.

    Each result is a ``(msg_id, date_url, text)`` tuple. The id comes from
    the ``data-post`` or ``data-message-id`` attribute, falling back to the
    date link's href; messages without any id are skipped. Ids of returned
    messages are added to *seen_ids* (the set is mutated in place).
    """
    def date_href(node) -> str:
        # href of the message's date link, or "" when it cannot be read.
        try:
            anchor = node.find_element(By.CSS_SELECTOR, "a.tgme_widget_message_date")
            return anchor.get_attribute("href") or ""
        except Exception:
            return ""

    def identify(node) -> str:
        # Attribute-based id first, date-link href as the fallback.
        try:
            ident = node.get_attribute("data-post") or node.get_attribute("data-message-id") or ""
        except Exception:
            return ""
        return ident if ident else date_href(node)

    fresh: List[Tuple[str, str, str]] = []
    for box in driver.find_elements(By.CSS_SELECTOR, ".tgme_widget_message"):
        msg_id = identify(box)
        if not msg_id or msg_id in seen_ids:
            continue

        try:
            body = box.find_element(By.CSS_SELECTOR, ".tgme_widget_message_text.js-message_text")
            txt = clean(body.text or body.get_attribute("textContent") or "")
        except Exception:
            txt = ""

        date_url = date_href(box)

        seen_ids.add(msg_id)
        fresh.append((msg_id, date_url, txt))
    return fresh


def scroll_up(driver):
    """Scroll the window up by one viewport height, but at least 800 px."""
    script = "window.scrollBy(0, -Math.max(800, window.innerHeight));"
    driver.execute_script(script)


def monitor_channel(driver, keyword: str, duration_sec: int) -> List[Hit]:
    """Collect messages containing *keyword* for *duration_sec* seconds.

    After the first messages are present, the loop repeatedly reads the
    messages not seen before, records those whose text contains the keyword
    (literal match, case-insensitive), scrolls up and sleeps for
    SCROLL_INTERVAL_SEC. Every match is also printed.

    Returns:
        The matches in the order they were found.
    """
    matcher = re.compile(re.escape(keyword), re.IGNORECASE)
    deadline = time.time() + duration_sec
    already_seen: Set[str] = set()
    found: List[Hit] = []

    ensure_messages_loaded(driver)

    while time.time() < deadline:
        for mid, durl, txt in get_new_messages(driver, already_seen):
            if not txt or not matcher.search(txt):
                continue
            found.append(Hit(msg_id=mid, date_url=durl, text=txt))
            print(f"[MATCH] {mid} -> {durl} :: {txt[:120]}")
        scroll_up(driver)
        time.sleep(SCROLL_INTERVAL_SEC)

    return found


def slug_from_url(url: str) -> str:
    """Derive a file-name-safe channel handle from a Telegram URL.

    ``t.me/<handle>[/...]`` and ``t.me/s/<handle>[/...]`` both map to
    ``<handle>``: the host, an optional leading ``s`` segment, any trailing
    path (e.g. a post id), the query string and the fragment are ignored.
    Characters outside ``[a-zA-Z0-9_.-]`` are replaced with ``_``.

    Returns:
        The sanitized handle, or ``"channel"`` when no handle can be found
        (empty input, non-string input, or a URL without a path).
    """
    if not isinstance(url, str):
        return "channel"

    # Query string and fragment never belong to the handle.
    bare = re.split(r"[?#]", url.strip(), maxsplit=1)[0]

    had_scheme = "://" in bare
    if had_scheme:
        bare = bare.split("://", 1)[1]

    segments = [seg for seg in bare.split("/") if seg]

    # The first segment is the host when a scheme was given or when it looks
    # like a domain ("t.me"); a bare handle has no dot.
    if segments and (had_scheme or "." in segments[0]):
        segments = segments[1:]
    # "s" is the preview-page prefix (t.me/s/<handle>), not a handle.
    if segments and segments[0] == "s":
        segments = segments[1:]

    handle = segments[0] if segments else ""
    return re.sub(r"[^a-zA-Z0-9_\-\.]+", "_", handle or "channel")


def scrape_one_channel(url: str, keyword: str, duration_sec: int, headless: bool) -> str:
    """
    Open one channel in its own browser, collect keyword matches and save them.

    Blocking function: it starts a driver, loads *url*, switches to the
    channel preview when a "Preview channel" link is offered, monitors the
    messages for *duration_sec* seconds and writes the matches to
    ``OUTPUT_DIR/<slug>.csv``. The driver is always shut down.

    Returns:
        Path of the written CSV file.
    """
    target_dir = pathlib.Path(OUTPUT_DIR)
    target_dir.mkdir(parents=True, exist_ok=True)
    slug = slug_from_url(url)
    output_csv = str(target_dir / f"{slug}.csv")

    def document_ready(d) -> bool:
        return d.execute_script("return document.readyState") == "complete"

    driver = build_driver(headless=headless)
    try:
        driver.get(url)
        WebDriverWait(driver, PAGE_READY_TIMEOUT).until(document_ready)
        time.sleep(0.8)

        click_preview_channel(driver)
        ensure_messages_loaded(driver)

        print(f"[INFO] {slug}: ищу '{keyword}' {duration_sec} сек…")
        hits = monitor_channel(driver, keyword, duration_sec)

        with open(output_csv, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["msg_id", "date_url", "text"])
            writer.writeheader()
            writer.writerows(asdict(h) for h in hits)

        print(f"[DONE] {slug}: совпадений {len(hits)} — файл: {output_csv}")
        return output_csv
    finally:
        try:
            driver.quit()
        except Exception:
            # Ignore shutdown errors.
            pass


# ======================
# АСИНХРОННЫЙ ОРКЕСТРАТОР
# ======================
async def run_all(urls: List[str], keyword: str, duration_sec: int, headless: bool, concurrency: int):
    """Scrape all *urls* concurrently and print a per-URL summary.

    Each URL is handled by the blocking ``scrape_one_channel`` in a worker
    thread; at most *concurrency* of them run at the same time (values below
    1 are treated as 1 so the semaphore can never block every task).
    Failures do not stop the other channels: they are collected and listed
    as ``[FAIL]`` in the summary.

    Returns:
        None; the outcome is reported on stdout.
    """
    sem = asyncio.Semaphore(max(1, concurrency))

    async def _task(u: str):
        async with sem:
            # Run the synchronous Selenium code in a separate thread.
            return await asyncio.to_thread(
                scrape_one_channel, u, keyword, duration_sec, headless
            )

    results = await asyncio.gather(*(_task(u) for u in urls), return_exceptions=True)

    ok, failed = [], []
    for u, r in zip(urls, results):
        # BaseException, not Exception: a cancelled task is reported as
        # asyncio.CancelledError, which does not derive from Exception.
        if isinstance(r, BaseException):
            failed.append((u, r))
        else:
            ok.append((u, r))

    print("\n================ SUMMARY ================")
    for u, p in ok:
        print(f"[OK]   {u} -> {p}")
    for u, e in failed:
        print(f"[FAIL] {u} -> {e!r}")
    print("=========================================\n")


def run_coro_safely(coro):
    """
    Run *coro* to completion whether or not an event loop is already running.

    - No running loop in this thread: plain ``asyncio.run(coro)``.
    - A loop is already running (e.g. Jupyter): the coroutine runs via
      ``asyncio.run`` in a separate daemon thread, and this call blocks until
      that thread finishes.

    Returns the coroutine's result; anything it raised is re-raised here.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No active loop: the ordinary path.
        return asyncio.run(coro)

    # A loop is active: hand the coroutine to a helper thread.
    outcome = {}

    def worker():
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:
            outcome["error"] = exc

    helper = threading.Thread(target=worker, daemon=True)
    helper.start()
    helper.join()

    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("value")

def main():
    """Scrape every channel in CHANNEL_URLS with the module-level settings.

    Returns whatever ``run_coro_safely`` returns for the ``run_all`` coroutine.
    """
    job = run_all(CHANNEL_URLS, KEYWORD, DURATION_SEC, HEADLESS, CONCURRENCY)
    return run_coro_safely(job)


if __name__ == "__main__":
    main()



[FAIL] https://t.me/mash -> ProtocolError('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
[FAIL] https://t.me/rian_ru -> TimeoutException()
[FAIL] https://t.me/s/rbc_news -> ProtocolError('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
[FAIL] https://t.me/s/dvhab_novosti -> ProtocolError('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))



KeyboardInterrupt: 