# Readme

Description:

    Contains some classes and functions for text cleaning and transformation.
    Meant for use as transformers in pipelines.

Functions:

    lemmatize - Lemmatizer
    stemming - Porter Stemming 
    
Classes:

    CleanText - some basic text cleaning methods
    Embeddings - build embeddings, transform text data into numeric representation
    FixLength - standardize length of the sentences (text input)
    Flatten - flatten the output
    Tag - finds the part of speech for every word in a sentence
    SentiFeatures - calculates sentiment for every word in a sentence using sentiwordnet
    CustomBinarizer - wrapper for LabelBinarizer


## Text cleaning

In [0]:
"""
Functions:
    lemmatize - Lemmatizer
    stemming - Porter Stemming 
    
Class:
    CleanText - some basic text cleaning methods
"""

import re
import string

import numpy as np
from nltk import pos_tag
from nltk.corpus import stopwords
from nltk.corpus import wordnet as wn
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from sklearn.base import BaseEstimator, TransformerMixin


def _penn_to_wn(tag):
    """
    Map a Penn Treebank POS tag onto the matching WordNet POS constant.

    tag - Penn Treebank tag string (e.g. 'NN', 'VBD', 'JJ')

    Returns wn.ADJ, wn.NOUN, wn.ADV or wn.VERB depending on the prefix of
    the tag, or None when the tag starts with none of 'J', 'N', 'R', 'V'.
    """
    # Prefixes are checked in the same order as the WordNet categories are
    # listed; the wn attribute is only looked up when a prefix matches.
    prefix_to_attr = (('J', 'ADJ'), ('N', 'NOUN'), ('R', 'ADV'), ('V', 'VERB'))
    for prefix, wn_attr in prefix_to_attr:
        if tag.startswith(prefix):
            return getattr(wn, wn_attr)
    return None

def lemmatize(input_text, lemmatize_apply=1):
    """
    Lemmatizer function

    input_text - text to lemmatize (sentence, not yet tokenized)
    lemmatize_apply - when truthy (default) every token that has a WordNet
        part of speech is replaced by its lemma; when falsy the tokens are
        returned unchanged (previously an empty list was returned, which
        silently dropped the whole sentence)

    Returns a list of tokens.
    """
    tokens = word_tokenize(input_text)

    # Nothing to do: hand the tokens back instead of discarding them.
    if not lemmatize_apply:
        return tokens

    lemmatizer = WordNetLemmatizer()
    lemmatized_text = []
    for word, tag in pos_tag(tokens):
        wn_tag = _penn_to_wn(tag)
        if not wn_tag:
            # Tag has no WordNet counterpart: keep the word as it is.
            lemmatized_text.append(word)
        else:
            lemmatized_text.append(lemmatizer.lemmatize(word, pos=wn_tag))
    return lemmatized_text
    
def stemming(input_text):
    """
    Porter Stemming

    input_text - text to stem (sentence); it is tokenized with word_tokenize

    Returns a list with the Porter stem of every token, in order.
    """
    stem = PorterStemmer().stem
    tokens = word_tokenize(input_text)
    return list(map(stem, tokens))


