In [None]:
import pandas as pd
import numpy as np
import networkx as nx
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import nltk
import re
import matplotlib.pyplot as plt
import dgl
import itertools

from allennlp.modules.elmo import Elmo, batch_to_ids
from eunjeon import Mecab
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split

from dgl.dataloading import GraphDataLoader
from sklearn.metrics import accuracy_score, recall_score, f1_score

plt.rc('font', family='Malgun Gothic')

In [None]:
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')

In [None]:
data_dir = '../analysis_files/files/'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# 감성대화말뭉치(최종데이터)_Training.csv 파일을 pandas로 읽어옵니다.
df = pd.read_csv(os.path.join(data_dir, '감성대화말뭉치(최종데이터)_Validation.csv'), encoding='cp949')
df = df[['사람문장1', '시스템문장1', '사람문장2','시스템문장2','사람문장3','시스템문장3','감정_대분류', '상황키워드']]

In [None]:
# ELMo 모델 초기화
options_file = "../analysis_files/elmo/elmo_2x4096_512_2048cnn_2xhighway_5.5B_options.json"
weight_file = "../analysis_files/elmo/elmo_2x4096_512_2048cnn_2xhighway_5.5B_weights.hdf5"
elmo = Elmo(options_file, weight_file, num_output_representations=1)

# mecab 모델 다운로드
mecab = Mecab()

# elmo 모델 다운로드
elmo = elmo.to(device)

In [None]:
# 한국어 문장 형태소 분리 함수
def tokenize_korean_sentence(sentence):
    tokens = mecab.morphs(sentence)
    return tokens

In [None]:
def preprocessing(sentences) :
    sentences = re.sub(r'\([^)]*\)', '', sentences)
    sentences = sentences.replace('.', '')
    sentences = re.sub(r'[^가-힣\s]', '', sentences)
    sentences = re.sub(r'\b(?:cm|km|etc)\b', '', sentences)
    return sentences

In [None]:
def extract_sentence_features(sentence):
    # TF-IDF 벡터화 객체 생성
    tfidf_vectorizer = TfidfVectorizer(token_pattern=r"(?u)\b\w+\b")

    # 문장 길이
    sentence_length = len(sentence)

    # 품사 개수
    tokens = mecab.pos(sentence)
    pos_tags = [tag for _, tag in tokens]
    num_pos_tags = len(pos_tags)

    # 명사 추출
    nouns = mecab.nouns(sentence)

    # TF-IDF 벡터화 및 상위 3개 단어 추출
    top_words = ['', '', '']  # 단어가 없을 경우 빈 문자열로 초기화
    top_scores = [0.0, 0.0, 0.0]  # 단어가 없을 경우 TF-IDF 스코어를 0.0으로 초기화
    top_word_vectors = np.zeros((3,))  # 단어가 없을 경우 0 벡터로 초기화

    if nouns:
        tfidf_matrix = tfidf_vectorizer.fit_transform(nouns)
        feature_names = tfidf_vectorizer.get_feature_names_out()
        tfidf_scores = tfidf_matrix.toarray()[0]
        top_indices = np.argsort(tfidf_scores)[-3:][::-1]  # 상위 3개 단어의 인덱스 추출
        top_words = [feature_names[index] for index in top_indices]  # 상위 3개 단어 추출
        top_scores = [tfidf_scores[index] for index in top_indices]  # 상위 3개 단어의 TF-IDF 스코어 추출
        top_word_vectors = [tfidf_matrix.toarray()[0][index] for index in top_indices]  # 상위 3개 단어의 벡터값 추출

    return sentence_length, num_pos_tags, top_words, top_scores, top_word_vectors

In [None]:
data_train = []
data_test = []

