# Исследуем идеи алгоритма PageRank

Есть набор веб-страниц одного сайта.  
У каждой страницы есть URL вида `https://www.site.com/p/<id>`.  
Внутри текста страниц встречаются ссылки на другие страницы того же сайта — в том же формате URL. К сожалению, сайт взломали и добавили бессмысленные спам-страницы, нарушающие корректность переходов между страницами. Мы успели вручную проверить часть страниц и хотим на их основе разобраться с оставшимися.

Таким образом реализуем упрощённый пайплайн индексатора:

1) извлекаем ссылки из текста и строим ориентированный граф ссылок;
2) получаем графовые признаки страниц:
   - **доступность** страниц относительно некоторого верифицированного ядра страниц (seed pages),
   - наличие **циклических ссылок**,
   - глобальную «карту переходов» на малом поднаборе;
3) на основе признаков построить **TrustScore** страницы;
4) на небольшом поднаборе страниц построить kNN «похожие страницы по навигации»
   (ближайшие по числу переходов по ссылкам).



## Почему граф

Ссылка внутри текста — это наблюдаемое действие автора страницы: он явно показывает, куда стоит перейти за уточнением, примером или продолжением.

Мы используем сам факт ссылочного перехода как сигнал связи между страницами и строим ориентированный граф:
- вершина — страница
- ребро `u → v` — на странице `u` встречается ссылка на `v`

Дальше мы исследуем структуру сайта как граф и получаем свойства страниц, которые невозможно извлечь из текста по отдельности.


In [None]:

import os
import random
import pandas as pd
import numpy as np
import re
from typing import List, Dict, Tuple


## Данные

Мы будем работать с синтетическим датасетом `pages.csv`.
Если файла нет, он будет сгенерирован локально.

В генераторе есть:
- `is_seed`: доверенные страницы (ядро)
- `is_spam`: страницы, которые чаще образуют “фермы ссылок” (замкнутые структуры)

Эти поля нужны для эксперимента с TrustScore в конце.


In [None]:

def make_url(i: int) -> str:
    return f"https://www.site.com/p/{i}"

def generate_pages(
    n_pages: int = 200,
    vocab_size: int = 300,
    words_per_page: Tuple[int, int] = (80, 160),
    links_per_page: Tuple[int, int] = (2, 8),
    n_seed: int = 8,
    n_spam_clusters: int = 3,
    spam_cluster_size: int = 10,
    seed: int = 42,
    out_csv: str = "pages.csv",
) -> None:
    """Генератор синтетических страниц с ссылками в тексте."""
    rnd = random.Random(seed)
    vocab = [f"word{j}" for j in range(vocab_size)]

    urls = [make_url(i) for i in range(n_pages)]
    seed_ids = set(rnd.sample(range(n_pages), n_seed))

    # создаём “спам-кластеры” (циклические ссылки)
    spam_nodes = set()
    clusters = []
    candidates = [i for i in range(n_pages) if i not in seed_ids]
    rnd.shuffle(candidates)
    ptr = 0
    for _ in range(n_spam_clusters):
        cluster = candidates[ptr:ptr + spam_cluster_size]
        ptr += spam_cluster_size
        if len(cluster) == spam_cluster_size:
            clusters.append(cluster)
            spam_nodes.update(cluster)

    rows = []
    for i in range(n_pages):
        n_words = rnd.randint(*words_per_page)
        words = [rnd.choice(vocab) for _ in range(n_words)]

        n_links = rnd.randint(*links_per_page)
        out = []

        if i in spam_nodes:
            cluster = next(c for c in clusters if i in c)
            out.append(rnd.choice(cluster))
            while len(out) < n_links:
                if rnd.random() < 0.85:
                    out.append(rnd.choice(cluster))
                else:
                    out.append(rnd.randrange(n_pages))
        else:
            while len(out) < n_links:
                r = rnd.random()
                if r < 0.25:
                    out.append(rnd.choice(list(seed_ids)))
                else:
                    out.append(rnd.randrange(n_pages))

        # вставляем URL прямо в текст (как токены)
        for v in out:
            pos = rnd.randrange(len(words) + 1)
            words.insert(pos, make_url(v))

        rows.append({
            "page_id": i,
            "url": urls[i],
            "is_seed": int(i in seed_ids),
            "is_spam": int(i in spam_nodes),
            "text": " ".join(words),
        })

    pd.DataFrame(rows).to_csv(out_csv, index=False)