class CleanText(BaseEstimator, TransformerMixin):
    """
    Some basic text cleaning methods

    stemming_apply - apply Porter stemming in transform
    tokenize_apply - tokenize the text (word_tokenize) as the last step of transform
    stopwords_apply - remove English stopwords in transform

    All three flags are evaluated by truthiness.
    """

    def __init__(self, stemming_apply = True, tokenize_apply = True, stopwords_apply = True):
        self.stemming_apply = stemming_apply
        self.tokenize_apply = tokenize_apply
        self.stopwords_apply = stopwords_apply

    def remove_mentions(self, input_text):
        """Delete every '@' followed by word characters."""
        return re.sub(r'@\w+', '', input_text)

    def remove_urls(self, input_text):
        """Delete http(s)-style URLs together with one trailing whitespace character."""
        return re.sub(r'http.?://[^\s]+[\s]?', '', input_text)

    def emoji_oneword(self, input_text):
        """Drop underscores from the text."""
        # By compressing the underscore, the emoji is kept as one word
        return input_text.replace('_', '')

    def remove_punctuation(self, input_text):
        """Replace each listed punctuation symbol with a single space."""
        # Subset of string.punctuation: '!', '.', '?' and '@' are not listed
        # and therefore survive.
        punct = '"#$%&\'()*+,-/:;<=>[\\]^_`{|}~'
        # Every punctuation symbol will be replaced by a space
        trantab = str.maketrans(punct, len(punct) * ' ')
        return input_text.translate(trantab)

    def remove_noisy_signs(self, input_text):
        """Delete every character that is not a word character, space, '!', '?' or '.'."""
        return re.sub(r'[^\w !?.]', '', input_text)

    def remove_digits(self, input_text):
        """Delete every run of digits."""
        # Raw string: '\d' in a plain string literal is an invalid escape sequence.
        return re.sub(r'\d+', '', input_text)

    def to_lower(self, input_text):
        """Lower-case the text."""
        return input_text.lower()

    def remove_stopwords(self, input_text):
        """
        Remove English stopwords and one-character words.

        Returns input_text untouched when stopwords_apply is falsy.
        """
        if not self.stopwords_apply:
            return input_text

        # Sets give constant-time membership tests for every word checked.
        stopwords_set = set(stopwords.words('english'))
        # Some words which might indicate a certain sentiment are kept via a whitelist
        whitelist = {"n't", "not", "no"}
        clean_words = [
            word for word in input_text.split()
            if (word not in stopwords_set or word in whitelist) and len(word) > 1
        ]
        return " ".join(clean_words)

    def stemming(self, input_text):
        """
        Porter-stem every whitespace-separated word and re-join with spaces.

        Returns input_text untouched when stemming_apply is falsy.
        """
        if not self.stemming_apply:
            return input_text

        porter = PorterStemmer()
        return " ".join(porter.stem(word) for word in input_text.split())

    def tokenize(self, input_text):
        """Return word_tokenize(input_text), or input_text itself when tokenize_apply is falsy."""
        if self.tokenize_apply:
            return word_tokenize(input_text)
        return input_text

    def fit(self, X, y=None, **fit_params):
        """Nothing to learn; returns self."""
        return self

    def transform(self, X, **transform_params):
        """
        Clean every entry of X.

        X - object exposing .apply(func) that maps func over its entries
            (presumably a pandas Series of strings - confirm against callers)

        Steps, in order: remove mentions, remove urls, lower-case,
        remove stopwords, stem, tokenize.
        """
        steps = (
            self.remove_mentions,
            self.remove_urls,
            self.to_lower,
            self.remove_stopwords,
            self.stemming,
            self.tokenize,
        )
        clean_X = X
        for step in steps:
            clean_X = clean_X.apply(step)
        return clean_X

## Text transformation: Embeddings, FixLength, Flatten

In [2]:
"""
Classes:
    Embeddings - build embeddings, transform text data into numeric representation
    FixLength - standardize length of the sentences (text input)
    Flatten - flatten the output
"""

from sklearn.base import BaseEstimator, TransformerMixin
import gensim


