# Лабораторная 7. Сентимент-анализ

Задачи классификации текста аналогично обычной задаче классификации предполагает присвоение метки класса некоторому тексту. Здесь можно действовать любыми методами для того, чтобы классифицировать текст, но мы пойдем по следующему пути: векторизуем последовательности (обязательно почитайте про подходы к векторизации, об эмбеддингах) и обучим RNN

После обучения базовых моделей разрешается использовать любой другой подход

За выполнение базовой работы можно получить 15 баллов, за преодоление отметки в 94% точности классификации еще 5 баллов
Удачи!

Примечание: обязательно почитайте про лемматизацию, стеминг, TF-IDF и Word2Vec подходы

In [None]:
# Импорт необходимые библиотеки
import pandas as pd  # Для работы с данными в формате таблиц
import numpy as np  # Для работы с массивами и математическими функциями
import matplotlib.pyplot as plt  
import seaborn as sns  # Для улучшенной визуализации данных
from tqdm.auto import tqdm  # Для отображения прогресс-баров

# Импортирт библиотек для обработки текста
import nltk  # Библиотека для работы с текстом
from nltk.corpus import stopwords  # Для работы со списком стоп-слов
from nltk.stem import WordNetLemmatizer  # Для лемматизации слов
import re  # Для работы с регулярными выражениями
from collections import Counter  # Для подсчета частоты слов
from string import punctuation  # Для работы с пунктуацией

# Импорт инструментов для векторизации текста и подготовки данных
from sklearn.feature_extraction.text import TfidfVectorizer  # Для TF-IDF векторизации
from sklearn.model_selection import train_test_split  # Для разделения данных на обучающую и тестовую выборки
from sklearn.preprocessing import LabelEncoder  # Для кодирования меток классов

# Импорт Word2Vec для создания векторных представлений слов
from gensim.models import Word2Vec

# Импорт библиотеки PyTorch для создания и обучения нейронных сетей
import torch  
import torch.nn as nn  # Для создания нейронных сетей
from torch.optim import Adam  # Для оптимизации
from torch.utils.data import DataLoader, TensorDataset  # Для работы с данными в формате тензоров

# Инициализация лемматизатора и кодировщика меток
lemma = WordNetLemmatizer()
lb = LabelEncoder()

In [None]:
# Загрузка необходимых ресурсы NLTK
nltk.download('wordnet')

# Загрузка данных из CSV файла
df = pd.read_csv('twitter_training.csv', header=None)

In [None]:
# Просмотр первых 5 строк данных
df.head()

In [None]:
# Удаление первого столбца
df = df.drop(0, axis=1)

In [None]:
# Переименовываем столбцы 
df = df.rename(columns={1: "Feature2", 3: "Feature1", 2: "labels"})


In [None]:
# Объединение текстовых данных из двух столбцов в один
df["tweets"] = df["Feature1"].astype(str) + " " + df["Feature2"].astype(str)

In [None]:
# Удаление исходных текстовых столбцов
df = df.drop(["Feature1", "Feature2"], axis=1)


In [None]:
# Создание словаря для кодирования меток классов
df_labels = {key: value for value, key in enumerate(np.unique(df['labels']))}

In [None]:
# Проверка уникальных меток классов
np.unique(df[1])

In [None]:
# Функция для получения числовой метки по тексту
def getlabel(n): 
    for x, y in df_labels.items(): 
        if y == n: 
            return x

In [None]:
# Функция для предобработки текста
def DataPrep(text): 
    text = re.sub('<.*?>', '', text)  # Удаляем HTML теги
    text = re.sub(r'\d+', '', text)  # Удаляем числа
    text = re.sub(r'[^\w\s]', '', text)  # Удаляем специальные символы
    text = re.sub(r'http\S+', '', text)  # Удаляем URL
    text = re.sub(r'@\S+', '', text)  # Удаляем упоминания
    text = re.sub(r'#\S+', '', text)  # Удаляем хештеги
    
    # Токенизация
    tokens = nltk.word_tokenize(text) 
    
    # Удаляем пунктуацию
    punc = list(punctuation)
    words = [word for word in tokens if word not in punc]
    
    # Удаление стоп-слова
    stop_words = set(stopwords.words('english'))
    words = [word for word in words if not word.lower() in stop_words]
    
    # Лемматизация
    words = [lemma.lemmatize(word) for word in words] 
    
    # Возвращение слова обратно в строку
    text = ' '.join(words)
    
    return text


In [None]:
# Применение предобработки к каждому твиту
df['cleaned_tweets'] = df['tweets'].apply(DataPrep)

In [None]:
# Вывод количества дубликатов
print(f'There are around {int(df["cleaned_tweets"].duplicated().sum())} duplicated tweets, we will remove them.')

# Удаление дубликатов
df.drop_duplicates("cleaned_tweets", inplace=True)

