# Build Fasttext embedding model

In [None]:
# Fasttext 모델 학습 (gensim)
!pip install gensim

In [None]:
!pip install soynlp

In [None]:
# 구글 코랩 환경에서 실행, 구글 드라이브 마운트
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# 구글 드라이브 경로 지정
path = '/content/drive/My Drive/Colab Notebooks/2020-PoscoICT/Data/'

In [None]:
import re
import soynlp

# Fasttext 임베딩 모델 생성

In [None]:
from typing import List, Callable
from gensim.models.fasttext import FastText


def make_fasttext_model(model_path: str = None,
                        corpus_path: str = None,
                        tokenizer: Callable = lambda x: x.split(),
                        window: int = 5,
                        embedding_dim: int = 300,
                        min_count: int = 10,
                        epochs: int = 50) -> FastText:
    """
    전처리된 데이터, 모델 저장 경로, 토크나이져 등을 입력으로 받아 Fasttext 모델을 저장하고 출력하는 함수
    :param model_path: FastText 모델 저장 경로
    :param corpus_path: 입력 텍스트 데이터 경로
    :param tokenizer: 임베딩 전 적용할 토크나이져, 기본값은 어절 단위로 토크나이징하는 함수(띄어쓰기를 기준으로 split)
    :param window: 컨텍스트 윈도우 사이즈
    :param embedding_dim: 임베딩 차원
    :param min_count: 최소 단어 빈도
    :param epochs: 학습 횟수
    :return: 학습된 fasttext 모델 (FastText 오브젝트)
    """
    print(f'Load data at {corpus_path}')
    with open(corpus_path, 'r', encoding='utf-8') as f:
        corpus = f.readlines()
    corpus_tokenized = [tokenizer(text) for text in corpus]
    
    print('Training FastText') 
    model = FastText(corpus_tokenized,
                     window=window,
                     size=embedding_dim,
                     min_count=min_count,
                     iter=epochs) 
    
    model.save(model_path) 
    print(f'Model saved at {model_path}')

    return model

In [None]:
from typing import List