# 레이블을 정수형으로 변환
label_mapping_emotion = {'기쁨': 0, '당황': 1, '분노': 2, '불안' : 3, '상처' : 4,'슬픔' : 5}  # 감정에 해당하는 레이블과 정수 매핑
label_mapping_situation = {'가족관계': 0, '건강': 1, '건강,죽음': 2, '대인관계' : 3, '대인관계(부부, 자녀)' : 4, '연애,결혼,출산' : 5, '재정' : 6, '재정,은퇴,노후준비' : 7, '직장, 업무 스트레스' : 8, '진로,취업,직장' : 9, '학교폭력/따돌림' : 10, '학업 및 진로' : 11}  # 상황에 해당하는 레이블과 정수 매핑

In [None]:
def calculate_and_add_similarity(G, i, source_node, target_node):
    prev_sentence_embedding = G.nodes[source_node]['feature']
    curr_sentence_embedding = G.nodes[target_node]['feature']

    # 코사인 유사도를 계산하기 위해 두 벡터를 같은 크기로 변경 (1차원으로 변환)
    prev_sentence_embedding = prev_sentence_embedding.view(-1)
    curr_sentence_embedding = curr_sentence_embedding.view(-1)

    # 코사인 유사도 계산
    similarity_score = F.cosine_similarity(prev_sentence_embedding, curr_sentence_embedding, dim=0)

    # 엣지 특성으로 추가
    G.edges[source_node, target_node]['sentence_similarity'] = similarity_score


In [None]:
# 문장의 최대 길이를 30으로 설정
max_sentence_length = 30

# 그래프 생성 및 노드 추가
graphs = []
for _, row in df.iterrows():
    G = nx.Graph()
    sentences = [row['사람문장1'], row['시스템문장1'], row['사람문장2'], row['시스템문장2'], row['사람문장3'], row['시스템문장3']]

    for i, sentence in enumerate(sentences):
        print(i, sentence)
        if pd.isna(sentence):  # NaN 값 처리
            sentence_embedding = torch.zeros(max_sentence_length, 1024).to(device)  # 0 벡터로 처리
        else:
            sentence = preprocessing(sentence)
            # 문장을 형태소로 분리
            tokens = tokenize_korean_sentence(sentence)
            # 문장을 ELMo 임베딩으로 변환
            character_ids = batch_to_ids([tokens]).to(device)  # GPU로 이동
            embeddings_output = elmo(character_ids)  # GPU로 이동
            sentence_embedding = torch.mean(embeddings_output["elmo_representations"][0], dim=0).to('cpu')
            # 문장 특징 추출
            sentence_length, num_pos_tags, top_words, top_scores, top_word_vectors = extract_sentence_features(sentence)

            # 변환된 특징 데이터를 텐서로 변환
            sentence_embedding_tensor = torch.tensor(sentence_embedding)

            # 문장의 길이를 확인하고 부족한 부분을 0 벡터로 패딩
            if sentence_embedding_tensor.size(0) < max_sentence_length:
                padding_size = max_sentence_length - sentence_embedding_tensor.size(0)
                padding_tensor = torch.zeros(padding_size, 1024)
                sentence_embedding_tensor = torch.cat([sentence_embedding_tensor, padding_tensor], dim=0)

            # 문장의 길이가 최대 길이를 초과하는 경우, 최대 길이까지만 남기고 잘라줌
            sentence_embedding_tensor = sentence_embedding_tensor[:max_sentence_length]

            length_tensor = torch.tensor(sentence_length)
            pos_tags_tensor = torch.tensor(num_pos_tags)
            top_word_vectors_tensor = torch.tensor(top_word_vectors)

        G.add_node(i)

        # 노드 특성 추가
        G.nodes[i]['feature'] = sentence_embedding_tensor
        G.nodes[i]['length'] = length_tensor
        G.nodes[i]['pos_tags'] = pos_tags_tensor
        G.nodes[i]['top_word_vectors'] = top_word_vectors_tensor

        if i > 0:
            # 엣지 정보 생성
            edge_index = i-1  # 엣지 번호
            G.add_edge(i-1, i, edge_index=edge_index)  # 엣지 정보는 인덱스로 설정

            # 엣지의 추가적인 특성 추가 (sentence_similarity, edge_type)
            calculate_and_add_similarity(G, i, i - 1, i)

            if i in [1, 3]:
                edge_type = 1 # 사람
                G.add_edge(i-1, i+1, edge_type=edge_type)

            elif i in [2, 4]:
                edge_type = 2 # 시스템
                G.add_edge(i-1, i+1, edge_type=edge_type)
                calculate_and_add_similarity(G, i, i - 2, i)
            else :
                continue

    # 감정 및 상황 레이블 할당
    y_emotion = label_mapping_emotion[row['감정_대분류']]
    y_situation = label_mapping_situation[row['상황키워드']]

    graphs.append((G, y_emotion, y_situation))

