<a href="https://colab.research.google.com/github/Daniilsol200/KURSOVAI_kovalevski_Kurs3/blob/main/%D0%9A%D1%83%D1%80%D1%81%D0%BE%D0%B2%D0%B0%D1%8F%D0%9D%D0%B5%D0%B9%D1%80%D0%BE%D0%BD%D1%8B%D0%B5%D0%A1%D0%B5%D1%82%D0%B8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [186]:
import numpy as np, random
np.random.seed(42); random.seed(42)

# =========================
# 1) Подготовка данных
# =========================
text = """
Нейронные — это мощный инструмент, который способен находить закономерности в данных.
Они применяются в обработке языка, изображений, в прогнозировании и многом другом.
В этой небольшой демонстрации мы обучаем простые рекуррентные сети для предсказания следующего символа.
Искусственный интеллект, нейронные сети, машинное обучение — что на самом деле означают все эти нынче популярные понятия?
Для большинства непосвященных людей, коим являюсь и я сам, они всегда казались чем-то фантастическим.
Впервые понятие искусственных нейронных сетей (ИНС) возникло при попытке смоделировать процессы головного мозга.
Сегодня мы стоим на пороге новой эры, когда модели могут генерировать связный текст, решать сложные задачи и даже проходить экзамены лучше среднестатистического студента.
При этом базовые принципы остались те же: сеть получает входные данные, преобразует их через множество слоёв с весами, применяет нелинейные активации и постепенно учится предсказывать правильный ответ, минимизируя ошибку.
В нашей задаче посимвольного предсказания мы видим эти принципы в самом чистом виде: каждая буква, пробел или знак препинания превращается в вектор, подаётся в рекуррентную ячейку, а сеть должна угадать, какой символ идёт дальше.
Казалось бы, примитивно — но даже на таком простом примере прекрасно видно, насколько сильно лстм сети превосходит классические сети Элмана и Джордана по способности запоминать долгосрочные зависимости в последовательности.
""".strip() * 3

text = text[:4000]
chars = sorted(list(set(text)))
vocab_size = len(chars)
char2idx = {ch: i for i, ch in enumerate(chars)}
idx2char = {i: ch for ch, i in char2idx.items()}

data = np.array([char2idx[ch] for ch in text], dtype=np.int32)
X_all, Y_all = data[:-1], data[1:]
split = int(0.9 * len(X_all))
X_train, Y_train = X_all[:split], Y_all[:split]
X_test, Y_test = X_all[split:], Y_all[split:]
print(f"len text={len(text)}, vocab={vocab_size}, train={len(X_train)}, test={len(X_test)}")

len text=4000, vocab=51, train=3599, test=400


In [187]:
# =========================
# 2) Вспомогательные функции
# =========================
def onehot(index, vocab_size):
    v = np.zeros(vocab_size, dtype=np.float32)
    v[index] = 1.0
    return v

def to_onehot_seq(indices, vocab_size):
    return [onehot(int(i), vocab_size) for i in indices]

def softmax(x):
    x = x - np.max(x)
    e = np.exp(x)
    return e / np.sum(e)

def cross_entropy_from_probs(probs, target_idx):
    return -np.log(probs[target_idx] + 1e-12)

def sample_with_temperature(p, T=1.0):
    p = np.asarray(p, dtype=np.float64)
    p = np.log(p + 1e-12) / T
    p = np.exp(p - np.max(p))
    p = p / np.sum(p)
    return np.random.choice(len(p), p=p)

