# Разрез современного Генеративного машинного обучения

**темы:** основные понятия и механизмы генеративного ИИ, обзор современных методов обучения генеративного ИИ, этика применения, споры и юридические аспекты 

**дата:** 02.02.2026

**Автор:** Федоров Артем Максимович

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

## Вероятностное моделирование "in general"

Самой популярной парадигмой современного машинного обучения является вероятностное моделирование, подразумевающее, что наблюдаемые данные порождаются некоторым законом распределения, существующим «в природе» – сам закон нам не известен, но мы видим, какие данные могут им порождаться по наблюдаемой обучающей выборке.

Таким образом мы формализуем неопределённость и шум реального мира: вместо “жёстких” правил мы строим модель, какие наблюдения типичны и насколько. Где выделяются "частоты"  явления, там совершенно натуральным образом появляется и определение вероятности – чем чаще объекты (или их окрестности) встречаются в данных, тем выше должна быть их “масса” в распределении. Такая постановка делает методы:

- более робастными (устойчивыми к шуму в данных)
- интерпретируемыми (теперь модель выделяет вероятность "правильного" ответа – своей уверенности)
- более гибкими (появляется конструктивный подход построения/модернизции моделей, учитывающих разные аспекты данных/доменов)


> **Пример распределения изображений** из двух классов (котики и собачки). Здесь показано, что в некоторой области расположены два концентрированных облака точек, каждое из которых определено под соответствующий класс изображений; обратите внимание, что точки существуют не только в центрах облаков, но и в удалении или между ними – это так же валидные объекты, просто менее похожие на "идеальных" представителей (усредненных) классов. 
<p align="center">
  <img src="images/lecture_1/cats_n_dogs_distribution.png" style="width:60%;">
</p>




#### Понятие распределения

Пусть мы так или иначе изучаем объекты $x$, принадлежащие (погруженные) в множество `носитель` $x \in \mathcal{X}$ $\Rightarrow$ любой наблюдаемый нами объект существует внутри данного множетсва $\mathcal{X}$. Потребность в явном выделении такого понятия как `носитель` может быть неочевидно, однако оно полезно, когда $\mathcal{X}$ сам по себе сложный объект. Это может быть:

| Носитель $\mathcal{X}$ | Определение | Примеры использования в ML |
| --- | --- | --- |
| $\mathcal{X} \equiv \mathbb{R}^d$ | Евклидово пространство признаков | эмбеддинги текста/изображений, векторизованные тензоры (матрицы) |
| $\mathcal{X} \equiv \mathbb{H}^d$ | Гиперболическое пространство признаков | эмбеддинги объектов, сохраняющие доменную иерархию |
| $\mathcal{X} \equiv \mathbb{S}^d_{++}$ | Множество симметричных положительно определённых матриц | ковариационные матрицы в сигналов/EEG/fMRI, матрицы рассеяния в CV, Riemannian ML в оптимизации |
| $\mathcal{X} \equiv \mathcal{G}$ | Множество графов | молекулярная генерация (генеративные GNN), генерация/дополнение knowledge graphs, моделирование социальных/транспортных сетей |
| $\mathcal{X} \equiv \mathcal{M}^n$ | Множество ранжировок/перестановок | learning-to-rank (поисковая выдача), рекомендательные системы (перестановки товаров), генерация упорядоченных списков при наличии комбинаторных ограничений |
| $\mathcal{X} \subset \Delta^{K-1}=\left\{p\in\mathbb{R}^K_{\ge 0}:\sum_{k=1}^K p_k=1\right\}$ | Вероятностный симплекс (векторы вероятностей) | тематическое моделирование (LDA/Dirichlet), смеси распределений, распределения над действиями в RL (policy) |

Для фиксированного `носителя` $\mathcal{X}$ мы можем ввести понятие `распределение` – закон, сопоставляющий каждой области $A \subseteq \mathcal{X}$ число $\mathbb{P}(A) \in [0, 1]$: вероятность того, что случайно выбранный объект $x$ окажется в этой области $$\mathbb{P}(A) = \mathbb{P}(\textrm{Randomly Sampled:} \,\, x \in A)$$

Мы конструктивно определяем, в каких областях объекты существует и их появление/наблюдение там не редко (типично), а в каких областях $\mathcal{X}$ концентрация минимальна (или вообще 0). У такого правила есть три базовых свойства:

1) Неотрицательность: $\mathbb{P}(A) \leq 0$
2) Нормировка: $\mathbb{P}(\mathcal{X})=1$ (вся масса распределения живёт на `носителе`)
3) Аддитивность для непересекающихся областей: если $A \cap B = \varnothing$, то $\mathbb{P}(A\cup B)=\mathbb{P}(A)+\mathbb{P}(B)$

Во многих задачах удобно описывать `распределение` не через вероятности областей напрямую, а через `плотность` $p(x)$ – мы требуем от `распределения` абсолютной непрерывности. Интуитивно $p(x)$ всюду на `носителе` $\mathcal{X}$ определена и ее можно понимать как 'интенсивность массы' распределения в окрестности точки $x$ – с какой скоростью будет расти вероятность при увеличении маленькой области с центром в данной точке.
Тогда вероятность того, что случайный объект $X$ попадёт в область $A \subseteq \mathcal{X}$, выражается интегрированием плотности по этой области:
$$\mathbb{P}(X \in A) \;=\; \int_{A} p(x)\,dx \quad\mid\quad \int_{\mathcal{X}} p(x)\,dx \;=\; 1 $$

> Пример бимодального (две вершины) абсолютно непрерывного распределения


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401

def mvn_pdf_grid(X, Y, mean, cov):
    """2D Gaussian density evaluated on a meshgrid (X, Y)."""
    mean = np.asarray(mean, dtype=float).reshape(2,)
    cov  = np.asarray(cov, dtype=float).reshape(2, 2)

    pos  = np.stack([X, Y], axis=-1)               # (..., 2)
    diff = pos - mean                              # (..., 2)

    inv_cov = np.linalg.inv(cov)
    det_cov = np.linalg.det(cov)

    quad = np.einsum("...i,ij,...j->...", diff, inv_cov, diff)
    norm = 1.0 / (2.0 * np.pi * np.sqrt(det_cov))
    return norm * np.exp(-0.5 * quad)

def mvn_pdf_points(P, mean, cov):
    """2D Gaussian density evaluated at points P of shape (N,2)."""
    mean = np.asarray(mean, dtype=float).reshape(2,)
    cov  = np.asarray(cov, dtype=float).reshape(2, 2)

    P = np.asarray(P, dtype=float)
    diff = P - mean[None, :]

    inv_cov = np.linalg.inv(cov)
    det_cov = np.linalg.det(cov)

    quad = np.einsum("ni,ij,nj->n", diff, inv_cov, diff)
    norm = 1.0 / (2.0 * np.pi * np.sqrt(det_cov))
    return norm * np.exp(-0.5 * quad)

# --- Mixture parameters ---
w1, w2 = 0.5, 0.5
mu1 = np.array([-1.5, -1.0])
mu2 = np.array([ 1.5,  1.2])

# updated covariances (your values)
cov1 = np.array([[0.6,  0.2],
                 [0.2,  0.8]])

cov2 = np.array([[1.12, -0.25],
                 [-0.25, 1.15]])

# --- Grid for surface ---
xmin, xmax = -5, 5
ymin, ymax = -5, 5
n = 260

x = np.linspace(xmin, xmax, n)
y = np.linspace(ymin, ymax, n)
X, Y = np.meshgrid(x, y)

Z = w1 * mvn_pdf_grid(X, Y, mu1, cov1) + w2 * mvn_pdf_grid(X, Y, mu2, cov2)

# --- Sampling 200 points from the mixture ---
rng = np.random.default_rng(42)
N = 200
comp = rng.choice([0, 1], size=N, p=[w1, w2])

samples = np.empty((N, 2))
n1 = np.sum(comp == 0)
n2 = N - n1

# --- Plot ---
fig = plt.figure(figsize=(11, 8), dpi=150)
ax = fig.add_subplot(111, projection="3d")

# surface (transparent)
ax.plot_surface(
    X, Y, Z,
    rstride=1, cstride=1,
    linewidth=0,
    antialiased=True,
    alpha=0.55,
    shade=True
)

# wireframe overlay (grid visible on modes too)
ax.plot_wireframe(
    X, Y, Z,
    rstride=10, cstride=10,
    linewidth=0.6,
    alpha=0.75
)

z0 = 0.0
ax.contour(X, Y, Z, zdir="z", offset=z0, levels=12, alpha=0.55)

# labels / limits / view
ax.set_title("Bimodal Gaussian Mixture Density (2 Gaussians)")
ax.set_xlabel("x")
ax.set_ylabel("y")
ax.set_zlabel("p(x, y)", labelpad=14)   # <- help z-label readability

ax.set_xlim(xmin, xmax)
ax.set_ylim(ymin, ymax)
ax.set_zlim(z0, float(Z.max()) * 1.08)

ax.view_init(elev=28, azim=-55)

# cleaner panes
ax.xaxis.pane.set_alpha(0.0)
ax.yaxis.pane.set_alpha(0.0)
ax.zaxis.pane.set_alpha(0.0)
fig.subplots_adjust(left=0.02, right=0.86, bottom=0.02, top=0.92)

plt.show()


#### Случайные величины



**Подход теории вероятностей:** в основе лежит идея, что существует пространство элементарных исходов $\Omega$ и закон вероятности на нём. Случайная величина $X\in\mathcal{X}$ — это отображение, которое каждому исходу $\omega\in\Omega$ сопоставляет наблюдаемый объект $x=X(\omega)\in\mathcal{X}$; тем самым $X$ «переносит» вероятностный закон с $\Omega$ на пространство данных $\mathcal{X}$, порождая распределение $P_X$ (или плотность $p_X$ в непрерывном случае): $\mathbb{P}(X\in A)=P_X(A)$ для областей $A\subseteq\mathcal{X}$, а при наличии плотности $P_X(A)=\int_A p_X(x),dx$ и $\int_{\mathcal{X}} p_X(x),dx=1$.

**Эмпирический подход:** если опустить формальное определение, то случайная величина — это переменная, значение которой не фиксировано заранее: при разных повторениях наблюдения мы получаем разные реализации $x$. Набор наблюдений ${x_i}_{i=1}^n$ можно рассматривать как сэмплы из некоторого распределения $P_X$ (часто пишут $x_i\sim P_X$), которое и описывает, какие значения встречаются чаще, а какие реже.


#### Условное распределение



Пусть мы наблюдаем пару $(x,y)\in\mathcal{X}\times\mathcal{Y}$ как один сэмпл из некоторого *совместного распределения* $p(x,y)$ на носителе $\mathcal{X}\times\mathcal{Y}$.

Тогда *условное распределение* $p(y\mid x)$ отвечает на вопрос:
$$
p(y\mid x)\ \text{--- как распределён } y,\ \text{если наблюдение } x \text{ зафиксировано.}
$$

Иначе говоря, при фиксированном $x$ функция $p(\cdot\mid x)$ задаёт распределение возможных значений $y$: она показывает, какие $y$ более типичны (имеют большую вероятность/плотность), а какие менее типичны *с учётом условия* $X=x$. Для каждого фиксированного $x$ это корректное распределение по $y$, то есть оно нормировано:
$$
\int_{\mathcal{Y}} p(y\mid x),dy = 1
\quad
\text{(или } \sum_{y\in\mathcal{Y}} p(y\mid x)=1 \text{ в дискретном случае).}
$$

Условное `распределение` и совместное связывает следующий закон:
$$p(x, y) = p(y \mid x) p(x) = p(x \mid y) p(y) \Rightarrow \text{тогда для $p(x, y)$ распределения $p(x)$ и $p(y)$ являются маргинальными распределениями}$$

Маргинализация `распределения` (или же закон выинтигрирования) позволяет получить из совместного `распределения` `маргинальное`  явным образом:
$$p(x) = \int_\mathcal{Y} p(x, y) dy \quad \quad p(y) = \int_\mathcal{X} p(x, y) dx$$

> Схематичный пример того, как выглядит бимодальное распределение объектов $x$, где каждая мода соответствует своему $y$ в совместном распределении $(x, y)$

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def kde_1d(x, grid, bandwidth=None):
    """
    Simple Gaussian KDE (no SciPy).
    x: (n,) samples
    grid: (m,) evaluation points
    bandwidth: if None -> Silverman's rule of thumb
    """
    x = np.asarray(x, dtype=float).ravel()
    n = x.size
    if n < 2:
        raise ValueError("Need at least 2 samples for KDE.")

    if bandwidth is None:
        # Silverman's rule: h = 1.06 * std * n^{-1/5}
        std = np.std(x, ddof=1)
        bandwidth = 1.06 * std * (n ** (-1/5))
        bandwidth = max(bandwidth, 1e-6)

    u = (grid[:, None] - x[None, :]) / bandwidth
    kernel = np.exp(-0.5 * u**2) / np.sqrt(2 * np.pi)
    return kernel.mean(axis=1) / bandwidth

