In [1]:
import nltk
import pandas as pd
from tqdm import tqdm, trange
import numpy as np
from sklearn.feature_extraction.stop_words import ENGLISH_STOP_WORDS
from os import listdir
from sklearn.naive_bayes import MultinomialNB
from gensim.models import Word2Vec
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
from sklearn.metrics import make_scorer, accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import cross_validate, KFold
# Register tqdm's `progress_apply` on pandas objects; Text_Processor.process relies on it.
tqdm.pandas()
def flatten(l):
    """Return one flat list holding the items of every sub-iterable of `l`, in order."""
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat

  from pandas import Panel


In [2]:
# One-time setup: uncomment and run once on a fresh machine. These appear to be the
# NLTK resources the notebook relies on (review corpus, tokenizer models, WordNet
# data for the lemmatizer) -- TODO confirm the corpus is what ./movie_reviews/ holds.
# nltk.download('movie_reviews')
# nltk.download('punkt')
# nltk.download('wordnet')

In [3]:
class Text_Processor:
    """Feature pipeline for a two-class review corpus stored as ``<path>/pos`` and ``<path>/neg``.

    Attributes populated along the way:
        data           DataFrame with ``name``, ``text`` and ``label`` ('+' / '-'); later
                       steps add ``clean_text``, ``f_<word>`` count columns and
                       ``goodnes_value``.
        vocab          DataFrame indexed by word with a ``counts`` column (set by ``process``).
        pos_idx        Boolean mask over ``data`` selecting the '+' rows.
        neg_idx        Boolean mask over ``data`` selecting the '-' rows.
        training_cols  Names of the ``data`` columns registered as model features.
    """

    def __init__(self, path=None, test_percent=20):
        """Read every file under ``path/pos`` and ``path/neg`` into ``self.data``.

        Args:
            path: Corpus root directory. ``None`` creates an empty processor
                (``data`` is ``None``), which is what ``copy`` builds on.
            test_percent: Accepted for backward compatibility; not used by this class.
        """
        # Initialise every attribute up front so an empty processor is still usable.
        self.data = None
        self.vocab = None
        self.pos_idx = None
        self.neg_idx = None
        self.training_cols = list()
        if path is None:
            return

        import os

        names, texts, labels = [], [], []
        clasdic = {'pos': '+', 'neg': '-'}
        for clas in ['pos', 'neg']:
            folder = os.path.join(path, clas)
            # listdir order is filesystem dependent; sort so the row order is reproducible.
            progress = tqdm(sorted(listdir(folder)), leave=True)
            for file in progress:
                full_path = os.path.join(folder, file)
                # Context manager closes each handle instead of leaking one per review.
                with open(full_path) as handle:
                    texts.append(handle.read())
                names.append(file)
                labels.append(clasdic[clas])
                progress.set_description(full_path)
        self.data = pd.DataFrame({
            'name': names,
            'text': texts,
            'label': labels
        })
        self.pos_idx = (self.data.label == clasdic['pos'])
        self.neg_idx = (self.data.label == clasdic['neg'])

    def process_text(self, text):
        """Turn raw text into a list of cleaned, lemmatized tokens.

        Steps: lowercase, expand a few contractions, blank out every non-letter,
        tokenize, drop English stop words (but keep 'not' / 'no'), lemmatize, and
        prefix the token that follows a negation with ``'not-'``.

        Args:
            text: Raw document string.

        Returns:
            List of token strings.
        """
        negations = ('not', 'no')
        lemmatizer = nltk.stem.WordNetLemmatizer()
        t = text.strip().lower()
        for p, f in {"'s": 'is', "'re": 'are', "n't": 'not', "'d": 'had', "'m": 'am', "'ve": 'have'}.items():
            t = t.replace(p, ' ' + f)
        # Replace every non-alphabetic character with a space in a single pass.
        t = ''.join(c if c.isalpha() else ' ' for c in t)
        tokens = [w for w in nltk.tokenize.word_tokenize(t)
                  if w not in ENGLISH_STOP_WORDS or w in negations]
        tokens = [lemmatizer.lemmatize(w) for w in tokens]
        # negated[i] is True when token i-1 is a negation; computed before any prefixing.
        negated = [False] + [w in negations for w in tokens[:-1]]
        return ['not-' + w if flag else w for w, flag in zip(tokens, negated)]

    def process(self):
        """Fill ``data['clean_text']`` and build ``vocab`` (corpus-wide word counts)."""
        self.data['clean_text'] = self.data.text.progress_apply(self.process_text)
        w, c = np.unique(flatten(self.data['clean_text']), return_counts=True)
        self.vocab = pd.DataFrame({'word': w, 'counts': c}).set_index('word')

    @classmethod
    def copy(cls, other):
        """Return a new processor with independent copies of ``other``'s tables.

        ``data``, ``vocab``, the class masks and ``training_cols`` are copied so
        that changing the copy never affects ``other``. Fitted models and their
        settings (word2vec, goodnes regressor) are carried over by reference.
        """
        tp = cls()
        tp.data = None if other.data is None else other.data.copy()
        vocab = getattr(other, 'vocab', None)
        tp.vocab = None if vocab is None else vocab.copy()
        tp.pos_idx = None if other.pos_idx is None else other.pos_idx.copy()
        tp.neg_idx = None if other.neg_idx is None else other.neg_idx.copy()
        # New list: the original shared one list object between both processors.
        tp.training_cols = list(other.training_cols)
        for attr in ('BOW_min_count', 'w2vsize', 'w2v_min_count', 'w2v',
                     'goodnes', 'goodnes_vocab', 'goodnes_default'):
            if hasattr(other, attr):
                setattr(tp, attr, getattr(other, attr))
        return tp

    def add_training_col(self, col):
        """Register ``'f_' + col`` as a training column (no-op if already registered)."""
        name = 'f_' + col
        if name not in self.training_cols:
            self.training_cols.append(name)

    def add_BOW(self, word):
        """Add column ``'f_' + word`` holding how often ``word`` occurs in each document."""
        self.data['f_' + word] = self.data.clean_text.apply(lambda x: list(x).count(word))

    def make_BOW(self, min_count=10, set_as_feature=False):
        """Create ``f_<word>`` count columns for every word seen at least ``min_count`` times.

        Args:
            min_count: Minimum corpus-wide count for a word to get a column.
            set_as_feature: Also register the new columns in ``training_cols``.
        """
        from collections import Counter

        self.BOW_min_count = min_count
        words = list(self.vocab[self.vocab['counts'] >= min_count].index)
        # Count each document once, then look words up, instead of rescanning every
        # document for every word and inserting DataFrame columns one by one.
        doc_counts = [Counter(doc) for doc in self.data.clean_text]
        bow = pd.DataFrame(
            {'f_' + w: [counts[w] for counts in doc_counts] for w in tqdm(words)},
            index=self.data.index)
        stale = [col for col in bow.columns if col in self.data.columns]
        self.data = pd.concat([self.data.drop(columns=stale), bow], axis=1)
        if set_as_feature:
            for w in words:
                self.add_training_col(w)

    def make_w2v(self, size=25, window=5, min_count=3):
        """Train a Word2Vec model on ``data['clean_text']`` and keep it in ``self.w2v``.

        NOTE: ``size=`` (here) and ``wv.vocab`` (used by other methods) are the
        gensim 3.x spellings; gensim 4 renamed them to ``vector_size`` and
        ``wv.key_to_index``.
        """
        self.w2vsize = size
        self.w2v_min_count = min_count
        self.w2v = Word2Vec(self.data.clean_text, size=size, window=window, min_count=min_count)

    def sen2vec(self, sen):
        """Return the mean word vector of the tokens in ``sen`` known to the model.

        Tokens missing from the word2vec vocabulary are ignored; if none remain a
        zero vector of length ``w2vsize`` is returned.
        """
        words = [w for w in sen if w in self.w2v.wv.vocab]
        if not words:
            return np.zeros(self.w2vsize)
        return self.w2v.wv[words].mean(axis=0)

    def learn_goodnes(self, min_count=100, min_diff=50):
        """Fit a linear regression from word vectors to a per-word polarity score.

        The target for a word is (occurrences in '+' documents) minus (occurrences
        in '-' documents), read from the ``f_<word>`` columns, so ``make_BOW`` and
        ``make_w2v`` must have run first.

        Args:
            min_count: Minimum corpus-wide count for a word to be used. ``None``
                means "same as the BOW threshold". Values below the BOW threshold
                are rejected because those words have no ``f_`` column.
            min_diff: Minimum absolute polarity score for a word to be used.

        Raises:
            ValueError: If no word passes both thresholds.
        """
        if min_count is None:
            min_count = self.BOW_min_count
        if min_count < self.BOW_min_count:
            print('goodnes min count can not be less than BOW min count:',self.BOW_min_count)
            return
        v = list()
        p = list()
        goodnes_vocab = set()
        for w in tqdm(self.w2v.wv.vocab):
            # Use this instance's state and the requested threshold; the original read
            # notebook globals here and ignored `min_count` after validating it.
            if self.vocab.at[w, 'counts'] < min_count:
                continue
            column = self.data['f_' + w]
            value = column[self.pos_idx].sum() - column[self.neg_idx].sum()
            if np.abs(value) < min_diff:
                continue
            v.append(self.w2v.wv[w])
            p.append(value)
            goodnes_vocab.add(w)
        if not v:
            raise ValueError('no word satisfies min_count=%r and min_diff=%r' % (min_count, min_diff))
        v = np.array(v)
        p = np.array(p)
        self.goodnes_vocab = goodnes_vocab
        self.goodnes = LinearRegression()
        self.goodnes.fit(v, p)
        print("goodnes score :", self.goodnes.score(v,p))
        self.goodnes_default = p.mean()

    def how_good(self, word):
        """Predict the polarity score of ``word``.

        Returns:
            The regressor's prediction (array of length 1), or the string ``'ne'``
            when the word is not in the word2vec vocabulary.
        """
        if word not in self.w2v.wv.vocab:
            return 'ne'
        return self.goodnes.predict([self.w2v.wv[word]])

    def how_good_sen(self, sen):
        """Return the mean predicted polarity of the tokens of ``sen`` in ``goodnes_vocab``.

        Returns:
            A float; ``0.0`` when no token of ``sen`` is in ``goodnes_vocab``.
        """
        words = [w for w in sen if w in self.goodnes_vocab]
        if not words:
            return 0.0
        return float(self.goodnes.predict(self.w2v.wv[words]).mean())

    def set_goodnes_values(self, set_as_feature=False):
        """Store ``how_good_sen`` of every cleaned document in ``data['goodnes_value']``.

        Args:
            set_as_feature: Also expose the score as column ``f_goodnes_value`` and
                register that column in ``training_cols``.
        """
        self.data['goodnes_value'] = \
            [self.how_good_sen(sen) for sen in tqdm(self.data.clean_text)]
        if set_as_feature:
            # add_training_col prepends 'f_', so the registered name must exist as a column.
            self.data['f_goodnes_value'] = self.data['goodnes_value']
            self.add_training_col('goodnes_value')

