# GloVe PyTorch实现

日志参数设置

In [26]:
import logging
# Configure the root logger: INFO and above, rendered as "time : level : message".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s : %(levelname)s : %(message)s',
)

## 加载语料库

In [27]:
import spacy
from collections import defaultdict

class SpacyTokenizer:
    """ Line-wise tokenizer backed by a spaCy language pipeline

    Only the pipeline's ``tokenizer`` is used by this class, so the other
    pipeline components can be disabled when the pipeline is loaded.
    """

    def __init__(self, lang: str, disable=None):
        """ Load the spaCy pipeline used for tokenizing
        Args:
            lang (str): name of the spaCy pipeline handed to ``spacy.load``
            disable (list, optional): names of pipeline components to disable
                while loading. Defaults to ``['parser', 'tagger', 'ner']``.
        """
        # Build the default per call so no mutable list is shared between
        # instances.
        if disable is None:
            disable = ['parser', 'tagger', 'ner']

        # The original accepted `disable` but never forwarded it, so every
        # component was loaded even though only the tokenizer is used.
        self._nlp = spacy.load(lang, disable=disable)

    def tokenize(self, text: str) -> list:
        """ Split text into lines and tokenize each line
        Args:
            text (str): raw text; each line is treated as one document
        Returns:
            list: one list of token strings per line of ``text``
        """
        tokenize_line = self._nlp.tokenizer

        # splitlines() drops the line terminators, so a newline never shows
        # up as a token; strip() removes the remaining edge whitespace.
        return [[token.text for token in tokenize_line(line.strip())]
                for line in text.splitlines()]

    
class Dictionary:
    """ Vocabulary mapping every distinct word to an integer index
    Args:
        doc (list, optional): tokenized documents (list of lists of words)
            used to seed the vocabulary
    """

    def __init__(self, doc=None):

        self.vocab_size = 0
        # NOTE: kept as a defaultdict for compatibility with existing callers.
        # Indexing an unknown word (word2idx[word]) silently inserts it with
        # index 0, so test membership with `in` before indexing.
        self.word2idx = defaultdict(int)

        self.update(doc)

    def update(self, doc: list):
        """ Add the unseen words of doc to the vocabulary
        Indices are handed out in order of first occurrence, so the same
        input always produces the same mapping (iterating a set of strings,
        as before, gave a different mapping on every interpreter run).
        Args:
            doc (list): tokenized documents (list of lists of words);
                ``None`` leaves the vocabulary untouched
        """

        if doc is None:
            return

        word2idx = self.word2idx
        next_idx = self.vocab_size

        for line in doc:
            for token in line:
                # `in` does not trigger the defaultdict factory
                if token not in word2idx:
                    word2idx[token] = next_idx
                    next_idx += 1

        self.vocab_size = next_idx

    def corpus(self, doc: list) -> list:
        """ Convert documents of words into documents of word indices
        Words missing from the vocabulary are dropped.
        Args:
            doc (list): tokenized documents (list of lists of words)
        Returns:
            list: one list of word indices per input document
        """

        word2idx = self.word2idx
        return [[word2idx[word] for word in line if word in word2idx]
                for line in doc]


In [44]:
import torch
import pickle
import zipfile

def read_data(file_path, type='file'):
    """ Read data into a string
    Args:
        file_path (str): path for the data file
        type (str): 'file' to read a UTF-8 text file, 'zip' to read and
            decode the first member of a zip archive
    Returns:
        str: the text content, or None when ``type`` is neither 'file'
            nor 'zip'
    """
    # Compare by value: `is` checks object identity and only appeared to work
    # for string literals that the interpreter happened to intern.
    if type == 'file':
        with open(file_path, mode='r', encoding='utf-8') as fp:
            return fp.read()

    if type == 'zip':
        with zipfile.ZipFile(file_path) as fp:
            # only the first archive member is read
            return fp.read(fp.namelist()[0]).decode('utf-8')

    return None

