Lexical normalization pipeline 

author - AR Dirkson 
date - 2-10-2018

This pipeline takes raw text data and performs the following steps: 
- Replacement of URLs, email addresses and personal names (proper nouns) with placeholder tokens
- Convert to lower-case
- Tokenization with NLTK 
- British English to American English 
- Normalization of generic abbreviations and slang 
- Normalization of domain-specific (patient forum) abbreviations 
- Spelling correction 


All the relevant objects should be in obj_lex

In [196]:
import pandas as pd 
import pickle 
import re
import numpy as np 
from collections import Counter, defaultdict
from nltk import word_tokenize, pos_tag
import editdistance
import csv 
from sklearn.metrics import f1_score
import numpy as np
import scipy.stats 
from nltk.corpus import names


In [251]:
class Normalizer (): 
        
    def __init__(self): 
        """Create a normalizer; no attributes are initialised here."""
        pass
        
    # To use this function, the pickle files need to be stored under obj_lex/ in the same folder as the script.
    def load_obj(self, name):
        with open('obj_lex/' + name + '.pkl', 'rb') as f:
            return pickle.load(f, encoding='latin1')
        
    def load_files(self): 
        self.ext_vocab2 = self.load_obj('vocabulary_spelling_unique')
        self.abbr_dict = self.load_obj ('abbreviations_dict')
        self.celex_freq_dict = self.load_obj ('celex_lwrd_frequencies')
        self.celex_list = list(celex_freq_dict.keys())
        self.celex_set = set (celex_list)
        self.drug_norm_dict = self.load_obj ('drug_normalize_dict')

    def change_tup_to_list(self, tup):
        """Return a new list holding the items of ``tup`` in order."""
        return list(tup)
    
    def change_list_to_tup(self, thelist):
        """Return a tuple holding the items of ``thelist`` in order."""
        return tuple(thelist)
    
#---------Replace URLs, email addresses and personal names with placeholders ------------------
        
    def replace_urls(self, list_of_msgs):
        """Return a new list in which every URL-like span is replaced by '-URL-'.

        The pattern accepts http/https links (optionally preceded by '(', '['
        or '<') as well as spans starting with 'www'.
        """
        url_regex = (
            r'\b' + r'((\(<{0,1}https|\(<{0,1}http|\[<{0,1}https|\[<{0,1}http|<{0,1}https|<{0,1}http)(:|;| |: )\/\/|www.)[\w\.\/#\?\=\+\;\,\&\%_\n-]+(\.[a-z]{2,4}\]{0,1}\){0,1}|\.html\]{0,1}\){0,1}|\/[\w\.\?\=#\+\;\,\&\%_-]+|[\w\/\.\?\=#\+\;\,\&\%_-]+|[0-9]+#m[0-9]+)+(\n|\b|\s|\/|\]|\)|>)'
        )
        return [re.sub(url_regex, '-URL-', text) for text in list_of_msgs]

    def replace_email(self, list_of_msgs):
        """Return a new list in which every email address is replaced by '-EMAIL-'."""
        email_regex = r"([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)"
        return [re.sub(email_regex, '-EMAIL-', text) for text in list_of_msgs]

    def remove_empty(self, list_of_msgs):
        """Return the messages with non-zero length, in their original order.

        Prints 'empty' once for every message that is dropped.
        """
        kept = []
        for msg in list_of_msgs:
            if len(msg) != 0:
                kept.append(msg)
                continue
            print('empty')
        return kept
    

    def create_names_list(self):
        """Build ``self.names_list`` from the NLTK male and female name lists.

        Each name is stored twice: first lower-cased, then as it appears in
        the corpus.
        """
        names_set = set(names.words('male.txt')).union(set(names.words('female.txt')))
        # 'ned' means no evidence and is an important medical term, so it is
        # never treated as a first name.
        excluded = ('ned', 'Ned')
        self.names_list = [
            form
            for word in names_set
            if word not in excluded
            for form in (str.lower(word), word)
        ]
    
    def remove_propernoun_names(self,msg):
        try: 
            nw_msg = [self.change_tup_to_list(token) for token in msg]
            for a, token in enumerate (nw_msg):
                if (token[0] in self.names_list) and ((token[1] == 'NNP') or (token[1]== 'NNPS')): 
                    new_token = token[0].replace (token[0], "-NAME-")
                    nw_msg[a] = [new_token, token[1]]
#             nw_msg2 = [self.change_list_to_tup(token) for token in nw_msg]
            return nw_msg
        except TypeError: 
            pass
    
    
    def anonymize(self, posts):
        """Anonymize raw posts and return them as lists of tokens.

        Steps: replace URLs and email addresses with placeholders, drop empty
        posts, tokenize and POS-tag with NLTK, then mask first names tagged as
        proper nouns. The POS tags are discarded in the returned value.
        """
        cleaned = self.remove_empty(self.replace_email(self.replace_urls(posts)))
        tokenized = [word_tokenize(text) for text in cleaned]
        tagged = [pos_tag(words) for words in tokenized]
        self.create_names_list()
        masked = [self.remove_propernoun_names(post) for post in tagged]
        return [[pair[0] for pair in post] for post in masked]