class embeddings(BaseEstimator, TransformerMixin):
    """
    Class to build embeddings, and transform text data into numeric representation
    """

    def __init__(self, embeddings = None, emb_source = 'load', emb_type = 'each', emb_size = None):
        """
        build embeddings: own, or load from gensim, or build using gensim package
        embeddings - if we have already loaded embedding we can send it in init,
            if there is None, depending on setting parameter emb_source embedding is loaded
            with the gensim package or built using the gensim package based on dataset in fit method
        emb_source - if parameter 'embeddings' is not specified (=None) there is need to load embedding
            (set value to 'load') or build own embedding based on input dataset from fit method (value 'own')
        emb_type - method of transforming text data into numeric representation (embeddings)
            value 'each' stands for making embedding representation for each word in sentence
                (requires equal sentence length)
            value 'avg' gives average embedding representation from all words in sentence
        emb_size - size/dim of the embedding; taken from embeddings.vector_size when
            'embeddings' is passed
        """
        self.embeddings = embeddings
        self.emb_source = emb_source
        self.emb_type = emb_type
        self.emb_size = emb_size if embeddings is None else embeddings.vector_size

    def _keyed_vectors(self):
        """
        Return the object used for word -> vector lookups.

        That is the 'wv' attribute of self.embeddings when it has one,
        otherwise self.embeddings itself, so every lookup goes through one path.
        """
        return getattr(self.embeddings, 'wv', self.embeddings)

    def _build_embeddings_own(self, X_train, emb_size):
        """
        Train a gensim Word2Vec embedding on X_train.

        X_train - tokenized sentences used for training
        emb_size - size/dim of the embedding
        Raises ValueError when either argument is None.
        """
        if emb_size is None:
            raise ValueError('Pass parameter emb_size in fit function (size/dim of embedding)')

        if X_train is None:
            raise ValueError('Pass parameter X_train in fit function (dataset for training embeddings)')

        if self.embeddings is None:
            # NOTE(review): 'size' is the keyword this file was written against;
            # confirm it matches the installed gensim version.
            own_embeddings = gensim.models.Word2Vec(X_train
                     , min_count=1
                     , size = emb_size
                     , window=5
                     , workers=8
                     , sg = 1
                     , seed = 0)
        else:
            own_embeddings = self.embeddings
        own_embeddings.train(X_train, total_examples = own_embeddings.corpus_count, epochs = 20)
        # Was 'self.source'; the attribute used everywhere else is emb_source.
        self.emb_source = 'own'
        self.emb_size = emb_size
        self.embeddings = own_embeddings

    def _build_embeddings_gensim_load(self, gensim_file_path,
                                     vocabulary = 500000):
        """
        Load embeddings stored in binary word2vec format.

        gensim_file_path - path of the file to load
        vocabulary - passed as 'limit' to the loader
        """
        wv_embeddings = gensim.models.KeyedVectors.load_word2vec_format(gensim_file_path, binary = True, limit = vocabulary)
        # Read the dimension from the loaded vectors instead of assuming 300.
        self.emb_size = wv_embeddings.vector_size
        self.embeddings = wv_embeddings
        self.emb_source = 'load'

    def _sentence_to_vec(self, sentence_tokenized):
        """
        Takes a mean from words embeddings in sentence
        This guarantees fix dim
        Words without an embedding are skipped; when none is left a zero
        vector of length emb_size is returned.
        """
        vectors = self._keyed_vectors()

        words_embeddings = [vectors[word] for word in sentence_tokenized if word in vectors]
        if not words_embeddings:
            return np.zeros(self.emb_size)

        return np.mean(words_embeddings, axis = 0)

    def _sentence_to_word_vec(self, sentence_tokenized):
        """
        Transform list of words to array of embeddings
        For words not in embeddings we set embedding for word 'UNK' (undefined),
        or a zero vector when 'UNK' has no embedding either
        """
        vectors = self._keyed_vectors()

        # embedding for undefined word
        if 'UNK' in vectors:
            UNK = vectors['UNK']
        else:
            UNK = np.zeros(self.emb_size)

        words_embeddings = [vectors[word] if word in vectors else UNK for word in sentence_tokenized]
        if not words_embeddings:
            # Empty sentence: still return one (zero) row.
            words_embeddings.append( np.zeros(self.emb_size) )

        return np.array(words_embeddings)

    def fit(self, X = None, y = None,  **fit_params):
        """
        Load or train the embeddings when none were passed to __init__.

        X - tokenized sentences; used only when emb_source == 'own'
        fit_params - when emb_source == 'load':
            gensim_file_path (required) - file to load
            vocabulary (optional) - limit of loaded words
        Raises ValueError when the file path is missing or emb_source is unknown.
        """
        if self.embeddings is None:
            if self.emb_source == 'load':
                # The loader needs a path; it used to be called without one,
                # which always failed with TypeError.
                gensim_file_path = fit_params.get('gensim_file_path')
                if gensim_file_path is None:
                    raise ValueError("Pass parameter gensim_file_path in fit function "
                                     "(file with embeddings to load)")
                load_kwargs = {}
                if 'vocabulary' in fit_params:
                    load_kwargs['vocabulary'] = fit_params['vocabulary']
                self._build_embeddings_gensim_load(gensim_file_path, **load_kwargs)
            elif self.emb_source == 'own':
                self._build_embeddings_own(X, self.emb_size)
            else:
                raise ValueError("Unknown emb_source: {!r} (expected 'load' or 'own')".format(self.emb_source))

        return self

    def transform(self, X, **transform_params):
        """
        X should be tokenized
        X type - list of lists, series of lists/strings
        Raises ValueError for an emb_type other than 'avg' or 'each'.
        """
        if self.emb_type == 'avg':
            return np.array( [self._sentence_to_vec(sentence) for sentence in X] )
        if self.emb_type == 'each':
            return np.array( [self._sentence_to_word_vec(sentence) for sentence in X] )

        # Previously printed a message using an undefined name and then
        # returned an unbound variable.
        raise ValueError("Unknown emb_type: {!r} (expected 'avg' or 'each')".format(self.emb_type))