In [None]:
# Load the reviews under ./movie_reviews/ (pos/ and neg/ sub-folders), clean and
# tokenize them, then add one bag-of-words count column per word that occurs at
# least 30 times and register those columns as training features.
prtp = Text_Processor('./movie_reviews/')
prtp.process()
prtp.make_BOW(min_count=30, set_as_feature=True)

./movie_reviews/pos/cv348_18176.txt: 100%|██████████| 1000/1000 [00:02<00:00, 341.58it/s]
./movie_reviews/neg/cv669_24318.txt: 100%|██████████| 1000/1000 [00:02<00:00, 356.49it/s]
100%|██████████| 2000/2000 [00:18<00:00, 108.43it/s]
  1%|          | 26/3663 [00:05<13:45,  4.41it/s]

In [None]:
# 5-fold cross-validation of Multinomial Naive Bayes on word-presence features
# (the bag-of-words counts are binarized with `> 0`).
estimator = MultinomialNB()
# Fixed random_state: with shuffle=True and no seed the folds, and therefore the
# reported scores, change on every run.
k_fold = KFold(n_splits=5, shuffle=True, random_state=42)
# '+' is the label given to positive reviews, so it is the positive class for
# precision / recall / F1.
scoring = {'accuracy' : make_scorer(accuracy_score),
           'precision' : make_scorer(precision_score,pos_label='+'),
           'recall' : make_scorer(recall_score,pos_label='+'),
           'f1_score' : make_scorer(f1_score,pos_label='+')}
