# Imports

In [1]:
import os
import csv
import re
import collections
import logging
import itertools
import pickle
import psycopg2
import psycopg2.extras
import dedupe
import numpy as np
import pandas as pd
import simplejson as json

from io import StringIO
from unidecode import unidecode

# Variables

In [2]:
# lower the root logger's level so the logging.info(...) progress messages are shown
logging.getLogger().setLevel(logging.INFO)

In [3]:
# name of the JSON configuration file (input files, training, threshold and evaluation settings)
configuration_file_name = 'configuration_file_dedupe.json'
# replaced by the parsed contents of configuration_file_name when it is loaded
config_file_data = {}

In [4]:
# load the configuration into a dict (json here is simplejson)
with open(configuration_file_name, 'r') as config_file:
    config_file_data = json.load(config_file)

In [5]:
# input files: paths of the two CSV datasets to link, taken from the configuration
input_file_1 = config_file_data.get('input_file_1')
input_file_2 = config_file_data.get('input_file_2')

# Setup

In [6]:
def preProcess(column):
    """
    Normalize a single CSV cell so that superficial differences are ignored.

    The value is transliterated to ASCII with Unidecode, newlines become spaces,
    runs of spaces are collapsed, surrounding whitespace and single/double
    quotes are stripped and the result is lower-cased. Things like casing,
    extra spaces and new lines are therefore ignored when comparing records.

    :param column: string object which represents a cell from the csv file;
                   None is also accepted (csv.DictReader uses it for cells that
                   are missing from a short row)
    :return: the preprocessed column, or None if nothing is left after cleaning
    """
    # csv.DictReader fills missing trailing cells with None (its default
    # restval); unidecode cannot handle None, so treat it as an empty cell
    if column is None:
        return None

    column = unidecode(column)
    column = re.sub(r'\n', ' ', column)
    column = re.sub(r'  +', ' ', column)
    column = column.strip().strip('"').strip("'").lower().strip()

    if not column:
        column = None

    return column

In [7]:
def read_data(filename):
    """
    Load a CSV file into a dictionary of cleaned records.

    Every row becomes one entry. Its key is the file name followed by the
    0-based row number; a leading "tmp_" is removed from the file name first.
    Its value maps each column name to the preProcess-ed cell.

    :param filename: string object which represents the name of the
                     input file
    :return: a dictionary object containing all the rows read from the CSV file
    """
    # temporary copies are named "tmp_<input file>"; drop that prefix so the
    # record ids refer to the original input file name
    key_prefix = filename[4:] if filename.startswith("tmp_") else filename

    records = {}
    with open(filename) as csv_file:
        for row_number, raw_row in enumerate(csv.DictReader(csv_file)):
            records[key_prefix + str(row_number)] = {
                column_name: preProcess(cell) for column_name, cell in raw_row.items()
            }

    return records

In [8]:
def read_dataset_containing_only_the_common_fields_in_both_datasets(input_file, common_fields):
    """
    Read a CSV file keeping only the columns listed in 'common_fields'.

    The selected columns are written to a temporary CSV file named
    "tmp_" + input_file and parsed with read_data. The records therefore get the
    same cleaning and the same record ids (input_file + row index) as read_data
    gives. The temporary file is removed even if reading it fails.

    Input: 'input_file'    - string object which represents the name of the
                             input file
           'common_fields' - a list of string objects containing the (lower-case)
                             names of the common fields of both datasets
    Output: a dictionary mapping record id -> {field name: cleaned value}
    """
    # NOTE(review): "tmp_" is prepended to the whole string, so this assumes
    # input_file is a bare file name in the working directory - confirm
    tmp_input_file_with_common_cols = "tmp_" + input_file

    df = pd.read_csv(input_file, dtype = object)
    # the common field names are lower-cased, so lower-case the header too;
    # otherwise any upper-case column name raises a KeyError below
    df.columns = [c.lower() for c in df.columns]
    df = df[common_fields]

    df.to_csv(tmp_input_file_with_common_cols, index = False)

    try:
        # read the data from this temporary file
        data_d = read_data(tmp_input_file_with_common_cols)
    finally:
        # remove the temporary file even if reading failed
        os.remove(tmp_input_file_with_common_cols)

    return data_d