# The documentation at the top of the file refers to this class as 'Embeddings'.
Embeddings = embeddings
            

class FixLength(BaseEstimator, TransformerMixin):
    """Bring every tokenized sentence to one common length (truncate or pad)."""

    def __init__(self, max_length = None, fill = '.'):
        """
        max_length (int) - target length; if None, fit sets it to the length
            of the longest sentence in the data set
        fill (str) - token appended to sentences shorter than max_length
            until they reach that length
        """
        self.max_length = max_length
        self.fill = fill

    def _standardize_len(self, sentence_tokenized ):
        """Cut the token list to max_length, then pad it with 'fill' up to max_length."""
        target = self.max_length
        clipped = sentence_tokenized[:target]
        missing = target - len(clipped)
        return clipped + [self.fill] * missing

    def fit(self, X, y = None,  **fit_params):
        """X should be tokenized; only used when max_length was left as None."""
        if self.max_length is not None:
            return self

        lengths = X.apply(len)
        self.max_length = max(lengths)
        return self

    def transform(self, X, **transform_params):
        """Apply the length standardization to every entry of X (via X.apply)."""
        return X.apply(self._standardize_len)
    

class Flatten(BaseEstimator, TransformerMixin):
    """Collapse every sample of X into a single row."""

    def fit(self, X, y = None,  **fit_params):
        """Nothing to learn; returns self."""
        return self

    def transform(self, X, **transform_params):
        """Reshape X to (X.shape[0], -1): first axis kept, the rest merged."""
        n_samples = X.shape[0]
        return X.reshape(n_samples, -1)


## Tag, SentiFeatures

In [5]:
"""
Classes:
    Tag - finds part of the speech for every word in sentence
    SentiFeatures - Calculates sentiment for every word in sentence using sentiwordnet
"""

from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet as wn
from nltk.corpus import sentiwordnet as swn
from nltk import sent_tokenize, word_tokenize, pos_tag

class Tag(BaseEstimator, TransformerMixin):
    """
    Find the part of speech for every word.

    Each entry must have fixed length; don't apply stopwords removal or
    stemming before this step.
    NOTE(review): the original note says the text shouldn't be tokenized,
    yet every entry is handed straight to nltk's pos_tag - confirm which
    input form is intended.

    only_tags - when True return just the tags, otherwise (word, tag) pairs
    """
    def __init__(self, only_tags = False):
        self.only_tags = only_tags

    def fit(self, X, y = None,  **fit_params):
        """Nothing to learn; returns self."""
        return self

    def transform(self, X, **transform_params):
        """Run pos_tag over every entry of X (via X.apply)."""
        tagged = X.apply(pos_tag)
        if not self.only_tags:
            return tagged
        # Keep the second element (the tag) of every pair.
        return tagged.apply(lambda pairs: [pair[1] for pair in pairs])