def preprocess(file_path):
    """ Get corpus, vocab_size and word2idx from raw text
    Reads the text file, tokenizes it line by line with the spaCy pipeline
    named by the module-level ``LANG``, pickles the tokenized documents to
    the module-level ``DOC_PATH`` and builds the vocabulary from them.
    Args:
        file_path (str): raw file path
    Returns:
        corpus (list): list of idx words, one list per line of the file
        vocab_size (int): vocabulary size
        word2idx (dict): mapping from word to idx
    """

    # read the file that was asked for (the global FILE_PATH was used before,
    # which silently ignored the argument)
    text = read_data(file_path, type='file')
    logging.info("read raw data")

    # init base model
    tokenizer = SpacyTokenizer(LANG)
    dictionary = Dictionary()

    # build corpus
    doc = tokenizer.tokenize(text)
    logging.info("after generate tokens from text")

    # save doc so the tokenization can be reused later; the in-memory `doc`
    # is used directly below, reloading the pickle just written is redundant
    with open(DOC_PATH, mode='wb') as fp:
        pickle.dump(doc, fp)
    logging.info("tokenized documents saved!")

    dictionary.update(doc)
    logging.info("after generate dictionary")
    corpus = dictionary.corpus(doc)
    word2idx = dictionary.word2idx
    vocab_size = dictionary.vocab_size

    return corpus, vocab_size, word2idx

## 构建GloVe模型

In [36]:
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from collections import Counter, defaultdict


