# Парсинг текстов с сайтов и соцсетей

Этот блокнот загружает скрипты из репозитория и помогает быстро получить тексты из:
- статей и страниц сайтов (универсальный парсер)
- отзывов Яндекса
- групп ВК

Ниже — установка зависимостей, импорт функций и интерактивные формы для ввода URL/токенов.


In [None]:
#@title Установка зависимостей
!pip -q install requests beautifulsoup4 lxml ipywidgets pandas shapely pymorphy3 flair folium tqdm osmnx bertopic sentence-transformers plotly hdbscan

import os
import sys
from pathlib import Path

# Set the GITHUB_REPO environment variable to "owner/repo" to use another repository.
REPO_SLUG = os.environ.get("GITHUB_REPO", "Sandrro/digital_identity")
REPO_URL = f"https://github.com/{REPO_SLUG}"
# The clone target is fixed; later cells hard-code paths under this directory.
REPO_DIR = Path("/content/digital_identity")

# Clone only once: an existing directory is reused as-is (it is not updated).
if not REPO_DIR.exists():
    !git clone --depth 1 {REPO_URL} {REPO_DIR}

# Make the repository root importable for the cells below.
sys.path.insert(0, str(REPO_DIR))


In [None]:
#@title Импорт парсеров
import sys
sys.path.insert(0, "/content/digital_identity/parsers")
from vk_group_parser import VKGroupParser
from website_parser import parse_websites
from yandex_reviews_parser import fetch_yandex_reviews


In [None]:
#@title Интерактивный запуск
import csv
from datetime import date, datetime, timezone
from pathlib import Path

import ipywidgets as widgets
import pandas as pd
from IPython.display import display

# Destination of the parsed rows; _write_rows opens it in "w" mode, and the
# later cells use the same path as their default input.
OUTPUT_CSV = Path("/content/digital_identity/parsed_texts.csv")

# Parser selector; the option values are the keys run_parser branches on.
parser_dropdown = widgets.Dropdown(
    options=[
        ("Сайты", "website"),
        ("Яндекс отзывы", "yandex"),
        ("ВК группа", "vk"),
    ],
    description="Парсер:"
)
# Address passed to whichever parser is selected.
url_input = widgets.Text(
    value="",
    description="URL/домен:",
    placeholder="https://...",
    layout=widgets.Layout(width="80%"),
)
# Only read by the VK branch of run_parser.
token_input = widgets.Password(
    value="",
    description="VK токен:",
    placeholder="Требуется только для ВК",
)
# Item limit; run_parser passes it to the Yandex and VK parsers only.
max_items = widgets.IntSlider(
    value=20,
    min=1,
    max=200,
    step=1,
    description="Лимит:"
)
run_button = widgets.Button(description="Запустить")
# Captures everything run_parser displays or raises.
output = widgets.Output()

def _normalize_date(value):
    """Convert a date-like value to an ISO ``YYYY-MM-DD`` string.

    Args:
        value: ``None``, an empty string, a pandas missing-value marker
            (NaN / NaT), a ``datetime``/``date`` object, or anything
            ``pandas.to_datetime`` can parse.

    Returns:
        ``None`` for missing input; otherwise the ISO date. Values that
        cannot be parsed fall back to the first 10 characters of ``str(value)``.
    """
    if value in (None, ""):
        return None
    # Empty DataFrame cells arrive as NaN/NaT. NaT even passes the
    # isinstance(datetime) check below, so without this guard such cells
    # would be emitted as the literal strings "nan" / "NaT".
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(value, datetime):
        return value.date().isoformat()
    if isinstance(value, date):
        return value.isoformat()
    # Parsed values are normalised to UTC before the date part is taken.
    parsed = pd.to_datetime(value, errors="coerce", utc=True)
    if pd.isna(parsed):
        return str(value)[:10]
    return parsed.date().isoformat()

def _format_vk_timestamp(ts):
    """Render a POSIX timestamp as a UTC ``YYYY-MM-DD`` string.

    Returns ``None`` unchanged when no timestamp is given.
    """
    if ts is not None:
        moment = datetime.fromtimestamp(ts, tz=timezone.utc)
        return moment.date().isoformat()
    return None

def _write_rows(rows):
    """Write parsed rows to ``OUTPUT_CSV``, replacing its previous content.

    Nothing is written (and an existing file is left untouched) when
    ``rows`` is empty. Each row is a dict keyed by the CSV column names.
    """
    if rows:
        columns = ["source", "url", "title", "timestamp", "text"]
        OUTPUT_CSV.parent.mkdir(parents=True, exist_ok=True)
        with OUTPUT_CSV.open("w", newline="", encoding="utf-8") as sink:
            table = csv.DictWriter(sink, fieldnames=columns)
            table.writeheader()
            for record in rows:
                table.writerow(record)
