In [1]:
# Data 불러오기
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
cd /content/drive/MyDrive/Colab Notebooks/OTT_team_project

/content/drive/MyDrive/Colab Notebooks/OTT_team_project


## LDA 전처리 py파일 생성

In [6]:
cd ./utils/

/content/drive/MyDrive/Colab Notebooks/OTT_team_project/utils


In [None]:
%%writefile LDA_utils.py

## 전처리 필요 라이브러리
import re
import time
import datetime
import numpy as np
import pandas as pd

# Tokenize
# from pykospacing import Spacing # 띄어쓰기
from gensim import corpora # 단어 빈도수 계산 패키지
import pyLDAvis.gensim_models # LDA 시각화용 패키지?

# 한국어 형태소 분석기 중 성능이 가장 우수한 Mecab 사용
from konlpy.tag import *
mecab = Mecab()


## 토픽모델링 필요 라이브러리
import gensim
from gensim.corpora import Dictionary
from gensim.models import ldaseqmodel
from gensim.models import CoherenceModel
from gensim.models.callbacks import CoherenceMetric
from gensim.models.callbacks import PerplexityMetric

# 기타
from tqdm import tqdm
import warnings # 경고 메시지 무시
import matplotlib.pyplot as plt
warnings.filterwarnings(action='ignore')

import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)



## --------------- 데이터 전처리 --------------- ##
def date_extract(dataset, start_day:str, printing=False):
  """
  Keep the reviews written on or after ``start_day`` that have text content.

  Args:
    dataset: DataFrame with an ``at`` column (any value ``pd.to_datetime``
      accepts) and a ``content`` column. NOTE: its ``at`` column is converted
      to datetime in place on the caller's frame.
    start_day: inclusive lower bound for ``at``, formatted ``yyyy-mm-dd``.
    printing: when True, print the number of kept rows for each year.

  Returns:
    A new DataFrame holding the kept rows, with an added integer ``year``
    column, sorted by ``at`` in ascending order.
  """
  dataset['at'] = pd.to_datetime(dataset['at'])

  recent = dataset[dataset['at'] >= start_day]
  # Explicit copy: the new column below must be written to an independent
  # frame, not to a slice of the caller's data.
  recent = recent.dropna(subset=['content']).copy()
  recent['year'] = recent['at'].dt.year
  recent = recent.sort_values(by='at', ascending=True)

  if printing:
    for y in recent['year'].unique():
      n_rows = int((recent['year'] == y).sum())
      print(f'{y}: {n_rows} rows\n')
  return recent