class GloVe(nn.Module):
    """Implement GloVe model with Pytorch

    Holds focal/context embeddings and biases (all float64), counts
    distance-weighted co-occurrences with `fit` and optimizes the GloVe
    weighted least-squares loss with `train`.
    """

    def __init__(self, embedding_size, context_size, vocab_size, min_occurrance=1, x_max=100, alpha=3 / 4):
        """Create the embedding tables and store the hyperparameters
        Args:
            embedding_size (int): dimension of the word vectors
            context_size (int or tuple): window size on both sides, or a
                ``(left, right)`` pair
            vocab_size (int): number of rows in the embedding tables
            min_occurrance (int, optional): Defaults to 1. minimum count a
                word needs to be kept by `fit`
            x_max (int, optional): Defaults to 100. co-occurrence count at
                which the loss weight saturates at 1
            alpha (float, optional): Defaults to 3 / 4. exponent of the loss
                weight
        Raises:
            ValueError: if context_size is neither an int nor a tuple of two
                values
        """
        super(GloVe, self).__init__()

        self.embedding_size = embedding_size
        # `elif` is required here: with two independent `if`s a tuple fell
        # through to the `else` of the int check and always raised.
        if isinstance(context_size, tuple):
            self.left_context, self.right_context = context_size
        elif isinstance(context_size, int):
            self.left_context = self.right_context = context_size
        else:
            raise ValueError(
                "'context_size' should be an int or a tuple of two ints")

        self.vocab_size = vocab_size
        self.alpha = alpha
        self.min_occurrance = min_occurrance
        self.x_max = x_max

        self._focal_embeddings = nn.Embedding(
            vocab_size, embedding_size).type(torch.float64)
        self._context_embeddings = nn.Embedding(
            vocab_size, embedding_size).type(torch.float64)

        self._focal_biases = nn.Embedding(vocab_size, 1).type(torch.float64)
        self._context_biases = nn.Embedding(vocab_size, 1).type(torch.float64)
        self._glove_dataset = None

        for params in self.parameters():
            init.uniform_(params, a=-1, b=1)

    def fit(self, corpus):
        """get dictionary word list and co-occruence matrix from corpus
        Args:
            corpus (list): contain word id list
        Raises:
            ValueError: when count zero cocurrences will raise the problems
        """

        left_size, right_size = self.left_context, self.right_context
        vocab_size, min_occurrance = self.vocab_size, self.min_occurrance

        # get co-occurence count matrix X
        word_counts = Counter()
        cooccurence_counts = defaultdict(float)
        for region in corpus:
            word_counts.update(region)
            for left_context, word, right_context in _context_windows(region, left_size, right_size):
                # the left context is reversed so that i == 0 is the word
                # next to the focal word on both sides
                for i, context_word in enumerate(left_context[::-1]):
                    # add (1 / distance from focal word) for this pair
                    cooccurence_counts[(word, context_word)] += 1 / (i + 1)
                for i, context_word in enumerate(right_context):
                    cooccurence_counts[(word, context_word)] += 1 / (i + 1)
        if not cooccurence_counts:
            raise ValueError(
                "No coccurrences in corpus, Did you try to reuse a generator?")

        # keep the `vocab_size` most frequent words that reach the minimum
        # count; a set makes the membership tests below O(1) per pair
        tokens = {word for word, count in
                  word_counts.most_common(vocab_size) if count >= min_occurrance}
        coocurrence_matrix = [(focal, context, count)
                              for (focal, context), count in cooccurence_counts.items()
                              if focal in tokens and context in tokens]
        self._glove_dataset = GloVeDataSet(coocurrence_matrix)

    def train(self, num_epoch=True, device=None, batch_size=512, learning_rate=0.05, loop_interval=10):
        """Training GloVe model

        This method shadows ``nn.Module.train(mode)``. ``nn.Module.eval()``
        calls ``self.train(False)``, so a bool first argument is forwarded to
        the base class to keep ``model.eval()`` / ``model.train()`` working.
        Args:
            num_epoch (int): number of epoch; a bool is treated as the
                ``mode`` flag of ``nn.Module.train``
            device (str, optional): cpu or gpu. Defaults to the device the
                model parameters live on
            batch_size (int, optional): Defaults to 512.
            learning_rate (float, optional): Defaults to 0.05. learning rate for Adam optimizer
            loop_interval (int, optional): Defaults to 10. number of steps
                between two average-loss reports
        Returns:
            the module itself when called with a bool, otherwise None
        Raises:
            NotFitToCorpusError: if the model is not fit by corpus, the error will be raise
        """

        # bool must be tested explicitly: it is a subclass of int
        if isinstance(num_epoch, bool):
            return super(GloVe, self).train(num_epoch)

        if self._glove_dataset is None:
            raise NotFitToCorpusError(
                "Please fit model with corpus before training")

        if device is None:
            device = next(self.parameters()).device

        # basic training setting
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        glove_dataloader = DataLoader(self._glove_dataset, batch_size)
        total_loss = 0
        steps_since_report = 0

        for epoch in range(num_epoch):
            for idx, batch in enumerate(glove_dataloader):
                optimizer.zero_grad()

                i_s, j_s, counts = batch
                i_s = i_s.to(device)
                j_s = j_s.to(device)
                counts = counts.to(device)
                loss = self._loss(i_s, j_s, counts)

                total_loss += loss.item()
                steps_since_report += 1
                if idx % loop_interval == 0:
                    # divide by the number of losses actually accumulated;
                    # at idx == 0 or after an epoch boundary that is not
                    # `loop_interval`
                    avg_loss = total_loss / steps_since_report
                    print("epoch: {}, current step: {}, average loss: {}".format(
                        epoch, idx, avg_loss))
                    total_loss = 0
                    steps_since_report = 0

                loss.backward()
                optimizer.step()

        print("finish glove vector training")

    def get_coocurrance_matrix(self):
        """ Return co-occurance matrix for saving
        Returns:
            list: list itam (word_idx1, word_idx2, cooccurances)
        """

        return self._glove_dataset._coocurrence_matrix

    def embedding_for_tensor(self, tokens):
        """Return the word vectors for a tensor of word indices
        Args:
            tokens (torch.Tensor): word indices
        Returns:
            torch.Tensor: sum of the focal and context embeddings of tokens
        Raises:
            ValueError: if tokens is not a tensor
        """
        if not torch.is_tensor(tokens):
            raise ValueError("the tokens must be pytorch tensor object")

        return self._focal_embeddings(tokens) + self._context_embeddings(tokens)

    def _loss(self, focal_input, context_input, coocurrence_count):
        """Mean GloVe loss f(X_ij) * (w_i . w_j + b_i + b_j - log X_ij)^2
        Args:
            focal_input (torch.Tensor): focal word indices, one per pair
            context_input (torch.Tensor): context word indices, one per pair
            coocurrence_count (torch.Tensor): co-occurrence count X_ij per pair
        Returns:
            torch.Tensor: scalar mean loss over the batch
        """
        x_max, alpha = self.x_max, self.alpha

        focal_embed = self._focal_embeddings(focal_input)
        context_embed = self._context_embeddings(context_input)
        # The bias tables have width 1; drop that axis so the biases line up
        # with the per-pair dot products instead of broadcasting the sum to a
        # (batch, batch) matrix.
        focal_bias = self._focal_biases(focal_input).squeeze(-1)
        context_bias = self._context_biases(context_input).squeeze(-1)

        # count weight factor f(x) = min((x / x_max) ** alpha, 1)
        weight_factor = torch.pow(coocurrence_count / x_max, alpha)
        weight_factor[weight_factor > 1] = 1

        embedding_products = torch.sum(focal_embed * context_embed, dim=1)
        log_cooccurrences = torch.log(coocurrence_count)

        # the model is fitted TO log X_ij, so the log count is subtracted
        distance_expr = (embedding_products + focal_bias +
                         context_bias - log_cooccurrences) ** 2

        single_losses = weight_factor * distance_expr
        mean_loss = torch.mean(single_losses)
        return mean_loss