def run_parser(_):
    """Button callback: run the selected parser, save the rows and show them.

    The parser is chosen by ``parser_dropdown``. Every result is converted to
    a dict with the keys ``source``, ``url``, ``title``, ``timestamp`` and
    ``text``, written via ``_write_rows`` and, if non-empty, displayed as a
    DataFrame. All output, including tracebacks, goes to ``output``.

    Args:
        _: the clicked button (unused).

    Raises:
        ValueError: if the VK parser is selected and no token was entered.
    """
    output.clear_output()
    with output:
        rows = []
        parser = parser_dropdown.value
        # Pasted URLs and tokens often carry stray spaces or newlines.
        target = url_input.value.strip()
        if parser == "website":
            df = parse_websites([target])
            for _index, row in df.iterrows():
                # NOTE(review): the schema returned by parse_websites is not
                # visible here. A missing cell is typically NaN, which is
                # truthy, so `or {}` would let it through and `.get` would
                # fail; accept only real dicts.
                meta = row.get("meta")
                if not isinstance(meta, dict):
                    meta = {}
                rows.append({
                    "source": row.get("source"),
                    "url": row.get("url"),
                    "title": meta.get("title"),
                    "timestamp": _normalize_date(row.get("date")),
                    "text": row.get("text_clean") or row.get("text_raw"),
                })
        elif parser == "yandex":
            reviews = fetch_yandex_reviews(target, max_reviews=max_items.value)
            for review in reviews:
                rows.append({
                    "source": "yandex",
                    "url": target,
                    "title": None,
                    "timestamp": None,
                    "text": review.text,
                })
        else:
            token = token_input.value.strip()
            if not token:
                raise ValueError("Нужен VK токен.")
            vk = VKGroupParser(token)
            for post in vk.iter_posts(target, total=max_items.value):
                rows.append({
                    "source": "vk",
                    "url": target,
                    "title": None,
                    "timestamp": _format_vk_timestamp(post.date),
                    "text": post.text,
                })
        _write_rows(rows)
        if rows:
            display(pd.DataFrame(rows))

# Wire the button to the callback and render the form.
run_button.on_click(run_parser)
display(parser_dropdown, url_input, token_input, max_items, run_button, output)


## Геокодирование и визуализация адресов

Ниже — импорт скрипта геокодера и UI для запуска.
По умолчанию на вход подаётся файл, сохранённый ячейкой парсинга
(`parsed_texts.csv`); можно указать или загрузить другой CSV с колонкой `text`
либо текстовый файл `.txt` (одна строка — один текст). Скрипт извлекает топонимы,
геокодирует их через Photon, сохраняет результаты в GeoJSON и строит интерактивную карту.


In [ ]:
#@title Геокодер: импорт и запуск с UI
from pathlib import Path

import geopandas as gpd
import ipywidgets as widgets
import pandas as pd
from IPython.display import display

import sys
sys.path.insert(0, "/content/digital_identity/parsers")
from geocoder import geocode_texts, build_geojson, save_geojson, bbox_from_area_name

# Default input is the CSV produced by the parser cell.
DEFAULT_INPUT = Path("/content/digital_identity/parsed_texts.csv")
# Base path for uploaded files; the uploaded file's own suffix is appended.
UPLOAD_TARGET = Path("/content/digital_identity/uploaded_texts")

# Path of the file to geocode; overwritten by _handle_upload after an upload.
file_picker = widgets.Text(
    value=str(DEFAULT_INPUT),
    description="CSV/текст:",
    layout=widgets.Layout(width="80%"),
)
upload_widget = widgets.FileUpload(accept=".csv,.txt", multiple=False)
upload_button = widgets.Button(description="Загрузить файл")
upload_status = widgets.Label()

# Optional bounding box, passed to geocode_texts as a raw string.
bbox_input = widgets.Text(
    value="",
    description="BBox:",
    placeholder="minx,miny,maxx,maxy (необязательно)",
    layout=widgets.Layout(width="80%"),
)
# Area name: resolved into bbox_input by _resolve_bbox and also passed to geocode_texts.
bbox_name_input = widgets.Text(
    value="",
    description="Территория:",
    placeholder="Например, Москва",
    layout=widgets.Layout(width="80%"),
)
bbox_resolve = widgets.Button(description="Определить BBox")
bbox_status = widgets.Label()

# Where the resulting GeoJSON is saved.
geojson_output = widgets.Text(
    value="/content/digital_identity/geocoded_points.geojson",
    description="GeoJSON:",
    layout=widgets.Layout(width="80%"),
)
run_geocoder = widgets.Button(description="Геокодировать")
# Captures everything _run_geocoding displays or raises.
geo_output = widgets.Output()