class Data_processing:
  '''
  Preprocessing pipeline that turns raw review text into token lists for LDA.

  Steps (see ``get_token``): strip non-Hangul characters, apply the word
  replacement table, extract nouns with Mecab, remove stopwords, and keep
  only reviews whose token count is inside a given range.
  '''
  def __init__(self, dataset):
    # dataset: DataFrame with a 'content' column holding the review text.
    self.dataset = dataset
    # Replacement table with 'before_replacement' / 'after_replacement' columns.
    self.replace_list = pd.read_excel('./replace_list.xlsx')
    # Stopwords, read from the 'stopword' column.
    self.stopwords_list = list(pd.read_excel('./stopword_list.xlsx')['stopword'])

  def ko_language(self, text):
    # Drop every character that is neither a Hangul syllable nor a space.
    return re.sub('[^가-힣 ]', '', text)

  def replace_word(self, text):
    # Apply EVERY matching entry of the replacement table, in table order.
    # (Previously the method returned after the first matching entry, so
    # later entries were never applied.) Matching is literal, mirroring the
    # literal `in` check.
    pairs = zip(self.replace_list['before_replacement'],
                self.replace_list['after_replacement'])
    for before, after in pairs:
      if before in text:
        text = text.replace(before, after)
    return text

  def tokenize(self, text):
    # Noun extraction with the module-level Mecab tagger.
    return mecab.nouns(text)

  def _register_stopwords(self, add_stopwords):
    # Add only the words that are not registered yet, so repeated calls
    # (e.g. once per row) do not make the list grow without bound.
    known = set(self.stopwords_list)
    for word in add_stopwords:
      if word not in known:
        known.add(word)
        self.stopwords_list.append(word)

  def remove_stopwords(self, text, add_stopwords=None):
    # Remove stopwords from a token list; `add_stopwords` are registered
    # permanently on the instance before filtering.
    if add_stopwords:
      self._register_stopwords(add_stopwords)
    stopwords = set(self.stopwords_list)
    return [x for x in text if x not in stopwords]

  def select_review(self, data, min_token_n:int, max_token_n:int):
    # Keep the rows whose 'review_prep' token count lies in
    # [min_token_n, max_token_n]. A positional boolean mask is used so the
    # result is correct for any index, not only a fresh 0..n-1 index.
    keep = np.array(
        [min_token_n <= len(tokens) <= max_token_n for tokens in data['review_prep']],
        dtype=bool)
    return data[keep]

  def get_token(self, add_stopwords=None, min_token_n:int=3, max_token_n:int=1000):
    '''
    Run the full preprocessing pipeline on ``self.dataset``.

    Returns a tuple ``(token_lists, dataset)``: the list of per-review token
    lists and the filtered DataFrame carrying them in 'review_prep'.
    '''
    self.dataset['review_prep'] = self.dataset['content'].apply(self.ko_language)
    # One reset is enough: none of the following steps changes the row set.
    self.dataset = self.dataset.reset_index(drop=True)
    print('ko_language done..')

    self.dataset['review_prep'] = self.dataset['review_prep'].apply(self.replace_word)
    print('replace_word done..')

    self.dataset['review_prep'] = self.dataset['review_prep'].apply(self.tokenize)
    print('tokenize done..')

    # Register the extra stopwords once and build the lookup set once,
    # instead of doing both for every single review.
    if add_stopwords:
      self._register_stopwords(add_stopwords)
    stopwords = set(self.stopwords_list)
    self.dataset['review_prep'] = self.dataset['review_prep'].apply(
        lambda tokens: [t for t in tokens if t not in stopwords])
    print('remove_stopwords done..')

    self.dataset = self.select_review(self.dataset, min_token_n, max_token_n)
    return list(self.dataset['review_prep']), self.dataset



## ------------------ 모델링 ------------------ ##
class Model:
  '''
  Topic-model wrapper around gensim (LdaModel / LdaSeqModel).

  inputs     : list of token lists, one per document.
  num_topics : number of topics to fit.
  no_below   : keep only tokens that appear in at least this many documents.
  no_above   : drop tokens that appear in more than this fraction of the
               documents (0.5 is gensim's own default, i.e. the value that
               was used implicitly before this parameter existed).
  '''
  def __init__(self, inputs, num_topics:int, no_below:int=2, no_above:float=0.5):
    self.dictionary = corpora.Dictionary(inputs)
    self.dictionary.filter_extremes(no_below=no_below, no_above=no_above)
    # Bag-of-words representation of every document.
    self.corpus = [self.dictionary.doc2bow(x) for x in inputs]
    self.inputs = inputs
    self.num_topics = num_topics

  def LDAseqmodel(self, timeslices, chunksize=2000, passes=10):
    '''
    Fit gensim's LdaSeqModel on the corpus and store it in ``self.model``.

    timeslices: passed through as ``time_slice`` (number of documents in
                each consecutive time slice).
    '''
    self.model = ldaseqmodel.LdaSeqModel(
        corpus=self.corpus,
        id2word=self.dictionary,
        time_slice=timeslices,
        num_topics=self.num_topics,
        chunksize=chunksize,
        passes=passes)

  def LDA_model(self, chunksize=2000, passes=20, iterations=400, eval_every=None):
    '''
    Fit a gensim LdaModel on the corpus and store it in ``self.model``.

    chunksize : number of documents processed per training chunk.
    passes    : number of passes over the whole corpus (like epochs).
    iterations: maximum number of inference iterations per document.
    eval_every: forwarded to LdaModel unchanged.
    '''
    # Accessing one item forces the dictionary to build its id2token map.
    _ = self.dictionary[0]
    id2word = self.dictionary.id2token

    # `LdaModel` was never imported by name in this module; reach it through
    # the already-imported `gensim` package instead.
    self.model = gensim.models.LdaModel(
      corpus=self.corpus,
      id2word=id2word,
      chunksize=chunksize,
      alpha='auto',
      eta='auto',
      iterations=iterations,
      num_topics=self.num_topics,
      passes=passes,
      eval_every=eval_every)

  def print_topic_prop(self, topn=10, num_words=20):
    '''
    Train an LDA model (default hyper-parameters) and summarise it.

    topn     : number of top words per topic used for the coherence score.
    num_words: number of words listed per topic in the returned table.

    Returns ``(topic_table, coherence, topics)``: a DataFrame with one
    column per topic, the coherence score, and the raw ``print_topics``
    output.
    '''
    self.LDA_model()
    # Ask for all topics explicitly; the gensim default lists at most 20,
    # which would silently drop topics and misalign the column labels.
    topics = self.model.print_topics(num_topics=self.num_topics, num_words=num_words)

    # Words per topic, labelled with the 1-based topic id.
    topic_words = {}
    for idx, words in topics:
      topic_words[f'topic_{idx+1}'] = words.split('+')
    topic_table = pd.DataFrame(topic_words)

    # Coherence now honours the `topn` argument (it was hard-coded to 10).
    coherence_model_lda = CoherenceModel(
        model=self.model, texts=self.inputs, dictionary=self.dictionary, topn=topn)
    coherence_lda = coherence_model_lda.get_coherence()
    print('LDA done..')

    return topic_table, coherence_lda, topics

