In [None]:
!pip install transformers accelerate bitsandbytes
!pip install beautifulsoup4 sentence_transformers ipywidgets lxml

In [None]:
import torch
# Sanity check: print whether torch sees a CUDA device and the CUDA version string torch reports.
print(torch.cuda.is_available(), torch.version.cuda)

In [None]:
"""
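Multi-source news parser focused on fraud and scam topics.
Sources implemented in this notebook: Lenta.ru, RIA Novosti and fincult.info
(Kommersant, Vedomosti and RBC were listed in the original description but have
no parser in the cells below).
Results are saved to JSON and CSV (SQLite output was listed in the original
description but is not implemented in the cells below).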
"""
import re
import csv
import json
import sqlite3
import argparse
import datetime
import time
import sys
from typing import List, Dict, Any, Tuple
import requests
from bs4 import BeautifulSoup
from dateutil import parser as dateparser
from requests.adapters import HTTPAdapter, Retry
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib3.util.retry import Retry
from urllib.parse import urljoin
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException

In [None]:
# Optional dependency: semantic filtering is enabled only when
# sentence_transformers (and numpy) can be imported.
try:
    from sentence_transformers import SentenceTransformer
    import numpy as np
    HAS_SEMANTIC = True
except ImportError:
    HAS_SEMANTIC = False
    print(
        "[WARN] пакет 'sentence_transformers' не установлен. Семантическая фильтрация отключена.",
        file=sys.stderr
    )

# Module-level HTTP session that retries throttling (429) and transient
# server errors with exponential backoff, honouring Retry-After.
session = requests.Session()
retry = Retry(
    total=5,
    backoff_factor=1,
    status_forcelist=[429,500,502,503,504],
    allowed_methods=["HEAD", "GET", "OPTIONS"],
    respect_retry_after_header=True
)
# One adapter is shared by both schemes (previously `adapter` was created but
# never used, and two extra adapters were mounted instead).
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)

# A parsed article: plain dict with keys such as 'url', 'source', 'title', 'body', 'date'.
Article = Dict[str, Any]
# Work item for the thread pool: (article URL, source object, archive date).
# The source is a forward reference because BaseSource is defined in a later cell.
UrlMeta = Tuple[str, "BaseSource", datetime.date]

In [3]:
def daterange(start: datetime.date, end: datetime.date):
    """Lazily yield every day from ``start`` through ``end``, both inclusive.

    Nothing is yielded when ``start`` is later than ``end``. Because this is a
    generator, the debug line is printed when iteration begins, not when the
    function is called.
    """
    print(f"[DEBUG] Гененируем даты с {start} до {end}")
    one_day = datetime.timedelta(days=1)
    day = start
    while True:
        if day > end:
            return
        yield day
        day = day + one_day

In [4]:
class BaseSource:
    """Interface shared by all news sources.

    Subclasses override ``name`` and implement both methods; the base
    versions only raise ``NotImplementedError``.
    """

    # Short identifier of the source, stored in each article's 'source' field by subclasses.
    name: str = "base"

    def get_urls(self, date: datetime.date) -> List[str]:
        """Return the article URLs the source lists for ``date``."""
        raise NotImplementedError()

    def parse_article(self, url: str) -> Article:
        """Download ``url`` and return the parsed article as a dict."""
        raise NotImplementedError()

In [None]:
class LentaSource(BaseSource):
    """News source for Lenta.ru, fetched over plain HTTP with ``requests``.

    Article links are collected from the paginated daily archive
    (``/news/YYYY/MM/DD/``) and each article page is parsed with BeautifulSoup.
    """
    name = 'lenta.ru'
    BASE_URL = 'https://lenta.ru'
    ARTICLE_REGEX = re.compile(r'https?://lenta\.ru/news/\d{4}/\d{2}/\d{2}/[\w\-]+/?')
    # Upper bound on archive pages walked for a single day, so a page that
    # always advertises "load more" cannot keep the loop running forever.
    MAX_ARCHIVE_PAGES = 500

    def __init__(self):
        """Create a dedicated HTTP session with retries and a browser-like User-Agent."""
        print("[LentaSource] Инициализация источника Lenta.ru")

        # Session that retries transient server errors.
        self.session = requests.Session()
        retries = Retry(
            total=5,                # total number of attempts
            backoff_factor=0.5,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retries)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

        self.session.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/114.0.0.0 Safari/537.36"
            )
        })

    def get_urls(self, date: datetime.date) -> List[str]:
        """Collect article URLs published on ``date`` from the daily archive.

        Pages are walked while a "load more" element is present. Only links
        whose path carries the requested date are kept: archive pages can also
        link to articles from other days, and the caller stamps every returned
        URL with ``date``.

        Returns:
            De-duplicated list of absolute article URLs (order is not defined).
            A request failure stops pagination and returns what was found so far.
        """
        day_path = date.strftime('/news/%Y/%m/%d/')
        archive_url = f"{self.BASE_URL}{day_path}"
        found = set()

        for page in range(1, self.MAX_ARCHIVE_PAGES + 1):
            url = archive_url if page == 1 else f"{archive_url}page/{page}/"

            try:
                resp = self.session.get(url, timeout=(3.05, 10))
                resp.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f"[LentaSource] Ошибка при загрузке архива {url}: {e}")
                break

            soup = BeautifulSoup(resp.text, 'lxml')
            for a in soup.find_all('a', href=True):
                full = a['href'] if a['href'].startswith('http') else urljoin(self.BASE_URL, a['href'])
                m = self.ARTICLE_REGEX.match(full)
                # The slug part of the pattern cannot contain '/', so day_path
                # can only match at the date position of the article URL.
                if m and day_path in m.group(0):
                    found.add(full)

            # Continue only while the page offers a "load more" button.
            if not soup.select_one(".loadmore.js-loadmore"):
                break
        else:
            print(f"[LentaSource] Достигнут лимит {self.MAX_ARCHIVE_PAGES} страниц архива за {date}")

        print(f"[LentaSource] Всего найдено {len(found)} ссылок за {date}")
        return list(found)

    def parse_article(self, url: str) -> Article:
        """Download one article and extract its title and body text.

        Returns:
            Dict with 'url', 'source', 'title' and 'body'. On a request error
            the title and body are empty strings; no exception is raised.
        """
        try:
            resp = self.session.get(url, timeout=(10, 15))
            resp.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"[ERROR] parsing {url}: {e}")
            return {'url': url, 'source': self.name, 'title': '', 'body': ''}

        soup = BeautifulSoup(resp.text, 'lxml')
        title_el = soup.select_one('h1')
        title = title_el.get_text(strip=True) if title_el else ''

        # Two body containers are tried in order; the first one found wins.
        body_el = soup.select_one('div.topic-body__content') or soup.select_one('div.l-text')
        if body_el:
            paras = body_el.find_all('p')
            # Prefer paragraph-by-paragraph text; fall back to the whole container.
            body = '\n'.join(p.get_text(strip=True) for p in paras) if paras else body_el.get_text(strip=True)
        else:
            body = ''

        return {'url': url, 'source': self.name, 'title': title, 'body': body}

    def close(self):
        """Close the HTTP session and release its pooled connections."""
        self.session.close()

In [None]:
class RiaSource(BaseSource):
    """News source for ria.ru.

    The daily archive page is rendered with Selenium (it loads more items on
    click); individual articles are fetched with plain ``requests``.
    """
    name = 'ria.ru'
    DATE_FMT = '%Y%m%d'
    ARTICLE_REGEX = re.compile(r'https?://ria\.ru/\d{8}/[\w\-]+\.html')
    TITLE_SELECTORS = ['div.article__title', 'h1.article__title']
    BODY_SELECTOR = 'div.article__body'
    # Maximum number of HTTP attempts per article when the server answers 429.
    MAX_ATTEMPTS = 5

    def __init__(self, driver=None):
        """Use the given Selenium driver, or start a headless Chrome when none is passed."""
        print("[RiaSource] Инициализация драйвера Selenium")
        self.driver = driver or self._init_webdriver()

    def _init_webdriver(self):
        """Start a headless Chrome driver."""
        opts = Options()
        opts.add_argument('--headless')
        opts.add_argument('--no-sandbox')
        opts.add_argument('--disable-dev-shm-usage')
        return webdriver.Chrome(options=opts)

    def _load_full_page(self, url: str, scroll_pause=1.0,
                        click_timeout=10, load_timeout=15) -> str:
        """Open ``url`` and keep clicking the "more" button until it stops appearing.

        Args:
            url: archive page to open.
            scroll_pause: seconds to sleep after loading and after each scroll.
            click_timeout: seconds to wait for the "more" button to become clickable.
            load_timeout: seconds to wait for the page height to grow after a click.

        Returns:
            The page source after the last successful expansion.
        """
        print(f"[RiaSource] Загрузка полной страницы архива: {url}")
        self.driver.get(url)
        time.sleep(scroll_pause)
        prev = self.driver.execute_script("return document.body.scrollHeight")
        wait_click = WebDriverWait(self.driver, click_timeout)
        wait_load = WebDriverWait(self.driver, load_timeout)
        while True:
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(scroll_pause)
            try:
                more = wait_click.until(EC.element_to_be_clickable((
                    By.CSS_SELECTOR,
                    "div.list-more.color-btn-second-hover[data-url]"
                )))
                # Click through JavaScript rather than a native click.
                self.driver.execute_script("arguments[0].click();", more)
                # New content is considered loaded once the page grows taller.
                wait_load.until(lambda d: d.execute_script("return document.body.scrollHeight") > prev)
                prev = self.driver.execute_script("return document.body.scrollHeight")
            except TimeoutException:
                # No clickable button, or no growth after the click: stop expanding.
                break
        return self.driver.page_source

    def get_urls(self, date: datetime.date) -> List[str]:
        """Collect article URLs for ``date`` from the fully expanded archive page.

        Only links whose URL carries the requested date are returned, so links
        to other days that happen to be on the page are not fetched (and are
        not collected again when their own day is processed).

        Returns:
            De-duplicated list of absolute article URLs (order is not defined).
        """
        day = date.strftime(self.DATE_FMT)
        url = f'https://ria.ru/{day}/'
        print(f"[RiaSource] Получение ссылок за {date}: {url}")
        html = self._load_full_page(url)
        soup = BeautifulSoup(html, 'lxml')

        day_segment = f'/{day}/'
        links = set()
        for a in soup.find_all('a', href=True):
            raw = a['href']
            full = raw if raw.startswith('http') else urljoin('https://ria.ru', raw)
            m = self.ARTICLE_REGEX.match(full)
            # The slug part of the pattern has no '/', so the segment can only
            # match the date component of the article URL.
            if m and day_segment in m.group(0):
                links.add(full)

        print(f"[RiaSource] Найдено {len(links)} статей за {date}")
        return list(links)

    def parse_article(self, url: str) -> Article:
        """Download one article and extract title, body and (from the URL) its date.

        A 429 response is retried up to ``MAX_ATTEMPTS`` times, sleeping for the
        ``Retry-After`` value when it is a whole number of seconds, otherwise
        for ``attempt * 2`` seconds.

        Returns:
            Dict with 'url', 'source', 'title', 'body' and, when the URL
            contains a valid date, 'date' in ISO format.

        Raises:
            requests.exceptions.HTTPError: for an error status, including a 429
                that persists after the last attempt (previously that error
                page was parsed as if it were the article).
            requests.exceptions.RequestException: for connection-level failures.
        """
        resp = None
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            resp = requests.get(url, timeout=10)
            if resp.status_code != 429 or attempt == self.MAX_ATTEMPTS:
                # Either a usable answer, or no attempts left: do not sleep again.
                break
            retry_after = resp.headers.get("Retry-After")
            wait = int(retry_after) if retry_after and retry_after.isdigit() else attempt * 2
            print(f"[RiaSource] Статус 429, ожидание {wait} сек (попытка {attempt})")
            time.sleep(wait)
        resp.raise_for_status()

        soup = BeautifulSoup(resp.text, 'lxml')

        # Title: first selector that matches.
        title = ""
        for sel in self.TITLE_SELECTORS:
            if (el := soup.select_one(sel)):
                title = el.get_text(strip=True)
                break

        # Body: text blocks first, then the whole body container as a fallback.
        body_parts: List[str] = []
        for block in soup.select('div.article__block[data-type="text"]'):
            if (text_el := block.select_one('div.article__text')):
                paras = text_el.find_all('p')
                if paras:
                    body_parts.extend(p.get_text(strip=True) for p in paras)
                else:
                    body_parts.append(text_el.get_text(strip=True))
        if not body_parts:
            if (body_el := soup.select_one(self.BODY_SELECTOR)):
                paras = body_el.find_all('p')
                body_parts = [p.get_text(strip=True) for p in paras] if paras else [body_el.get_text(strip=True)]

        body = "\n\n".join(body_parts).strip()

        # Publication date is taken from the 8-digit segment of the URL.
        date = None
        m = re.match(r'https?://ria\.ru/(\d{8})/', url)
        if m:
            try:
                date = datetime.datetime.strptime(m.group(1), self.DATE_FMT).date()
            except ValueError:
                date = None

        print(f"[RiaSource] Получено название: {title[:60]}")
        print(f"[RiaSource] Длина статьи: {len(body)} символов")

        result: Article = {
            'url': url,
            'source': self.name,
            'title': title,
            'body': body
        }
        if date:
            result['date'] = date.isoformat()
        return result

    def close(self):
        """Quit the Selenium driver."""
        print("[RiaSource] Закрытие драйвера Selenium")
        self.driver.quit()

In [10]:
class FincultSource(BaseSource):
    """Article source for fincult.info, rendered entirely through Selenium.

    The article list is a single page expanded by a "show more" button; it is
    not organised by date, so ``get_urls`` returns the same list for any date.
    """
    name = 'fincult.info'
    BASE_URL = 'https://fincult.info'
    LIST_URL = BASE_URL + '/articles/'
    CARD_SELECTOR = 'div.card__detail h4 a'
    SHOW_MORE_SELECTOR = '.card-list__show-all-button'
    # Links collected by the first get_urls() call; None until then.
    _cached_links = None

    def __init__(self, driver=None):
        """Use the given Selenium driver, or start a headless Chrome when none is passed."""
        print("[FincultSource] Инициализация драйвера Selenium")
        self.driver = driver or self._init_webdriver()

    def _init_webdriver(self):
        """Start a headless Chrome with automation switches disabled and a desktop User-Agent."""
        opts = Options()
        opts.add_argument('--headless')
        opts.add_argument('--no-sandbox')
        opts.add_argument('--disable-dev-shm-usage')
        opts.add_experimental_option("excludeSwitches", ["enable-automation"])
        opts.add_experimental_option('useAutomationExtension', False)
        opts.add_argument(
            "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/114.0.0.0 Safari/537.36"
        )
        return webdriver.Chrome(options=opts)

    def _load_full_list(self) -> str:
        """Open the list page and click "show more" until the button is gone.

        Returns:
            The page source after the last successful click.
        """
        print(f"[FincultSource] Загрузка списка статей: {self.LIST_URL}")
        self.driver.get(self.LIST_URL)
        # Wait for the "show more" button to appear in the DOM.
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, self.SHOW_MORE_SELECTOR))
            )
        except TimeoutException:
            print("[FincultSource] Кнопка 'Показать еще' не появилась —, возможно, контента мало")

        # Keep clicking while the button is available.
        while True:
            try:
                # Scroll down so the button is inside the viewport.
                self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                time.sleep(0.5)
                btn = WebDriverWait(self.driver, 5).until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, self.SHOW_MORE_SELECTOR))
                )
                print("[FincultSource] Нажимаем 'Показать еще'")
                btn.click()
                time.sleep(1)
            except TimeoutException:
                print("[FincultSource] Больше нет кнопки 'Показать еще'")
                break
            except Exception as e:
                # Any other failure (e.g. the click itself) also ends the
                # expansion, but is reported as an error rather than as
                # "no more button".
                print(
                    f"[FincultSource] Ошибка при нажатии 'Показать еще': {type(e).__name__}: {e}",
                    file=sys.stderr
                )
                break

        return self.driver.page_source

    def get_urls(self, date: datetime.date) -> List[str]:
        """Return all article URLs from the list page.

        ``date`` is accepted for interface compatibility but not used: the
        list page is not split by date. The list is loaded once and cached, so
        repeated calls (one per day of a date range) do not re-render the page.

        Returns:
            De-duplicated list of absolute article URLs (order is not defined).
        """
        if self._cached_links is None:
            html = self._load_full_list()
            soup = BeautifulSoup(html, 'lxml')
            links = set()
            for a in soup.select(self.CARD_SELECTOR):
                href = a.get('href')
                if href:
                    # urljoin handles absolute hrefs and relative ones with or
                    # without a leading slash.
                    links.add(urljoin(self.BASE_URL, href))
            print(f"[FincultSource] Найдено {len(links)} ссылок")
            self._cached_links = list(links)
        # Return a copy so callers cannot mutate the cache.
        return list(self._cached_links)

    def parse_article(self, url: str) -> Article:
        """Render one article in the browser and extract title, body and date.

        Returns:
            Dict with 'url', 'source', 'title', 'body' and 'date'. 'date' is an
            ISO timestamp when the page shows ``DD.MM.YYYY HH:MM``, the raw
            text when it has another format, and '' when no date element is
            found. On any error all text fields are empty strings; no
            exception is raised.
        """
        print(f"[FincultSource] parse_article: {url}")
        try:
            # Load the page and wait until both a heading and at least one
            # paragraph block are present.
            self.driver.get(url)
            WebDriverWait(self.driver, 30).until(
                lambda d: d.find_element(By.CSS_SELECTOR, 'h1') and
                          d.find_element(By.CSS_SELECTOR, 'div.article__paragraph')
            )
            # Scroll to the bottom so the remaining content gets loaded.
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(1)

            soup = BeautifulSoup(self.driver.page_source, 'lxml')

            # Title: h1 inside the header container, else the first h1 on the page.
            hdr = soup.find('div', class_='article-header__container')
            if hdr and hdr.find('h1'):
                title = hdr.find('h1').get_text(strip=True)
            else:
                h1 = soup.find('h1')
                title = h1.get_text(strip=True) if h1 else ''

            # Body: text of every non-empty div.article__paragraph.
            body_parts = [
                block.get_text(" ", strip=True)
                for block in soup.find_all('div', class_='article__paragraph')
                if block.get_text(strip=True)
            ]
            body = "\n\n".join(body_parts).strip()

            # Publication date.
            date_el = soup.select_one(
                'section.article-publish-date.article__text div.article__paragraph'
            )
            if date_el:
                ds = date_el.get_text(strip=True)
                try:
                    pub_date = datetime.datetime.strptime(ds, '%d.%m.%Y %H:%M').isoformat()
                except ValueError:
                    # Unexpected format: keep the raw text for the caller to interpret.
                    pub_date = ds
            else:
                pub_date = ''

            return {
                'url':    url,
                'source': self.name,
                'title':  title,
                'body':   body,
                'date':   pub_date
            }

        except Exception as e:
            print(f"[FincultSource][ERROR] {url}: {type(e).__name__}: {e}", file=sys.stderr)
            return {'url': url, 'source': self.name, 'title': '', 'body': '', 'date': ''}

    def close(self):
        """Quit the Selenium driver."""
        print("[FincultSource] Closing Selenium driver")
        self.driver.quit()


In [None]:
def _semantic_filter(articles: "List[Article]", threshold: float = 0.5) -> "List[Article]":
    """Keep the articles whose text is semantically close to the fraud anchor phrases.

    Each kept article gets a 'score' key holding its cosine similarity to the
    mean of the anchor embeddings. An empty input returns an empty list
    without loading the model.
    """
    if not articles:
        return []

    # Use the first GPU when present, otherwise fall back to CPU instead of failing.
    import torch
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    model = SentenceTransformer("ai-forever/sbert_large_mt_nlu_ru", device=device)
    corpus = [a["title"] + " " + a["body"] for a in articles]
    emb = model.encode(corpus, batch_size=32, convert_to_numpy=True, show_progress_bar=True)
    anchor_texts = [
        "Статья о банковском мошенничестве: кредитные аферы и поддельные кредиты.",
        "Новость о хищении денег со счёта через фишинг.",
        "Описание схемы скимминга и кражи данных банковских карт."
    ]
    # 1. Encode all anchor phrases in one call.
    anchor_embs = model.encode(
        anchor_texts,
        batch_size=len(anchor_texts),
        convert_to_numpy=True,
        normalize_embeddings=True,
        show_progress_bar=False
    )

    # 2. Build a composite query vector (mean of the anchors) and compute the
    #    cosine similarity of every article embedding to it.
    q_emb = anchor_embs.mean(axis=0)
    sims = (emb @ q_emb) / (np.linalg.norm(emb, axis=1) * np.linalg.norm(q_emb))

    kept = []
    for art, score in zip(articles, sims):
        if score >= threshold:
            art["score"] = float(score)
            kept.append(art)
    return kept


def _save_results(articles: "List[Article]", base: str) -> None:
    """Write ``articles`` to ``<base>.json`` and ``<base>.csv``.

    The CSV header is the union of all article keys in first-seen order, so
    articles with differing key sets do not make the writer fail. With no
    articles the CSV file is created empty.
    """
    with open(base + ".json", "w", encoding="utf-8") as f:
        json.dump(articles, f, ensure_ascii=False, indent=2)

    with open(base + ".csv", "w", newline="", encoding="utf-8") as f:
        if articles:
            fieldnames = list(dict.fromkeys(key for art in articles for key in art))
            writer = csv.DictWriter(f, fieldnames=fieldnames, restval="")
            writer.writeheader()
            writer.writerows(articles)


def run_parser(start_date: str, end_date: str, do_filter: bool = False):
    """Scrape all sources for a date range, optionally filter, and save the results.

    Args:
        start_date: first day of the range, any format ``dateutil`` can parse.
        end_date: last day of the range (inclusive), same format.
        do_filter: when True and sentence_transformers is available, keep only
            articles semantically close to the fraud anchor phrases.

    Sources exposing a ``driver`` attribute are treated as Selenium sources:
    their articles are parsed sequentially and kept only when the parsed
    publication date lies inside the range. All other sources are parsed in a
    thread pool and stamped with the archive date they were listed under.
    Output goes to ``results_<start>_<end>_<timestamp>.json`` and ``.csv``.
    """
    start = dateparser.parse(start_date).date()
    end   = dateparser.parse(end_date).date()

    sources: "List[BaseSource]" = []
    articles: "List[Article]" = []

    try:
        # Sources are created inside the try so that an already started
        # browser is closed even if a later constructor fails.
        for factory in (LentaSource, RiaSource, FincultSource):
            sources.append(factory())

        # Split the work into "simple" (requests + BS4) and Selenium tasks.
        simple_tasks: "List[UrlMeta]" = []
        selenium_tasks: "List[Tuple[str, BaseSource]]" = []
        # Selenium sources may return the same URL for several days (the
        # Fincult list is not date-based at all); parse each URL only once.
        seen_selenium_urls = set()

        for src in sources:
            uses_browser = hasattr(src, 'driver')
            for dt in daterange(start, end):
                try:
                    urls = src.get_urls(dt)
                except Exception as e:
                    label = src.name if uses_browser else f"{src.name} {dt}"
                    print(f"[ERROR] {label}: {e}", file=sys.stderr)
                    continue
                if uses_browser:
                    for u in urls:
                        if u not in seen_selenium_urls:
                            seen_selenium_urls.add(u)
                            selenium_tasks.append((u, src))
                else:
                    simple_tasks.extend((u, src, dt) for u in urls)

        # 1) Parse the "simple" articles in a thread pool.
        with ThreadPoolExecutor(max_workers=10) as executor:
            future_to_meta = {
                executor.submit(src.parse_article, url): (url, src, dt)
                for url, src, dt in simple_tasks
            }
            for fut in as_completed(future_to_meta):
                url, src, dt = future_to_meta[fut]
                try:
                    art = fut.result()
                    # The date comes from the archive day the URL was listed under.
                    art["date"] = dt.isoformat()
                    articles.append(art)
                except Exception as e:
                    print(f"[ERROR] parsing {url}: {e}", file=sys.stderr)

        # 2) Parse Selenium sources sequentially (one shared driver per source).
        for url, src in selenium_tasks:
            try:
                art = src.parse_article(url)
            except Exception as e:
                print(f"[ERROR] parsing {url}: {e}", file=sys.stderr)
                continue
            # Keep the article only when it carries a parseable date inside the range.
            try:
                pub = dateparser.parse(art["date"]).date()
                print(f"ДАТА = {pub}")
            except (KeyError, TypeError, ValueError, OverflowError):
                # Missing key, non-string value, or text dateutil cannot parse.
                pub = None
            if pub and start <= pub <= end:
                articles.append(art)
    finally:
        # Always release drivers/sessions, even when scraping failed midway.
        for src in sources:
            if hasattr(src, "close"):
                try:
                    src.close()
                except Exception as e:
                    print(f"[ERROR] closing {src.name}: {e}", file=sys.stderr)

    # Optional semantic filtering.
    if do_filter and HAS_SEMANTIC:
        print("[MAIN] Running semantic filter")
        articles = _semantic_filter(articles)
    elif do_filter:
        print(
            "[MAIN] Semantic filter requested but sentence_transformers is unavailable; skipping",
            file=sys.stderr
        )

    # Save the results.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    base = f"results_{start.strftime('%Y%m%d')}_{end.strftime('%Y%m%d')}_{timestamp}"
    _save_results(articles, base)

    print(f"[MAIN] Finished. Files: {base}.json, {base}.csv")

In [12]:
def main(argv=None):
    """Command-line entry point: parse ``--start``, ``--end``, ``--filter`` and run the parser.

    Args:
        argv: list of argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, as before. Passing a list lets the
            function be called from a notebook, where ``sys.argv`` holds the
            kernel's own arguments, e.g.
            ``main(["--start", "2025-01-15", "--end", "2025-01-30"])``.

    Raises:
        SystemExit: raised by argparse when a required option is missing or
            an argument is invalid.
    """
    parser = argparse.ArgumentParser(description="Мульти парсер новостей")
    parser.add_argument("--start", required=True, help="Дата начала YYYY-MM-DD")
    parser.add_argument("--end",   required=True, help="Дата конца   YYYY-MM-DD")
    parser.add_argument("--filter", action="store_true", help="Включить семантический фильтр")
    args = parser.parse_args(argv)
    run_parser(args.start, args.end, args.filter)

In [None]:
# Run the whole pipeline for 2025-01-15 .. 2025-01-30 with the semantic filter requested.
run_parser("2025-01-15", "2025-01-30", True)