In [1]:
!python3 -m pip install --upgrade pip
!python3 -m pip install feedparser argon2-cffi beautifulsoup4 selenium webdriver-manager playwright cloudscraper nest_asyncio tabulate
#!playwright install chromium firefox




In [2]:
import feedparser
from datetime import datetime
import requests
from email.utils import parsedate_to_datetime
from argon2.low_level import hash_secret_raw, Type
import base64
import time
import sys
import os
import subprocess
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from playwright.sync_api import sync_playwright
from playwright.async_api import async_playwright
import cloudscraper
import tempfile
import asyncio
from enum import Enum
import re
import html
from tabulate import tabulate
from dataclasses import dataclass, fields, asdict
from typing import List, Optional, Dict, Any
try:
    # Для webdriver_manager >= 3.8.0
    from webdriver_manager.core.os_manager import ChromeType
except ImportError:
    # Для старых версий
    from webdriver_manager.core.utils import ChromeType

    
class ContentType(Enum):
    RAW_RSS = "raw_rss"
    HTML_RSS = "html_rss"
    RAW_ATOM = "raw_atom"         # Чистый Atom/XML без обертки
    HTML_ATOM = "html_atom"       # Atom завернутый в HTML
    ANUBIS = "anubis"             # Страница проверки Anubis
    PLAIN_TEXT = "plain_text"     # Простой текст
    OTHER = "other"               # Другой тип контента

@dataclass
class NewsEntry:
    title: str
    link: str
    id: str
    updated: str
    summary: Optional[str] = None
    content: Optional[str] = None
    @classmethod
    def get_field_names(cls) -> List[str]:
        return [f.name for f in fields(cls)]
    def to_table_row(
        self,
        fields: List[str] = None,
        max_length: int = 50,
        truncate: bool = True
    ) -> List[str]:
        # Если поля не указаны, используем все доступные
        if fields is None:
            fields = [f.name for f in fields(self.__class__)]
        
        row = []
        for field in fields:
            # Получаем значение поля
            value = getattr(self, field, "")
            
            # Обработка None значений
            if value is None:
                row.append("")
                continue
                
            # Преобразование в строку
            value_str = str(value)
            
            # Обрезка длинного текста
            if truncate and len(value_str) > max_length:
                value_str = value_str[:max_length-3] + "..."
                
            row.append(value_str)
        return row

def display_news_entries(
    entries: List[NewsEntry],
    fields: Optional[List[str]] = None,
    max_length: int = 50,
    truncate: bool = True,
    tablefmt: str = "grid",
    show_index: bool = True
) -> str:
    if fields is None:
        fields = ['title', 'link', 'updated']
    
    # Проверяем допустимость полей
    available_fields = NewsEntry.get_field_names()
    for field in fields:
        if field not in available_fields:
            raise ValueError(f"Недопустимое поле: {field}. Допустимые поля: {available_fields}")
    
    table_data = [entry.to_table_row(fields, max_length, truncate) for entry in entries]

    return tabulate(
        table_data,
        headers=fields,
        tablefmt=tablefmt,
        maxcolwidths=max_length,
        showindex=list(range(len(entries))) if show_index else None,
        colalign=("left",) * len(fields)
    )


def try_scraper(url) -> str:
    scraper = cloudscraper.create_scraper()
    return scraper.get(url).text

def is_html_valid(input_str: str) -> bool:
    soup = BeautifulSoup(input_str, 'html.parser')
    return bool(soup.find() or 
                soup.contents or 
                isinstance(soup, BeautifulSoup))


def install_chromium_conda():
    """Устанавливает Chromium через Conda"""
    print("Установка Chromium через Conda...")
    try:
        # Проверяем, запущены ли мы в Jupyter
        if 'ipykernel' in sys.modules:
            get_ipython().system('conda install -c conda-forge chromium-browser -y')
            get_ipython().system('conda install -c conda-forge chromedriver -y')
        else:
            subprocess.run(['conda', 'install', '-c', 'conda-forge', 'chromium-browser', '-y'], check=True)
            subprocess.run(['conda', 'install', '-c', 'conda-forge', 'chromedriver', '-y'], check=True)
        print("Установка завершена")
        return True
    except Exception as e:
        print(f"Ошибка установки: {e}")
        return False
        