# --- Synthetic data (replace with your own samples if needed) ---
rng = np.random.default_rng(42)
n1, n2 = 1200, 1200

# class 1: slightly narrower
x1 = np.r_[rng.normal(63.2, 2.1, int(0.9 * n1)),
           rng.normal(60.2, 1.6, int(0.1 * n1))]

# class 2: has a longer right tail
x2 = np.r_[rng.normal(68.2, 2.0, int(0.8 * n2)),
           rng.normal(73.0, 2.3, int(0.2 * n2))]

x_all = np.r_[x1, x2]

# --- KDE on a common grid ---
xmin = min(x_all.min(), x1.min(), x2.min()) - 3
xmax = max(x_all.max(), x1.max(), x2.max()) + 3
grid = np.linspace(xmin, xmax, 600)

d1 = kde_1d(x1, grid)
d2 = kde_1d(x2, grid)
d_all = kde_1d(x_all, grid)

# --- Plot (matplotlib only) ---
fig, ax = plt.subplots(figsize=(10, 6), dpi=150)

# Colors similar to the reference figure
c1 = "#ff4d6d"   # pink/red
c2 = "#4dabf7"   # blue

ax.fill_between(grid, 0, d1, alpha=0.35, color=c1)
ax.plot(grid, d1, color=c1, linewidth=1.2, label=r"$p(x \mid y=\mathrm{class}\ 1)$")

ax.fill_between(grid, 0, d2, alpha=0.35, color=c2)
ax.plot(grid, d2, color=c2, linewidth=1.2, label=r"$p(x \mid y=\mathrm{class}\ 2)$")

ax.plot(grid, d_all, color="black", linestyle=(0, (4, 4)), linewidth=1.2, label=r"$p(x)$")

ax.set_xlabel("x")
ax.set_ylabel("Density")
ax.set_xlim(xmin, xmax)
ax.set_ylim(bottom=0)

ax.legend(loc="upper right", frameon=True, fancybox=False, framealpha=1.0, edgecolor="black")
ax.grid(False)

plt.tight_layout()
plt.show()


#### Выборка



Выборка — это набор наблюдений, полученных из одного и того же неизвестного распределения данных в природе. Обычно предполагают, что элементы выборки независимы и одинаково распределены (i.i.d.), и записывают:
$$
x_1,\dots,x_n \sim p^*(x),
$$
где $p^*(x)$ — истинное (неизвестное) распределение. В задаче с разметкой наблюдают пары:
$$
(x_1,y_1),\dots,(x_n,y_n) \sim p^*(x,y).
$$
Задача обучения модели состоит в том, чтобы по выборке построить приближение $p_\theta \approx p^*$ (или соответствующие условные распределения).


#### Моменты случайных величин: матожидание и дисперсия



Помимо самого распределения (массы/плотности), на практике часто удобно описывать случайную величину $X$ через её **моменты** — числовые характеристики, которые компактно суммируют “типичное значение” и “разброс”.

**Математическое ожидание (mean)** — это среднее значение $X$ в смысле вероятностного закона:
$$
\mathbb{E}[X] = \int_{\mathcal{X}} x\,p(x)\,dx
\quad \text{(или } \mathbb{E}[X]=\sum_{x\in\mathcal{X}} x\,p(x)\text{ в дискретном случае).}
$$
Интуитивно $\mathbb{E}[X]$ — это “центр массы” распределения.

**Дисперсия (variance)** измеряет разброс значений вокруг среднего:
$$
\mathrm{Var}(X)=\mathbb{E}\!\left[(X-\mathbb{E}[X])^2\right]
= \int_{\mathcal{X}} (x-\mu)^2\,p(x)\,dx,\quad \mu=\mathbb{E}[X].
$$
Часто используют также **стандартное отклонение** $\sigma=\sqrt{\mathrm{Var}(X)}$, так как оно имеет те же единицы измерения, что и $X$.

Для **векторных** данных $X\in\mathbb{R}^d$ аналогами являются средний вектор и ковариационная матрица:
$$
\mu=\mathbb{E}[X]\in\mathbb{R}^d,\qquad
\Sigma=\mathrm{Cov}(X)=\mathbb{E}\!\left[(X-\mu)(X-\mu)^\top\right]\in\mathbb{R}^{d\times d}.
$$
Диагональные элементы $\Sigma$ — дисперсии отдельных координат, а внедиагональные — их взаимосвязь (корреляция/зависимость).

Наконец, если у нас есть выборка $x_1,\dots,x_n$, то моменты обычно оценивают эмпирически:
$$
\hat\mu=\frac{1}{n}\sum_{i=1}^n x_i,\qquad
\widehat{\mathrm{Var}}(X)=\frac{1}{n-1}\sum_{i=1}^n (x_i-\hat\mu)^2
$$


In [None]:
import numpy as np
import matplotlib.pyplot as plt

def normal_pdf(x, mu, sigma):
    return (1.0 / (np.sqrt(2*np.pi) * sigma)) * np.exp(-0.5 * ((x - mu) / sigma)**2)

# параметры двух распределений: одинаковое матожидание, разная дисперсия
mu = 0.0
sigma1 = 1.0
sigma2 = 2.0

x = np.linspace(-8, 8, 800)
p1 = normal_pdf(x, mu, sigma1)
p2 = normal_pdf(x, mu, sigma2)

fig, ax = plt.subplots(figsize=(10, 5), dpi=150)

ax.plot(x, p1, label=rf"$p_1(x)$: $\mathbb{{E}}[X]={mu}$, $\mathrm{{Var}}(X)={sigma1**2}$")
ax.plot(x, p2, label=rf"$p_2(x)$: $\mathbb{{E}}[X]={mu}$, $\mathrm{{Var}}(X)={sigma2**2}$")

# матожидание
ax.axvline(mu, linewidth=1.6, label=rf"$\mu=\mathbb{{E}}[X]$")

# интервалы +-1 sigma (для каждого распределения своим штрихом)
ax.axvline(mu - sigma1, linestyle="--", linewidth=1.2, label=rf"$\mu\pm\sigma_1$ (narrow)")
ax.axvline(mu + sigma1, linestyle="--", linewidth=1.2)

ax.axvline(mu - sigma2, linestyle=":", linewidth=1.4, label=rf"$\mu\pm\sigma_2$ (wide)")
ax.axvline(mu + sigma2, linestyle=":", linewidth=1.4)

ax.set_xlabel("x")
ax.set_ylabel("Density")
ax.set_title(r"Mean $\mathbb{E}[X]$ and spread via standard deviation $\sigma=\sqrt{\mathrm{Var}(X)}$")
ax.set_ylim(bottom=0)
ax.legend(loc="upper right", frameon=True)

plt.tight_layout()
plt.show()


## Дискриминативный vs. Генеративный ИИ

В вероятностном ML есть два **принципиально разных** режима работы с данными: `дискриминативный` и `генеративный`. Важно сразу зафиксировать: это **не** «классификация vs генерация изображений/текстов/графов (нужное подчеркнуть)». Классификация — лишь частный случай дискриминативного инференса, а генеративные модели часто решают *и* классификацию, и восстановление пропусков, и построение симуляторов — то есть работают как более общий инструмент моделирования данных.

Далее (как и выше) будем считать:
- $x\in\mathcal{X}$ — наблюдаемые признаки объекта (регрессоры),
- $y\in\mathcal{Y}$ — скрытая/целевая характеристика (класс, число, метка, атрибут, условие)


<p align="center">
  <img src="images/lecture_1/discriminative_generative.png" style="width:60%;">
</p>

#### Конструктивное сравнение двух подходов



**`Дискриминативная задача`** формулируется как моделирование условного распределения
$
p_\theta(y\mid x) \Rightarrow
$
мы выучиваем «как распределён $y$, если $x$ зафиксирован». И на инференсе $x$ обязателен: без наблюдения признаков условное распределение просто не информативно для решения данной задачи.

Здесь важно различать два уровня:
1) **вероятностный выход** модели: она возвращает распределение/его параметры (например, вектор вероятностей классов, параметры гауссианы и т.п.);
2) **предсказание** как детерминированное решение на основе распределения, например
$$
\hat y(x)=\arg\max_{y\in\mathcal{Y}} p_\theta(y\mid x)\quad\text{(MAP-оценка)}, \quad \text{или} \quad \hat y(x)=\mathbb{E}_\theta[y\mid x]\quad\text{(регрессия/байесовская точечная оценка).}
$$


> То есть дискриминативная модель по сути отвечает на вопрос: *«что можно утверждать про объект, если мы его уже наблюдаем?»*




**`Генеративная задача`** ориентирована на моделирование распределения самих наблюдаемых данных. В зависимости от контекста моделируют:
- безусловное $p_\theta(x)$,
- совместное $p_\theta(x,y)$,
- или условное $p_\theta(x\mid y)$ (контролируемая генерация).

Ключевым критерием выступает умение модели **семплировать**/создавать новые наблюдения,
$$
x\sim p_\theta(x) \qquad\text{или}\qquad x \mid y \sim p_\theta(x\mid y),
$$
такие, что они статистически согласованы с истинным распределением данных. Модель должна уметь **воспроизводить структуру множества объектов** и порождать новые экземпляры этой структуры. 


#### Связь подходов. Теорема Байеса


Ключевая связь между двумя парадигмами возникает, когда мы моделируем **совместный закон `распределения`** $p_\theta(x,y)$. Тогда дискриминативное распределение получается как *производное* по формуле Байеса:
$$
p_\theta(y\mid x) \;=\; \frac{p_\theta(x, y)}{p_\theta(x)} = \frac{p_\theta(x\mid y)\,p_\theta(y)}{p_\theta(x)},
\qquad
p_\theta(x)=\underbrace{\sum_{y\in\mathcal{Y}} p_\theta(x\mid y)\,p_\theta(y)}_{\text{(дискретный \(y\))}} = \underbrace{\int p_\theta(x\mid y)\,p_\theta(y)\,dy}_\text{(непрерывный \(y\))}
$$


#### Сравнение обучения моделей двух подходов

В дискриминативном обучении (при разметке) естественная цель — максимизация *условного* лог-правдоподобия:
$$
\hat\theta=\arg\max_\theta \sum_{i=1}^n \log p_\theta(y_i\mid x_i),
$$
что в классификации эквивалентно минимизации кросс-энтропии (или логистический лосс в случае бинарной классификации), а в регрессии — взятию функции правдоподобия под выбранную модель шума (гауссовский $\Rightarrow$ MSE, лапласовский $\Rightarrow$ MAE и т.д.).

В генеративном обучении мы исходим из гипотезы, что данные порождены некоторым неизвестным распределением $p^*(x)$, и хотим построить параметрическое приближение $p_\theta(x)$, которое (в пределе бесконечных данных) согласуется с $p^*$ настолько, насколько это возможно в выбранном семействе моделей.

Пусть $x_1,\dots,x_n \stackrel{i.i.d.}{\sim} p^*(x)$. Конструктивная цель — максимизировать правдоподобие наблюдаемой выборки:
$$
\hat\theta_n=\arg\max_{\theta\in\Theta}\sum_{i=1}^n \log p_\theta(x_i)
\quad\Longleftrightarrow\quad
\arg\min_{\theta\in\Theta}\; \hat L_n(\theta) = -\frac1n\sum_{i=1}^n \log p_\theta(x_i).
$$

При $n\to\infty$ по закону больших чисел $\hat L_n(\theta)\to L(\theta)$, где
$$
L(\theta)=\mathbb{E}_{x\sim p^*}\big[-\log p_\theta(x)\big].
$$

Далее получаем стандартное разложение:
$$
\begin{align*}
L(\theta)
&=\mathbb{E}_{p^*}\big[-\log p_\theta(x)\big] \\
&=\mathbb{E}_{p^*}\big[-\log p^*(x)\big] + \mathbb{E}_{p^*}\Big[\log \frac{p^*(x)}{p_\theta(x)}\Big] \\
&=\int_{\mathcal{X}} p^*(x)\,\big[-\log p^*(x)\big]\,dx \;+\; \int_{\mathcal{X}} p^*(x)\,\log\!\frac{p^*(x)}{p_\theta(x)}\,dx \\
&=H(p^*)+\mathrm{KL}\!\left(p^*\|p_\theta\right).
\end{align*}
$$

Поскольку $H(p^*)$ не зависит от $\theta$, минимизация $L(\theta)$ эквивалентна минимизации $\mathrm{KL}(p^*\|p_\theta)$. Значит, максимизируя $\sum_{i=1}^n \log p_\theta(x_i)$, мы подгоняем выучиваемое распределение $p_\theta$ под истинное $p^*$ в KL-смысле — ровно то, что и является целью генеративного обучения.


### Визуализация обучения Дискриминативного и Генеративного ИИ

In [None]:
import numpy as np

# -----------------------------
# 1) Discriminative: Logistic regression via gradient descent (parameter path)
# -----------------------------
import numpy as np

