# Краткое описание

Блокнот собирает тексты из VK, сайтов и Яндекс Отзывов и приводит их к единому формату `docs` (сбор Яндекс Отзывов пока реализован заглушкой с тестовыми данными). Запуск выполняется через UI в ячейке, следующей за исходным кодом.  
Исходный код расположен в скрытой ячейке между описанием и запуском.  
Для VK через SOIKA обязательно укажите **токен пользователя** в отдельном поле перед запуском.


In [None]:
from __future__ import annotations

import hashlib
import importlib
import importlib.util
import io
import itertools
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

import pandas as pd
import requests
from bs4 import BeautifulSoup, Comment

import ipywidgets as widgets
from IPython.display import display, Markdown

# Optional SOIKA dependency. The notebook must stay usable for the other
# sources even when the package is absent or cannot be imported, so failures
# are recorded in `_soika_import_error` instead of aborting the cell.
_vk_spec = importlib.util.find_spec("soika")
if _vk_spec is None:
    VKParser = None  # type: ignore
    _soika_import_error = "SOIKA не установлена"
else:
    try:
        soika = importlib.import_module("soika")
    except ImportError as exc:
        # Installed but not importable (e.g. a missing transitive dependency):
        # keep the reason so the UI can report it when VK is selected.
        VKParser = None  # type: ignore
        _soika_import_error = f"Не удалось импортировать SOIKA: {exc}"
    else:
        VKParser = getattr(soika, "VKParser", None)
        _soika_import_error = None if VKParser is not None else "В пакете soika нет VKParser"

# Detect Google Colab. For a dotted name `find_spec` imports the parent package
# first, so it raises (instead of returning None) when `google` itself is not
# installed; treat that as "not running in Colab".
try:
    _is_colab = importlib.util.find_spec("google.colab") is not None
except (ImportError, ValueError):
    _is_colab = False
if _is_colab:
    from google.colab import output  # type: ignore

    # Colab needs the custom widget manager enabled to render ipywidgets.
    output.enable_custom_widget_manager()

# Anything that looks like an HTML/XML tag.
HTML_TAG_RE = re.compile(r"<[^>]+>")
# Any run of whitespace (spaces, tabs, newlines).
SPACE_RE = re.compile(r"\s+")
# Three or more repetitions of the same punctuation mark from the set ! ? . ,
PUNCT_RUN_RE = re.compile(r"([!?.,])\1{2,}")


def clean_text_minimal(text: str) -> str:
    """Apply light, loss-averse normalization to a piece of text.

    Steps, in order: replace HTML-like tags with a space, shorten runs of three
    or more identical punctuation marks (``! ? . ,``) to two, collapse all
    whitespace runs to a single space and trim the ends.

    Args:
        text: Text to clean. Non-string values are converted with ``str()``.

    Returns:
        The cleaned text. ``None`` and float NaN (a missing value in a pandas
        column) both yield an empty string.
    """
    # NaN is the only float that is not equal to itself; without this check a
    # missing value would be turned into the literal text "nan".
    if text is None or (isinstance(text, float) and text != text):
        return ""
    t = str(text)
    t = HTML_TAG_RE.sub(" ", t)
    t = PUNCT_RUN_RE.sub(r"\1\1", t)
    t = SPACE_RE.sub(" ", t).strip()
    return t


def parse_date_safe(s: str) -> Optional[pd.Timestamp]:
    """Parse a user-entered date string without raising.

    Args:
        s: Raw text from an input field; ``None`` and blank strings are allowed.

    Returns:
        The parsed ``pd.Timestamp``, or ``None`` when the input is empty,
        cannot be parsed, or parses to a missing value (for example ``"NaT"``).
    """
    s = (s or "").strip()
    if not s:
        return None
    try:
        parsed = pd.to_datetime(s)
    except (ValueError, TypeError, OverflowError):
        return None
    # Some inputs (e.g. "NaT") parse successfully to NaT, which is not a usable
    # date: callers format the result with strftime.
    if pd.isna(parsed):
        return None
    return parsed


# ----------------------------
# Извлечение содержимого из HTML
# ----------------------------

