<a href="https://colab.research.google.com/github/Kim-Yeon-Jun/PyTorch/blob/main/pytorch_5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --upgrade annoy



In [None]:
import torch
import torch.nn as nn
from annoy import AnnoyIndex
import numpy as np
class PreTrainedEmbeddings(object):
    """ 사전 훈련된 단어 벡터 사용을 위한 래퍼 클래스 """
    def __init__(self, word_to_index, word_vectors):
        """
        매개변수:
            word_to_index (dict): 단어에서 정수로 매핑
            word_vectors (numpy 배열의 리스트)
        """
        self.word_to_index = word_to_index
        self.word_vectors = word_vectors
        self.index_to_word = {v: k for k, v in self.word_to_index.items()}

        self.index = AnnoyIndex(len(word_vectors[0]), metric='euclidean')
        print("인덱스 만드는 중!")
        for _, i in self.word_to_index.items():
            self.index.add_item(i, self.word_vectors[i])
        self.index.build(50)
        print("완료!")

    @classmethod
    def from_embeddings_file(cls, embedding_file):
        """사전 훈련된 벡터 파일에서 객체를 만듭니다.

        벡터 파일은 다음과 같은 포맷입니다:
            word0 x0_0 x0_1 x0_2 x0_3 ... x0_N
            word1 x1_0 x1_1 x1_2 x1_3 ... x1_N

        매개변수:
            embedding_file (str): 파일 위치
        반환값:
            PretrainedEmbeddings의 인스턴스
        """
        word_to_index = {}
        word_vectors = []

        with open(embedding_file) as fp:
            for line in fp.readlines():
                line = line.split(" ")
                word = line[0]
                vec = np.array([float(x) for x in line[1:]])

                word_to_index[word] = len(word_to_index)
                word_vectors.append(vec)

        return cls(word_to_index, word_vectors)

In [None]:
#단어 임베딩을 사용한 유추 작업
import numpy as np
from annoy import AnnoyIndex

class PreTrainedEmbeddings(object):
    def __init__(self, word_to_index, word_vectors):
        """
        매개변수:
            word_to_index (dict): 단어에서 정수로 매핑
            word_vectors (numpy 배열의 리스트)
        """
        self.word_to_index = word_to_index
        self.word_vectors = word_vectors
        self.index_to_word = {v: k for k, v in self.word_to_index.items()}

        self.index = AnnoyIndex(len(word_vectors[0]), metric='euclidean')
        print("인덱스 만드는 중!")
        for _, i in self.word_to_index.items():
            self.index.add_item(i, self.word_vectors[i])
        self.index.build(50)
        print("완료!")

    @classmethod
    def from_embeddings_file(cls, embedding_file):
        """사전 훈련된 벡터 파일에서 객체를 만듭니다.

        벡터 파일은 다음과 같은 포맷입니다:
            word0 x0_0 x0_1 x0_2 x0_3 ... x0_N
            word1 x1_0 x1_1 x1_2 x1_3 ... x1_N

        매개변수:
            embedding_file (str): 파일 위치
        반환값:
            PretrainedEmbeddings의 인스턴스
        """
        word_to_index = {}
        word_vectors = []

        with open(embedding_file) as fp:
            for line in fp.readlines():
                line = line.split(" ")
                word = line[0]
                vec = np.array([float(x) for x in line[1:]])

                word_to_index[word] = len(word_to_index)
                word_vectors.append(vec)
    def get_embedding(self, word):
      """
          매개변수 :
            word(str)
          반환값 :
            임베딩 (numpy.ndarray)
      """
      return self.word_vectors[self.word_to_index[word]]

    def get_closest_to_vector(self, vector, n=1):
      """ 벡터가 주어지면 최근접 이웃을 n개 반환
      매개변수 :
        vector(np.ndarray) : Annoy 인덱스에 있는 벡터의 크기와 같아야 함
        n (int) : 반환될 이웃의 개수
        반환값 :
          [str, str, str ...] : 주어진 벡터와 가장 가까운 단어
              단어는 거리순으로 정렬되어있는 것은 아님.

      """
      nn_indices = self.index_get_nns_by_vector(vector, n)
      return [self.index_to_word[neighbor]
              for neighbor in nn_indices]

    def compute_and_print_analogy(self, word1, word2, word3):
      """ 단어 임베딩을 사용한 유추 결과를 출력

      word1이 word2 일 때 word3은 __입니다.
      이 메서드는 word1 : word2 :: word3 : word4 를 출력함

      매개변수 :
        word1(str)
        word2(str)
        word3(str)
      """
      vec1 = self.get_embedding(word1)
      vec2 = self.get_embedding(word2)
      vec3 = self.get_embedding(word3)

      #단순 가정 : 이 유추는 공간적 관계
      spatial_relationship = vec2 - vec1
      vec4 = vec3 + spatial_relationship

      closest_words = self.get_closest_to_vector(vec4, n=4)
      existing_words = set([word1,word2,word3])
      closest_words = [word for word in closest_words
                       if word not in existing_words]
      if len(closest_words) == 0 :
        print("계산된 벡터와 가장 가까운 이웃을 찾을 수 없습니다.")
        return
      for word4 in closest_words:
        print("{} : {} :: {} : {}".format(word1, word2, word3, word4))



