# Домашнее задание: Реализация алгоритмов PC (Peter‑Clark) и Greedy Equivalence Search (GES)

В этом задании необходимо реализовать два классических алгоритма построения причинно‑следственных графов:

* **PC‑алгоритм** (Peter‑Clark) — constraint‑based метод, основанный на статистических тестах.
* **Greedy Equivalence Search (GES)** — score‑based метод, основанный на жадном поиске по эквивалентным классам DAG.

Неплохой обзор на эти и другие подходы к восстановлению причинно-следственных связей в данных можно найти [здесь](https://www.frontiersin.org/journals/genetics/articles/10.3389/fgene.2019.00524/full).

In [49]:
import numpy as np
import pandas as pd
import networkx as nx
from itertools import combinations
from collections import defaultdict

## Часть 1. PC‑алгоритм


Алгоритм PC состоит из трех частей:

1. **Построение скелета графа** — итеративно удаляем ребра если находим (условную) независимость между переменными $i$ и $j$ относительно некоторого множества $S \subset V$ \ $\{i, j\}$ .
2. **Поиск v-структур** — ориентируем некоторые тройки вершин i - j - k (unshielded triple) как i -> j <- k (v-structure).
2. **Ориентация рёбер относительно правил Мика** — применяем набор эвристических правил для получения ориентированного ациклического графа.

Реализуйте [z-тест Фишера](https://en.wikipedia.org/wiki/Partial_correlation) на условную независимость переменных X и Y относительно множества Z -- данная функция понадобится в 1 и 2 частях:

In [50]:
from scipy.stats import norm

In [51]:
def indep_test(X: str, Y: str, Z: set[str], data: pd.DataFrame, alpha: float = 0.05):
    """Возвращает `True`, если X ⟂ Y | Z."""
    
    # Для вычисления выборочной частной корреляции для некоторых данных воспользуемся инверсией корреляционной матрицы
    
    # Подготовим данные:
    columns = [X, Y] + list(Z) 
    df = data[columns].dropna() # Тут мы удаляем все строки с пропусками
    N = df.shape[0]

    # Проверочка на возможность применения z-тест Фишера:
    if N - len(Z) - 3 <= 0:
        return False

    
    if not Z:
        # При отсутствии Z просто считаем обычную корреляцию Пирсона:
        # Corr(X, Y) = Cov(X, Y)/(sigma_X * sigma_Y)
        r = df[X].corr(df[Y])
    else:
        # Находим корреляционную матрицу для X, Y, Z; инвертируем её получая матрицу точности 
        corr_matrix = df.corr().to_numpy()
        precision_matrix = np.linalg.inv(corr_matrix)

        # Индексы X и Y
        x, y = 0, 1

        # Воспользуемся основной формулой частичной корреляции через матрицу точности:
        # r_x,y = - (ro_x,y)/(sqrt(ro_x,x * ro_y,y))
        r = - precision_matrix[x, y] / np.sqrt(precision_matrix[x, x] * precision_matrix[y, y])



    # Воспользуемся z-преобразование Фишера:
    # z(r_x,y) = 1/2 * ln((1 + r_x,y)/(1 - r_x,y))
    z_transform = 0.5 * np.log((1 + r) / (1 - r))

    # Считаем тестовую статистику:
    # sqrt(N - |Z| - 3) * |z_transform|
    test_stat = np.sqrt(N - len(Z) - 3) * abs(z_transform)

    # Теперь высчитаем критическое значение нормального распределения:
    # Fi^(-1)((1 - alpha) / 2)
    critical_value = norm.ppf(1 - alpha / 2)

    # Проверка гипотезы
    # Мы можем отклонить H_0, если тестовая статистика будет больше критического значения норм. распред.
    if test_stat > critical_value:
        return False
    else:
        return True


### 1.1 Функция `estimate_skeleton`

На данном этапе формируется скелет графа путём последовательного удаления рёбер из исходного полносвязного графа. Для каждой пары вершин $X$ и $Y$ выполняется тест на условную независимость $X$ ⊥ $Y$ ∣ $Z$: если найдётся хотя бы одно множество $Z$ при котором эта независимость подтверждается, то ребро $X$ - $Y$ удаляется.

In [52]:

def estimate_skeleton(data: pd.DataFrame, indep_test, alpha: float = 0.05):
    """Оценивает неориентированный скелет графа.

    Parameters
    ----------
    data : pd.DataFrame
        Входные данные, столбцы — переменные.
    indep_test : callable
        Функция для проверки условной независимости (см. ниже).
    alpha : float
        Порог значимости для теста.

    Returns
    -------
    skeleton : nx.Graph
        Неориентированный граф после удаления зависимых рёбер.
    """
    # Первый шаг - построение полного неориентированного графа:
    variables = list(data.columns)
    graph = nx.Graph()
    graph.add_nodes_from(variables)
    graph.add_edges_from(combinations(variables, 2))

    # Основная идея:
    # Необходимо сформировать полный неориентированных граф 
    # После, мы будем устранять ребра между переменными, которые оказываются условно независимыми при некотором множестве других переменных (Z).
    
    # sep_sets - тут у нас все множества Z, при которых выполнилось условие о независимости переменных
    # size_of_Z_set - размер ;)
    sep_sets = {} 
    size_of_Z_set = 0      
    
    while True:
        # Флаг, просто флаг
        any_candidate = False
        edges = list(graph.edges())

        for X, Y in edges:
            neighbours = (set(graph.neighbors(X)) | set(graph.neighbors(Y))) - {X, Y}

            # Условие о размерности множества Z и кол-ве самих соседей 
            if len(neighbours) < size_of_Z_set:
                continue

            any_candidate = True

            for Z in combinations(neighbours, size_of_Z_set):
                if indep_test(X, Y, set(Z), data, alpha):
                    graph.remove_edge(X, Y)
                    # Запомни Z мн-во, которое разделило ребра X и Y
                    sep_sets[frozenset((X, Y))] = set(Z)
                    break

        # Условие выхода из цикла:
        if not any_candidate:
            break
        size_of_Z_set += 1
    
    # Прикрепим sep_sets 
    graph.graph["sep_sets"] = sep_sets
    
    return graph, sep_sets


### 1.2 Функция `find_colliders`

Этот шаг фиксирует направления причинно-следственных связей, которые однозначно следуют из статистических зависимостей. Для каждой тройки $X$ - $Z$ - $Y$ мы проверяем, что $X$ **условно зависит от** $Y$ **по любым** $S$, где $S$ ⊆ $V\backslash\{X, Y\}$ ∪ $Z$ - если это так, то ориентируем тройку как $X$ -> $Z$ <- $Y$ 

In [53]:

def find_colliders(skeleton: nx.Graph, indep_test, alpha: float = 0.05):
    """Ориентирует рёбра относительно найденных на графе / в данных v-структур.

    Returns
    -------
    dag : nx.DiGraph
        Частично оринетированный граф.
    """
    
    # pdag - частично ориентированный граф
    # Суть функции:
    # Создаем пустой граф, добавим узлы и ребра, так, чтобы имитировать неориентированный граф
    # Пройдем по всем вершинам рассматривая их как Z - центр v-структуры
    
    # graph.graph["sep_sets"] = sep_sets - сыграло свою роль 
    sep_sets = skeleton.graph.get("sep_sets", {})
    
    pdag = nx.DiGraph()
    pdag.add_nodes_from(skeleton.nodes)
    for u, v in skeleton.edges:
        pdag.add_edge(u, v)
        pdag.add_edge(v, u)

    for Z in skeleton.nodes:
        neighbors = list(skeleton.neighbors(Z))
        if len(neighbors) < 2:
            continue

        # X -> Z <- Y =
        for X, Y in combinations(neighbors, 2):
            # Проверка, образуют ли X, Z, Y переменные v-структуру
            if skeleton.has_edge(X, Y):
                continue
            
            # Проверяем является ли Z - разделяющим множеством
            # Если нет - X и Y зависимы при условии Z => v-структура
            if Z not in sep_sets.get(frozenset((X, Y)), set()):
                if pdag.has_edge(Z, X):
                    pdag.remove_edge(Z, X)
                if pdag.has_edge(Z, Y):
                    pdag.remove_edge(Z, Y)

    return pdag


### 1.3 Функция `orient_edges`

На последнем этапе мы ориентируем ребра относительно допущений алгоритма Петера-Кларка. Полный список ориентационных правил (Meek rules) можно найти в [оригинальной статье](https://arxiv.org/pdf/1302.4972).


In [54]:
def orient_edges(g: nx.Graph, sep_sets):
    """Ориентирует рёбра относительно правил Мика.

    Returns
    -------
    dag : nx.DiGraph
        Частично оринетированный граф.
    """
    
    # Пока находятся рёбра, которые можно ориентировать:
    changed = True
    while changed:
        changed = False

        # R1: X→Y, Y–Z, нет ребра между X и Z  ⇒  Y→Z
        # Выбираем ребра с направлением X→Y, ищем неориентированное ребро Z-Y, и если X и Z - не связаны, то ориентируем Y→Z.
        for X, Y in list(g.edges):
            if g.has_edge(Y, X):
                continue
                
            for Z in list(g.predecessors(Y)):
                if not g.has_edge(Y, Z):
                    continue
                if g.has_edge(X, Z) or g.has_edge(Z, X):
                    continue
                g.remove_edge(Z, Y)
                changed = True

        # R2: X–Y и есть X→Z→Y  ⇒  X→Y
        # Находим неориентированное ребро X-Y, ищем такое Z, что X→Z и Z→Y, после чего ориентируем X→Y
        for X, Y in list(g.edges):
            if not g.has_edge(Y, X):
                continue
            for Z in list(g.successors(X)):
                if Z == Y or g.has_edge(Z, X):
                    continue
                if g.has_edge(Z, Y) and not g.has_edge(Y, Z):
                    g.remove_edge(Y, X)
                    changed = True
                    break
        
        # R3: X→Y, X–W, W–Y, W–Z, Z→Y ⇒ W→Y
        # Находим ориентированное ребро X→Y, ищем такое W, что Y–W и X–W, после чего ищем такие Z, что W–Z и Z→Y, если все нашли, то ориентируем ребро W→Y
        for X, Y in list(g.edges):
            if not g.has_edge(Y, X):
                for W in list(g.neighbors(Y)):
                    if W in (X, Y):
                        continue
                    if g.has_edge(Y, W) and g.has_edge(W, Y):
                        if g.has_edge(X, W) and g.has_edge(W, X):
                            for Z in list(g.neighbors(W)):
                                if Z in (X, Y, W):
                                    continue
                                if g.has_edge(W, Z) and g.has_edge(Z, W):
                                    if g.has_edge(Z, Y) and not g.has_edge(Y, Z):
                                        g.remove_edge(W, Y)
                                        changed = True

        # R4: W–X, W–Y, W–Z, Z→Y, Y→X ⇒ W→X
        # Находим неориентированные ребра W–X, W–Y, W–Z, и если находим Z → Y и Y → X, то ориентируем W→X
        for W, X in list(g.edges):
            if not g.has_edge(X, W):
                continue
            for Y in list(g.neighbors(W)):
                if Y in (W, X):
                    continue
                if not (g.has_edge(W, Y) and g.has_edge(Y, W)):
                    continue
                for Z in list(g.neighbors(W)):
                    if Z in (W, X, Y):
                        continue
                    if not (g.has_edge(W, Z) and g.has_edge(Z, W)):
                        continue
                    if g.has_edge(Z, Y) and not g.has_edge(Y, Z):
                        if g.has_edge(Y, X) and not g.has_edge(X, Y):
                            g.remove_edge(X, W)
                            changed = True
        
    return g
    

### 1.4 Обертка `pc_algorithm`

Соберите отдельные части алгоритма Петера-Кларка внутри одной функции:

In [55]:

def pc_algorithm(data: pd.DataFrame, indep_test, alpha: float = 0.05):
    """Полный PC‑алгоритм."""
    skeleton, sep_sets = estimate_skeleton(data, indep_test, alpha)
    pdag = find_colliders(skeleton, indep_test, alpha)
    pdag = orient_edges(pdag, sep_sets)
    
    # Теперь необходимо приведение меток узлов к int, а то будет ошибка
    index_map = {col: i for i, col in enumerate(data.columns)}
    pdag_int = nx.relabel_nodes(pdag, index_map, copy=True)

    return pdag_int


## Часть 2. Greedy Equivalence Search (GES)


GES выполняется в два этапа:

* **Forward‑phase** — жадно добавляет ребра, улучшая значение функции оценки.
* **Backward‑phase** — жадно удаляет ребра, также оптимизируя оценку.

Центральной компонентой алгоритма является скоринговая функция, максимизируемая на каждом из этапов GES. В данном домашнем задании это вариант [BIC](https://en.wikipedia.org/wiki/Bayesian_information_criterion), рассчитываемый по следующей формуле:

$$
\operatorname{BIC}(G , \mathcal{D})=\sum_{i=1}^d \operatorname{BIC}_i, \\
\mathrm{BIC}_i=\ell_i\left(\hat{\theta}_i ; \mathcal{D}\right)-\frac{k_i}{2} \ln n,
$$
где $\ell_i$ - максимизированное лог-правдоподобие узла $X_i$, $k_i$ = $\mathrm{Pa}_i$, $n$ - объе̄м выборки.

Максимизированное лог-правдоподобие

$$
\ell_i\left(\hat{\theta}_i; \mathcal{D}\right)=-\frac{n}{2}\left(\ln \hat{\sigma}_i^2+1+\ln (2 \pi)\right)
$$

расчитывается посредством построения регрессии родителей вершины $i$ ($\mathrm{Pa}_i$) относительно каждой вершины $i$:

$$
X_i=\beta_{i 0}+\sum_{j \in \mathrm{Pa}_i} \beta_{i j} X_j+\varepsilon_i, \quad \varepsilon_i \sim \mathcal{N}\left(0, \sigma_i^2\right).
$$
Оценив значения $\beta_{i j}$, получим 

$$
\hat{\sigma}_i^2=\mathrm{\sum(y-\hat{y})^2} / n.
$$


In [56]:
def bic_score(dag: nx.DiGraph, data: pd.DataFrame):
    """Вычисляет BIC‑скор поданного на вход DAG."""
    
    # Реализуем bic_score для оценки качества DAG графа
    
    
    # total - BIC(G,D)
    # n - объем выборки
    # ln_n - часть штрафа
    total = 0.0
    n = len(data)
    ln_n = 1/2 * np.log(n)
    
    # Пройдемся по всем вершинам, определим вектор значений (y) каждой из них, а также найдем их родителей (parents)
    for node in dag.nodes:
        y = data[node].to_numpy()
        parents = list(dag.predecessors(node))

        # Если у узла есть родители, то построим регрессию родителей
        # X_i = β_0 + SUM_j∈Pa_i (β_j * X_j) + ε_i 
        # y_pred = X * β
        # Если родителей нет, то просто предсказываем средним значением
        # y_pred = 1/n SUM_n,i=1 (y_i) 
        if parents:
            X = data[parents].to_numpy()
            X = np.column_stack([np.ones(n), X]) 
            beta = np.linalg.lstsq(X, y, rcond=None)[0]
            y_pred= X @ beta
        else:
            y_pred = np.full_like(y, y.mean())
            
        # Среднеквадратичная ошибка или оценка дисперсии остатков sigma2
        # sigma2 = 1/n SUM_n,i=1 (y_i - y_pred)^2
        sigma2 = np.mean((y - y_pred) ** 2)

        # Вычислим лог-правдоподобие
        # ll = - n/2 * (ln(sigma2) + 1 + ln(2 * pi))
        ll = - n/2 * (np.log(sigma2) + 1 + np.log(2 * np.pi))

        # Рассчитаем BIC узла
        # bic_i = ll - k_i/2 * ln(n)
        k_i = len(parents)
        bic_i = ll - k_i/2 * np.log(n)
        total += bic_i

    return total


### 2.1 Функция `ges_forward_phase`

Forward-фаза алгоритма GES заключается в пошаговом добавлении рёбер к пустому графу таким образом, чтобы максимизировать прирост BIC на каждом шаге; процесс продолжается, пока ни одно возможное изменение структуры графа не улучшит BIC.

In [57]:
def ges_forward_phase(data: pd.DataFrame, score_func):
    """Возвращает CPDAG после forward фазы."""
    
    # Жадно добавляет рёбра в пустой граф, каждое из которых наиболее улучшает BIC-оценку
    # Создадим пустой граф, вычислим BIC
    variables = list(data.columns)
    dag = nx.DiGraph()
    dag.add_nodes_from(variables)
    
    current_score = score_func(dag, data)

    while True:
        best_edge = None
        best_delta = 0.0

        # Будем искать covered дугу с лучшей BIC оценкой
        for X in variables:
            for Y in variables:
                if X == Y or dag.has_edge(X, Y):
                    continue
                # Проверка на цикл
                if nx.has_path(dag, Y, X):
                    continue
                
                # Добавляем, считаем BIC_i, смотрим прирост, удаляем
                dag.add_edge(X, Y)
                new_score = score_func(dag, data)
                delta = new_score - current_score
                dag.remove_edge(X, Y)

                if delta > best_delta:
                    best_delta = delta
                    best_edge = (X, Y)

        if best_delta > 0 and best_edge:
            dag.add_edge(*best_edge)
            current_score += best_delta
        else:
            break

    return dag
    

### 2.2 Функция `ges_backward_phase`

Backward-фаза начинается с графа, полученного после forward-фазы -- на данном этапе ребра удаляются до тех пор, пока изменение структуры графа не приводит к ухудшению BIC.

In [58]:
def ges_backward_phase(cpdag: nx.Graph, data: pd.DataFrame, score_func):
    """Возвращает CPDAG после backward фазы."""
    
    # Жадное удаление рёбер, если это улучшает BIC
    # Начинаем с графа после forward и удаляем ребро, если это увеличивает BIC
    dag = cpdag.copy()
    current_score = score_func(dag, data)

    while True:
        best_edge = None
        best_delta = 0.0
        
        # Перебираем все ориентированные ребра, временно убираем ребро, если не образовалось циклов, высчитываем BIC, смотрим прирост, возвращаем удаленное ребро
        for X, Y in list(dag.edges):
            dag.remove_edge(X, Y)
            
            # Проверка на циклы
            if nx.is_directed_acyclic_graph(dag):
                new_score = score_func(dag, data)
                delta = new_score - current_score
                if delta > best_delta:
                    best_delta = delta
                    best_edge = (X, Y)
            dag.add_edge(X, Y)

        if best_delta > 0 and best_edge:
            dag.remove_edge(*best_edge)
            current_score += best_delta
        else:
            break

    return dag


### 2.3 Обертка `ges_algorithm`

Соберите отдельные части алгоритма GES внутри одной функции:

In [59]:

def ges_algorithm(data: pd.DataFrame, score_func):
    """Полная реализация GES."""
    forward_dag = ges_forward_phase(data, score_func)
    final_dag = ges_backward_phase(forward_dag, data, score_func)
    return final_dag


## Проверка корректности

В этом разделе мы протестируем реализованную вами версию PC на синтетическом датасете с известной структурой + сравним с реализацией из пакета **causal‑learn**:

In [60]:
!pip install -q causal-learn 

In [61]:
from numpy.random import randn

import numpy as np
import pandas as pd
import networkx as nx

from causallearn.search.ConstraintBased.PC import pc
from causallearn.search.ScoreBased.GES import ges

In [62]:
def simulate(n_samples):   
    x1 = randn(n_samples)
    x2 = randn(n_samples)
    x3 = randn(n_samples)
    x5 = x1 + x2 + randn(n_samples)
    x4 = x5 + randn(n_samples)
    x6 = 0.8*x5 + x2 + x3 + randn(n_samples)
    x7 = 0.8 * x6 + randn(n_samples)
    return pd.DataFrame({ "x1": x1, "x2": x2, "x3": x3,"x4": x4, "x5":x5, "x6":x6, "x7":x7})

n_samples = 10000
data = simulate(n_samples)

In [63]:
graph_gt = nx.DiGraph([
    ['x1', 'x5'],
    ['x2', 'x5'],
    ['x5', 'x4'],
    ['x5', 'x6'],
    ['x2', 'x6'],
    ['x3', 'x6'],
    ['x6', 'x7']
])

Зафиксируем уровень значимости:

In [64]:
alpha = 0.05

Результаты PC из causal-learn:

In [65]:
alpha = 0.05

pc_graph_cl = pc(data.to_numpy(), alpha=alpha, node_names=data.columns)
pc_graph_cl.to_nx_graph()

g = pc_graph_cl.nx_graph
print(g)
print(g.edges)

  0%|          | 0/7 [00:00<?, ?it/s]

DiGraph with 7 nodes and 7 edges
[(0, np.int64(4)), (1, np.int64(4)), (1, np.int64(5)), (2, np.int64(5)), (4, np.int64(3)), (4, np.int64(5)), (5, np.int64(6))]


Результаты PC, реализованного вами:

In [66]:
# Запуск PC-алгоритма, реализованного вручную
pc_graph = pc_algorithm(data, indep_test, alpha)
print(pc_graph)
print(pc_graph.edges)

# Сравнение с causal-learn
assert set(pc_graph.edges) == set(g.edges)

DiGraph with 7 nodes and 7 edges
[(0, 4), (1, 4), (1, 5), (2, 5), (4, 3), (4, 5), (5, 6)]


In [67]:
assert set(pc_graph.edges) == set(g.edges)