[Reference](https://admantium.medium.com/nlp-text-vectorization-methods-from-scratch-ce3c5822c813)

In [1]:
import numpy as np
import re
from copy import deepcopy
from collections import Counter
from gensim import downloader
from nltk.corpus import stopwords
from nltk.corpus.reader.plaintext import PlaintextCorpusReader
from nltk.tokenize import sent_tokenize, word_tokenize
from sklearn.base import BaseEstimator, TransformerMixin
from time import time

class SciKitTransformer(BaseEstimator, TransformerMixin):
    """Minimal scikit-learn style base class.

    Both hooks ignore their arguments and hand back the instance itself;
    subclasses add their own vectorisation methods on top.
    """

    def fit(self, X=None, y=None):
        """Accept (and ignore) ``X`` and ``y``; return this instance."""
        return self

    def transform(self, X=None):
        """Accept (and ignore) ``X``; return this instance."""
        return self

In [2]:
class WikipediaCorpus(PlaintextCorpusReader):
    """Plain-text corpus reader with token normalisation and summary helpers.

    Reads every file below ``root_path`` whose name matches the pattern
    ``.*[0-9].txt`` and adds vocabulary and size statistics on top of the
    accessors inherited from ``PlaintextCorpusReader``.
    """

    # Punctuation stripped from tokens: ( ) . , ; : + - "
    # (raw string with an escaped hyphen; the previous pattern contained the
    # reversed range `-` to `"`, which `re` rejects as a bad character range).
    _PUNCTUATION = re.compile(r'[().,;:+\-"]')
    _WHITESPACE = re.compile(r'\s+')
    # English stop words; loaded on first use and shared by all instances so
    # the word list is not re-read for every token.
    _stop_words = None

    def __init__(self, root_path):
        """Open the corpus rooted at ``root_path``."""
        PlaintextCorpusReader.__init__(self, root_path, r'.*[0-9].txt')

    def filter(self, word):
        """Normalise a single token.

        Removes the punctuation in ``_PUNCTUATION`` and all whitespace, then
        lower-cases the result.

        Args:
            word: the raw token.

        Returns:
            The normalised token, or ``''`` when nothing is left or the
            normalised token is an English stop word.
        """
        if WikipediaCorpus._stop_words is None:
            WikipediaCorpus._stop_words = frozenset(stopwords.words("english"))
        word = self._PUNCTUATION.sub('', word)
        word = self._WHITESPACE.sub('', word)
        # Lower-case before the stop-word test so capitalised stop words
        # ("The", "And") are filtered as well.
        word = word.lower()
        if word in WikipediaCorpus._stop_words:
            return ''
        return word

    def vocab(self):
        """Return the sorted list of distinct normalised tokens of this corpus.

        The empty string that ``filter`` returns for discarded tokens is not
        part of the vocabulary.
        """
        terms = {self.filter(word) for word in self.words()}
        terms.discard('')
        return sorted(terms)

    def max_words(self):
        """Return the token count of the longest file (0 if there are no files)."""
        return max((len(self.words(doc)) for doc in self.fileids()), default=0)

    def describe(self, fileids=None, categories=None):
        """Collect size statistics for the whole corpus.

        Args:
            fileids: accepted for interface compatibility; currently ignored.
            categories: accepted for interface compatibility; currently ignored.

        Returns:
            A dict with the number of files, paragraphs, sentences and words,
            the vocabulary size, the longest file's token count, and the
            seconds spent computing these values (key ``'time'``).
        """
        started = time()
        return {
            'files': len(self.fileids()),
            'paras': len(self.paras()),
            'sents': len(self.sents()),
            'words': len(self.words()),
            'vocab': len(self.vocab()),
            'max_words': self.max_words(),
            'time': time() - started,
        }

In [4]:
# Open the corpus stored below ./ai_sentences.
corpus = WikipediaCorpus('ai_sentences')

# Print, in order: the matched file names, the summary statistics,
# and the normalised vocabulary.
for report in (corpus.fileids, corpus.describe, corpus.vocab):
    print(report())

class OneHotEncoder(SciKitTransformer):
    """Encode a token sequence as a binary presence vector over a vocabulary."""

    def __init__(self, vocab):
        """Remember the vocabulary.

        Args:
            vocab: iterable of hashable terms; the order of first occurrence
                defines the layout of the produced vectors.
        """
        # scikit-learn's BaseEstimator.get_params() (used by repr() and
        # clone()) reads every __init__ parameter back from an attribute of
        # the same name, so the argument has to be stored as `vocab`.
        self.vocab = vocab
        # Zero-filled template; kept for callers that read `vocab_dict`.
        self.vocab_dict = dict.fromkeys(vocab, 0.0)

    def one_hot_vector(self, tokens):
        """Return the presence vector for ``tokens``.

        Args:
            tokens: iterable of hashable tokens; tokens that are not in the
                vocabulary are ignored.

        Returns:
            1-D ``numpy`` array with one entry per vocabulary term: ``1.0``
            if the term occurs in ``tokens``, else ``0.0``.
        """
        present = set(tokens)
        return np.array(
            [1.0 if term in present else 0.0 for term in self.vocab_dict]
        )

# One-hot demo: encode two corpus files against the corpus vocabulary.
encoder = OneHotEncoder(corpus.vocab())

# First file: tokenise, encode, then show the vector and its shape.
sent1 = list(word_tokenize(corpus.raw('sent1.txt')))
vec1 = encoder.one_hot_vector(sent1)
print(vec1, vec1.shape, sep='\n')

# Second file, same steps.
sent2 = list(word_tokenize(corpus.raw('sent2.txt')))
vec2 = encoder.one_hot_vector(sent2)
print(vec2, vec2.shape, sep='\n')

In [5]:
from collections import Counter

class CountEncoder(SciKitTransformer):
    """Encode a token sequence as relative term frequencies over a vocabulary."""

    def __init__(self, vocab):
        """Build the zero-filled template whose key order defines the vector layout."""
        self.vocab = {term: 0.0 for term in vocab}

    def count_vector(self, tokens):
        """Return, per vocabulary term, its occurrences divided by ``len(tokens)``.

        Terms that do not occur in ``tokens`` get ``0.0``; tokens outside the
        vocabulary only contribute to the length used as divisor.
        """
        occurrences = Counter(tokens)
        total = len(tokens)
        weights = [
            # Only divide for terms that occur, so an empty input never
            # triggers a division by zero.
            occurrences[term] / total if term in occurrences else 0.0
            for term in self.vocab
        ]
        return np.array(weights)

# Term-frequency demo: encode two corpus files against the corpus vocabulary.
encoder = CountEncoder(corpus.vocab())

# First file: tokenise, encode, then show the vector and its shape.
sent1 = list(word_tokenize(corpus.raw('sent1.txt')))
vec1 = encoder.count_vector(sent1)
print(vec1, vec1.shape, sep='\n')

# Second file, same steps.
sent2 = list(word_tokenize(corpus.raw('sent2.txt')))
vec2 = encoder.count_vector(sent2)
print(vec2, vec2.shape, sep='\n')

In [6]:
class TfIdfEncoder(SciKitTransformer):
    """Encode documents of a fixed collection as TF-IDF vectors."""

    def __init__(self, doc_arr, vocab):
        """Store the collection and pre-compute document frequencies.

        Args:
            doc_arr: mapping of document name to its sequence of words.
            vocab: iterable of hashable terms; the order of first occurrence
                defines the layout of the produced vectors.
        """
        self.doc_arr = doc_arr
        self.vocab = vocab
        self.word_frequency = self._word_frequency()

    def _word_frequency(self):
        """Return, per vocabulary term, the number of documents containing it."""
        word_frequency = dict.fromkeys(self.vocab, 0.0)
        for doc_words in self.doc_arr.values():
            # A set counts each term once per document; testing membership
            # against the dict avoids a linear scan of the vocabulary.
            for word in set(doc_words):
                if word in word_frequency:
                    word_frequency[word] += 1.0
        return word_frequency

    def TfIdf_vector(self, doc_name):
        """Return the TF-IDF vector of one document of the collection.

        Args:
            doc_name: key of the document in ``doc_arr``.

        Returns:
            1-D ``numpy`` array with one weight per vocabulary term, or
            ``None`` (after printing a message) when ``doc_name`` is unknown.
        """
        if doc_name not in self.doc_arr:
            print(f'Document "{doc_name}" not found.')
            return None
        number_of_docs = len(self.doc_arr)
        doc_words = self.doc_arr[doc_name]
        doc_len = len(doc_words)
        TfIdf_vec = dict.fromkeys(self.vocab, 0.0)
        for word, word_count in Counter(doc_words).items():
            if word in TfIdf_vec:
                tf = word_count / doc_len
                idf = np.log(number_of_docs / self.word_frequency[word])
                # A term found in every document has log(1) == 0; use 1 so
                # its term frequency is kept instead of being zeroed out.
                idf = 1 if idf == 0 else idf
                TfIdf_vec[word] = tf * idf
        return np.array(list(TfIdf_vec.values()))

# Map every corpus file name to its word sequence.
doc_list = list(corpus.fileids())
words_list = [corpus.words(doc) for doc in corpus.fileids()]
doc_arr = dict(zip(doc_list, words_list))

# TF-IDF demo: encode two documents of that collection.
encoder = TfIdfEncoder(doc_arr, corpus.vocab())

vec1 = encoder.TfIdf_vector('sent1.txt')
print(vec1)
print(vec1.shape)

vec2 = encoder.TfIdf_vector('sent2.txt')
print(vec2)
print(vec2.shape)

In [7]:
# Load the 'word2vec-google-news-300' model through gensim's downloader.
# NOTE(review): `wv` is not referenced again in the visible code, and
# Word2VecEncoder loads the same model on its own — confirm whether this
# extra load is still needed.
wv = downloader.load('word2vec-google-news-300')

class Word2VecEncoder(SciKitTransformer):
    """Encode a token sequence by concatenating pretrained word vectors."""

    def __init__(self, vocab, vector_lookup=None):
        """Store the vocabulary and the word-vector lookup.

        Args:
            vocab: container of accepted tokens (anything supporting ``in``).
            vector_lookup: optional object supporting ``token in lookup`` and
                ``lookup[token]``. Pass an already loaded model here to avoid
                loading it again; when omitted, 'word2vec-google-news-300'
                is loaded through gensim's downloader.
        """
        self.vocab = vocab
        if vector_lookup is None:
            vector_lookup = downloader.load('word2vec-google-news-300')
        self.vector_lookup = vector_lookup

    def word_vector(self, tokens):
        """Return the concatenated vectors of all known tokens, in token order.

        Tokens missing from the vocabulary or from the lookup are skipped.
        A message is printed for every token that is added.

        Args:
            tokens: iterable of tokens.

        Returns:
            1-D float64 ``numpy`` array; empty when no token was added.
        """
        pieces = []
        for token in tokens:
            if token in self.vocab and token in self.vector_lookup:
                print(f'Add {token}')
                # Collect first and join once: appending keeps the vectors in
                # token order (the former np.append call had its arguments
                # swapped and therefore prepended) and avoids re-copying the
                # growing array on every step.
                pieces.append(np.ravel(self.vector_lookup[token]))
        if not pieces:
            return np.array([])
        return np.concatenate(pieces).astype(np.float64, copy=False)

# Word2Vec demo: encode two corpus files against the corpus vocabulary.
encoder = Word2VecEncoder(corpus.vocab())

# First file: tokenise, encode, then show the vector and its shape.
sent1 = list(word_tokenize(corpus.raw('sent1.txt')))
vec1 = encoder.word_vector(sent1)
print(vec1, vec1.shape, sep='\n')

# Second file, same steps.
sent2 = list(word_tokenize(corpus.raw('sent2.txt')))
vec2 = encoder.word_vector(sent2)
print(vec2, vec2.shape, sep='\n')