In [None]:
# 그래프 시각화
for i in range(4):  # 3개의 그래프만 표시
    graph, _, _ = graphs[i]  # graph, y_emotion, y_situation 중 graph만 사용
    pos = nx.spring_layout(graph)  # 그래프의 노드 위치 결정
    nx.draw(graph, pos, with_labels=True, node_color='lightblue', edge_color='gray', font_size=8)  # 그래프 그리기
    node_attributes = graph.nodes(data=True)  # 노드의 속성 정보 가져오기
    edge_attributes = graph.edges(data=True)  # 엣지의 속성 정보 가져오기


    # for node, attributes in node_attributes:
    #     x, y = pos[node]
    #     # 노드 속성 표시
    #     attribute_text = f"Node: {node}\n"
    #     for attr_name, attr_value in attributes.items():
    #         attribute_text += f"{attr_name.capitalize()}: {attr_value}\n"
    #     plt.text(x, y + 0.05, s=attribute_text, fontsize=6, ha='center', va='bottom')

    for edge in edge_attributes:
        x1, y1 = pos[edge[0]]
        x2, y2 = pos[edge[1]]
        # 엣지 속성 표시
        attribute_text = f"Edge: {edge[0]}-{edge[1]}\n"
        for attr_name, attr_value in edge[2].items():
            attribute_text += f"{attr_name.capitalize()}: {attr_value}\n"
        plt.text((x1 + x2) / 2, (y1 + y2) / 2, s=attribute_text, fontsize=6, ha='center', va='bottom')

    # y_emotion과 y_situation 레이블 표시
    # y_emotion = label_mapping_emotion[row['감정_대분류']]
    # y_situation = label_mapping_situation[row['상황키워드']]
    # plt.text(0, 1.1, s=f"y_emotion: {y_emotion}", fontsize=8, ha='center', va='bottom')
    # plt.text(0, 1.05, s=f"y_situation: {y_situation}", fontsize=8, ha='center', va='bottom')

    plt.show()

