In [24]:
import shutil
from shutil import get_terminal_size
import os
import kagglehub
from tqdm import tqdm

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import nltk
import re

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

from textblob import TextBlob

from sklearn.model_selection import train_test_split

from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences


from sklearn.multioutput import MultiOutputClassifier
from sklearn.svm import SVC
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import multilabel_confusion_matrix

import ast

import joblib
import warnings
warnings.filterwarnings('ignore')

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MultiLabelBinarizer

In [33]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [7]:
# Working directory of the kernel; the dataset and GloVe paths used further
# down are built by joining onto this directory.
current_dir = os.getcwd()
print(current_dir)

c:\Users\kakao\Desktop\Ajou_SocialNetworkAnalysis\Project


In [8]:
# Locations of the three GoEmotions CSV shards, resolved against current_dir.
goe1, goe2, goe3 = [
    os.path.join(current_dir, relative_path)
    for relative_path in (
        "goemotion_dataset/goemotions_1.csv",
        "goemotion_dataset/goemotions_2.csv",
        "goemotion_dataset/goemotions_3.csv",
    )
]

# Read each shard into its own DataFrame (they are concatenated in a later cell).
goemotion_df1, goemotion_df2, goemotion_df3 = [
    pd.read_csv(shard_path) for shard_path in (goe1, goe2, goe3)
]

In [16]:
# Stack the three shards and clean the result in one chain:
#   1. keep only rows whose 'example_very_unclear' flag is False,
#   2. drop rows without text,
#   3. keep the first row for each distinct text,
#   4. remove the metadata columns and the 'neutral' label column.
df = (
    pd.concat([goemotion_df1, goemotion_df2, goemotion_df3], axis=0, ignore_index=True)
    .loc[lambda frame: frame['example_very_unclear'] == False]
    .dropna(subset=['text'])
    .drop_duplicates(subset=['text'])
    .drop(
        ['id', 'author', 'subreddit', 'link_id', 'parent_id', 'created_utc',
         'rater_id', 'example_very_unclear', 'neutral'],
        axis=1,
    )
)
df.columns

Index(['text', 'admiration', 'amusement', 'anger', 'annoyance', 'approval',
       'caring', 'confusion', 'curiosity', 'desire', 'disappointment',
       'disapproval', 'disgust', 'embarrassment', 'excitement', 'fear',
       'gratitude', 'grief', 'joy', 'love', 'nervousness', 'optimism', 'pride',
       'realization', 'relief', 'remorse', 'sadness', 'surprise'],
      dtype='object')

In [14]:
# Download the NLTK data packages requested here before preprocess_text is
# used (it relies on word_tokenize, the English stop-word list and
# WordNetLemmatizer).
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('wordnet')

# Resources shared by every preprocess_text call; filled on first use.
_PREPROCESS_CACHE = {}


def preprocess_text(text):
    """Normalize a raw string into space-joined, lemmatized tokens.

    Pipeline: lowercase -> delete every character that is not ``a-z`` or
    whitespace -> ``word_tokenize`` -> drop English stop words -> lemmatize
    each remaining token with ``WordNetLemmatizer``.

    Args:
        text: The string to clean.

    Returns:
        The surviving tokens joined by single spaces (empty string when
        nothing survives).
    """
    # This function is applied row by row; re-reading the stop-word corpus and
    # constructing a new lemmatizer on every call is loop-invariant work, so
    # both are created once and reused.
    if not _PREPROCESS_CACHE:
        _PREPROCESS_CACHE['stop_words'] = frozenset(stopwords.words('english'))
        _PREPROCESS_CACHE['lemmatizer'] = WordNetLemmatizer()
    stop_words = _PREPROCESS_CACHE['stop_words']
    lemmatizer = _PREPROCESS_CACHE['lemmatizer']

    text = text.lower()

    # Digits, punctuation (apostrophes included) and non-ASCII letters are removed.
    text = re.sub(r'[^a-z\s]', '', text)

    tokens = word_tokenize(text)

    return ' '.join(
        lemmatizer.lemmatize(word) for word in tokens if word not in stop_words
    )