def logistic_regression_gd_path(
    X1, X2,
    lr=0.1, n_iters=200, l2=0.0, fit_intercept=True, seed=0,
    lr_mode="constant",            # <-- NEW: "constant" or "warmup"
    warmup_power=2.0,              # <-- NEW: controls how slow the start is
    lr_max=None                    # <-- NEW: if None, estimated automatically
):
    X1 = np.asarray(X1, dtype=float)
    X2 = np.asarray(X2, dtype=float)
    if X1.ndim != 2 or X2.ndim != 2:
        raise ValueError("X1 and X2 must be 2D arrays of shape (n, d).")
    if X1.shape[1] != X2.shape[1]:
        raise ValueError("X1 and X2 must have the same feature dimension d.")

    X = np.vstack([X1, X2])
    y = np.concatenate([np.zeros(len(X1)), np.ones(len(X2))])

    n, d = X.shape
    rng = np.random.default_rng(seed)

    w = 0.01 * rng.standard_normal(d)
    b = 0.0

    def sigmoid(z):
        z = np.clip(z, -50.0, 50.0)
        return 1.0 / (1.0 + np.exp(-z))

    def nll(w, b):
        z = X @ w + (b if fit_intercept else 0.0)
        p = sigmoid(z)
        eps = 1e-12
        loss = -np.mean(y * np.log(p + eps) + (1 - y) * np.log(1 - p + eps))
        if l2 > 0:
            loss += 0.5 * l2 * np.sum(w * w)
        return loss

    # ---- NEW: choose lr_max automatically if warmup requested ----
    if lr_mode == "warmup" and lr_max is None:
        # Lipschitz upper bound using Frobenius norm
        fro2 = np.sum(X * X)  # ||X||_F^2
        L = 0.25 * (fro2 / n) + l2
        lr_max = 0.9 / (L + 1e-12)

    w_path = np.zeros((n_iters + 1, d))
    b_path = np.zeros(n_iters + 1)
    loss_path = np.zeros(n_iters + 1)

    w_path[0] = w
    b_path[0] = b
    loss_path[0] = nll(w, b)

    for t in range(1, n_iters + 1):
        z = X @ w + (b if fit_intercept else 0.0)
        p = sigmoid(z)

        grad_w = (X.T @ (p - y)) / n
        if l2 > 0:
            grad_w += l2 * w

        grad_b = np.mean(p - y) if fit_intercept else 0.0

        # ---- NEW: step size schedule ----
        if lr_mode == "constant":
            step = lr
        elif lr_mode == "warmup":
            # step grows from ~0 to lr_max, making learning visibly gradual
            frac = (t / n_iters) ** warmup_power
            step = float(lr_max) * frac
        else:
            raise ValueError("lr_mode must be 'constant' or 'warmup'.")

        w -= step * grad_w
        if fit_intercept:
            b -= step * grad_b

        w_path[t] = w
        b_path[t] = b
        loss_path[t] = nll(w, b)

    return w_path, b_path, loss_path


# -----------------------------
# 2) Generative: 2-Gaussian Mixture via EM + sampling
# -----------------------------
def sample_from_gmm(pi, mu, Sigma, n_samples=200, seed=0):
    """
    Sample points from a 2-component Gaussian mixture.
    Returns samples (n_samples,d) and component labels z (n_samples,).
    """
    pi = np.asarray(pi, float)
    mu = np.asarray(mu, float)
    Sigma = np.asarray(Sigma, float)

    rng = np.random.default_rng(seed)
    z = rng.choice(2, size=n_samples, p=pi)

    d = mu.shape[1]
    samples = np.zeros((n_samples, d), float)
    for k in range(2):
        idx = np.where(z == k)[0]
        if len(idx) == 0:
            continue
        samples[idx] = rng.multivariate_normal(mean=mu[k], cov=Sigma[k], size=len(idx))
    return samples, z


def gmm2_em_and_sample(X1, X2, n_iters=100, tol=1e-6, reg_covar=1e-6, n_samples=200, seed=0):
    """
    Fit a 2-component Gaussian Mixture Model using EM on the union of X1 and X2.
    Uses X1 and X2 only for *initialization* (means/covariances/mixture weights).

    Parameters
    ----------
    X1, X2 : array-like
        Two sets of points, shapes (n1, d), (n2, d).
    n_iters : int
        Max EM iterations.
    tol : float
        Convergence tolerance on log-likelihood improvement.
    reg_covar : float
        Diagonal regularizer added to covariances for numerical stability.
    n_samples : int
        Number of points to sample from the learned mixture.
    seed : int
        Random seed.

    Returns
    -------
    params : dict
        {
          "pi": (2,),
          "mu": (2, d),
          "Sigma": (2, d, d),
          "loglik_path": (T,)   # T <= n_iters
        }
    samples : np.ndarray
        Shape (n_samples, d), sampled from the learned mixture.
    responsibilities : np.ndarray
        Shape (n, 2), final responsibilities r_{ik}.
    """
    X1 = np.asarray(X1, dtype=float)
    X2 = np.asarray(X2, dtype=float)
    if X1.ndim != 2 or X2.ndim != 2:
        raise ValueError("X1 and X2 must be 2D arrays of shape (n, d).")
    if X1.shape[1] != X2.shape[1]:
        raise ValueError("X1 and X2 must have the same feature dimension d.")

    X = np.vstack([X1, X2])
    n, d = X.shape
    rng = np.random.default_rng(seed)

    # ----- Initialization (using the two sets) -----
    mu = np.stack([X1.mean(axis=0), X2.mean(axis=0)], axis=0)  # (2, d)
    def empirical_cov(A):
        A = np.asarray(A, float)
        if len(A) <= 1:
            return np.eye(d)
        C = np.cov(A.T, bias=False)
        if C.ndim == 0:  # d=1 edge case
            C = np.array([[float(C)]])
        return C

    Sigma = np.stack([empirical_cov(X1), empirical_cov(X2)], axis=0)  # (2, d, d)
    Sigma += reg_covar * np.eye(d)[None, :, :]

    pi = np.array([len(X1), len(X2)], dtype=float)
    pi = pi / pi.sum()

    # Helpers
    def log_gaussian_pdf(X, m, S):
        """
        log N(x | m, S) for each row in X. Returns shape (n,)
        """
        # Cholesky for stability
        L = np.linalg.cholesky(S)
        # Solve (x-m) in whitened coordinates: v = L^{-1} (x-m)
        Xm = X - m
        v = np.linalg.solve(L, Xm.T)  # (d, n)
        quad = np.sum(v * v, axis=0)  # (n,)
        log_det = 2.0 * np.sum(np.log(np.diag(L)))
        return -0.5 * (d * np.log(2.0 * np.pi) + log_det + quad)

    def logsumexp(a, axis=1):
        amax = np.max(a, axis=axis, keepdims=True)
        return (amax + np.log(np.sum(np.exp(a - amax), axis=axis, keepdims=True))).squeeze(axis)

    loglik_path = []
    prev_ll = -np.inf

    for it in range(n_iters):
        # ----- E-step -----
        # log r_ik proportional to log pi_k + log N(x_i | mu_k, Sigma_k)
        log_r = np.zeros((n, 2))
        for k in range(2):
            log_r[:, k] = np.log(pi[k] + 1e-16) + log_gaussian_pdf(X, mu[k], Sigma[k])

        ll = np.sum(logsumexp(log_r, axis=1))
        loglik_path.append(ll)

        # Normalize responsibilities
        log_norm = logsumexp(log_r, axis=1)  # (n,)
        r = np.exp(log_r - log_norm[:, None])  # (n,2)

        # Convergence check
        if it > 0 and abs(ll - prev_ll) < tol * (1.0 + abs(prev_ll)):
            break
        prev_ll = ll

        # ----- M-step -----
        Nk = r.sum(axis=0) + 1e-16  # (2,)
        pi = Nk / n

        # Update means
        mu = (r.T @ X) / Nk[:, None]  # (2,d)

        # Update covariances
        Sigma_new = np.zeros((2, d, d))
        for k in range(2):
            Xm = X - mu[k]  # (n,d)
            # Weighted covariance: sum_i r_ik (x_i-mu)(x_i-mu)^T / Nk
            Sigma_new[k] = (Xm.T * r[:, k]) @ Xm / Nk[k]
            Sigma_new[k] += reg_covar * np.eye(d)
        Sigma = Sigma_new

    # ----- Sampling from the learned mixture -----
    # Choose component indices
    z = rng.choice(2, size=n_samples, p=pi)
    samples = np.zeros((n_samples, d))
    for k in range(2):
        idx = np.where(z == k)[0]
        if len(idx) == 0:
            continue
        samples[idx] = rng.multivariate_normal(mean=mu[k], cov=Sigma[k], size=len(idx))

    params = {
        "pi": pi,
        "mu": mu,
        "Sigma": Sigma,
        "loglik_path": np.array(loglik_path, dtype=float),
    }
    return params, samples, r


# -----------------------------
# Optional: convenience (2D line from hyperplane)
# -----------------------------
def line_from_hyperplane_2d(w, b, x_min, x_max, n=200):
    """
    For d=2, returns points on the decision boundary w^T x + b = 0:
    y = -(w0/w1) x - b/w1
    """
    w = np.asarray(w, dtype=float)
    if w.shape[0] != 2:
        raise ValueError("This helper is only for 2D (w must have shape (2,)).")
    xs = np.linspace(x_min, x_max, n)
    if abs(w[1]) < 1e-12:
        # vertical line: w0 x + b = 0 -> x = -b/w0
        x0 = -b / (w[0] + 1e-12)
        ys = np.linspace(-1.0, 1.0, n)
        xs = np.full_like(ys, x0)
    else:
        ys = -(w[0] * xs + b) / w[1]
    return xs, ys


In [None]:
import numpy as np
import matplotlib.pyplot as plt


# -----------------------------
# Helpers: robustly unpack outputs
# -----------------------------
def _as_logreg_bundle(logreg_out):
    """
    Accept either:
      - dict with keys: X1, X2, w_path, b_path (loss_path optional)
      - tuple: (w_path, b_path, loss_path, X1, X2) or (w_path, b_path, loss_path) + pass X1,X2 separately (not supported here)
    """
    if isinstance(logreg_out, dict):
        required = ["X1", "X2", "w_path", "b_path"]
        for k in required:
            if k not in logreg_out:
                raise ValueError(f"logreg_out dict must contain key '{k}'.")
        X1 = np.asarray(logreg_out["X1"], float)
        X2 = np.asarray(logreg_out["X2"], float)
        w_path = np.asarray(logreg_out["w_path"], float)
        b_path = np.asarray(logreg_out["b_path"], float)
        loss_path = np.asarray(logreg_out.get("loss_path", []), float)
        return X1, X2, w_path, b_path, loss_path

    if isinstance(logreg_out, (tuple, list)) and len(logreg_out) >= 5:
        w_path, b_path, loss_path, X1, X2 = logreg_out[:5]
        return np.asarray(X1, float), np.asarray(X2, float), np.asarray(w_path, float), np.asarray(b_path, float), np.asarray(loss_path, float)

    raise ValueError("Unsupported logreg_out format. Use dict or tuple (w_path,b_path,loss_path,X1,X2).")


def _as_gmm_bundle(gmm_out):
    """
    Accept either:
      - dict with keys: X1, X2, params, samples (responsibilities optional)
      - tuple: (params, samples, responsibilities, X1, X2) or (params, samples, responsibilities) + pass X1,X2 separately (not supported here)
    """
    if isinstance(gmm_out, dict):
        required = ["X1", "X2", "params", "samples"]
        for k in required:
            if k not in gmm_out:
                raise ValueError(f"gmm_out dict must contain key '{k}'.")
        X1 = np.asarray(gmm_out["X1"], float)
        X2 = np.asarray(gmm_out["X2"], float)
        params = gmm_out["params"]
        samples = np.asarray(gmm_out["samples"], float)
        r = gmm_out.get("responsibilities", None)
        if r is not None:
            r = np.asarray(r, float)
        return X1, X2, params, samples, r

    if isinstance(gmm_out, (tuple, list)) and len(gmm_out) >= 5:
        params, samples, r, X1, X2 = gmm_out[:5]
        return np.asarray(X1, float), np.asarray(X2, float), params, np.asarray(samples, float), (None if r is None else np.asarray(r, float))

    raise ValueError("Unsupported gmm_out format. Use dict or tuple (params,samples,r,X1,X2).")


def _clamp_index(t, T):
    if T <= 0:
        return 0
    return int(max(0, min(int(t), T - 1)))


def _cov_ellipse_points(mu, Sigma, n=200, nsig=2.0):
    """
    Return x,y points of an ellipse corresponding to nsig standard deviations
    for a 2D Gaussian N(mu, Sigma).
    """
    mu = np.asarray(mu, float).reshape(2,)
    Sigma = np.asarray(Sigma, float).reshape(2, 2)

    # Eigen-decomposition
    vals, vecs = np.linalg.eigh(Sigma)
    vals = np.maximum(vals, 1e-15)

    # Parametric circle
    theta = np.linspace(0, 2*np.pi, n)
    circle = np.stack([np.cos(theta), np.sin(theta)], axis=0)  # (2, n)

    # Scale circle to ellipse: vecs @ diag(sqrt(vals)) @ circle
    A = vecs @ (np.diag(np.sqrt(vals)) * nsig)
    ell = (A @ circle).T + mu[None, :]  # (n,2)
    return ell[:, 0], ell[:, 1]