#---------Convert to lowercase ----------------------------------------------------
    
    def lowercase(self, post):
        """Return a new list with every token of ``post`` lower-cased."""
        return [word.lower() for word in post]

#----------- Lexical normalization pipeline (Sarker, 2017) -------------------------------

    def loadItems(self):
        '''
        Primary load function: fills the module-level globals used by the
        normalization steps.

        - english_to_american: British -> American spelling mapping
        - noslang_dict: slang/abbreviation -> list of lower-cased tokens
        - IGNORE_LIST_TRAIN, IGNORE_LIST: reset to empty lists
        '''
        global english_to_american
        global noslang_dict
        global IGNORE_LIST_TRAIN
        global IGNORE_LIST

        # Reset first so that a failing load leaves an empty mapping behind.
        english_to_american = {}
        IGNORE_LIST_TRAIN, IGNORE_LIST = [], []

        english_to_american = self.loadEnglishToAmericanDict()
        noslang_dict = self.loadDictionaryData()
        # Store each expansion as a list of lower-cased tokens.
        for slang in list(noslang_dict):
            noslang_dict[slang] = word_tokenize(noslang_dict[slang].lower())


    def loadEnglishToAmericanDict(self):
        etoa = {}

        english = open('C:\\Users\\dirksonar\\Documents\\Python Scripts\\External\\Lexical_normalization_files\\englishspellings.txt')
        american = open('C:\\Users\\dirksonar\\Documents\\Python Scripts\\External\\Lexical_normalization_files\\americanspellings.txt')
        for line in english:
            etoa[line.strip()] = american.readline().strip()
        return etoa

    def loadDictionaryData(self):
        '''
        this function loads the various dictionaries which can be used for mapping from oov to iv
        '''
        n_dict = {}
        infile = open('C:\\Users\\dirksonar\\Documents\\Python Scripts\\External\\Lexical_normalization_files\\noslang_mod.txt')
        for line in infile:
            items = line.split(' - ')
            if len(items[0]) > 0 and len(items) > 1:
                n_dict[items[0].strip()] = items[1].strip()
        return n_dict



    def preprocessText(self, tokens, IGNORE_LIST, ignore_username=False, ignore_hashtag=False, ignore_repeated_chars=True, eng_to_am=True, ignore_urls=False):
        '''
        Note the reason it ignores hashtags, @ etc. is because there is a preprocessing technique that is 
            designed to remove them 
        '''
        normalized_tokens =[]
        #print tokens
        text_string = ''
        # NOTE: if nesting if/else statements, be careful about execution sequence...
        for t in tokens:
            t_lower = t.strip().lower()
            # if the token is not in the IGNORE_LIST, do various transformations (e.g., ignore usernames and hashtags, english to american conversion
            # and others..
            if t_lower not in IGNORE_LIST:
                # ignore usernames '@'
                if re.match('@', t) and ignore_username:
                    IGNORE_LIST.append(t_lower)
                    text_string += t_lower + ' '
                #ignore hashtags
                elif re.match('#', t_lower) and ignore_hashtag:
                    IGNORE_LIST.append(t_lower)
                    text_string += t_lower + ' '
                #convert english spelling to american spelling
                elif t.strip().lower() in english_to_american.keys() and eng_to_am:    
                    text_string += english_to_american[t.strip().lower()] + ' '
                #URLS
                elif re.search('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', t_lower) and ignore_urls:
                    IGNORE_LIST.append(t_lower)
                    text_string += t_lower + ' '                
                elif not ignore_repeated_chars and not re.search(r'[^a-zA-Z]', t_lower):
                    # if t_lower only contains alphabetic characters
                    t_lower = re.sub(r'([a-z])\1+', r'\1\1', t_lower)
                    text_string += t_lower + ' '  
                    # print t_lower

                # if none of the conditions match, just add the token without any changes..
                else:
                    text_string += t_lower + ' '
            else:  # i.e., if the token is in the ignorelist..
                text_string += t_lower + ' '
            normalized_tokens = text_string.split()
        # print normalized_tokens
        return normalized_tokens, IGNORE_LIST


    def dictionaryBasedNormalization(self, tokens, I_LIST, M_LIST):
        tokens2 =[]
        for t in (tokens):
            t_lower = t.strip().lower()
            if t_lower in noslang_dict.keys() and len(t_lower)>2:
                nt = noslang_dict[t_lower]
                [tokens2.append(m) for m in nt]

                if not t_lower in M_LIST:
                    M_LIST.append(t_lower)
                if not nt in M_LIST:
                    M_LIST.append(nt)
            else: 
                tokens2.append (t)
        return tokens2, I_LIST, M_LIST
    