class SentiFeatures(BaseEstimator, TransformerMixin):
    """
    Calculates sentiment for every word in sentence using sentiwordnet.
    Must be applied after Tag transformation (class Tag).

    only_pos_neg - calculate sentiment using only positive and negative feature
        (if False, use positive, negative, objective feature)
    """

    def __init__(self, only_pos_neg = False):
        self.lemmatizer =  WordNetLemmatizer()
        self.only_pos_neg = only_pos_neg

    def _penn_to_wn(self, tag):
        """Convert between the PennTreebank tags to simple Wordnet tags"""
        if tag.startswith('J'):
            return wn.ADJ
        elif tag.startswith('N'):
            return wn.NOUN
        elif tag.startswith('R'):
            return wn.ADV
        elif tag.startswith('V'):
            return wn.VERB
        return None

    def _word_sentiment(self, word, tag):
        """
        Calculates sentiments for single word.

        Returns a tuple (pos, neg, obj), or (pos, neg) when only_pos_neg is set.
        All zeros are returned when the tag has no WordNet counterpart, or
        when no lemma, synset or sentiment entry is found for the word.
        """
        # Two scores (pos, neg) or three (pos, neg, obj).
        n_scores = 2 if self.only_pos_neg else 3
        default_return = (0.0,) * n_scores

        wn_tag = self._penn_to_wn(tag)
        if not wn_tag:
            return default_return

        lemma = self.lemmatizer.lemmatize(word, pos=wn_tag)
        if not lemma:
            return default_return

        synsets = wn.synsets(lemma, pos=wn_tag)
        if not synsets:
            return default_return

        # senti_synset can return None for a synset it has no entry for;
        # skip those instead of failing on None.pos_score().
        senti_synsets = [swn.senti_synset(synset.name()) for synset in synsets]
        senti_synsets = [senti for senti in senti_synsets if senti is not None]
        if not senti_synsets:
            return default_return

        # keep (pos, neg, obj) sentiment or just (pos, neg)
        if self.only_pos_neg:
            scores = [(senti.pos_score(), senti.neg_score())
                      for senti in senti_synsets]
        else:
            scores = [(senti.pos_score(), senti.neg_score(), senti.obj_score())
                      for senti in senti_synsets]

        # because one word can have more than one meaning (so multiple sentiments)
        # we take average of pos, neg and obj sentiment across all meanings
        # return tuple( avg_pos, avg_neg, avg_obj)
        return tuple([np.mean(sent) for sent in zip(*scores)])

    def sentence_sentiment(self, tagged_sentence):
        """
        Calculates sentiments for each pair (word, tag) in sentence
        tagged_sentence - list of tuples (word, tag)
        Returns a numpy array with one row of scores per pair.
        """
        sentiments_list = [ self._word_sentiment(word, tag) for word, tag in tagged_sentence]

        return np.array(sentiments_list)

    def fit(self, X, y = None,  **fit_params):
        """Nothing to learn; returns self."""
        return self

    def transform(self, X, y = None, **transform_params):
        """Stack the per-sentence sentiment arrays of every entry of X (via X.apply)."""
        return np.stack( X.apply(self.sentence_sentiment).values )

## LabelBinarizer

In [0]:
from sklearn.preprocessing import LabelBinarizer

class CustomBinarizer(BaseEstimator, TransformerMixin):
    """
    Thin wrapper around LabelBinarizer exposing the fit(X, y) / transform(X)
    signatures, because LabelBinarizer has problems working on its own here.
    """

    def fit(self, X, y=None, **fit_params):
        """Fit a fresh LabelBinarizer on X and keep it in self.lb."""
        binarizer = LabelBinarizer()
        self.lb = binarizer.fit(X)
        return self

    def transform(self, X):
        """Binarize X with the LabelBinarizer fitted in fit."""
        fitted = self.lb
        return fitted.transform(X)