Writing LDA_utils.py


## 분류모델 추론

In [14]:
%%writefile MODEL_utils.py

## 전처리 필요 라이브러리
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd
import re
import random
from tqdm import tqdm, tqdm_notebook

# 자칫하면 오류날 수도 있는 부분
import gluonnlp as nlp

# Model
from kobert_tokenizer import KoBERTTokenizer
from transformers import BertModel
from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup
from transformers import ElectraForSequenceClassification, ElectraTokenizer


# Fixed seed so that Python, NumPy and PyTorch random number generators
# start from the same state on every run.
seed = 2021
# NOTE(review): `deterministic` is not referenced in the visible code.
deterministic = True

random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else 'cpu')

######################### KoBERT #########################
# Pretrained KoBERT encoder, its tokenizer, and a gluonnlp vocabulary built
# from the tokenizer's sentencepiece file. Loaded once at import time.
bert = BertModel.from_pretrained('skt/kobert-base-v1', return_dict=False)
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')
vocab = nlp.vocab.BERTVocab.from_sentencepiece(tokenizer.vocab_file, padding_token='[PAD]')

######################### KoELECTRA #########################
# Pretrained KoELECTRA sequence classifier and tokenizer, loaded at import time.
electramodel = ElectraForSequenceClassification.from_pretrained("monologg/koelectra-base-v3-discriminator")  # NOTE: the checkpoint name is the *base* v3 discriminator, not a "small" model
tokenizer_electra = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")