class GloVeDataSet(Dataset):
    """Dataset wrapper that serves the entries of a co-occurrence list by index
    """

    def __init__(self, coocurrence_matrix):
        """Keep a reference to the given co-occurrence entries (no copy is made)
        Args:
            coocurrence_matrix (list): indexable collection of entries
        """
        self._coocurrence_matrix = coocurrence_matrix

    def __len__(self):
        """Number of stored entries"""
        entries = self._coocurrence_matrix
        return len(entries)

    def __getitem__(self, index):
        """Entry stored at position ``index``"""
        entries = self._coocurrence_matrix
        return entries[index]

def _context_windows(region, left_size, right_size):
    """yield a (left_context, word, right_context) tuple for every position of region
    Args:
        region (sequence): the words of one sentence
        left_size (int): number of positions looked at before the word
        right_size (int): number of positions looked at after the word
    Yields:
        tuple: the slice before the word, the word itself, the slice after
            the word; both slices are produced by `_window`
    """

    position = 0
    for word in region:
        before = _window(region, position - left_size, position - 1)
        after = _window(region, position + 1, position + right_size)
        yield (before, word, after)
        position += 1


def _window(region, start_index, end_index):
    """Returns the list of words starting from `start_index`, going to `end_index`
    taken from region. If `start_index` is a negative number, or if `end_index`
    is greater than the index of the last word in region, this function will pad
    its return value with `NULL_WORD`.
    Args:
        region (str): the sentence for extracting the token base on the context
        start_index (int): index for start step of window
        end_index (int): index for the end step of window
    """
    last_index = len(region) + 1
    selected_tokens = region[max(start_index, 0):
                             min(end_index, last_index) + 1]
    return selected_tokens

class NotTrainedError(Exception):
    """Exception type named for use of an untrained model.

    NOTE(review): nothing in the visible code raises it - confirm it is
    still needed.
    """


class NotFitToCorpusError(Exception):
    """Raised by GloVe.train when the model has not been fit to a corpus."""

## 训练

In [None]:
# Paths, spaCy pipeline name and hyperparameters of the training run.
FILE_PATH = './bioCorpus_5000.txt'
MODLE_PATH = './glove.pkl'
DOC_PATH = './bioCorpus_5000.pickle'
COMATRIX_PATH = './comat.pickle'
LANG = 'en_core_web_sm'
EMBEDDING_SIZE = 128
CONTEXT_SIZE = 3
NUM_EPOCH = 50
BATHC_SIZE = 512
LEARNING_RATE = 0.01

corpus, vocab_size, word2idx = preprocess(FILE_PATH)

# specify device type
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# init vector model
logging.info("init model hyperparameter")
model = GloVe(EMBEDDING_SIZE, CONTEXT_SIZE, vocab_size)
model.to(device)

# fit corpus to count cooccurance matrix
# (the previous argument was a garbled expression and a syntax error;
# fit() expects the list of word-id lists returned by preprocess)
model.fit(corpus)

cooccurance_matrix = model.get_coocurrance_matrix()
# saving cooccurance_matrix
with open(COMATRIX_PATH, mode='wb') as fp:
    pickle.dump(cooccurance_matrix, fp)

# pass the configured batch size instead of relying on train()'s default
model.train(NUM_EPOCH, device, batch_size=BATHC_SIZE,
            learning_rate=LEARNING_RATE)

# save model for evaluation
torch.save(model, MODLE_PATH)

## 测试

In [None]:
# load model
# map_location lets a model that was saved on a GPU be restored on whatever
# `device` is available now (e.g. a CPU-only machine).
# NOTE(review): the file holds a fully pickled module. Recent PyTorch
# releases default torch.load to weights_only=True, which rejects such files;
# pass weights_only=False there, and only for checkpoints you trust, because
# unpickling can execute arbitrary code.
model = torch.load(MODLE_PATH, map_location=device)

# look up the embedding of every vocabulary word; no gradients are needed
with torch.no_grad():
    for word, idx in word2idx.items():
        emd = model.embedding_for_tensor(torch.tensor(idx).to(device)).data.cpu()
#         print(word, emd, '\n')