def _strip_noise(soup: BeautifulSoup) -> None:
    """Remove non-content nodes from ``soup`` in place.

    Two passes of tag removal are made (embedded/non-text elements first, then
    page-chrome containers), after which all HTML comments are detached.
    """
    embedded_tags = ["script", "style", "noscript", "svg", "canvas", "iframe"]
    chrome_tags = ["nav", "footer", "header", "aside"]

    for tag_names in (embedded_tags, chrome_tags):
        for element in soup.find_all(tag_names):
            element.decompose()

    html_comments = soup.find_all(string=lambda node: isinstance(node, Comment))
    for comment in html_comments:
        comment.extract()


def _get_title(soup: BeautifulSoup) -> Optional[str]:
    """Return the page title, or ``None`` when no source provides one.

    Sources are tried in order: the ``og:title`` meta tag, the
    ``twitter:title`` meta tag, the ``<title>`` element, the first ``<h1>``.
    The winning value is passed through ``clean_text_minimal``.
    """
    meta_lookups = (("property", "og:title"), ("name", "twitter:title"))
    for attr_name, attr_value in meta_lookups:
        meta_tag = soup.find("meta", attrs={attr_name: attr_value})
        content = meta_tag.get("content") if meta_tag else None
        if content:
            return clean_text_minimal(content)

    # <title> first, then the first <h1>; a node counts only if it has text.
    for node in (soup.title, soup.find("h1")):
        if node and node.get_text(strip=True):
            return clean_text_minimal(node.get_text(" ", strip=True))

    return None


def _safe_to_datetime(value: Optional[str]) -> Optional[pd.Timestamp]:
    """Parse ``value`` into a UTC timestamp, returning ``None`` on failure.

    Empty or missing input and values pandas cannot parse (coerced to NaT)
    both produce ``None``.
    """
    if value:
        parsed = pd.to_datetime(value, errors="coerce", utc=True)
        if not pd.isna(parsed):
            return parsed
    return None


def _get_date(soup: BeautifulSoup) -> Optional[pd.Timestamp]:
    """Extract a publication/modification date from page markup.

    ``<meta>`` tags are checked first, in the priority order listed below; the
    first one whose ``content`` parses wins. If none does, ``<time>`` elements
    are scanned in document order and the first parsable ``datetime``
    attribute is used.

    Returns:
        A UTC ``pd.Timestamp`` (as produced by ``_safe_to_datetime``), or
        ``None`` when no parsable date is found.
    """
    meta_candidates = [
        ("property", "article:published_time"),
        ("property", "article:modified_time"),
        ("property", "og:updated_time"),
        ("name", "pubdate"),
        ("name", "publishdate"),
        ("name", "timestamp"),
        ("name", "date"),
        ("name", "DC.date.issued"),
        ("name", "DC.Date"),
        ("itemprop", "datePublished"),
        ("itemprop", "dateModified"),
    ]
    for attr, val in meta_candidates:
        node = soup.find("meta", attrs={attr: val})
        if node and node.get("content"):
            dt = _safe_to_datetime(node["content"])
            if dt is not None:
                return dt

    # Look at every <time> element, not just the first: a leading <time>
    # without a usable `datetime` attribute must not hide later valid ones.
    for time_tag in soup.find_all("time"):
        dt = _safe_to_datetime(time_tag.get("datetime"))
        if dt is not None:
            return dt
    return None


def _node_text_len(node) -> int:
    """Return the length of ``node``'s space-joined, stripped text.

    Objects that expose no ``get_text`` attribute count as zero.
    """
    try:
        extract_text = node.get_text
    except AttributeError:
        return 0
    return len(extract_text(" ", strip=True))