In [9]:
def get_common_fields_of_the_datasets_or_the_given_fields(config_file_data, input_file_1, input_file_2):
    """
    Get the fields used for linking the two datasets.

    Column names of both datasets are lower-cased (postgres only has lower-case
    column names) and intersected. If the configuration's 'training' section
    has 'field_definitions', only the defined fields that are also common
    columns are returned. Otherwise all common columns are returned.

    The order is deterministic: it follows the first dataset's column order, or
    the order of 'field_definitions' when fields were given.

    Input: 'config_file_data' - a dictionary containing the information of the
                                configuration file
           'input_file_1' - string object which represents the name of the first dataset
           'input_file_2' - string object which represents the name of the second dataset
    Output: a list of field names without duplicates
    """
    training_config = config_file_data.get('training') or {}
    field_definitions = training_config.get('field_definitions') or []

    # if some fields were specified, get them in a list
    given_fields = [f['field'] for f in field_definitions]

    # only the header rows are needed, so don't load the data itself
    df1 = pd.read_csv(input_file_1, dtype = object, nrows = 0)
    df2 = pd.read_csv(input_file_2, dtype = object, nrows = 0)

    # postgres only has lower case column names --> make lower case the dataframe column names
    f1_header_columns = [x.lower() for x in df1.columns]
    f2_header_columns = {x.lower() for x in df2.columns}

    # a list (not a set) keeps the order stable between runs; this order is the
    # field order later used to build the training fields
    common_cols_from_datasets = [
        c for c in dict.fromkeys(f1_header_columns) if c in f2_header_columns
    ]

    # make sure that the given fields (if given) can be found in the common columns of the datasets
    if given_fields:
        common = set(common_cols_from_datasets)
        return [f for f in dict.fromkeys(given_fields) if f in common]

    return common_cols_from_datasets

In [10]:
# fields used for linking: the configured fields found in both datasets, or all
# (lower-cased) column names the two datasets share when none are configured
common_columns = get_common_fields_of_the_datasets_or_the_given_fields(config_file_data, input_file_1, input_file_2)

In [11]:
# load the first dataset, restricted to the common columns
logging.info('reading records from %s', input_file_1)
first_dataset = read_dataset_containing_only_the_common_fields_in_both_datasets(input_file_1, common_columns)
logging.info('%d records read', len(first_dataset))

INFO:root:reading records from new_first_dataset_1000.csv


INFO:root:1000 records read


In [12]:
# load the second dataset, restricted to the same common columns
logging.info('reading records from %s', input_file_2)
second_dataset = read_dataset_containing_only_the_common_fields_in_both_datasets(input_file_2, common_columns)
logging.info('%d records read', len(second_dataset))

INFO:root:reading records from new_second_dataset_1000.csv


INFO:root:1000 records read


# Training

In [13]:
# mark the beginning of the training phase in the log
logging.info('starting training..')

INFO:root:starting training..


In [14]:
def get_training_fields(config_file_data, common_columns):
    """
    Build the dedupe variable definitions for the common columns.

    Returns a list of dictionaries with the keys 'field' and 'type', e.g.
    [{'field' : 'field1', 'type' : 'String'}, {'field' : 'field2', 'type' : 'String'}].
    For more information about the accepted types, read more here:
    https://docs.dedupe.io/en/latest//Variable-definition.html

    When the configuration's 'training' section has 'field_definitions', only
    the common columns listed there are kept. Each one gets the type of the
    first definition with that field name. Otherwise every common column gets
    the type 'String'.

    Input: 'config_file_data' - dictionary containing the information of the
                                configuration file
           'common_columns' - a list of string objects containing the names of the
                              common columns (fields) of both datasets
    """
    definitions = config_file_data['training'].get('field_definitions')

    if not definitions:
        # no explicit definitions: every common column is compared as a String
        return [{'field' : column, 'type' : 'String'} for column in common_columns]

    fields = []
    for column in common_columns:
        # the first definition with a matching name wins
        definition = next((d for d in definitions if d['field'] == column), None)
        if definition is not None:
            fields.append({'field' : column, 'type' : definition['type']})

    return fields

In [15]:
def get_uncertain_pairs(deduper, nr_uncertain_pairs):
    """
    Collect pairs of records the library is unsure about, so a user can label them.

    deduper.uncertainPairs() is called once per requested pair, and the last
    element of the list it returns is kept.

    :param deduper: the library object (the dedupe linker)
    :param nr_uncertain_pairs: how many uncertain pairs to collect
    :return: a list with one entry (a pair of examples) per call
    """
    return [deduper.uncertainPairs().pop() for _ in range(nr_uncertain_pairs)]