#----Using the Sarker normalization functions ----------------------------
#Step 1 is the English normalization and step 2 is the abbreviation normalization

    def normalize_step1(self, tokens, oovoutfile=None):
        global IGNORE_LIST
        global il
        MOD_LIST = []
        # Step 1: preprocess the text
        normalized_tokens, il = self.preprocessText(tokens, IGNORE_LIST)
        return normalized_tokens
    
    def normalize_step2(self, normalized_tokens, oovoutfile=None): 
        global IGNORE_LIST
        global il
        MOD_LIST = []    
        ml = MOD_LIST
        normalized_tokens, il, ml = self.dictionaryBasedNormalization(normalized_tokens, il, ml)
        return normalized_tokens

    def sarker_normalize(self, list_of_msgs):
        '''
        Apply both normalization steps to every tokenized post.

        Loads the dictionaries first, runs step 1 over all posts and only then
        runs step 2 over the step-1 results.
        '''
        self.loadItems()
        after_step1 = list(map(self.normalize_step1, list_of_msgs))
        after_step2 = list(map(self.normalize_step2, after_step1))
        return after_step2

#-------Domain-specific abbreviation expansion ----------------------------
# The abbreviations are passed in as a dictionary whose values are already tokenized

    def domain_specific_abbr(self, tokens, abbr):
        """Expand the abbreviations in one tokenized post.

        A token that is a key of ``abbr`` is replaced by the items of its
        value (an iterable of tokens); every other token is copied unchanged.
        """
        expanded = []
        for tok in tokens:
            if tok in abbr.keys():
                expanded.extend(abbr[tok])
            else:
                expanded.append(tok)
        return expanded

    def expand_abbr (self, data, abbr): 
        data2 = []
        for post in data: 
            post2 = self.domain_specific_abbr (tokens = post, abbr= abbr)
            data2.append(post2)
        return data2
    
#-------Spelling correction -------------------------------------------------    

    def create_token_freq (self, data): 
        flat_data = [item for sublist in data for item in sublist]
        self.token_freq = Counter(flat_data)
    
    def flev_impr(self, cand, token):
        """Return the edit distance between ``cand`` and ``token`` divided by ``len(token)``."""
        return editdistance.eval(cand, token) / len(token)

    def run_low(self, word, voc, func):
        """Find the vocabulary entry with the lowest ``func(word, entry)`` score.

        Returns ``[entry, score]``. The first entry wins on ties, and only
        scores below 100 are accepted; when nothing qualifies (or ``voc`` is
        empty) the result is ``[' ', 100]``.
        """
        best_entry, best_score = ' ', 100
        for entry in voc:
            score = func(word, entry)
            if score < best_score:
                best_entry, best_score = entry, score
        return [best_entry, best_score]
   

    def find_mistakes(self,tokens, min_corpus_freq, max_rel_edit_dist):
        TRUE_WORD = re.compile('[-a-z]+')  # Only letters and dashes 
        output= []
        self.create_token_freq (tokens)
        
        min_corpus_freq = min_corpus_freq * sum(self.token_freq.values())
        
       
        z = self.token_freq.most_common()
        z = [self.change_tup_to_list(m) for m in z]

        spelling_dict = []
        [spelling_dict.append(t) for t in self.celex_list]

        spelling_dict_extra = [t[0] for t in z if t[1]>= min_corpus_freq and t[0] not in self.celex_set]
        spelling_dict_extra = [t for t in spelling_dict_extra if TRUE_WORD.fullmatch(t)]
        [spelling_dict.append(t) for t in spelling_dict_extra]
        
        spelling_corrections = {}
        
        for post in tokens:
            post2 = []
            for a, token in enumerate (post):
                if TRUE_WORD.fullmatch(token):
                    if token in spelling_corrections:
                        correct = spelling_corrections[token] 
                        post2.append(correct)
                    elif token in self.celex_freq_dict: # do not need try here because will not throw error if token is not a key
                        post2.append(token)
                    elif self.token_freq[token] >= min_corpus_freq: # do not need try here because necessarily every token has to be in here
                        post2.append(token)
                    else:
                        #make a subset of hte voc that has the right first letter and not too much difference in length
                        l = len(token)
                        f = token[0]
                        spelling_dict2 = [t for t in spelling_dict if t[0] == f]
                        spelling_dict3 = [t for t in spelling_dict2 if len(t)<= (1.2 *l)] #reduce the search space
                        spelling_dict4 = [t for t in spelling_dict3 if len(t)>= (0.8 *l)]

                        candidate = self.run_low (token, spelling_dict4, flev_impr)

                        if candidate[1] >= max_rel_edit_dist: 
                            post2.append(token)
                        else: 
                            post2.append(candidate[0])
                            spelling_corrections [token] = candidate[0]
                else: post2.append(token) 
            output.append(post2)
        return output
    
#--------Overall normalization function--------------------------------------

    def normalize(self, posts):
        """Run the complete pipeline on raw text posts.

        Order: load resources, anonymize (which also tokenizes), lower-case,
        spelling variants and generic slang, domain-specific abbreviations,
        spelling correction. Returns a list of posts, each a list of tokens.
        """
        self.load_files()
        anonymized = self.anonymize(posts)
        lowered = [self.lowercase(m) for m in anonymized]
        generic = self.sarker_normalize(lowered)
        expanded = self.expand_abbr(generic, self.abbr_dict)
        return self.find_mistakes(expanded, min_corpus_freq=5e-6, max_rel_edit_dist=0.19)