def _extract_main_text(
    soup: BeautifulSoup, *, selector: Optional[str] = None, min_chars: int = 400
) -> Tuple[str, Dict[str, Any]]:
    """Extract the main textual content of a parsed page.

    Strategies are tried in order and the first acceptable one wins:

    1. The user-supplied CSS ``selector`` (accepted only if the cleaned text
       has at least ``min_chars`` characters; otherwise
       ``meta["extractor_fallback"]`` is set to ``"too_short"``).
    2. The first ``<article>``, then the first ``<main>`` element, under the
       same length requirement.
    3. The element with the most text among nodes whose ``class``/``id``
       matches common content keywords plus all ``<div>``/``<section>``
       elements. No minimum length applies here.

    Args:
        soup: Parsed document.
        selector: Optional CSS selector for the content node(s).
        min_chars: Minimum cleaned-text length for strategies 1 and 2.

    Returns:
        ``(text, meta)`` where ``meta["extractor"]`` names the strategy used:
        ``"css:<selector>"``, ``"article"``, ``"main"``, ``"largest_block"``
        or ``"none"`` (with an empty text).
    """
    meta: Dict[str, Any] = {"extractor": None}

    if selector:
        nodes = soup.select(selector)
        if nodes:
            parts = [n.get_text("\n", strip=True) for n in nodes]
            text = clean_text_minimal("\n".join(parts))
            meta["extractor"] = f"css:{selector}"
            if len(text) >= min_chars:
                return text, meta
            meta["extractor_fallback"] = "too_short"

    for tag_name in ["article", "main"]:
        node = soup.find(tag_name)
        if node:
            text = clean_text_minimal(node.get_text("\n", strip=True))
            if len(text) >= min_chars:
                meta["extractor"] = tag_name
                return text, meta

    candidates = []
    for key in ["content", "article", "post", "entry", "text", "body", "main"]:
        pattern = re.compile(key, re.I)
        candidates.extend(soup.find_all(attrs={"class": pattern}))
        candidates.extend(soup.find_all(attrs={"id": pattern}))

    candidates.extend(soup.find_all(["div", "section"]))

    best = None
    best_len = 0
    # The same element is usually collected several times (keyword matches plus
    # the div/section sweep). Measure each one only once; identity is used for
    # the check to avoid content-based comparison of elements. The strict `>`
    # keeps the first-seen winner, so the result matches a scan with duplicates.
    seen_ids = set()
    for node in candidates:
        node_key = id(node)
        if node_key in seen_ids:
            continue
        seen_ids.add(node_key)
        node_len = _node_text_len(node)
        if node_len > best_len:
            best = node
            best_len = node_len

    if best is not None and best_len > 0:
        meta["extractor"] = "largest_block"
        text = clean_text_minimal(best.get_text("\n", strip=True))
        return text, meta

    meta["extractor"] = "none"
    return "", meta


# ----------------------------
# HTTP helpers
# ----------------------------

@dataclass
class FetchConfig:
    """Settings used by ``_fetch_html`` when downloading a page."""

    # Passed to the HTTP client as the request timeout.
    timeout: int = 20
    # Sent as the User-Agent request header.
    user_agent: str = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome Safari"
    # Responses with a larger body are rejected (5 MB).
    max_bytes: int = 5_000_000


def _fetch_html(url: str, cfg: FetchConfig, session: requests.Session) -> Tuple[Optional[str], Dict[str, Any]]:
    """Download ``url`` and return its decoded HTML together with fetch metadata.

    Args:
        url: Address to request.
        cfg: Timeout, User-Agent and size limit to apply.
        session: Session used to issue the request.

    Returns:
        ``(html, meta)``. ``meta`` always has ``status_code`` and ``final_url``
        (``None`` until a response arrives) and gains an ``error`` key on
        failure, in which case ``html`` is ``None``. This function never
        raises: every failure is reported through ``meta["error"]``.
    """
    meta: Dict[str, Any] = {"status_code": None, "final_url": None}
    try:
        # stream=True postpones the body download so an oversized response can
        # be rejected from its headers; the `with` block closes the response
        # (and frees the connection) on every exit path.
        with session.get(
            url,
            headers={"User-Agent": cfg.user_agent, "Accept": "text/html,application/xhtml+xml"},
            timeout=cfg.timeout,
            allow_redirects=True,
            stream=True,
        ) as r:
            meta["status_code"] = r.status_code
            meta["final_url"] = r.url
            r.raise_for_status()

            declared = str(r.headers.get("Content-Length") or "").strip()
            if declared.isdigit() and int(declared) > cfg.max_bytes:
                meta["error"] = f"response_too_large:{int(declared)}"
                return None, meta

            # Responses without a Content-Length (e.g. chunked) are still read
            # in full before their size can be checked.
            content = r.content
            if content and len(content) > cfg.max_bytes:
                meta["error"] = f"response_too_large:{len(content)}"
                return None, meta

            # These encodings are commonly a default rather than the page's real
            # charset, so prefer the encoding detected from the body.
            if not r.encoding or r.encoding.lower() in {"iso-8859-1", "latin1", "ascii"}:
                if r.apparent_encoding:
                    r.encoding = r.apparent_encoding

            return r.text, meta
    except Exception as e:
        # Boundary of the per-URL fetch: report any failure instead of aborting
        # the whole batch.
        meta["error"] = repr(e)
        return None, meta