# KoBERT 모델 입력 전처리
class BERTSentenceTransform:
    """Turn a sentence (or sentence pair) into BERT input arrays.

    Args:
        tokenizer: object providing ``tokenize(text)`` and
            ``convert_tokens_to_ids(tokens)``.
        max_seq_length: total length of the produced sequence, including the
            special tokens.
        vocab: vocabulary providing ``cls_token``, ``sep_token``,
            ``padding_token`` and token -> id lookup via ``vocab[token]``.
        pad: pad the outputs up to ``max_seq_length`` when True.
        pair: expect ``(text_a, text_b)`` inputs when True.
    """

    def __init__(self, tokenizer, max_seq_length,vocab, pad=True, pair=True):
        self._tokenizer = tokenizer
        self._max_seq_length = max_seq_length
        self._pad = pad
        self._pair = pair
        self._vocab = vocab

    @staticmethod
    def _truncate_seq_pair(tokens_a, tokens_b, max_length):
        """Trim both token lists in place until their total fits max_length.

        One token at a time is removed from the end of the longer list
        (from ``tokens_b`` on ties).
        """
        while len(tokens_a) + len(tokens_b) > max_length:
            if len(tokens_a) > len(tokens_b):
                tokens_a.pop()
            else:
                tokens_b.pop()

    def __call__(self, line):
        """Convert ``line`` to ``(input_ids, valid_length, segment_ids)``.

        ``line`` is ``[text_a]`` or, in pair mode, ``[text_a, text_b]``.
        All three results are int32 numpy arrays; ``valid_length`` is the
        number of non-padding tokens.
        """
        text_a = line[0]
        if self._pair:
            assert len(line) == 2
            text_b = line[1]

        tokens_a = self._tokenizer.tokenize(text_a)
        tokens_b = None

        if self._pair:
            # Tokenise the second sentence the same way as the first one.
            # (Calling the tokenizer object itself does not return a token list.)
            tokens_b = self._tokenizer.tokenize(text_b)

        if tokens_b:
            # Reserve room for [CLS], [SEP], [SEP].
            self._truncate_seq_pair(tokens_a, tokens_b,
                                    self._max_seq_length - 3)
        else:
            # Reserve room for [CLS] and [SEP].
            if len(tokens_a) > self._max_seq_length - 2:
                tokens_a = tokens_a[0:(self._max_seq_length - 2)]

        vocab = self._vocab
        tokens = [vocab.cls_token]
        tokens.extend(tokens_a)
        tokens.append(vocab.sep_token)
        segment_ids = [0] * len(tokens)

        if tokens_b:
            tokens.extend(tokens_b)
            tokens.append(vocab.sep_token)
            # Everything appended after the first segment belongs to segment 1.
            segment_ids.extend([1] * (len(tokens) - len(segment_ids)))

        input_ids = self._tokenizer.convert_tokens_to_ids(tokens)

        # Number of real (non-padding) tokens.
        valid_length = len(input_ids)

        if self._pad:
            padding_length = self._max_seq_length - valid_length
            # Fill the rest with the padding token id / segment 0.
            input_ids.extend([vocab[vocab.padding_token]] * padding_length)
            segment_ids.extend([0] * padding_length)

        return np.array(input_ids, dtype='int32'), np.array(valid_length, dtype='int32'),\
            np.array(segment_ids, dtype='int32')

class BERTDataset(Dataset):
    """Dataset of pre-computed BERT features plus an int32 label per row.

    Each row of ``dataset`` is indexed with ``sent_idx`` for the sentence and
    ``label_idx`` for the label; the sentence is converted eagerly with
    ``BERTSentenceTransform``.
    """

    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, vocab, max_len, pad, pair):
        to_features = BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len, vocab=vocab, pad=pad, pair=pair)

        # First pass: feature tuples for every sentence.
        features = []
        for row in dataset:
            features.append(to_features([row[sent_idx]]))

        # Second pass: labels as int32.
        targets = []
        for row in dataset:
            targets.append(np.int32(row[label_idx]))

        self.sentences = features
        self.labels = targets

    def __getitem__(self, i):
        # Feature tuple followed by the label.
        sample = self.sentences[i]
        target = self.labels[i]
        return sample + (target, )

    def __len__(self):
        n_samples = len(self.labels)
        return n_samples


# KoBERT 모델 구조
class KoBERTClassifier(nn.Module):
  """Linear classification head on top of a BERT encoder's pooled output.

  Args:
    bert: encoder called with ``input_ids``, ``token_type_ids`` and
      ``attention_mask``; it must return a 2-tuple whose second item is the
      pooled representation.
    hidden_size: size of the pooled representation.
    num_classes: number of output classes.
    dr_rate: dropout probability applied before the classifier; no dropout
      layer is created when it is falsy (None or 0).
    params: unused; kept for interface compatibility.
  """
  def __init__(self, bert, hidden_size=768, num_classes=2, dr_rate=None, params=None):
    super(KoBERTClassifier, self).__init__()
    self.bert = bert
    self.dr_rate = dr_rate

    self.classifier = nn.Linear(hidden_size, num_classes)
    if dr_rate:
      self.dropout = nn.Dropout(p=dr_rate)

  def forward(self, token_ids, valid_length, segment_ids, attention_mask):
    """Return the class logits. ``valid_length`` is accepted but not used here."""
    _, pooler = self.bert(input_ids=token_ids, token_type_ids=segment_ids.long(), attention_mask=attention_mask.float().to(token_ids.device))
    # Without dropout the pooled output goes straight to the classifier
    # (previously `out` was left undefined in that case).
    out = self.dropout(pooler) if self.dr_rate else pooler
    return self.classifier(out)