def get_chromium_path():
    """Возвращает путь к Chromium в Conda"""
    conda_prefix = os.environ.get('CONDA_PREFIX', '')
    paths = [
        os.path.join(conda_prefix, 'bin', 'chromium'),
        os.path.join(conda_prefix, 'bin', 'chromium-browser'),
        os.path.join(conda_prefix, 'Library', 'bin', 'chromium.exe'),
    ]
    
    for path in paths:
        if os.path.exists(path):
            return path
    return None

def install_playwright_deps_local():
    """Устанавливает зависимости локально без root-прав"""
    print("Установка локальных зависимостей для Playwright...")
    try:
        # Создаем директорию для зависимостей
        home_dir = os.path.expanduser("~")
        deps_dir = os.path.join(home_dir, "playwright-deps")
        os.makedirs(deps_dir, exist_ok=True)
        
        # Устанавливаем необходимые пакеты в локальную директорию
        deps = [
            "libatk1.0-0", "libatk-bridge2.0-0", "libxcomposite1",
            "libxdamage1", "libatspi2.0-0", "libnss3", "libnspr4",
            "libdrm2", "libxkbcommon0", "libgbm1", "libasound2"
        ]
        
        for dep in deps:
            # Скачиваем пакет
            download_cmd = f"apt-get download {dep}"
            subprocess.run(download_cmd, shell=True, check=True, cwd=deps_dir)
            
            # Находим скачанный файл
            deb_file = next((f for f in os.listdir(deps_dir) if f.endswith(".deb") and dep in f), None)
            if not deb_file:
                print(f"Не удалось найти .deb файл для {dep}")
                continue
                
            # Распаковываем пакет
            deb_path = os.path.join(deps_dir, deb_file)
            extract_cmd = f"dpkg-deb -x {deb_path} {deps_dir}"
            subprocess.run(extract_cmd, shell=True, check=True)
            
            # Удаляем .deb файл после распаковки
            os.remove(deb_path)
        
        # Настраиваем переменные окружения
        lib_path = os.path.join(deps_dir, "usr", "lib", "x86_64-linux-gnu")
        os.environ["LD_LIBRARY_PATH"] = f"{lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}"
        print(f"LD_LIBRARY_PATH установлен: {os.environ['LD_LIBRARY_PATH']}")
        
        return True
    except Exception as e:
        print(f"Ошибка установки зависимостей: {e}")
        return False
        
if not os.environ.get("PLAYWRIGHT_DEPS_INSTALLED"):
    if install_playwright_deps_local():
        os.environ["PLAYWRIGHT_DEPS_INSTALLED"] = "1"
        print("Зависимости успешно установлены!")
    else:
        print("Продолжаем без зависимостей - возможны проблемы")

