<a href="https://colab.research.google.com/github/BVika/Methods_of_semantic_information_processing/blob/main/%D0%9B%D0%B0%D0%B1_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Задания
Реализовать GPT как в п.2

## Установка библиотек

In [145]:
!pip install nltk
!pip install pymorphy3



## Импорт Библиотек и Загрузка Данных

In [None]:
import nltk
from nltk.stem import SnowballStemmer
from pymorphy3 import MorphAnalyzer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import numpy as np
import math

nltk.download('punkt_tab')
nltk.download('wordnet')
nltk.download('stopwords')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

## Текст(датабейс)

In [204]:
input_text = 'Солнце медленно поднималось над горизонтом, окрашивая небо в яркие оттенки оранжевого и розового. Птицы начали петь, наполняя утро мелодиями.'

phrases = [
    'Солнце поднималось',
    'Небо окрашивалось',
    'Птицы начали',
    'Утро наполнилось',
    'Горизонт был',
    'Мир просыпался',
    'Звуки природы',
    'Время летело'
    ]

mask = [
    'горизонтом',
    'в краски',
    'петь',
    'мелодиями',
    'красивым',
    'новыми',
    'звучали',
    'быстро'
    ]

## Обработка текста

In [205]:
class TextProcessor:
    def split_text(self, content: str) -> list[str]:
        """Токенизация текста."""
        return word_tokenize(content)

    def normalize_words(self, tokens: list[str]) -> list[str]:
        """Лемматизация токенов."""
        morph = MorphAnalyzer()
        return [morph.parse(word)[0].normal_form for word in tokens]

    def reduce_words(self, tokens: list[str]) -> list[str]:
        """Стемминг токенов."""
        stemmer = SnowballStemmer("russian")
        return [stemmer.stem(word) for word in tokens]

    def create_vector(self, tokens: list[str]) -> list[int]:
        """Создание векторного представления слов."""
        vector_dict = {}
        vector_result = []
        for word in tokens:
            if word not in vector_dict:
                vector_dict[word] = len(vector_dict)
            vector_result.append(vector_dict[word])
        return vector_result

    def filter_stopwords(self, tokens: list[str]) -> list[str]:
        """Удаление стоп-слов из токенов."""
        stop_words = set(stopwords.words('russian')).union(['.', ',', ':', '?', '!'])
        return [word for word in tokens if word not in stop_words]

    def word_frequency(self, tokens: list[str]) -> dict[str, int]:
        """Создание словаря частот слов."""
        freq_dict = {}
        for word in tokens:
            freq_dict[word] = freq_dict.get(word, 0) + 1
        return freq_dict

    def term_frequency(self, tokens: list[str]) -> dict[str, float]:
        """Расчет частоты термина (TF)."""
        freq_dict = self.word_frequency(tokens)
        total_tokens = len(tokens)
        return {word: count / total_tokens for word, count in freq_dict.items()}

    def inverse_document_frequency(self, documents: list[list[str]]) -> dict[str, float]:
        """Расчет обратной частоты документа (IDF)."""
        idf_dict = {}
        all_words = set(word for doc in documents for word in set(doc))
        total_docs = len(documents)
        for word in all_words:
            idf_dict[word] = math.log(total_docs / sum(word in doc for doc in documents))
        return idf_dict

    def tf_idf(self, documents: list[list[str]], doc_index: int) -> dict[str, float]:
        """Расчет TF-IDF для заданного документа."""
        tf = self.term_frequency(documents[doc_index])
        idf = self.inverse_document_frequency(documents)
        return {word: tf[word] * idf[word] for word in tf}


## "Нормализация" весов

In [206]:
def xavier_initialization(size):
    limit = np.sqrt(6 / (size[0] + size[1]))
    return np.random.uniform(-limit, limit, size)

