# Imports

In [None]:
import numpy as np
import pandas as pd
import re
from string import punctuation
import emoji

In [None]:
import nltk
from nltk.tokenize import word_tokenize, TweetTokenizer
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, Embedding, SimpleRNN, GRU

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Download the NLTK data used below: the Punkt tokenizer tables (presumably
# required by word_tokenize) and the stop-word lists read by stopwords.words.
nltk.download("punkt_tab")
nltk.download('stopwords')

# Vars definition

In [None]:
# Length of every n-gram sample: the first n_gram - 1 tokens are the model
# input and the last token is the prediction target.
n_gram = 4
# Number of preceding tokens that form the context of one bag-of-words sample.
window_size = 5

In [None]:
# English stop words as a set (fast membership tests while filtering tokens).
stop_words = set(stopwords.words("english"))
stop_words

# Data preparation

## Initial clean

In [None]:
# Load the raw dataset; the path is relative to the current working directory.
df = pd.read_csv("data/data.csv")
df

In [None]:
# Keep only the "review" column, as a single-column DataFrame.
df_review = df[["review"]]
df_review

In [None]:
def clean_text_dataframe(
    df_i: pd.DataFrame,
    columns: list[str] | None = None,
    keep_apostrophe: bool = True,
    min_words: int = 3,
) -> pd.DataFrame:
    """
    Очищает текст и удаляет строки с малым количеством слов

    Params:
        df (pd.DataFrame): Исходный DataFrame
        columns (list[str]|None): Столбцы для обработки (None = все строковые)
        keep_apostrophe (bool): Сохранять апострофы (по умолчанию True)
        min_words (int): Минимальное количество слов для сохранения строки

    Return:
        pd.DataFrame: Очищенная и отфильтрованная копия DataFrame
    """
    df_clean = df_i.copy()

    # Определение целевых столбцов
    if columns is None:
        columns = df_clean.select_dtypes(include=["object", "string"]).columns.tolist()

    # Настройка паттерна для пунктуации
    punct_pattern = r"[{}]".format(
        re.escape(
            punctuation.replace("'", "") if keep_apostrophe else re.escape(punctuation)
        )
    )

    def text_cleaner(text):
        if not isinstance(text, str):
            return text

        # Удаление эмодзи
        text = emoji.replace_emoji(text, replace="")

        # Удаление пунктуации
        text = re.sub(punct_pattern, " ", text)

        # Удаление спецсимволов
        text = re.sub(r"[^a-zA-Z0-9\'\s]", " ", text)

        # Нормализация пробелов
        text = re.sub(r"\s+", " ", text).strip()

        return text

    for col in columns:
        if col in df_clean.columns:
            df_clean[col] = df_clean[col].apply(text_cleaner)

    word_count_mask = (
        df_clean[columns]
        .apply(lambda col: col.str.split().str.len() > min_words)
        .all(axis=1)
    )

    df_clean = df_clean[word_count_mask].reset_index(drop=True)

    return df_clean

In [None]:
# Clean the reviews; with min_words=n_gram only reviews that contain more than
# n_gram words are kept.
df_review = clean_text_dataframe(df_review, min_words=n_gram)
df_review

## Data tokenize

In [None]:
def tokenize_text_dataframe(df_i: pd.DataFrame, tokenizer):
    """
    Tokenize the first column of a DataFrame.

    Each cell is lower-cased, split with ``tokenizer`` (a callable taking a
    string and returning tokens) and stripped of tokens found in the
    module-level ``stop_words`` set.

    Return:
        pd.DataFrame: One column holding a list of tokens per row.
    """

    def to_tokens(text):
        lowered = text.lower()
        return [token for token in tokenizer(lowered) if token not in stop_words]

    first_column = df_i.iloc[:, 0]
    return pd.DataFrame(first_column.apply(to_tokens))

In [None]:
# Tokenize the cleaned reviews with NLTK's TweetTokenizer
# (phone-number matching switched off).
df_tokens = tokenize_text_dataframe(
    df_review, TweetTokenizer(match_phone_numbers=False).tokenize
)
df_tokens

In [None]:
# Keep only reviews that still have more than n_gram tokens after stop-word
# removal (the lambda receives one token list per row).
df_tokens = df_tokens[df_tokens["review"].apply(lambda col: len(col) > n_gram)].reset_index(drop=True)
df_tokens

In [None]:
def vocab_text_dataframe(df_i: pd.DataFrame):
    """Return a one-column DataFrame with the sorted unique tokens of each row."""

    def unique_sorted(words):
        distinct = list(set(words))
        distinct.sort()
        return distinct

    first_column = df_i.iloc[:, 0]
    return pd.DataFrame(first_column.apply(unique_sorted))


def idx_text_dataframe(df_i: pd.DataFrame):
    """
    Map every token list in the first column to a {word: position} dict.

    Positions are local to each row; for a repeated word the last position wins.
    """

    def positions(words):
        mapping = {}
        for position, word in enumerate(words):
            mapping[word] = position
        return mapping

    first_column = df_i.iloc[:, 0]
    return pd.DataFrame(first_column.apply(positions))


def global_idx_text_dataframe(df_i: pd.DataFrame):
    """
    Build a global word -> index table from the token lists in the first column.

    Words are numbered in order of first appearance across all rows, so the
    numbering is deterministic (a plain set would give an arbitrary order that
    can change between interpreter runs).

    Return:
        pd.DataFrame: Columns "Word" and "Index", one row per unique word.
    """
    # dict keys keep insertion order, which de-duplicates in a single pass
    first_seen = dict.fromkeys(
        word for words in df_i.iloc[:, 0] for word in words
    )

    rows = [(word, idx) for idx, word in enumerate(first_seen)]
    return pd.DataFrame(rows, columns=["Word", "Index"])

In [None]:
# Per-review sorted vocabularies, per-review {word: position} maps, and a
# global word/index table built from the per-review vocabularies.
df_vocab = vocab_text_dataframe(df_tokens)
df_word_to_idx = idx_text_dataframe(df_vocab)
df_global_word_to_idx = global_idx_text_dataframe(df_vocab)

In [None]:
df_vocab

In [None]:
df_word_to_idx

In [None]:
df_global_word_to_idx

In [None]:
# Plain Python structures used by the sample builders below
tokens = df_tokens.iloc[:, 0].to_list()  # one token list per review
vocab = df_vocab.iloc[:, 0].to_list()  # one sorted vocabulary per review
global_vocab = sorted({word for words in vocab for word in words})
vocab_size = len(global_vocab)
word_to_idx = df_word_to_idx.iloc[:, 0].to_list()  # per-review {word: position}
# Number words by their position in global_vocab. global_vocab is also the
# CountVectorizer vocabulary, so word ids and BoW column numbers now agree
# (the ids taken from df_global_word_to_idx followed an arbitrary set order).
global_word_to_idx = {word: idx for idx, word in enumerate(global_vocab)}

In [None]:

word_to_idx

In [None]:
global_word_to_idx

In [None]:
vocab

In [None]:
global_vocab

## Token preparation

### BoW

In [None]:
corpus, y_bow = [], []
# Only the first two thirds of the reviews are turned into BoW samples
for cur_token in tokens[: 2 * len(tokens) // 3]:
    for i in range(len(cur_token) - window_size):
        context = cur_token[i : i + window_size]
        corpus.append(" ".join(context))
        # The target is the GLOBAL id of the word that follows the window.
        # Per-review ids would give the same word a different class in every
        # review and could not be decoded with the global index -> word map.
        y_bow.append(global_word_to_idx[cur_token[i + window_size]])

# The contexts are already tokenized and lower-cased, so split on whitespace
# only: the default token pattern drops one-character tokens and splits words
# at apostrophes, so such vocabulary entries would never be counted.
vectorizer = CountVectorizer(vocabulary=global_vocab, analyzer=str.split)
X_bow = vectorizer.fit_transform(corpus).toarray()
y_bow = np.array(y_bow)

#### df

In [None]:
# df_bow = pd.DataFrame({"x": X_bow.tolist(), "y": y_bow.tolist()})
# df_bow

### N-gram

In [None]:
# mass_sequences[r] holds every window of n_gram consecutive tokens of review r
mass_sequences = []
X_ngram, y_ngram = [], []
for cur_token in tokens:
    sequences = [
        cur_token[i : i + n_gram] for i in range(len(cur_token) - n_gram + 1)
    ]
    mass_sequences.append(sequences)
    for seq in sequences:
        # Encode with GLOBAL ids so a word has the same id in every review and
        # predictions can be decoded with the global index -> word map.
        ids = [global_word_to_idx[word] for word in seq]
        X_ngram.append(ids[:-1])  # first n_gram - 1 words: model input
        y_ngram.append(ids[-1])  # last word: prediction target

X_ngram = np.array(X_ngram)
y_ngram = np.array(y_ngram)

#### df

In [None]:
# df_ngram = pd.DataFrame({"x": X_ngram.tolist(), "y": y_ngram.tolist()})
# df_ngram

## Data split

In [None]:
# Hold out 20% of the BoW samples; no random_state is passed, so the split is
# not pinned to a seed.
X_train_bow, X_test_bow, y_train_bow, y_test_bow = train_test_split(
    X_bow, y_bow, test_size=0.2
)

In [None]:
# Hold out 20% of the n-gram samples; no random_state is passed, so the split
# is not pinned to a seed.
X_train_ng, X_test_ng, y_train_ng, y_test_ng = train_test_split(
    X_ngram, y_ngram, test_size=0.2
)

In [None]:
X_train_ng[0], y_train_ng[0]

In [None]:
X_train_ng.shape, y_train_ng.shape

In [None]:
X_test_ng.shape, y_test_ng.shape

In [None]:
X_train_bow[0], y_train_bow[0]

In [None]:
X_train_bow.shape, y_train_bow.shape

In [None]:
X_test_bow.shape, y_test_bow.shape

# Models

## Graph

In [None]:
def plot_results(history, title):
    """
    Plot training curves side by side: accuracy on the left, loss on the right.

    Params:
        history: Object whose ``history`` dict holds the "accuracy",
            "val_accuracy", "loss" and "val_loss" series.
        title (str): Prefix used in both subplot titles.
    """
    panels = (("accuracy", "Accuracy"), ("loss", "Loss"))
    series = history.history

    plt.figure(figsize=(12, 4))
    for position, (key, caption) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(series[key], label=f"Train {caption}")
        # Validation curves are labelled "Test" in the legend
        plt.plot(series[f"val_{key}"], label=f"Test {caption}")
        plt.title(f"{title} - {caption}")
        plt.xlabel("Epoch")
        plt.ylabel(caption)
        plt.legend()

    plt.show()

## Model preparing

### Dense

In [None]:
def dense_model(m_type, shape):
    """
    Build and compile a feed-forward next-word classifier.

    Architecture: Input(shape) -> Dense(128, relu) -> Dense(vocab_size, softmax).

    Params:
        m_type (str): Selects the loss. "bow" -> categorical_crossentropy
            (one-hot targets), "ng" -> sparse_categorical_crossentropy
            (integer class ids).
        shape (int): Number of input features per sample.

    Return:
        The compiled Sequential model (Adam optimizer, accuracy metric).

    Raises:
        ValueError: If m_type is neither "bow" nor "ng".
    """
    losses = {
        "bow": "categorical_crossentropy",
        "ng": "sparse_categorical_crossentropy",
    }
    # Validate before building anything: returning None here would only
    # surface later as an AttributeError on the first use of the "model".
    if m_type not in losses:
        raise ValueError(f"Unknown model type {m_type!r}; expected 'bow' or 'ng'")

    model = Sequential(
        [
            Input((shape,)),
            Dense(128, activation="relu"),
            Dense(vocab_size, activation="softmax"),
        ]
    )
    model.compile(loss=losses[m_type], optimizer="adam", metrics=["accuracy"])
    return model

### RNN

In [None]:
def rnn_model(m_type, shape):
    """
    Build and compile a simple recurrent next-word classifier.

    Architecture: Embedding(vocab_size, 128) -> SimpleRNN(128)
    -> Dense(vocab_size, softmax).

    Params:
        m_type (str): Selects the loss. "bow" -> categorical_crossentropy
            (one-hot targets), "ng" -> sparse_categorical_crossentropy
            (integer class ids).
        shape (int): Passed to the Embedding layer as input_length.

    Return:
        The compiled Sequential model (Adam optimizer, accuracy metric).

    Raises:
        ValueError: If m_type is neither "bow" nor "ng".
    """
    losses = {
        "bow": "categorical_crossentropy",
        "ng": "sparse_categorical_crossentropy",
    }
    # Validate before building anything: returning None here would only
    # surface later as an AttributeError on the first use of the "model".
    if m_type not in losses:
        raise ValueError(f"Unknown model type {m_type!r}; expected 'bow' or 'ng'")

    model = Sequential(
        [
            Embedding(vocab_size, 128, input_length=shape),
            SimpleRNN(128),
            Dense(vocab_size, activation="softmax"),
        ]
    )
    model.compile(loss=losses[m_type], optimizer="adam", metrics=["accuracy"])
    return model

### GRU

In [None]:
def gru_model(m_type, shape):
    """
    Build and compile a GRU next-word classifier.

    Architecture: Embedding(vocab_size, 128) -> GRU(128)
    -> Dense(vocab_size, softmax).

    Params:
        m_type (str): Selects the loss. "bow" -> categorical_crossentropy
            (one-hot targets), "ng" -> sparse_categorical_crossentropy
            (integer class ids).
        shape (int): Passed to the Embedding layer as input_length.

    Return:
        The compiled Sequential model (Adam optimizer, accuracy metric).

    Raises:
        ValueError: If m_type is neither "bow" nor "ng".
    """
    losses = {
        "bow": "categorical_crossentropy",
        "ng": "sparse_categorical_crossentropy",
    }
    # Validate before building anything: returning None here would only
    # surface later as an AttributeError on the first use of the "model".
    if m_type not in losses:
        raise ValueError(f"Unknown model type {m_type!r}; expected 'bow' or 'ng'")

    model = Sequential(
        [
            Embedding(vocab_size, 128, input_length=shape),
            GRU(128),
            Dense(vocab_size, activation="softmax"),
        ]
    )
    model.compile(loss=losses[m_type], optimizer="adam", metrics=["accuracy"])
    return model

### LSTM

In [None]:
def lstm_model(m_type, shape):
    """
    Build and compile an LSTM next-word classifier.

    Architecture: Embedding(vocab_size, 128) -> LSTM(128)
    -> Dense(vocab_size, softmax).

    Params:
        m_type (str): Selects the loss. "bow" -> categorical_crossentropy
            (one-hot targets), "ng" -> sparse_categorical_crossentropy
            (integer class ids).
        shape (int): Passed to the Embedding layer as input_length.

    Return:
        The compiled Sequential model (Adam optimizer, accuracy metric).

    Raises:
        ValueError: If m_type is neither "bow" nor "ng".
    """
    losses = {
        "bow": "categorical_crossentropy",
        "ng": "sparse_categorical_crossentropy",
    }
    # Validate before building anything: returning None here would only
    # surface later as an AttributeError on the first use of the "model".
    if m_type not in losses:
        raise ValueError(f"Unknown model type {m_type!r}; expected 'bow' or 'ng'")

    # Imported locally because the notebook's import cell does not bring LSTM in
    from tensorflow.keras.layers import LSTM

    model = Sequential(
        [
            Embedding(vocab_size, 128, input_length=shape),
            # The recurrent layer was a GRU, which made this a duplicate of gru_model
            LSTM(128),
            Dense(vocab_size, activation="softmax"),
        ]
    )
    model.compile(loss=losses[m_type], optimizer="adam", metrics=["accuracy"])
    return model

## Model training

### Dense

#### Ngram

In [None]:
# Dense model on the n-gram samples: n_gram - 1 context ids in, next-word id out.
dense_ng = dense_model("ng", n_gram - 1)
dense_ng_hist = dense_ng.fit(X_train_ng, y_train_ng, epochs=20, validation_data=(X_test_ng, y_test_ng))

In [None]:
plot_results(dense_ng_hist, "dense_ng")

In [None]:
dense_ng.save("models/dense/ng.keras")

#### BoW

In [None]:
# Dense model on the BoW count vectors (vocab_size features per sample).
# The "ng" type selects sparse_categorical_crossentropy, which matches the
# integer class ids stored in y_bow.
dense_bow = dense_model("ng", vocab_size)
dense_bow_hist = dense_bow.fit(
    X_train_bow, y_train_bow, epochs=20, validation_data=(X_test_bow, y_test_bow)
)

In [None]:
plot_results(dense_bow_hist, "dense_bow")

In [None]:
dense_bow.save("models/dense/bow.keras")

### RNN

#### Ngram

In [None]:
# SimpleRNN model on the n-gram samples (sequences of n_gram - 1 word ids).
rnn_ng = rnn_model("ng", n_gram - 1)
rnn_ng_hist = rnn_ng.fit(X_train_ng, y_train_ng, epochs=10, validation_data=(X_test_ng, y_test_ng))


In [None]:
plot_results(rnn_ng_hist, "rnn_ng")


In [None]:
rnn_ng.save("models/rnn/ng.keras")

#### BoW

In [None]:
# SimpleRNN model trained on the BoW arrays.
# NOTE(review): the rows of X_*_bow are count vectors of width vocab_size,
# while the model is declared with input_length=window_size and starts with an
# Embedding layer, which treats each value as a token id — confirm this
# pairing is intended.
rnn_bow = rnn_model("ng", window_size)
rnn_bow_hist = rnn_bow.fit(X_train_bow, y_train_bow, epochs=10, validation_data=(X_test_bow, y_test_bow))

In [None]:
plot_results(rnn_bow_hist, "rnn_bow")

In [None]:
rnn_bow.save("models/rnn/bow.keras")

### GRU

#### Ngram

In [None]:
# GRU model on the n-gram samples (sequences of n_gram - 1 word ids).
gru_ng = gru_model("ng", n_gram - 1)
gru_ng_hist = gru_ng.fit(X_train_ng, y_train_ng, epochs=20, validation_data=(X_test_ng, y_test_ng))


In [None]:
plot_results(gru_ng_hist, "gru_ng")


In [None]:
gru_ng.save("models/gru/ng.keras")

#### BoW

In [None]:
# GRU model trained on the BoW arrays.
# NOTE(review): the rows of X_*_bow are count vectors of width vocab_size,
# while the model is declared with input_length=window_size and starts with an
# Embedding layer, which treats each value as a token id — confirm this
# pairing is intended.
gru_bow = gru_model("ng", window_size)
gru_bow_hist = gru_bow.fit(X_train_bow, y_train_bow, epochs=10, validation_data=(X_test_bow, y_test_bow))

In [None]:
plot_results(gru_bow_hist, "gru_bow")

In [None]:
gru_bow.save("models/gru/bow.keras")

### LSTM

#### Ngram

In [None]:
# LSTM-section model for the n-gram samples. It is declared for sequences of
# n_gram - 1 ids, so it must be trained and validated on the n-gram split
# (the BoW arrays were passed here by mistake).
lstm_ng = lstm_model("ng", n_gram - 1)
lstm_ng_hist = lstm_ng.fit(X_train_ng, y_train_ng, epochs=10, validation_data=(X_test_ng, y_test_ng))

In [None]:
plot_results(lstm_ng_hist, "lstm_ng")

In [None]:
lstm_ng.save("models/lstm/ng.keras")

#### BoW

In [None]:
# Model from lstm_model trained on the BoW arrays.
# NOTE(review): the rows of X_*_bow are count vectors of width vocab_size,
# while the model is declared with input_length=window_size and starts with an
# Embedding layer, which treats each value as a token id — confirm this
# pairing is intended.
lstm_bow = lstm_model("ng", window_size)
lstm_bow_hist = lstm_bow.fit(X_train_bow, y_train_bow, epochs=10, validation_data=(X_test_bow, y_test_bow))

In [None]:
plot_results(lstm_bow_hist, "lstm_bow")

In [None]:
lstm_bow.save("models/lstm/bow.keras")

## Evaluation

In [None]:
def evaluate_model(model, X_test, y_test, name):
    """
    Print a classification report for a model on the given test data.

    The predicted class of each sample is the argmax over the model's output
    row; labels without predictions are scored as 0 (zero_division=0).
    """
    scores = model.predict(X_test)
    predicted = scores.argmax(axis=1)
    header = f"\n{name} Classification Report:"
    print(header)
    print(classification_report(y_test, predicted, zero_division=0))
    
def eval_bow(model, name):
    """Report on the BoW test split; the report is labelled ``name + "_bow"``."""
    label = name + "_bow"
    evaluate_model(model, X_test_bow, y_test_bow, label)
    
def eval_ng(model, name):
    """Report on the n-gram test split; the report is labelled ``name + "_ng"``."""
    label = name + "_ng"
    evaluate_model(model, X_test_ng, y_test_ng, label)

In [None]:
# Classification reports for the models trained on the BoW arrays.
eval_bow(dense_bow, "Dense")
eval_bow(rnn_bow, "RNN")
eval_bow(gru_bow, "GRU")
eval_bow(lstm_bow, "LSTM")

In [None]:
# Classification reports for the n-gram models, all on the n-gram test split.
eval_ng(dense_ng, "Dense")
eval_ng(rnn_ng, "RNN")
eval_ng(gru_ng, "GRU")
# lstm_ng is an n-gram model: use eval_ng (eval_bow scored it on the BoW split
# and labelled the report "LSTM_bow").
eval_ng(lstm_ng, "LSTM")

# Word prediction

In [None]:
def predict_next_word(
    model, input_sequence, word_to_idx, idx_to_word, mode="ngram", top_k=3
):
    """
    Predict the most likely next words for an input sentence.

    Params:
        model: Trained model exposing ``predict``.
        input_sequence (str): Source sentence.
        word_to_idx (dict): Word -> index mapping.
        idx_to_word (dict): Index -> word mapping.
        mode (str): Input encoding, "ngram" or "bow".
        top_k (int): Number of candidates to return.

    Return:
        list[str]: Up to top_k words, most probable first. An empty list is
        returned when the sentence contains a word that is neither in
        word_to_idx nor covered by an "<UNK>" entry, or when top_k <= 0.

    Raises:
        ValueError: If mode is neither "bow" nor "ngram".
    """
    # Tokenize in lower case; unknown words fall back to "<UNK>" when the
    # vocabulary has such an entry, otherwise they are marked with -1
    words = word_tokenize(input_sequence.lower())
    unk_idx = word_to_idx.get("<UNK>", -1)
    token_ids = [word_to_idx.get(word, unk_idx) for word in words]
    if -1 in token_ids:
        print("Есть неизвестные слова!")
        return []

    if mode == "bow":
        # Count the last window_size words (must match the training setup).
        # A bag of words needs no padding: padding with id 0 would add
        # spurious counts to the real word that owns id 0.
        bow_vector = np.zeros(len(word_to_idx))
        for idx in token_ids[-window_size:]:
            if 0 <= idx < len(word_to_idx):
                bow_vector[idx] += 1
        input_data = bow_vector.reshape(1, -1)

    elif mode == "ngram":
        # Use the last n_gram - 1 words (must match the training setup)
        seq_length = n_gram - 1
        context = token_ids[-seq_length:]
        # Left-pad short inputs with 0 to keep the sequence length fixed.
        # NOTE(review): 0 is presumably also a real word id, since nothing
        # here reserves a dedicated padding id — verify against the vocabulary.
        context = [0] * (seq_length - len(context)) + context
        input_data = np.array([context])

    else:
        raise ValueError("Режим должен быть 'bow' или 'ngram'")

    if top_k <= 0:
        # A slice with -0 would return the whole vocabulary instead of nothing
        return []

    preds = model.predict(input_data)[0]
    top_indices = preds.argsort()[-top_k:][::-1]  # best candidate first
    return [idx_to_word[idx] for idx in top_indices if idx in idx_to_word]

In [None]:
def generate_text(
    model, 
    seed_text, 
    word_to_idx, 
    idx_to_word, 
    mode="ngram", 
    num_words=5, 
    temperature=1.0, 
    top_k=5
):
    """
    Generate a sequence of words that continues a seed text.

    Params:
        model: Trained model exposing ``predict``.
        seed_text (str): Starting text.
        word_to_idx (dict): Word -> index mapping.
        idx_to_word (dict): Index -> word mapping.
        mode (str): Input encoding, "ngram" or "bow".
        num_words (int): Number of words to generate.
        temperature (float): Sampling randomness, must be > 0
            (lower = more conservative).
        top_k (int): Sample among the top_k most probable words (clamped to
            the range 1..number of model outputs).

    Return:
        list[str]: The whitespace-split seed followed by the generated words.

    Raises:
        ValueError: If mode is not "bow"/"ngram" or temperature is not positive.
    """
    if mode not in ("bow", "ngram"):
        raise ValueError(f"mode must be 'bow' or 'ngram', got {mode!r}")
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    generated = seed_text.split()
    words = word_tokenize(seed_text.lower())
    # Unknown seed words use "<UNK>" when the vocabulary has it and are dropped
    # otherwise; mapping them to id 0 would silently turn them into a real word.
    unk_idx = word_to_idx.get("<UNK>")
    token_ids = [word_to_idx.get(word, unk_idx) for word in words]
    token_ids = [idx for idx in token_ids if idx is not None]
    vocab_len = len(word_to_idx)

    for _ in range(num_words):
        if mode == "bow":
            # Count the last window_size words (must match training). No
            # padding: zero padding would add counts to the word with id 0.
            bow_vector = np.zeros(vocab_len)
            for idx in token_ids[-window_size:]:
                bow_vector[idx] += 1
            input_data = bow_vector.reshape(1, -1)
        else:
            # Last n_gram - 1 ids, left-padded with 0 to a fixed length
            seq_length = n_gram - 1
            context = token_ids[-seq_length:]
            context = [0] * (seq_length - len(context)) + context
            input_data = np.array([context])

        # float64 keeps the renormalized probabilities within the tolerance
        # np.random.choice requires
        preds = np.asarray(model.predict(input_data, verbose=0)[0], dtype="float64")

        # Temperature scaling in log space; clipping avoids log(0) and
        # subtracting the maximum keeps exp() well-conditioned
        logits = np.log(np.clip(preds, 1e-12, None)) / temperature
        probs = np.exp(logits - logits.max())
        probs /= probs.sum()

        # Restrict sampling to the k most probable words
        k = max(1, min(top_k, probs.size))
        top_indices = np.argpartition(probs, -k)[-k:]
        top_probs = probs[top_indices]
        top_probs = top_probs / top_probs.sum()

        chosen_idx = int(np.random.choice(top_indices, p=top_probs))

        # Extend both the output and the context used for the next step
        generated.append(idx_to_word.get(chosen_idx, "<UNK>"))
        token_ids.append(chosen_idx)

    return generated

In [None]:
input_sentence = "love this model"
idx_to_word = {v: k for k, v in global_word_to_idx.items()}  # Build the reverse (index -> word) mapping

# Text generation with the dense BoW model
bow_prediction = generate_text(
    dense_bow, input_sentence, global_word_to_idx, idx_to_word, mode="bow", top_k=3
)
" ".join(bow_prediction)

In [None]:
input_sentence = "love this model"
idx_to_word = {v: k for k, v in global_word_to_idx.items()}  # Build the reverse (index -> word) mapping

# Text generation with the n-gram RNN model (the result is stored under the
# name bow_prediction even though no BoW model is involved here)
bow_prediction = generate_text(
    rnn_ng, input_sentence, global_word_to_idx, idx_to_word, mode="ngram", top_k=3
)
" ".join(bow_prediction)