## 준비

### 패키지 설치
- KoBERT 오픈소스 내 requirements.txt를 참고
- https://github.com/SKTBrain/KoBERT/blob/master/kobert_hf/requirements.txt

In [1]:
!pip install mxnet
!pip install gluonnlp==0.8.0
!pip install tqdm pandas
!pip install torch
!pip install sentencepiece
!pip install transformers
!pip install 'git+https://github.com/SKTBrain/KoBERT.git#egg=kobert_tokenizer&subdirectory=kobert_hf'

Collecting mxnet
  Downloading mxnet-1.9.1-py3-none-manylinux2014_x86_64.whl (49.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.1/49.1 MB[0m [31m17.2 MB/s[0m eta [36m0:00:00[0m
Collecting graphviz<0.9.0,>=0.8.1 (from mxnet)
  Downloading graphviz-0.8.4-py2.py3-none-any.whl (16 kB)
Installing collected packages: graphviz, mxnet
  Attempting uninstall: graphviz
    Found existing installation: graphviz 0.20.1
    Uninstalling graphviz-0.20.1:
      Successfully uninstalled graphviz-0.20.1
Successfully installed graphviz-0.8.4 mxnet-1.9.1
Collecting gluonnlp==0.8.0
  Downloading gluonnlp-0.8.0.tar.gz (235 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m235.5/235.5 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: gluonnlp
  Building wheel for gluonnlp (setup.py) ... [?25l[?25hdone
  Created wheel for gluonnlp: filename=gluonnlp-0.8.0-py3-none-

### 필요한 라이브러리 임포트
- KoBERT & Transformers 관련 라이브러리
- 딥러닝 관련 라이브러리

In [2]:
# KoBERT + Transformers
from kobert_tokenizer import KoBERTTokenizer
from transformers import BertModel
from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup

# 딥러닝 관련
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gluonnlp as nlp
import numpy as np
from tqdm.notebook import tqdm, tqdm_notebook



### 토크나이저 & KoBERT 모델 & 설정값 준비
1. 토크나이저 + vocab 준비
2. 프리트레인된 모델 준비(KoBERT)
3. 다양한 설정값 준비

In [3]:
# 1. 토크나이저 + vocab 준비
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')
vocab = nlp.vocab.BERTVocab.from_sentencepiece(tokenizer.vocab_file, padding_token='[PAD]')

# 2. 프리트레인된 모델 준비(KoBERT)
bertmodel = BertModel.from_pretrained('skt/kobert-base-v1', return_dict=False)


# 3. 다양한 설정값 준비
# parameter 값 출처 : https://github.com/SKTBrain/KoBERT/blob/master/scripts/NSMC/naver_review_classifications_pytorch_kobert.ipynb
max_len = 64
batch_size = 64
warmup_ratio = 0.1
num_epochs = 5
max_grad_norm = 1
log_interval = 200
learning_rate =  5e-5
device = torch.device("cuda:0") #Colab의 GPU 활성화

tokenizer_config.json:   0%|          | 0.00/432 [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/371k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/244 [00:00<?, ?B/s]

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'XLNetTokenizer'. 
The class this function is called from is 'KoBERTTokenizer'.


config.json:   0%|          | 0.00/535 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/369M [00:00<?, ?B/s]

## 토큰화 클래스
- 토크나이저와 vocab을 이용하여 문장 -> 토큰 시퀀스로 변경

In [4]:
class BERTSentenceTransform:
  """
  초기 설정
  - tokenizer:
      토크나이저 설정(BERTTokenizer)
  - max_seq_length: int
      토큰 시퀀스 최대 길이 설정
  - vocab:
      토큰화에 사용할 vocab 설정
  - pad : bool, default True
      토큰 시퀀스 최대 길이를 채우기 위한 패딩 토큰[PAD] 설정
  - pair : bool, default True
      단일 문장만을 처리할 것인지, 아니면 문장 쌍을 처리할 것인지를 설정
  """
  def __init__(self, tokenizer, max_seq_length, vocab, pad=True, pair=True):
      self._tokenizer = tokenizer
      self._max_seq_length = max_seq_length
      self._pad = pad
      self._pair = pair
      self._vocab = vocab

  """
  토큰 시퀀스 변환 과정
    - vocab을 활용하여 문장 -> 토큰 시퀀스로 변환
    - 토큰 시퀀스 앞뒤에 시작과 끝을 알리는 토큰 삽입([CLS], [SEP])
    - 토큰이 어디 문장에 속하는지 알아내기 위해 type id를 생성
    - 패딩 토큰[PAD]를 제외한 실제 의미를 가지는 토큰수를 생성

    예시1) 문장쌍 (text_a, text_b)
      Inputs:
          text_a: 'is this jacksonville ?'
          text_b: 'no it is not'
      Tokenization:
          text_a: 'is this jack ##son ##ville ?'
          text_b: 'no it is not .'
      Processed:
          tokens: '[CLS] is this jack ##son ##ville ? [SEP] no it is not . [SEP]'
          type_ids: 0     0  0    0    0     0       0 0     1  1  1  1   1 1
          valid_length: 14

    예시2) 단일 문장 (text_a)
      Inputs:
          text_a: 'the dog is hairy .'
      Tokenization:
          text_a: 'the dog is hairy .'
      Processed:
          text_a: '[CLS] the dog is hairy . [SEP]'
          type_ids: 0     0   0   0  0     0 0
          valid_length: 7

  파라미터
    line: 문자열 튜플
      만약 문장 쌍이라면, (text_a, text_b)
      만약 단일 문장이라면, (text_a)

  반환값
      input_ids: np.array, shape (batch_size, seq_length)
          문장에 대한 토큰 시퀀스
      valid_length: np.array, shape (batch_size,)
          실제 의미를 가지는 토큰수
      segment_ids: np.array, shape (batch_size, seq_length)
          토큰 시퀀스에 대한 type ids

  """
  def __call__(self, line):
      # convert to unicode
      text_a = line[0]
      if self._pair: #문장 쌍 처리
          assert len(line) == 2
          text_b = line[1]

      ## 첫번째 문장 토큰화 진행
      tokens_a = self._tokenizer.tokenize(text_a)

      ## 문장쌍인 경우, 두번째 문장 토큰화 진행
      tokens_b = None

      if self._pair:
          tokens_b = self._tokenizer(text_b)

      ## 최대 토큰 길이를 초과하지 않도록 처리
      ### 문장쌍의 경우 [CLS], [SEP], [SEP] 토큰을 고려하여 -3 처리
      if tokens_b:
          self._truncate_seq_pair(tokens_a, tokens_b,
                                  self._max_seq_length - 3)

      ### 단일 문장의 경우 [CLS] and [SEP] 토큰을 고려하여 -2 처리
      else:
          if len(tokens_a) > self._max_seq_length - 2:
              tokens_a = tokens_a[0:(self._max_seq_length - 2)]

      vocab = self._vocab
      tokens = []
      tokens.append(vocab.cls_token) #[CLS] 토큰 추가
      tokens.extend(tokens_a) # 첫 문장에 대한 토큰 추가
      tokens.append(vocab.sep_token) #[SEP] 토큰 추가
      segment_ids = [0] * len(tokens) #첫 문장에 속하는 토큰임을 알리는 type id 설정(0)

      if tokens_b:
          tokens.extend(tokens_b) # 두번째 문장에 대한 토큰 추가
          tokens.append(vocab.sep_token) #[SEP] 토큰 추가
          #두번째 문장에 속하는 토큰임을 알리는 type id 설정(1)
          segment_ids.extend([1] * (len(tokens) - len(segment_ids)))

      # 리턴값1: input_ids
      #   특수 토큰 + 패팅 토큰이 포함된 토큰 시퀀스
      input_ids = self._tokenizer.convert_tokens_to_ids(tokens)

      # 리턴값2: valid_length
      #  패딩 토큰[PAD]를 제외한 실제 의미를 가지는 토큰수
      valid_length = len(input_ids)

      # 최대 토큰 수를 만족하도록 패딩 토큰[PAD] 추가
      if self._pad:
          padding_length = self._max_seq_length - valid_length
          input_ids.extend([vocab[vocab.padding_token]] * padding_length)
          segment_ids.extend([0] * padding_length)

      return np.array(input_ids, dtype='int32'), np.array(valid_length, dtype='int32'),\
          np.array(segment_ids, dtype='int32')

## BERT 모델 입력 데이터 생성 클래스
- 데이터 셋을 BERT 모델에 맞는 입력 데이터 셋으로 변경하는 클래스
- 위에서 정의한 토큰화 클래스를 활용

In [5]:
class BERTDataset(Dataset):
    """
    초기 설정
    - dataset: 2차원 리스트
        모델의 입력에 사용될 데이터셋
    - sent_idx: int
        dataset에서 입력 문장에 해당하는 index
    - label_idx: int
        dataset에서 정답(라벨)에 해당하는 index
    - bert_tokenizer:
        토크나이저 설정(BERTTokenizer)
    - max_len: int
        토큰 시퀀스 최대 길이 설정
    - vocab:
        토큰화에 사용할 vocab 설정
    - pad : bool, default True
        토큰 시퀀스 최대 길이를 채우기 위한 패딩 토큰[PAD] 설정
    - pair : bool, default True
        단일 문장만을 처리할 것인지, 아니면 문장 쌍을 처리할 것인지를 설정
    """
    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, vocab, max_len, pad, pair):
        transform = BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len, vocab=vocab, pad=pad, pair=pair)

        #dataset에서 입력 데이터 추출 -> 토큰화 진행
        self.sentences = [transform([i[sent_idx]]) for i in dataset]

        #dataset에서 정답 데이터(라벨) 추출
        self.labels = [np.int32(i[label_idx]) for i in dataset]
    """
    주어진 index에 해당하는 토큰 시퀀스와 라벨을 반환
    """
    def __getitem__(self, i):
        return (self.sentences[i] + (self.labels[i],))
    """
    데이터셋의 총 길이 = 총 문장의 수를 반환
    """
    def __len__(self):
        return len(self.labels)


## 문서 분류 모델 클래스
- 프리트레인이 완료된 KoBERT 모델 위에 문서 분류용 태스크 모듈이 덧붙여진 형태의 모델 클래스 정의

In [6]:
class BERTClassifier(nn.Module):
    """
    초기 설정
    - bert:
        프리트레인이 완료된 KoBERT모델
    - hidden_size: int
        은닉층 크기 설정
    - num_classes: int
        분류 클래스 갯수 설정
    - dr_rate: float
        드롭 아웃 비율 설정
    - params:
        추가 매개 변수
    """
    def __init__(self, bert, hidden_size=768, num_classes=7, dr_rate=None, params=None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(hidden_size, num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)
    """
    어텐션 마스크 생성:
      - 토큰 시퀀스에서 실제 입력되는지 여부를 나타내는 것
      - 즉, 토큰 시퀀스에서 패딩 토큰[PAD]를 무시하는 역할 수행

    파라미터
      - token_ids: 토큰 시퀀스
      - valid_length: 실제 의미를 가지는 토큰수(길이)

    """
    def gen_attention_mask(self, token_ids, valid_length):
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    """
    분류 진행 및 결과 반환

    처리 순서
    1. 어텐션 마스크 생성
    2. 임베딩 진행 :
        - 토큰 시퀀스 -> 각 토큰의 임베딩을 계산(토큰 -> 벡터 변환) -> 여러 레이어를 통과한 후 최종적으로 폴링된 벡터(pooler)를 반환
        - 폴링된 벡터: 전체 시퀀스를 요약한 표현으로, 문장 수준의 정보를 담고 있음
    3. 풀링된 벡터 드롭아웃 진행
    4. 풀링된 벡터를 classifier를 이용하여 분류 결과 반환
    """
    def forward(self, token_ids, valid_length, segment_ids):
        # 어텐션 마스크 생성
        attention_mask = self.gen_attention_mask(token_ids, valid_length)

        # 변환: 토큰 시퀀스 -> 풀링된 벡터
        _, pooler = self.bert(input_ids=token_ids, token_type_ids=segment_ids.long(), attention_mask=attention_mask.float().to(token_ids.device))

        # 풀링된 벡터 드롭아웃 진행
        if self.dr_rate:
            out = self.dropout(pooler)
        else:
            out = pooler

        # 풀링된 벡터를 이용해 분류 결과 반환
        return self.classifier(out)


## 저장된 분류 모델 불러오기

In [7]:
# 모델 불러오기
from google.colab import drive

# 구글 드라이브 마운트
drive.mount('/content/gdrive')

save_path = '/content/gdrive/MyDrive/7_emotion_model.pt'

loaded_model = BERTClassifier(bertmodel, dr_rate=0.5).to(device)
loaded_model.load_state_dict(torch.load(save_path))

Mounted at /content/gdrive


<All keys matched successfully>

In [8]:
# input = 분류하고자 하는 문장
def predict(predict_sentence):
    #print('\n----------------\n' + predict_sentence + '\n----------------\n')

    data = [predict_sentence, '0']
    dataset_another = [data]

    another_test = BERTDataset(dataset_another, 0, 1, tokenizer, vocab, max_len, True, False) # 토큰화한 문장
    test_dataloader = torch.utils.data.DataLoader(another_test, batch_size = batch_size, num_workers = 5) # torch 형식 변환

    loaded_model.eval()

    test_eval = []
    for batch_id, (token_ids, valid_length, segment_ids, label) in enumerate(test_dataloader):
        token_ids = token_ids.long().to(device)
        segment_ids = segment_ids.long().to(device)

        valid_length = valid_length
        label = label.long().to(device)

        out = loaded_model(token_ids, valid_length, segment_ids)


        for i in out:
            logits = i
            logits = logits.detach().cpu().numpy()
            if np.argmax(logits) == 0:
                test_eval.append("행복")
            elif np.argmax(logits) == 1:
                test_eval.append("공포")
            elif np.argmax(logits) == 2:
                test_eval.append("놀람")
            elif np.argmax(logits) == 3:
                test_eval.append("분노")
            elif np.argmax(logits) == 4:
                test_eval.append("슬픔")
            elif np.argmax(logits) == 5:
                test_eval.append("혐오")
            elif np.argmax(logits) == 6:
                test_eval.append("중립")

    return test_eval[0]


## 분류 진행

### 분류할 데이터셋 가져오기

In [9]:
import pandas as pd

In [10]:
# 파일 접두어 정의
file_prefixes = ["국가애도기간", "용어지침", "대통령사과", "경찰책임", "정부책임", "유가족협의회", "특별법"]

# 디렉토리 경로
directory_path = "/content/gdrive/MyDrive/3_processed_dataset/"

dfs=[]

# 각 파일 접두어에 대해 반복
for prefix in file_prefixes:
    # 파일 경로 정의
    file_path = directory_path + f"{prefix}_processed.xlsx"

    df = pd.read_excel(file_path)

    dfs.append(df)

In [11]:
for i, df in enumerate(dfs):
  print(f'{file_prefixes[i]} 감정분석 진행중...')
  df['Emotion'] = df['Comment'].apply(predict)

국가애도기간 감정분석 진행중...




용어지침 감정분석 진행중...




대통령사과 감정분석 진행중...




경찰책임 감정분석 진행중...




정부책임 감정분석 진행중...




유가족협의회 감정분석 진행중...




특별법 감정분석 진행중...




In [12]:
import os

# 결과를 저장할 디렉토리 생성
output_directory = "/content/gdrive/MyDrive/6_emotion/"
os.makedirs(output_directory, exist_ok=True)

# 각각의 통합된 데이터프레임을 Excel 파일로 저장
for prefix, df in zip(file_prefixes, dfs):
    output_file_path = f"{output_directory}{prefix}_emotion.xlsx"
    df.to_excel(output_file_path, index=False)
    print(f"{output_file_path}에 저장 완료")

/content/gdrive/MyDrive/6_emotion/국가애도기간_emotion.xlsx에 저장 완료
/content/gdrive/MyDrive/6_emotion/용어지침_emotion.xlsx에 저장 완료
/content/gdrive/MyDrive/6_emotion/대통령사과_emotion.xlsx에 저장 완료
/content/gdrive/MyDrive/6_emotion/경찰책임_emotion.xlsx에 저장 완료
/content/gdrive/MyDrive/6_emotion/정부책임_emotion.xlsx에 저장 완료
/content/gdrive/MyDrive/6_emotion/유가족협의회_emotion.xlsx에 저장 완료
/content/gdrive/MyDrive/6_emotion/특별법_emotion.xlsx에 저장 완료