y = prtp.data.label.copy()
validate = cross_validate(estimator=estimator,
                X=prtp.data[prtp.training_cols]>0,
                y=y,
                cv=k_fold,
                scoring=scoring)
validate

In [None]:
# Report each cross-validation entry averaged over the folds.
for metric_name, fold_values in validate.items():
    fold_mean = fold_values.mean()
    print(metric_name, ":\t", fold_mean)

In [None]:
# Order matters: train Word2Vec on the cleaned reviews, fit the linear "goodnes"
# regressor on the word vectors (it reads the f_<word> columns and BOW_min_count
# created by make_BOW), then store each review's mean predicted score in
# prtp.data['goodnes_value'].
prtp.make_w2v()
prtp.learn_goodnes()
prtp.set_goodnes_values()

In [None]:
# Compare the distribution of review-level goodnes scores between the two classes.
# Select rows with the class masks rather than the hard-coded position 1000, so the
# split stays correct if the corpus size or row order changes.
p = prtp.data.loc[prtp.pos_idx, 'goodnes_value']
n = prtp.data.loc[prtp.neg_idx, 'goodnes_value']
# alpha keeps the first histogram visible where the two overlap.
plt.hist(p, bins=100, label='Positive', color='g', alpha=0.6)
plt.hist(n, bins=100, label='Negative', color='r', alpha=0.6)
plt.ylabel('Count')
plt.xlabel('Goodnes Point')
# The label= arguments above are only drawn once a legend is requested.
plt.legend()
plt.show()