In [None]:
dgl_graphs = []
for (G, y_emotion, y_situation) in graphs:
    # Convert NetworkX graph to DGL graph
    dgl_G = dgl.DGLGraph(G)
    # 노드 및 엣지 속성을 graph.ndata 및 graph.edata에 저장
    num_nodes = len(G.nodes())
    num_edges = len(G.edges())
    dgl_G.ndata['feature'] = torch.zeros(num_nodes, 1024)
    dgl_G.ndata['length'] = torch.zeros(num_nodes , 1)
    dgl_G.ndata['pos_tags'] = torch.zeros(num_nodes, 1)
    dgl_G.ndata['top_word_vectors'] = torch.zeros(num_nodes, 3)  # top_word_vectors의 크기를 3으로 고정
    dgl_G.edata['edge_type'] = torch.zeros(num_edges * 2, 1)
    dgl_G.edata['sentence_similarity'] = torch.zeros(num_edges * 2, 1)

    for i in G.nodes():
        feature = G.nodes[i]['feature']
        if feature.shape != (1024,):
            # feature의 크기가 (1024,)가 아닌 경우, 1024 차원의 평균을 취하여 크기를 (1024,)로 축소
            feature = torch.mean(feature, dim=0)
        dgl_G.ndata['feature'][i] = feature
        dgl_G.ndata['length'][i] = G.nodes[i]['length']
        dgl_G.ndata['pos_tags'][i] = G.nodes[i]['pos_tags']
        top_word_vectors = G.nodes[i]['top_word_vectors']
        if top_word_vectors.shape != (3,):
            # top_word_vectors의 크기가 (3,)가 아닌 경우, 3 차원으로 크기를 고정하고 0으로 채워진 텐서로 변환
            top_word_vectors = torch.tensor(top_word_vectors)
            expanded_top_word_vectors = torch.zeros(3)
            expanded_top_word_vectors[:top_word_vectors.size(0)] = top_word_vectors
            top_word_vectors = expanded_top_word_vectors
        dgl_G.ndata['top_word_vectors'][i] = top_word_vectors

    for i, j in G.edges():
        if 'edge_type' in G.edges[i, j]:
            dgl_G.edata['edge_type'][j-1] = G.edges[i, j]['edge_type']
        else:
            dgl_G.edata['edge_type'][j-1] = 0

        if 'sentence_similarity' in G.edges[i, j]:
            dgl_G.edata['sentence_similarity'][j] = G.edges[i, j]['sentence_similarity']
        else:
            dgl_G.edata['sentence_similarity'][j] = 0

    dgl_G.ndata['y_emotion'] = torch.tensor([y_emotion] * num_nodes)
    dgl_G.ndata['y_situation'] = torch.tensor([y_situation] * num_nodes)

    dgl_graphs.append(dgl_G)

In [None]:
for i in range(4):
    graph = dgl_graphs[i]
    print(f"Graph {i+1}:")
    print("Number of nodes:", graph.number_of_nodes())
    print("Number of edges:", graph.number_of_edges())
    print("Node features:")
    print(graph.ndata)
    print("Edge features:")
    print(graph.edata)
    print()

In [None]:
# 데이터 분할
train_graphs, test_graphs = train_test_split(dgl_graphs, test_size=0.2, random_state=42)

In [None]:
class GCNModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(GCNModel, self).__init__()
        self.conv1 = dgl.nn.SAGEConv(input_dim, hidden_dim, 'mean')  # SAGEConv를 사용하여 엣지 속성을 노드로 전파
        self.conv2 = dgl.nn.SAGEConv(hidden_dim, hidden_dim, 'mean')
        self.conv3 = dgl.nn.SAGEConv(hidden_dim, hidden_dim, 'mean')
        self.fc_emotion = nn.Linear(hidden_dim, output_dim['emotion'])
        self.fc_situation = nn.Linear(hidden_dim, output_dim['situation'])
        self.linear_transform = nn.Linear(input_dim-5, hidden_dim)
        self.linear_transform2 = nn.Linear(hidden_dim * 2, hidden_dim)

    def forward(self, g):
        x = g.ndata['feature']
        length = g.ndata['length'].view(-1, 1)
        pos_tags = g.ndata['pos_tags'].view(-1, 1)
        top_word_vectors = g.ndata['top_word_vectors']
        sentence_similarity = g.edata['sentence_similarity']
        sentence_similarity = self.linear_transform(sentence_similarity)

        # 속성들을 모델에 입력으로 추가
        x = torch.cat([x, length, pos_tags, top_word_vectors], dim=1)
        x = self.conv1(g, x, edge_weight=sentence_similarity)  # SAGEConv에 sentence_similarity 엣지 속성 전달
        x = F.relu(x)
        x = self.conv2(g, x, edge_weight=sentence_similarity)  # SAGEConv에 sentence_similarity 엣지 속성 전달
        x = F.relu(x)
        x = self.conv3(g, x, edge_weight=sentence_similarity)  # SAGEConv에 sentence_similarity 엣지 속성 전달
        x = F.relu(x)

        sentence_similarity=sentence_similarity[:192]
        if sentence_similarity.shape == (192, 128) and x.shape == (192, 128):
            # 엣지 속성을 노드 속성과 함께 연결하여 모델의 입력으로 사용
            x = torch.cat([x, sentence_similarity], dim=1)
            x = self.linear_transform2(x)
            emotion_out = self.fc_emotion(x)
            situation_out = self.fc_situation(x)
        else :
            emotion_out = torch.zeros(192, 6)
            situation_out = torch.zeros(192, 12)

        return emotion_out, situation_out