def _load_texts(path: Path) -> list[str]:
    """Read the texts to geocode from a ``.txt`` or CSV file.

    Args:
        path: a ``.txt`` file (one text per line, trailing whitespace
            stripped, empty lines kept) or any other file, read as CSV.

    Returns:
        The texts as strings; missing CSV cells become ``""``.

    Raises:
        ValueError: if the CSV has no ``text`` column.
    """
    if path.suffix.lower() != ".txt":
        frame = pd.read_csv(path)
        if "text" in frame.columns:
            return frame["text"].fillna("").astype(str).tolist()
        raise ValueError("CSV должен содержать колонку text")
    content = path.read_text(encoding="utf-8")
    return [row.rstrip() for row in content.splitlines()]


def _extract_upload_payload():
    """Return ``(name, payload)`` for the first uploaded file.

    ``upload_widget.value`` is handled in two shapes: a mapping of
    filename -> payload, or a sequence of payload dicts that carry the name
    under ``"name"`` or ``"metadata"["name"]``. ``(None, None)`` is returned
    when nothing was uploaded or the value has another type.
    """
    uploaded = upload_widget.value
    if not isinstance(uploaded, (dict, list, tuple)) or not uploaded:
        return None, None
    if isinstance(uploaded, dict):
        # Mapping shape: the first item already is the (name, payload) pair.
        for filename, payload in uploaded.items():
            return filename, payload
    first = uploaded[0]
    filename = first.get("name") or first.get("metadata", {}).get("name")
    return filename, first


def _handle_upload(_):
    """Button callback: save the uploaded file and point ``file_picker`` at it.

    The file is stored as ``UPLOAD_TARGET`` plus the uploaded file's suffix
    (``.txt`` if it has none). Progress and failures are reported through
    ``upload_status``.

    Args:
        _: the clicked button (unused).
    """
    upload_status.value = ""
    name, payload = _extract_upload_payload()
    if not name or not payload:
        upload_status.value = "Выберите файл для загрузки"
        return
    suffix = Path(name).suffix or ".txt"
    target = UPLOAD_TARGET.with_suffix(suffix)
    try:
        # The target directory only exists if the repository was cloned;
        # create it like _write_rows does for its own output.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(payload["content"])
    except OSError as exc:
        # This callback runs outside any Output widget, so surface the
        # failure in the status label instead of letting it propagate.
        upload_status.value = f"Ошибка: {exc}"
        return
    file_picker.value = str(target)
    upload_status.value = f"Файл сохранен: {target}"


def _resolve_bbox(_):
    """Button callback: look up a bounding box for the entered area name.

    The result of ``bbox_from_area_name`` is written to ``bbox_input``; an
    empty name or a lookup failure is reported through ``bbox_status``.
    NOTE(review): the return type of ``bbox_from_area_name`` is not visible
    here - this assumes it is a string accepted by the text widget; confirm.

    Args:
        _: the clicked button (unused).
    """
    bbox_status.value = ""
    area = bbox_name_input.value.strip()
    if area:
        try:
            resolved = bbox_from_area_name(area)
        except Exception as exc:
            # UI boundary: show any lookup failure to the user.
            bbox_status.value = f"Ошибка: {exc}"
        else:
            bbox_input.value = resolved
            bbox_status.value = "BBox обновлен"
    else:
        bbox_status.value = "Введите название территории"


def _results_to_gdf(results: list) -> gpd.GeoDataFrame:
    """Build a WGS84 GeoDataFrame from the geocoding results that have a geometry.

    Args:
        results: objects exposing ``geometry``, ``location``, ``osm_id`` and
            ``source_text``; entries whose geometry is ``None`` are skipped.

    Returns:
        A GeoDataFrame with the columns ``geometry``, ``location``,
        ``osm_id`` and ``text`` (empty if nothing was geocoded).
    """
    columns = ["geometry", "location", "osm_id", "text"]
    rows = [
        {
            "geometry": res.geometry,
            "location": res.location,
            "osm_id": res.osm_id,
            "text": res.source_text,
        }
        for res in results
        if res.geometry is not None
    ]
    # Columns are passed explicitly: with an empty row list the frame would
    # otherwise have no "geometry" column and the constructor would raise
    # instead of returning an empty frame the caller can test with `.empty`.
    return gpd.GeoDataFrame(rows, columns=columns, geometry="geometry", crs="EPSG:4326")