def _stable_id(url: str, text: str) -> str:
    """Build a deterministic document id from a URL and the start of its text.

    The id is ``"web_"`` followed by the first 16 hex digits of the SHA-1 of
    the URL, a newline and at most the first 4000 characters of ``text``
    (a missing text counts as empty).
    """
    snippet = (text or "")[:4000]
    payload = "\n".join((url, snippet)).encode("utf-8", errors="ignore")
    digest = hashlib.sha1(payload).hexdigest()
    return "web_" + digest[:16]


def parse_websites(
    urls: List[str],
    *,
    selector: Optional[str] = None,
    min_chars: int = 400,
    cfg: Optional[FetchConfig] = None,
) -> pd.DataFrame:
    """Download each URL and turn it into one row of the ``docs`` table.

    Blank entries in ``urls`` are skipped. A page that cannot be fetched still
    produces a row (empty text, no date) whose ``meta`` carries the fetch
    details. For fetched pages the title, date and main text are extracted
    from the HTML after noise removal.

    Args:
        urls: Page addresses to process.
        selector: Optional CSS selector forwarded to the text extractor.
        min_chars: Minimum text length forwarded to the text extractor.
        cfg: HTTP settings; defaults to ``FetchConfig()``.

    Returns:
        DataFrame with columns ``doc_id``, ``source``, ``text_raw``, ``date``,
        ``url``, ``meta`` and ``text_clean``.
    """
    if not cfg:
        cfg = FetchConfig()

    def make_row(page_url: str, raw_text: str, when: Any, meta: Dict[str, Any]) -> Dict[str, Any]:
        # Single place that fixes the column order of a website row.
        return {
            "doc_id": _stable_id(page_url, raw_text),
            "source": "website",
            "text_raw": raw_text,
            "date": when,
            "url": page_url,
            "meta": meta,
        }

    rows: List[Dict[str, Any]] = []

    with requests.Session() as session:
        stripped = ((raw or "").strip() for raw in urls)
        for url in (candidate for candidate in stripped if candidate):
            html, fetch_meta = _fetch_html(url, cfg, session=session)
            if not html:
                rows.append(make_row(url, "", None, {"fetch": fetch_meta}))
                continue

            soup = BeautifulSoup(html, "lxml")
            _strip_noise(soup)

            title = _get_title(soup)
            date = _get_date(soup)
            text_raw, extract_meta = _extract_main_text(soup, selector=selector, min_chars=min_chars)

            # After redirects the final address identifies the page.
            page_url = fetch_meta.get("final_url") or url
            date_iso = date.isoformat() if isinstance(date, pd.Timestamp) else None
            page_meta = {
                "fetch": fetch_meta,
                "title": title,
                "date_extracted": date_iso,
                "extraction": extract_meta,
                "domain": urlparse(page_url).netloc,
            }
            rows.append(make_row(page_url, text_raw, date, page_meta))

    if rows:
        df = pd.DataFrame(rows)
    else:
        df = pd.DataFrame(columns=["doc_id", "source", "text_raw", "date", "url", "meta"])

    df["text_clean"] = df["text_raw"].map(clean_text_minimal)
    return df


def read_links_from_upload(upload: widgets.FileUpload) -> List[str]:
    """Read a list of links from the first file held by a FileUpload widget.

    Both widget value layouts are supported: the ipywidgets 7 mapping
    ``{filename: info}`` and the ipywidgets 8 sequence ``(info, ...)``.
    The file is parsed as an Excel sheet; links are taken from the first
    column named ``link``/``links``/``url``/``urls``/``group``/``vk``/``page``,
    or from the first column when none of these exists.

    Args:
        upload: Upload widget (any object with a compatible ``value``).

    Returns:
        Non-empty, stripped cell values as strings. An empty list is returned
        when nothing was uploaded, the file has no content, it cannot be
        parsed as Excel, or the sheet has no columns.
    """
    value = upload.value
    if not value:
        return []
    if isinstance(value, dict):
        file_info = next(iter(value.values()))
    else:
        file_info = value[0]
    content = file_info.get("content")
    if content is None:
        return []
    try:
        # bytes() normalizes memoryview content (ipywidgets 8) for BytesIO.
        df = pd.read_excel(io.BytesIO(bytes(content)))
    except Exception:
        # Deliberate best-effort: an unreadable upload contributes no links.
        return []
    if df.shape[1] == 0:
        return []

    for col in ["link", "links", "url", "urls", "group", "vk", "page"]:
        if col in df.columns:
            series = df[col]
            break
    else:
        series = df.iloc[:, 0]
    cleaned = (str(cell).strip() for cell in series.dropna())
    return [link for link in cleaned if link]


