In [None]:
from IPython.core.display import HTML
# Render the local HTML file inline in the notebook. `display` is not imported
# here; it is the built-in that the IPython/Jupyter kernel injects.
display(HTML(filename="./Static/Helpfunction.html"))

In [4]:
# --- Text processing -------------------------------------------------------
import re # regular-expression module (standard library)
import nltk # NLTK toolkit: tokenising, stemming, frequency distributions

from nltk.corpus import stopwords
# Fetches the stopword corpus if it is missing; the recorded output shows it
# was already up-to-date on the last run.
nltk.download('stopwords')
from nltk.probability import FreqDist
from nltk.util import ngrams
# --- Data handling and plotting ---------------------------------------------
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
import numpy as np
# --- scikit-learn: vectorisers, similarity, feature selection, models -------
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
from sklearn.feature_selection import chi2, SelectKBest, f_classif
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import roc_curve, auc 

import spacy                    # spaCy NLP library

# Load the small English spaCy pipeline once; `nlp` is the module-level object
# that ExtactInfo.start_extract() pipes documents through.
import en_core_web_sm
nlp = en_core_web_sm.load()

# NOTE(review): the recorded output of this cell is
# "ModuleNotFoundError: No module named 'contractions'", so CONTRACTION_MAP was
# not available on the last run. Presumably a local contractions.py exposing
# CONTRACTION_MAP is expected next to the notebook - confirm.
# NOTE(review): the helper functions below also read `tokenizer`,
# `stopword_list` and `wordnet_lemmatizer`, which no visible cell defines.
from contractions import CONTRACTION_MAP

[nltk_data] Downloading package stopwords to c:\users\dapeng\appdata\l
[nltk_data]     ocal\programs\python\python37\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


ModuleNotFoundError: No module named 'contractions'

In [None]:
def remove_special_characters(text, remove_digits=False):
    """Delete every character that is not an ASCII letter, whitespace or digit.

    Args:
        text: Input string.
        remove_digits: When True, digits are removed as well.

    Returns:
        The filtered string; whitespace is left untouched.
    """
    # The letter range must be `A-Z`. The previous `A-z` also spanned the ASCII
    # punctuation between 'Z' and 'a' ([ \ ] ^ _ `), so those characters were
    # kept whenever remove_digits was True.
    pattern = r'[^a-zA-Z\s]' if remove_digits else r'[^a-zA-Z0-9\s]'
    return re.sub(pattern, '', text)

def remove_stopwords(text, is_lower_case=False):
    """Drop stopwords from `text` and return the remaining tokens space-joined.

    Tokens come from the module-level `tokenizer.tokenize` and are stripped of
    surrounding whitespace. Each token is looked up in the module-level
    `stopword_list`: as-is when `is_lower_case` is True, otherwise through its
    lower-cased form. Kept tokens retain their original casing.
    """
    kept = []
    for raw_token in tokenizer.tokenize(text):
        word = raw_token.strip()
        # Callers that already lower-cased the text skip the extra .lower().
        lookup = word if is_lower_case else word.lower()
        if lookup not in stopword_list:
            kept.append(word)
    return ' '.join(kept)

def simple_stemmer(text):
    """Stem every whitespace-separated word of `text` with NLTK's PorterStemmer.

    Returns the stems joined by single spaces.
    """
    stemmer = nltk.porter.PorterStemmer()
    stems = map(stemmer.stem, text.split())
    return ' '.join(stems)
   
def lemmatize_text(text):
    """Lemmatize each token of `text` as a verb and re-join with single spaces.

    Tokens are produced by `nltk.word_tokenize`; each one is passed to the
    module-level `wordnet_lemmatizer.lemmatize` with pos="v".
    """
    lemmas = [wordnet_lemmatizer.lemmatize(token, pos="v")
              for token in nltk.word_tokenize(text)]
    return " ".join(lemmas)