def _run_geocoding(_):
    """Button callback: geocode the selected file, save GeoJSON and show a map.

    Reads the input path, bbox and area name from the widgets, runs
    ``geocode_texts``, saves the GeoJSON to the path in ``geojson_output``
    and displays an interactive map (or a message if no point was found).
    All output, including tracebacks, goes to ``geo_output``.

    NOTE(review): ``file_picker`` is looked up when the button is clicked,
    and other cells of this notebook assign the same global name - after
    they run, this callback reads their widget instead of this cell's.

    Args:
        _: the clicked button (unused).

    Raises:
        FileNotFoundError: if the selected input file does not exist.
    """
    geo_output.clear_output()
    with geo_output:
        source = Path(file_picker.value).expanduser()
        if not source.exists():
            raise FileNotFoundError(f"Файл не найден: {source}")
        texts = _load_texts(source)
        # Empty fields mean "no restriction".
        area_box = bbox_input.value.strip() or None
        area_name = bbox_name_input.value.strip() or None
        results = geocode_texts(texts, bbox=area_box, bbox_name=area_name)
        save_geojson(geojson_output.value, build_geojson(results))
        display({"geojson": geojson_output.value})
        gdf = _results_to_gdf(results)
        if not gdf.empty:
            display(gdf.explore())
        else:
            display("Нет точек для отображения на карте.")


# Wire the buttons to their callbacks.
upload_button.on_click(_handle_upload)
bbox_resolve.on_click(_resolve_bbox)
run_geocoder.on_click(_run_geocoding)

# Render the form; each status label sits directly under the control it reports on.
display(
    file_picker,
    widgets.HBox([upload_widget, upload_button]),
    upload_status,
    bbox_input,
    widgets.HBox([bbox_name_input, bbox_resolve]),
    bbox_status,
    geojson_output,
    run_geocoder,
    geo_output,
)


In [None]:
#@title Классификатор эмоций: импорт и запуск с UI
from pathlib import Path

import geopandas as gpd
import ipywidgets as widgets
import matplotlib.pyplot as plt
import pandas as pd
from IPython.display import display

import sys
sys.path.insert(0, "/content/digital_identity/parsers")
from emotion_classifier import models_initialization, classify_emotions

# NOTE(review): DEFAULT_INPUT, UPLOAD_TARGET, file_picker, upload_widget,
# upload_button and upload_status are also assigned at module level by the
# geocoder cell and the BERTopic cell (output_path by the BERTopic cell too).
# Callbacks look these globals up when a button is clicked, so whichever cell
# ran last determines which widgets every callback reads - consider
# cell-specific names.
# Default input is the CSV produced by the parser cell.
DEFAULT_INPUT = Path("/content/digital_identity/parsed_texts.csv")
# Base path for uploaded files; the uploaded file's own suffix is appended.
UPLOAD_TARGET = Path("/content/digital_identity/uploaded_texts")

# Path of the file to classify; overwritten by _handle_upload after an upload.
file_picker = widgets.Text(
    value=str(DEFAULT_INPUT),
    description="CSV/GeoJSON:",
    layout=widgets.Layout(width="80%"),
)
upload_widget = widgets.FileUpload(accept=".csv,.geojson,.json", multiple=False)
upload_button = widgets.Button(description="Загрузить файл")
upload_status = widgets.Label()

# Requested output file; _run_classification rewrites it with the path actually used.
output_path = widgets.Text(
    value="/content/digital_identity/emotions_output.csv",
    description="Выход:",
    layout=widgets.Layout(width="80%"),
)
run_classifier = widgets.Button(description="Классифицировать")
# Captures everything _run_classification displays or raises.
classifier_output = widgets.Output()


def _default_output_path(input_path: Path) -> Path:
    """Derive ``<stem>_emotions.<ext>`` next to the input file.

    The extension is ``.geojson`` for ``.geojson``/``.json`` inputs
    (case-insensitive) and ``.csv`` for everything else.
    """
    is_geo = input_path.suffix.lower() in {".geojson", ".json"}
    extension = ".geojson" if is_geo else ".csv"
    return input_path.with_name(f"{input_path.stem}_emotions{extension}")


def _ensure_output_path(raw_value: str, input_path: Path) -> Path:
    """Resolve the output path and force its suffix to match the input kind.

    Args:
        raw_value: path typed by the user; if empty, the default derived
            from ``input_path`` is used.
        input_path: the input file; ``.geojson``/``.json`` inputs require a
            ``.geojson`` output, everything else a ``.csv`` output.

    Returns:
        The chosen path, with its suffix replaced if it did not match
        (compared case-insensitively).
    """
    geo_input = input_path.suffix.lower() in {".geojson", ".json"}
    wanted = ".geojson" if geo_input else ".csv"
    if raw_value:
        candidate = Path(raw_value)
    else:
        candidate = _default_output_path(input_path)
    if candidate.suffix.lower() == wanted:
        return candidate
    return candidate.with_suffix(wanted)