# -----------------------------
# 1) Plot: Logistic regression boundary at timestamp t
# -----------------------------
def plot_logreg_snapshot(logreg_out, t, ax=None, dims=(0, 1), padding=0.08, title=True):
    """
    Plot a snapshot of logistic regression decision boundary at iteration t.
    Expects d>=2 for visualization; uses dims=(i,j) projection.

    logreg_out: dict with X1, X2, w_path, b_path (loss_path optional), or tuple.
    """
    X1, X2, w_path, b_path, loss_path = _as_logreg_bundle(logreg_out)

    if X1.shape[1] < 2:
        raise ValueError("Need at least 2D points for a 2D boundary plot.")
    i, j = dims

    T = w_path.shape[0]
    t = _clamp_index(t, T)
    w = w_path[t]
    b = float(b_path[t])

    # Project data and weights
    X1p = X1[:, [i, j]]
    X2p = X2[:, [i, j]]
    wp = np.asarray([w[i], w[j]], float)

    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=(6, 5))

    ax.scatter(X1p[:, 0], X1p[:, 1], s=18, label="X1 (y=0)")
    ax.scatter(X2p[:, 0], X2p[:, 1], s=18, label="X2 (y=1)")

    # Compute plot bounds
    Xall = np.vstack([X1p, X2p])
    xmin, ymin = Xall.min(axis=0)
    xmax, ymax = Xall.max(axis=0)
    dx, dy = xmax - xmin, ymax - ymin
    xmin -= padding * (dx + 1e-12)
    xmax += padding * (dx + 1e-12)
    ymin -= padding * (dy + 1e-12)
    ymax += padding * (dy + 1e-12)

    # Decision boundary: wp[0]*x + wp[1]*y + b = 0
    xs = np.linspace(xmin, xmax, 200)
    if abs(wp[1]) < 1e-12:
        # vertical line
        x0 = -b / (wp[0] + 1e-12)
        ax.plot([x0, x0], [ymin, ymax], linewidth=2)
    else:
        ys = -(wp[0] * xs + b) / wp[1]
        ax.plot(xs, ys, linewidth=2)

    ax.set_xlim(xmin, xmax)
    ax.set_ylim(ymin, ymax)
    ax.set_xlabel(f"x[{i}]")
    ax.set_ylabel(f"x[{j}]")

    if title:
        if loss_path.size == 0:
            ax.set_title(f"Logistic regression boundary")
        else:
            # loss_path might be length T, but be robust
            lt = loss_path[t] if t < len(loss_path) else np.nan
            ax.set_title(f"Logistic regression boundary")

    ax.legend(frameon=False)
    ax.grid(True, linewidth=0.5)
    return ax


# -----------------------------
# 2) Plot: GMM snapshot at timestamp t
# -----------------------------
def plot_gmm_snapshot(
    gmm_out, t, ax=None, dims=(0, 1), padding=0.08, title=True,
    show_samples=True, ellipse_nsig=(1.0, 2.0),
    n_samples=250, sample_seed=0, color_samples=True
):
    """
    Plot a 2-component GMM snapshot.
    FIX: samples are generated from the parameters at the selected timestamp t.

    If params has history: use params at iter t.
    Otherwise: use final params.
    """
    X1, X2, params, _samples_unused, r = _as_gmm_bundle(gmm_out)

    if X1.shape[1] < 2:
        raise ValueError("Need at least 2D points for a 2D GMM plot.")
    i, j = dims

    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=(6, 5))

    # --- Scatter data and capture their colors ---
    X1p = X1[:, [i, j]]
    X2p = X2[:, [i, j]]
    sc1 = ax.scatter(X1p[:, 0], X1p[:, 1], s=18, label="X1")
    sc2 = ax.scatter(X2p[:, 0], X2p[:, 1], s=18, label="X2")

    def _pick_color(sc):
        fc = sc.get_facecolor()
        if fc is not None and len(fc) > 0:
            return fc[0]
        ec = sc.get_edgecolor()
        if ec is not None and len(ec) > 0:
            return ec[0]
        return None

    col_X1 = _pick_color(sc1)
    col_X2 = _pick_color(sc2)

    # --- Choose which parameters to plot ---
    if "history" in params and params["history"] is not None:
        hist = params["history"]
        pi_path = np.asarray(hist["pi_path"], float)
        mu_path = np.asarray(hist["mu_path"], float)
        Sig_path = np.asarray(hist["Sigma_path"], float)
        T = pi_path.shape[0]
        tt = _clamp_index(t, T)
        pi = pi_path[tt]
        mu = mu_path[tt]
        Sigma = Sig_path[tt]
        used_t = tt
    else:
        pi = np.asarray(params["pi"], float)
        mu = np.asarray(params["mu"], float)
        Sigma = np.asarray(params["Sigma"], float)
        used_t = None
        tt = 0  # for deterministic seed below

    # --- Stabilize component<->dataset color mapping by mean proximity ---
    m1 = X1.mean(axis=0)
    m2 = X2.mean(axis=0)
    c_01 = np.linalg.norm(mu[0] - m1) + np.linalg.norm(mu[1] - m2)
    c_10 = np.linalg.norm(mu[0] - m2) + np.linalg.norm(mu[1] - m1)
    if c_01 <= c_10:
        comp_color = [col_X1, col_X2]
    else:
        comp_color = [col_X2, col_X1]

    # --- Plot means and ellipses with fixed colors ---
    for k in range(2):
        color = comp_color[k]
        muk = mu[k, [i, j]]
        Sigk = Sigma[k][np.ix_([i, j], [i, j])]

        ax.scatter([muk[0]], [muk[1]], s=60, marker="x", linewidths=2, color='black')

        for ns in ellipse_nsig:
            ex, ey = _cov_ellipse_points(muk, Sigk, nsig=float(ns))
            ax.plot(ex, ey, linewidth=1.5, color=color)

    # --- FIX: sample from CURRENT (pi,mu,Sigma) at timestamp t ---
    if show_samples and n_samples > 0:
        # deterministic per-timestamp seed so samples don't jump around
        seed_t = int(sample_seed) + 10_000 * int(tt)
        samples_t, z_t = sample_from_gmm(pi, mu, Sigma, n_samples=n_samples, seed=seed_t)
        sp = samples_t[:, [i, j]]

        if color_samples:
            # color samples by component, using the same fixed colors as ellipses
            for k in range(2):
                idx = np.where(z_t == k)[0]
                if len(idx) == 0:
                    continue
                ax.scatter(sp[idx, 0], sp[idx, 1], s=10, marker=".", color=comp_color[k], alpha=0.8)
        else:
            ax.scatter(sp[:, 0], sp[:, 1], s=10, marker=".", label="samples")

    # --- Bounds ---
    Xall = np.vstack([X1p, X2p])
    xmin, ymin = Xall.min(axis=0)
    xmax, ymax = Xall.max(axis=0)
    dx, dy = xmax - xmin, ymax - ymin
    xmin -= padding * (dx + 1e-12)
    xmax += padding * (dx + 1e-12)
    ymin -= padding * (dy + 1e-12)
    ymax += padding * (dy + 1e-12)
    ax.set_xlim(xmin, xmax)
    ax.set_ylim(ymin, ymax)

    ax.set_xlabel(f"x[{i}]")
    ax.set_ylabel(f"x[{j}]")
    ax.grid(True, linewidth=0.5)

    if title:
        ax.set_title("2-GMM (EM) snapshot")

    ax.legend(frameon=False)
    return ax


# -----------------------------
# 3) Plot both paradigms in one row
# -----------------------------
def plot_two_paradigms_row(logreg_out, gmm_out, t, dims=(0, 1), figsize=(12, 5), share_limits=False):
    """
    Draw two plots in a single row:
      - Left: discriminative logistic regression boundary @ t
      - Right: generative GMM fit @ t (or final if history absent)

    If share_limits=True, both axes use the same x/y limits (based on combined data).
    """
    fig, axes = plt.subplots(1, 2, figsize=figsize)

    ax0 = plot_logreg_snapshot(logreg_out, t=t, ax=axes[0], dims=dims, title=True)
    ax1 = plot_gmm_snapshot(gmm_out, t=t, ax=axes[1], dims=dims, title=True)

    if share_limits:
        # Compute combined limits
        X1, X2, _, _, _ = _as_logreg_bundle(logreg_out)
        Xall = np.vstack([X1[:, list(dims)], X2[:, list(dims)]])
        xmin, ymin = Xall.min(axis=0)
        xmax, ymax = Xall.max(axis=0)
        dx, dy = xmax - xmin, ymax - ymin
        pad = 0.08
        xmin -= pad * (dx + 1e-12); xmax += pad * (dx + 1e-12)
        ymin -= pad * (dy + 1e-12); ymax += pad * (dy + 1e-12)
        ax0.set_xlim(xmin, xmax); ax0.set_ylim(ymin, ymax)
        ax1.set_xlim(xmin, xmax); ax1.set_ylim(ymin, ymax)

    fig.tight_layout()
    return fig, axes


In [None]:
import numpy as np
import matplotlib.pyplot as plt

import ipywidgets as widgets
from IPython.display import display, clear_output


# -----------------------------
# Dataset generator (as requested)
# -----------------------------
def create_dataset(n1=200, n2=200, d=2, separation=2.5, cov_scale=0.8, rotate_deg=25.0, seed=0):
    """
    Generate two point clouds X1 and X2 in R^d (default d=2) with controllable overlap.

    Returns
    -------
    X1 : (n1, d)
    X2 : (n2, d)
    """
    rng = np.random.default_rng(seed)

    if d != 2:
        # For simplicity of visualization, we stick to d=2 by default.
        # You can generalize means/covariances for any d if needed.
        raise ValueError("This create_dataset is set for d=2 to support 2D visualization.")

    # Base covariance
    S0 = cov_scale * np.array([[1.0, 0.6],
                               [0.6, 1.4]])

    # Rotation
    theta = np.deg2rad(rotate_deg)
    R = np.array([[np.cos(theta), -np.sin(theta)],
                  [np.sin(theta),  np.cos(theta)]])
    S1 = R @ S0 @ R.T
    S2 = R.T @ S0 @ R  # slightly different orientation

    # Means
    mu1 = np.array([-separation/2, 0.0])
    mu2 = np.array([+separation/2, 0.0])

    X1 = rng.multivariate_normal(mu1, S1, size=n1)
    X2 = rng.multivariate_normal(mu2, S2, size=n2)
    return X1, X2