The first part of the algorithm is training. It uses dedupe's [RecordLink](https://docs.dedupe.io/en/latest/API-documentation.html#RecordLink) class, because the algorithm takes two input files and tries to find one-to-one matches between records of the two datasets.
A RecordLink object is created using the common fields of both datasets. If the user specifies fields in the configuration file, only those fields are used (they may be a subset of the common columns). These fields are the features of the classifier. During training, the algorithm learns a weight for each field and builds a model, which is essentially the set of fields together with their weights.
There are three cases:
* In the first case, neither a training file nor a settings file exists, so both have to be created. First, the algorithm samples some examples and runs a small training pass to find the pairs the library is most uncertain about. The user labels each pair by typing y (yes) if the examples match, n (no) if they don't, u (unsure) if they can't tell, p (previous) to go back and correct the previous label, or f (finish) to end labeling. At the end of this process, called active learning, a training file is created and the library uses it to continue training. The result is a model, which is saved in a settings file. To build the model, the library uses a [Logistic Regression](https://github.com/dedupeio/rlr) [classifier](https://docs.dedupe.io/en/latest/API-documentation.html#Dedupe.classifier).
* In the second case, a training file already exists, so the labeling step is skipped and a new model is trained from it.
* In the third case, a settings file already exists, so both the labeling and the training steps are skipped.

In [None]:
# if 'create_training_file_by_client' parameter is true this cell will create the 'uncertain_pairs_file'
# configuration parameters used for the training part
nr_examples_sampled_for_training = config_file_data['training'].get('nr_of_examples_for_training')
training_fields = get_training_fields(config_file_data, common_columns)
# a missing flag is treated as false instead of raising a KeyError
create_training_file = config_file_data['training'].get('create_training_file_by_client', False)
training_file = config_file_data['training'].get('training_file')
settings_file = config_file_data['training'].get('settings_file')

if settings_file:
    # a trained model already exists: load it and skip labeling and training
    logging.info('reading from {}'.format(settings_file))
    with open(settings_file, 'rb') as sf:
        linker = dedupe.StaticRecordLink(sf)

else:
    linker = dedupe.RecordLink(training_fields)

    # pass the sample size only when it is configured; otherwise keep the
    # library's default instead of passing None
    if nr_examples_sampled_for_training:
        linker.sample(first_dataset, second_dataset, int(nr_examples_sampled_for_training))
    else:
        linker.sample(first_dataset, second_dataset)

    # if the user wants to create a training file on the client side we will make available
    # through a GET request a binary file containing 200 pairs of examples
    # that Dedupe doesn't know if they match or not; then, on the client side
    # the user will label the pairs (match or distinct) and the client will
    # make a POST request with the newly created training file
    if create_training_file:
        uncertain_pairs = get_uncertain_pairs(linker, 200)
        with open("uncertain_pairs_file", "wb") as f:
            pickle.dump(uncertain_pairs, f)

    else:
        if training_file:
            logging.info('reading labeled examples from {}'.format(training_file))
            with open(training_file) as tf:
                linker.readTraining(tf)
        else:
            logging.info('starting active labeling...')
            dedupe.consoleLabel(linker)

        linker.train()

        # if the training and settings files were not specified, but you want to keep them between runs,
        # rename them or save them somewhere else, because they will be overwritten every time the algorithm is run
        if not training_file:
            with open("training_file.json", 'w') as tf:
                linker.writeTraining(tf)

        # settings_file is never set in this branch, so the trained model is always saved
        with open("settings_file", 'wb') as sf:
            linker.writeSettings(sf)

        linker.cleanupTraining()

In [None]:
# train from the training file the client sent back with its labels for the
# uncertain pairs; 'not settings_file' matches the truthiness test the
# previous cell used to decide whether a model still has to be trained
if not settings_file and create_training_file:
    if not training_file:
        raise ValueError("'training_file' must be set in the configuration to train "
                         "from the labels created by the client")

    logging.info('reading labeled examples from the newly created training file {}'.format(training_file))

    with open(training_file) as tf:
        linker.readTraining(tf)

    linker.train()

    with open("settings_file", 'wb') as sf:
        linker.writeSettings(sf)

    linker.cleanupTraining()

# Clustering

In [None]:
# parameters used for the 'threshold' method from Dedupe
threshold_value = None

if config_file_data.get('threshold'):
    # a threshold given in the configuration takes precedence; convert it so a
    # value written as a string in the configuration also works
    threshold_value = float(config_file_data.get('threshold'))
elif config_file_data.get('compute_threshold'):
    logging.info('find the best threshold...')

    recall_weight = config_file_data['compute_threshold'].get('recall_weight')

    sample_nr_of_examples_for_threshold = \
            config_file_data['compute_threshold'].get('nr_of_sample_data_for_threshold')

    if sample_nr_of_examples_for_threshold:
        # get the first n examples from each input dataset, where n = 'sample_nr_of_examples_for_threshold'
        sample_data_threshold_first_dataset = dict(
            itertools.islice(first_dataset.items(), int(sample_nr_of_examples_for_threshold))
        )
        sample_data_threshold_second_dataset = dict(
            itertools.islice(second_dataset.items(), int(sample_nr_of_examples_for_threshold))
        )
    else:
        # no sample size configured: use the whole datasets instead of failing on int(None)
        sample_data_threshold_first_dataset = first_dataset
        sample_data_threshold_second_dataset = second_dataset

    # pass the recall weight only when it is configured; otherwise keep the library's default
    if recall_weight is not None:
        threshold_value = linker.threshold(sample_data_threshold_first_dataset,
                                           sample_data_threshold_second_dataset,
                                           recall_weight)
    else:
        threshold_value = linker.threshold(sample_data_threshold_first_dataset,
                                           sample_data_threshold_second_dataset)

In [None]:
logging.info('clustering...')
# report the threshold through logging (this was a leftover debug print)
logging.info('threshold: {}'.format(threshold_value))

# a computed threshold of 0.0 is a real value, so test against None rather
# than truthiness; without a threshold the library's default is used
if threshold_value is not None:
    linked_records = linker.match(first_dataset, second_dataset, threshold_value)
else:
    linked_records = linker.match(first_dataset, second_dataset)

logging.info('# duplicate sets {}'.format(len(linked_records)))

In [None]:
# highest cluster_id already present in the database (0 when not configured);
# the clusters created below get ids counting up from this value
cluster_id = int(config_file_data.get('last_cluster_id') or 0)

In [None]:
# linked_records holds (tuple of matched record ids, score) entries.
# Map every record id (the keys created in read_data) to (cluster id, score);
# each matched group gets the next cluster id after cluster_id.
cluster_membership = {}
for matched_ids, match_score in linked_records:
    cluster_id += 1
    cluster_membership.update(dict.fromkeys(matched_ids, (cluster_id, match_score)))

# records without a match get their own ids, starting after the last cluster id used above
unique_id = cluster_id + 1

# Output

In [None]:
def create_output_file(filename, output_file):
    """
    Write a copy of a dataset with 'cluster_id' and 'link_score' columns prepended.

    Each data row is looked up in the global 'cluster_membership' by the record
    id filename + row index, the same form of id read_data assigns. A row
    without a match gets a cluster of its own: its id is taken from the global
    'unique_id', which is then incremented, and its score is left empty.

    Input: 'filename' - string object which represents the name of one dataset
           'output_file' - string object which represents the name of the output
                           file
    """

    global unique_id

    # newline='' is what the csv module requires: without it quoted newlines are
    # not handled correctly and the writer emits '\r\r\n' on Windows
    with open(output_file, 'w', newline='') as f, open(filename, newline='') as f_input:
        writer = csv.writer(f)
        reader = csv.reader(f_input)

        heading_row = next(reader)
        writer.writerow(['cluster_id', 'link_score'] + heading_row)

        # csv.reader yields [] for blank lines, but pandas.read_csv (used when
        # the records were read) skips them; skip them here as well so the row
        # indices stay aligned with the record ids
        data_rows = (row for row in reader if row)

        for row_id, row in enumerate(data_rows):
            cluster_details = cluster_membership.get(filename + str(row_id))

            # the examples which have no match with other examples are put
            # in their own cluster
            if cluster_details is None:
                cluster_id = unique_id
                unique_id += 1
                score = None
            else:
                cluster_id, score = cluster_details

            writer.writerow([cluster_id, score] + row)

In [None]:
# output file names: each input file name prefixed with "output_"
# NOTE(review): the prefix is prepended to the whole string, so this assumes the
# input files are bare file names (no directory part) - confirm
output_file_1 = "output_" + input_file_1
output_file_2 = "output_" + input_file_2

In [None]:
# write a copy of each input file with cluster_id and link_score columns prepended
logging.info('create output files...')
create_output_file(input_file_1, output_file_1)
create_output_file(input_file_2, output_file_2)

# Evaluation

In [None]:
# mark the beginning of the evaluation phase in the log
logging.info('starting evaluation...')

In [None]:
def get_merged_dataframe_containing_only_cluster_id_and_label_column(output_file_1, output_file_2, columns):
    """
    Stack the selected columns of both output files into one dataframe.

    Input: 'output_file_1' - string object which represents the name of the first output file
           'output_file_2' - string object which represents the name of the second output file
           'columns' - a list of string objects containing the names of the columns to keep
                       (the cluster id and the label column)
    Output: a dataframe with the rows of 'output_file_1' followed by the rows of
            'output_file_2', restricted to 'columns' (original row indices are kept)
    """
    loaded = [pd.read_csv(path) for path in (output_file_1, output_file_2)]
    return pd.concat([frame[columns] for frame in loaded], axis = 0)

In [None]:
def sql_statement_for_creating_new_table(column_datatypes, tmp_table_name):
    """
    Build the CREATE TABLE statement for the evaluation table.

    The table always starts with a 'company_id SERIAL PRIMARY KEY' column,
    followed by one column per entry of 'column_datatypes', in dictionary order.
    NOTE: names and types are concatenated into the SQL text unquoted, so they
    must come from trusted input.

    Input: 'column_datatypes' - dictionary mapping column names to postgres
                                datatypes (e.g. {'cluster_id': 'INT'})
           'tmp_table_name' - string containing the name of the new table
    """
    column_definitions = ["company_id SERIAL PRIMARY KEY"]
    column_definitions.extend(
        name + " " + sql_type for name, sql_type in column_datatypes.items()
    )
    return "CREATE TABLE " + tmp_table_name + " (" + ",".join(column_definitions) + ")"

In [None]:
def sql_statement_for_copying_values_from_file(columns, tmp_table_name):
    """
    Build a COPY statement that loads CSV data (with a header row) from STDIN
    into the given columns of a table.

    NOTE: names are concatenated into the SQL text unquoted, so they must come
    from trusted input.

    Input: 'columns' - list (or any iterable) of column names, in CSV column order
           'tmp_table_name' - string containing the name of the table where values will
                              be inserted (copied)
    Output: e.g. "COPY t (a, b) FROM STDIN CSV HEADER"
    Raises: ValueError if 'columns' is empty, because an empty column list
            cannot form a valid COPY statement
    """
    columns = list(columns)
    if not columns:
        raise ValueError("at least one column is required for the COPY statement")

    return "COPY " + tmp_table_name + " (" + ", ".join(columns) + ") FROM STDIN CSV HEADER"

In [None]:
def get_columns_and_their_datatypes(result_df):
    """
    Map each dataframe column to a postgres datatype.

    int64 columns become INT and float64 columns become FLOAT. Every other dtype
    falls back to VARCHAR(500).

    Input: 'result_df' - pandas dataframe
    Output: dictionary mapping column name -> postgres datatype string
    """
    known_types = (('int64', 'INT'), ('float64', 'FLOAT'))

    def to_postgres(dtype):
        for pandas_name, postgres_name in known_types:
            if dtype == pandas_name:
                return postgres_name
        return 'VARCHAR(500)'

    return {name: to_postgres(dtype) for name, dtype in dict(result_df.dtypes).items()}

In [None]:
def evaluateDuplicates(found_dupes, true_dupes):
    """
    Calculate, log and return precision and recall of the found duplicate pairs.

    Input: 'found_dupes' - result from 'dupePairs' function (the result after we apply the algorithm)
           'true_dupes' - result from 'dupePairs' function (the ground truth)
    Output: a (precision, recall) tuple. Precision is None when no pairs were
            found; recall is None when there are no true pairs.
    """

    true_positives = found_dupes.intersection(true_dupes)
    false_positives = found_dupes.difference(true_dupes)

    if len(found_dupes) == 0:
        precision = None
    else:
        precision = 1 - len(false_positives) / float(len(found_dupes))

    if len(true_dupes) == 0:
        recall = None
    else:
        recall = len(true_positives) / float(len(true_dupes))

    logging.info('precision {}'.format(precision))

    logging.info('recall {}'.format(recall))

    # return the metrics as well, so callers can use them beyond the log
    return precision, recall

In [None]:
def dupePairs(colname_id, colname_index, table_name):
    """
    Return the set of record-id pairs that share a label.

    Rows of 'table_name' are grouped by the value of 'colname_id' (e.g. the
    cluster id made by Dedupe, or the official id of the companies = the
    ground truth). For every group with more than one row, each unordered pair
    of 'colname_index' values is added as a frozenset. 'colname_index' is the
    table's primary key, so it is unique per row.

    Calling this once with the cluster id column and once with the
    ground-truth column gives the two sets that 'evaluateDuplicates' compares
    to calculate precision and recall.

    Rows whose label is NULL are ignored. A missing label does not mean the
    records belong together, so they must not form one big cluster.

    Input: 'colname_id' - string object containing the name of the column which represents the label
                          of the examples (e.g cluster id or id)
           'colname_index' - string object which represents the record id (the PK from the table)
           'table_name' - string object which represents the name of the table from where we extract
                          the examples
    Output: a set of frozensets, each holding two record ids
    Requires: the module-level 'db_cursor', whose rows can be indexed by column name
    """
    # keep only the first whitespace-separated token of each name; this is a
    # crude guard because the names are concatenated into the SQL text
    colname_id = colname_id.split()[0]
    colname_index = colname_index.split()[0]
    table_name = table_name.split()[0]

    # fetch both columns in ONE query: two separate SELECTs without ORDER BY are
    # not guaranteed to return rows in the same order, so zipping their results
    # could pair a label with the wrong record id
    db_cursor.execute("SELECT " + colname_id + ", " + colname_index + " FROM " + table_name)

    # group the record ids by label
    dupe_d = collections.defaultdict(list)
    for row in db_cursor.fetchall():
        label = row[colname_id]
        if label is None:
            continue
        dupe_d[label].append(row[colname_index])

    dupe_s = set()
    for cluster in dupe_d.values():
        if len(cluster) > 1:
            for pair in itertools.combinations(cluster, 2):
                dupe_s.add(frozenset(pair))

    return dupe_s

In [None]:
if config_file_data.get('evaluation'):
    # this is the name of the column based on which we can create correct clusters (the label)
    # for example if the column is a unique id, then we can see which companies match based on it
    # N.B.: This only makes sense if you will NOT use this column in the training process, i.e.,
    # don't give it as a training field!
    label_column_name = config_file_data['evaluation'].get('label_column_name')
    if not label_column_name:
        # fail before opening a database connection instead of failing later
        raise ValueError("'label_column_name' must be set in the 'evaluation' configuration")
    tmp_table_name = "tmp_test_table"
    columns = ['cluster_id', label_column_name]

    # evaluation parameters
    # precision and recall is done with a database
    db_connection = psycopg2.connect(
        database = config_file_data['database_config'].get('database_name'),
        user =     config_file_data['database_config'].get('username'),
        password = config_file_data['database_config'].get('password'),
        host =     config_file_data['database_config'].get('host'),
        port =     config_file_data['database_config'].get('port'),
        cursor_factory = psycopg2.extras.RealDictCursor
    )
    # dupePairs queries through this module-level cursor; rows come back as dicts
    db_cursor = db_connection.cursor()

    try:
        result_df = get_merged_dataframe_containing_only_cluster_id_and_label_column(output_file_1, output_file_2,
                                                                                      columns)

        # write the dataframe as a csv looking file into a string and explicitly make the file pointer point at
        # the beginning of the file
        s_buf = StringIO()
        result_df.to_csv(s_buf, index = False)
        s_buf.seek(0)

        # convert the dataframe datatypes into postgres datatypes
        column_datatypes = get_columns_and_their_datatypes(result_df)

        logging.info('importing raw data from csv...')
        db_cursor.execute("DROP TABLE IF EXISTS " + tmp_table_name)

        db_cursor.execute(sql_statement_for_creating_new_table(column_datatypes, tmp_table_name))
        db_connection.commit()

        db_cursor.copy_expert(sql_statement_for_copying_values_from_file(columns, tmp_table_name), s_buf)
        db_connection.commit()

        logging.info('generating the true and test clusters...')
        true_dupes = dupePairs(label_column_name, 'company_id', tmp_table_name)
        test_dupes = dupePairs('cluster_id', 'company_id', tmp_table_name)

        logging.info("True dupes: {}".format(len(true_dupes)))
        logging.info("Found dupes: {}".format(len(test_dupes)))
        evaluateDuplicates(test_dupes, true_dupes)
    finally:
        # always release the cursor and the connection, even if a step above fails
        db_cursor.close()
        db_connection.close()