# Добавление столбца с длиной твитов
df['tweet_len'] = [len(text.split()) for text in df.cleaned_tweets]

# Удаление твитов с длиной больше 99.5% квантиля
df = df[df['tweet_len'] < df['tweet_len'].quantile(0.995)]

In [None]:
# Визуализация распределения длины твитов
plt.figure(figsize=(16, 5))
ax = sns.countplot(x='tweet_len', data=df[(df['tweet_len'] <= 1000) & (df['tweet_len'] > 10)], palette='Blues_r')
plt.title('Count of tweets with high number of words', fontsize=25)
plt.yticks([])
ax.bar_label(ax.containers[0])
plt.ylabel('count')
plt.xlabel('')
plt.show()

## Обучаем модели

In [None]:
# Определение максимальной длины твитов
MAX_LEN = np.max(df['tweet_len'])


In [None]:
# Функция для подготовки данных для LSTM
def lstm_prep(column, seq_len):
    # Создаем словарь слов из текстов
    corpus = [word for text in column for word in text.split()]
    words_count = Counter(corpus)  # Подсчитываем количество слов
    sorted_words = words_count.most_common()  # Сортируем слова по частоте
    vocab_to_int = {w: i + 1 for i, (w, c) in enumerate(sorted_words)}  # Создаем словарь для преобразования слов в индексы
    
    text_int = []  # Список для хранения преобразованных текстов
    
    # Преобразуем каждый текст в последовательность индексов
    for text in column:
        token = [vocab_to_int[word] for word in text.split()]  # Преобразуем слова в индексы
        text_int.append(token)  # Добавляем последовательность в список
        
    # Подгоняем длину последовательностей
    features = np.zeros((len(text_int), seq_len), dtype=int)  # Создаем массив нулей для хранения последовательностей
    for idx, y in tqdm(enumerate(text_int)):  # Проходим по всем последовательностям
        if len(y) <= seq_len:  # Если длина последовательности меньше или равна максимальной
            zeros = list(np.zeros(seq_len - len(y)))  # Создаем список нулей
            new = zeros + y  # Добавляем нули в начало последовательности
        else:  # Если длина больше максимальной
            new = y[:seq_len]  # Обрезаем последовательность до максимальной длины
            
        features[idx, :] = np.array(new)  # Заполняем массив последовательностями
        
    return sorted_words, features  # Возвращаем отсортированные слова и массив признаков

In [None]:
# Подготавка данных для LSTM
VOCAB, tokenized_column = lstm_prep(df['cleaned_tweets'], MAX_LEN)

In [None]:
# Вывод первых 10 слов из словаря
VOCAB[:10]

In [None]:
# Получение размеров токенизированного столбца
tokenized_column.shape


In [None]:
# Функция для визуализации самых распространенных слов
def most_common_words(vocab):
    keys = []  # Список для хранения слов
    values = []  # Список для хранения их частоты
    for key, value in vocab[:30]:  # Проходим по 30 самым распространенным словам
        keys.append(key)  # Добавляем слово в список
        values.append(value)  # Добавляем частоту в список
        
    plt.figure(figsize=(15, 5))  # Устанавливаем размер графика
    ax = plt.bar(keys, values)  # Строим столбчатую диаграмму
    plt.title('Top 20 most common words', size=25)  # Заголовок графика
    plt.ylabel("Words count")  # Подпись оси Y
    plt.xticks(rotation=45)  # Поворачиваем подписи по оси X
    plt.subplots_adjust(bottom=0.15)  # Увеличиваем отступ снизу
    plt.show()  # Отображаем график

In [None]:
# Визуализируем самые распространенные слова
most_common_words(VOCAB)


In [None]:
# Определение X и y для обучения
X = tokenized_column
y = lb.fit_transform(df['labels'].values)  # Кодируем метки классов

# Разделение данных на обучающую и валидационную выборки
X_train, X_val, Y_train, Y_val = train_test_split(X, y, train_size=0.85, random_state=42)

In [None]:
# Создание датасета для PyTorch
train_data = TensorDataset(torch.from_numpy(X_train), torch.LongTensor(Y_train))
val_data = TensorDataset(torch.from_numpy(X_val), torch.LongTensor(Y_val))

In [None]:
 # Устанавливаем размер батча
BATCH_SIZE = 64

# Создаем загрузчики данных для обучающей и валидационной выборок
train_dataloader = DataLoader(
    dataset=train_data,
    batch_size=BATCH_SIZE,
    shuffle=True  # Перемешиваем данные
)
val_dataloader = DataLoader(
    dataset=val_data,
    batch_size=BATCH_SIZE,
    shuffle=False  # Не перемешиваем валидационные данные
)

In [None]:
# Устанавливаем размерность эмбеддингов
EMBEDDING_DIM = 200

