# ![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# Identify Duplicated Products Using TF-IDF

Entity Resolution, or "[Record linkage][wiki]", is the term used by statisticians, epidemiologists, and historians, among others, to describe the process of joining records from one data source with records from another that describe the same entity. Other terms with the same meaning include "entity disambiguation/linking", "duplicate detection", "deduplication", "record matching", "(reference) reconciliation", "object identification", "data/information integration", and "conflation".
 
Entity Resolution (ER) refers to the task of finding records in a dataset that refer to the same entity across different data sources (e.g., data files, books, websites, databases). ER is necessary when joining datasets based on entities that may or may not share a common identifier (e.g., database key, URI, National identification number), as may be the case due to differences in record shape, storage location, and/or curator style or preference. A dataset that has undergone ER may be referred to as being cross-linked.

Entity resolution is a common, yet difficult problem in data cleaning and integration. This lab will demonstrate how we can use Apache Spark to apply powerful and scalable text analysis techniques and perform entity resolution across two datasets of commercial products.

[wiki]: https://en.wikipedia.org/wiki/Record_linkage

## Data
Data files for this assignment are from the [metric-learning](https://code.google.com/p/metric-learning/) project and can be found at:
`cs100/lab3`
 
The directory contains the following files:
* **Google.csv**, the Google Products dataset
* **Amazon.csv**, the Amazon dataset
* **Google_small.csv**, 200 records sampled from the Google data
* **Amazon_small.csv**, 200 records sampled from the Amazon data
* **Amazon_Google_perfectMapping.csv**, the "gold standard" mapping
* **stopwords.txt**, a list of common English words
 
In addition to the product datasets, the "gold standard" file contains all of the true mappings between entities in the two datasets. Every row in the gold standard file has a pair of record IDs (one Google, one Amazon) belonging to two records that describe the same thing in the real world. We will use the gold standard to evaluate our algorithms.

In [4]:
dirPath = '/databricks-datasets/cs100/lab3/data-001/'
display(dbutils.fs.ls(dirPath))

## Part 1: Preliminaries

### 1.1 Import data
We read in each of the files and create Spark DataFrames using `spark.read.csv()`. We take the first line of each file as its header and use `inferSchema=True` so that Spark infers the column types automatically.

In [7]:
import os

print(dbutils.fs.head(os.path.join(dirPath, 'Amazon.csv'), 500))
print('\n')
print(dbutils.fs.head(os.path.join(dirPath, 'Google.csv'), 500))
print('\n')
print(dbutils.fs.head(os.path.join(dirPath, 'Amazon_Google_perfectMapping.csv'), 500))

In [8]:
amazonDF = spark.read.csv(path=os.path.join(dirPath, 'Amazon.csv'),
                          header=True,
                          inferSchema=True)
amazonDF.printSchema()
amazonDF.show(5)

In [9]:
googleDF = spark.read.csv(path=os.path.join(dirPath, 'Google.csv'),
                        header=True, 
                        inferSchema=True)
googleDF.printSchema()
googleDF.show(5)

In [10]:
perfectMapping = spark.read.csv(path=os.path.join(dirPath, 'Amazon_Google_perfectMapping.csv'), 
                                header=True,
                                inferSchema=True)
perfectMapping.printSchema()
perfectMapping.show(5)

### 1.2 Impute data
Here are a few changes we need to make to the two DataFrames:
- Cast the `price` column in **googleDF** from `string` to `double`.
- Rename the `name` column in **googleDF** to `title`, so that the two DataFrames have consistent headers.
- In both **amazonDF** and **googleDF**, fill `null` prices with `0.0` and all other `null` (string) fields with `''` (`null` values would cause errors in the text analysis later).

In [12]:
amazonDF = amazonDF.na.fill({'price':0.0}).na.fill('')

googleDF = googleDF\
  .withColumn('price', googleDF['price'].cast('double'))\
  .withColumnRenamed('name', 'title')\
  .na.fill({'price':0.0})\
  .na.fill('')

In [13]:
amazonDF.printSchema()
googleDF.printSchema()

### 1.3 Filter rows

We want every `id` in `amazonDF` and `googleDF` to appear in `perfectMapping`, so we drop the rows whose `id` does not appear there.

In [15]:
amazonId = perfectMapping.select('idAmazon').distinct().withColumnRenamed('idAmazon', 'id')
amazonDF = amazonDF.join(amazonId, 'id', 'inner')
print('# of items in amazonDF: {}'.format(amazonDF.count()))

In [16]:
googleId = perfectMapping.select('idGoogleBase').distinct().withColumnRenamed('idGoogleBase', 'id')
googleDF = googleDF.join(googleId, 'id', 'inner')
print('# of items in googleDF: {}'.format(googleDF.count()))

### 1.4 Repartition data

By default, each of these DataFrames has only 1 partition, whereas the cluster has 8 vCPU cores, so most cores would sit idle. We therefore repartition the DataFrames for better parallelism.

In [18]:
print('amazonDF part #: {}'.format(amazonDF.rdd.getNumPartitions()))
print('googleDF part #: {}'.format(googleDF.rdd.getNumPartitions()))
print('perfectMapping part #: {}'.format(perfectMapping.rdd.getNumPartitions()))
print('Default parallelism: {}'.format(sc.defaultParallelism))

In [19]:
amazonDF = amazonDF.repartition(4)
googleDF = googleDF.repartition(4)

## Part 2: Bag-of-Words Model and TF-IDF


**Bag-of-Words**

A simple approach to entity resolution is to treat all records as strings and compute their similarity with a string distance function. In this part, we will build some components for performing bag-of-words text analysis, and then use them to compute record similarity.
*[Bag-of-words][bag-of-words]* is a conceptually simple yet powerful approach to text analysis.
 
The idea is to treat strings, a.k.a. **documents**, as *unordered collections* of words, or **tokens**, i.e., as bags of words.
> **Note on terminology**: a "token" is the result of parsing the document down to the elements we consider "atomic" for the task at hand.  Tokens can be things like words, numbers, acronyms, or other exotica like word-roots or fixed-length character strings.

> Bag of words techniques all apply to any sort of token, so when we say "bag-of-words" we really mean "bag-of-tokens," strictly speaking.

Tokens become the atomic unit of text comparison. If we want to compare two documents, we count how many tokens they share. If we want to search for documents with keyword queries (roughly what a web search engine does), we turn the keywords into tokens and find documents that contain them. The power of this approach is that it makes string comparisons insensitive to small differences that probably do not affect meaning much, for example, punctuation and word order.

**Tokenize a String**: *[Tokenization][Tokenization]* is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). 

`RegexTokenizer` allows more advanced tokenization based on regular expression (regex) matching. By default, the `pattern` parameter (default: `"\\s+"`) is treated as the delimiter used to split the input text. Alternatively, setting the `gaps` parameter to `False` makes `pattern` describe the tokens themselves rather than the gaps between them, and all matching occurrences are returned as the tokenization result.
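To make the two `gaps` modes concrete, here is a minimal pure-Python sketch using the standard `re` module. It is not Spark's implementation, and the helper name `regex_tokenize` is hypothetical. It lowercases the text first, as `RegexTokenizer` does by default, and drops empty strings.

```python
import re

def regex_tokenize(text, pattern=r"\s+", gaps=True, to_lowercase=True, min_token_length=1):
    """Pure-Python imitation of RegexTokenizer's `pattern`/`gaps` semantics (sketch only)."""
    if to_lowercase:
        text = text.lower()
    if gaps:
        tokens = re.split(pattern, text)    # pattern matches the separators between tokens
    else:
        tokens = re.findall(pattern, text)  # pattern matches the tokens themselves
    # Drop tokens that are too short (this also removes empty strings left by split)
    return [t for t in tokens if len(t) >= min_token_length]

print(regex_tokenize("Adobe Photoshop CS3, Mac!"))
# ['adobe', 'photoshop', 'cs3,', 'mac!']
print(regex_tokenize("Adobe Photoshop CS3, Mac!", pattern=r"\w+", gaps=False))
# ['adobe', 'photoshop', 'cs3', 'mac']
```

With `gaps=False` and `\w+`, the setting used in the pipeline in 2.1, punctuation such as `,` and `!` never reaches the token list.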

**Remove stopwords**: *[Stopwords][stopwords]* are common (English) words that do not contribute much to the content or meaning of a document (e.g., "the", "a", "is", "to", etc.). Stopwords add noise to bag-of-words comparisons, so they are usually excluded.

`StopWordsRemover` takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stopwords is specified by the `stopWords` parameter.
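A minimal plain-Python sketch of what `StopWordsRemover` does. The tiny stopword set is hypothetical and only for illustration (the pipeline in 2.1 does not set `stopWords`, so it uses `StopWordsRemover`'s built-in English list). Matching is case-insensitive here, mirroring the remover's default `caseSensitive=False`.

```python
# Hypothetical, deliberately tiny stopword set for illustration only.
STOPWORDS = {"the", "a", "is", "to", "for", "and", "of"}

def remove_stopwords(tokens, stopwords=STOPWORDS, case_sensitive=False):
    """Drop stopwords from a token list, keeping the remaining tokens in order."""
    if case_sensitive:
        return [t for t in tokens if t not in stopwords]
    lowered = {w.lower() for w in stopwords}
    return [t for t in tokens if t.lower() not in lowered]

print(remove_stopwords(["The", "best", "software", "for", "the", "Mac"]))
# ['best', 'software', 'Mac']
```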

**TF-IDF**

Bag-of-words comparisons are not very good when all tokens are treated the same: some tokens are more important than others. Weights give us a way to specify which tokens to favor. With weights, when we compare documents, instead of counting common tokens, we sum up the weights of common tokens. A good heuristic for assigning weights is called "Term-Frequency/Inverse-Document-Frequency," or TF-IDF for short.
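To see why weights matter, here is a small plain-Python comparison of unweighted and weighted token overlap. The weights are made up for illustration (in practice TF-IDF supplies them), and the function names are hypothetical.

```python
def shared_token_count(doc_a, doc_b):
    """Unweighted overlap: every token the two documents share counts as 1."""
    return len(set(doc_a) & set(doc_b))

def shared_token_weight(doc_a, doc_b, weights):
    """Weighted overlap: sum the weights of the shared tokens (unknown tokens weigh 0)."""
    return sum(weights.get(t, 0.0) for t in set(doc_a) & set(doc_b))

# Hypothetical weights: a distinctive product name outweighs more common words.
weights = {"photoshop": 3.0, "adobe": 1.0, "software": 0.1}
query = ["adobe", "photoshop", "software"]
cand_a = ["photoshop", "elements"]
cand_b = ["adobe", "software", "suite"]

print(shared_token_count(query, cand_a), shared_token_count(query, cand_b))
print(shared_token_weight(query, cand_a, weights), shared_token_weight(query, cand_b, weights))
```

Counting shared tokens prefers `cand_b`, whose two matches are relatively common words, while summing weights prefers `cand_a`, which shares the single most distinctive token.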

Term frequency-inverse document frequency ([TF-IDF][tfidf]) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. Denote a term by *t*, a document by *d*, and the corpus by *D*. Term frequency *TF(t,d)* is the number of times that term *t* appears in document *d*, while document frequency *DF(t,D)* is the number of documents that contain term *t*. If we only use term frequency to measure the importance, it is very easy to over-emphasize terms that appear very often but carry little information about the document, e.g. “a”, “the”, and “of”. If a term appears very often across the corpus, it means it doesn’t carry special information about a particular document. Inverse document frequency is a numerical measure of how much information a term provides:

$$IDF(t, D) = \log\frac{\left|D\right| + 1}{DF(t, D) + 1}.$$

where *|D|* is the total number of documents in the corpus. Since logarithm is used, if a term appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF:

$$TFIDF(t,d,D)=TF(t,d)\cdot IDF(t,D).$$
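A small worked example may help. Take a made-up corpus of three tokenized documents, $d_1 = (adobe, photoshop)$, $d_2 = (adobe, acrobat)$, and $d_3 = (adobe, photoshop, photoshop)$, so $|D| = 3$. Using the natural logarithm:

$$IDF(adobe, D) = \log\frac{3 + 1}{3 + 1} = 0, \quad IDF(photoshop, D) = \log\frac{3 + 1}{2 + 1} \approx 0.2877, \quad IDF(acrobat, D) = \log\frac{3 + 1}{1 + 1} \approx 0.6931.$$

$$TFIDF(photoshop, d_3, D) = TF(photoshop, d_3) \cdot IDF(photoshop, D) = 2 \cdot 0.2877 \approx 0.5754.$$

The term that appears in every document gets zero weight, while the rarer terms are weighted up.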


There are several variants on the definition of term frequency and document frequency. Spark MLlib separates TF and IDF to keep them flexible.

**TF**: Both `HashingTF` and `CountVectorizer` can be used to generate the term frequency vectors.

`HashingTF` is a *Transformer* which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a "set of terms" might be a bag of words. HashingTF utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. The hash function used here is MurmurHash 3. Then term frequencies are calculated based on the mapped indices. This approach avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing. To reduce the chance of collision, we can increase the target feature dimension, i.e. the number of buckets of the hash table. Since a simple modulo is used to transform the hash function to a column index, it is advisable to use a power of two as the feature dimension, otherwise the features will not be mapped evenly to the columns. 
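A minimal sketch of the hashing trick in plain Python. Spark's `HashingTF` uses MurmurHash 3, which the Python standard library lacks, so this sketch swaps in `zlib.crc32` purely for illustration; the bucket indices it produces will not match Spark's. The function name `hashing_tf` is hypothetical.

```python
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=16):
    """Term frequencies via the hashing trick, with CRC32 standing in for MurmurHash 3."""
    counts = Counter()
    for tok in tokens:
        h = zlib.crc32(tok.encode("utf-8")) & 0xFFFFFFFF  # unsigned 32-bit hash
        counts[h % num_features] += 1                      # simple modulo picks the column
    return dict(counts)                                    # sparse vector {column: count}

vec = hashing_tf(["adobe", "photoshop", "adobe"], num_features=16)
print(vec)
```

No vocabulary is built, so no pass over the corpus is needed to assign columns; the price is that two different tokens can land in the same bucket, which becomes less likely as `num_features` grows.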

`CountVectorizer` converts text documents to vectors of term counts.

**IDF**: IDF is an *Estimator* which is fit on a dataset and produces an *IDFModel*. The *IDFModel* takes feature vectors (generally created from `HashingTF` or `CountVectorizer`) and scales each column. Intuitively, it down-weights columns which appear frequently in a corpus.
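The Estimator/Model split can be sketched in plain Python: fitting learns something from the whole corpus (a vocabulary, or per-column IDF weights), and transforming applies it to individual documents. This is an illustrative sketch, not Spark's code, and the function names are hypothetical. `min_df` mimics `CountVectorizer`'s `minDF` when it is at least 1 (a minimum document count); ordering the vocabulary by descending corpus count with alphabetical tie-breaking is an assumption of this sketch.

```python
import math
from collections import Counter

def fit_count_vectorizer(corpus, min_df=1.0):
    """'Fit' step: learn a vocabulary, keeping terms found in at least min_df documents."""
    doc_freq = Counter(t for doc in corpus for t in set(doc))  # documents containing each term
    total = Counter(t for doc in corpus for t in doc)          # total occurrences of each term
    kept = [t for t in doc_freq if doc_freq[t] >= min_df]
    kept.sort(key=lambda t: (-total[t], t))                    # most frequent first, then alphabetical
    return {t: i for i, t in enumerate(kept)}                  # term -> column index

def count_vectorize(doc, vocab):
    """'Transform' step: sparse term counts {column index: count}; unknown terms are dropped."""
    return dict(Counter(vocab[t] for t in doc if t in vocab))

def fit_idf(vectors):
    """'Fit' step for IDF: log((|D| + 1) / (DF + 1)) for every column seen in the corpus."""
    n_docs = len(vectors)
    doc_freq = Counter(i for vec in vectors for i, c in vec.items() if c > 0)
    return {i: math.log((n_docs + 1.0) / (df + 1.0)) for i, df in doc_freq.items()}

def idf_transform(vec, idf):
    """'Transform' step for IDF: scale each column's count by that column's IDF weight."""
    return {i: c * idf[i] for i, c in vec.items()}

corpus = [["adobe", "photoshop"], ["adobe", "acrobat"], ["adobe", "photoshop", "photoshop"]]
vocab = fit_count_vectorizer(corpus, min_df=2.0)  # "acrobat" (1 document) is dropped
vectors = [count_vectorize(doc, vocab) for doc in corpus]
idf = fit_idf(vectors)
weighted = idf_transform(vectors[2], idf)
print(vocab, weighted)
```

The `minDFs` dictionary in the pipeline below plays the role of `min_df` here, pruning rare tokens per column before IDF weighting.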

[tfidf]: https://en.wikipedia.org/wiki/Tf%E2%80%93idf

[stopwords]: https://en.wikipedia.org/wiki/Stop_words

[Tokenization]: https://en.wikipedia.org/wiki/Lexical_analysis#Tokenization

[bag-of-words]: https://en.wikipedia.org/wiki/Bag-of-words_model

### 2.1 Build a bag-of-words-TFIDF pipeline

Now we want to create a pipeline to include all the above stages and then transform the `title`, `description`, and `manufacturer` columns in the combined dataset (`amazonDF` + `googleDF`).

Stages:

- RegexTokenizer
- StopWordsRemover
- CountVectorizer
- IDF

After the transformation, we will only keep `id`, `price`, `titleIDF`, `descriptionIDF`, and `manufacturerIDF` for the next step.

In [22]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF

## columns for the stages
columns = ['title', 'description', 'manufacturer'] 
## minDFs for CountVectorizer
minDFs = {'title':2.0, 'description':4.0, 'manufacturer':2.0}

preProcStages = []

for col in columns:
  regexTokenizer = RegexTokenizer(gaps=False, pattern=r'\w+', inputCol=col, outputCol=col+'Token')
  stopWordsRemover = StopWordsRemover(inputCol=col+'Token', outputCol=col+'SWRemoved')
  countVectorizer = CountVectorizer(minDF=minDFs[col], inputCol=col+'SWRemoved', outputCol=col+'TF')
  idf = IDF(inputCol=col+'TF', outputCol=col+'IDF') 
  preProcStages += [regexTokenizer, stopWordsRemover, countVectorizer, idf]
  
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=preProcStages)

In [23]:
## combine two dataframes
dataCombined = amazonDF.union(googleDF)
## pipeline fit/transform
model = pipeline.fit(dataCombined)
dataCombined = model.transform(dataCombined)
## select columns
dataCombined = dataCombined.select('id', 'price', 'titleIDF', 'descriptionIDF', 'manufacturerIDF')

In [24]:
dataCombined.sample(False, .01).show()

## Part 3: Text Similarity and Price Similarity

### 3.1 Text Similarity

Now we are ready to do text comparisons in a formal way. The metric of string distance we will use is called *[cosine similarity][cosine]*. We will treat each document as a vector in some high dimensional space. Then, to compare two documents we compute the cosine of the angle between their two document vectors.
 
Given two vectors *a* and *b*, the cosine of the angle between them can be calculated via the formula below:

\\[ similarity = \cos \theta = \frac{a \cdot b}{\|a\| \|b\|} = \frac{\sum a_i b_i}{\sqrt{\sum a_i^2} \sqrt{\sum b_i^2}} \\]
 
Setting aside the algebra, the geometric interpretation is more intuitive. The angle between two document vectors is small if they share many tokens, because they point in roughly the same direction, and in that case the cosine of the angle is large. Otherwise, if the angle is large (and they have few words in common), the cosine is small. Cosine similarity therefore tracks our intuitive sense of similarity; with non-negative TF-IDF weights it ranges from 0 to 1.
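A plain-Python sketch of the formula above for sparse vectors stored as `{index: weight}` dictionaries (a hypothetical representation; the notebook itself uses Spark's `SparseVector`). Only indices present in both vectors contribute to the dot product, which is what makes sparse storage cheap for this computation. Unlike the notebook's `cosine_similarity`, which returns `-1.0` as a sentinel, this sketch returns `0.0` when either vector is all zeros.

```python
import math

def sparse_cosine(a, b):
    """Cosine similarity of sparse vectors stored as {index: weight} dicts."""
    # Only indices present in both vectors contribute to the dot product.
    dot = sum(w * b[i] for i, w in a.items() if i in b)
    norm_a = math.sqrt(sum(w * w for w in a.values()))
    norm_b = math.sqrt(sum(w * w for w in b.values()))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # cosine is undefined for a zero vector; this sketch reports 0.0
    return dot / (norm_a * norm_b)

title_a = {0: 1.2, 3: 0.7}  # hypothetical TF-IDF weights
title_b = {0: 0.9, 5: 0.4}
print(sparse_cosine(title_a, title_b))
```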

In Spark, the `SparseVector` class provides the methods `SparseVector.dot(other)` and `SparseVector.norm(p)`, which can be used to calculate the cosine similarity between two SparseVectors. [[source](http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.SparseVector)]

[cosine]: https://en.wikipedia.org/wiki/Cosine_similarity

In [27]:
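def cosine_similarity(X, Y):
  # Cosine of the angle between two SparseVectors: X.dot(Y) / (||X|| * ||Y||).
  # Returns -1.0 as a sentinel when either vector is all zeros (e.g. an empty
  # text field), because the cosine is undefined in that case.
  denom = X.norm(2) * Y.norm(2)
  if denom == 0.0:
    return -1.0
  else:
    return X.dot(Y) / float(denom)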

### 3.2 Price Similarity

For the prices of any two products, we define their price similarity via the formula below:

\\[ price\\_similarity = 1 - \frac{\left| X - Y \right|}{X + Y} \\]
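For example, with hypothetical prices $X = 10$ and $Y = 30$:

$$1 - \frac{\left| 10 - 30 \right|}{10 + 30} = 1 - \frac{20}{40} = 0.5$$

Identical prices score 1, and for non-negative prices the score stays in $[0, 1]$, approaching 0 as one price dwarfs the other. If both prices are 0 (which can happen after `null` prices are filled with `0.0`), the denominator is 0, and `price_similarity` below returns `-1.0` instead.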

In [29]:
def price_similarity(X, Y):
  denom = X + Y
  if denom == 0.0:
    return -1.0
  else:
    return 1 - abs(X - Y) / float(denom)

Let's test these functions with two matching products:

In [31]:
aID, gID = u'b000g80lqo', u'http://www.google.com/base/feeds/snippets/18441188461196475272'

aProd = dataCombined.filter(dataCombined['id'] == aID).collect()[0]
gProd = dataCombined.filter(dataCombined['id'] == gID).collect()[0]

amazonDF.filter(amazonDF['id'] == aID)\
    .select('title', 'price')\
    .union(googleDF.filter(googleDF['id'] == gID).select('title', 'price'))\
    .show(truncate=False)

print('Title similarity: {}'.format(cosine_similarity(aProd['titleIDF'], gProd['titleIDF'])))
print('DES similarity:   {}'.format(cosine_similarity(aProd['descriptionIDF'], gProd['descriptionIDF'])))
print('MFR similarity:   {}'.format(cosine_similarity(aProd['manufacturerIDF'], gProd['manufacturerIDF'])))
print('Price similarity: {}'.format(price_similarity(aProd['price'], gProd['price'])))

### 3.3 Calculate similarity scores

Now let's create a new dataframe which contains all the paired products from **amazon** and **google**, and then add similarity scores to each pair.

To do that, our plan is:

1. Broadcast `dataCombined` as a lookup table.
2. Define a function `similarities` to calculate all four similarity scores for a given pair of `idAmazon` and `idGoogleBase`.
3. Calculate the similarity scores for all pairs of `idAmazon` and `idGoogleBase` and then convert them to a dataframe.
4. Create `label` column by performing `LEFT JOIN` with `perfectMapping` dataframe and impute `null` with 0.
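The four steps above can be mimicked on a small scale in plain Python, which may help clarify what the Spark code computes. In this sketch a dict stands in for the broadcast lookup table, `itertools.product` for `rdd.cartesian`, and a set of gold pairs for the left join with `perfectMapping`; `label_pairs`, `price_score`, and the toy data are hypothetical.

```python
from itertools import product

def label_pairs(amazon_ids, google_ids, lookup, gold_pairs, score_fn):
    """Score every (Amazon, Google) id pair and attach a 0/1 gold-standard label."""
    gold = set(gold_pairs)                                # stands in for perfectMapping
    rows = []
    for a_id, g_id in product(amazon_ids, google_ids):    # like rdd.cartesian(...)
        scores = score_fn(lookup[a_id], lookup[g_id])     # dict lookup, like the broadcast table
        label = 1.0 if (a_id, g_id) in gold else 0.0      # like LEFT JOIN + fill null with 0.0
        rows.append((a_id, g_id) + tuple(scores) + (label,))
    return rows

# Hypothetical toy data: one price feature per product.
lookup = {"a1": {"price": 10.0}, "a2": {"price": 20.0}, "g1": {"price": 10.0}}

def price_score(x, y):
    return (1 - abs(x["price"] - y["price"]) / (x["price"] + y["price"]),)

rows = label_pairs(["a1", "a2"], ["g1"], lookup, [("a1", "g1")], price_score)
for row in rows:
    print(row)
```

The number of pairs is the product of the two dataset sizes, so even modest inputs yield many pairs, nearly all of them labeled 0.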

In [33]:
# Step 1
# Broadcast dataCombined as a lookup table.
# use collectAsMap() to create a python dict object for fast lookup
lookupTable = sc.broadcast(dataCombined.rdd.map(lambda x: (x['id'], 
                                                           {'price':x['price'], 
                                                            'titleIDF':x['titleIDF'], 
                                                            'descriptionIDF':x['descriptionIDF'],
                                                            'manufacturerIDF':x['manufacturerIDF']})).collectAsMap())

In [34]:
# to get the value of an item, for example, u'b00002s8if', we can do:
print(lookupTable.value[u'b00002s8if']['price'])

In [35]:
# Step 2
# Define a function similarities to calculate all the similarity scores for the given pair of idAmazon and idGoogleBase.
def similarities(idAmazon, idGoogleBase, lookupTable):
  X, Y = lookupTable.value[idAmazon], lookupTable.value[idGoogleBase]
  price_simi = price_similarity(X['price'], Y['price'])
  title_simi = cosine_similarity(X['titleIDF'], Y['titleIDF'])
  descr_simi = cosine_similarity(X['descriptionIDF'], Y['descriptionIDF'])
  manuf_simi = cosine_similarity(X['manufacturerIDF'], Y['manufacturerIDF'])
  return price_simi, title_simi, descr_simi, manuf_simi

In [36]:
# Step 3
# Calculate the similarity scores for all pairs of idAmazon and idGoogleBase and then convert them to a dataframe.
from pyspark.sql import Row

pairId = amazonDF.select('id').rdd.flatMap(list).cartesian(googleDF.select('id').rdd.flatMap(list))
pairProdDF = pairId.map(lambda x: x + similarities(x[0], x[1], lookupTable))

measureMapping = spark.createDataFrame(pairProdDF.map(lambda x: Row(idAmazon=x[0], 
                                                                    idGoogleBase=x[1], 
                                                                    price_simi=float(x[2]),
                                                                    title_simi=float(x[3]),
                                                                    descr_simi=float(x[4]), 
                                                                    manuf_simi=float(x[5]))))

In [37]:
# Step 4
# Create label column by performing LEFT JOIN with perfectMapping dataframe and impute null with 0.
# For the JOIN operation, broadcast perfectMapping for efficiency
from pyspark.sql.functions import lit
from pyspark.sql.functions import broadcast

measureMapping = measureMapping.join(broadcast(perfectMapping.withColumn('label', lit(1.0))), 
                                     ['idAmazon', 'idGoogleBase'], 
                                     'left')
measureMapping = measureMapping.na.fill({'label':0.0})

In [38]:
measureMapping.sample(True, .001).show()

### 3.4 Cache data

Because the classifier-building steps in Part 4 reuse `measureMapping` repeatedly, we cache it before splitting the data.

## Part 4: Building a Classifier

### 4.1 Training / Test Splitting

Now we can build our machine learning classifier to identify the true duplicate pairs. Let's first create our training / test datasets.

In [43]:
measureMapping.cache()
measureMapping.groupBy('label').count().show()

In [44]:
train_0, test_0 = measureMapping.filter('label = 0.0').randomSplit([0.8, 0.2])
train_1, test_1 = measureMapping.filter('label = 1.0').randomSplit([0.8, 0.2])
test = test_0.union(test_1)

To visualize the distributions of the similarity scores, we need to undersample the majority class (label 0), which outnumbers the minority class (label 1) by roughly 1,100 to 1. The same undersampled set is also used for training below.
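A minimal plain-Python sketch of this kind of undersampling: every minority row is kept, and each majority row is kept independently with probability `fraction`, so the output size is only approximately `fraction` of the majority class (Spark's `sample(False, fraction)` likewise does not guarantee an exact count). The function name, the toy data, and the fixed seed are assumptions for illustration.

```python
import random

def undersample(rows, label_of, majority_label=0.0, fraction=0.01, seed=42):
    """Keep all minority rows; keep each majority row with probability `fraction`."""
    rng = random.Random(seed)  # seeded for reproducibility
    return [r for r in rows
            if label_of(r) != majority_label or rng.random() < fraction]

# Hypothetical toy data with a 1,000 to 1 class imbalance.
rows = [(0.0,)] * 10000 + [(1.0,)] * 10
kept = undersample(rows, label_of=lambda r: r[0])
n_pos = sum(1 for r in kept if r[0] == 1.0)
n_neg = len(kept) - n_pos
print(n_pos, n_neg)
```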

In [46]:
# trainUnderSampled contains 1% of train_0 and all train_1
trainUnderSampled = train_0.sample(False, .01).union(train_1)
trainUnderSampled = trainUnderSampled.repartition(8)
trainUnderSampled.groupBy('label').count().show()

In [47]:
display(trainUnderSampled.sample(False, .07))

### 4.2 DecisionTree Classifier

Let's first build a DecisionTree classifier on the undersampled data.

In [49]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Use the four similarity scores as features (listed by name rather than by column position)
featureCols = ['price_simi', 'title_simi', 'descr_simi', 'manuf_simi']
assembler = VectorAssembler(inputCols=featureCols, outputCol='features')
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')
pipeline = Pipeline(stages=[assembler, dt])

# Tune the tree depth with 5-fold cross-validation, scored by area under the ROC curve
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [3, 5, 7]) \
    .build()

evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', 
                                          metricName='areaUnderROC')

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

In [50]:
cvModel = crossval.fit(trainUnderSampled)

In [51]:
prediction = cvModel.transform(test)
display(prediction.groupBy('label', 'prediction').count())

### 4.3 Model Evaluation

Let's evaluate the DecisionTree model on the test dataset using a confusion matrix and an ROC curve.
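To make the two evaluation tools concrete, here is a small plain-Python sketch (hypothetical helper names and toy data) that tallies a confusion matrix from `(label, prediction)` pairs and computes AUC as the probability that a randomly chosen positive is scored above a randomly chosen negative, counting ties as one half; this quantity equals the area under the ROC curve. The pairwise loop is O(P·N) and only meant for small illustrations.

```python
def confusion_counts(pairs):
    """Tally (label, prediction) pairs, with 1.0 meaning duplicate, into TP, FP, FN, TN."""
    tp = fp = fn = tn = 0
    for label, pred in pairs:
        if label == 1.0 and pred == 1.0:
            tp += 1
        elif label == 0.0 and pred == 1.0:
            fp += 1
        elif label == 1.0 and pred == 0.0:
            fn += 1
        else:
            tn += 1
    return tp, fp, fn, tn

def precision_recall(tp, fp, fn):
    """Precision and recall, returning 0.0 when a denominator is empty."""
    precision = tp / float(tp + fp) if tp + fp else 0.0
    recall = tp / float(tp + fn) if tp + fn else 0.0
    return precision, recall

def pairwise_auc(labels, scores):
    """AUC as P(random positive scores above random negative); ties count 1/2."""
    pos = [s for lab, s in zip(labels, scores) if lab == 1.0]
    neg = [s for lab, s in zip(labels, scores) if lab == 0.0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

pairs = [(1.0, 1.0), (1.0, 0.0), (0.0, 1.0), (0.0, 0.0), (0.0, 0.0)]
tp, fp, fn, tn = confusion_counts(pairs)
print(tp, fp, fn, tn, precision_recall(tp, fp, fn))
print(pairwise_auc([1.0, 1.0, 0.0, 0.0], [0.9, 0.4, 0.5, 0.1]))
```

With a class imbalance as extreme as this dataset's, the true-negative cell dominates the confusion matrix, so precision and recall on label 1 are usually more informative than overall accuracy.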

In [53]:
AUC_score = evaluator.evaluate(prediction)
print('AUC score: {}'.format(AUC_score))

In [54]:
def roc_plot(prediction, labelCol, scoreCol, pos_label):
  import matplotlib.pyplot as plt
  from sklearn import metrics
  
  scores = prediction.select(scoreCol).rdd.map(lambda x: list(x)[0].toArray()[1]).collect()
  labels = prediction.select(labelCol).rdd.flatMap(list).collect()
  
  fpr, tpr, thresholds = metrics.roc_curve(labels, scores, pos_label=pos_label)
#   auc_score = metrics.auc(fpr, tpr)
  auc_score = evaluator.evaluate(prediction)
  fig = plt.figure()
  plt.plot([0, 1], [0, 1], linestyle='--', lw=2, color='k')
  plt.plot(fpr, tpr, color='g', linestyle='--',
           label='Mean ROC (area = %0.3f)' % auc_score, lw=4)
  plt.legend(loc="lower right")
  display(fig)

In [55]:
roc_plot(prediction, labelCol='label', scoreCol='probability', pos_label=1.0)

In [56]:
prediction.select('probability').show(truncate=False)

### 4.4 Ensemble of DecisionTree Classifiers

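To make use of more of the label-0 training data, we now build an ensemble of DecisionTree classifiers: the label-0 training rows are split into 20 parts, one cross-validated model is trained per part (a 20% sample of that part plus all label-1 rows), and the models' predictions are combined by majority vote.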
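The data-splitting scheme behind the ensemble can be sketched in plain Python: the label-0 rows are divided into k disjoint chunks, and a sample of each chunk is paired with every label-1 row to form one training set per model. This sketch shuffles and deals rows round-robin into near-equal chunks, whereas `RDD.randomSplit` assigns rows randomly so chunk sizes vary; `balanced_subsets`, its parameters, and the toy data are hypothetical.

```python
import random

def balanced_subsets(negatives, positives, k=20, keep=0.2, seed=0):
    """Build k training sets: a sample of one disjoint negative chunk plus all positives."""
    rng = random.Random(seed)
    shuffled = list(negatives)
    rng.shuffle(shuffled)
    chunks = [shuffled[i::k] for i in range(k)]               # k disjoint, near-equal chunks
    subsets = []
    for chunk in chunks:
        sampled = [r for r in chunk if rng.random() < keep]   # like .sample(False, keep)
        subsets.append(sampled + list(positives))             # like train_1.union(...)
    return subsets

subsets = balanced_subsets(list(range(1000)), ["p1", "p2"], k=4)
print([len(s) for s in subsets])
```

Each model sees every positive example but only a slice of the negatives, and the majority vote lets the ensemble as a whole draw on all of the slices.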

In [59]:
train_0 = train_0.repartition(4).cache()
train_1 = train_1.repartition(4).cache()
train_0_splits = train_0.rdd.randomSplit([1]*20)
# measureMapping.unpersist()

In [60]:
train_1.union(spark.createDataFrame(train_0_splits[0].sample(False, .2))).rdd.getNumPartitions()

In [61]:
bestModels = []
print('Begin training')
for i, train_0_split in enumerate(train_0_splits):
  print('Split: {}'.format(i))
  underSampled = train_1.union(spark.createDataFrame(train_0_split.sample(False, .2)))
  cvModel = crossval.fit(underSampled)
  bestModels.append(cvModel.bestModel)

In [62]:
for i, bestModel in enumerate(bestModels):
  print('saving bestModel {}'.format(i))
  bestModel.write().overwrite().save('taxiAnalysisER_bestModels/bm_{}'.format(i))

In [63]:
from pyspark.ml import PipelineModel

bestModels = []

for i in range(20):
  bestModels.append(PipelineModel.load('taxiAnalysisER_bestModels/bm_{}'.format(i)))

In [64]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def voting(*votes):
  # Majority vote: 1.0 if more than half of the models predict 1.0, else 0.0
  return float(sum(votes) / float(len(votes)) > .5)

udfVoting = udf(voting, FloatType())

def majority_vote(models, data):
  # Start from the pair ids and true labels, then add one prediction column per model
  predictions = data.select('idAmazon', 'idGoogleBase', 'label')
  for i, model in enumerate(models):
    temp = model.transform(data)\
        .select('idAmazon', 'idGoogleBase', 'prediction')\
        .withColumnRenamed('prediction', 'prediction_'+str(i))
    predictions = predictions.join(temp, ['idAmazon', 'idGoogleBase'], 'left')
  # Columns from index 3 onward are the per-model predictions; combine them by vote
  predictions = predictions.withColumn('prediction', udfVoting(*predictions.columns[3:]))
  return predictions

In [65]:
predictions = majority_vote(bestModels, test)

In [66]:
display(predictions.groupBy('label', 'prediction').count())