# 예제 : CBOW 임베딩 학습

In [None]:
class Vocabulary(object):
    """ 매핑을 위해 텍스트를 처리하고 어휘 사전을 만드는 클래스 """

    def __init__(self, token_to_idx=None, mask_token="<MASK>", add_unk=True, unk_token="<UNK>"):
        """
        매개변수:
            token_to_idx (dict): 기존 토큰-인덱스 매핑 딕셔너리
            mask_token (str): Vocabulary에 추가할 MASK 토큰.
                모델 파라미터를 업데이트하는데 사용하지 않는 위치를 나타냅니다.
            add_unk (bool): UNK 토큰을 추가할지 지정하는 플래그
            unk_token (str): Vocabulary에 추가할 UNK 토큰
        """

        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx: token
                              for token, idx in self._token_to_idx.items()}

        self._add_unk = add_unk
        self._unk_token = unk_token
        self._mask_token = mask_token

        self.mask_index = self.add_token(self._mask_token)
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)

    def to_serializable(self):
        """ 직렬화할 수 있는 딕셔너리를 반환합니다 """
        return {'token_to_idx': self._token_to_idx,
                'add_unk': self._add_unk,
                'unk_token': self._unk_token,
                'mask_token': self._mask_token}

    @classmethod
    def from_serializable(cls, contents):
        """ 직렬화된 딕셔너리에서 Vocabulary 객체를 만듭니다 """
        return cls(**contents)

    def add_token(self, token):
        """ 토큰을 기반으로 매핑 딕셔너리를 업데이트합니다

        매개변수:
            token (str): Vocabulary에 추가할 토큰
        반환값:
            index (int): 토큰에 상응하는 정수
        """
        if token in self._token_to_idx:
            index = self._token_to_idx[token]
        else:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index

    def add_many(self, tokens):
        """ 토큰 리스트를 Vocabulary에 추가합니다.

        매개변수:
            tokens (list): 문자열 토큰 리스트
        반환값:
            indices (list): 토큰 리스트에 상응되는 인덱스 리스트
        """
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        """ 토큰에 대응하는 인덱스를 추출합니다.
        토큰이 없으면 UNK 인덱스를 반환합니다.

        매개변수:
            token (str): 찾을 토큰
        반환값:
            index (int): 토큰에 해당하는 인덱스
        노트:
            UNK 토큰을 사용하려면 (Vocabulary에 추가하기 위해)
            `unk_index`가 0보다 커야 합니다.
        """
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        """ 인덱스에 해당하는 토큰을 반환합니다.

        매개변수:
            index (int): 찾을 인덱스
        반환값:
            token (str): 인텍스에 해당하는 토큰
        에러:
            KeyError: 인덱스가 Vocabulary에 없을 때 발생합니다.
        """
        if index not in self._idx_to_token:
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