def _load_dataset(path: Path):
    """Load a CSV or GeoJSON dataset that has a ``text`` column.

    Args:
        path: input file; the type is decided by its suffix (case-insensitive).

    Returns:
        ``(frame, kind)`` where ``kind`` is ``"csv"`` (pandas DataFrame) or
        ``"geojson"`` (GeoDataFrame read with geopandas).

    Raises:
        ValueError: for an unsupported suffix or a missing ``text`` column.
    """
    ext = path.suffix.lower()
    if ext == ".csv":
        frame = pd.read_csv(path)
        if "text" in frame.columns:
            return frame, "csv"
        raise ValueError("CSV должен содержать колонку text")
    if ext not in {".geojson", ".json"}:
        raise ValueError("Поддерживаются только CSV или GeoJSON файлы")
    geo_frame = gpd.read_file(path)
    if "text" not in geo_frame.columns:
        raise ValueError("GeoJSON должен содержать колонку text")
    return geo_frame, "geojson"


def _extract_upload_payload():
    """Return ``(name, payload)`` for the first uploaded file.

    ``upload_widget.value`` is handled in two shapes: a mapping of
    filename -> payload, or a sequence of payload dicts that carry the name
    under ``"name"`` or ``"metadata"["name"]``. ``(None, None)`` is returned
    when nothing was uploaded or the value has another type.
    """
    uploaded = upload_widget.value
    if not isinstance(uploaded, (dict, list, tuple)) or not uploaded:
        return None, None
    if isinstance(uploaded, dict):
        # Mapping shape: the first item already is the (name, payload) pair.
        for filename, payload in uploaded.items():
            return filename, payload
    first = uploaded[0]
    filename = first.get("name") or first.get("metadata", {}).get("name")
    return filename, first


def _handle_upload(_):
    """Button callback: save the uploaded file and point ``file_picker`` at it.

    The file is stored as ``UPLOAD_TARGET`` plus the uploaded file's suffix
    (``.csv`` if it has none). Progress and failures are reported through
    ``upload_status``.

    Args:
        _: the clicked button (unused).
    """
    upload_status.value = ""
    name, payload = _extract_upload_payload()
    if not name or not payload:
        upload_status.value = "Выберите файл для загрузки"
        return
    suffix = Path(name).suffix or ".csv"
    target = UPLOAD_TARGET.with_suffix(suffix)
    try:
        # The target directory only exists if the repository was cloned;
        # create it like _write_rows does for its own output.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(payload["content"])
    except OSError as exc:
        # This callback runs outside any Output widget, so surface the
        # failure in the status label instead of letting it propagate.
        upload_status.value = f"Ошибка: {exc}"
        return
    file_picker.value = str(target)
    upload_status.value = f"Файл сохранен: {target}"


def _plot_emotions(df: pd.DataFrame) -> None:
    """Display a pie chart of how often each value of ``df["emotion"]`` occurs.

    Args:
        df: frame with an ``emotion`` column.
    """
    counts = df["emotion"].value_counts()
    fig, ax = plt.subplots(figsize=(6, 6))
    try:
        ax.pie(counts.values, labels=counts.index, autopct="%1.1f%%")
        ax.set_title("Распределение эмоций")
        display(fig)
    finally:
        # pyplot keeps every figure alive until it is closed; without this
        # each click on the run button would leak one more figure.
        plt.close(fig)


def _run_classification(_):
    """Button callback: classify emotions for the selected file and save the result.

    Loads the dataset, initialises the models on first use, stores the
    predictions in a new ``emotion`` column, writes the result (GeoJSON or
    CSV, matching the input) and shows the output path, the first rows and
    a pie chart. All output, including tracebacks, goes to ``classifier_output``.

    NOTE(review): ``file_picker`` and ``output_path`` are looked up when the
    button is clicked, and other cells of this notebook assign the same
    global names - after they run, this callback reads their widgets.

    Args:
        _: the clicked button (unused).

    Raises:
        FileNotFoundError: if the selected input file does not exist.
    """
    classifier_output.clear_output()
    with classifier_output:
        source = Path(file_picker.value).expanduser()
        if not source.exists():
            raise FileNotFoundError(f"Файл не найден: {source}")
        data, data_type = _load_dataset(source)
        # Models are initialised lazily, only the first time they are needed.
        if models_initialization._classification_model is None:
            models_initialization.init_models()
        documents = data["text"].fillna("").astype(str).tolist()
        data["emotion"] = classify_emotions(documents)
        target = _ensure_output_path(output_path.value, source)
        # Show the user the path that is actually used.
        output_path.value = str(target)
        if data_type != "geojson":
            data.to_csv(target, index=False)
        else:
            data.to_file(target, driver="GeoJSON")
        display({"output": str(target)})
        display(data.head())
        _plot_emotions(data)


# Wire the buttons to their callbacks.
upload_button.on_click(_handle_upload)
run_classifier.on_click(_run_classification)

# Render the form; the Output widget at the end receives the results.
display(
    file_picker,
    widgets.HBox([upload_widget, upload_button]),
    upload_status,
    output_path,
    run_classifier,
    classifier_output,
)