In [None]:
# 학습 모델 초기화 및 손실 함수, 옵티마이저 설정
input_dim = 1029
hidden_dim = 128
learning_rate = 0.001
output_dim = {'emotion': len(label_mapping_emotion), 'situation': len(label_mapping_situation)}
model = GCNModel(input_dim, hidden_dim, output_dim)
loss_function = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# 데이터 로더 생성
batch_size = 32
train_loader = GraphDataLoader(train_graphs, batch_size=batch_size, shuffle=True)
test_loader = GraphDataLoader(test_graphs, batch_size=batch_size, shuffle=True)


In [None]:
# 평가 함수 정의
def evaluate(model, dataloader):
    model.eval()
    y_true_emotion = []
    y_pred_emotion = []
    y_true_situation = []
    y_pred_situation = []

    with torch.no_grad():
        for batch in dataloader:
            g = batch
            labels_emotion = g.ndata['y_emotion']
            labels_situation = g.ndata['y_situation']

            # Pad labels_emotion and labels_situation to size (192,) if they are smaller
            if labels_emotion.shape[0] < 192:
                padding_dim = 192 - labels_emotion.shape[0]
                labels_emotion = F.pad(labels_emotion, (0, padding_dim), value=0)
                labels_emotion = labels_emotion.clone().detach()  # Detach the tensor from the computation graph
            if labels_situation.shape[0] < 192:
                padding_dim = 192 - labels_situation.shape[0]
                labels_situation = F.pad(labels_situation, (0, padding_dim), value=0)
                labels_situation = labels_situation.clone().detach()  # Detach the tensor from the computation graph


            outputs_emotion, outputs_situation = model(g)
            _, predicted_emotion = torch.max(outputs_emotion, 1)
            _, predicted_situation = torch.max(outputs_situation, 1)
            y_true_emotion.extend(labels_emotion.tolist())
            y_pred_emotion.extend(predicted_emotion.tolist())
            y_true_situation.extend(labels_situation.tolist())
            y_pred_situation.extend(predicted_situation.tolist())

    accuracy_emotion = accuracy_score(y_true_emotion, y_pred_emotion)
    recall_emotion = recall_score(y_true_emotion, y_pred_emotion, average='macro')
    f1_emotion = f1_score(y_true_emotion, y_pred_emotion, average='macro')
    accuracy_situation = accuracy_score(y_true_situation, y_pred_situation)
    recall_situation = recall_score(y_true_situation, y_pred_situation, average='macro')
    f1_situation = f1_score(y_true_situation, y_pred_situation, average='macro')

    return accuracy_emotion, recall_emotion, f1_emotion, accuracy_situation, recall_situation, f1_situation



In [None]:
num_epochs = 200
train_acc_emotion_list = []
train_acc_situation_list = []
test_acc_emotion_list = []
test_acc_situation_list = []