In [188]:
# =========================
# 3) Elman RNN
# =========================
class ElmanRNN:
    def __init__(self, vocab_size, hidden_size=128):
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        limit = np.sqrt(6.0 / (vocab_size + hidden_size))
        self.Wxh = np.random.uniform(-limit, limit, (hidden_size, vocab_size)).astype(np.float32)
        self.Whh = np.random.uniform(-limit, limit, (hidden_size, hidden_size)).astype(np.float32)
        self.Why = np.random.uniform(-limit, limit, (vocab_size, hidden_size)).astype(np.float32)
        self.bh = np.zeros(hidden_size, dtype=np.float32)
        self.by = np.zeros(vocab_size, dtype=np.float32)

    def forward(self, inputs, h0=None):
        T = len(inputs)
        if h0 is None:
            h_prev = np.zeros(self.hidden_size, dtype=np.float32)
        else:
            h_prev = h0.copy()
        hs = np.zeros((T + 1, self.hidden_size), dtype=np.float32)
        hs[0] = h_prev
        ps = np.zeros((T, self.vocab_size), dtype=np.float32)
        for t in range(T):
            x = inputs[t]
            hs[t + 1] = np.tanh(self.Wxh @ x + self.Whh @ hs[t] + self.bh)
            o = self.Why @ hs[t + 1] + self.by
            ps[t] = softmax(o)
        return hs, ps

    def bptt(self, inputs, targets, lr=0.005, clip=5.0, h0=None):
        T = len(inputs)
        hs, ps = self.forward(inputs, h0)
        dWxh = np.zeros_like(self.Wxh)
        dWhh = np.zeros_like(self.Whh)
        dWhy = np.zeros_like(self.Why)
        dbh = np.zeros_like(self.bh)
        dby = np.zeros_like(self.by)
        loss = 0.0
        dh_next = np.zeros(self.hidden_size, dtype=np.float32)

        for t in range(T):
            loss += cross_entropy_from_probs(ps[t], int(targets[t]))

        for t in reversed(range(T)):
            dy = ps[t].copy()
            dy[int(targets[t])] -= 1.0
            dWhy += np.outer(dy, hs[t + 1])
            dby += dy
            dh = self.Why.T @ dy + dh_next
            dh_raw = (1.0 - hs[t + 1] ** 2) * dh
            dbh += dh_raw
            dWxh += np.outer(dh_raw, inputs[t])
            dWhh += np.outer(dh_raw, hs[t])
            dh_next = self.Whh.T @ dh_raw

        for g in [dWxh, dWhh, dWhy, dbh, dby]:
            np.clip(g, -clip, clip, out=g)

        self.Wxh -= lr * dWxh
        self.Whh -= lr * dWhh
        self.Why -= lr * dWhy
        self.bh -= lr * dbh
        self.by -= lr * dby

        return loss / T, hs[-1].copy()

    def predict_next(self, x_onehot, h_prev):
        h = np.tanh(self.Wxh @ x_onehot + self.Whh @ h_prev + self.bh)
        o = self.Why @ h + self.by
        p = softmax(o)
        return p, h

In [189]:
# =========================
# 4) Jordan RNN
# =========================
class JordanRNN:
    def __init__(self, vocab_size, hidden_size=128):
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        limit = np.sqrt(6.0 / (vocab_size + hidden_size))
        self.Wxh = np.random.uniform(-limit, limit, (hidden_size, vocab_size)).astype(np.float32)
        self.Wch = np.random.uniform(-limit, limit, (hidden_size, vocab_size)).astype(np.float32)
        self.Why = np.random.uniform(-limit, limit, (vocab_size, hidden_size)).astype(np.float32)
        self.bh = np.zeros(hidden_size, dtype=np.float32)
        self.by = np.zeros(vocab_size, dtype=np.float32)

    def forward(self, inputs, ctx0=None):
        T = len(inputs)
        if ctx0 is None:
            ctx_prev = np.zeros(self.vocab_size, dtype=np.float32)
        else:
            ctx_prev = ctx0.copy()
        hs = np.zeros((T + 1, self.hidden_size), dtype=np.float32)
        ps = np.zeros((T, self.vocab_size), dtype=np.float32)
        for t in range(T):
            x = inputs[t]
            hs[t + 1] = np.tanh(self.Wxh @ x + self.Wch @ ctx_prev + self.bh)
            o = self.Why @ hs[t + 1] + self.by
            p = softmax(o)
            ps[t] = p
            ctx_prev = p
        return hs, ps

    def bptt(self, inputs, targets, lr=0.01, clip=5.0, ctx0=None):
        T = len(inputs)
        hs, ps = self.forward(inputs, ctx0)
        dWxh = np.zeros_like(self.Wxh)
        dWch = np.zeros_like(self.Wch)
        dWhy = np.zeros_like(self.Why)
        dbh = np.zeros_like(self.bh)
        dby = np.zeros_like(self.by)
        loss = 0.0

        for t in range(T):
            loss += cross_entropy_from_probs(ps[t], int(targets[t]))

        for t in reversed(range(T)):
            dy = ps[t].copy()
            dy[int(targets[t])] -= 1.0
            dWhy += np.outer(dy, hs[t + 1])
            dby += dy
            dh = self.Why.T @ dy
            dh_raw = (1.0 - hs[t + 1] ** 2) * dh
            dbh += dh_raw
            dWxh += np.outer(dh_raw, inputs[t])
            if t > 0:
                dWch += np.outer(dh_raw, ps[t - 1])
        for g in [dWxh, dWch, dWhy, dbh, dby]:
            np.clip(g, -clip, clip, out=g)

        self.Wxh -= lr * dWxh
        self.Wch -= lr * dWch
        self.Why -= lr * dWhy
        self.bh -= lr * dbh
        self.by -= lr * dby

        return loss / T, ps[-1].copy()

    def predict_next(self, x_onehot, ctx_prev):
        h = np.tanh(self.Wxh @ x_onehot + self.Wch @ ctx_prev + self.bh)
        o = self.Why @ h + self.by
        p = softmax(o)
        return p, h, p