# KoBERT 추론
def Custom_KoBERT_Predict(test:str, max_len=146, dr_rate=0.3):
  """Classify one review with the fine-tuned KoBERT model.

  Double quotes are removed from ``test``, the text is converted to KoBERT
  inputs of length ``max_len``, the weights are loaded from
  ``./models/KoBERT_state_dict.pt`` and the index of the most probable
  class is returned. NOTE: ``dr_rate`` is accepted but the classifier is
  always built with a dropout rate of 0.3.
  """
  if '"' in test:
    test = re.sub('"', '', test)

  # BERTDataset expects rows of [sentence, label]; '0' is a dummy label.
  rows = [[test, '0']]
  test_data = BERTDataset(rows, 0, 1, tokenizer, vocab, max_len, True, False)
  test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=1)

  # Build the classifier and restore the fine-tuned weights.
  model = KoBERTClassifier(bert, dr_rate=0.3).to(device)
  weights = torch.load("./models/KoBERT_state_dict.pt", map_location=device)
  model.load_state_dict(weights)

  with torch.no_grad():
    model.eval()

    batch = next(iter(test_dataloader))
    token_ids, valid_length, segment_ids, _ = batch

    token_ids = token_ids.long().to(device)
    segment_ids = segment_ids.long().to(device)

    # 1 for the first `valid_length` positions of each row, 0 elsewhere.
    attention_mask = torch.zeros_like(token_ids)
    for row, n_valid in enumerate(valid_length):
      attention_mask[row][:n_valid] = 1
    attention_mask = attention_mask.float()

    logits = model(token_ids, valid_length, segment_ids, attention_mask)
    probabilities = nn.functional.softmax(logits, dim=1)
    return probabilities.argmax(dim=1).item()


######################### KoELECTRA #########################
# KoELECTRA 모델 입력 전처리
class ElectraClassificationDataset(Dataset):
  """Dataset that tokenizes texts on access for KoELECTRA.

  Args:
    test: a single text (``str``) or an iterable of texts.
    tokenizer: callable returning a mapping with ``input_ids``,
      ``attention_mask`` and ``token_type_ids`` (each with a leading batch
      dimension of 1).
    max_len: sequence length used for truncation and padding.
  """
  def __init__(self, test, tokenizer, max_len):
    # A bare string is ONE sample; list(str) would split it into characters.
    self.input_data = [test] if isinstance(test, str) else list(test)
    self.tokenizer = tokenizer
    self.max_len = max_len

  def __len__(self):
    return len(self.input_data)

  def __getitem__(self, idx):
    inputs = self.tokenizer(self.input_data[idx],
                            return_tensors='pt',
                            truncation=True,
                            max_length=self.max_len,
                            padding='max_length',
                            add_special_tokens=True)

    # Drop the leading batch dimension; the DataLoader adds its own.
    return {
        'input_ids': inputs['input_ids'][0],
        'attention_mask': inputs['attention_mask'][0],
        'token_type_ids': inputs['token_type_ids'][0]
          }


# KoELECTRA 추론
def Custom_KoELECTRA_Predict(test:str, max_len=146, dr_rate=0.3):
  """Classify one review with the fine-tuned KoELECTRA model.

  Double quotes are removed from ``test``, the text is tokenized to length
  ``max_len``, the weights are loaded from
  ``./models/KoELECTRA_state_dict.pt`` and the index of the most probable
  class is returned. ``dr_rate`` is accepted for signature parity with the
  KoBERT predictor and is not used.
  """
  test = test.replace('"', '')

  # Wrap the text in a list so the whole review is ONE sample; passing the
  # bare string made the dataset iterate over its characters, so only the
  # first character was classified.
  test_data = ElectraClassificationDataset([test], tokenizer_electra, max_len)
  test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=1)

  # Restore the fine-tuned weights into the module-level model.
  model = electramodel.to(device)
  model_state_dict = torch.load("./models/KoELECTRA_state_dict.pt", map_location=device)
  model.load_state_dict(model_state_dict)

  with torch.no_grad():
    model.eval()

    data = next(iter(test_dataloader))
    token_ids = data['input_ids'].long().to(device)
    token_type_ids = data['token_type_ids'].long().to(device)
    attention_mask = data['attention_mask'].long().to(device)

    out = model(input_ids=token_ids,
                token_type_ids=token_type_ids,
                attention_mask=attention_mask)

    softmax_out = nn.functional.softmax(out.logits, dim=1)
    pred_label = softmax_out.argmax(dim=1).item()
    return pred_label