# -----------------------------
# EM for 2-GMM WITH history (so sliders truly move parameters)
# -----------------------------
def gmm2_em_and_sample_history(
    X1, X2,
    n_iters=80, tol=1e-6, reg_covar=1e-6, n_samples=200, seed=0,
    init="collapsed",          # <-- NEW: "from_sets" (fast), "random", "collapsed" (slow+pedagogical)
    damping=0.25,              # <-- NEW: alpha in (0,1], smaller => slower
    early_stop=False           # <-- NEW: for demos keep False to run full n_iters
):
    X1 = np.asarray(X1, float)
    X2 = np.asarray(X2, float)
    X = np.vstack([X1, X2])
    n, d = X.shape
    rng = np.random.default_rng(seed)

    # ---------- Initialization ----------
    if init == "from_sets":
        mu = np.stack([X1.mean(axis=0), X2.mean(axis=0)], axis=0)

        def empirical_cov(A):
            if len(A) <= 1:
                return np.eye(d)
            C = np.cov(A.T, bias=False)
            if C.ndim == 0:
                C = np.array([[float(C)]])
            return C

        Sigma = np.stack([empirical_cov(X1), empirical_cov(X2)], axis=0)
        Sigma += reg_covar * np.eye(d)[None, :, :]
        pi = np.array([len(X1), len(X2)], float)
        pi = pi / pi.sum()

    elif init == "random":
        idx = rng.choice(n, size=2, replace=False)
        mu = X[idx].copy()
        C = np.cov(X.T, bias=False)
        if C.ndim == 0:
            C = np.array([[float(C)]])
        Sigma = np.stack([C, C], axis=0) + reg_covar * np.eye(d)[None, :, :]
        pi = np.array([0.5, 0.5], float)

    elif init == "collapsed":
        # both components start near the global mean -> EM must "split" them gradually
        m = X.mean(axis=0)
        jitter = 0.05 * rng.standard_normal((2, d))
        mu = np.stack([m, m], axis=0) + jitter

        C = np.cov(X.T, bias=False)
        if C.ndim == 0:
            C = np.array([[float(C)]])
        Sigma = np.stack([C, C], axis=0) + reg_covar * np.eye(d)[None, :, :]
        pi = np.array([0.5, 0.5], float)

    elif init == "pca_split":
        # Start both components near global mean but separated along the principal direction.
        m = X.mean(axis=0)

        C = np.cov(X.T, bias=False)
        if C.ndim == 0:
            C = np.array([[float(C)]])

        # principal eigenvector
        vals, vecs = np.linalg.eigh(C)
        v = vecs[:, -1]                       # (d,)
        s = np.sqrt(max(vals[-1], 1e-12))     # scale of spread along principal axis

        # delta controls how "far" the initial means are; smaller -> slower but still breaks symmetry
        split_scale = 0.35                    # try 0.25..0.6
        delta = split_scale * s

        mu = np.stack([m - delta * v, m + delta * v], axis=0)

        # same covariance for both components at start (uninformative)
        Sigma = np.stack([C, C], axis=0) + reg_covar * np.eye(d)[None, :, :]

        # slightly asymmetric mixing weights to break symmetry further
        pi = np.array([0.55, 0.45], float)

    else:
        raise ValueError("init must be one of: 'from_sets', 'random', 'collapsed'.")

    # ---------- Helpers ----------
    def log_gaussian_pdf(X, m, S):
        L = np.linalg.cholesky(S)
        Xm = X - m
        v = np.linalg.solve(L, Xm.T)
        quad = np.sum(v * v, axis=0)
        log_det = 2.0 * np.sum(np.log(np.diag(L)))
        return -0.5 * (d * np.log(2.0 * np.pi) + log_det + quad)

    def logsumexp(a, axis=1):
        amax = np.max(a, axis=axis, keepdims=True)
        return (amax + np.log(np.sum(np.exp(a - amax), axis=axis, keepdims=True))).squeeze(axis)

    # ---------- History ----------
    loglik_path = []
    pi_path, mu_path, Sigma_path = [], [], []

    prev_ll = -np.inf

    for it in range(n_iters):
        # store snapshot (so slider can show evolution)
        pi_path.append(pi.copy())
        mu_path.append(mu.copy())
        Sigma_path.append(Sigma.copy())

        # E-step
        log_r = np.zeros((n, 2))
        for k in range(2):
            log_r[:, k] = np.log(pi[k] + 1e-16) + log_gaussian_pdf(X, mu[k], Sigma[k])

        ll = float(np.sum(logsumexp(log_r, axis=1)))
        loglik_path.append(ll)

        log_norm = logsumexp(log_r, axis=1)
        r = np.exp(log_r - log_norm[:, None])

        # optional early stop (disabled by default for pedagogy)
        if early_stop and it > 0 and abs(ll - prev_ll) < tol * (1.0 + abs(prev_ll)):
            break
        prev_ll = ll

        # M-step (compute "new" params)
        Nk = r.sum(axis=0) + 1e-16
        pi_new = Nk / n
        mu_new = (r.T @ X) / Nk[:, None]

        Sigma_new = np.zeros((2, d, d))
        for k in range(2):
            Xm = X - mu_new[k]
            Sigma_new[k] = (Xm.T * r[:, k]) @ Xm / Nk[k]
            Sigma_new[k] += reg_covar * np.eye(d)

        # ---- NEW: damped update ----
        a = float(damping)
        pi = (1 - a) * pi + a * pi_new
        pi = pi / np.sum(pi)  # renormalize

        mu = (1 - a) * mu + a * mu_new
        Sigma = (1 - a) * Sigma + a * Sigma_new

    # final snapshot
    pi_path.append(pi.copy())
    mu_path.append(mu.copy())
    Sigma_path.append(Sigma.copy())

    # Sampling from final mixture
    z = rng.choice(2, size=n_samples, p=pi)
    samples = np.zeros((n_samples, d))
    for k in range(2):
        idx = np.where(z == k)[0]
        if len(idx) == 0:
            continue
        samples[idx] = rng.multivariate_normal(mu[k], Sigma[k], size=len(idx))

    params = {
        "pi": pi,
        "mu": mu,
        "Sigma": Sigma,
        "loglik_path": np.array(loglik_path, float),
        "history": {
            "pi_path": np.array(pi_path, float),
            "mu_path": np.array(mu_path, float),
            "Sigma_path": np.array(Sigma_path, float),
        }
    }
    return params, samples, r



# -----------------------------
# Widget tool
# -----------------------------
def build_discriminative_vs_generative_widget(
    *,
    dataset_kwargs=None,
    logreg_kwargs=None,
    gmm_kwargs=None,
    dims=(0, 1),
    share_limits=True,
):
    """
    Creates an ipywidgets UI:
      - Button "Пересчитать"
      - Slider for LR iteration
      - Slider for EM iteration
      - 2 plots in a row (discriminative boundary + generative GMM)

    Notes:
      - Requires logistic_regression_gd_path, plot_logreg_snapshot, plot_gmm_snapshot already defined.
      - Uses gmm2_em_and_sample_history defined above.
    """
    if dataset_kwargs is None:
        dataset_kwargs = dict(n1=100, n2=100, d=2, separation=2.6, cov_scale=0.9, rotate_deg=20.0, seed=0)
    if logreg_kwargs is None:
        logreg_kwargs = dict(
            lr=0.1,
            n_iters=20,
            l2=1e-2,
            fit_intercept=True,
            seed=0,
            lr_mode="warmup",
            warmup_power=3.0,
            lr_max=None
        )
    if gmm_kwargs is None:
        gmm_kwargs = gmm_kwargs = dict(
            n_iters=100,
            tol=1e-6,
            reg_covar=1e-6,
            n_samples=250,
            seed=0,
            init="pca_split",
            damping=0.20,
            early_stop=False
        )

    # UI widgets
    btn = widgets.Button(description="Пересчитать", button_style="primary", icon="refresh")

    t_lr = widgets.IntSlider(description="iter LR", min=0, max=1, step=1, value=0, continuous_update=False)
    t_em = widgets.IntSlider(description="iter EM", min=0, max=1, step=1, value=0, continuous_update=False)

    out = widgets.Output()

    # Internal state
    state = {
        "seed": int(dataset_kwargs.get("seed", 0)),
        "logreg_out": None,
        "gmm_out": None,
    }

    def recompute():
        # Update seeds deterministically (so each click changes dataset)
        state["seed"] += 1
        ds_kwargs = dict(dataset_kwargs)
        ds_kwargs["seed"] = state["seed"]

        # Generate data
        X1, X2 = create_dataset(**ds_kwargs)

        # Train discriminative model (uses YOUR earlier function)
        w_path, b_path, loss_path = logistic_regression_gd_path(X1, X2, **logreg_kwargs)
        logreg_out = {"X1": X1, "X2": X2, "w_path": w_path, "b_path": b_path, "loss_path": loss_path}

        # Train generative model with EM history
        params, samples, r = gmm2_em_and_sample_history(X1, X2, **gmm_kwargs)
        gmm_out = {"X1": X1, "X2": X2, "params": params, "samples": samples, "responsibilities": r}

        state["logreg_out"] = logreg_out
        state["gmm_out"] = gmm_out

        # Update slider ranges
        t_lr.max = w_path.shape[0] - 1
        t_lr.value = min(t_lr.value, t_lr.max)

        # For EM, history length defines slider max
        T_em = params["history"]["pi_path"].shape[0]
        t_em.max = T_em - 1
        t_em.value = min(t_em.value, t_em.max)

        draw()

    def draw():
        if state["logreg_out"] is None or state["gmm_out"] is None:
            return

        with out:
            clear_output(wait=True)

            fig, axes = plt.subplots(1, 2, figsize=(12, 5))

            # Left: discriminative snapshot
            plot_logreg_snapshot(state["logreg_out"], t=int(t_lr.value), ax=axes[0], dims=dims, title=True)

            # Right: generative snapshot (uses params history, so ellipses move with t_em)
            plot_gmm_snapshot(state["gmm_out"], t=int(t_em.value), ax=axes[1], dims=dims, title=True, show_samples=True)

            if share_limits:
                X1 = state["logreg_out"]["X1"][:, list(dims)]
                X2 = state["logreg_out"]["X2"][:, list(dims)]
                Xall = np.vstack([X1, X2])
                xmin, ymin = Xall.min(axis=0)
                xmax, ymax = Xall.max(axis=0)
                dx, dy = xmax - xmin, ymax - ymin
                pad = 0.08
                xmin -= pad * (dx + 1e-12); xmax += pad * (dx + 1e-12)
                ymin -= pad * (dy + 1e-12); ymax += pad * (dy + 1e-12)
                axes[0].set_xlim(xmin, xmax); axes[0].set_ylim(ymin, ymax)
                axes[1].set_xlim(xmin, xmax); axes[1].set_ylim(ymin, ymax)

            plt.tight_layout()
            plt.show()

    # Wire events
    def on_click(_):
        recompute()

    btn.on_click(on_click)

    def on_slider_change(_):
        draw()

    t_lr.observe(on_slider_change, names="value")
    t_em.observe(on_slider_change, names="value")

    # Initial compute
    recompute()

    ui = widgets.VBox([
        widgets.HBox([btn, t_lr, t_em]),
        out
    ])

    return ui


# -----------------------------
# Usage (run this line)
# -----------------------------
ui = build_discriminative_vs_generative_widget()
display(ui)


## Основные типы генеративных моделей. Классификация


Внутри общей постановки генеративного обучения классифицировать модели не “по типу данных” (текст/картинки), а по **тому, как именно задаётся распределение $p_\theta(x)$** и как мы с ним можем работать:

1. можем ли мы вычислять $\log p_\theta(x)$ (точно или приближённо) для данного объекта $x$?
2. как устроено семплирование $x\sim p_\theta$ (один проход, обратимые преобразования, многошаговый стохастический процесс и т.д.)?

---

В частности, часто выделяют три класса генеративных моделей:

- **Явные (explicit) модели плотности** задают $p_\theta(x)$ в явном виде и позволяют (точно или приближённо) вычислять $\log p_\theta(x)$ для произвольного объекта $x$. Сюда относятся, например, **авторегрессионные модели** и **нормализующие потоки**, где плотность выражается через детерминант якобиана обратимого преобразования.

- **Неявные (implicit) генеративные модели** определяются через саму процедуру генерации
  $$
  z\sim p(z),\qquad x=g_\theta(z),
  $$
  где $p(z)$ — простой prior (например, стандартное нормальное), а $g_\theta$ — параметризованное отображение в пространство данных. Плотность $p_\theta(x)$ при этом не выписывается в явном виде, и $\log p_\theta(x)$ недоступен; классический пример — **GAN**.

- **Гибридные (латентные) модели** способны восстанавливать распределение над наблюдаемыми объектами, но в устройстве используют введение **латентных распределений** (то есть это не “просто explicit” и не “просто implicit”). Идея — упростить модель данных через явное выделение уровней/структуры в вероятностной модели; типичные представители — **VAE** и модели, которые можно понимать как их обобщение – диффузионные подходы.

<p align="center">
  <img src="images/lecture_1/gan_diff_vae.png" style="width:60%;">
</p>

### Авторегрессионные генеративные модели (Autoregressive)

Авторегрессионные (AR) модели задают **явную** плотность на сложных объектах $x$ (последовательностях, изображениях как сетке пикселей, токенах и т.п.) через факторизацию по правилу цепочки. Если $x=(x_1,\dots,x_T)$, то

$$
p_\theta(x)=p_\theta(x_1,\dots,x_T)=\prod_{t=1}^{T} p_\theta(x_t\mid x_{<t}),
\qquad x_{<t}=(x_1,\dots,x_{t-1}).
$$

Эта формула — математическое ядро всего подхода: мы раскладываем вероятность (плотность) цепочки на произведение более “простых” условных распределений. В качестве параметризации условных распределений обычно используют нейросеть, которая по контексту $x_{<t}$ выдаёт параметры распределения $p_\theta(x_t\mid x_{<t})$.

Обучение максимально прямолинейно: это **MLE** по наблюдаемым данным, то есть минимизация отрицательного лог-правдоподобия (teacher forcing):
$$
\hat\theta=\arg\max_\theta \sum_{i=1}^n \log p_\theta(x^{(i)})
=\arg\max_\theta \sum_{i=1}^n \sum_{t=1}^{T} \log p_\theta\!\left(x^{(i)}_t \mid x^{(i)}_{<t}\right).
$$
В дискретном случае это эквивалентно сумме кросс-энтропий по позициям. Семплирование, напротив, является **последовательным**: сначала $x_1\sim p_\theta(\cdot)$, затем $x_2\sim p_\theta(\cdot\mid x_1)$ и так далее до $x_T$. Таким образом мы автоматически получаем оценку вероятности объекта


#### Пример использования текстовой генеративной сети

In [None]:
import torch
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML  # ✅ HTML для wrap

from transformers import AutoProcessor, Qwen3VLForConditionalGeneration
from transformers.generation.streamers import BaseStreamer

MODEL_ID = "Qwen/Qwen3-VL-2B-Instruct"

# --- load once ---
if "processor" not in globals():
    processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)