def expand_contractions(text):
    """Replace contractions in `text` using the module-level CONTRACTION_MAP.

    Matching is case-insensitive. The first character of the matched text is
    kept so that a capitalised contraction stays capitalised ("Can't" ->
    "Cannot"). Any apostrophe left after expansion is removed.

    Args:
        text: Input string.

    Returns:
        The string with known contractions expanded and apostrophes stripped.
    """
    if not CONTRACTION_MAP:
        # An empty alternation would match the empty string at every position
        # and then fail on match[0]; only the apostrophe stripping applies.
        return re.sub("'", "", text)

    # Longest keys first: regex alternation takes the first alternative that
    # matches, so "can't" listed before "can't've" would leave a dangling "'ve".
    # re.escape keeps any regex metacharacter inside a key literal.
    alternatives = sorted(CONTRACTION_MAP, key=len, reverse=True)
    contractions_pattern = re.compile(
        '({})'.format('|'.join(re.escape(key) for key in alternatives)),
        flags=re.IGNORECASE | re.DOTALL)

    def expand_match(contraction):
        match = contraction.group(0)
        expanded_contraction = (CONTRACTION_MAP.get(match)
                                or CONTRACTION_MAP.get(match.lower()))
        if not expanded_contraction:
            # Reached only when IGNORECASE matched a key that is stored in
            # neither this exact casing nor lower case; leave the text as-is
            # instead of failing on None.
            return match
        # Preserve the casing of the first character of the original text.
        return match[0] + expanded_contraction[1:]

    expanded_text = contractions_pattern.sub(expand_match, text)

    return re.sub("'", "", expanded_text)

In [None]:
def freq_ngram(document, N = 1, allgram = False): 
    """Count n-gram occurrences over a collection of texts.

    All texts are joined with spaces and split on whitespace, so n-grams may
    span the boundary between two consecutive texts.

    Args:
        document: Iterable of strings, or a single string (treated as a
            one-text collection rather than iterated character by character).
        N: Size of the n-grams to count.
        allgram: When True and N > 1, n-grams of every size from 1 to N-1 are
            counted as well.

    Returns:
        pandas.DataFrame with columns 'word' (the n-gram, tokens joined by a
        space), 'count' and 'frequency' (count divided by the total count).
    """
    if isinstance(document, str):
        # Same convention as the other helpers in this notebook: a bare string
        # is one document, not a sequence of characters.
        document = [document]

    tok_list = ' '.join(document).split()
    ngram_tok_list = [' '.join(toks) for toks in ngrams(tok_list, N)]
    if allgram and N > 1:
        for size in range(1, N):
            ngram_tok_list.extend(' '.join(toks) for toks in ngrams(tok_list, size))

    fdist = nltk.FreqDist(ngram_tok_list)
    counts = np.array(list(fdist.values()))  # computed once, reused for both columns
    words_df = pd.DataFrame({'word': list(fdist.keys()),
                             'count': counts,
                             'frequency': counts / counts.sum()})
    return words_df

    
def viz_ngram_freq(df, figsize = (8,8)):
    """Show a horizontal bar chart of n-gram counts.

    `df` needs the columns "count" (bar length) and "word" (bar label), as in
    the frame returned by freq_ngram. `figsize` is the matplotlib figure size.
    """
    _, axes = plt.subplots(figsize=figsize)
    sns.barplot(data=df, x="count", y="word", ax=axes)
    axes.set(ylabel='Word')
    plt.show()

In [None]:
def create_bow_matrix(document, tfidf = True):
    """Vectorise a collection of texts into a bag-of-words DataFrame.

    Args:
        document: Iterable of strings, one per text.
        tfidf: True for TF-IDF weights (TfidfVectorizer), False for raw term
            counts (CountVectorizer).

    Returns:
        pandas.DataFrame with one row per text and one column per vocabulary
        term.
    """
    # The corpus belongs in fit_transform() only. Passing it to the constructor
    # bound it to the vectorizer's first parameter (`input`), which current
    # scikit-learn rejects and older releases only tolerated for plain lists.
    vectorizer = TfidfVectorizer() if tfidf else CountVectorizer()
    bow_matrix = vectorizer.fit_transform(document)

    # get_feature_names() was removed in newer scikit-learn releases; prefer
    # its replacement and fall back for old installations.
    if hasattr(vectorizer, "get_feature_names_out"):
        feature_names = vectorizer.get_feature_names_out()
    else:
        feature_names = vectorizer.get_feature_names()

    return pd.DataFrame(bow_matrix.toarray(), columns=feature_names)