Overwriting MODEL_utils.py


In [None]:
## 전처리 필요 라이브러리
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd
import re
import random
from tqdm import tqdm, tqdm_notebook

# 자칫하면 오류날 수도 있는 부분
import gluonnlp as nlp

# Model
from kobert_tokenizer import KoBERTTokenizer
from transformers import BertModel
from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup
from transformers import ElectraForSequenceClassification, ElectraTokenizer


# Fixed seed so that Python, NumPy and PyTorch random number generators
# start from the same state on every run.
seed = 2021
# NOTE(review): `deterministic` is not referenced in the visible code.
deterministic = True

random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

# Use the first GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else 'cpu')

######################### KoBERT #########################
# Pretrained KoBERT encoder, its tokenizer, and a gluonnlp vocabulary built
# from the tokenizer's sentencepiece file. Loaded once when the cell runs.
bert = BertModel.from_pretrained('skt/kobert-base-v1', return_dict=False)
tokenizer = KoBERTTokenizer.from_pretrained('skt/kobert-base-v1')
vocab = nlp.vocab.BERTVocab.from_sentencepiece(tokenizer.vocab_file, padding_token='[PAD]')

######################### KoELECTRA #########################
# Pretrained KoELECTRA sequence classifier and tokenizer.
electramodel = ElectraForSequenceClassification.from_pretrained("monologg/koelectra-base-v3-discriminator")  # NOTE: the checkpoint name is the *base* v3 discriminator, not a "small" model
tokenizer_electra = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")


# KoBERT 모델 입력 전처리
class BERTSentenceTransform:
    """Turn a sentence (or sentence pair) into BERT input arrays.

    Args:
        tokenizer: object providing ``tokenize(text)`` and
            ``convert_tokens_to_ids(tokens)``.
        max_seq_length: total length of the produced sequence, including the
            special tokens.
        vocab: vocabulary providing ``cls_token``, ``sep_token``,
            ``padding_token`` and token -> id lookup via ``vocab[token]``.
        pad: pad the outputs up to ``max_seq_length`` when True.
        pair: expect ``(text_a, text_b)`` inputs when True.
    """

    def __init__(self, tokenizer, max_seq_length,vocab, pad=True, pair=True):
        self._tokenizer = tokenizer
        self._max_seq_length = max_seq_length
        self._pad = pad
        self._pair = pair
        self._vocab = vocab

    @staticmethod
    def _truncate_seq_pair(tokens_a, tokens_b, max_length):
        """Trim both token lists in place until their total fits max_length.

        One token at a time is removed from the end of the longer list
        (from ``tokens_b`` on ties).
        """
        while len(tokens_a) + len(tokens_b) > max_length:
            if len(tokens_a) > len(tokens_b):
                tokens_a.pop()
            else:
                tokens_b.pop()

    def __call__(self, line):
        """Convert ``line`` to ``(input_ids, valid_length, segment_ids)``.

        ``line`` is ``[text_a]`` or, in pair mode, ``[text_a, text_b]``.
        All three results are int32 numpy arrays; ``valid_length`` is the
        number of non-padding tokens.
        """
        text_a = line[0]
        if self._pair:
            assert len(line) == 2
            text_b = line[1]

        tokens_a = self._tokenizer.tokenize(text_a)
        tokens_b = None

        if self._pair:
            # Tokenise the second sentence the same way as the first one.
            # (Calling the tokenizer object itself does not return a token list.)
            tokens_b = self._tokenizer.tokenize(text_b)

        if tokens_b:
            # Reserve room for [CLS], [SEP], [SEP].
            self._truncate_seq_pair(tokens_a, tokens_b,
                                    self._max_seq_length - 3)
        else:
            # Reserve room for [CLS] and [SEP].
            if len(tokens_a) > self._max_seq_length - 2:
                tokens_a = tokens_a[0:(self._max_seq_length - 2)]

        vocab = self._vocab
        tokens = [vocab.cls_token]
        tokens.extend(tokens_a)
        tokens.append(vocab.sep_token)
        segment_ids = [0] * len(tokens)

        if tokens_b:
            tokens.extend(tokens_b)
            tokens.append(vocab.sep_token)
            # Everything appended after the first segment belongs to segment 1.
            segment_ids.extend([1] * (len(tokens) - len(segment_ids)))

        input_ids = self._tokenizer.convert_tokens_to_ids(tokens)

        # Number of real (non-padding) tokens.
        valid_length = len(input_ids)

        if self._pad:
            padding_length = self._max_seq_length - valid_length
            # Fill the rest with the padding token id / segment 0.
            input_ids.extend([vocab[vocab.padding_token]] * padding_length)
            segment_ids.extend([0] * padding_length)

        return np.array(input_ids, dtype='int32'), np.array(valid_length, dtype='int32'),\
            np.array(segment_ids, dtype='int32')