## Тематическое моделирование (BERTopic)

Модуль принимает CSV/GeoJSON с колонкой `text`, кластеризует тексты с помощью BERTopic, добавляет номер и ключевые слова темы, а также строит визуализации результатов. Если указана колонка времени, дополнительно появляется динамика тем.


In [None]:
#@title BERTopic: кластеризация тем и визуализация
from pathlib import Path

import geopandas as gpd
import ipywidgets as widgets
import pandas as pd
import plotly.io as pio
from IPython.display import display

import sys
sys.path.insert(0, "/content/digital_identity")
from parsers.topic_modeler import attach_topics, train_topic_model

# Select the Plotly renderer named "colab" for the figures displayed below.
pio.renderers.default = "colab"

# NOTE(review): DEFAULT_INPUT, UPLOAD_TARGET, file_picker, upload_widget,
# upload_button, upload_status and output_path are also assigned at module
# level by earlier cells. Callbacks look these globals up when a button is
# clicked, so whichever cell ran last determines which widgets every
# callback reads - consider cell-specific names.
# Default input is the CSV produced by the parser cell.
DEFAULT_INPUT = Path("/content/digital_identity/parsed_texts.csv")
# Base path for uploaded files; the uploaded file's own suffix is appended.
UPLOAD_TARGET = Path("/content/digital_identity/uploaded_topics")

# Path of the file to model; overwritten by _handle_upload after an upload.
file_picker = widgets.Text(
    value=str(DEFAULT_INPUT),
    description="CSV/GeoJSON:",
    layout=widgets.Layout(width="80%"),
)
upload_widget = widgets.FileUpload(accept=".csv,.geojson,.json", multiple=False)
upload_button = widgets.Button(description="Загрузить файл")
upload_status = widgets.Label()

# Name of the column holding the documents (falls back to "text" when blank).
text_column = widgets.Text(
    value="text",
    description="Колонка текста:",
    layout=widgets.Layout(width="80%"),
)
# Optional timestamp column; needed only for the "over_time" visualization.
timestamp_column = widgets.Text(
    value="",
    description="Колонка времени:",
    layout=widgets.Layout(width="80%"),
)

# Passed to train_topic_model as `language`.
language_dropdown = widgets.Dropdown(
    options=[
        ("Многоязычный", "multilingual"),
        ("Русский", "russian"),
        ("English", "english"),
    ],
    value="multilingual",
    description="Язык:",
)
# Optional embedding model name; blank is passed on as None.
embedding_model = widgets.Text(
    value="",
    description="Embedding модель:",
    placeholder="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
    layout=widgets.Layout(width="80%"),
)

# Clustering backend; also decides which of the two sliders below is shown and used.
cluster_method = widgets.ToggleButtons(
    options=[("HDBSCAN", "hdbscan"), ("k-means", "kmeans")],
    value="hdbscan",
    description="Кластеризация:",
)
min_topic_size = widgets.IntSlider(
    value=10,
    min=2,
    max=50,
    step=1,
    description="Мин. размер темы:",
    layout=widgets.Layout(width="70%"),
)
# Forwarded only when HDBSCAN is selected.
min_samples = widgets.IntSlider(
    value=5,
    min=1,
    max=50,
    step=1,
    description="min_samples:",
    layout=widgets.Layout(width="70%"),
)
# Forwarded only when k-means is selected.
n_clusters = widgets.IntSlider(
    value=10,
    min=2,
    max=100,
    step=1,
    description="k-means k:",
    layout=widgets.Layout(width="70%"),
)

# 0 is passed on as None.
nr_topics = widgets.IntText(
    value=0,
    description="Число тем (0=auto):",
)
# Number of keywords attached to each topic by attach_topics.
top_n_keywords = widgets.IntSlider(
    value=5,
    min=3,
    max=15,
    step=1,
    description="Ключевые слова:",
    layout=widgets.Layout(width="70%"),
)
reduce_frequent_words = widgets.Checkbox(
    value=True,
    description="Фильтровать частотные слова (TF-IDF)",
)

# Requested output file; _run_topic_model rewrites it with the path actually used.
output_path = widgets.Text(
    value="/content/digital_identity/topics_output.csv",
    description="Выход:",
    layout=widgets.Layout(width="80%"),
)

# Option values are the keys _run_topic_model checks to decide what to draw.
visualizations = widgets.SelectMultiple(
    options=[
        ("Карта тем", "topics"),
        ("Бархарт", "barchart"),
        ("Иерархия", "hierarchy"),
        ("Тепловая карта", "heatmap"),
        ("Темы во времени", "over_time"),
    ],
    value=("topics", "barchart"),
    description="Визуализации:",
    layout=widgets.Layout(width="70%"),
)

run_model = widgets.Button(description="Построить темы")
# Captures everything _run_topic_model displays or raises.
model_output = widgets.Output()