In [None]:
if not os.path.exists("pages.csv"):
    generate_pages()

df = pd.read_csv("pages.csv")

required = {"page_id", "url", "is_seed", "is_spam", "text"}

In [None]:
assert required.issubset(df.columns), f"Missing columns: {required - set(df.columns)}"
assert df["page_id"].is_unique
assert df["page_id"].min() == 0
assert df["page_id"].max() == len(df) - 1

In [None]:

df.head()

## Шаг 1. Извлечь ссылки из текста

Ссылки имеют формат `https://www.site.com/p/<id>`.
Мы извлекаем все такие ссылки из текста каждой страницы и получаем список исходящих ссылок `out_links`.

Дальше именно `out_links` станет основой для построения графа.


In [None]:

URL_RE = re.compile(r"https://www\.site\.com/p/(\d+)")

def extract_out_links(text: str) -> List[int]:
    """TODO: извлечь из текста все id страниц, на которые есть ссылка."""
    # Подсказка: URL_RE.findall(text) вернёт список строковых чисел.
    raise NotImplementedError


def unique_in_range(xs: List[int], n: int) -> List[int]:
    """TODO: убрать дубликаты, сохранить порядок, выбросить ссылки вне диапазона [0, n-1]."""
    raise NotImplementedError


n = len(df)
df = df.copy()
df["out_links_raw"] = df["text"].astype(str).map(extract_out_links)
df["out_links"] = df["out_links_raw"].map(lambda xs: unique_in_range(xs, n))

# проверки
assert df["out_links_raw"].map(lambda x: isinstance(x, list)).all()
assert df["out_links_raw"].map(lambda xs: all(isinstance(v, int) for v in xs)).all()
assert df["out_links"].map(lambda xs: all(0 <= v < n for v in xs)).all()

df[["page_id", "out_links"]].head()


## Шаг 2. Построить граф ссылок

Мы будем использовать два представления одного графа:

- **Список смежности**: для каждой страницы список страниц, на которые она ссылается.
  Это удобно для обходов и локального анализа.
- **Матрица смежности**: `A[u, v] = 1`, если есть ссылка `u → v`.


In [None]:
def build_adj_list(df: pd.DataFrame) -> List[List[int]]:
    """TODO: построить список смежности по df['out_links']"""
    raise NotImplementedError


def build_adj_matrix(adj: List[List[int]]) -> np.ndarray:
    """TODO: построить матрицу смежности A (0/1) по списку смежности."""
    raise NotImplementedError


In [None]:
adj = build_adj_list(df)
A = build_adj_matrix(adj)

In [None]:
assert isinstance(adj, list) and len(adj) == n
assert isinstance(A, np.ndarray) and A.shape == (n, n)
assert set(np.unique(A)).issubset({0, 1})

In [None]:
# базовые степени
out_degree = np.array([len(vs) for vs in adj], dtype=int)
in_degree = A.sum(axis=0).astype(int)

df["out_degree"] = out_degree
df["in_degree"] = in_degree

In [None]:
df[["page_id", "in_degree", "out_degree"]].head()

## Шаг 3. Доступность доверенного ядра (расстояния по ссылкам)

На сайте обычно есть набор страниц, которым можно доверять заранее (seed pages).
Мы хотим измерить **сколько шагов по ссылкам** отделяет каждую страницу от доверенного ядра.

Интерпретация простая: чем меньше переходов нужно, чтобы попасть к доверенным страницам,
тем “ближе” страница к здоровой структуре сайта.


In [None]:

from collections import deque
import math

def reverse_graph(adj: List[List[int]]) -> List[List[int]]:
    """TODO: построить обратный граф."""
    raise NotImplementedError


def dist_to_seeds_by_links(adj: List[List[int]], seeds: List[int]) -> List[float]:
    """TODO: расстояние (число переходов) от каждой страницы ДО множества seed-страниц.

    Рекомендация: удобнее считать BFS по обратному графу, стартуя из seeds.
    """
    raise NotImplementedError


In [None]:
seeds = df.loc[df["is_seed"] == 1, "page_id"].tolist()
dist_seed = dist_to_seeds_by_links(adj, seeds)