# 참고 url: https://rachelsdotcom.tistory.com/79
def transform_char_into_jamo(char: str = None) -> str:
    """
    문자를 자모음 시퀀스로 변환하는 함수
    :param char: 자모음으로 분리할 문자
    :return: 분리된 자모음 시퀀스 스트링
    """
    BASE_CODE, CHOSUNG, JUNGSUNG = 44032, 588, 28
    CHOSUNG_LIST = ['ㄱ', 'ㄲ', 'ㄴ', 'ㄷ', 'ㄸ', 'ㄹ', 'ㅁ', 'ㅂ', 'ㅃ', 'ㅅ', 'ㅆ',
                    'ㅇ', 'ㅈ', 'ㅉ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 'ㅎ']
    JUNGSUNG_LIST = ['ㅏ', 'ㅐ', 'ㅑ', 'ㅒ', 'ㅓ', 'ㅔ', 'ㅕ', 'ㅖ', 'ㅗ', 'ㅘ', 'ㅙ',
                     'ㅚ', 'ㅛ', 'ㅜ', 'ㅝ', 'ㅞ', 'ㅟ', 'ㅠ', 'ㅡ', 'ㅢ', 'ㅣ']
    JONGSUNG_LIST = [' ', 'ㄱ', 'ㄲ', 'ㄳ', 'ㄴ', 'ㄵ', 'ㄶ', 'ㄷ', 'ㄹ', 'ㄺ', 'ㄻ',
                     'ㄼ', 'ㄽ', 'ㄾ', 'ㄿ', 'ㅀ', 'ㅁ', 'ㅂ', 'ㅄ', 'ㅅ', 'ㅆ', 'ㅇ',
                     'ㅈ', 'ㅊ', 'ㅋ', 'ㅌ', 'ㅍ', 'ㅎ']

    # 파일 ->  ㅍㅏ#
    # 일 -> ㅇㅣㄹ
    jamo_token = ''
    if re.match('.*[ㄱ-ㅎㅏ-ㅣ가-힣]+.*', char) is not None:
        if ord(char) >= BASE_CODE:
            char_code = ord(char) - BASE_CODE
            cho_idx = int(char_code / CHOSUNG)
            jamo_token += CHOSUNG_LIST[cho_idx]

            jung_idx = int((char_code - (CHOSUNG * cho_idx)) / JUNGSUNG)
            jamo_token += JUNGSUNG_LIST[jung_idx]

            jong_idx = int((char_code - (CHOSUNG * cho_idx) -
                            (JUNGSUNG * jung_idx)))
            if jong_idx == 0:
                jamo_token += '#'
            else:
                jamo_token += JONGSUNG_LIST[jong_idx]
        else:
            jamo_token += char
    else:
        jamo_token += char
    return jamo_token


def tranform_token_into_jamo(token: str = None) -> str:
    """
    토큰(단어)를 자모음 시퀀스로 변환하는 함수
    :param char: 자모음으로 분리할 토큰(단어)
    :return: 분리된 자모음 시퀀스 스트링
    """
    jamo_token = ''
    for char in list(token):
        jamo_token += transform_char_into_jamo(char)
    return jamo_token

In [None]:
temp_sent = '오늘은 미세먼지가 있습니다'
tranform_token_into_jamo(temp_sent)

In [None]:
# 토크나이징: 어절 단위 epoch 50으로 수정할 것
model = make_fasttext_model(model_path=path + 'fasttext_space_300.model',
                            corpus_path=path + 'news_sentence_train.txt',
                            # tokenizer=mecab.morphs,
                            tokenizer=lambda x: x.split(),
                            window=5,
                            embedding_dim=300,
                            min_count=10,
                            epochs=5)

In [None]:
# 토크나이징: 어절/자모 단위 epoch 50으로 수정할 것
model = make_fasttext_model(model_path=path + 'fasttext_space_jamo_300.model',
                            corpus_path=path + 'news_sentence_train.txt',
                            tokenizer=lambda x: [tranform_token_into_jamo(t) for t in x.split()],
                            window=5,
                            embedding_dim=300,
                            min_count=10,
                            epochs=5)

# Fasttext 임베딩 모델 평가

In [None]:
# 자모음 모델 평가를 위한 함수
from typing import List, Tuple
import numpy as np
from gensim.models import FastText
from soynlp.hangle import compose, character_is_korean

def jamo_to_word(jamo: str = None) -> str:
    """
    분리된 자모음 시퀀스를 단어로 복원하는 함수
    :param jamo: 복원할 자모음 시퀀스
    :return 복원된 단어
    """
    jamo_list, idx = [], 0
    while idx < len(jamo):
        if not character_is_korean(jamo[idx]):
            jamo_list.append(jamo[idx])
            idx += 1
        else:
            jamo_list.append(jamo[idx:idx + 3])
            idx += 3
    word = ''
    for jamo_char in jamo_list:
        if len(jamo_char) == 1:
            word += jamo_char
        elif jamo_char[2] == "#":
            word += compose(jamo_char[0], jamo_char[1], ' ')
        else:
            word += compose(jamo_char[0], jamo_char[1], jamo_char[2])
    return word

def most_similar(model: FastText = None,
                 query: str = None,
                 topn: int = 10) -> List[Tuple]:
    """
    요청된 토큰에 대해 사전학습된 입력 모델의 토큰 벡터 중 가장 유사한 상위 n개 단어와 코사인 유사도 점수를 출력하는 함수
    :param model: 사전학습된 FastTest 모델
    :param query: 유사한 단어를 찾기 위한 요청 단어
    :param topn: 상위 몇개의 유사 토큰을 찾을지 지정
    """
    query_vec = model.wv.__getitem__(tranform_token_into_jamo(query))
    query_vec_norm = np.linalg.norm(query_vec)
    score_dict = {}
    for k in model.wv.vocab.keys():
        target_vec = model.wv.__getitem__(k)
        score_dict[k] = np.dot(query_vec, target_vec) / np.linalg.norm(query_vec) / np.linalg.norm(target_vec)
    #scores = np.dot(model.wv.vectors, query_vec)
    topn_candidates = sorted(score_dict.items(), key=lambda x: x[1], reverse=True)[1:topn+1]
    return [(jamo_to_word(word), score) for word, score in topn_candidates]

def similarity(model: FastText = None,
               query: str = None,
               target: str = None) -> float:
    """
    요청된 토큰과 대상 토큰 간의 코사인 유사도를 계산하는 함수
    :param model: 사전학습된 FastTest 모델
    :param query: 유사한 단어를 찾기 위한 요청 단어
    :param target: 유사한 단어를 찾기 위한 대상 단어
    """
    query_vec = model.wv.__getitem__(tranform_token_into_jamo(query))
    target_vec = model.wv.__getitem__(tranform_token_into_jamo(target))
    similarity = np.dot(query_vec, target_vec) / np.linalg.norm(query_vec) / np.linalg.norm(target_vec)
    return similarity

In [None]:
space_300_model = FastText.load(path + 'fasttext_space_300.model') 
space_jamo_300_model = FastText.load(path + 'fasttext_space_jamo_300.model')

In [None]:
space_300_model.wv.vocab.keys()

In [None]:
space_jamo_300_model.wv.vocab.keys()

In [None]:
from gensim.models import FastText

# 어절 단위 모델
print('어절 단위 모델')
print('-' * 60)
print('임베딩 차원: 300')
print('-' * 60)
# space_300_model = FastText.load(path + 'fasttext_space_300.model') 
print(space_300_model.wv.vectors.shape) 
print(space_300_model.wv.most_similar('청년', topn=5)) 
print('[어절 단위 모델] 청년과 청소년 간 유사도', space_300_model.wv.similarity('청년', '청소년'))
print(' ')


# 어절/자모 단위 모델
print('어절/자모 단위 모델')
print('-' * 60)
print('임베딩 차원: 300')
print('-' * 60)
# space_jamo_300_model = FastText.load(path + 'fasttext_space_jamo_300.model')
print(space_jamo_300_model.wv.vectors.shape) 
print(most_similar(model=space_jamo_300_model, query='청년', topn=5))
print('[어절/자모 단위 모델] 청년과 청소년 간 유사도', similarity(model=space_jamo_300_model, query='청년', target='청소년'))
print(' ')