def _default_output_path(input_path: Path) -> Path:
    """Derive ``<stem>_topics.<ext>`` next to the input file.

    The extension is ``.geojson`` for ``.geojson``/``.json`` inputs
    (case-insensitive) and ``.csv`` for everything else.
    """
    is_geo = input_path.suffix.lower() in {".geojson", ".json"}
    extension = ".geojson" if is_geo else ".csv"
    return input_path.with_name(f"{input_path.stem}_topics{extension}")


def _ensure_output_path(raw_value: str, input_path: Path) -> Path:
    """Resolve the output path and force its suffix to match the input kind.

    Args:
        raw_value: path typed by the user; if empty, the default derived
            from ``input_path`` is used.
        input_path: the input file; ``.geojson``/``.json`` inputs require a
            ``.geojson`` output, everything else a ``.csv`` output.

    Returns:
        The chosen path, with its suffix replaced if it did not match
        (compared case-insensitively).
    """
    geo_input = input_path.suffix.lower() in {".geojson", ".json"}
    wanted = ".geojson" if geo_input else ".csv"
    if raw_value:
        candidate = Path(raw_value)
    else:
        candidate = _default_output_path(input_path)
    if candidate.suffix.lower() == wanted:
        return candidate
    return candidate.with_suffix(wanted)


def _load_dataset(path: Path):
    """Load a CSV or GeoJSON dataset; columns are validated by the caller.

    Args:
        path: input file; the type is decided by its suffix (case-insensitive).

    Returns:
        ``(frame, kind)`` where ``kind`` is ``"csv"`` (pandas DataFrame) or
        ``"geojson"`` (GeoDataFrame read with geopandas).

    Raises:
        ValueError: for an unsupported suffix.
    """
    ext = path.suffix.lower()
    if ext == ".csv":
        return pd.read_csv(path), "csv"
    if ext not in {".geojson", ".json"}:
        raise ValueError("Поддерживаются только CSV или GeoJSON файлы")
    return gpd.read_file(path), "geojson"


def _extract_upload_payload():
    """Return ``(name, payload)`` for the first uploaded file.

    ``upload_widget.value`` is handled in two shapes: a mapping of
    filename -> payload, or a sequence of payload dicts that carry the name
    under ``"name"`` or ``"metadata"["name"]``. ``(None, None)`` is returned
    when nothing was uploaded or the value has another type.
    """
    uploaded = upload_widget.value
    if not isinstance(uploaded, (dict, list, tuple)) or not uploaded:
        return None, None
    if isinstance(uploaded, dict):
        # Mapping shape: the first item already is the (name, payload) pair.
        for filename, payload in uploaded.items():
            return filename, payload
    first = uploaded[0]
    filename = first.get("name") or first.get("metadata", {}).get("name")
    return filename, first


def _handle_upload(_):
    """Button callback: save the uploaded file and point ``file_picker`` at it.

    The file is stored as ``UPLOAD_TARGET`` plus the uploaded file's suffix
    (``.csv`` if it has none). Progress and failures are reported through
    ``upload_status``.

    Args:
        _: the clicked button (unused).
    """
    upload_status.value = ""
    name, payload = _extract_upload_payload()
    if not name or not payload:
        upload_status.value = "Выберите файл для загрузки"
        return
    suffix = Path(name).suffix or ".csv"
    target = UPLOAD_TARGET.with_suffix(suffix)
    try:
        # The target directory only exists if the repository was cloned;
        # create it like _write_rows does for its own output.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(payload["content"])
    except OSError as exc:
        # This callback runs outside any Output widget, so surface the
        # failure in the status label instead of letting it propagate.
        upload_status.value = f"Ошибка: {exc}"
        return
    file_picker.value = str(target)
    upload_status.value = f"Файл сохранен: {target}"


def _parse_timestamps(data: pd.DataFrame, column_name: str):
    """Parse the optional timestamp column of ``data``.

    Args:
        data: the loaded dataset.
        column_name: timestamp column name; an empty name means "not requested".

    Returns:
        ``None`` if no column was requested, otherwise a datetime Series
        aligned with ``data`` in which unparseable cells are NaT.

    Raises:
        ValueError: if the column is missing or no value could be parsed.
    """
    if not column_name:
        return None
    if column_name not in data.columns:
        raise ValueError(f"Колонка времени '{column_name}' не найдена")
    parsed = pd.to_datetime(data[column_name], errors="coerce")
    if parsed.notna().any():
        return parsed
    raise ValueError("Не удалось распознать временные метки")