In [None]:
class CBOWVectorizer(object):
    """ 어휘 사전을 생성하고 관리합니다 """
    def __init__(self, cbow_vocab):
        """
        매개변수:
            cbow_vocab (Vocabulary): 단어를 정수에 매핑합니다
        """
        self.cbow_vocab = cbow_vocab

    def vectorize(self, context, vector_length=-1):
        """
        매개변수:
            context (str): 공백으로 나누어진 단어 문자열
            vector_length (int): 인덱스 벡터의 길이 매개변수
        """

        indices = [self.cbow_vocab.lookup_token(token) for token in context.split(' ')]
        if vector_length < 0:
            vector_length = len(indices)

        out_vector = np.zeros(vector_length, dtype=np.int64)
        out_vector[:len(indices)] = indices
        out_vector[len(indices):] = self.cbow_vocab.mask_index

        return out_vector

    @classmethod
    def from_dataframe(cls, cbow_df):
        """데이터셋 데이터프레임에서 Vectorizer 객체를 만듭니다

        매개변수::
            cbow_df (pandas.DataFrame): 타깃 데이터셋
        반환값:
            CBOWVectorizer 객체
        """
        cbow_vocab = Vocabulary()
        for index, row in cbow_df.iterrows():
            for token in row.context.split(' '):
                cbow_vocab.add_token(token)
            cbow_vocab.add_token(row.target)

        return cls(cbow_vocab)

    @classmethod
    def from_serializable(cls, contents):
        cbow_vocab = \
            Vocabulary.from_serializable(contents['cbow_vocab'])
        return cls(cbow_vocab=cbow_vocab)

    def to_serializable(self):
        return {'cbow_vocab': self.cbow_vocab.to_serializable()}

