# Assignment 2: Entity Resolution (Part 1)

## Objective

Data scientists often spend 80% of their time on [data preparation](https://www.infoworld.com/article/3228245/the-80-20-data-science-dilemma.html). If your career goal is to become a data scientist, you have to master data cleaning and data integration skills. In this assignment, you will learn how to solve the Entity Resolution (ER) problem, a very common problem in data cleaning and integration. After completing this assignment, you should be able to answer the following questions:

1. What is ER?
2. What are the applications of ER in data integration and cleaning?
3. How to avoid $n^2$ comparisons?
4. How to compute Jaccard Similarity?
5. How to evaluate an ER result?

**Requirements:**

1. Please use [pandas.DataFrame](http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html) rather than spark.DataFrame to manipulate data.

2. Please follow the Python code style guide, [PEP 8](https://www.python.org/dev/peps/pep-0008/). If the TA finds your code hard to read, you will lose points. This requirement applies for the whole semester.

The data for Assignment 2 (Part 1 and Part 2) can be downloaded from [A2-data.zip](A2-data.zip).

## Overview

ER is defined as finding different records that refer to the same real-world entity, e.g., "iPhone 4th generation" vs. "iPhone four". It is central to data integration and cleaning. In this assignment, you will learn how to apply ER in a data integration setting. The program that you are going to write can also be easily extended to a data-cleaning setting, where it is used to detect _duplicate values_.

Imagine that you want to help your company's customers buy products at a cheaper price. To do so, you first write a [web scraper](https://nbviewer.jupyter.org/github/sfu-db/bigdata-cmpt733/blob/master/Assignments/A1/A1-1.ipynb) to crawl product data from Amazon.com and Google Shopping, and then integrate the two datasets. Since the same product may be represented differently on the two websites, you are facing an ER problem.

Existing ER techniques can be broadly divided into two categories: similarity-based (Part 1) and learning-based (Part 2).


## Similarity Join

Unlike a learning-based technique, a similarity-based technique (a.k.a. similarity join) does not need any labeled data. It first chooses a similarity function and a threshold, and then returns the record pairs whose similarity values are at or above the threshold. These returned record pairs are treated as matching pairs, i.e., as referring to the same real-world entity.

Depending on the application, you may need to choose different similarity functions. In this assignment, we will use Jaccard similarity, i.e., $\textsf{Jaccard}(r, s) = \frac{|r \cap s|}{|r \cup s|}$. Here is the formal definition of this problem.

> **Jaccard-Similarity Join**: Given two DataFrames, R and S, and a threshold $\theta \in (0, 1]$, the Jaccard-similarity join problem aims to find all record pairs $(r, s) \in R \times S$ such that $\textsf{Jaccard}(r, s) \geq \theta$.
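
To make the definition concrete, here is a minimal sketch of Jaccard similarity on two token collections. The helper name `jaccard` and the sample tokens are made up for illustration, and returning 0.0 for two empty inputs is an assumption, since the definition leaves that case open.

```python
def jaccard(r, s):
    """Jaccard similarity of two token collections (duplicates are ignored)."""
    r, s = set(r), set(s)
    union = r | s
    if not union:
        # Assumption: two empty sets get similarity 0.0 to avoid dividing by zero
        return 0.0
    return len(r & s) / len(union)


a = ["iphone", "4", "black"]
b = ["iphone", "4", "white", "16gb"]

# 2 shared tokens out of 5 distinct tokens
print(jaccard(a, b))  # 0.4
```

With a threshold of $\theta = 0.5$, this pair would not be returned by the join, because 0.4 is below the threshold.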

To implement similarity join, you need to address the following challenges:

1. Jaccard is used to quantify the similarity between two sets instead of two records. You need to convert each record to a set.

2. A naive implementation of similarity join is to compute Jaccard for all $|R \times S|$ possible pairs. Imagine R and S each have one million records. This requires $10^{12}$ pair comparisons, which is extremely expensive. Thus, you need to know how to avoid $n^2$ comparisons.

3. The output of ER is a set of matching pairs, where each pair is considered as referring to the same real-world entity. You need to know how to evaluate the quality of an ER result.

Next, you will be guided through four tasks. After finishing them, I suggest going over the above challenges again to understand how each one is addressed.

Read the code first, and then implement the four functions `preprocess_df`, `filtering`, `verification`, and `evaluate` by doing Tasks A-D, respectively.

In [46]:
# similarity_join.py
import ast
import re
import pandas as pd

class SimilarityJoin:
    def __init__(self, data_file1, data_file2):
        self.df1 = pd.read_csv(data_file1)
        self.df2 = pd.read_csv(data_file2)

    def preprocess_df(self, df, cols):
        """
            Input:  df represents a pandas DataFrame
                    cols represents the list of columns in df that will be concatenated and tokenized.

            Output: Return a new DataFrame that adds the "joinKey" column to the input df
        """

        def split_and_concat(row):

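            # Concatenate the selected columns into one string. Missing values arrive
            # from pandas as NaN (not None), so str() turns them into the text 'nan'.

            concatenated = ' '.join([str(row[col]) for col in cols if row[col] is not None])

            # Split on runs of non-word characters (this removes punctuation), lowercase
            # each token, and drop empty strings and the 'nan' left by missing values

            tokens = [token.lower() for token in re.split(r'\W+', concatenated) if token and token.lower() != 'nan']

            # Store the token list as a string; `filtering` parses it back with ast.literal_eval
            return str(tokens)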

        # Create a new df and apply the split_and_concat function to each row

        new_df = df.copy()
        new_df['joinKey'] = new_df.apply(split_and_concat, axis=1)

        return new_df

    def filtering(self, df1, df2):
        """
            Input: $df1 and $df2 are two input DataFrames, where each of them
                has a 'joinKey' column added by the preprocess_df function

            Output: Return a new DataFrame $cand_df with four columns: 'id1', 'joinKey1', 'id2', 'joinKey2',
                    where 'id1' and 'joinKey1' are from $df1, and 'id2' and 'joinKey2'are from $df2.
                    Intuitively, $cand_df is the joined result between $df1 and $df2 on the condition that
                    their joinKeys share at least one token.

            Comments: Since the goal of the "filtering" function is to avoid n^2 pair comparisons,
                    you are NOT allowed to compute a cartesian join between $df1 and $df2 in the function.
                    Please come up with a more efficient algorithm (see hints in Lecture 2).
        """

        # Convert all 'joinKey' strings to lists

        df1['joinKey'] = df1['joinKey'].apply(ast.literal_eval)
        df2['joinKey'] = df2['joinKey'].apply(ast.literal_eval)

        # Step 1: Build an inverted index (token -> set of record ids) for the second DataFrame

        def build_token_index(df):
            token_index = {}

            # Iterate over the rows of the given DataFrame

            for index, row in df.iterrows():

                # Add this row's id to the set kept for each of its tokens

                for token in row['joinKey']:
                    if token not in token_index:
                        token_index[token] = set()
                    token_index[token].add(row['id'])
            return token_index

        token_index_df2 = build_token_index(df2)

        # Step 2: Build a list of matches

        matches = []

        # Map each id in df2 to its joinKey once, so each lookup below is O(1)
        # instead of a scan over df2 (this assumes the ids in df2 are unique)

        join_key_by_id2 = dict(zip(df2['id'], df2['joinKey']))

        for index, row in df1.iterrows():

            # Collect every id in token_index_df2 that shares a token with the current row

            matched_ids = set()
            for token in row['joinKey']:
                matched_ids.update(token_index_df2.get(token, []))

            # For each id, make a dict and add it to the list of matches

            for id2 in matched_ids:
                new_tuple = {'id1': row['id'],
                             'joinKey1': row['joinKey'],
                             'id2': id2,
                             'joinKey2': join_key_by_id2[id2]}
                matches.append(new_tuple)

        # The inputs were converted in place above, so restore their 'joinKey' columns to strings

        df1['joinKey'] = df1['joinKey'].apply(str)
        df2['joinKey'] = df2['joinKey'].apply(str)


        # Step 3: Build the resultant DataFrame
        # (the explicit column list keeps it well-formed even when there are no matches)

        cand_df = pd.DataFrame(matches, columns=['id1', 'joinKey1', 'id2', 'joinKey2'])

        # Convert all 'joinKey' lists back to strings

        cand_df['joinKey1'] = cand_df['joinKey1'].apply(str)
        cand_df['joinKey2'] = cand_df['joinKey2'].apply(str)

        return cand_df


    def verification(self, cand_df, threshold):
        """
        Input: $cand_df is the output DataFrame from the 'filtering' function.
               $threshold is a float value between (0, 1]

        Output: Return a new DataFrame $result_df that represents the ER result.
                It has five columns: id1, joinKey1, id2, joinKey2, jaccard

        Comments: There are two differences between $cand_df and $result_df
                  (1) $result_df adds a new column, called jaccard, which stores the jaccard similarity
                      between $joinKey1 and $joinKey2
                  (2) $result_df removes the rows whose jaccard similarity is smaller than $threshold
        """

        # Work on a copy so the caller's $cand_df is left unchanged,
        # then convert both columns to sets for easier computation

        cand_df = cand_df.copy()
        cand_df['joinKey1'] = cand_df['joinKey1'].apply(lambda x: set(ast.literal_eval(x)))
        cand_df['joinKey2'] = cand_df['joinKey2'].apply(lambda x: set(ast.literal_eval(x)))

        # Compute the Jaccard similarity for each pair,
        # computed as the length of their intersection divided by the length of their union

        cand_df['jaccard'] = cand_df.apply(lambda x:
            len(x['joinKey1'].intersection(x['joinKey2'])) /
            len(x['joinKey1'].union(x['joinKey2'])), axis=1)

        # Remove the rows whose Jaccard similarity is smaller than the threshold

        result_df = cand_df[cand_df['jaccard'] >= threshold].copy()

        # Convert both columns back to strings

        result_df['joinKey1'] = result_df['joinKey1'].apply(lambda x: str(x))
        result_df['joinKey2'] = result_df['joinKey2'].apply(lambda x: str(x))

        return result_df


    def evaluate(self, result, ground_truth):
        """
            Input: $result is a list of matching pairs identified by the ER algorithm
                $ground_truth is a list of matching pairs labeled by humans

            Output: Compute precision, recall, and fmeasure of $result based on $ground_truth, and
                    return the evaluation result as a triple: (precision, recall, fmeasure)

        """

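        # Use sets of tuples so that each membership test is O(1)
        result_pairs = {tuple(pair) for pair in result}
        truth_pairs = {tuple(pair) for pair in ground_truth}

        # T is the number of returned pairs that also appear in the ground truth
        T = len(result_pairs & truth_pairs)
        R = len(result_pairs)
        A = len(truth_pairs)

        # Precision is |T| / |R|, where R is the set of pairs returned by our algorithm
        precision = T / R if R else 0.0

        # Recall is |T| / |A|, where A is the labelled ground truth
        recall = T / A if A else 0.0

        # FMeasure is the harmonic mean of precision and recall
        # (defined as 0 here when there are no true positives, to avoid dividing by zero)
        fmeasure = (2 * precision * recall) / (precision + recall) if T else 0.0

        return (precision, recall, fmeasure)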

    def jaccard_join(self, cols1, cols2, threshold):
        new_df1 = self.preprocess_df(self.df1, cols1)
        new_df2 = self.preprocess_df(self.df2, cols2)
        print("Before filtering: %d pairs in total" % (self.df1.shape[0] * self.df2.shape[0]))

        cand_df = self.filtering(new_df1, new_df2)
        print("After Filtering: %d pairs left" % (cand_df.shape[0]))

        result_df = self.verification(cand_df, threshold)
        print("After Verification: %d similar pairs" % (result_df.shape[0]))

        return result_df



if __name__ == "__main__":
    er = SimilarityJoin("Amazon_sample.csv", "Google_sample.csv")
    amazon_cols = ["title", "manufacturer"]
    google_cols = ["name", "manufacturer"]
    result_df = er.jaccard_join(amazon_cols, google_cols, 0.5)

    result = result_df[['id1', 'id2']].values.tolist()
    ground_truth = pd.read_csv("Amazon_Google_perfectMapping_sample.csv").values.tolist()
    print("(precision, recall, fmeasure) = ", er.evaluate(result, ground_truth))




The program will output the following when running on the sample data:


> Before filtering: 256 pairs in total

> After Filtering: 84 pairs left

> After Verification: 6 similar pairs

> (precision, recall, fmeasure) =  (1.0, 0.375, 0.5454545454545454)


### Task A. Data Preprocessing (Record --> Token Set)

Since Jaccard needs to take two sets as input, your first job is to preprocess DataFrames by transforming each record into a set of tokens. Please implement the following function.  

In [25]:
def preprocess_df(self, df, cols):
    """
        Input: $df represents a DataFrame
               $cols represents the list of columns (in $df) that will be concatenated and be tokenized

        Output: Return a new DataFrame that adds the "joinKey" column to the input $df

        Comments: The "joinKey" column is a list of tokens, which is generated as follows:
                 (1) concatenate the $cols in $df;
                 (2) apply the tokenizer to the concatenated string
        Here is how the tokenizer should work:
                 (1) Use "re.split(r'\W+', string)" to split a string into a set of tokens
                 (2) Convert each token to its lower-case
    """

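    def split_and_concat(row):
        # Concatenate the selected columns into one string. Missing values arrive
        # from pandas as NaN (not None), so str() turns them into the text 'nan'.
        concatenated = ' '.join([str(row[col]) for col in cols if row[col] is not None])
        # Split on runs of non-word characters (this removes punctuation), lowercase
        # each token, and drop empty strings and the 'nan' left by missing values
        tokens = [token.lower() for token in re.split(r'\W+', concatenated) if token and token.lower() != 'nan']
        # Store the token list as a string; `filtering` parses it back with ast.literal_eval
        return str(tokens)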

    # Create a new df and apply the split_and_concat function to each row
    new_df = df.copy()
    new_df['joinKey'] = new_df.apply(split_and_concat, axis=1)

    return new_df

In [41]:
### preprocess_df test ###

# Read in sample data
amazon_sample = pd.read_csv("Amazon_sample.csv")
google_sample = pd.read_csv("Google_sample.csv")

# Expected outputs
new_df1_expected = pd.read_csv("new_df1.csv")
new_df2_expected = pd.read_csv("new_df2.csv")

# Instantiate the class
er = SimilarityJoin("Amazon_sample.csv", "Google_sample.csv")

# Columns to be used for preprocessing
amazon_cols = ["title", "manufacturer"]
google_cols = ["name", "manufacturer"]

# Preprocess the data
new_df1 = er.preprocess_df(amazon_sample, amazon_cols)
new_df2 = er.preprocess_df(google_sample, google_cols)

# Compare the output with the expected DataFrames
assert new_df1['joinKey'].equals(new_df1_expected['joinKey']), "preprocess_df output for Amazon data does not match expected output"
assert new_df2['joinKey'].equals(new_df2_expected['joinKey']), "preprocess_df output for Google data does not match expected output"

For testing, you can compare your outputs with new_df1 and new_df2, which can be found in the `Amazon-Google-Sample` folder.
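
The tokenizer described in Task A can be tried on a single string before applying it to a whole DataFrame. This is a small sketch; the helper name `tokenize` and the sample product title are made up for illustration.

```python
import re


def tokenize(text):
    """Split on runs of non-word characters, drop empty strings, lowercase."""
    return [tok.lower() for tok in re.split(r"\W+", text) if tok]


print(tokenize("Adobe Photoshop CS3 (Mac/Win)!"))
# ['adobe', 'photoshop', 'cs3', 'mac', 'win']

# Leading or trailing punctuation makes re.split return empty strings,
# which is why the tokenizer has to filter them out
print(re.split(r"\W+", "(Mac/Win)!"))
# ['', 'Mac', 'Win', '']

# A missing value read by pandas is NaN, and its string form is the text 'nan'
print(str(float("nan")))
# nan
```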

### Task B. Filtering Obviously Non-matching Pairs

To avoid $n^2$ pair comparisons, ER algorithms often follow a filtering-and-verification framework. The basic idea is to first filter obviously non-matching pairs and then only verify the remaining pairs.  

In Task B, your job is to implement the `filtering` function. This function filters out all the record pairs whose joinKeys do not share any token. The reason is that, by the definition of Jaccard, **if two sets do not share any element (i.e., $r \cap s = \emptyset$), their Jaccard similarity must be zero**. Since the threshold $\theta$ is greater than zero, we can safely remove them.

In [None]:
def filtering(self, df1, df2):
    """
        Input: $df1 and $df2 are two input DataFrames, where each of them
            has a 'joinKey' column added by the preprocess_df function

        Output: Return a new DataFrame $cand_df with four columns: 'id1', 'joinKey1', 'id2', 'joinKey2',
                where 'id1' and 'joinKey1' are from $df1, and 'id2' and 'joinKey2'are from $df2.
                Intuitively, $cand_df is the joined result between $df1 and $df2 on the condition that
                their joinKeys share at least one token.

        Comments: Since the goal of the "filtering" function is to avoid n^2 pair comparisons,
                you are NOT allowed to compute a cartesian join between $df1 and $df2 in the function.
                Please come up with a more efficient algorithm (see hints in Lecture 2).
    """

    # Convert all 'joinKey' strings to lists
    import ast

    df1['joinKey'] = df1['joinKey'].apply(ast.literal_eval)
    df2['joinKey'] = df2['joinKey'].apply(ast.literal_eval)

    # Step 1: Build an inverted index (token -> set of record ids) for the second DataFrame

    def build_token_index(df):
        token_index = {}
        # Iterate over the rows of the given DataFrame
        for index, row in df.iterrows():
            # Add this row's id to the set kept for each of its tokens
            for token in row['joinKey']:
                if token not in token_index:
                    token_index[token] = set()
                token_index[token].add(row['id'])
        return token_index

    token_index_df2 = build_token_index(df2)

    # Step 2: Build a list of matches

    matches = []

    # Map each id in df2 to its joinKey once, so each lookup below is O(1)
    # instead of a scan over df2 (this assumes the ids in df2 are unique)
    join_key_by_id2 = dict(zip(df2['id'], df2['joinKey']))

    for index, row in df1.iterrows():
        # Collect every id in token_index_df2 that shares a token with the current row
        matched_ids = set()
        for token in row['joinKey']:
            matched_ids.update(token_index_df2.get(token, []))
        # For each id, make a dict and add it to the list of matches
        for id2 in matched_ids:
            new_tuple = {'id1': row['id'],
                         'joinKey1': row['joinKey'],
                         'id2': id2,
                         'joinKey2': join_key_by_id2[id2]}
            matches.append(new_tuple)

    # The inputs were converted in place above, so restore their 'joinKey' columns to strings
    df1['joinKey'] = df1['joinKey'].apply(str)
    df2['joinKey'] = df2['joinKey'].apply(str)


    # Step 3: Build the resultant DataFrame
    # (the explicit column list keeps it well-formed even when there are no matches)

    cand_df = pd.DataFrame(matches, columns=['id1', 'joinKey1', 'id2', 'joinKey2'])

    # Convert all 'joinKey' lists back to strings
    cand_df['joinKey1'] = cand_df['joinKey1'].apply(str)
    cand_df['joinKey2'] = cand_df['joinKey2'].apply(str)

    return cand_df

In [42]:
### filtering test ###

# Get the preprocessed DataFrames
new_df1 = pd.read_csv("new_df1.csv")
new_df2 = pd.read_csv("new_df2.csv")

# Expected output
cand_df_expected = pd.read_csv("cand_df.csv")
cand_df_expected = cand_df_expected.sort_values(by=['id1', 'joinKey1', 'id2', 'joinKey2']).reset_index(drop=True)

# Apply the filtering function
cand_df = er.filtering(new_df1, new_df2)
cand_df = cand_df.sort_values(by=['id1', 'joinKey1', 'id2', 'joinKey2']).reset_index(drop=True)

# Compare the output with the expected DataFrame
assert cand_df.equals(cand_df_expected), "filtering output does not match expected output"


For testing, you can compare your output with cand_df, which can be found in the `Amazon-Google-Sample` folder.
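
The idea behind the filtering step can be seen on a toy example that uses plain dictionaries instead of DataFrames. This is only a sketch of the inverted-index approach; the record ids and tokens below are made up for illustration.

```python
from collections import defaultdict

# Hypothetical preprocessed records: id -> list of tokens
amazon = {"a1": ["adobe", "photoshop"], "a2": ["norton", "antivirus"]}
google = {"g1": ["photoshop", "cs3"],
          "g2": ["adobe", "photoshop", "elements"],
          "g3": ["quicken"]}

# Inverted index: token -> ids of the Google records that contain it
index = defaultdict(set)
for gid, tokens in google.items():
    for tok in tokens:
        index[tok].add(gid)

# Probe the index with each Amazon record; only records that share
# at least one token are ever paired up
candidates = set()
for aid, tokens in amazon.items():
    for tok in tokens:
        for gid in index.get(tok, ()):
            candidates.add((aid, gid))

print(sorted(candidates))         # [('a1', 'g1'), ('a1', 'g2')]
print(len(amazon) * len(google))  # 6 pairs in the full cross product
```

Only 2 of the 6 possible pairs survive, and the pairs that share no token are never enumerated at all.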

### Task C. Computing Jaccard Similarity for Surviving Pairs

In the second phase of the filtering-and-verification framework, we compute the Jaccard similarity for each surviving pair and return those pairs whose Jaccard similarity is no smaller than the specified threshold.

In Task C, your job is to implement the `verification` function. This task looks simple, but there are a few small "traps".

In [None]:
def verification(self, cand_df, threshold):
    """
        Input: $cand_df is the output DataFrame from the 'filtering' function.
               $threshold is a float value between (0, 1]

        Output: Return a new DataFrame $result_df that represents the ER result.
                It has five columns: id1, joinKey1, id2, joinKey2, jaccard

        Comments: There are two differences between $cand_df and $result_df
                  (1) $result_df adds a new column, called jaccard, which stores the jaccard similarity
                      between $joinKey1 and $joinKey2
                  (2) $result_df removes the rows whose jaccard similarity is smaller than $threshold
    """

    # Work on a copy so the caller's $cand_df is left unchanged,
    # then convert both columns to sets for easier computation
    import ast

    cand_df = cand_df.copy()
    cand_df['joinKey1'] = cand_df['joinKey1'].apply(lambda x: set(ast.literal_eval(x)))
    cand_df['joinKey2'] = cand_df['joinKey2'].apply(lambda x: set(ast.literal_eval(x)))

    # Compute the Jaccard similarity for each pair, computed as the length of their intersection divided by the length of their union

    cand_df['jaccard'] = cand_df.apply(lambda x:
        len(x['joinKey1'].intersection(x['joinKey2'])) /
        len(x['joinKey1'].union(x['joinKey2'])), axis=1)

    # Remove the rows whose Jaccard similarity is smaller than the threshold
    # (.copy() avoids pandas' SettingWithCopyWarning when columns are assigned below)

    result_df = cand_df[cand_df['jaccard'] >= threshold].copy()

    # Convert both columns back to strings

    result_df['joinKey1'] = result_df['joinKey1'].apply(lambda x: str(x))
    result_df['joinKey2'] = result_df['joinKey2'].apply(lambda x: str(x))

    return result_df


For testing, you can compare your output with result_df, which can be found in the `Amazon-Google-Sample` folder.
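
The assignment does not spell out what the "traps" are. The sketch below shows three pitfalls that seem likely, which is an assumption on my part: a joinKey read back from CSV is a string, repeated tokens must not be counted twice, and a pair whose similarity equals the threshold should be kept. The sample keys are made up.

```python
import ast

# As read back from a CSV file, a joinKey is a string, not a list
key1 = "['norton', 'antivirus', 'norton']"
key2 = "['norton', 'antivirus', '2007']"

# Pitfall 1: parse the string first; iterating over it directly yields characters
tokens1 = ast.literal_eval(key1)
tokens2 = ast.literal_eval(key2)
print(type(key1).__name__, type(tokens1).__name__)  # str list

# Pitfall 2: Jaccard is defined on sets, so the repeated 'norton' counts once
set1, set2 = set(tokens1), set(tokens2)
jaccard = len(set1 & set2) / len(set1 | set2)  # 2 shared tokens / 3 distinct tokens
print(round(jaccard, 3))  # 0.667

# Pitfall 3: the join keeps pairs with similarity >= threshold, not only > threshold
threshold = 0.5
print(jaccard >= threshold)  # True
```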

### Task D. Evaluating an ER result

How should we evaluate an ER result? Before answering this question, let's first recall what the ER result looks like. The goal of ER is to identify all matching record pairs. Thus, the ER result should be a set of identified matching pairs, denoted by $R$. One thing we want to know is what percentage of the pairs in $R$ are truly matching. This is what Precision tells us. Let $T$ denote the truly matching pairs in $R$. Precision is defined as:

$$Precision = \frac{|T|}{|R|}$$

In addition to Precision, we also care about what percentage of the truly matching pairs are identified. This is what Recall tells us. Let $A$ denote the truly matching pairs in the entire dataset. Recall is defined as:

$$Recall = \frac{|T|}{|A|}$$

There is an interesting trade-off between Precision and Recall. As more and more pairs are identified as matching, Recall increases while Precision potentially decreases. In the extreme case, if we return all the pairs as matching pairs, we will get a perfect Recall (i.e., Recall = 100%) but Precision will be at its worst. Thus, to balance Precision and Recall, people often use FMeasure to evaluate an ER result:

$$FMeasure = \frac{2*Precision*Recall}{Precision+Recall}$$
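
The trade-off can be observed by lowering the similarity threshold step by step on a toy example. The scored pairs, the ground truth, and the helper `prf` below are all made up for illustration; this is a sketch, not the required `evaluate` implementation.

```python
# Hypothetical candidate pairs with their Jaccard similarity
scored = [(("a1", "g1"), 0.9), (("a2", "g2"), 0.6),
          (("a3", "g7"), 0.4), (("a4", "g4"), 0.2)]
# Hypothetical ground truth: the pair (a3, g7) is not a true match
truth = {("a1", "g1"), ("a2", "g2"), ("a4", "g4")}


def prf(result, truth):
    """Return (precision, recall, fmeasure), using 0.0 where a ratio is undefined."""
    hits = len(set(result) & truth)
    precision = hits / len(result) if result else 0.0
    recall = hits / len(truth) if truth else 0.0
    fmeasure = 2 * precision * recall / (precision + recall) if hits else 0.0
    return precision, recall, fmeasure


for theta in (0.8, 0.5, 0.1):
    result = [pair for pair, sim in scored if sim >= theta]
    precision, recall, fmeasure = prf(result, truth)
    print(theta, round(precision, 2), round(recall, 2), round(fmeasure, 2))
```

As the threshold drops from 0.8 to 0.1, recall rises from 1/3 to 1, while precision falls from 1 to 3/4 once the non-matching pair is let through.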

In Task D, you will be given an ER result as well as the ground truth that tells you what pairs are truly matching. Your job is to calculate Precision, Recall and FMeasure for the result.


In [None]:
def evaluate(self, result, ground_truth):
    """
        Input: $result is a list of matching pairs identified by the ER algorithm
            $ground_truth is a list of matching pairs labeled by humans

        Output: Compute precision, recall, and fmeasure of $result based on $ground_truth, and
                return the evaluation result as a triple: (precision, recall, fmeasure)

    """

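    # Use sets of tuples so that each membership test is O(1)
    result_pairs = {tuple(pair) for pair in result}
    truth_pairs = {tuple(pair) for pair in ground_truth}

    # T is the number of returned pairs that also appear in the ground truth
    T = len(result_pairs & truth_pairs)
    R = len(result_pairs)
    A = len(truth_pairs)

    # Precision is |T| / |R|, where R is the set of pairs returned by our algorithm
    precision = T / R if R else 0.0

    # Recall is |T| / |A|, where A is the labelled ground truth
    recall = T / A if A else 0.0

    # FMeasure is the harmonic mean of precision and recall
    # (defined as 0 here when there are no true positives, to avoid dividing by zero)
    fmeasure = (2 * precision * recall) / (precision + recall) if T else 0.0

    return (precision, recall, fmeasure)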
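
As a sanity check, the numbers reported earlier for the sample data (6 similar pairs, precision 1.0, recall 0.375) can be plugged back into the three definitions. The variable names below are only for illustration, and the ground-truth size is derived from those reported numbers rather than read from the data file.

```python
# Values taken from the sample output shown earlier in this assignment
returned = 6       # |R|: pairs left after verification
precision = 1.0
recall = 0.375

# Precision = |T| / |R|, so |T| = precision * |R|
true_positives = precision * returned

# Recall = |T| / |A|, so |A| = |T| / recall
ground_truth_size = true_positives / recall

fmeasure = 2 * precision * recall / (precision + recall)

print(true_positives, ground_truth_size)  # 6.0 16.0
print(fmeasure)
```

In other words, the sample output is consistent with all 6 returned pairs being correct out of 16 truly matching pairs, and the printed FMeasure should agree with the reported 0.5454545454545454.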

## Submission

Implement the `preprocess_df`, `filtering`, `verification`, and `evaluate` functions in `similarity_join.py`. Submit your code file (`similarity_join.py`) to the CourSys activity Assignment 2.