In [207]:
class Transformer:
    def __init__(self, vocab_size, embed_size, hidden_size, learning_rate=0.01):
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.learning_rate = learning_rate

        self.initialize_parameters()

    def initialize_parameters(self):
        """Инициализация параметров модели."""
        self.embeddings = np.random.randn(self.vocab_size, self.embed_size) * np.sqrt(1. / self.embed_size)
        self.W_query = xavier_initialization((self.embed_size, self.hidden_size))
        self.W_key = xavier_initialization((self.embed_size, self.hidden_size))
        self.W_value = xavier_initialization((self.embed_size, self.hidden_size))
        self.W_output = xavier_initialization((self.hidden_size, self.vocab_size))

    def softmax(self, x):
        exp_x = np.exp(x - np.max(x))  # Для численной стабильности
        return exp_x / np.sum(exp_x, axis=0)

    def forward_pass(self, input_indices):
        """Прямой проход через модель."""
        self.input_indices = input_indices
        self.embedded_input = self.get_embedded_input(input_indices)

        self.Q, self.K, self.V = self.calculate_attention_components()
        self.attention_weights = self.calculate_attention_weights()
        self.attended_values = self.calculate_attended_values()

        self.context_vector = np.mean(self.attended_values, axis=0)
        self.logits = self.calculate_logits()
        self.probabilities = self.softmax(self.logits)

        return self.probabilities

    def get_embedded_input(self, input_indices):
        """Получение встраивания входных индексов."""
        return self.embeddings[input_indices]  # (seq_len, embed_size)

    def calculate_attention_components(self):
        """Расчет Q, K и V."""
        Q = self.embedded_input @ self.W_query  # (seq_len, hidden_size)
        K = self.embedded_input @ self.W_key
        V = self.embedded_input @ self.W_value
        return Q, K, V

    def calculate_attention_weights(self):
        """Расчет весов внимания."""
        attention_scores = self.Q @ self.K.T / np.sqrt(self.embed_size)  # (seq_len, seq_len)
        return self.softmax(attention_scores)  # (seq_len, seq_len)

    def calculate_attended_values(self):
        """Расчет значений, на которые обращается внимание."""
        return self.attention_weights @ self.V  # (seq_len, hidden_size)

    def calculate_logits(self):
        """Расчет логитов для выходного слоя."""
        return self.context_vector @ self.W_output  # (vocab_size,)

    def backward_pass(self, target_index):
        """Обратный проход для обновления весов."""
        d_logits = self.probabilities.copy()
        d_logits[target_index] -= 1  # (vocab_size,)

        dW_output = self.calculate_dW_output(d_logits)
        d_context = d_logits @ self.W_output.T  # (hidden_size,)

        d_attended_values = self.calculate_d_attended_values(d_context)
        dV = self.attention_weights.T @ d_attended_values  # (hidden_size,)

        d_attention_weights = self.calculate_d_attention_weights(d_attended_values)
        d_scores = self.calculate_d_scores(d_attention_weights)

        dQ, dK = self.calculate_gradients(d_scores)
        dW_query, dW_key, dW_value = self.calculate_weight_gradients(dQ, dK, dV)

        self.update_embeddings(dQ, dK, dV)
        self.update_weights(dW_output, dW_query, dW_key, dW_value)

    def calculate_dW_output(self, d_logits):
        """Расчет градиента для выходных весов."""
        return np.outer(self.context_vector, d_logits)  # (hidden_size, vocab_size)

    def calculate_d_attended_values(self, d_context):
        """Расчет градиента для значений, на которые обращается внимание."""
        # d_context имеет размер (hidden_size,)
        # Учитываем, что d_attended_values - это среднее значение, поэтому делим на количество последовательных элементов
        seq_len = self.attended_values.shape[0]
        return np.ones_like(self.attended_values) * (d_context / seq_len)

    def calculate_d_attention_weights(self, d_attended_values):
        """Расчет градиента для весов внимания."""
        # d_attended_values имеет размер (seq_len, hidden_size)
        return d_attended_values @ self.V.T  # (seq_len, seq_len)

    def calculate_d_scores(self, d_attention_weights):
        """Расчет градиента для оценок внимания."""
        # Используем производную функции softmax
        return d_attention_weights * self.attention_weights * (1 - self.attention_weights)  # (seq_len, seq_len)

    def calculate_gradients(self, d_scores):
        """Расчет градиентов для Q и K."""
        dQ = d_scores @ self.K / np.sqrt(self.embed_size)  # (seq_len, hidden_size)
        dK = d_scores.T @ self.Q / np.sqrt(self.embed_size)  # (hidden_size, seq_len)
        return dQ, dK

    def calculate_weight_gradients(self, dQ, dK, dV):
        """Расчет градиентов для весов Q, K и V."""
        dW_query = self.embedded_input.T @ dQ  # (embed_size, hidden_size)
        dW_key = self.embedded_input.T @ dK  # (embed_size, hidden_size)
        dW_value = self.embedded_input.T @ dV  # (embed_size, hidden_size)
        return dW_query, dW_key, dW_value

    def update_embeddings(self, dQ, dK, dV):
        """Обновление встраиваний на основе градиентов."""
        d_embedding = dQ @ self.W_query.T + dK @ self.W_key.T + dV @ self.W_value.T  # (seq_len, embed_size)
        for i, idx in enumerate(self.input_indices):
            self.embeddings[idx] -= self.learning_rate * d_embedding[i]

    def update_weights(self, dW_output, dW_query, dW_key, dW_value):
        """Обновление весов модели."""
        self.W_output -= self.learning_rate * dW_output
        self.W_query -= self.learning_rate * dW_query
        self.W_key -= self.learning_rate * dW_key
        self.W_value -= self.learning_rate * dW_value

    def train_step(self, input_indices, target_index):
        """Шаг обучения."""
        self.forward_pass(input_indices)
        self.backward_pass(target_index)

    def predict(self, input_indices):
        """Предсказание на основе входных индексов."""
        probabilities = self.forward_pass(input_indices)
        return np.argmax(probabilities)