if "model" not in globals():
    model = Qwen3VLForConditionalGeneration.from_pretrained(
        MODEL_ID,
        dtype="auto",
        device_map="auto" if torch.cuda.is_available() else None,
        trust_remote_code=True,
    ).eval()
    if not torch.cuda.is_available():
        model = model.to("cpu")


def _html_escape(s: str) -> str:
    return (s.replace("&", "&amp;")
             .replace("<", "&lt;")
             .replace(">", "&gt;")
             .replace('"', "&quot;")
             .replace("'", "&#39;"))


def _render_wrapped(prefix_k: int, max_k: int, text: str):
    """
    Печатает текст в HTML-блоке с переносами строк и переносом длинных токенов.
    """
    safe = _html_escape(text)
    display(HTML(
        f"""
        <div style="
            width: 100%;
            max-width: 100%;
            box-sizing: border-box;
            padding: 10px;
            border: 1px solid #ddd;
            border-radius: 8px;
            font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, 'Liberation Mono', 'Courier New', monospace;
            font-size: 13px;
            line-height: 1.35;
            white-space: pre-wrap;          /* ✅ сохраняем переводы строк + переносим */
            word-wrap: break-word;          /* ✅ старый перенос */
            overflow-wrap: anywhere;        /* ✅ перенос даже очень длинных “слов/токенов” */
            word-break: break-word;
        ">
<b>Generated tokens prefix:</b> {prefix_k}/{max_k}

{safe}
        </div>
        """
    ))


class PrefixByTokenStreamer(BaseStreamer):
    def __init__(self, tokenizer, prompt_len: int, max_new_tokens: int,
                 skip_prompt: bool = True, skip_special_tokens: bool = True):
        self.tokenizer = tokenizer
        self.prompt_len = int(prompt_len)
        self.max_new_tokens = int(max_new_tokens)
        self.skip_prompt = bool(skip_prompt)
        self.skip_special_tokens = bool(skip_special_tokens)

        self._remaining_prompt = self.prompt_len if self.skip_prompt else 0
        self.generated_ids = []

        clear_output(wait=True)
        _render_wrapped(0, self.max_new_tokens, "")

    def put(self, value):
        if isinstance(value, torch.Tensor):
            token_ids = value[0].tolist() if value.dim() == 2 else value.tolist()
        else:
            token_ids = list(value)

        if self._remaining_prompt > 0:
            if len(token_ids) <= self._remaining_prompt:
                self._remaining_prompt -= len(token_ids)
                return
            token_ids = token_ids[self._remaining_prompt:]
            self._remaining_prompt = 0

        for tid in token_ids:
            self.generated_ids.append(int(tid))
            k = len(self.generated_ids)

            text_k = self.tokenizer.decode(
                self.generated_ids,
                skip_special_tokens=self.skip_special_tokens,
                clean_up_tokenization_spaces=False,
            ).strip()

            clear_output(wait=True)
            _render_wrapped(k, self.max_new_tokens, text_k)

    def end(self):
        pass


def _build_messages_text_only(user_text: str):
    return [{"role": "user", "content": [{"type": "text", "text": user_text.strip()}]}]


@torch.inference_mode()
def stream_generate(user_text: str, max_new_tokens: int, temperature: float = 0.7, top_p: float = 0.9):
    messages = _build_messages_text_only(user_text)

    inputs = processor.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    )
    inputs.pop("token_type_ids", None)
    inputs = inputs.to(model.device)

    prompt_len = int(inputs["input_ids"].shape[-1])
    streamer = PrefixByTokenStreamer(
        tokenizer=processor.tokenizer,
        prompt_len=prompt_len,
        max_new_tokens=max_new_tokens,
        skip_prompt=True,
        skip_special_tokens=True,
    )

    do_sample = temperature is not None and float(temperature) > 0.0

    _ = model.generate(
        **inputs,
        max_new_tokens=int(max_new_tokens),
        do_sample=do_sample,
        temperature=float(temperature),
        top_p=float(top_p),
        streamer=streamer,
        eos_token_id=processor.tokenizer.eos_token_id,
    )


# --- Notebook UI ---
prompt_box = widgets.Textarea(
    value="Кратко (5–7 предложений): чем диффузия концептуально отличается от GAN?",
    description="Prompt:",
    layout=widgets.Layout(width="100%", height="60px"),
)

temp_slider = widgets.FloatSlider(
    value=0.7, min=0.0, max=1.5, step=0.05,
    description="temp:", readout_format=".2f",
    layout=widgets.Layout(width="60%")
)

top_p_slider = widgets.FloatSlider(
    value=0.9, min=0.1, max=1.0, step=0.01,
    description="top_p:", readout_format=".2f",
    layout=widgets.Layout(width="60%")
)

max_tok_slider = widgets.IntSlider(
    value=64, min=16, max=2048, step=1,
    description="max_tok:",
    layout=widgets.Layout(width="60%")
)

btn = widgets.Button(description="Stream", button_style="primary")
out = widgets.Output()

def _on_click(_):
    with out:
        out.clear_output()
        text = prompt_box.value.strip()
        if not text:
            display(HTML("<b>Введите запрос.</b>"))
            return
        stream_generate(
            text,
            max_new_tokens=int(max_tok_slider.value),
            temperature=float(temp_slider.value),
            top_p=float(top_p_slider.value),
        )

btn.on_click(_on_click)

display(prompt_box, btn, temp_slider, top_p_slider, max_tok_slider, out)

#### GAN (Generative Adversarial Networks)

GAN — типичный представитель неявных-генеративных моделей: распределение $p_\theta(x)$ задаётся **неявно** через процедуру генерации. Фиксируем prior $z\sim p(z)$ (обычно $p(z)=\mathcal N(0,I)$) и генератор $G_\theta:\mathcal Z\to\mathcal X$,
$$
x = G_\theta(z),
$$
что индуцирует распределение $p_\theta$ как отображения меры $p(z)$ в носитель $\mathcal{X}$. При этом явная плотность $\log p_\theta(x)$, как правило, недоступна, так как от самой сети $G$ мы не требуем обратимости. Поэтому обучение не сводится к прямой максимизации правдоподобия.

Обучение формулируется как состязательная оптимизация с дискриминатором $D_\psi(x)\in(0,1)$, отличающим реальные образцы $x\sim p^*(x)$ от синтетических $x\sim p_\theta(x)$:
$$
\min_\theta \max_\psi\ 
\mathbb{E}_{x\sim p^*}\big[\log D_\psi(x)\big]
+
\mathbb{E}_{z\sim p(z)}\big[\log\big(1-D_\psi(G_\theta(z))\big)\big].
$$
критерий индуцирует минимизацию дивергенции между $p_\theta$ и $p^*$ (в частности, связанной с Jensen–Shannon). Семплирование в GAN одношаговое (один прогон $G_\theta$), что делает инференс быстрым и часто даёт высокое перцептуальное качество. Основные ограничения — нестабильность обучения и риск *mode collapse* (когда модель хорошо покрывает малую область "существования" реальных объектов, из-за чего деряется разнообразие генерируемых объектов).


In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display
from PIL import Image

from pytorch_pretrained_biggan import BigGAN, one_hot_from_int, truncated_noise_sample

# ---- Load model (once) ----
MODEL_NAME = "biggan-deep-256"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if "biggan" not in globals():
    biggan = BigGAN.from_pretrained(MODEL_NAME).to(device).eval()

def _tensor_to_pil(img_t: torch.Tensor) -> Image.Image:
    x = (img_t.clamp(-1, 1) + 1.0) / 2.0
    x = (x * 255.0).to(torch.uint8).permute(1, 2, 0).cpu().numpy()
    return Image.fromarray(x)

def _make_grid(images, cols=4):  # ✅ fixed cols=4
    n = len(images)
    cols = 4
    rows = int(np.ceil(n / cols))
    fig = plt.figure(figsize=(3.2 * cols, 3.2 * rows))
    for i, im in enumerate(images, 1):
        ax = fig.add_subplot(rows, cols, i)
        ax.imshow(im)
        ax.axis("off")
    plt.tight_layout()
    plt.show()

@torch.inference_mode()
def generate_biggan(class_id: int, n: int, truncation: float, seed: int):
    if seed < 0:
        seed = np.random.randint(65000)
    noise = truncated_noise_sample(truncation=truncation, batch_size=n, seed=seed)
    noise = torch.from_numpy(noise).to(device)

    class_vec = one_hot_from_int(int(class_id), batch_size=n)
    class_vec = torch.from_numpy(class_vec).to(device)

    out = biggan(noise, class_vec, truncation)
    images = [_tensor_to_pil(out[i]) for i in range(out.shape[0])]
    return images

# ---- Widgets UI ----
class_id_slider = widgets.IntSlider(
    value=207, min=0, max=999, step=1,
    description="class_id:",
    layout=widgets.Layout(width="520px")
)

n_slider = widgets.IntSlider(
    value=4, min=1, max=16, step=1,
    description="n:",
    layout=widgets.Layout(width="520px")
)

trunc_slider = widgets.FloatSlider(
    value=0.4, min=0.1, max=1.0, step=0.05,
    description="trunc:",
    readout_format=".2f",
    layout=widgets.Layout(width="520px")
)

seed_box = widgets.IntText(
    value=-1,
    description="seed:",
    layout=widgets.Layout(width="240px")
)

btn_gen = widgets.Button(description="Generate (BigGAN)", button_style="primary")
btn_rand = widgets.Button(description="Random class", button_style="info")
out = widgets.Output()

def _on_rand(_):
    class_id_slider.value = int(np.random.randint(0, 1000))

def _on_gen(_):
    with out:
        out.clear_output()
        images = generate_biggan(
            class_id=int(class_id_slider.value),
            n=int(n_slider.value),
            truncation=float(trunc_slider.value),
            seed=int(seed_box.value),
        )
        _make_grid(images, cols=4)  # ✅ always 4

btn_rand.on_click(_on_rand)
btn_gen.on_click(_on_gen)

display(widgets.VBox([
    widgets.HTML("<b>BigGAN (class-conditional GAN on ImageNet)</b><br>Выбираем класс по индексу 0..999"),
    widgets.HBox([class_id_slider]),
    widgets.HBox([seed_box, btn_rand]),
    n_slider,
    trunc_slider,
    btn_gen,
    out
]))


### VAE (Variational Autoencoder)

VAE — канонический пример *латентной* генеративной модели: вводится скрытая переменная $z\in\mathcal Z$ и задаётся совместное распределение

$$
p_\theta(x,z)=p_\theta(x\mid z)\,p(z), \quad\quad p_\theta(x)=\int p_\theta(x\mid z)\,p(z)\,dz
$$

При этом $p(z)$ берут как некий простой prior (обычно $p(z)=\mathcal N(0,I)$). Обучать и использовать условных генератор/декодер $p_\theta(x\mid z)$ оказывается намного проще и стабильнее. Однако у нас появляется потребность в приближении еще одного распределения $q_\phi(z\mid x)$ – будем приближать второй нейросетью (энкодером). Тогда при обучении оптимизируют нижнюю вариационную оценку лог-правдоподобия (ELBO):

$$
\log p_\theta(x)\ge 
\mathcal L_{\mathrm{ELBO}}(x;\theta,\phi)
=
\mathbb E_{z\sim q_\phi(z\mid x)}\big[\log p_\theta(x\mid z)\big]
-
\mathrm{KL}\!\big(q_\phi(z\mid x)\,\|\,p(z)\big) \longrightarrow \max
$$

Семплирование после обучения тривиально: сначала $z\sim p(z)$, затем $x\sim p_\theta(x\mid z)$; для контролируемой генерации аналогично используют условные варианты $p_\theta(x\mid z,y)$ и $q_\phi(z\mid x,y)$.

<p align="center">
  <img src="images/lecture_1/vae.png" style="width:60%;">
</p>



In [None]:
import os
import sys
import torch
import torch.nn as nn
import torchaudio
import soundfile as sf
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, Audio, clear_output
from huggingface_hub import snapshot_download
from datasets import load_dataset, Audio as HFAudio
from itertools import islice

# ----------------------------
# LibriSpeech streaming settings
# ----------------------------
LIBRISPEECH_SPLIT = "validation.clean"
LIBRISPEECH_SKIP_MAX = 2000  # больше -> случайнее, но потенциально медленнее

# ----------------------------
# 0) Download model repo from Hugging Face (code + weights)
# ----------------------------
MODEL_REPO = "earlab/EAR_VAE"
LOCAL_DIR = os.path.join(os.getcwd(), "hf_ear_vae")  # local cache folder

repo_path = snapshot_download(
    repo_id=MODEL_REPO,
    local_dir=LOCAL_DIR,
    local_dir_use_symlinks=False,
    revision="main",
)

if repo_path not in sys.path:
    sys.path.insert(0, repo_path)

from model.ear_vae import EAR_VAE  # provided by the repo

CONFIG_PATH = os.path.join(repo_path, "config", "ear_vae_v2.json")
WEIGHT_PATH = os.path.join(repo_path, "pretrained_weight", "ear_vae_v2_48k.pyt")

TARGET_SR = 48000
DOWNSAMPLE_RATIO = 960  # aligns T -> T/960 latents