class BERTDataset(Dataset):
    """Dataset of pre-computed BERT features plus an int32 label per row.

    Each row of ``dataset`` is indexed with ``sent_idx`` for the sentence and
    ``label_idx`` for the label; the sentence is converted eagerly with
    ``BERTSentenceTransform``.
    """

    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, vocab, max_len, pad, pair):
        to_features = BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len, vocab=vocab, pad=pad, pair=pair)

        # First pass: feature tuples for every sentence.
        features = []
        for row in dataset:
            features.append(to_features([row[sent_idx]]))

        # Second pass: labels as int32.
        targets = []
        for row in dataset:
            targets.append(np.int32(row[label_idx]))

        self.sentences = features
        self.labels = targets

    def __getitem__(self, i):
        # Feature tuple followed by the label.
        sample = self.sentences[i]
        target = self.labels[i]
        return sample + (target, )

    def __len__(self):
        n_samples = len(self.labels)
        return n_samples


# KoBERT 모델 구조
class KoBERTClassifier(nn.Module):
  """Linear classification head on top of a BERT encoder's pooled output.

  Args:
    bert: encoder called with ``input_ids``, ``token_type_ids`` and
      ``attention_mask``; it must return a 2-tuple whose second item is the
      pooled representation.
    hidden_size: size of the pooled representation.
    num_classes: number of output classes.
    dr_rate: dropout probability applied before the classifier; no dropout
      layer is created when it is falsy (None or 0).
    params: unused; kept for interface compatibility.
  """
  def __init__(self, bert, hidden_size=768, num_classes=2, dr_rate=None, params=None):
    super(KoBERTClassifier, self).__init__()
    self.bert = bert
    self.dr_rate = dr_rate

    self.classifier = nn.Linear(hidden_size, num_classes)
    if dr_rate:
      self.dropout = nn.Dropout(p=dr_rate)

  def forward(self, token_ids, valid_length, segment_ids, attention_mask):
    """Return the class logits. ``valid_length`` is accepted but not used here."""
    _, pooler = self.bert(input_ids=token_ids, token_type_ids=segment_ids.long(), attention_mask=attention_mask.float().to(token_ids.device))
    # Without dropout the pooled output goes straight to the classifier
    # (previously `out` was left undefined in that case).
    out = self.dropout(pooler) if self.dr_rate else pooler
    return self.classifier(out)