## Обработка текста и подготовка данных

In [208]:
text_processor = TextProcessor()

In [209]:
# Обработка текстов
tokens = text_processor.filter_stopwords(text_processor.normalize_words(text_processor.split_text(input_text)))
# Векторизация словаря
vocabulary = list(set(tokens))
word_to_index = {word: i for i, word in enumerate(vocabulary)}
index_to_word = {i: word for word, i in word_to_index.items()}
print(tokens)
print(vocabulary)
print(word_to_index)
print(index_to_word)

['солнце', 'медленно', 'подниматься', 'горизонт', 'окрашивать', 'небо', 'яркий', 'оттенок', 'оранжевый', 'розовый', 'птица', 'начать', 'петь', 'наполнять', 'утро', 'мелодия']
['солнце', 'птица', 'яркий', 'окрашивать', 'утро', 'мелодия', 'горизонт', 'медленно', 'небо', 'подниматься', 'наполнять', 'начать', 'розовый', 'оранжевый', 'оттенок', 'петь']
{'солнце': 0, 'птица': 1, 'яркий': 2, 'окрашивать': 3, 'утро': 4, 'мелодия': 5, 'горизонт': 6, 'медленно': 7, 'небо': 8, 'подниматься': 9, 'наполнять': 10, 'начать': 11, 'розовый': 12, 'оранжевый': 13, 'оттенок': 14, 'петь': 15}
{0: 'солнце', 1: 'птица', 2: 'яркий', 3: 'окрашивать', 4: 'утро', 5: 'мелодия', 6: 'горизонт', 7: 'медленно', 8: 'небо', 9: 'подниматься', 10: 'наполнять', 11: 'начать', 12: 'розовый', 13: 'оранжевый', 14: 'оттенок', 15: 'петь'}


## Подготовка данных для обучения

In [210]:
X = []
y = []

for phrase, label in zip(phrases, mask):
    print(f"Обрабатываемая фраза: {phrase}, метка: {label}")  # Выводим фразы и метки
    tokens = text_processor.filter_stopwords(text_processor.normalize_words(text_processor.split_text(phrase)))
    print(f"Токены после фильтрации: {tokens}")  # Выводим токены после фильтрации

    if len(tokens) < 2:
        print("Пропускаем фразу из-за недостаточной длины.")  # Сообщение о пропуске
        continue  # Пропускаем фразы с недостаточным количеством токенов

    input_indices = [word_to_index.get(word, -1) for word in tokens if word in word_to_index]
    input_indices = [index for index in input_indices if index != -1]  # Удаляем недопустимые индексы

    if len(input_indices) == 0:
        print("Пропускаем фразу из-за отсутствия допустимых токенов.")  # Сообщение о пропуске
        continue  # Пропускаем пустые массивы

    label_index = word_to_index.get(label, -1)
    if label_index == -1:
        print("Пропускаем фразу из-за недопустимой метки.")  # Сообщение о пропуске
        continue  # Пропускаем, если метка недопустима

    X.append(input_indices)
    y.append(label_index)