# Leading URL scheme.
_VK_SCHEME_RE = re.compile(r"^https?://", re.I)
# VK host (optionally with the www./m. prefix) at the start of a link.
_VK_HOST_RE = re.compile(r"^(?:(?:www|m)\.)?vk\.(?:com|ru)(?:/|$)", re.I)
# First character that ends the leading path segment.
_VK_PATH_END_RE = re.compile(r"[/?#]")


def normalize_vk_domains(items: List[str]) -> List[str]:
    """Reduce user-entered VK links or handles to bare community short names.

    Accepted forms include ``group``, ``@group``, ``vk.com/group``,
    ``https://m.vk.com/group?w=...`` and ``http://www.vk.com/group/``. Only
    the first path segment is kept; query strings and fragments are dropped.

    Args:
        items: Raw entries; ``None`` and blank entries are ignored.

    Returns:
        Short names in first-seen order, without empty values or duplicates.
    """
    normalized: List[str] = []
    seen = set()
    for raw in items:
        name = (raw or "").strip().lstrip("@")
        name = _VK_SCHEME_RE.sub("", name)
        name = _VK_HOST_RE.sub("", name)
        name = _VK_PATH_END_RE.split(name, maxsplit=1)[0].strip()
        # Skip entries that carried no name (e.g. "vk.com/") and repeats, which
        # would otherwise be collected twice.
        if not name or name in seen:
            continue
        seen.add(name)
        normalized.append(name)
    return normalized


def collect_vk_soika(groups: List[str], token: str, cutoff: Optional[pd.Timestamp], limit: int = 500) -> pd.DataFrame:
    """Collect VK wall messages for ``groups`` through SOIKA's ``VKParser``.

    For every group the parser output is renamed to the notebook's schema and
    each row gets a run-wide sequential ``message_id``. ``parent_message_id``
    is the ``message_id`` of the row whose VK id equals the row's VK parent
    id, when that parent is present in the same group's result.

    NOTE(review): the columns read from the parser output (``id``, ``text``,
    ``date``, ``parent_message_id``, ``from_id``, ``type``, ``likes.count``,
    ...) are taken from how this code accesses them — confirm against the
    installed SOIKA version.

    Args:
        groups: VK community short names.
        token: VK access token passed to the parser.
        cutoff: Earliest date to request; ``None`` (or NaT) means 1970-01-01.
        limit: Value passed as ``number_of_messages`` for each group.

    Returns:
        DataFrame with ``doc_id``, ``text_raw``, ``date``, ``url``, ``meta``,
        ``message_id``, ``parent_message_id``, ``vk_id``, ``vk_parent_id``,
        ``source`` and ``text_clean``; it has the same columns and no rows
        when nothing was collected.

    Raises:
        ImportError: If SOIKA's ``VKParser`` is not available.
    """
    if VKParser is None:
        raise ImportError(_soika_import_error or "SOIKA не установлена")
    # NaT cannot be formatted with strftime, so treat it like a missing cutoff.
    has_cutoff = cutoff is not None and not pd.isna(cutoff)
    cutoff_str = cutoff.strftime("%Y-%m-%d") if has_cutoff else "1970-01-01"
    out_cols = [
        "doc_id",
        "text_raw",
        "date",
        "url",
        "meta",
        "message_id",
        "parent_message_id",
        "vk_id",
        "vk_parent_id",
    ]
    frames = []
    parser = VKParser()
    message_id_counter = itertools.count(1)
    for group in groups:
        df_raw = parser.run_parser(domain=group, access_token=token, cutoff_date=cutoff_str, number_of_messages=limit)
        if df_raw is None:
            continue
        # Coerce first so the emptiness check also works for non-DataFrame output.
        df = pd.DataFrame(df_raw)
        if df.empty:
            continue
        df = df.rename(columns={"text": "text_raw", "date": "date_raw"})
        df["date"] = pd.to_datetime(df["date_raw"], errors="coerce")

        # Keep VK's own ids before `parent_message_id` is replaced by the
        # notebook-local numbering.
        df["vk_id"] = df.get("id")
        df["vk_parent_id"] = df.get("parent_message_id")
        df["message_id"] = list(itertools.islice(message_id_counter, len(df)))

        id_map = {
            vk_id: message_id
            for vk_id, message_id in zip(df["vk_id"], df["message_id"])
            if pd.notna(vk_id)
        }
        df["parent_message_id"] = df["vk_parent_id"].map(id_map)

        df["doc_id"] = df.apply(lambda r: f"vk_{group}_{r.get('vk_id')}", axis=1)
        df["url"] = df.apply(lambda r: f"https://vk.com/wall{r.get('from_id', '')}_{r.get('vk_id', '')}", axis=1)
        df["meta"] = df.apply(lambda r: {
            "group": group,
            "type": r.get("type"),
            "likes": r.get("likes.count"),
            "reposts": r.get("reposts.count"),
            "views": r.get("views.count"),
            "link": r.get("link"),
            "vk_id": r.get("vk_id"),
            "vk_parent_id": r.get("vk_parent_id"),
            "message_id": r.get("message_id"),
            "parent_message_id": r.get("parent_message_id"),
        }, axis=1)
        frames.append(df[out_cols])
    if not frames:
        # Same schema as the non-empty result so callers can rely on it.
        return pd.DataFrame(columns=out_cols + ["source", "text_clean"])
    out = pd.concat(frames, ignore_index=True)
    out["source"] = "vk"
    out["text_clean"] = out["text_raw"].map(clean_text_minimal)
    return out