In [None]:
Word2vec_train_data = list(map(lambda x: x.split(), df['cleaned_tweets']))  # Подготовка данные для Word2Vec
word2vec_model = Word2Vec(Word2vec_train_data, vector_size=EMBEDDING_DIM)  # Обучение модели Word2Vec


In [None]:
# Функция для создания матрицы весов эмбеддингов
def weight_matrix(model, vocab):
    vocab_size = len(vocab) + 1  # Размер словаря
    embedding_matrix = np.zeros((vocab_size, EMBEDDING_DIM))  # Создание матрицы нулей для эмбеддингов
    for word, token in vocab:  # Проход по всем словам в словаре
        if model.wv.__contains__(word):  # Если слово есть в модели
            embedding_matrix[token] = model.wv.__getitem__(word)  # Получение вектор слова и добавляем в матрицу
    return embedding_matrix  # Возврат матрицу весов


In [None]:
# Получаем матрицу весов эмбеддингов
embedding_vec = weight_matrix(word2vec_model, VOCAB)
print("Embedding Matrix Shape:", embedding_vec.shape)  # Вывод форму матрицы весов

In [None]:
# Функция для подсчета параметров модели
def param_count(model):
    params = [p.numel() for p in model.parameters() if p.requires_grad]  # Получаем количество параметров, требующих градиента
    print('The Total number of parameters in the model : ', sum(params))  # Выводим общее количество параметров

In [None]:
# Определяем архитектуру модели
class Model(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_layers, hidden_dim, out_channels, bidirectional, device='cpu'):
        super().__init__()  # Инициализация родительского класса
        print(device)  # Выводим устройство (CPU или GPU)
        self.no_layers = num_layers  # Сохраняем количество слоев
        self.hidden_dim = hidden_dim  # Сохраняем размер скрытого слоя
        self.out_channels = out_channels  # Сохраняем количество выходных каналов
        self.num_directions = 2 if bidirectional else 1  # Определяем количество направлений LSTM
        self.embedding = nn.Embedding(vocab_size, embedding_dim)  # Создаем слой эмбеддингов
        self.embedding = self.embedding.to(device)  # Переносим слой на устройство
        self.device = device  # Сохраняем устройство
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers,
            dropout=0.5,  # Добавляем дроп-аут для предотвращения переобучения
            bidirectional=bidirectional,
            batch_first=True  # Указываем, что первый размер - размер батча
        )
        self.lstm = self.lstm.to(device)  # Переносим LSTM на устройство
        
        self.fc = nn.Linear(hidden_dim * self.num_directions, out_channels)  # Полносвязный слой
        self.fc = self.fc.to(device)  # Переносим слой на устройство
        
    # Определяем прямой проход модели
    def forward(self, x):
        h0 = torch.zeros((self.no_layers * self.num_directions, x.size(0), self.hidden_dim)).to(self.device)  # Начальное состояние скрытого слоя
        c0 = torch.zeros((self.no_layers * self.num_directions, x.size(0), self.hidden_dim)).to(self.device)  # Начальное состояние ячейки
        
        embedded = self.embedding(x)  # Получаем эмбеддинги для входных данных
        
        out, _ = self.lstm(embedded, (h0, c0))  # Пропускаем эмбеддинги через LSTM
        
        out = out[:, -1, :]  # Берем последний выход LSTM для классификации
        
        out = self.fc(out)  # Пропускаем через полносвязный слой
        
        return out  # Возвращаем выход модели


In [None]:
# определение параметров модели
VOCAB_SIZE = len(VOCAB) + 1  # Размер словаря
NUM_LAYERS = 2  # Количество слоев LSTM
OUT_CHANNELS = 4  # Количество выходных классов
HIDDEN_DIM = 256  # Размер скрытого слоя
BIDIRECTIONAL = True  # Используем двунаправленный LSTM
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'  # Определение устройства
# Создание экземпляра модели
model = Model(VOCAB_SIZE, EMBEDDING_DIM, NUM_LAYERS, HIDDEN_DIM, OUT_CHANNELS, BIDIRECTIONAL, DEVICE)

# Копирование веса эмбеддингов из матрицы весов
model.embedding.weight.data.copy_(torch.from_numpy(embedding_vec))

# Разрешение обновление весов эмбеддингов
model.embedding.weight.requires_grad = True

# Перенос модели на GPU, если есть возможность
if torch.cuda.is_available():
    model = model.cuda()

In [None]:
# счетчик параметров модели
param_count(model)

In [None]:
# Определение функцим потерь и оптимизатор
criterion = nn.CrossEntropyLoss()  # Функция потерь для многоклассовой классификации
optimizer = Adam(model.parameters(), lr=0.001)  # Оптимизатор Adam

# Устанавление количества эпох
epochs = 10 
training_loss = []  # Список для хранения потерь
training_acc = []  # Список для хранения точности