def get_text_similarity(doc_a, doc_b, method = "cosine"):
    """Pairwise similarity (or distance) between all texts of two collections.

    Each argument may be a single string or a list of strings. Both are
    concatenated, vectorised with create_bow_matrix, and compared pairwise:
    cosine similarity when `method` is "cosine", Euclidean distance for any
    other value. The square result matrix is returned as a DataFrame.
    """
    left = [doc_a] if isinstance(doc_a, str) else doc_a
    right = [doc_b] if isinstance(doc_b, str) else doc_b
    bow_matrix = create_bow_matrix(left + right)

    if method == "cosine":
        sim_matrix = cosine_similarity(bow_matrix)
    else:
        sim_matrix = euclidean_distances(bow_matrix)

    return pd.DataFrame(sim_matrix)
    

In [20]:
def checkdoc(f):
    """Method decorator: refuse to run `f` until documents have been loaded.

    The wrapped callable's first positional argument must expose an `nlpdoc`
    attribute; if it is empty a ValueError is raised, otherwise `f` is called
    with all positional and keyword arguments passed through unchanged.
    """
    import functools  # local import so this cell does not depend on another one

    @functools.wraps(f)  # keep the wrapped method's name and docstring
    def wrapper(*args, **kwargs):
        if not args[0].nlpdoc:
            raise ValueError(("""
            The document is empty. 
            You should use start_extract() to initiate the extracter before you use this function
            """))
        # Forward keyword arguments too, so calls such as
        # obj.method(option=value) no longer fail with TypeError.
        return f(*args, **kwargs)
    return wrapper
    
class ExtactInfo:
    '''Run texts through the module-level spaCy pipeline `nlp` and expose
    part-of-speech counts and noun chunks of the parsed documents.'''

    def __init__(self):
        # Parsed documents; stays empty until start_extract() is called.
        self.nlpdoc = []

    def start_extract(self, document):
        '''Parse `document` (one string or an iterable of strings) and store
        the resulting spaCy docs on the instance.'''
        texts = [document] if isinstance(document, str) else document
        self.nlpdoc = list(nlp.pipe(texts))

    @checkdoc
    def get_postag(self, postagtype = "univ"):
        '''Return a DataFrame of tag counts, one row per parsed document.

        "univ" counts the tokens' `pos_` attribute; any other value counts
        their `tag_` attribute. Tags absent from a document are filled with 0.
        '''
        tag_attr = "pos_" if postagtype == "univ" else "tag_"
        pos_tag = [
            dict(Counter(getattr(tok, tag_attr) for tok in doc))
            for doc in self.nlpdoc
        ]
        return pd.DataFrame(pos_tag, dtype='Int64').fillna(0)

    @checkdoc
    def get_noun_phrase(self):
        '''Return, for every parsed document, the list of its noun chunks.'''
        return [list(doc.noun_chunks) for doc in self.nlpdoc]
    

In [46]:
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score


# Parameter grids handed to GridSearchCV, keyed by model name ('logistic' / 'kn').
# Inner keys follow the "<step>__<param>" form, where <step> is the name the
# estimator gets inside the Pipeline built by classifier.fit_model - i.e. the
# same string as the outer key.
HYPERPARAM = {
   'logistic':{
    'logistic__penalty': ['l2'],
    'logistic__C': [0.001, 0.01, 0.1, 1, 10, 100],
    'logistic__class_weight': ['balanced'],
    'logistic__n_jobs': [-1]
    },
    "kn":{
      'kn__n_neighbors':[5,11,19],
      'kn__weights':['uniform', 'distance'],
      'kn__algorithm':['ball_tree'], #'auto','kd_tree','brute'
      'kn__n_jobs':[-1]
    }
} 
# Feature-scoring functions selectable through fit_model(score_func=...).
SCORE_FUNC = {
    "chi2": chi2,
    "Fvalue": f_classif
}
# Estimators selectable through fit_model(method=...). The instances are
# created once, when this cell runs, and looked up on every fit_model call.
MODEL_METHOD = {
    'kn':KNeighborsClassifier(),
    'logistic':LogisticRegression()
}
def checkmodel(f):
    """Method decorator: refuse to run `f` until a model has been fitted.

    The wrapped callable's first positional argument must expose a
    `select_model` attribute; if it is falsy a ValueError is raised, otherwise
    `f` is called with all positional and keyword arguments passed through.
    """
    import functools  # local import so this cell does not depend on another one

    @functools.wraps(f)  # keep the wrapped method's name and docstring
    def wrapper(*args, **kwargs):
        if not args[0].select_model:
            raise ValueError(("""
            The model has not been trained. You need to fit_model first with training dataset before prediction
            """))
        # Forward keyword arguments too, so calls such as
        # obj.predict(x_test=..., y_test=...) no longer fail with TypeError.
        return f(*args, **kwargs)
    return wrapper