# KoBERT 추론
def Custom_KoBERT_Predict(test:str, max_len=146, dr_rate=0.3):
  """Classify one review with the fine-tuned KoBERT model.

  Double quotes are removed from ``test``, the text is converted to KoBERT
  inputs of length ``max_len``, the weights are loaded from
  ``./models/KoBERT_state_dict.pt`` and the index of the most probable
  class is returned. NOTE: ``dr_rate`` is accepted but the classifier is
  always built with a dropout rate of 0.3.
  """
  if '"' in test:
    test = re.sub('"', '', test)

  # BERTDataset expects rows of [sentence, label]; '0' is a dummy label.
  rows = [[test, '0']]
  test_data = BERTDataset(rows, 0, 1, tokenizer, vocab, max_len, True, False)
  test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=1)

  # Build the classifier and restore the fine-tuned weights.
  model = KoBERTClassifier(bert, dr_rate=0.3).to(device)
  weights = torch.load("./models/KoBERT_state_dict.pt", map_location=device)
  model.load_state_dict(weights)

  with torch.no_grad():
    model.eval()

    batch = next(iter(test_dataloader))
    token_ids, valid_length, segment_ids, _ = batch

    token_ids = token_ids.long().to(device)
    segment_ids = segment_ids.long().to(device)

    # 1 for the first `valid_length` positions of each row, 0 elsewhere.
    attention_mask = torch.zeros_like(token_ids)
    for row, n_valid in enumerate(valid_length):
      attention_mask[row][:n_valid] = 1
    attention_mask = attention_mask.float()

    logits = model(token_ids, valid_length, segment_ids, attention_mask)
    probabilities = nn.functional.softmax(logits, dim=1)
    return probabilities.argmax(dim=1).item()


######################### KoELECTRA #########################
# KoELECTRA 모델 입력 전처리
class ElectraClassificationDataset(Dataset):
  """Dataset that tokenizes texts on access for KoELECTRA.

  Args:
    test: a single text (``str``) or an iterable of texts.
    tokenizer: callable returning a mapping with ``input_ids``,
      ``attention_mask`` and ``token_type_ids`` (each with a leading batch
      dimension of 1).
    max_len: sequence length used for truncation and padding.
  """
  def __init__(self, test, tokenizer, max_len):
    # A bare string is ONE sample; list(str) would split it into characters.
    self.input_data = [test] if isinstance(test, str) else list(test)
    self.tokenizer = tokenizer
    self.max_len = max_len

  def __len__(self):
    return len(self.input_data)

  def __getitem__(self, idx):
    inputs = self.tokenizer(self.input_data[idx],
                            return_tensors='pt',
                            truncation=True,
                            max_length=self.max_len,
                            padding='max_length',
                            add_special_tokens=True)

    # Drop the leading batch dimension; the DataLoader adds its own.
    return {
        'input_ids': inputs['input_ids'][0],
        'attention_mask': inputs['attention_mask'][0],
        'token_type_ids': inputs['token_type_ids'][0]
          }


# KoELECTRA 추론
def Custom_KoELECTRA_Predict(test:str, max_len=146, dr_rate=0.3):
  """Classify one review with the fine-tuned KoELECTRA model.

  Double quotes are removed from ``test``, the text is tokenized to length
  ``max_len``, the weights are loaded from
  ``./models/KoELECTRA_state_dict.pt`` and the index of the most probable
  class is returned. ``dr_rate`` is accepted for signature parity with the
  KoBERT predictor and is not used.
  """
  test = test.replace('"', '')

  # Wrap the text in a list so the whole review is ONE sample; passing the
  # bare string made the dataset iterate over its characters, so only the
  # first character was classified.
  test_data = ElectraClassificationDataset([test], tokenizer_electra, max_len)
  test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=1)

  # Restore the fine-tuned weights into the module-level model.
  model = electramodel.to(device)
  model_state_dict = torch.load("./models/KoELECTRA_state_dict.pt", map_location=device)
  model.load_state_dict(model_state_dict)

  with torch.no_grad():
    model.eval()

    data = next(iter(test_dataloader))
    token_ids = data['input_ids'].long().to(device)
    token_type_ids = data['token_type_ids'].long().to(device)
    attention_mask = data['attention_mask'].long().to(device)

    out = model(input_ids=token_ids,
                token_type_ids=token_type_ids,
                attention_mask=attention_mask)

    softmax_out = nn.functional.softmax(out.logits, dim=1)
    pred_label = softmax_out.argmax(dim=1).item()
    return pred_label