# Цикл обучения
for i in tqdm(range(epochs)):
    epoch_loss = 0  # Суммарные потери за эпоху
    epoch_acc = 0  # Суммарная точность за эпоху
    for batch, (x_train, y_train) in enumerate(train_dataloader):  # Проход по батчам
        x_train, y_train = x_train.to(DEVICE), y_train.to(DEVICE)  # Перенос данныч на устройство
        y_pred = model(x_train)  # Получаем предсказания модели
        
        loss = criterion(y_pred, y_train)  # Вычисляем потери
        
        if batch % 500 == 0:  # Каждые 500 батчей выводим информацию
            print(f"Looked at {batch * len(x_train)}/{len(train_dataloader.dataset)} samples.")
            
        loss.backward()  # Вычисление градиентов
        optimizer.step()  # Обновление параметров модели
        optimizer.zero_grad()  # Обнуление градиентов
        
        epoch_loss += loss  # Суммирование потери за эпоху
        epoch_acc += accuracy_score(y_train.cpu(), y_pred.argmax(dim=1).cpu())  # Вычисляем точность
        
    # Сохранение потерь и точности за эпоху
    training_loss.append((epoch_loss / len(train_dataloader)).detach().cpu().numpy())
    training_acc.append(epoch_acc / len(train_dataloader))
    
    # Вывод результатов обучения за эпоху
    print(f"Epoch {i+1}: Accuracy: {(epoch_acc / len(train_dataloader)) * 100}, Loss: {(epoch_loss / len(train_dataloader))}\n\n")

## Время Prediction

In [None]:
# Загружаем тестовый набор данных из CSV файла
test_df = pd.read_csv('twitter_validation.csv', header=None)
print(test_df.head())  # Выводим первые 5 строк для проверки

# Удаляем первый столбец (индексы)
test_df = test_df.drop(0, axis=1)

# Переименовываем столбцы для удобства
test_df = test_df.rename(columns={1: "Feature2", 3: "Feature1", 2: "labels"})
test_df.head()  # Проверяем изменения


In [None]:
# Объединяем тексты из двух столбцов в один столбец "tweets"
test_df["tweets"] = test_df["Feature1"].astype(str) + " " + test_df["Feature2"].astype(str)

# Удаляем старые столбцы Feature1 и Feature2
test_df = test_df.drop(["Feature1", "Feature2"], axis=1)

In [None]:
test_df.head()  # Проверка изменения


In [None]:
# Функция для выполнения предсказаний на случайных твитах
def make_predictions(row):
    # Случайным образом выбираем 10 твитов из тестового набора
    random_data = row.sample(n=10, random_state=42)  # Установлен random_state для воспроизводимости
    random_tweets = random_data['tweets'].values  # Извлекаем тексты твитов
    
    # Очищаем текст твитов
    cleaned_tweets = [DataPrep(tweet) for tweet in random_tweets]  # Используем list comprehension для улучшения читаемости
    
    # Преобразуем очищенные твиты в векторы
    x_test = vec.transform(cleaned_tweets).toarray()  # Преобразуем текст в векторы
    
    # Извлекаем истинные метки классов
    y_test = random_data['labels'].values
    
    # Подготавливаем данные для LSTM
    _, X_test = lstm_prep(cleaned_tweets, MAX_LEN)  # Подготовка данных
    X_test = torch.from_numpy(X_test).to(DEVICE)  # Переводим данные в тензор и на нужное устройство

    # Получаем предсказания от модели LSTM
    lstm_pred = model(X_test)  # Получаем предсказания
    lstm_pred = torch.softmax(lstm_pred, dim=1).argmax(dim=1)  # Применяем softmax и получаем индексы классов
    
    # Получаем метки классов по индексам
    pred = np.array([getlabel(lstm_pred[i]) for i in range(len(lstm_pred))])  
    
    # Выводим оригинальные твиты, их метки и предсказания модели
    for i in tqdm(range(2)):  # Ограничиваем вывод первыми двумя твитами
        print(f"The original tweet : {random_tweets[i]}\n")  # Выводим оригинальный твит
        print(f"The original label : {y_test[i]}\n")  # Выводим истинную метку
        print(f"The lstm prediction is : {getlabel(lstm_pred[i])}\n")  # Выводим предсказанную метку
        print('-' * 120)  # Разделитель для читабельности

    # Вычисляем и выводим точность предсказаний
    accuracy = accuracy_score(pred, y_test)  # Вычисляем точность
    print(f'Accuracy of predictions: {accuracy:.2f}')  # Выводим точность с двумя знаками после запятой

# Вызов функцию для выполнения предсказаний на тестовом наборе данных
make_predictions(test_df)  


## Что дальше?

Попытайтесь улучшить модель (попробуйте GRU), изменить подход к токенизации данных и так далее, удачи!