with open(CONFIG_PATH, "r") as f:
    cfg = json.load(f)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def _unwrap_state_dict(ckpt):
    """Return a pure state_dict name->tensor."""
    if not isinstance(ckpt, dict):
        return ckpt
    for key in ["state_dict", "model", "net", "ema", "generator"]:
        if key in ckpt and isinstance(ckpt[key], dict):
            return ckpt[key]
    return ckpt

# ---- Load model once (robust to optional bottleneck transformer mismatch) ----
if "ear_vae" not in globals():
    ear_vae = EAR_VAE(model_config=cfg).to(device)

    ckpt = torch.load(WEIGHT_PATH, map_location="cpu")
    sd = _unwrap_state_dict(ckpt)
    sd = {k.replace("module.", ""): v for k, v in sd.items()}

    has_transformer_weights = any(k.startswith("transformers.") for k in sd.keys())
    if (not has_transformer_weights) and hasattr(ear_vae, "transformers"):
        # checkpoint was trained without transformer bottleneck => disable it
        ear_vae.transformers = nn.Identity()

    incompat = ear_vae.load_state_dict(sd, strict=False)
    if len(incompat.missing_keys) or len(incompat.unexpected_keys):
        print("WARNING: state_dict incompatibilities detected.")
        print("Missing keys (first 20):", incompat.missing_keys[:20])
        print("Unexpected keys (first 20):", incompat.unexpected_keys[:20])

    ear_vae.eval()

# ----------------------------
# 1) Utilities: load / preprocess audio
# ----------------------------
def _load_audio_from_upload(upload_widget):
    """Returns waveform tensor (C,T) float32 and sample rate."""
    if not upload_widget.value:
        return None, None
    item = next(iter(upload_widget.value.values()))
    raw = item["content"]
    bio = io.BytesIO(raw)
    audio_np, sr = sf.read(bio, dtype="float32", always_2d=True)  # (T,C)
    wav = torch.from_numpy(audio_np).T.contiguous()               # (C,T)
    return wav, int(sr)

def _to_stereo(wav_ct):
    if wav_ct.ndim != 2:
        raise ValueError("Expected waveform shape (C,T).")
    if wav_ct.shape[0] == 1:
        return torch.cat([wav_ct, wav_ct], dim=0)
    return wav_ct[:2]

def _resample_if_needed(wav_ct, sr, target_sr=TARGET_SR):
    if sr == target_sr:
        return wav_ct
    resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=target_sr)
    return resampler(wav_ct)