Установка локальных зависимостей для Playwright...
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libatk1.0-0 amd64 2.36.0-3build1 [51.9 kB]
Fetched 51.9 kB in 0s (202 kB/s)
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libatk-bridge2.0-0 amd64 2.38.0-3 [66.6 kB]
Fetched 66.6 kB in 0s (273 kB/s)
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libxcomposite1 amd64 1:0.4.5-1build2 [7,192 B]
Fetched 7,192 B in 0s (55.8 kB/s)
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libxdamage1 amd64 1:1.1.5-2build2 [7,154 B]
Fetched 7,154 B in 0s (63.7 kB/s)
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 libatspi2.0-0 amd64 2.44.0-3 [80.9 kB]
Fetched 80.9 kB in 1s (138 kB/s)
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 libnss3 amd64 2:3.98-0ubuntu0.22.04.2 [1,347 kB]
Fetched 1,347 kB in 0s (2,808 kB/s)
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 libnspr4 amd64 2:4.35-0ubuntu0.22.04.1 [119 kB]
Fetched 119 kB in 1s (18

In [83]:
def detect_content_type(content: str) -> ContentType:
    """
    Определяет тип контента
    :param content: Строка с контентом
    :return: Тип контента как ContentType
    """
    if not content:
        return ContentType.OTHER
    
    # Детектор Anubis
    if is_anubis_page(content):
        return ContentType.ANUBIS
    

    if is_raw_atom(content):
        return ContentType.RAW_ATOM
    if is_raw_rss(content):
        return ContentType.RAW_RSS

    if is_html_wrapped_rss(content):
        return ContentType.HTML_RSS
    if is_html_wrapped_atom(content):
        return ContentType.HTML_ATOM
    
    # Детектор plain text
    if is_plain_text(content):
        return ContentType.PLAIN_TEXT
    
    return ContentType.OTHER

def is_anubis_page(content: str) -> bool:
    """Проверяет, является ли контент страницей Anubis"""
    content = html.unescape(content)
    anubis_indicators = [
        "Making sure you're not a bot!",
        "Protected by Anubis",
        "id=\"progress\"",
        "id=\"status\"",
        "anubis_version",
        "/anubis/static/img/"
    ]
    return any(indicator in content for indicator in anubis_indicators)

def is_raw_rss(content: str) -> bool:
    content = html.unescape(content)
    return content.lstrip().startswith(('<?xml', '<rss')) and ("<rss" in content)
def is_html_wrapped_rss(content: str) -> bool:
    html_indicators = ['<html', '<head', '<body']
    return (not is_raw_rss(content)) and ("<rss" in content) and any(indicator in content for indicator in html_indicators)
    
def is_raw_atom(content: str) -> bool:
    content = html.unescape(content)
    """Проверяет, является ли контент чистым Atom без HTML-обертки"""
    # Проверяем начало документа
    if not content.lstrip().startswith(('<?xml', '<feed')):
        return False
    
    # Проверяем основные элементы Atom
    atom_indicators = [
        '<feed', '</feed>',
        'xmlns="http://www.w3.org/2005/Atom"',
        '<entry>', '</entry>',
        '<id>', '</id>',
        '<title>', '</title>',
        '<updated>', '</updated>'
    ]
    
    return all(indicator in content for indicator in atom_indicators)

def is_html_wrapped_atom(content: str) -> bool:
    content = html.unescape(content)
    """Проверяет, содержит ли HTML-контент Atom в обертке"""
    # Должны быть признаки HTML
    html_indicators = ['<html', '<head', '<body', '<meta', '<div', '<pre']
    if not any(indicator in content for indicator in html_indicators):
        return False
    
    # И при этом признаки Atom внутри
    atom_indicators = ['<feed', '</feed>', '<entry>', 'xmlns="http://www.w3.org/2005/Atom"']
    return any(indicator in content for indicator in atom_indicators)

def is_plain_text(content: str) -> bool:
    content = html.unescape(content)
    """Проверяет, является ли контент простым текстом"""
    # Если контент содержит менее 5% HTML-тегов
    tags = re.findall(r'<[^>]+>', content)
    tag_ratio = len(tags) / (len(content) / 100) if content else 0
    
    # Дополнительные признаки текста
    text_indicators = [
        len(content.splitlines()) > 10,  # Много строк
        '\n' in content,                 # Переносы строк
        tag_ratio < 5,                   # Мало тегов
        not content.startswith('<')       # Не начинается с тега
    ]
    
    return all(text_indicators)
def extract_xml(html_str: str) -> str:
    html_str = html.unescape(html_str)
    # Определяем маркеры начала и конца
    start_marker = "<?xml"
    end_marker = "</feed>"
    
    # Ищем начало XML
    start_idx = html_str.find(start_marker)
    if start_idx == -1:
        return ""  # Начало не найдено
    
    # Ищем конец корневого элемента
    end_idx = html_str.find(end_marker, start_idx)
    
    # Извлекаем подстроку (с учётом длины end_marker)
    if end_idx == -1:
        xml_escaped = html_str[start_idx:]  # До конца строки
    else:
        xml_escaped = html_str[start_idx:end_idx + len(end_marker)]
    
    return xml_escaped


In [85]:
async def get_content_with_anubis(url, timeout=60):
    """
    Получает контент страницы после прохождения проверки Anubis
    :param url: URL для загрузки
    :param timeout: максимальное время ожидания в секундах
    :return: HTML-контент страницы
    """
    p = await async_playwright().start()
    # Запускаем Chromium в headless-режиме
    browser = await p.chromium.launch(headless=True)
    context = await browser.new_context(
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
        viewport={"width": 1920, "height": 1080}
    )
    page = await context.new_page()
    
    # Переходим на страницу
    print(f"Загрузка URL: {url}")
    await page.goto(url, wait_until="domcontentloaded")
    
    try:
        # Ожидание появления элементов Anubis
        initial_content = await page.content()
        initial_content = html.unescape(initial_content)
        c_type = detect_content_type(initial_content)
        if c_type == ContentType.HTML_ATOM:
            x = extract_xml(initial_content)
            c = detect_content_type(x)
            if c != ContentType.RAW_ATOM:
                raise ValueError(f"cannot extract atom from [[ {initial_content[:256]} ]] is not an atom [[ {x[:256]} ]] detected as {c.name}")
            return x, ContentType.RAW_ATOM
        elif c_type != ContentType.ANUBIS:
            return initial_content, c_type
            

        print("Ожидание проверки Anubis...")
        await page.wait_for_selector("text=Making sure you're not a bot!", timeout=timeout*1000)
        
        # Ожидание завершения progress bar
        try:
            progress_bar = page.locator("#progress")
            print("Ожидание завершения progress bar...")
            
            # Мониторим изменение progress bar
            start_time = time.time()
            while time.time() - start_time < timeout:
                class_list = progress_bar.get_attribute("class") or ""
                if "hidden" in class_list or "invisible" in class_list:
                    print("Progress bar скрыт!")
                    break
                await asyncio.sleep(0.5)
            else:
                print("Таймаут ожидания progress bar")
        except:
            print("Progress bar не найден, используем fallback ожидание")
            await page.wait_for_timeout(15000)  # 15 секунд
        
        # Дополнительное ожидание выполнения скриптов
        await page.wait_for_load_state("networkidle")
        
        # Проверка результата
        if  await page.query_selector("text=Making sure you're not a bot!"):
            print("Проверка Anubis не пройдена!")
            return None
        
        print("Проверка Anubis пройдена!")
        return await page.content()
    
    except Exception as e:
        print(f"Ошибка: {str(e)}")
        await page.screenshot(path="anubis_error.png")
        print("Скриншот сохранен как anubis_error.png")
        return None
    finally:
        await browser.close()

def parse_atom(xml_content: str) -> List[NewsEntry]:
    # Парсим XML с помощью feedparser
    feed = feedparser.parse(xml_content)
    entries = []
    
    for entry in feed.entries:
        # Обработка контента (может быть в нескольких форматах)
        content_value = None
        if hasattr(entry, 'content'):
            for item in entry.content:
                if hasattr(item, 'value'):
                    content_value = item.value
                    break
        
        # Обработка даты обновления
        updated_str = ""
        if hasattr(entry, 'updated_parsed'):
            updated_str = time.strftime('%Y-%m-%dT%H:%M:%SZ', entry.updated_parsed)
        elif hasattr(entry, 'updated'):
            updated_str = entry.updated
        
        # Создаем объект записи
        entries.append(NewsEntry(
            title=entry.title,
            link=entry.link,
            id=entry.id,
            updated=updated_str,
            summary=entry.get('summary'),
            content=content_value
        ))
    
    return entries

# Пример использования
async def main():
    URL = "https://lore.kernel.org/all/new.atom"
    content, content_type = await  get_content_with_anubis(URL)
    if not content:
        raise ValueError("Не удалось загрузить контент")
    if content_type == ContentType.RAW_ATOM:
        news = parse_atom(content)
        print(display_news_entries(news))
            
await main()

Загрузка URL: https://lore.kernel.org/all/new.atom
+----+----------------------------------------------------+----------------------------------------------------+----------------------+
|    | title                                              | link                                               | updated              |
| 0  | Re: [PATCH 4/5] staging: vchiq_arm: Create keep... | https://lore.kernel.org/all/0c2840c2-241c-4f98-... | 2025-06-28T22:34:18Z |
+----+----------------------------------------------------+----------------------------------------------------+----------------------+
| 1  | [PATCH] wifi: rtw88: enable TX reports for the ... | https://lore.kernel.org/all/20250628223048.3597... | 2025-06-28T22:34:05Z |
+----+----------------------------------------------------+----------------------------------------------------+----------------------+
| 2  | Re: [PULL REQUEST] i2c-for-6.16-rc4                | https://lore.kernel.org/all/175114999690.230075... | 2025-06-28T22:32:51Z