df["dist_to_seed"] = dist_seed
df["reachable_3"] = [d <= 3 for d in dist_seed]
df["reachable_5"] = [d <= 5 for d in dist_seed]

In [None]:
assert df["dist_to_seed"].map(lambda x: (x == math.inf) or (isinstance(x, (int, float)) and x >= 0)).all()

In [None]:
df[["page_id", "is_seed", "dist_to_seed", "reachable_3", "reachable_5"]].head()

## Шаг 4. Петли и замкнутые области (циклы)

Если переходя по ссылкам можно вернуться к уже посещённой странице,
значит в графе есть замкнутая структура.

В документации и навигации такие области часто сигнализируют о проблемах, например если теорема 1 доказыватся через теорему 2, а теорема 2 доказывается через теорему 1, то на лицо явная проблема в логике. С страницами абсолютно тоже самое.

Поэтому мы добавим признак `in_cycle`: участвует ли страница хотя бы в одной петле.


In [None]:
def in_any_cycle(adj: List[List[int]]) -> List[bool]:
    """TODO: вернуть список длины N: True, если вершина участвует хотя бы в одном цикле.

    Можно реализовать через DFS со стеком рекурсии (цвета 0/1/2).
    """
    raise NotImplementedError


In [None]:
df["in_cycle"] = in_any_cycle(adj)
assert df["in_cycle"].map(lambda x: isinstance(x, bool)).all()

In [None]:
df[["page_id", "in_cycle"]].head()

## Шаг 5. Карта навигации на малом поднаборе (полные попарные расстояния)

Построение полной матрицы расстояний для всех страниц обычно слишком дорогое по ассимптотике.
Поэтому мы берём небольшой поднабор важных страниц (например, seed + страницы с высоким in-degree)
и строим для него матрицу переходов: минимальное число переходов между любой парой.

Эта матрица позволяет:
- оценить, какие страницы составляют центр кластеров (в среднем ближе к другим),
- строить навигационные рекомендации: k ближайших страниц по числу переходов.


In [None]:
INF = 10**9

def choose_subset(df: pd.DataFrame, max_p: int = 180) -> List[int]:
    """Готовая функция выбора поднабора (не является целью задания)."""
    seed_pages = df.loc[df["is_seed"] == 1, "page_id"].tolist()
    top = df.sort_values("in_degree", ascending=False)["page_id"].tolist()

    picked = []
    seen = set()
    for x in seed_pages + top:
        x = int(x)
        if x not in seen:
            picked.append(x)
            seen.add(x)
        if len(picked) >= max_p:
            break
    return picked

In [None]:
P = choose_subset(df, max_p=180)
P_set = set(P)
idx = {pid: i for i, pid in enumerate(P)}
m = len(P)
assert m <= 180

In [None]:
def build_dist_matrix_on_subset(adj: List[List[int]], P: List[int]) -> np.ndarray:
    """TODO: построить матрицу dist0 на поднаборе P:
    - dist0[i,i]=0
    - dist0[i,j]=1 если есть ребро (P[i] -> P[j])
    - иначе INF
    """
    raise NotImplementedError


def floyd_warshall(dist0: np.ndarray) -> np.ndarray:
    """TODO: алгоритм Флойда–Уоршелла для матрицы dist0."""
    raise NotImplementedError


def graph_knn_from_dist(dist: np.ndarray, k: int) -> List[List[int]]:
    """TODO: для каждой вершины i вернуть k ближайших j (по dist[i,j]), исключая i и INF."""
    raise NotImplementedError

In [None]:
D0 = build_dist_matrix_on_subset(adj, P)
D = floyd_warshall(D0)
knn_idx = graph_knn_from_dist(D, k=5)
knn_pages = [[P[j] for j in neigh] for neigh in knn_idx]

In [None]:
assert D.shape == (m, m)
assert (np.diag(D) == 0).all()
assert len(knn_pages) == m

### Признаки из карты навигации (на поднаборе)

По матрице расстояний можно получить простые агрегаты:
- среднее расстояние до достижимых страниц (proxy “центральности”)
- расстояние до ближайшего seed (если seed входит в поднабор)


In [None]:

avg_dist = []
min_dist_to_seed = []

seed_in_P = [idx[s] for s in df.loc[df["is_seed"] == 1, "page_id"].tolist() if s in idx]