In [190]:
# =========================
# 5) Обучение
# =========================
def train_model(model, X, Y, seq_len=50, epochs=80, lr=0.002, name="Model"):
    n = len(X)
    base_lr = lr

    # начальные состояния
    if isinstance(model, ElmanRNN):
        state = np.zeros(model.hidden_size, dtype=np.float32)
    else:
        state = np.zeros(model.vocab_size, dtype=np.float32)

    for epoch in range(1, epochs + 1):
        lr = base_lr * (0.97 ** (epoch // 5))
        total_loss = 0.0
        count = 0

        for i in range(0, n - seq_len, seq_len):
            X_seq = to_onehot_seq(X[i:i + seq_len], model.vocab_size)
            Y_seq = Y[i:i + seq_len]

            if isinstance(model, ElmanRNN):
                loss, state = model.bptt(X_seq, Y_seq, lr=lr, clip=1.0, h0=state)
                state = np.clip(state, -5, 5)
            else:
                loss, state = model.bptt(X_seq, Y_seq, lr=lr, clip=1.0, ctx0=state)
                state = np.clip(state, 0, 1)  # контекст — вероятности, ограничим диапазон

            total_loss += loss * len(X_seq)
            count += len(X_seq)

        avg_loss = total_loss / count
        if epoch % 5 == 0 or epoch == 1 or epoch == epochs:
            print(f"[{name}] epoch {epoch}/{epochs}, avg loss={avg_loss:.4f}, lr={lr:.5f}")

In [191]:
# =========================
# 6) Accuracy и генерация
# =========================
def compute_accuracy(model, X, Y):
    correct, total = 0, 0
    if isinstance(model, ElmanRNN):
        h = np.zeros(model.hidden_size, dtype=np.float32)
        for i in range(len(X)):
            x = onehot(int(X[i]), model.vocab_size)
            p, h = model.predict_next(x, h)
            if np.argmax(p) == int(Y[i]):
                correct += 1
            total += 1
    else:
        ctx = np.zeros(model.vocab_size, dtype=np.float32)
        for i in range(len(X)):
            x = onehot(int(X[i]), model.vocab_size)
            p, h, ctx = model.predict_next(x, ctx)
            if np.argmax(p) == int(Y[i]):
                correct += 1
            total += 1
    return correct / total

def generate(model, seed, length=200, T=0.8):
    out = seed
    if isinstance(model, ElmanRNN):
        h = np.zeros(model.hidden_size, dtype=np.float32)
        for ch in seed[:-1]:
            x = onehot(char2idx.get(ch, 0), model.vocab_size)
            _, h = model.predict_next(x, h)
        last_idx = char2idx.get(seed[-1], 0)
        for _ in range(length):
            x = onehot(last_idx, model.vocab_size)
            p, h = model.predict_next(x, h)
            next_idx = sample_with_temperature(p, T)
            out += idx2char[next_idx]
            last_idx = next_idx
    else:
        ctx = np.zeros(model.vocab_size, dtype=np.float32)
        for ch in seed[:-1]:
            x = onehot(char2idx.get(ch, 0), model.vocab_size)
            p, h, ctx = model.predict_next(x, ctx)
        last_idx = char2idx.get(seed[-1], 0)
        for _ in range(length):
            x = onehot(last_idx, model.vocab_size)
            p, h, ctx = model.predict_next(x, ctx)
            next_idx = sample_with_temperature(p, T)
            out += idx2char[next_idx]
            last_idx = next_idx
    return out

In [193]:
# =========================
# 7) Запуск обучения и генерации
# =========================
elman = ElmanRNN(vocab_size, hidden_size=128)
jordan = JordanRNN(vocab_size, hidden_size=128)

print("\nTraining ElmanRNN...")
train_model(elman, X_train, Y_train, seq_len=80, epochs=50, lr=0.005, name="Elman")

print("\nTraining JordanRNN...")
train_model(jordan, X_train, Y_train, seq_len=80, epochs=50, lr=0.01, name="Jordan")

acc_e = compute_accuracy(elman, X_test, Y_test)
acc_j = compute_accuracy(jordan, X_test, Y_test)
print(f"\nAccuracy — Elman: {acc_e*100:.2f}%, Jordan: {acc_j*100:.2f}%")



Training ElmanRNN...
[Elman] epoch 1/50, avg loss=3.4668, lr=0.00500
[Elman] epoch 5/50, avg loss=2.6280, lr=0.00485
[Elman] epoch 10/50, avg loss=1.8223, lr=0.00470
[Elman] epoch 15/50, avg loss=1.2576, lr=0.00456
[Elman] epoch 20/50, avg loss=0.9201, lr=0.00443
[Elman] epoch 25/50, avg loss=0.7258, lr=0.00429
[Elman] epoch 30/50, avg loss=0.5578, lr=0.00416
[Elman] epoch 35/50, avg loss=0.4141, lr=0.00404
[Elman] epoch 40/50, avg loss=0.2879, lr=0.00392
[Elman] epoch 45/50, avg loss=0.1856, lr=0.00380
[Elman] epoch 50/50, avg loss=0.1138, lr=0.00369

Training JordanRNN...
[Jordan] epoch 1/50, avg loss=3.3023, lr=0.01000
[Jordan] epoch 5/50, avg loss=2.5189, lr=0.00970
[Jordan] epoch 10/50, avg loss=2.3396, lr=0.00941
[Jordan] epoch 15/50, avg loss=2.2544, lr=0.00913
[Jordan] epoch 20/50, avg loss=2.1992, lr=0.00885
[Jordan] epoch 25/50, avg loss=2.1592, lr=0.00859
[Jordan] epoch 30/50, avg loss=2.1281, lr=0.00833
[Jordan] epoch 35/50, avg loss=2.1023, lr=0.00808
[Jordan] epoch 40/50

In [194]:
# =========================
# 8) ГОТОВАЯ LSTM ИЗ PyTorch
# =========================
import torch
import torch.nn as nn
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

class PyTorchLSTM(nn.Module):
    def __init__(self, vocab_size, hidden_size=128, num_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.input_proj = nn.Linear(vocab_size, vocab_size, bias=False)
        self.lstm = nn.LSTM(vocab_size, hidden_size, num_layers, batch_first=True)
        self.output_proj = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden=None):

        if x.dim() == 1:
            x = x.unsqueeze(0)

        x_onehot = torch.zeros(x.size(0), x.size(1), vocab_size, device=x.device)
        x_onehot.scatter_(2, x.unsqueeze(-1), 1.0) # настоящие one-hot тензоры

        x_emb = self.input_proj(x_onehot)
        out, hidden = self.lstm(x_emb, hidden)
        out = self.output_proj(out)
        return out, hidden

# Данные (уже есть выше)
X_train_t = torch.from_numpy(X_train).long().to(device)
Y_train_t = torch.from_numpy(Y_train).long().to(device)
X_test_t  = torch.from_numpy(X_test).long().to(device)
Y_test_t  = torch.from_numpy(Y_test).long().to(device)

torch_lstm = PyTorchLSTM(vocab_size, hidden_size=128, num_layers=1).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(torch_lstm.parameters(), lr=0.005)

print("\nTraining PyTorch LSTM")
seq_len = 80
epochs = 50

for epoch in range(1, epochs + 1):
    torch_lstm.train()
    total_loss = 0.0
    count = 0
    i = 0
    while i + seq_len < len(X_train_t):
        seq = X_train_t[i:i + seq_len]
        target = Y_train_t[i:i + seq_len]

        optimizer.zero_grad()
        output, _ = torch_lstm(seq)
        loss = criterion(output.view(-1, vocab_size), target)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(torch_lstm.parameters(), 1.0)
        optimizer.step()

        total_loss += loss.item() * seq_len
        count += seq_len
        i += seq_len

    if epoch % 5 == 0 or epoch == 1 or epoch == epochs:
        print(f"[PyTorch LSTM] epoch {epoch}/{epochs}, avg loss={total_loss/count:.4f}")

# --------------------- Accuracy ---------------------
def compute_accuracy_torch(model, X, Y):
    model.eval()
    correct = 0
    hidden = None
    with torch.no_grad():
        for i in range(len(X)):
            x = X[i].unsqueeze(0)
            y = Y[i]
            output, hidden = model(x, hidden)
            pred = output[0, -1].argmax().item()
            if pred == y.item():
                correct += 1
    return correct / len(X)


# --------------------- Генерация ---------------------
def generate_torch(model, seed_text, length=600, temperature=0.7):
    model.eval()
    generated = seed_text
    hidden = None

    # Прогрев сети по символам из seed
    for ch in seed_text:
        idx = torch.tensor([[char2idx[ch]]], device=device)
        _, hidden = model(idx, hidden)

    cur_idx = char2idx[seed_text[-1]]
    with torch.no_grad():
        for _ in range(length):
            x = torch.tensor([[cur_idx]], device=device)
            output, hidden = model(x, hidden)
            logits = output[0, -1] / temperature
            probs = torch.softmax(logits, dim=-1).cpu().numpy()
            cur_idx = np.random.choice(vocab_size, p=probs)
            generated += idx2char[cur_idx]
    return generated


Using device: cpu

Training PyTorch LSTM
[PyTorch LSTM] epoch 1/50, avg loss=3.3017
[PyTorch LSTM] epoch 5/50, avg loss=1.8073
[PyTorch LSTM] epoch 10/50, avg loss=0.5064
[PyTorch LSTM] epoch 15/50, avg loss=0.1119
[PyTorch LSTM] epoch 20/50, avg loss=0.0505
[PyTorch LSTM] epoch 25/50, avg loss=0.0335
[PyTorch LSTM] epoch 30/50, avg loss=0.0279
[PyTorch LSTM] epoch 35/50, avg loss=0.0251
[PyTorch LSTM] epoch 40/50, avg loss=0.0235
[PyTorch LSTM] epoch 45/50, avg loss=0.0225
[PyTorch LSTM] epoch 50/50, avg loss=0.0217


In [198]:
seed = "Нейронные се"
print("\nGenerated (Elman):\n", generate(elman, seed, 200, 0.7))
print("\nGenerated (Jordan):\n", generate(jordan, seed, 100, 0.7))
print("\nLSTM ИЗ PyTorch:\n", generate_torch(torch_lstm, seed, length=100, temperature=0.7))
print(f"\nAccuracy — Elman: {acc_e*100:.2f}%, Jordan: {acc_j*100:.2f}%, PyTorch LSTM: {acc_pt*100:.2f}%")


Generated (Elman):
 Нейронные сети Эля пребсказинить брегослы в деменны инимисети, иннел —
сто,менсе ко срододные сети, машинное обучение — что на самом деле означают все эти нынче популярныекловсти  слодол роготсимволеинети постия 

Generated (Jordan):
 Нейронные се —нняюдсвент поляють сим нажента селенисим се преним нные катьных преловать чей ирром процимыесесссе

LSTM ИЗ PyTorch:
 Нейронные сет входные данные, преобразует их через множество слоёв с весами, применяет нелинейные активации и по

Accuracy — Elman: 95.75%, Jordan: 37.50%, PyTorch LSTM: 91.87%
