# Importing Packages

In [1]:
import os
import pickle
import re
import time

import numpy as np
import pandas as pd
from nltk.util import ngrams
from pyspark import SparkConf, SparkContext, SparkFiles
from pyspark.sql import SparkSession

# Executor memory has to be set before the context is created.
SparkContext.setSystemProperty('spark.executor.memory', '8g')
# getOrCreate reuses a running context, so re-running this cell in a live
# kernel no longer fails because a SparkContext already exists.
sc = SparkContext.getOrCreate(SparkConf().setAppName('entity_resolution'))

# Implementation

In [29]:
# Dictionaries for the asciifying task.
# Each group maps one ASCII letter to the extended western characters that
# are replaced by it. Every literal carries the u prefix: without it a
# Python 2 literal such as '\u0148' is a 6-character byte string, not the
# character itself, and would never match during translation.
group_a = {'a':[u'\u00C0', u'\u00C1', u'\u00C2', u'\u00C3', u'\u00C4', u'\u00C5', u'\u00E0'
                 , u'\u00E1', u'\u00E2', u'\u00E3', u'\u00E4', u'\u00E5', u'\u0100', u'\u0101'
                 , u'\u0102', u'\u0103', u'\u0104', u'\u0105']}
group_c = {'c':[u'\u00C7', u'\u00E7', u'\u0106', u'\u0107', u'\u0108', u'\u0109', u'\u010A'
                 , u'\u010B', u'\u010C', u'\u010D']}
group_d = {'d':[u'\u00D0', u'\u00F0', u'\u010E', u'\u010F', u'\u0110', u'\u0111']}
group_e = {'e':[u'\u00C8', u'\u00C9', u'\u00CA', u'\u00CB', u'\u00E8', u'\u00E9', u'\u00EA'
                 , u'\u00EB', u'\u0112', u'\u0113', u'\u0114', u'\u0115', u'\u0116', u'\u0117'
                 , u'\u0118', u'\u0119', u'\u011A', u'\u011B']}
group_g = {'g':[u'\u011C', u'\u011D', u'\u011E', u'\u011F', u'\u0120', u'\u0121', u'\u0122'
                 , u'\u0123']}
group_h = {'h':[u'\u0124', u'\u0125', u'\u0126', u'\u0127']}
group_i = {'i':[u'\u00CC', u'\u00CD', u'\u00CE', u'\u00CF', u'\u00EC', u'\u00ED', u'\u00EE'
                 , u'\u00EF', u'\u0128', u'\u0129', u'\u012A', u'\u012B', u'\u012C', u'\u012D'
                 , u'\u012E', u'\u012F', u'\u0130', u'\u0131']}
group_j = {'j':[u'\u0134', u'\u0135']}
group_k = {'k':[u'\u0136', u'\u0137', u'\u0138']}
group_l = {'l':[u'\u0139', u'\u013A', u'\u013B', u'\u013C', u'\u013D', u'\u013E', u'\u013F'
                 , u'\u0140', u'\u0141', u'\u0142']}
group_n = {'n':[u'\u00D1', u'\u00F1', u'\u0143', u'\u0144', u'\u0145', u'\u0146', u'\u0147'
                 , u'\u0148', u'\u0149', u'\u014A', u'\u014B']}
group_o = {'o':[u'\u00D2', u'\u00D3', u'\u00D4', u'\u00D5', u'\u00D6', u'\u00D8', u'\u00F2'
                 , u'\u00F3', u'\u00F4', u'\u00F5', u'\u00F6', u'\u00F8', u'\u014C', u'\u014D'
                 , u'\u014E', u'\u014F', u'\u0150', u'\u0151']}
group_r = {'r':[u'\u0154', u'\u0155', u'\u0156', u'\u0157', u'\u0158', u'\u0159']}
group_s = {'s':[u'\u015A', u'\u015B', u'\u015C', u'\u015D', u'\u015E', u'\u015F', u'\u0160'
                 , u'\u0161', u'\u017F']}
group_t = {'t':[u'\u0162', u'\u0163', u'\u0164', u'\u0165', u'\u0166', u'\u0167']}
group_u = {'u':[u'\u00D9', u'\u00DA', u'\u00DB', u'\u00DC', u'\u00F9', u'\u00FA', u'\u00FB'
                 , u'\u00FC', u'\u0168', u'\u0169', u'\u016A', u'\u016B', u'\u016C', u'\u016D'
                 , u'\u016E', u'\u016F', u'\u0170', u'\u0171', u'\u0172', u'\u0173']}
group_w = {'w':[u'\u0174', u'\u0175']}
group_y = {'y':[u'\u00DD', u'\u00FD', u'\u00FF', u'\u0176', u'\u0177', u'\u0178']}
group_z = {'z':[u'\u0179', u'\u017A', u'\u017B', u'\u017C', u'\u017D', u'\u017E']}

group_list = [group_a, group_c, group_d, group_e, group_g, group_h, group_i, group_j,
             group_k, group_l, group_n, group_o, group_r, group_s, group_t, group_u,
             group_w, group_y, group_z]

# Invert the groups into one flat {extended character: ASCII letter} mapping.
# Iterating items() avoids indexing dict.keys(), which only works on Python 2.
translate_dict = {}
for group in group_list:
    for return_value, chars in group.items():
        for char in chars:
            translate_dict[char] = return_value

# Display the mapping ordered by its ASCII replacement.
sorted(translate_dict.items(), key=lambda kv: kv[1])
# EntityResolution.__init__ loads this mapping from
# '<default_file_path>translate_dict.pickle', so it has to be pickled there
# (e.g. with the class's pickle_out method) before the class is instantiated.
#pickle_out('translate_dict', translate_dict)

In [66]:
class EntityResolution:
    """
    Key-collision entity resolution: every entity string is reduced to a
    normalized key (a "fingerprint"), and entities that share a key are
    treated as one cluster.

    Authors: Myoungsu Choi (myoungsu@usc.edu), 
             Jingying Yin (yinjingy@usc.edu),
             Yining Zhang (yzhang27@usc.edu)
    """
    
    # Defaults used for the keyword arguments of __init__.
    default_parameters = {
        'method': 'fingerprint',
        'partition_num': 8,
        'default_file_path': './sub_result/', 
        'signature_len': 15,
        'band_size': 5,
        'annotate': False
    }
    
    def __init__(self,
                 input_file_path=None,
                 partition_num=default_parameters['partition_num'],
                 method=default_parameters['method'],
                 signature_len=default_parameters['signature_len'],
                 band_size=default_parameters['band_size'],
                 default_file_path=default_parameters['default_file_path'],
                 annotate=default_parameters['annotate'],
                 n=2):
        """
        Parameters
        ----------
        input_file_path: String, a path of the input file. 
        partition_num: Integer, a number of the partition for spark.
        method: String, a method for the entity resolution
                ('fingerprint' or 'ngram_fingerprint').
        signature_len: Integer, the row length of the signature matix.
        band_size: Integer, a band size of the LSH.
        default_file_path: String, a default file path for the pickling in and out.
        annotate: Boolean, whether to print the timing/summary lines or not.
        n: Integer, the size of the ngrams; only read by the ngram based methods.
        
        Raises
        ------
        ValueError when input_file_path is missing, when method is unknown,
        or (from pickle_in) when default_file_path does not exist.
        """
        
        # method name -> (fingerprint extractor, preprocessing function).
        # Validation is driven by this single table so a name can never pass
        # the check and then be missing from the lookup.
        method_mapper = {
            'fingerprint': (self.extract_fingerprint, self.make_tokens),
            'ngram_fingerprint': (self.extract_ngram_fingerprint, self.make_ngrams),
        }
        method_available = sorted(method_mapper)
        
        if not input_file_path:
            raise ValueError('Input_file_path should be given.')
        if method not in method_mapper:
            raise ValueError('Available methods are ' + str(method_available))
            
        self.method = method
        self.partition_num = partition_num
        self.input_file_path = input_file_path
        self.default_file_path = default_file_path
        self.signature_len = signature_len
        self.band_size = band_size
        self.banded_output = False
        # The character mapping must already be stored as
        # 'translate_dict.pickle' under default_file_path.
        translate_dict = self.pickle_in('translate_dict')
        
        self.func, self.preprocess_func = method_mapper[self.method]
        self.translate_dict = translate_dict
        self.non_ascii_set = set(translate_dict.keys())
        self.n = n
        self.annotate = annotate
        
    def pickle_out(self, name, obj):
        """
        Save an object as '<name>.pickle' inside default_file_path, creating
        the directory when it does not exist yet.
        
        Parameters
        ----------
        name: String, a file name for the saving in pickle object.
        obj: Object, an object for the saving
        """
        if not os.path.exists(self.default_file_path):
            os.makedirs(self.default_file_path)
        # os.path.join keeps the file inside the directory even when
        # default_file_path has no trailing separator.
        file_path = os.path.join(self.default_file_path, name + '.pickle')
        with open(file_path, 'wb') as handle:
            pickle.dump(obj, handle)
        return

    def pickle_in(self, name):
        """
        Load the object stored as '<name>.pickle' inside default_file_path.
        
        Parameters
        ----------
        name: String, a file name of the saved pickle object.
        
        Return
        ------
        A loaded object.
        
        Raises
        ------
        ValueError when default_file_path does not exist.
        """
        if not os.path.exists(self.default_file_path):
            raise ValueError(self.default_file_path+' is not existed.')
        file_path = os.path.join(self.default_file_path, name + '.pickle')
        with open(file_path, 'rb') as handle:
            # SECURITY: unpickling can execute arbitrary code; only load
            # pickle files that were produced by a trusted source.
            return pickle.load(handle)
    
    # Abstract funtion
    # Implemented in EntityResolutionLsh
    def make_tokens(self, string):
        """Placeholder that returns None; overridden in EntityResolutionLsh."""
        return
    
    # Abstract funtion
    # Implemented in EntityResolutionLsh
    def make_ngrams(self, string):
        """Placeholder that returns None; overridden in EntityResolutionLsh."""
        return
    
    def strip(self, string):
        """
        Parameters
        ----------
        string: String, a string for the strip task.
        
        Return
        ------
        A string after removing leading and trailing whitespace.
        """
        return string.strip()

    def lower(self, string):
        """
        Parameters
        ----------
        string: String, a string for the lower task.
        
        Return
        ------
        A string in a lowercase representation form
        """
        return string.lower()

    def remove_punctuation(self, string):
        """
        Parameters
        ----------
        string: String, a string for the removing punctuation task.
        
        Return
        ------
        A string after removing every character that is neither a word
        character (letters, digits, underscore) nor whitespace.
        """
        pattern = r'[^\w\s]'
        return re.sub(pattern, '', string)
    
    def remove_whitespace(self, string):
        """
        Parameters
        ----------
        string: String, a string for the removing whitespace task.
        
        Return
        ------
        A string after removing whitespace in any position of the string.
        """
        pattern = r'[\s]'
        return re.sub(pattern, '', string)
    
    def asciify(self, string):
        """
        Parameters
        ----------
        string: String, a string for the asciifying task; byte strings are
                decoded as UTF-8 first.
        [Asciifying task: normalizing extended western characters to their ASCII representation]
        
        Return
        ------
        A string after asciifying task. Characters without an entry in
        translate_dict are kept unchanged.
        """
        # On Python 2 `bytes` is `str`, so plain strings are decoded here,
        # while unicode input (and Python 3 text) passes through untouched.
        if isinstance(string, bytes):
            string = string.decode('utf-8')
        converted = u''.join(self.translate_dict.get(char, char) for char in string)
        if str is bytes:
            # Python 2: keep returning a byte string. UTF-8 is used so that a
            # leftover non-ASCII character no longer raises UnicodeEncodeError.
            return converted.encode('utf-8')
        return converted

    def tokenize(self, string):
        """
        Parameters
        ----------
        string: String, a string for the tokenizing task based on the whitespace.
        
        Return
        ------
        A list of the tokens after tokenizing.
        """
        return string.split()

    def remove_duplicates(self, token_list):
        """
        Parameters
        ----------
        token_list: List, a list of hashable items (tokens or ngram tuples).
        
        Return
        ------
        A list of the items after removing duplicates; the order of first
        appearance is kept.
        """
        # A plain set-based pass gives the same first-appearance order that
        # pandas' unique() does, without building a Series for every record.
        seen = set()
        unique_items = []
        for item in token_list:
            if item not in seen:
                seen.add(item)
                unique_items.append(item)
        return unique_items

    def sort_list(self, token_list):
        """
        Parameters
        ----------
        token_list: List, a list for the sorting task.
        
        Return
        ------
        A new list of the tokens in ascending order.
        """
        return sorted(token_list, reverse=False)

    def join_string(self, token_list, ngram=False):
        """
        Parameters
        ----------
        token_list: List, a list of tokens, or a list of ngram tuples when ngram is True.
        ngram: Boolean, an indicator whether method is ngram based or not. 
        
        Return
        ------
        The tokens joined by a single space, or, for ngrams, every element
        of every ngram concatenated without a separator.
        """
        if not ngram:
            return ' '.join(token_list)
        return ''.join(part for gram in token_list for part in gram)

    def get_ngram(self, token_list, n=2):
        """
        Parameters
        ----------
        token_list: List, a list for getting a ngram list.
        n: Integer, an number of near items gathered during the ngram.
        
        Return
        ------
        A list of the ngram items.
        """
        return list(ngrams(token_list, n))

    def extract_fingerprint(self, string):
        """
        Parameters
        ----------
        string: String, a string for the extracting a fingerprint.
        
        Return
        ------
        A fingerprint based on the fingerprinting method: the sorted, unique,
        lowercased tokens of the string joined by single spaces.
        """
        # NOTE(review): punctuation removal runs before asciify. For Python 2
        # byte strings the pattern presumably also strips the bytes of
        # accented letters, so they would be dropped instead of
        # transliterated - confirm whether input should be decoded first.
        string = self.strip(string)
        string = self.lower(string)
        string = self.remove_punctuation(string)
        string = self.asciify(string)
        token_list = self.tokenize(string)
        token_list = self.sort_list(token_list)
        token_list = self.remove_duplicates(token_list)
        fingerprint = self.join_string(token_list)
        fingerprint = self.strip(fingerprint)
        return fingerprint

    def extract_ngram_fingerprint(self, string):
        """
        Parameters
        ----------
        string: String, a string for the extracting a fingerprint based on ngram.
        
        Return
        ------
        A fingerprint based on the ngram-fingerprinting method: the sorted,
        unique character ngrams (size self.n) of the cleaned string,
        concatenated and asciified.
        """
        string = self.lower(string)
        string = self.remove_punctuation(string)
        string = self.remove_whitespace(string)
        ngram_list = self.get_ngram(list(string), n=self.n)
        ngram_list = self.sort_list(ngram_list)
        ngram_list = self.remove_duplicates(ngram_list)
        fingerprint = self.join_string(ngram_list, ngram=True)
        fingerprint = self.asciify(fingerprint)
        return fingerprint
    
    def get_normalized_entity(self, entity_list):
        """
        Parameters
        ----------
        entity_list: Iterable, the entities for the extracting fingerprint task.
        
        Return
        ------
        A list of the tuples of fingerprint and original entity string.
        """
        return [(self.func(entity), entity) for entity in entity_list]
    
    def clustering_entity(self, column=None, sc=None):
        """
        Parameters
        ----------
        column: String, a name of the column of the DataFrame, which is supposed to be worked on.
        sc: SparkContext instance.
        
        Return
        ------
        A DataFrame that includes 3 columns(fingerprint, old_entity, new_entity).
        new_entity is the most frequent spelling among the entities sharing
        a fingerprint, or old_entity itself when the fingerprint has only one
        distinct spelling.
        
        Raises
        ------
        ValueError when column is not a string, sc is missing, or column is
        not a column of the input file.
        """
        time1 = time.time()
        if not isinstance(column, str):
            raise ValueError('column should be string.')
        if not sc:
            raise ValueError('Spark Context should be given.')
        df_raw = pd.read_csv(self.input_file_path) 
        
        if column not in df_raw.columns:
            raise ValueError(column +' is not in the column list.')
        series_raw = df_raw[column]
        lines = sc.parallelize(series_raw, self.partition_num)
        
        entity_fingerprint_list = lines.mapPartitions(self.get_normalized_entity).collect()
                 
        df_result = pd.DataFrame(data=entity_fingerprint_list, 
                                 columns=['fingerprint', 'old_entity'])
        
        # Only fingerprints shared by more than one distinct spelling form a cluster.
        unique_list = df_result.groupby('fingerprint')['old_entity'].nunique()
        cluster_set = set(unique_list[unique_list>1].index)
        df_clustered = df_result[df_result['fingerprint'].isin(cluster_set)]
        # The representative of a cluster is its most frequent spelling.
        series_new_entity = df_clustered.groupby('fingerprint')['old_entity']\
                            .agg(lambda x: x.value_counts().index[0])
        series_new_entity.name = 'new_entity'
        
        df_output = \
            pd.merge(df_result, series_new_entity, 
                     how='left', left_on='fingerprint', right_index=True)
        # Assign the result back: an in-place fillna on a column selection
        # may operate on a temporary copy and leave df_output unchanged.
        df_output['new_entity'] = df_output['new_entity'].fillna(df_output['old_entity'])
        
        time2 = time.time()
        if self.annotate:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print('• Function took %0.1f sec' % ((time2-time1)))
            cluster_count = len(df_output.query('old_entity!=new_entity')['fingerprint'].unique())
            print('• '+ str(cluster_count) + ' clusters founded.')
        return df_output
    
class EntityResolutionLsh(EntityResolution):
    """
    Entity resolution based on minhash signatures and locality sensitive
    hashing (LSH): entities are turned into token or ngram lists, hashed
    into a signature column, split into bands, and entities that fall into
    the same bucket of any band become candidates for each other.
    """
            
    def make_tokens(self, init_string):
        """
        Parameters
        ----------
        init_string: String, an original entity.
        
        Return
        ------
        A tuple of original string and a list of its sorted, unique tokens.
        """
        string = self.strip(init_string)
        string = self.lower(string)
        string = self.remove_punctuation(string)
        string = self.asciify(string)
        token_list = self.tokenize(string)
        token_list = self.sort_list(token_list)
        token_list = self.remove_duplicates(token_list)
        return (init_string, token_list) 
    
    def make_ngrams(self, init_string):
        """
        Parameters
        ----------
        init_string: String, an original entity.
        
        Return
        ------
        A tuple of original string and a list of its sorted, unique
        character ngrams (size self.n).
        """
        string = self.lower(init_string)
        string = self.remove_punctuation(string)
        string = self.remove_whitespace(string)
        ngram_list = self.get_ngram(list(string), n=self.n)
        ngram_list = self.sort_list(ngram_list)
        ngram_list = self.remove_duplicates(ngram_list)
        return (init_string, ngram_list)
    
    def get_preprocessed_entity(self, entity_list):
        """
        Parameters
        ----------
        entity_list: Iterable, the entities for the preprocessing task for the LSH task.
        
        Return
        ------
        A list of the tuples from the make_ngrams or the make_tokens function.
        """
        return [self.preprocess_func(entity) for entity in entity_list]
    
    def hash_func(self, mid, i):
        """
        Parameters
        ----------
        mid: Integer, token or ngram id from the token2id.
        i: Integer, indices for the hashing [Scope: (1, signature_len)].
        
        Return
        ------
        An hashed integer in the range 1..100. 
        """
        return (3*mid + 11*i) % 100 + 1

    def calc_jaccard(self, list1, list2):
        """
        Parameters
        ----------
        list1, list2: List, lists for the calculating jaccard score;
                      both are treated as sets.
        
        Return
        ------
        A jaccard score as a float between 0.0 and 1.0
        (0.0 when both lists are empty).
        """
        set1 = set(list1)
        set2 = set(list2)
        union = set1 | set2
        if not union:
            # Two empty inputs would otherwise divide by zero.
            return 0.0
        # Force float division: on Python 2, int / int truncates and every
        # score would collapse to 0 or 1.
        return float(len(set1 & set2)) / len(union)
    
    def band2idx(self, band_num, band_size, signature_len):
        """
        Parameters
        ----------
        band_num: Integer, a zero-based index of the band.
        band_size: Integer, the size of the band for the LSH task
                   (used here as the number of bands).
        signature_len: Integer, the row length of the signature matix.
        
        Return
        ------
        A tuple of start index and end index for the making banded minhash matrix.
        Each band covers signature_len // band_size rows; rows left over when
        signature_len is not a multiple of band_size belong to no band.
        
        Raises
        ------
        ValueError when band_num is not a valid zero-based band index.
        """
        # Valid indices are 0 .. band_size - 1, so band_size itself is out of range.
        if band_num >= band_size:
            raise ValueError('band_num cannot be larger than band_size')
        # Floor division keeps the indices integral on Python 3 as well.
        unit = signature_len // band_size
        start_idx = band_num*unit
        end_idx = start_idx + unit
        return (start_idx, end_idx)
    
    def minhash(self, iterator):
        """
        Parameters
        ----------
        iterator: Iterable, 
                  items of (entity, [token1, token2, ,,,]) or (entity, [ngram1, ngram2, ,,,])
                  characteristic matrix. Requires self.token2id to be built.
        Return
        ======
        A list of the tuples. (original entity, a signature column[list type])
        • banded_output: (band_index, (original entity, a signature column[list type]))
        """
        result = []
        for entity in iterator:
            # Start at +inf so the first hashed value always becomes the minimum.
            signature_col = np.full(self.signature_len, np.inf)
            for token in entity[1]:
                token_id = self.token2id[token]
                for i in range(self.signature_len):
                    hashed_idx = self.hash_func(token_id, i+1)
                    if hashed_idx < signature_col[i]:
                        signature_col[i] = hashed_idx
            signature = list(signature_col)
            if self.banded_output:
                for band in range(self.band_size):
                    start_idx, end_idx = self.band2idx(band, 
                                                       self.band_size, 
                                                       self.signature_len)
                    result.append((band, (entity[0], signature[start_idx:end_idx])))
            else:
                result.append((entity[0], signature))
        return result

    def make_bucket(self, iterator):
        """
        Parameters
        ----------
        iterator: Iterable, items of (band_num, [(entity, band signature), ...]).
        
        Return
        ======
        A list of the tuples of band_num and 
        a dictionary of (key:hashed_value, value: a set of entities that shares the same hash.)
        """
        result = []
        for band in iterator:
            bucket_dict = {}
            for item in band[1]:
                hashed_value = hash(str(item[1]))
                # setdefault gives a constant-time lookup instead of scanning
                # the list returned by dict.keys() on Python 2.
                bucket_dict.setdefault(hashed_value, set()).add(item[0])
            result.append((band[0], bucket_dict))
        return result

    def find_candidates(self, band, entity):
        """
        Parameters
        ----------
        band: Tuple, (band index, dictionary of hashed_value -> set of entities),
              i.e. one item of the make_bucket result.
        entity: String, the original entity.
        
        Return
        ======
        A list of candidates for the similar entities: every other entity
        that shares a bucket with the given entity in this band.
        """
        candidates = set()
        for similar_entity_set in band[1].values():
            if entity in similar_entity_set:
                candidates |= similar_entity_set - set([entity])
        return list(candidates)

    def find_similar_entity(self, 
                            entity,
                            candidates_list, 
                            minhash_matrix, 
                            top=1):
        """
        Parameters
        ----------
        entity: String, the original entity.
        candidates_list: List, a list of candidates for the similar entities. 
        minhash_matrix: Dictionary, original entity -> signature column[list type].
        top: Integer, a number of how many similar entities will be returned.
        
        Return
        ======
        A list of at most `top` similar entities, best jaccard score first.
        When there are no more than `top` candidates they are returned as is.
        """
        # Compare the NUMBER of candidates with top; comparing the list itself
        # to an integer is always False on Python 2 and a TypeError on Python 3.
        if len(candidates_list) <= top:
            return candidates_list
        entity_col = minhash_matrix[entity]
        scored_candidates = [
            (self.calc_jaccard(entity_col, minhash_matrix[cand]), cand)
            for cand in candidates_list
        ]
        scored_candidates.sort(reverse=True)
        return [cand for _, cand in scored_candidates[:top]]
    
    def make_token2id_id2token(self, preprocessed_lines):
        """
        Build self.token2id and self.id2token, numbering every distinct token
        (or ngram) in order of first appearance.
        
        Parameters
        ----------
        preprocessed_lines: List, a list of the tuples (init_string, token_list or ngram_list)
                            This is from self.preprocess_func 
                            [self.make_tokens or self.make_ngrams]
        
        """
        token2id = {}
        id2token = {}
        for item in preprocessed_lines:
            for token in item[1]:
                if token not in token2id:
                    token_id = len(token2id)
                    token2id[token] = token_id
                    id2token[token_id] = token
        self.token2id = token2id
        self.id2token = id2token
        return 
    
    def predict(self, entity_list):
        """
        Parameters
        ----------
        entity_list: Iterable, the original entities that are supposed to be worked on.
                     Requires self.buckets and self.minhash_matrix to be set.
        
        Return
        ======
        A list of the tuples of original entity and the list of its most
        similar entities (an empty list when no candidate was found).
        """
        result = []
        for entity in entity_list:
            candidates = set()
            for band in self.buckets:
                candidates.update(self.find_candidates(band, entity))
            similar_entity = self.find_similar_entity(entity, 
                                                      list(candidates), 
                                                      self.minhash_matrix)
            result.append((entity, similar_entity))
        return result
    
    def clustering_entity(self, column=None, sc=None):
        """
        Parameters
        ----------
        column: String, a name of the column of the DataFrame, which is supposed to be worked on.
        sc: SparkContext.
        
        Return
        ======
        A list of the tuples of original entity and the list of its most
        similar entities (see predict).
        
        Raises
        ------
        ValueError when column is not a string, sc is missing, or column is
        not a column of the input file.
        """
        time1 = time.time()
        if not isinstance(column, str):
            raise ValueError('column should be string.')
        if not sc:
            raise ValueError('Spark Context should be given.')
            
        df_raw = pd.read_csv(self.input_file_path) 
        if column not in df_raw.columns:
            raise ValueError(column +' is not in the column list.')
        series_raw = df_raw[column]
        lines = sc.parallelize(series_raw, self.partition_num)
        
        preprocessed_lines = lines.mapPartitions(self.get_preprocessed_entity).collect()
        self.make_token2id_id2token(preprocessed_lines)
        
        prep_lines_serial = sc.parallelize(preprocessed_lines, self.partition_num)
        
        # The same minhash pass is run twice: first emitting one record per
        # band, then emitting the full signature column per entity.
        self.banded_output = True
        minhash_banded_matrix = prep_lines_serial\
                                .mapPartitions(self.minhash)\
                                .collect()
        self.banded_output = False
        minhash_matrix = prep_lines_serial\
                        .mapPartitions(self.minhash).collect()
        minhash_matrix = {entity: signature for entity, signature in minhash_matrix}
        self.minhash_matrix = minhash_matrix
        
        # Group the banded signatures by band index and bucket them per band.
        buckets = sc.parallelize(minhash_banded_matrix, self.partition_num)
        buckets = buckets.groupByKey().map(lambda x: (x[0], list(x[1])))
        buckets = buckets.mapPartitions(self.make_bucket).collect()
        self.buckets = buckets
        
        pred_result = sc.parallelize(series_raw, self.partition_num)
        pred_result = pred_result.mapPartitions(self.predict)\
                      .collect()
        
        time2 = time.time()
        if self.annotate:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print('• Function took %0.1f sec' % ((time2-time1)))
        return pred_result


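# One-off snippet, kept for reference only: it appears to have produced the
# 10,000-row sample file. It relies on input_file_path and sample_file_path,
# which are defined in a later cell.
# df_ratings = pd.read_csv(input_file_path)
# df_ratings_sample = df_ratings.sample(10000)
# df_ratings_sample.to_csv(sample_file_path, index=False)
# df_sample = pd.read_csv(sample_file_path)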

In [3]:
# CSV read by the EntityResolution (fingerprint) run below.
input_file_path = './data/restaurant-and-market-health-inspections.csv'
# CSV read by the EntityResolutionLsh run below.
# Presumably the 10,000-row sample written by the commented-out snippet - TODO confirm.
sample_file_path = './data/sample.csv'

In [72]:
# Fingerprint-based resolver over the full input file.
# The constructor loads 'translate_dict.pickle' from the default file path
# ('./sub_result/'), so that file has to exist before this cell runs.
# NOTE: n is only read by the ngram based methods, so n=3 has no effect
# with method='fingerprint'.
er = EntityResolution(input_file_path=input_file_path, 
                      method='fingerprint',
                      partition_num=16,
                      annotate=True,
                      n=3)

In [73]:
# Cluster the 'facility_name' column. The returned DataFrame has the columns
# fingerprint, old_entity and new_entity; with annotate=True the run time and
# the number of clusters are printed as well.
result = er.clustering_entity(column='facility_name', sc=sc)

• Function took 10.7 sec
• 107 clusters founded.


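#### Difference
- [1 item]: `MCDONALD'S  #1126` <-> `MCDONALD’S #1126` — the two spellings differ only in the apostrophe character (`'` vs `’`) and in the number of spaces, so both are reduced to the same fingerprint, `1126 mcdonalds`.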

In [70]:
# Original spellings of every row whose fingerprint is '1126 mcdonalds'.
result.loc[result['fingerprint'] == '1126 mcdonalds', 'old_entity']

44327    MCDONALD'S  #1126
51836     MCDONALD’S #1126
53499     MCDONALD’S #1126
55752     MCDONALD’S #1126
65706    MCDONALD'S  #1126
Name: old_entity, dtype: object

In [1]:
# LSH-based resolver over the sample file.
# signature_len=20 with band_size=10 gives 10 bands of 2 signature rows each.
# NOTE: n is only read by the ngram based preprocessing, so n=3 has no effect
# with method='fingerprint'.
er_lsh = EntityResolutionLsh(input_file_path=sample_file_path, 
                             method='fingerprint',
                             partition_num=16,
                             signature_len=20,
                             band_size=10,
                             annotate=True,
                             n=3)

In [2]:
# Run the LSH pipeline on 'facility_name'. The call returns a list of
# (original entity, list of similar entities) tuples; as the last expression
# of the cell the whole list is displayed.
er_lsh.clustering_entity(column='facility_name', sc=sc)