class classifier():
    """Feature selection + grid-searched classifier behind a fit/predict API.

    fit_model() builds a Pipeline of SelectKBest followed by the estimator
    chosen from MODEL_METHOD, tunes it with GridSearchCV over
    HYPERPARAM[method], and keeps the best pipeline. predict() applies that
    pipeline and prints the accuracy.
    """

    def __init__(self):
        self.x_train = None
        self.x_test = None
        self.y_train = None
        self.y_test = None
        self.y_pred = None
        self.method = None
        self.select_model = None      # best pipeline found by the grid search
        self.feature_set = []         # names of the features kept by the selector
        self.fine_tune_res = None     # cv_results_ as a DataFrame, best rank first

    def fit_model(self, x_train, y_train, 
                  method = "kn", 
                  score_func = 'chi2', 
                  see_tuning = False,
                  feature_list = []):
        ''' Fit model with training data, identified method and score function; 
            This include select feature and parameter tuning

        Args:
            x_train: 2-D training matrix exposing `.shape` and `.copy()`.
            y_train: Training labels exposing `.copy()`.
            method: Key of MODEL_METHOD / HYPERPARAM ('kn' or 'logistic').
            score_func: Key of SCORE_FUNC ('chi2' or 'Fvalue').
            see_tuning: When True, print the tuning table, the best score and
                the parameters of the best pipeline.
            feature_list: Optional feature names, one per column of x_train.
                When given, the names of the selected columns are stored in
                `self.feature_set`. The default list is never mutated.

        Raises:
            ValueError: Unknown `method` or `score_func`, or no parameter grid
                defined for `method`.
        '''
        if method not in MODEL_METHOD:
            raise ValueError("""
            Choose method from 'kn' or 'logistic'
            """)
        if score_func not in SCORE_FUNC:
            raise ValueError("""
            Choose score function from 'chi2' or 'Fvalue'
            """)
        # Check the grid of the chosen method, not just that the dict is non-empty.
        if not HYPERPARAM.get(method):
            raise ValueError("Parameters are required to finetune")

        self.x_train = x_train.copy()
        self.y_train = y_train.copy()
        self.method = method

        n_samples, n_features = x_train.shape

        # Feature length is better off less than sample size; keep at least one.
        k_best = max(1, min(n_samples - 1, n_features))
        # `k` is keyword-only in current scikit-learn releases.
        feat_selector = SelectKBest(SCORE_FUNC[score_func], k=k_best)

        # The estimator step must carry the same name as the HYPERPARAM key.
        pipe = Pipeline(steps=[('selector', feat_selector),
                               (method, MODEL_METHOD[method])])

        grid_search = GridSearchCV(pipe,
                                   param_grid=HYPERPARAM[method],
                                   verbose=1)
        grid_search.fit(x_train, y_train)
        self.select_model = grid_search.best_estimator_
        self.fine_tune_res = pd.DataFrame(grid_search.cv_results_).sort_values(by='rank_test_score')

        if feature_list is not None and len(feature_list) > 0:
            # GridSearchCV fits clones, so the fitted selector is the one inside
            # best_estimator_; `feat_selector` above is never fitted itself.
            support = self.select_model.named_steps['selector'].get_support()
            # A plain list cannot be indexed with a boolean mask.
            self.feature_set = np.asarray(feature_list)[support].tolist()

        if see_tuning:
            import pprint
            print(self.fine_tune_res)
            print("Best score: %0.3f" % grid_search.best_score_)
            # pprint.pprint's second positional argument is the output stream,
            # so the label has to be printed separately.
            print("Best parameters set:")
            pprint.pprint(grid_search.best_estimator_.get_params())

    @checkmodel
    def predict(self, x_test, y_test):
        '''
        predict and evaluate score

        Stores the test data and the predictions on the instance, prints the
        accuracy against `y_test`, and returns the predictions.
        '''
        self.x_test = x_test
        self.y_test = y_test
        self.y_pred = self.select_model.predict(x_test)
        score = accuracy_score(y_test, self.y_pred)

        print("Prediction accuracy is {}".format(score))

        return self.y_pred
        