def collect_vk_stub(group_ids: List[str], since: Optional[pd.Timestamp], until: Optional[pd.Timestamp]) -> pd.DataFrame:
    """Return two hard-coded VK-like rows for demo purposes.

    ``since`` and ``until`` are accepted but not used. The first entry of
    ``group_ids`` (or ``None`` when the list is empty) is stored in each
    row's ``meta``.
    """
    group_id = group_ids[0] if group_ids else None
    samples = [
        ("vk_1", "Люблю этот район — здесь тихо и много зелени. Но парковки не хватает!!!", "2025-09-12"),
        ("vk_2", "Опять перекопали улицу у станции. Дойти до остановки — квест.", "2025-10-03"),
    ]
    records = []
    for doc_id, text, day in samples:
        records.append(
            {
                "doc_id": doc_id,
                "source": "vk",
                "text_raw": text,
                "date": pd.Timestamp(day),
                "url": None,
                "meta": {"group_id": group_id},
            }
        )
    return pd.DataFrame(records)


def collect_websites_stub(urls: List[str], selector: Optional[str], min_chars: int, timeout: int) -> pd.DataFrame:
    """Parse the given sites, or return one placeholder row when there are none.

    Args:
        urls: Page addresses. When non-empty they are processed by
            ``parse_websites`` (real HTTP requests are made).
        selector: CSS selector forwarded to ``parse_websites``.
        min_chars: Minimum text length forwarded to ``parse_websites``.
        timeout: Request timeout used to build the ``FetchConfig``.

    Returns:
        The ``parse_websites`` result, or a single hard-coded row (with
        ``text_clean`` filled in) when ``urls`` is empty.
    """
    if urls:
        return parse_websites(urls, selector=selector, min_chars=min_chars, cfg=FetchConfig(timeout=timeout))
    rows = [
        {
            "doc_id": "web_1",
            "source": "website",
            "text_raw": "<article>Исторический квартал меняется: появляются новые кафе и мастерские.</article>",
            "date": None,
            # `urls` is always empty on this path, so there is no URL to report.
            "url": None,
            "meta": {"title": "Заглушка статьи"},
        }
    ]
    df = pd.DataFrame(rows)
    df["text_clean"] = df["text_raw"].map(clean_text_minimal)
    return df


def collect_yandex_reviews_stub(urls: List[str]) -> pd.DataFrame:
    """Return one hard-coded review row; no request is made.

    The row's ``url`` is the first entry of ``urls`` (``None`` when the list
    is empty) and ``text_clean`` is derived from the fixed review text.
    """
    first_url = urls[0] if urls else None
    review = {
        "doc_id": "ya_1",
        "source": "yandex_reviews",
        "text_raw": "Удобно добираться, но внутри тесно. Персонал норм.",
        "date": None,
        "url": first_url,
        "meta": {"rating": 3, "place": "Заглушка"},
    }
    table = pd.DataFrame([review])
    table["text_clean"] = table["text_raw"].map(clean_text_minimal)
    return table