def _crop_or_pad_to_seconds_and_multiple(wav_ct, sr, seconds, multiple=DOWNSAMPLE_RATIO):
    T_target = int(sr * float(seconds))
    if wav_ct.shape[-1] >= T_target:
        wav_ct = wav_ct[..., :T_target]
    else:
        wav_ct = torch.nn.functional.pad(wav_ct, (0, T_target - wav_ct.shape[-1]))

    T = wav_ct.shape[-1]
    Tm = (T // multiple) * multiple
    if Tm < multiple:
        Tm = multiple
    if Tm != T:
        wav_ct = wav_ct[..., :Tm] if T >= Tm else torch.nn.functional.pad(wav_ct, (0, Tm - T))
    return wav_ct

def _peak_normalize(wav_ct, eps=1e-8):
    peak = float(wav_ct.abs().max())
    if peak < eps:
        return wav_ct
    return (wav_ct / peak).clamp(-1, 1)

def _random_fragment(wav_ct, sr, seconds):
    """Take a random contiguous fragment of length seconds; pad if needed."""
    T_need = int(sr * float(seconds))
    T = wav_ct.shape[-1]
    if T >= T_need:
        start = np.random.randint(0, T - T_need + 1) if (T - T_need) > 0 else 0
        return wav_ct[..., start:start + T_need]
    return torch.nn.functional.pad(wav_ct, (0, T_need - T))

# ----------------------------
# 2) Random LibriSpeech fallback (streaming, decode via bytes)
# ----------------------------
if "ls_stream" not in globals():
    ls_stream = load_dataset("openslr/librispeech_asr", split=LIBRISPEECH_SPLIT, streaming=True)
    ls_stream = ls_stream.cast_column("audio", HFAudio(decode=False))

def _read_audio_meta(audio_meta):
    """
    Extract wav (C,T) float32 and sr from audio_meta.
    Prefers audio_meta['bytes'] for streaming. Falls back to existing path only.
    """
    if not isinstance(audio_meta, dict):
        raise RuntimeError(f"audio_meta has unexpected type: {type(audio_meta)}")

    # decode=True case
    if "array" in audio_meta and "sampling_rate" in audio_meta and audio_meta["array"] is not None:
        arr = audio_meta["array"]
        sr = int(audio_meta["sampling_rate"])
        if arr.ndim == 1:
            arr = arr[:, None]
        wav = torch.from_numpy(arr.astype("float32")).T.contiguous()  # (C,T)
        return wav, sr

    # streaming decode=False: bytes is the reliable route
    b = audio_meta.get("bytes", None)
    if b is not None:
        audio_np, sr = sf.read(io.BytesIO(b), dtype="float32", always_2d=True)  # (T,C)
        wav = torch.from_numpy(audio_np).T.contiguous()
        return wav, int(sr)

    # path fallback only if it exists (often it's just basename and does NOT exist)
    path = audio_meta.get("path", None)
    if path is not None and os.path.exists(path):
        audio_np, sr = sf.read(path, dtype="float32", always_2d=True)
        wav = torch.from_numpy(audio_np).T.contiguous()
        return wav, int(sr)

    raise RuntimeError(f"Cannot decode audio_meta. keys={list(audio_meta.keys())}")

def _get_random_librispeech_fragment(seconds):
    """
    Returns stereo waveform (C,T) float32 and sr.
    Each call: skip random K items in streaming iterator, then take next sample.
    """
    k = int(np.random.randint(0, LIBRISPEECH_SKIP_MAX + 1))
    it = iter(ls_stream)
    sample = next(islice(it, k, None))
    wav, sr = _read_audio_meta(sample["audio"])
    wav = _to_stereo(wav)
    wav = _random_fragment(wav, sr, seconds=float(seconds))
    return wav, sr

# ----------------------------
# 3) Encode -> latent -> decode (mu/sigma and optional sampling)
# ----------------------------
@torch.inference_mode()
def encode_latent_and_reconstruct(wav_ct, use_sample=True):
    """
    wav_ct: (C,T) float32 on CPU
    Returns: mu, sigma, z, recon, kl_est (all CPU)
    """
    wav = wav_ct.to(device, torch.float32).unsqueeze(0)  # (1,C,T)

    status = ear_vae.encoder(wav)                         # (1, 2*Cz, Tz)
    mu, scale = status.chunk(2, dim=1)                    # each (1,Cz,Tz)
    sigma = torch.nn.functional.softplus(scale)           # stddev

    if use_sample:
        eps = torch.randn_like(mu)                        # truly random
        z = mu + sigma * eps
    else:
        z = mu

    recon = ear_vae.decode(z)                             # (1,C,T)

    var = sigma * sigma + 1e-6
    logvar = torch.log(var)
    kl = (mu * mu + var - logvar - 1).sum(1).mean().item()

    return (
        mu.squeeze(0).detach().cpu(),
        sigma.squeeze(0).detach().cpu(),
        z.squeeze(0).detach().cpu(),
        recon.squeeze(0).detach().cpu(),
        float(kl),
    )

@torch.inference_mode()
def sample_from_prior(seconds=2.5):
    seconds = float(seconds)
    T = int(TARGET_SR * seconds)
    T = max(DOWNSAMPLE_RATIO, (T // DOWNSAMPLE_RATIO) * DOWNSAMPLE_RATIO)
    Tz = T // DOWNSAMPLE_RATIO
    Cz = int(cfg["decoder"]["config"]["latent_dim"])  # typically 64
    z = torch.randn(1, Cz, Tz, device=device, dtype=torch.float32)  # truly random
    y = ear_vae.decode(z).squeeze(0).detach().cpu()
    return y

# ----------------------------
# 4) Visualization
# ----------------------------
def _plot_waveforms(orig, recon, sr=TARGET_SR, title="Waveforms"):
    T = orig.shape[-1]
    max_plot = min(T, sr * 3)
    t = np.arange(max_plot) / sr

    fig = plt.figure(figsize=(12, 4))
    ax = fig.add_subplot(1, 1, 1)
    ax.plot(t, orig[0, :max_plot].numpy(), linewidth=1.0, label="orig L")
    ax.plot(t, recon[0, :max_plot].numpy(), linewidth=1.0, label="recon L", alpha=0.8)
    ax.set_title(title)
    ax.set_xlabel("time (s)")
    ax.set_ylabel("amplitude")
    ax.grid(True, linewidth=0.3)
    ax.legend(loc="upper right")
    plt.tight_layout()
    plt.show()

def _plot_latent_heatmap(z, title="Latent z heatmap (channels × time)"):
    Cz, Tz = z.shape
    c_show = min(Cz, 64)
    t_show = min(Tz, 256)
    zz = z[:c_show, :t_show].numpy()

    fig = plt.figure(figsize=(12, 4))
    ax = fig.add_subplot(1, 1, 1)
    im = ax.imshow(zz, aspect="auto", origin="lower")
    ax.set_title(title)
    ax.set_xlabel("latent time index")
    ax.set_ylabel("latent channel")
    plt.colorbar(im, ax=ax, fraction=0.025, pad=0.02)
    plt.tight_layout()
    plt.show()

def _plot_mu_sigma_stats(mu, sigma):
    fig = plt.figure(figsize=(12, 4))
    ax1 = fig.add_subplot(1, 2, 1)
    ax1.hist(mu.flatten().numpy(), bins=60)
    ax1.set_title("mu histogram")
    ax1.grid(True, linewidth=0.3)

    ax2 = fig.add_subplot(1, 2, 2)
    ax2.hist(sigma.flatten().numpy(), bins=60)
    ax2.set_title("sigma histogram")
    ax2.grid(True, linewidth=0.3)

    plt.tight_layout()
    plt.show()

# ----------------------------
# 5) ipywidgets UI
# ----------------------------
upload = widgets.FileUpload(accept=".wav,.flac,.ogg", multiple=False)

seconds_slider = widgets.FloatSlider(
    value=2.5, min=0.5, max=10.0, step=0.5,
    description="seconds:",
    layout=widgets.Layout(width="60%")
)

latent_mode = widgets.ToggleButtons(
    options=[("mean (deterministic)", "mean"), ("sample (stochastic)", "sample")],
    description="latent:",
)

btn_recon = widgets.Button(description="Encode → Latent → Reconstruct", button_style="primary")
btn_prior = widgets.Button(description="Sample z ~ N(0,I) → Decode", button_style="warning")

out = widgets.Output()

def _run_recon(_):
    with out:
        out.clear_output()

        wav, sr = _load_audio_from_upload(upload)
        source = "user upload"
        if wav is None:
            wav, sr = _get_random_librispeech_fragment(seconds=float(seconds_slider.value))
            source = f"LibriSpeech random ({LIBRISPEECH_SPLIT})"

        wav = _to_stereo(wav)
        wav = _resample_if_needed(wav, sr, TARGET_SR)
        wav = _crop_or_pad_to_seconds_and_multiple(wav, TARGET_SR, seconds=float(seconds_slider.value))
        wav = _peak_normalize(wav)

        use_sample = (latent_mode.value == "sample")
        mu, sigma, z, recon, kl = encode_latent_and_reconstruct(wav, use_sample=use_sample)
        recon = _peak_normalize(recon)

        clear_output(wait=True)
        print(f"Source: {source}")
        print(f"Input: (C,T)={tuple(wav.shape)}, SR={TARGET_SR}")
        print(f"Latent: mu/sigma/z shapes = {tuple(mu.shape)} ; KL estimate ≈ {kl:.3f}\n")

        display(widgets.HTML("<b>Original audio</b>"))
        display(Audio(wav.numpy(), rate=TARGET_SR))

        display(widgets.HTML("<b>Reconstructed audio</b>"))
        display(Audio(recon.numpy(), rate=TARGET_SR))

        _plot_waveforms(wav, recon, sr=TARGET_SR, title="Original vs Reconstruction (Left channel)")
        _plot_mu_sigma_stats(mu, sigma)
        _plot_latent_heatmap(z)

def _run_prior(_):
    with out:
        out.clear_output()
        seconds = float(seconds_slider.value)
        y = sample_from_prior(seconds=seconds)
        y = _to_stereo(y)
        y = _peak_normalize(y)

        clear_output(wait=True)
        print(f"Prior sample: (C,T)={tuple(y.shape)}, SR={TARGET_SR}, seconds≈{seconds}\n")

        display(widgets.HTML("<b>Generated audio from prior (z ~ N(0,I))</b>"))
        display(Audio(y.numpy(), rate=TARGET_SR))

        fig = plt.figure(figsize=(12, 3))
        ax = fig.add_subplot(1, 1, 1)
        T = min(y.shape[-1], TARGET_SR * 3)
        t = np.arange(T) / TARGET_SR
        ax.plot(t, y[0, :T].numpy(), linewidth=1.0)
        ax.set_title("Prior-sampled waveform (Left channel)")
        ax.set_xlabel("time (s)")
        ax.grid(True, linewidth=0.3)
        plt.tight_layout()
        plt.show()

btn_recon.on_click(_run_recon)
btn_prior.on_click(_run_prior)

display(
    widgets.VBox([
        widgets.HTML("<b>VAE demo on audio (earlab/EAR_VAE): upload or random LibriSpeech → latent → reconstruction</b>"),
        upload,
        seconds_slider,
        latent_mode,
        widgets.HBox([btn_recon, btn_prior]),
        out
    ])
)


#### Диффузионные модели (Diffusion)



Диффузионные модели представляют генерацию как результат **обращения последовательного зашумления данных**. В отличие от GAN, где генерация происходит за один шаг, и в отличие от VAE, где генерация — это декодирование одной латентной переменной, диффузионные модели строят объект **итеративно**, постепенно восстанавливая структуру данных из шума.

Концептуально вводится прямой процесс, который шаг за шагом разрушает структуру исходных данных $x_0$, превращая их в простой шумовой распределённый объект $x_T$ (обычно близкий к $\mathcal N(0,I)$). Этот процесс фиксирован и не обучается. Генерация же соответствует обратному процессу: начиная с шума, модель последовательно строит всё более структурированные состояния, приближаясь к распределению данных. Таким образом, генеративная модель реализует цепочку переходов
$$
x_T \rightarrow x_{T-1} \rightarrow \dots \rightarrow x_0,
$$
где каждый шаг аппроксимируется нейросетью.

С точки зрения обучения, диффузионные модели относятся к *likelihood-based* генеративным моделям: они оптимизируют вариационную нижнюю оценку на $\log p_\theta(x)$, но делают это через локальные задачи восстановления структуры на каждом шаге цепочки. В результате обучение оказывается **стабильным**, без состязательной динамики, характерной для GAN, и с хорошим покрытием мод распределения данных.

Ключевым практическим свойством диффузионных моделей является высокое качество и разнообразие генерации при цене **многократного итеративного инференса**. Современные архитектуры смягчают этот недостаток за счёт уменьшения числа шагов или переноса диффузии в латентное пространство (Latent Diffusion), где итеративный процесс работает над компактным представлением объекта, а декодер восстанавливает данные в исходном пространстве.

In [None]:
import sys, subprocess, importlib, os
import torch
import ipywidgets as widgets
from IPython.display import display, clear_output
from PIL import Image

from diffusers import StableDiffusionXLPipeline, DEISMultistepScheduler

# ----------------------------
# Model / device setup
# ----------------------------
MODEL_ID = "stabilityai/stable-diffusion-xl-base-1.0"

use_cuda = torch.cuda.is_available()
device = "cuda" if use_cuda else "cpu"
dtype = torch.float16 if use_cuda else torch.float32

# Optional: HF token (needed if model access is gated in your account)
# Either set env var HF_TOKEN in your notebook environment, or leave None.
HF_TOKEN = os.environ.get("HF_TOKEN", None)

# Load pipeline once
if "sdxl_pipe" not in globals():
    sdxl_pipe = StableDiffusionXLPipeline.from_pretrained(
        MODEL_ID,
        torch_dtype=dtype,
        use_safetensors=True,
        variant="fp16" if use_cuda else None,
        token=HF_TOKEN,                  # safe if None
    )
    # Set DEIS scheduler
    sdxl_pipe.scheduler = DEISMultistepScheduler.from_config(sdxl_pipe.scheduler.config)

    if use_cuda:
        sdxl_pipe = sdxl_pipe.to("cuda")
        # Speed/memory tweaks (safe defaults)
        try:
            sdxl_pipe.enable_attention_slicing()
        except Exception:
            pass
    else:
        sdxl_pipe = sdxl_pipe.to("cpu")

    # For reproducibility you could set a generator, but user didn't ask; keep stochastic.
    sdxl_pipe.set_progress_bar_config(disable=True)

def _show_images(imgs, cols=2, max_w=512):
    """Simple grid display without matplotlib (PIL + notebook)."""
    if not imgs:
        return
    cols = max(1, int(cols))
    rows = (len(imgs) + cols - 1) // cols

    # Resize for consistent display
    resized = []
    for im in imgs:
        if isinstance(im, Image.Image):
            w, h = im.size
            if w > max_w:
                nh = int(h * (max_w / w))
                im = im.resize((max_w, nh))
            resized.append(im)
        else:
            resized.append(im)

    # Create a grid canvas
    widths = [im.size[0] for im in resized]
    heights = [im.size[1] for im in resized]
    cell_w = max(widths)
    cell_h = max(heights)

    grid = Image.new("RGB", (cell_w * cols, cell_h * rows), (255, 255, 255))
    for idx, im in enumerate(resized):
        r = idx // cols
        c = idx % cols
        x0, y0 = c * cell_w, r * cell_h
        # center in cell
        dx = (cell_w - im.size[0]) // 2
        dy = (cell_h - im.size[1]) // 2
        grid.paste(im, (x0 + dx, y0 + dy))
    display(grid)

@torch.inference_mode()
def run_sdxl_deis(prompt: str, nfe: int, guidance: float, h: int, w: int, num_images: int):
    prompt = prompt.strip()
    nfe = int(nfe)

    # SDXL constraints: width/height should be multiples of 8, typically 1024 for best results
    h = int(h) - (int(h) % 8)
    w = int(w) - (int(w) % 8)

    out = sdxl_pipe(
        prompt=prompt,
        num_inference_steps=nfe,     # NFE
        guidance_scale=float(guidance),
        height=h,
        width=w,
        num_images_per_prompt=int(num_images),
    )
    return out.images

# ----------------------------
# Widgets UI
# ----------------------------
prompt_box = widgets.Textarea(
    value="A high-detail studio photo of a small robotic arm assembling a tiny circuit board, realistic lighting, shallow depth of field",
    description="Prompt:",
    layout=widgets.Layout(width="100%", height="120px"),
)

nfe_slider = widgets.IntSlider(
    value=25, min=5, max=80, step=1,
    description="NFE:",
    layout=widgets.Layout(width="70%"),
)

guidance_slider = widgets.FloatSlider(
    value=6.0, min=0.0, max=12.0, step=0.5,
    description="CFG:",
    readout_format=".1f",
    layout=widgets.Layout(width="70%"),
)

size_dropdown = widgets.Dropdown(
    options=[
        ("1024×1024 (default SDXL)", (1024, 1024)),
        ("1152×896", (1152, 896)),
        ("896×1152", (896, 1152)),
        ("832×1216", (832, 1216)),
        ("1216×832", (1216, 832)),
    ],
    value=(1024, 1024),
    description="Size:",
    layout=widgets.Layout(width="70%"),
)

num_images_slider = widgets.IntSlider(
    value=1, min=1, max=4, step=1,
    description="#img:",
    layout=widgets.Layout(width="70%"),
)

btn = widgets.Button(description="Generate (SDXL + DEIS)", button_style="primary")
out = widgets.Output()

def _on_click(_):
    with out:
        out.clear_output()
        h, w = size_dropdown.value
        images = run_sdxl_deis(
            prompt=prompt_box.value,
            nfe=int(nfe_slider.value),
            guidance=float(guidance_slider.value),
            h=h, w=w,
            num_images=int(num_images_slider.value),
        )
        _show_images(images, cols=2)

btn.on_click(_on_click)

display(widgets.VBox([
    widgets.HTML("<b>SDXL generation with DEIS scheduler</b>"),
    prompt_box,
    nfe_slider,
    guidance_slider,
    size_dropdown,
    num_images_slider,
    btn,
    out
]))


## Этические аспекты генеративного ИИ

Этические и социальные эффекты генеративных моделей нельзя корректно анализировать в отрыве от их математической постановки. Генеративный ИИ не является системой принятия нормативных решений; он представляет собой механизм приближения эмпирического распределения данных \(p^*(x)\) некоторым параметрическим распределением \(p_\theta(x)\), обучаемым по принципу максимизации правдоподобия или его вариационных/неявных аналогов.

Следовательно, любые эффекты, возникающие при применении таких моделей, в первую очередь обусловлены **свойствами обучающего распределения, способом его аппроксимации и режимом использования сэмплов**, а не «намерениями» или «ошибками» модели как таковой.

---

### Генеративная модель как статистический репликатор данных

В пределе оптимизации генеративная модель стремится воспроизвести структуру распределения \(p^*(x)\) в KL-смысле:
\[
p_\theta \approx p^*.
\]
Это утверждение принципиально: модель воспроизводит *частоты*, *корреляции* и *модальности* данных ровно в той мере, в какой они представлены в выборке.

Отсюда следует фундаментальный вывод: **генеративная модель не “исправляет” данные и не “очищает” их от социальных или семантических перекосов**, а напротив — корректно их аппроксимирует. Любые систематические смещения (bias), присутствующие в данных, являются статистически значимой частью \(p^*(x)\) и, следовательно, переносятся в \(p_\theta(x)\).

В этом смысле генеративный ИИ является *усилителем* существующих структур данных, а не их интерпретатором.

---

### Bias и fairness как внешние ограничения, а не свойства модели

Понятия bias и fairness не являются внутренними характеристиками вероятностной модели. Они определяются **вне** вероятностной постановки — через социальные, правовые или прикладные требования к допустимым результатам генерации.

Если модель обучена на данных с неравномерным представлением групп, ролей или атрибутов, то воспроизведение этого неравенства является статистически корректным результатом обучения. С точки зрения MLE или ELBO здесь нет ошибки оптимизации.

Таким образом, требования fairness могут быть реализованы только как:
- модификация обучающего распределения (reweighting, filtering);
- дополнительные ограничения на модель или пространство генерации;
- пост-обработка сэмплов;
- или контекстно-зависимые правила использования модели.

Важно подчеркнуть: **fairness — это не цель генеративного обучения**, а дополнительное нормативное ограничение, которое должно быть задано явно и не выводится из вероятностной модели автоматически.

---

### Правдоподобие против истинности: природа галлюцинаций

Генеративная модель оптимизирует правдоподобие, а не истинность. Это означает, что сэмпл \(x\sim p_\theta(x)\) считается “качественным”, если он:
- статистически типичен;
- согласован с контекстом;
- лежит в области высокой плотности распределения.

Проверка фактической корректности, каузальной согласованности или соответствия внешней реальности в эту постановку не входит. Поэтому явление, называемое *hallucinations*, представляет собой не сбой, а **естественное следствие вероятностной генерации в условиях неполной или противоречивой информации**.

Особенно остро это проявляется в языковых моделях, где пространство допустимых последовательностей чрезвычайно велико, а множество “правдоподобных, но ложных” утверждений имеет ненулевую вероятность. С точки зрения модели, такие утверждения являются допустимыми сэмплами из \(p_\theta(x)\).

Следовательно, борьба с галлюцинациями не может быть сведена к “улучшению архитектуры” и требует либо внешних механизмов верификации, либо жёсткого разделения между генерацией и утверждением фактов.

---

### Масштабирование дезинформации как системный эффект

Классическая дезинформация была ограничена стоимостью производства контента. Генеративные модели снимают это ограничение, позволяя масштабировать генерацию правдоподобных текстов, изображений и видео практически без затрат.

С точки зрения вероятностной модели это означает следующее: если \(p_\theta(x)\) хорошо аппроксимирует распределение “естественного” контента, то многократное семплирование из \(p_\theta\) способно статистически изменить информационную среду, даже если каждый отдельный сэмпл неотличим от реального.

Ключевым фактором здесь является **масштаб**, а не качество единичной генерации. Генеративный ИИ превращает дезинформацию из точечного инструмента в системное явление.

---

### Deepfake и условная генерация идентичностей

Deepfake-технологии являются частным случаем условной генерации:
\[
x \sim p_\theta(x \mid y),
\]
где условие \(y\) кодирует признаки конкретного человека (внешность, голос, манеру речи).

С математической точки зрения deepfake не вводит новых механизмов по сравнению с другими формами conditional generation. Отличие заключается в **семантике условия**: условие соответствует реальной личности, а результат генерации может быть воспринят как документальное свидетельство.

Этический риск возникает не на уровне модели, а на уровне интерпретации и применения: когда сэмплы генеративной модели выходят за рамки художественного или синтетического контекста и начинают функционировать как утверждения о реальных людях и событиях.

---

### Заключительная рамка

Этические проблемы генеративного ИИ не являются аномалиями или побочными эффектами. Они логически следуют из самой идеи вероятностного моделирования данных.

Генеративная модель:
- не различает норму и отклонение;
- не оптимизирует истинность или справедливость;
- не несёт встроенных социальных ограничений.

Поэтому этика в генеративном ИИ должна рассматриваться как **часть дизайна системы в целом** — включая данные, обучение, инференс и сценарии применения, — а не как свойство отдельной нейросетевой архитектуры.