In [None]:
#CBOW 작업을 위한 데이터셋 클래스
class CBOWDataset(Dataset):
    def __init__(self, cbow_df, vectorizer):
        """
        매개변수:
            cbow_df (pandas.DataFrame): 데이터셋
            vectorizer (CBOWVectorizer): 데이터셋에서 만든 CBOWVectorizer 객체
        """
        self.cbow_df = cbow_df
        self._vectorizer = vectorizer

        measure_len = lambda context: len(context.split(" "))
        self._max_seq_length = max(map(measure_len, cbow_df.context))

        self.train_df = self.cbow_df[self.cbow_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.cbow_df[self.cbow_df.split=='val']
        self.validation_size = len(self.val_df)

        self.test_df = self.cbow_df[self.cbow_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train')

    @classmethod
    def load_dataset_and_make_vectorizer(cls, cbow_csv):
        """데이터셋을 로드하고 처음부터 새로운 Vectorizer 만들기

        매개변수:
            cbow_csv (str): 데이터셋의 위치
        반환값:
            CBOWDataset의 인스턴스
        """
        cbow_df = pd.read_csv(cbow_csv)
        train_cbow_df = cbow_df[cbow_df.split=='train']
        return cls(cbow_df, CBOWVectorizer.from_dataframe(train_cbow_df))

    def __getitem__(self, index):
        """파이토치 데이터셋의 주요 진입 메서드

        매개변수:
            index (int): 데이터 포인트의 인덱스
        반환값:
            데이터 포인트의 특성(x_data)과 레이블(y_target)로 이루어진 딕셔너리
        """
        row = self._target_df.iloc[index]

        context_vector = \
            self._vectorizer.vectorize(row.context, self._max_seq_length)
        target_index = self._vectorizer.cbow_vocab.lookup_token(row.target)

        return {'x_data': context_vector,
                'y_target': target_index}

In [None]:
class CBOWClassifier(nn.Module): # Simplified cbow Model
    def __init__(self, vocabulary_size, embedding_size, padding_idx=0):
        """
        매개변수:
            vocabulary_size (int): 어휘 사전 크기, 임베딩 개수와 예측 벡터 크기를 결정합니다
            embedding_size (int): 임베딩 크기
            padding_idx (int): 기본값 0; 임베딩은 이 인덱스를 사용하지 않습니다
        """
        super(CBOWClassifier, self).__init__()

        self.embedding =  nn.Embedding(num_embeddings=vocabulary_size,
                                       embedding_dim=embedding_size,
                                       padding_idx=padding_idx)
        self.fc1 = nn.Linear(in_features=embedding_size,
                             out_features=vocabulary_size)

    def forward(self, x_in, apply_softmax=False):
        """분류기의 정방향 계산

        매개변수:
            x_in (torch.Tensor): 입력 데이터 텐서
                x_in.shape는 (batch, input_dim)입니다.
            apply_softmax (bool): 소프트맥스 활성화 함수를 위한 플래그
                크로스-엔트로피 손실을 사용하려면 False로 지정합니다
        반환값:
            결과 텐서. tensor.shape은 (batch, output_dim)입니다.
        """
        x_embedded_sum = F.dropout(self.embedding(x_in).sum(dim=1), 0.3)
        y_out = self.fc1(x_embedded_sum)

        if apply_softmax:
            y_out = F.softmax(y_out, dim=1)

        return y_out

In [None]:
#CBOW 훈련 매개변수
from argparse import Namespace
args = Namespace(
    # 날짜와 경로 정보
    cbow_csv="data/books/frankenstein_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch5/cbow",
    # 모델 하이퍼파라미터
    embedding_size=50,
    # 훈련 하이퍼파라미터
    seed=1337,
    num_epochs=100,
    learning_rate=0.0001,
    batch_size=32,
    early_stopping_criteria=5,
    # 실행 옵션
    cuda=True,
    catch_keyboard_interrupt=True,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True
)

# 예제 : 문서 분류에 사전 훈련된 임베딩을 사용한 전이학습
###### 전이학습 : 한 작업에서 훈련된 모델을 다른 작업의 초기 모델로 사용하는 방식

In [None]:
#NewsDataset.__getitem__() 메서드
class NewDataset(Dataset):
  @classmethod
  def load_dataset_and_make_vectorizer(cls, news_csv):
    """데이터셋을 로드하고 처음부터 새로운 Vectorizer 만들기
      매개변수 :
        news_csv (str) : 데이터셋의 위치
      반환값 :
        NewsDataset의 인스턴스
    """
    news_df = pd.read_csv(news_csv)
    train_news_df = news_df[news_df.split == 'train']
    return cls(news_df, NewsVectorizer.from_dataframe(train_news_df))

  def __getitem__(self, index):
    """파이토치 데이터셋의 주요 진입 메서드
      매개변수 :
        index(int) : 데이터 포인트의 인덱스

      반환값 :
        데이터 포인트의 특성(x_data)과 레이블(y_target)로 이루어진 딕셔너리
    """
    row = self.target_df.iloc[index]

    title_vector = \
      self._vectorizer.vectorize(row.title, self._max_seq_length)

    category_index = \
      self._vectorizer.category_vocab.lookup_token(row.category)

    return {'x_data' : title_vector,
            'y_target' : category_index}

# SequenceVocabulary
UNK 토큰 : 모델이 드물게 등장하는 단어에 대한 표현을 학습하도록 함\
MASK 토큰 : Embedding 층의 마스킹 역할을 수행하고 가변 길이의 시퀀스가 있을 때 손실계산을 도움\
BEGIN-OF-SEQUENCE와 END-OF-SEQUENCE 토큰 : 시퀀스 경계에 관한 힌트를 신경망에 제공

In [None]:
# AG뉴스 데이터셋을 위한 Vectorizer 구현
class NewsVectorizer(object):
  def vectorize(self, title, vector_length=-1):
    """
      매개변수 :
        title(str) : 공백으로 나누어진 단어 문자열
        vector_length(int) : 인덱스 벡터의 길이 매개변수

      반환값 :
        벡터로 변환된 제목(numpy.array)
    """
    indices = [self.title_vocab.begin_seq_index]
    indices.extend(self.title_vocab.lookup_token(token)
    for token in title.split(" "))
    indices.append(self.title_vocab.end_seq_index)
    if vector_length < 0 :
      vector_length = len(indices)
    out_vector = np.zeros(vector_length, dtype=np.int64)
    out_vector[:len(indices) = indices]
    out_vector[len(indices):] = self.title_vocab.mask_index

    return out_vector

  @classmethod
  def from_dataframe(cls, news_df, cutoff=25):
    """ 데이터셋 데이터프레임에서 Vectorizer 객체를 만듦

      매개변수 :
        news_df (pandas.DataFrame) : 타깃 데이터셋
        cutoff(int) : Vocabulary에 포함할 빈도 임계값
      반환값 :
        NewsVectorizer 객체
    """
    category_vocab = Vocabulary()
    for category in sorted(set(news_df.category)):
      category_vocab.add_token(category)
      word_counts = Counter()
    for title in news_df.title:
      for token in title.split(" "):
        word_coutns[token] += 1
    title_vocab = SequenceVocabulary()
    for word, word_count in word_counts.items():
      if word_count >= cutoff:
        title_vocab.add_token(word)
    return cls(title_vocab, category_vocab)

In [None]:
#어휘 사전에 기반하여 단어 임베딩의 부분 집합을 선택함
def load_glove_from_file(glove_filepath):
    """GloVe 임베딩 로드

    매개변수:
        glove_filepath (str): 임베딩 파일 경로
    반환값:
        word_to_index (dict), embeddings (numpy.ndarary)
    """

    word_to_index = {}
    embeddings = []
    with open(glove_filepath, "r") as fp:
        for index, line in enumerate(fp):
            line = line.split(" ") # each line: word num1 num2 ...
            word_to_index[line[0]] = index # word = line[0]
            embedding_i = np.array([float(val) for val in line[1:]])
            embeddings.append(embedding_i)
    return word_to_index, np.stack(embeddings)

def make_embedding_matrix(glove_filepath, words):
    """
    특정 단어 집합에 대한 임베딩 행렬을 만듭니다.

    매개변수:
        glove_filepath (str): 임베딩 파일 경로
        words (list): 단어 리스트
    """
    word_to_idx, glove_embeddings = load_glove_from_file(glove_filepath)
    embedding_size = glove_embeddings.shape[1]

    final_embeddings = np.zeros((len(words), embedding_size))

    for i, word in enumerate(words):
        if word in word_to_idx:
            final_embeddings[i, :] = glove_embeddings[word_to_idx[word]]
        else:
            embedding_i = torch.ones(1, embedding_size)
            torch.nn.init.xavier_uniform_(embedding_i)
            final_embeddings[i, :] = embedding_i

    return final_embeddings

In [None]:
class NewsClassifier(nn.Module):
    def __init__(self, embedding_size, num_embeddings, num_channels,
                 hidden_dim, num_classes, dropout_p,
                 pretrained_embeddings=None, padding_idx=0):
        """
        매개변수:
            embedding_size (int): 임베딩 벡터의 크기
            num_embeddings (int): 임베딩 벡터의 개수
            num_channels (int): 합성곱 커널 개수
            hidden_dim (int): 은닉 차원 크기
            num_classes (int): 클래스 개수
            dropout_p (float): 드롭아웃 확률
            pretrained_embeddings (numpy.array): 사전에 훈련된 단어 임베딩
                기본값은 None
            padding_idx (int): 패딩 인덱스
        """
        super(NewsClassifier, self).__init__()

        if pretrained_embeddings is None:

            self.emb = nn.Embedding(embedding_dim=embedding_size,
                                    num_embeddings=num_embeddings,
                                    padding_idx=padding_idx)
        else:
            pretrained_embeddings = torch.from_numpy(pretrained_embeddings).float()
            self.emb = nn.Embedding(embedding_dim=embedding_size,
                                    num_embeddings=num_embeddings,
                                    padding_idx=padding_idx,
                                    _weight=pretrained_embeddings)


        self.convnet = nn.Sequential(
            nn.Conv1d(in_channels=embedding_size,
                   out_channels=num_channels, kernel_size=3),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
                   kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
                   kernel_size=3, stride=2),
            nn.ELU(),
            nn.Conv1d(in_channels=num_channels, out_channels=num_channels,
                   kernel_size=3),
            nn.ELU()
        )

        self._dropout_p = dropout_p
        self.fc1 = nn.Linear(num_channels, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, x_in, apply_softmax=False):
        """분류기의 정방향 계산

        매개변수:
            x_in (torch.Tensor): 입력 데이터 텐서
                x_in.shape는 (batch, dataset._max_seq_length)입니다.
            apply_softmax (bool): 소프트맥스 활성화 함수를 위한 플래그
                크로스-엔트로피 손실을 사용하려면 False로 지정합니다
        반환값:
            결과 텐서. tensor.shape은 (batch, num_classes)입니다.
        """

        # 임베딩을 적용하고 특성과 채널 차원을 바꿉니다
        x_embedded = self.emb(x_in).permute(0, 2, 1)

        features = self.convnet(x_embedded)

        # 평균 값을 계산하여 부가적인 차원을 제거합니다
        remaining_size = features.size(dim=2)
        features = F.avg_pool1d(features, remaining_size).squeeze(dim=2)
        features = F.dropout(features, p=self._dropout_p)

        # MLP 분류기
        intermediate_vector = F.relu(F.dropout(self.fc1(features), p=self._dropout_p))
        prediction_vector = self.fc2(intermediate_vector)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector

In [None]:
from argparse import Namespace
# 사전훈련된 임베딩을 사용하는 CNN NewsClassifier의 매개변수
args = Namespace(
    # 날짜와 경로 정보
    news_csv="data/ag_news/news_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch5/document_classification",
    # 모델 하이퍼파라미터
    glove_filepath='data/glove/glove.6B.100d.txt',
    use_glove=False,
    embedding_size=100,
    hidden_dim=100,
    num_channels=100,
    # 훈련 하이퍼파라미터
    seed=1337,
    learning_rate=0.001,
    dropout_p=0.1,
    batch_size=128,
    num_epochs=100,
    early_stopping_criteria=5,
    # 실행 옵션
    cuda=True,
    catch_keyboard_interrupt=True,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True
)

In [None]:
#훈련된 모델로 예측하기
def predict_category(title, classifier, vectorizer, max_length):
    """뉴스 제목을 기반으로 카테고리를 예측합니다

    매개변수:
        title (str): 원시 제목 문자열
        classifier (NewsClassifier): 훈련된 분류기 객체
        vectorizer (NewsVectorizer): 해당 Vectorizer
        max_length (int): 최대 시퀀스 길이
            노트: CNN은 입력 텐서 크기에 민감합니다.
                 훈련 데이터처럼 동일한 크기를 갖도록 만듭니다.
    """
    title = preprocess_text(title)
    vectorized_title = \
        torch.tensor(vectorizer.vectorize(title, vector_length=max_length))
    result = classifier(vectorized_title.unsqueeze(0), apply_softmax=True)
    probability_values, indices = result.max(dim=1)
    predicted_category = vectorizer.category_vocab.lookup_index(indices.item())

    return {'category': predicted_category,
            'probability': probability_values.item()}