def standardize_docs(df: pd.DataFrame) -> pd.DataFrame:
    """Project ``df`` onto the common ``docs`` schema.

    The result has exactly the columns ``doc_id``, ``source``, ``text_raw``,
    ``date``, ``url``, ``meta`` (missing ones are filled with ``None``) plus
    ``text_clean``, recomputed from ``text_raw``.

    Args:
        df: Collector output. It is not modified.

    Returns:
        A new DataFrame in the standard column order.
    """
    required_cols = ["doc_id", "source", "text_raw", "date", "url", "meta"]
    # Work on a copy so missing columns are not added to the caller's frame.
    present = [c for c in required_cols if c in df.columns]
    out = df[present].copy()
    for c in required_cols:
        if c not in out.columns:
            out[c] = None
    out = out[required_cols].copy()
    out["text_clean"] = out["text_raw"].map(clean_text_minimal)
    return out


# Most recent standardized corpus. The UI callbacks defined in build_ui()
# rebind this global after a successful demo load or collection run.
docs: Optional[pd.DataFrame] = None


def build_ui() -> None:
    """Build and display the collection UI.

    The form lets the user pick a source (VK, websites, Yandex Reviews), enter
    links manually, as a single link and/or through an XLSX upload, set an
    optional period and source-specific options, and then either load a demo
    corpus or run the collection. Results are stored in the module-level
    ``docs`` and rendered in the output area below the controls.
    """
    global docs

    source_dd = widgets.Dropdown(
        options=[("VK (стены групп)", "vk"), ("Сайты", "website"), ("Яндекс Отзывы", "yandex_reviews")],
        value="vk",
        description="Источник:",
        style={"description_width": "initial"},
        layout=widgets.Layout(width="420px"),
    )

    input_ta = widgets.Textarea(
        value="",
        placeholder="""Для VK: ссылки на группы или домены через запятую/строку
Для сайтов: URL-ы по строкам
Для Яндекс Отзывов: URL-ы по строкам""",
        description="Ввод вручную:",
        style={"description_width": "initial"},
        layout=widgets.Layout(width="820px", height="140px"),
    )

    single_link_txt = widgets.Text(
        value="",
        placeholder="Быстрая вставка одной ссылки",
        description="Одна ссылка:",
        style={"description_width": "initial"},
        layout=widgets.Layout(width="560px"),
    )

    links_upload = widgets.FileUpload(
        accept=".xlsx",
        multiple=False,
        description="XLSX со ссылками",
        style={"description_width": "initial"},
    )

    since_txt = widgets.Text(
        value="",
        placeholder="YYYY-MM-DD (необязательно)",
        description="Период с:",
        style={"description_width": "initial"},
        layout=widgets.Layout(width="260px"),
    )

    until_txt = widgets.Text(
        value="",
        placeholder="YYYY-MM-DD (необязательно)",
        description="Период по:",
        style={"description_width": "initial"},
        layout=widgets.Layout(width="260px"),
    )

    vk_token_txt = widgets.Password(
        value="",
        placeholder="Токен VK для SOIKA (https://dev.vk.com/api/access-token)",
        description="VK токен:",
        style={"description_width": "initial"},
        layout=widgets.Layout(width="480px"),
    )

    selector_main = widgets.Text(
        value="",
        placeholder="Для сайтов: article / .post-content / main",
        description="CSS селектор:",
        style={"description_width": "initial"},
        layout=widgets.Layout(width="420px"),
    )

    min_chars_main = widgets.IntSlider(
        value=400,
        min=100,
        max=4000,
        step=50,
        description="Мин. длина текста:",
        style={"description_width": "initial"},
        layout=widgets.Layout(width="420px"),
    )

    timeout_main = widgets.IntSlider(
        value=20,
        min=5,
        max=120,
        step=5,
        description="Таймаут (сек):",
        style={"description_width": "initial"},
        layout=widgets.Layout(width="420px"),
    )

    demo_btn = widgets.Button(description="Загрузить DEMO-корпус", button_style="info")
    run_btn = widgets.Button(description="Запустить сбор", button_style="primary")

    out = widgets.Output()

    website_opts = widgets.Accordion(children=[widgets.VBox([selector_main, min_chars_main, timeout_main])])
    website_opts.set_title(0, "Опции веб-парсера (для источника 'Сайты')")

    vk_opts = widgets.Accordion(children=[vk_token_txt])
    vk_opts.set_title(0, "Параметры VK / SOIKA")

    upload_box = widgets.VBox(
        [
            widgets.HBox([links_upload, single_link_txt]),
            input_ta,
        ]
    )

    main_controls = widgets.VBox(
        [
            source_dd,
            upload_box,
            widgets.HBox([since_txt, until_txt]),
            vk_opts,
            website_opts,
            widgets.HBox([demo_btn, run_btn]),
            out,
        ]
    )

    def get_inputs() -> Dict[str, Any]:
        """Read the form and merge manual, single-link and uploaded entries."""
        src = source_dd.value
        manual_raw = input_ta.value.strip()
        single = single_link_txt.value.strip()
        since = parse_date_safe(since_txt.value)
        until = parse_date_safe(until_txt.value)

        manual_items: List[str]
        if src == "vk":
            # VK entries may be separated by newlines, commas or semicolons.
            manual_items = [x.strip() for x in re.split(r"[\n,;]+", manual_raw) if x.strip()]
        else:
            manual_items = [x.strip() for x in manual_raw.splitlines() if x.strip()]

        items = manual_items
        if single:
            items.append(single)
        items.extend(read_links_from_upload(links_upload))

        return {"source": src, "items": items, "since": since, "until": until}

    def load_demo(_=None) -> None:
        """Fill ``docs`` with the demo corpus and show its first rows."""
        global docs
        with out:
            out.clear_output()
            demo = pd.concat(
                [
                    collect_vk_stub(["demo_group"], None, None),
                    collect_websites_stub(
                        ["https://example.com/article"],
                        selector_main.value or None,
                        min_chars_main.value,
                        timeout_main.value,
                    ),
                    collect_yandex_reviews_stub(["https://example.com/reviews"]),
                ],
                ignore_index=True,
            )
            docs = standardize_docs(demo)
            display(Markdown("✅ Загружен DEMO-корпус. Ниже — первые строки `docs`."))
            display(docs.head(10))

    def run_pipeline(_=None) -> None:
        """Collect documents from the selected source and store them in ``docs``."""
        global docs
        with out:
            out.clear_output()
            try:
                # Reading the inputs (including the uploaded file) happens
                # inside the try block so that any failure is reported in the
                # output area instead of escaping the button callback.
                cfg = get_inputs()
                src = cfg["source"]
                items = cfg["items"]

                display(
                    Markdown(
                        f"**Источник:** `{src}`  \n"
                        + f"**Элементы ввода:** {len(items)}  \n"
                        + f"**Период:** {cfg['since']} — {cfg['until']}"
                    )
                )

                if src == "vk":
                    if VKParser is None:
                        raise RuntimeError(_soika_import_error or "SOIKA недоступна")
                    token = vk_token_txt.value.strip()
                    if not token:
                        raise ValueError("Для VK через SOIKA укажи токен пользователя в отдельном поле.")
                    groups = normalize_vk_domains(items)
                    if not groups:
                        raise ValueError("Список групп VK пуст. Добавь домены или ссылки на группы.")
                    # Only the start of the period is passed on to the collector.
                    df = collect_vk_soika(groups, token, cfg["since"])
                elif src == "website":
                    if not items:
                        raise ValueError("Список URL пуст. Добавь ссылки или загрузи XLSX.")
                    df = parse_websites(
                        items,
                        selector=selector_main.value or None,
                        min_chars=int(min_chars_main.value),
                        cfg=FetchConfig(timeout=int(timeout_main.value)),
                    )
                else:
                    if not items:
                        raise ValueError("Список URL пуст. Добавь ссылки или загрузи XLSX.")
                    df = collect_yandex_reviews_stub(items)

                docs = standardize_docs(df)
                display(Markdown("✅ Получена таблица `docs` (в формате для дальнейших разделов)."))
                display(docs)
            except Exception as e:
                # UI boundary: show the problem to the user rather than raising.
                display(Markdown(f"❌ Ошибка: `{e}`"))

    demo_btn.on_click(load_demo)
    run_btn.on_click(run_pipeline)

    display(main_controls)


In [None]:
# Render the collection form; results are stored in the global `docs`.
build_ui()