def _run_topic_model(_):
    """Button callback: train a topic model, save the result and draw the charts.

    Loads the dataset, trains the model with the parameters chosen in the
    widgets, attaches topics to the data via ``attach_topics``, writes the
    result (GeoJSON or CSV, matching the input) and displays the selected
    visualizations. All output, including tracebacks, goes to ``model_output``.

    Args:
        _: the clicked button (unused).

    Raises:
        FileNotFoundError: if the selected input file does not exist.
        ValueError: if the text or timestamp column is missing/unparseable.
    """
    model_output.clear_output()
    with model_output:
        input_path = Path(file_picker.value).expanduser()
        if not input_path.exists():
            raise FileNotFoundError(f"Файл не найден: {input_path}")
        data, data_type = _load_dataset(input_path)
        text_col = text_column.value.strip() or "text"
        if text_col not in data.columns:
            raise ValueError(f"Колонка текста '{text_col}' не найдена")
        texts = data[text_col].fillna("").astype(str).tolist()
        timestamps = _parse_timestamps(data, timestamp_column.value.strip())

        result = train_topic_model(
            texts,
            language=language_dropdown.value,
            min_topic_size=min_topic_size.value,
            nr_topics=nr_topics.value or None,  # 0 is passed on as None
            embedding_model=embedding_model.value.strip() or None,
            cluster_method=cluster_method.value,
            # Only the parameter of the selected clustering method is forwarded.
            n_clusters=n_clusters.value if cluster_method.value == "kmeans" else None,
            min_samples=min_samples.value if cluster_method.value == "hdbscan" else None,
            reduce_frequent_words=reduce_frequent_words.value,
        )
        data = attach_topics(
            data,
            result.topics,
            result.model,
            top_n=top_n_keywords.value,
        )

        target = _ensure_output_path(output_path.value, input_path)
        # Show the user the path that is actually used.
        output_path.value = str(target)
        if data_type == "geojson":
            data.to_file(target, driver="GeoJSON")
        else:
            data.to_csv(target, index=False)

        display({"output": str(target)})
        display(data.head())
        topic_info = result.model.get_topic_info()
        display(topic_info.head(10))

        # Topic -1 is excluded from the count used by the guards below.
        available_topics = topic_info[topic_info["Topic"] != -1]
        topic_count = len(available_topics)

        selected = set(visualizations.value)
        if "topics" in selected:
            display(result.model.visualize_topics())
        if "barchart" in selected:
            display(result.model.visualize_barchart(top_n_topics=20))
        if "hierarchy" in selected:
            if topic_count < 2:
                display("Недостаточно тем для иерархии (нужно минимум 2).")
            else:
                display(result.model.visualize_hierarchy())
        if "heatmap" in selected:
            if topic_count < 2:
                display("Недостаточно тем для тепловой карты (нужно минимум 2).")
            else:
                display(result.model.visualize_heatmap())
        if "over_time" in selected:
            if timestamps is None:
                display("Для динамики тем укажите колонку времени.")
            else:
                # Cells that failed to parse are NaT and cannot be assigned to
                # a time bin, so drop those rows and keep documents, topics
                # and timestamps aligned.
                # NOTE(review): assumes result.model is a fitted BERTopic whose
                # `topics_` holds one topic per training document - the same
                # assignments topics_over_time uses when `topics` is omitted.
                keep = timestamps.notna().tolist()
                dated_texts = [doc for doc, ok in zip(texts, keep) if ok]
                dated_topics = [
                    topic for topic, ok in zip(result.model.topics_, keep) if ok
                ]
                dated_stamps = timestamps[timestamps.notna()].tolist()
                topics_over_time = result.model.topics_over_time(
                    dated_texts, dated_stamps, topics=dated_topics, nr_bins=20
                )
                display(result.model.visualize_topics_over_time(topics_over_time))


def _toggle_cluster_controls(change=None):
    """Show only the slider that belongs to the selected clustering method.

    Args:
        change: observer payload (unused); optional so the function can also
            be called directly.
    """
    if cluster_method.value == "kmeans":
        shown, hidden = n_clusters, min_samples
    else:
        shown, hidden = min_samples, n_clusters
    # An empty display value leaves the widget visible; "none" hides it.
    shown.layout.display = ""
    hidden.layout.display = "none"


# Keep slider visibility in sync with the selected method, and apply it once
# up front so the initial form matches the default selection.
cluster_method.observe(_toggle_cluster_controls, names="value")
_toggle_cluster_controls()

# Wire the buttons to their callbacks.
upload_button.on_click(_handle_upload)
run_model.on_click(_run_topic_model)

# Render the form; the Output widget at the end receives the results.
display(
    file_picker,
    widgets.HBox([upload_widget, upload_button]),
    upload_status,
    text_column,
    timestamp_column,
    language_dropdown,
    embedding_model,
    cluster_method,
    min_topic_size,
    min_samples,
    n_clusters,
    nr_topics,
    top_n_keywords,
    reduce_frequent_words,
    output_path,
    visualizations,
    run_model,
    model_output,
)