for epoch in range(num_epochs):

    model.train()
    for batch in test_loader:
        g = batch
        labels_emotion = g.ndata['y_emotion']
        labels_situation = g.ndata['y_situation']

        # Pad labels_emotion and labels_situation to size (192,) if they are smaller
        if labels_emotion.shape[0] < 192:
            padding_dim = 192 - labels_emotion.shape[0]
            labels_emotion = F.pad(labels_emotion, (0, padding_dim), value=0)
            labels_emotion = labels_emotion.clone().detach()  # Detach the tensor from the computation graph
        if labels_situation.shape[0] < 192:
            padding_dim = 192 - labels_situation.shape[0]
            labels_situation = F.pad(labels_situation, (0, padding_dim), value=0)
            labels_situation = labels_situation.clone().detach()  # Detach the tensor from the computation graph


        optimizer.zero_grad()
        outputs_emotion, outputs_situation = model(g)
        loss_emotion = loss_function(outputs_emotion, labels_emotion)
        loss_situation = loss_function(outputs_situation, labels_situation)
        loss = loss_emotion + loss_situation
        if loss.requires_grad:  # loss를 계산할 필요가 있는 경우에만 backward를 수행
            loss.backward()
            optimizer.step()
        else:
            pass

    for batch in train_loader:
        g = batch
        labels_emotion = g.ndata['y_emotion']
        labels_situation = g.ndata['y_situation']

        # Pad labels_emotion and labels_situation to size (192,) if they are smaller
        if labels_emotion.shape[0] < 192:
            padding_dim = 192 - labels_emotion.shape[0]
            labels_emotion = F.pad(labels_emotion, (0, padding_dim), value=0)
            labels_emotion = labels_emotion.clone().detach()  # Detach the tensor from the computation graph
        if labels_situation.shape[0] < 192:
            padding_dim = 192 - labels_situation.shape[0]
            labels_situation = F.pad(labels_situation, (0, padding_dim), value=0)
            labels_situation = labels_situation.clone().detach()  # Detach the tensor from the computation graph


        optimizer.zero_grad()
        outputs_emotion, outputs_situation = model(g)
        loss_emotion = loss_function(outputs_emotion, labels_emotion)
        loss_situation = loss_function(outputs_situation, labels_situation)
        loss = loss_emotion + loss_situation
        if loss.requires_grad:  # loss를 계산할 필요가 있는 경우에만 backward를 수행
            loss.backward()
            optimizer.step()
        else:
            pass

    # 학습 데이터에 대한 평가
    train_accuracy_emotion, train_recall_emotion, train_f1_emotion, train_accuracy_situation, train_recall_situation, train_f1_situation = evaluate(model, train_loader)
    # 테스트 데이터에 대한 평가
    test_accuracy_emotion, test_recall_emotion, test_f1_emotion, test_accuracy_situation, test_recall_situation, test_f1_situation = evaluate(model, test_loader)

    # Append accuracy values to the lists
    train_acc_emotion_list.append(train_accuracy_emotion)
    train_acc_situation_list.append(train_accuracy_situation)
    test_acc_emotion_list.append(test_accuracy_emotion)
    test_acc_situation_list.append(test_accuracy_situation)

    print(f"Epoch {epoch+1}/{num_epochs}:")
    print(f"Train Emotion Accuracy: {train_accuracy_emotion:.4f} Recall: {train_recall_emotion:.4f} F1-score: {train_f1_emotion:.4f}")
    print(f"Train Situation Accuracy: {train_accuracy_situation:.4f} Recall: {train_recall_situation:.4f} F1-score: {train_f1_situation:.4f}")
    print(f"Test Emotion Accuracy: {test_accuracy_emotion:.4f} Recall: {test_recall_emotion:.4f} F1-score: {test_f1_emotion:.4f}")
    print(f"Test Situation Accuracy: {test_accuracy_situation:.4f} Recall: {test_recall_situation:.4f} F1-score: {test_f1_situation:.4f}")
    print()


In [None]:
# Plotting the accuracy trend
epochs = range(1, num_epochs+1)

plt.plot(epochs, train_acc_emotion_list, label='Train Emotion Accuracy')
plt.plot(epochs, train_acc_situation_list, label='Train Situation Accuracy')
plt.plot(epochs, test_acc_emotion_list, label='Test Emotion Accuracy')
plt.plot(epochs, test_acc_situation_list, label='Test Situation Accuracy')

plt.title('Accuracy Trend')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()