for i in range(m):
    row = D[i]
    finite = row[row < INF]
    avg_dist.append(float(finite.mean()) if len(finite) else float("inf"))
    if seed_in_P:
        min_dist_to_seed.append(int(row[seed_in_P].min()))
    else:
        min_dist_to_seed.append(INF)

df["avg_dist_in_subset"] = np.nan
df["min_dist_to_seed_in_subset"] = np.nan
df["knn_pages_in_subset"] = None

for i, pid in enumerate(P):
    df.loc[df["page_id"] == pid, "avg_dist_in_subset"] = avg_dist[i]
    df.loc[df["page_id"] == pid, "min_dist_to_seed_in_subset"] = min_dist_to_seed[i]
    df.loc[df["page_id"] == pid, "knn_pages_in_subset"] = str(knn_pages[i])

df.loc[df["page_id"].isin(P), ["page_id", "avg_dist_in_subset", "min_dist_to_seed_in_subset", "knn_pages_in_subset"]].head()


## Дополнение: достижимость за ≤ K шагов (булева матричная степень)

Иногда важно не точное расстояние, а факт: можно ли добраться за не более чем K переходов.
Это отражает реальное UX-ограничение: пользователь редко делает много кликов подряд.


In [None]:
def reachability_upto_k(A_bool: np.ndarray, K: int) -> np.ndarray:
    """Готово: R[i,j]=True, если существует путь длины <= K."""
    assert A_bool.dtype == bool
    R = A_bool.copy()
    P = A_bool.copy()
    for _ in range(2, K + 1):
        P = (P @ A_bool)
        R = R | P
    return R

# пример на поднаборе
A_sub = (D0 == 1)
R_le_3 = reachability_upto_k(A_sub, K=3)
assert R_le_3.shape == (m, m)


## Шаг 6. TrustScore как ML-задача

У страницы нет одного признака, который надёжно определяет доверие.
Например, высокая входящая степень бывает и у справки, и у мусорной страницы.
Поэтому мы комбинируем несколько сигналов и обучаем простую модель.

В генераторе есть синтетическая разметка:
- `is_seed=1` считаем “доверенной” страницей
- `is_spam=1` считаем “недоверенной”

Остальные страницы в обучение не берём — они остаются для предсказания.


In [None]:

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

df = df.copy()
df["trust_label"] = np.where(df["is_seed"] == 1, 1, np.where(df["is_spam"] == 1, 0, np.nan))

feature_cols = [
    "in_degree",
    "out_degree",
    "dist_to_seed",
    "reachable_3",
    "reachable_5",
    "in_cycle",
    "avg_dist_in_subset",
    "min_dist_to_seed_in_subset",
]

work = df.copy()

# чистим inf/NaN
work["dist_to_seed"] = work["dist_to_seed"].replace([np.inf], np.nan)
work["dist_to_seed"] = work["dist_to_seed"].fillna(work["dist_to_seed"].max() + 1)

work["avg_dist_in_subset"] = work["avg_dist_in_subset"].fillna(work["avg_dist_in_subset"].median())
work["min_dist_to_seed_in_subset"] = work["min_dist_to_seed_in_subset"].fillna(work["min_dist_to_seed_in_subset"].median())

# bool -> int
for c in ["reachable_3", "reachable_5", "in_cycle"]:
    work[c] = work[c].astype(int)

labeled = work.dropna(subset=["trust_label"]).copy()
X = labeled[feature_cols].to_numpy(dtype=float)
y = labeled["trust_label"].astype(int).to_numpy()

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.35, random_state=42, stratify=y)

model = LogisticRegression(max_iter=2000)
model.fit(X_train, y_train)

proba_test = model.predict_proba(X_test)[:, 1]
auc = roc_auc_score(y_test, proba_test)
auc


## Результат

- `trust_score` — оценка доверия страницы по структуре ссылок (и производным признакам)





In [None]:

X_all = work[feature_cols].to_numpy(dtype=float)
work["trust_score"] = model.predict_proba(X_all)[:, 1]

# 10 самых подозрительных (низкий trust_score), среди не-seed
work[work["is_seed"] == 0].sort_values("trust_score").head(10)[
    ["page_id", "trust_score", "is_spam", "in_cycle", "dist_to_seed", "in_degree", "out_degree"]
]