[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\kakao\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\kakao\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\kakao\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [15]:
# Coerce every entry to str, then run the cleaning pipeline on it.
df['text'] = df['text'].apply(str).apply(preprocess_text)

In [18]:
# Keep only the rows whose label columns ('admiration'..'surprise') do not sum to zero.
label_totals = df.loc[:, 'admiration':'surprise'].sum(axis=1)
df = df[label_totals != 0]

In [35]:
# Model inputs: the preprocessed text column.
texts = df['text']
# Targets: every label column from 'admiration' through 'surprise'.
emotions = df.loc[:, 'admiration':'surprise']

In [36]:
# Hold out 20% of the rows for testing; random_state is fixed so the split is repeatable.
X_train, X_test, y_train, y_test = train_test_split(texts, emotions, test_size=0.2, random_state=42)

In [37]:
class EmotionDataset(Dataset):
    """Index-aligned pairing of model inputs and their label rows.

    Args:
        texts: Indexable, sized collection of inputs.
        labels: Indexable, sized collection of targets, one per entry of
            ``texts``.

    Raises:
        ValueError: If ``texts`` and ``labels`` have different lengths.
    """

    def __init__(self, texts, labels):
        # A length mismatch would otherwise surface only later as an
        # IndexError mid-epoch, or silently ignore trailing labels.
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, "
                f"got {len(texts)} and {len(labels)}"
            )
        self.texts = texts
        self.labels = labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the ``(text, label)`` pair stored at position ``idx``."""
        return self.texts[idx], self.labels[idx]

In [38]:
# Word-level tokenizer capped with num_words=10000 (the cap limits which
# words are kept when texts are converted to sequences).
tokenizer = Tokenizer(num_words=10000)
# Fit the vocabulary on the training texts only.
tokenizer.fit_on_texts(X_train)

# Encode both splits as lists of integer word indices.
X_train_seq = tokenizer.texts_to_sequences(X_train)
X_test_seq = tokenizer.texts_to_sequences(X_test)

In [58]:
# Every encoded text is padded to this many token ids.
# NOTE(review): 350 looks generous for short comments — confirm against the actual length distribution.
max_seq_length = 350
# padding='post' places the padding zeros after the real tokens.
X_train_pad = pad_sequences(X_train_seq, maxlen=max_seq_length, padding='post')
X_test_pad = pad_sequences(X_test_seq, maxlen=max_seq_length, padding='post')

# Integer (long) tensors, as the model feeds them to an nn.Embedding layer.
X_train_tensor = torch.tensor(X_train_pad, dtype=torch.long)
X_test_tensor = torch.tensor(X_test_pad, dtype=torch.long)

y_train_array = y_train.to_numpy()  # equivalently: y_train.values
y_test_array = y_test.to_numpy()

# Float targets, as F.binary_cross_entropy inside FocalLoss receives them.
y_train_tensor = torch.tensor(y_train_array, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test_array, dtype=torch.float32)

train_dataset = EmotionDataset(X_train_tensor, y_train_tensor)
test_dataset = EmotionDataset(X_test_tensor, y_test_tensor)

# Only the training loader shuffles; the test loader keeps row order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [59]:
class FocalLoss(nn.Module):
    """Focal loss on probabilities (inputs are passed to ``F.binary_cross_entropy``).

    Each element's binary cross-entropy is scaled by
    ``alpha * (1 - pt) ** gamma`` where ``pt = exp(-bce)``. ``alpha`` is a
    single scalar applied to every element alike.

    Args:
        gamma: Exponent of the ``(1 - pt)`` modulating factor.
        alpha: Scalar weight multiplied into every element.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced element-wise loss.
    """

    def __init__(self, gamma=2, alpha=0.25, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, outputs, targets):
        """Return the focal loss of ``outputs`` (probabilities) against ``targets``."""
        # Element-wise BCE; exp(-BCE) recovers the probability assigned to the true class.
        per_element_bce = F.binary_cross_entropy(outputs, targets, reduction='none')
        true_class_prob = torch.exp(-per_element_bce)
        weighted = self.alpha * (1 - true_class_prob) ** self.gamma * per_element_bce

        if self.reduction == 'sum':
            return weighted.sum()
        if self.reduction == 'mean':
            return weighted.mean()
        return weighted


In [60]:
def load_glove_embeddings(glove_file, vocab, embedding_dim):
    """Build an embedding matrix for ``vocab`` from a GloVe-style text file.

    The file is streamed line by line and only vectors for words present in
    ``vocab`` are kept, so the full GloVe table is never held in memory.

    Args:
        glove_file: Path to a UTF-8 text file with one
            ``word v1 v2 ... vN`` entry per line.
        vocab: Mapping of word -> row index (index 0 is left for padding).
        embedding_dim: Expected number of components per vector.

    Returns:
        ``numpy.ndarray`` of shape ``(len(vocab) + 1, embedding_dim)``; rows
        for words missing from the file (and row 0) stay all-zero.

    Raises:
        ValueError: If a vocabulary word's vector in the file does not have
            exactly ``embedding_dim`` components.
    """
    vocab_size = len(vocab) + 1  # index 0 is used for padding
    embedding_matrix = np.zeros((vocab_size, embedding_dim))

    with open(glove_file, "r", encoding="utf-8") as f:
        for line in f:
            values = line.split()
            if not values:
                continue  # blank line: nothing to parse
            row = vocab.get(values[0])
            if row is None or row >= vocab_size:
                continue  # word not needed, or index outside the matrix
            vector = np.asarray(values[1:], dtype="float32")
            # Check explicitly: a length-1 vector would otherwise broadcast
            # silently across the whole row.
            if vector.shape[0] != embedding_dim:
                raise ValueError(
                    f"vector for {values[0]!r} has {vector.shape[0]} components, "
                    f"expected {embedding_dim}"
                )
            embedding_matrix[row] = vector

    return embedding_matrix

In [61]:
# NOTE(review): the model instantiation cell passes len(vocab) + 1 as vocab_size, not this value.
vocab_size = 10000  # maximum number of words
# NOTE(review): embedding_dim is reassigned to 100 in the GloVe setup cell, so 128 is not what the model receives.
embedding_dim = 128
hidden_dim = 64
output_dim = y_train.shape[1]  # number of emotion labels

In [62]:
# GloVe file path and embedding dimension
glove_dir = os.path.join(current_dir, "glove.6B.100d.txt")
glove_file = glove_dir  # path to the GloVe embedding file
embedding_dim = 100  # GloVe vector dimension

# Take the vocabulary from the tokenizer
vocab = tokenizer.word_index  # the tokenizer's word -> index mapping

# Build the GloVe embedding matrix
embedding_matrix = load_glove_embeddings(glove_file, vocab, embedding_dim)

# PyTorch model definition
class ImprovedDeepLSTMEmotionModel(nn.Module):
    """Embedding -> stacked Bi-LSTM -> Conv1d -> mean pool -> BatchNorm -> Linear -> sigmoid.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
        hidden_dim: LSTM hidden size per direction; also the Conv1d output channels.
        output_dim: Number of output units (one probability per label).
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout rate used between LSTM layers and before the final linear layer.
        embedding_matrix: Optional array of pretrained vectors copied into the
            embedding table; the table stays trainable.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers=3, dropout=0.3, embedding_matrix=None):
        super().__init__()

        # Embedding table, optionally initialised from pretrained (GloVe) vectors.
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        if embedding_matrix is not None:
            pretrained = torch.tensor(embedding_matrix)
            self.embedding.weight.data.copy_(pretrained)
            self.embedding.weight.requires_grad = True  # keep the vectors fine-tunable

        # Stacked bidirectional LSTM over the embedded sequence.
        recurrent_options = dict(
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=True,
        )
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, **recurrent_options)

        # Conv1d maps the concatenated forward/backward states back to hidden_dim channels.
        bidirectional_width = hidden_dim * 2
        self.conv1d = nn.Conv1d(bidirectional_width, hidden_dim, kernel_size=3, padding=1)

        # Classification head.
        self.fc = nn.Linear(hidden_dim, output_dim)

        # Regularisation / normalisation applied to the pooled features.
        self.dropout = nn.Dropout(dropout)
        self.batch_norm = nn.BatchNorm1d(hidden_dim)

        # Independent per-label probabilities for multi-label classification.
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a batch of token-id sequences to per-label probabilities."""
        token_vectors = self.embedding(x)

        # (batch, seq_len, hidden_dim * 2)
        sequence_features, _ = self.lstm(token_vectors)

        # Conv1d expects (batch, channels, seq_len).
        channels_first = sequence_features.transpose(1, 2)
        local_features = F.relu(self.conv1d(channels_first))

        # Global average pooling over the sequence axis -> (batch, hidden_dim)
        sentence_vector = local_features.mean(dim=2)

        normalised = self.batch_norm(sentence_vector)
        scores = self.fc(self.dropout(normalised))
        return self.sigmoid(scores)




Load Model

In [48]:
device

device(type='cpu')

In [64]:
# Build the network with the GloVe-initialised embedding table and move it to
# `device`. The training and evaluation loops send every batch to `device`, so
# a model left on the CPU would raise a device-mismatch error as soon as CUDA
# is available.
improved_model = ImprovedDeepLSTMEmotionModel(
    vocab_size=len(vocab) + 1,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    output_dim=output_dim,
    num_layers=3,
    dropout=0.3,
    embedding_matrix=embedding_matrix
).to(device)

# To start from previously saved weights instead, uncomment:
# model_dir = os.path.join(current_dir, "improved_model.pth")
# state_dict = torch.load(model_dir, map_location=torch.device('cpu'))  # load the state_dict
# improved_model.load_state_dict(state_dict)  # load the state_dict into the model


In [65]:
criterion = FocalLoss(gamma=2, alpha=0.25)
optimizer = torch.optim.Adam(improved_model.parameters(), lr=0.001)
# Halve the learning rate when the monitored loss has not improved for 3 epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)

epochs = 100
for epoch in range(epochs):
    improved_model.train()  # enable training mode
    total_loss = 0

    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}")
    # The batch variables are deliberately not named `texts`/`labels`: `texts`
    # is the notebook-level Series passed to train_test_split, and rebinding
    # it here would break a re-run of that cell.
    for batch_inputs, batch_labels in progress_bar:
        batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)

        optimizer.zero_grad()  # reset accumulated gradients
        outputs = improved_model(batch_inputs)  # forward pass
        loss = criterion(outputs, batch_labels)  # compute the loss
        loss.backward()  # backpropagate
        optimizer.step()  # update the weights

        total_loss += loss.item()
        progress_bar.set_postfix(loss=f"{loss.item():.4f}")

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}, Average Loss: {avg_loss:.4f}")
    # The scheduler monitors the average *training* loss of the epoch.
    scheduler.step(avg_loss)

# Save the model once training has finished
# torch.save(improved_model.state_dict(), "improved_model.pth")

Epoch 1/100:   2%|▏         | 16/1049 [00:08<09:19,  1.85it/s, loss=0.0416]


KeyboardInterrupt: 

In [None]:
improved_model.eval()  # enable evaluation mode
total_loss = 0
all_labels = []
all_preds = []

progress_bar = tqdm(test_loader, desc="Evaluating")

with torch.no_grad():
    # The batch variables are deliberately not named `texts`/`labels`: `texts`
    # is the notebook-level Series passed to train_test_split, and rebinding
    # it here would break a re-run of that cell.
    for batch_inputs, batch_labels in progress_bar:
        batch_inputs, batch_labels = batch_inputs.to(device), batch_labels.to(device)
        outputs = improved_model(batch_inputs)

        loss = criterion(outputs, batch_labels)
        total_loss += loss.item()

        probabilities = outputs  # sigmoid has already been applied inside the model
        preds = (probabilities > 0.5).float()  # binarise at a 0.5 threshold

        all_labels.append(batch_labels.cpu())
        all_preds.append(preds.cpu())

        progress_bar.set_postfix(loss=f"{loss.item():.4f}")

avg_loss = total_loss / len(test_loader)
print(f"Test Loss: {avg_loss:.4f}")

# Concatenate the per-batch tensors into single NumPy arrays for the confusion matrix
all_labels = torch.cat(all_labels).numpy()
all_preds = torch.cat(all_preds).numpy()

### Confusion Matrix Visualization

In [None]:
def calculate_emotion_confusion_matrix(true_labels, predicted_labels, label_names):
    """Count, per sample, every (true label, predicted label) pairing.

    Cell ``[t, p]`` is the number of samples in which label ``t`` is set to 1
    in the ground truth and label ``p`` is set to 1 in the prediction.

    Args:
        true_labels: Iterable of 1-D arrays with the ground-truth label row of each sample.
        predicted_labels: Iterable of 1-D arrays with the predicted label row of each sample.
        label_names: Names used for both the rows and the columns of the result.

    Returns:
        ``pandas.DataFrame`` of integer counts, shaped
        ``(len(label_names), len(label_names))``.
    """
    size = len(label_names)
    counts = np.zeros((size, size), dtype=int)

    for actual_row, predicted_row in zip(true_labels, predicted_labels):
        actual_hits = np.where(actual_row == 1)[0]
        predicted_hits = np.where(predicted_row == 1)[0]
        # Add one to each cell of the (true x predicted) grid for this sample.
        counts[np.ix_(actual_hits, predicted_hits)] += 1

    return pd.DataFrame(counts, index=label_names, columns=label_names)

# Label names, in the same order as the label columns the model was trained on.
# 'neutral' is intentionally absent: that column is dropped during data
# cleaning, so listing it here would add an always-empty 28th row and column.
emotion_labels = ['admiration', 'amusement', 'anger', 'annoyance', 'approval',
       'caring', 'confusion', 'curiosity', 'desire', 'disappointment',
       'disapproval', 'disgust', 'embarrassment', 'excitement', 'fear',
       'gratitude', 'grief', 'joy', 'love', 'nervousness', 'optimism', 'pride',
       'realization', 'relief', 'remorse', 'sadness', 'surprise']

conf_matrix_df = calculate_emotion_confusion_matrix(all_labels, all_preds, emotion_labels)

print(conf_matrix_df)

# Plot the confusion matrix as an annotated heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix_df, annot=True, fmt="d", cmap="Blues", xticklabels=emotion_labels, yticklabels=emotion_labels)
plt.title("Emotion Confusion Matrix")
plt.xlabel("Predicted Emotion")
plt.ylabel("True Emotion")
plt.show()

In [27]:
def print_directory_tree(startpath, prefix=""):
    """Print the directory tree under ``startpath``, indented by depth.

    Each directory is printed on its own line, followed by its files one
    level deeper. Indentation is four spaces per level.

    Args:
        startpath: Directory to walk.
        prefix: Text placed at the start of every printed line.
    """
    for root, _dirs, files in os.walk(startpath):
        # Depth comes from the path relative to startpath. Stripping startpath
        # with str.replace miscounts when startpath ends in a separator or
        # when its text reappears deeper inside the path.
        relative = os.path.relpath(root, startpath)
        level = 0 if relative == os.curdir else relative.count(os.sep) + 1
        indent = " " * 4 * level
        # normpath drops a trailing separator, which would otherwise make
        # basename return an empty name for the root directory.
        name = os.path.basename(os.path.normpath(root)) or root
        print(f"{prefix}{indent}📁 {name}")  # directory line
        sub_indent = " " * 4 * (level + 1)
        for f in files:
            print(f"{prefix}{sub_indent}📄 {f}")  # file line

# Run it on the working directory
print_directory_tree(current_dir)  # pass the directory whose tree should be printed

📁 Project
    📄 .env
    📄 .gitattributes
    📄 app.py
    📄 classification_report_k_10.csv
    📄 classification_report_k_30.csv
    📄 df_lstm_training.ipynb
    📄 glove.6B.100d.txt
    📄 improved_model.pth
    📄 improve_lstm.ipynb
    📄 ml_k_30_train_text_after_process.csv
    📄 ml_train_text_after_process.csv
    📄 project_base.ipynb
    📄 project_base.py
    📄 random_forest_model_textK_10.pkl
    📄 random_forest_model_textK_30.pkl
    📄 README.md
    📄 total_song_lyrics.csv
    📄 total_song_lyrics_result.csv
    📄 total_song_lyrics_result_add_MLk10.csv
    📄 training_ml_model.ipynb
    📁 Client
        📄 main.html
        📄 personer_lyric.html
    📁 emotion_lyric
        📄 Taylor_Swift
        📄 Taylor_Swift.csv
    📁 glove
        📄 glove.6B.100d.txt
        📄 glove.6B.200d.txt
        📄 glove.6B.300d.txt
        📄 glove.6B.50d.txt
    📁 goemotion_dataset
        📄 goemotions_1.csv
        📄 goemotions_2.csv
        📄 goemotions_3.csv
    📁 Lib
    📁 share
        📁 jupyter
       