Обрабатываемая фраза: Солнце поднималось, метка: горизонтом
Токены после фильтрации: ['солнце', 'подниматься']
Пропускаем фразу из-за недопустимой метки.
Обрабатываемая фраза: Небо окрашивалось, метка: в краски
Токены после фильтрации: ['небо', 'окрашиваться']
Пропускаем фразу из-за недопустимой метки.
Обрабатываемая фраза: Птицы начали, метка: петь
Токены после фильтрации: ['птица', 'начать']
Обрабатываемая фраза: Утро наполнилось, метка: мелодиями
Токены после фильтрации: ['утро', 'наполниться']
Пропускаем фразу из-за недопустимой метки.
Обрабатываемая фраза: Горизонт был, метка: красивым
Токены после фильтрации: ['горизонт']
Пропускаем фразу из-за недостаточной длины.
Обрабатываемая фраза: Мир просыпался, метка: новыми
Токены после фильтрации: ['мир', 'просыпаться']
Пропускаем фразу из-за отсутствия допустимых токенов.
Обрабатываемая фраза: Звуки природы, метка: звучали
Токены после фильтрации: ['звук', 'природа']
Пропускаем фразу из-за отсутствия допустимых токенов.
Обрабатываемая 

## Обучение модели

In [211]:
model = Transformer(vocab_size=len(vocabulary), embed_size=16, hidden_size=32)

for epoch in range(10000):
    total_loss = 0
    for i in range(len(X)):
        model.train_step(X[i], y[i])
        probs = model.forward_pass(X[i])

        # Проверяем, чтобы не было нуля
        if probs[y[i]] > 0:
            total_loss += -np.log(probs[y[i]])
        else:
            total_loss += np.inf  # Или какое-то значение, чтобы указать на ошибку

    # Вывод информации о процессе обучения
    if epoch % 1000 == 0:
        avg_loss = total_loss / len(X)
        print(f"Epoch {epoch}: Loss: {avg_loss:.4f}")

Epoch 0: Loss: 2.7791
Epoch 1000: Loss: 0.0026
Epoch 2000: Loss: 0.0010
Epoch 3000: Loss: 0.0006
Epoch 4000: Loss: 0.0004
Epoch 5000: Loss: 0.0003
Epoch 6000: Loss: 0.0002
Epoch 7000: Loss: 0.0002
Epoch 8000: Loss: 0.0002
Epoch 9000: Loss: 0.0001


## Предсказание

In [212]:
    print("\nПредсказания для новых фраз:")
    new_phrases = [
        'Солнце поднималось над горизонтом',
        'Небо стало ярким',
        'Птицы пели мелодии',
        'Утро было прекрасным'
    ]

    for phrase in new_phrases:
        print(f"\nОбрабатываемая новая фраза: '{phrase}'")
        tokens = text_processor.filter_stopwords(text_processor.normalize_words(text_processor.split_text(phrase)))
        print(f"Токены после фильтрации: {tokens}")

        input_indices = [word_to_index.get(word, -1) for word in tokens if word in word_to_index]
        input_indices = [index for index in input_indices if index != -1]  # Удаляем недопустимые индексы

        if len(input_indices) == 0:
            print("Нет допустимых токенов для предсказания.")
            continue

        predicted_index = model.predict(input_indices)
        predicted_word = index_to_word.get(predicted_index, "Неизвестное слово")
        print(f"Предсказанное слово: '{predicted_word}'")


Предсказания для новых фраз:

Обрабатываемая новая фраза: 'Солнце поднималось над горизонтом'
Токены после фильтрации: ['солнце', 'подниматься', 'горизонт']
Предсказанное слово: 'петь'

Обрабатываемая новая фраза: 'Небо стало ярким'
Токены после фильтрации: ['небо', 'стать', 'яркий']
Предсказанное слово: 'петь'

Обрабатываемая новая фраза: 'Птицы пели мелодии'
Токены после фильтрации: ['птица', 'петь', 'мелодия']
Предсказанное слово: 'петь'

Обрабатываемая новая фраза: 'Утро было прекрасным'
Токены после фильтрации: ['утро', 'прекрасный']
Предсказанное слово: 'наполнять'
