
In [1]:
import os

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.5.1



In [2]:
! mkdir -p data

In [3]:
! wget https://archive.ics.uci.edu/ml/machine-learning-databases/20newsgroups-mld/mini_newsgroups.tar.gz

--2021-11-30 01:52:09--  https://archive.ics.uci.edu/ml/machine-learning-databases/20newsgroups-mld/mini_newsgroups.tar.gz
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1860687 (1.8M) [application/x-httpd-php]
Saving to: ‘mini_newsgroups.tar.gz.1’


2021-11-30 01:52:12 (1.34 MB/s) - ‘mini_newsgroups.tar.gz.1’ saved [1860687/1860687]



In [4]:
! tar xzf mini_newsgroups.tar.gz -C ./data/

# Information Retrieval

In the previous chapter, we came across common words that made it difficult to characterize a corpus. This is a problem for many kinds of NLP tasks. Fortunately, the field of information retrieval has developed many techniques that can be used to improve a variety of NLP applications.

Earlier, we talked about how much text data exists and how more is being generated every day. We need some way to manage and search through this data. If there is an ID or title, we can of course index on it, but how do we search by content? With structured data, we can create logical expressions and retrieve all rows that satisfy the expressions. This can also be done with text, though less exactly.

The foundation of information retrieval predates computers. Information retrieval focuses on how to find specific pieces of information in a larger set of information, especially information in text data. The most common type of task in information retrieval is search—in other words, document search.

The following are the components of a document search:

* Query $q$  
A logical statement describing the document or kind of document you are looking for

* Query term $q_t$  
A term in the query, generally a token

* Corpus of documents $D$  
A collection of documents

* Document $d$  
A document in $D$ with terms $t_d$ that describe the document

* Ranking function $r(q, D)$  
A function that ranks the documents in $D$ according to relevance to the query $q$

* Result $R$  
The ranked list of documents
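To make these components concrete, here is a toy sketch in plain Python. The corpus, the query, and the `rank` function are hypothetical illustrations, not part of this chapter's pipeline; the ranking function simply counts how many distinct query terms each document contains. Note that it has to scan every document, which is the technical problem raised in the next paragraph.

```python
# A toy corpus D: document index -> list of terms t_d (made-up data)
corpus = {
    0: ['the', 'cat', 'sat', 'on', 'the', 'mat'],
    1: ['dogs', 'chase', 'cats'],
    2: ['the', 'cat', 'in', 'the', 'hat'],
}

def rank(query_terms, corpus):
    """Naive ranking function r(q, D): score each document by how many
    query terms q_t it contains. It scans every document in D."""
    scores = {}
    for i, terms in corpus.items():
        term_set = set(terms)
        score = sum(1 for q_t in query_terms if q_t in term_set)
        if score > 0:
            scores[i] = score
    # Result R: indices sorted by descending score, ties broken by index
    return sorted(scores, key=lambda i: (-scores[i], i))

query = ['cat', 'hat']  # query q, already split into query terms q_t
print(rank(query, corpus))  # [2, 0]
```

Document 1 is not returned because it contains "cats", not "cat"; this is one reason the text processing step matters.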

Before we get into how to implement these components, we need to consider a technical problem. How can we quickly access documents based on the information within them? If we had to scan every document for every query, search would become too slow for large collections of documents. To solve this problem, we use an inverted index.

## Inverted Indices

Originally, indexing was a means of organizing and labeling information in a way that made retrieving it easier. For example, libraries use indexing to organize and find books. The Dewey Decimal Classification system is a way to index books based on their subject matter. We can also have indices based on titles, authors, publication dates, and so on. Another kind of index can often be found at the back of a book. This is a list of concepts in the book and pages on which to find them.

The "index" in inverted index is slightly different from the traditional index; instead, it takes inspiration from the mathematical concept of indexing, that is, assigning an index to each element of a set. Recall our set of documents $D$. We can assign a number to each document, creating a mapping from integers to documents, $i \rightarrow d$.

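Let's create this index for our `DataFrame`. Normally, we would store an inverted index in a data store that allows for quick lookups. Spark `DataFrame`s are not designed for quick lookups, so here we use Spark only to build the index. Tools built for search, such as Apache Lucene, are listed in the Resources section at the end of this chapter.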

## Building an Inverted Index

Let's look at how we can build an inverted index in Spark. Here are the steps we will follow:

1. Load the data.  

2. Create the index: $i \rightarrow d$
  * Since we are using Spark, we will generate this index on the rows.

3. Process the text.

4. Create the inverted index from terms to documents: $t_d \rightarrow i, j, k, \ldots$

### Step 1

We will be creating an inverted index for the mini_newsgroups data set.

In [5]:
import os

from pyspark.sql.types import *
from pyspark.sql.functions import collect_set
from pyspark.sql import Row
from pyspark.ml import Pipeline

import sparknlp
from sparknlp import DocumentAssembler, Finisher
from sparknlp.annotator import *

spark = sparknlp.start()


In [6]:
path = os.path.join('data', 'mini_newsgroups', '*')
texts = spark.sparkContext.wholeTextFiles(path)

schema = StructType([
    StructField('path', StringType()),
    StructField('text', StringType()),
])

texts = spark.createDataFrame(texts, schema=schema).persist()


### Step 2

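Now we need to create the index. Spark assumes the data is distributed, so to assign an index, we need to use the lower-level `RDD` API. The `zipWithIndex` method does not sort the data; it numbers elements first by partition index and then by their order within each partition. When the `RDD` has more than one partition, this triggers a Spark job to count the elements in each partition.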
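The following pure-Python sketch models the numbering scheme behind `zipWithIndex` using in-memory lists as stand-ins for partitions. It is a simplified illustration with made-up file names, not Spark's implementation: count the elements in each partition, give each partition a starting offset equal to the number of elements in all earlier partitions, and then number elements within the partition from that offset.

```python
from itertools import accumulate

def zip_with_index(partitions):
    """Simulate RDD.zipWithIndex numbering over a list of partitions."""
    counts = [len(p) for p in partitions]
    # starting offset of partition k = total size of partitions 0..k-1
    offsets = [0] + list(accumulate(counts))[:-1]
    return [
        [(element, offset + position) for position, element in enumerate(partition)]
        for partition, offset in zip(partitions, offsets)
    ]

partitions = [['a.txt', 'b.txt'], ['c.txt'], ['d.txt', 'e.txt']]
print(zip_with_index(partitions))
# [[('a.txt', 0), ('b.txt', 1)], [('c.txt', 2)], [('d.txt', 3), ('e.txt', 4)]]
```

Counting the partitions first is why Spark needs an extra job: a partition cannot know its starting offset until the sizes of all earlier partitions are known.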

In [None]:
rows_w_indexed = texts.rdd.zipWithIndex()
(path, text), i = rows_w_indexed.first()

print(i)
print(path)
print(text[:200])

Now that we have created the index, we need to create a `DataFrame` like we did previously, except now we need to add our index into our `Rows`.

In [7]:
indexed = rows_w_indexed.map(
    lambda row_index: Row(
        index=row_index[1], 
        **row_index[0].asDict())
)
(i, path, text) = indexed.first()


In [8]:
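# build a new schema instead of mutating `schema` in place with `add`;
# Spark's zipWithIndex produces Long indices, so LongType is the safer type
indexed_schema = StructType(schema.fields + [StructField('index', LongType())])

indexed = spark.createDataFrame(indexed, schema=indexed_schema)\
    .persist()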


In [15]:
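# Alternative without the RDD API: number the rows with a window function.
# Import only what we need: `from pyspark.sql.functions import *` would shadow
# Python built-ins such as `sum`, which SparseVector.sum relies on later.
# Note: row_number() starts at 1, and a window without partitionBy moves all
# rows to a single partition, which is acceptable for a small corpus like this.
# The result gets its own name so it does not overwrite `indexed`.
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
indexed_by_path = texts.withColumn("index", row_number().over(Window.orderBy("path")))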


In [9]:
indexed.limit(10).toPandas()


Each document $d$ is a collection of terms, $t_d$. So our index is the mapping from integers to collections of terms.

An inverted index, on the other hand, is the mapping from terms $t_d$ to integers, $\text{inv-index}: t_d \rightarrow i, j, k, \ldots$. This allows us to quickly look up which documents contain a given term.
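Before building it in Spark, here is a minimal pure-Python sketch of inverting a forward index. The toy documents and the `invert` helper are hypothetical; the idea is that each (document, term) pair is flipped into a term-to-documents entry.

```python
from collections import defaultdict

# Hypothetical forward index i -> t_d (already tokenized documents)
docs = {
    0: ['spark', 'builds', 'an', 'index'],
    1: ['an', 'inverted', 'index', 'maps', 'terms'],
    2: ['terms', 'map', 'to', 'documents'],
}

def invert(docs):
    """Build inv-index: term -> set of indices of documents containing it."""
    inverted = defaultdict(set)
    for i, terms in docs.items():
        for term in terms:
            inverted[term].add(i)
    return dict(inverted)

inverted = invert(docs)
print(sorted(inverted['index']))  # [0, 1]
print(sorted(inverted['terms']))  # [1, 2]
```

Looking up a term is now a single dictionary access instead of a scan over every document.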

### Step 3

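Now let's process the text. The pipeline below assembles each text into a document, splits it into tokens, lemmatizes the tokens with a pretrained model, normalizes and lowercases them, and uses the `Finisher` to output the results as an array of strings in the `normalized` column.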

In [10]:
from sparknlp.pretrained import PretrainedPipeline

assembler = DocumentAssembler()\
    .setInputCol('text')\
    .setOutputCol('document')
tokenizer = Tokenizer()\
    .setInputCols(['document'])\
    .setOutputCol('token')
lemmatizer = LemmatizerModel.pretrained()\
    .setInputCols(['token'])\
    .setOutputCol('lemma')
normalizer = Normalizer()\
    .setInputCols(['lemma'])\
    .setOutputCol('normalized')\
    .setLowercase(True)
finisher = Finisher()\
    .setInputCols(['normalized'])\
    .setOutputCols(['normalized'])\
    .setOutputAsArray(True)

pipeline = Pipeline().setStages([
    assembler, tokenizer, 
    lemmatizer, normalizer, finisher
]).fit(indexed)

indexed_w_tokens = pipeline.transform(indexed)


In [11]:
indexed_w_tokens.limit(10).toPandas()


In [12]:
doc_index = indexed_w_tokens.select('index', 'path', 'text').toPandas()
doc_index = doc_index.set_index('index')


### Step 4

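Now, let's create our inverted index. Expressed in Spark SQL, the logic looks like the following query (it assumes `indexed_w_tokens` has been registered as a temporary view); the code cell after it does the same with the `DataFrame` API.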

```
SELECT term, collect_set(index) AS documents
FROM (
    SELECT index, explode(normalized) AS term
    FROM indexed_w_tokens
)
GROUP BY term
ORDER BY term
```

In [None]:
inverted_index = indexed_w_tokens\
    .selectExpr('index', 'explode(normalized) AS term')\
    .distinct()\
    .groupBy('term').agg(collect_set('index').alias('documents'))\
    .persist()

In [None]:
inverted_index.show(10)

This is our inverted index. We can see that the term "amplifier" occurs in documents 630, 624, and 654. With this information, we can quickly find all documents that contain particular terms.

Another benefit is that the number of entries in this inverted index is bounded by the size of our vocabulary, not by the amount of text in our corpus. The inverted index grows only when new terms appear or when document indices are added to existing entries. For very large corpora, this can still be too much data for a single machine. In the case of the mini_newsgroups data set, however, it is easily manageable.

Let's see how big our inverted index is.

In [None]:
inverted_index.count()

For us, since we have such a small number of documents, the inverted index has more entries than the index. Word frequencies roughly follow Zipf's law: the frequency of a word is approximately inversely proportional to its rank when words are sorted by frequency. As a result, the most common English words are already in our inverted index, and new documents will mostly add rarer terms. The index can be constrained further by not tracking words that occur fewer than a certain number of times.
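One way to apply such a threshold is sketched below. The `prune` helper, its `min_docs` parameter, and the toy index are hypothetical; the sketch uses the number of documents a term appears in (the size of its posting set) as the count, which is one reasonable reading of "occur at least a certain number of times."

```python
def prune(inverted_index, min_docs=2):
    """Keep only terms whose posting set contains at least `min_docs` documents."""
    return {
        term: docs
        for term, docs in inverted_index.items()
        if len(docs) >= min_docs
    }

toy_index = {'the': {0, 1, 2}, 'amplifier': {1}, 'spark': {0, 2}}
print(sorted(prune(toy_index)))  # ['spark', 'the']
```

Because of Zipf's law, most of the vocabulary consists of rare terms, so even a small threshold like this can remove a large share of the entries.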

In [None]:
inverted_index = {
    term: set(docs) 
    for term, docs in inverted_index.collect()
}

Now we can begin with our most basic ranking function, simple Boolean search. In this case, let's look up all the documents that contain the words "language" or "information."

In [None]:
lang_docs = inverted_index['language']
# join works even when there are fewer than 10 documents
print('docs', ', '.join(map(str, sorted(lang_docs)[:10])), '...')
print('number of docs', len(lang_docs))

In [None]:
info_docs = inverted_index['information']
# join works even when there are fewer than 10 documents
print('docs', ', '.join(map(str, sorted(info_docs)[:10])), '...')
print('number of docs', len(info_docs))

In [None]:
filter_set = list(lang_docs | info_docs)
print('number of docs in filter set', len(filter_set))

In [None]:
intersection = list(lang_docs & info_docs)
print('number of docs in intersection set', len(intersection))
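The union (OR) and intersection (AND) above generalize into a small Boolean query helper. This is a hypothetical sketch: `boolean_search` and its parameter names are illustrations, not part of any library, and it treats a term missing from the index as having an empty posting set rather than raising `KeyError` the way `inverted_index['...']` does.

```python
def boolean_search(inverted_index, all_of=(), any_of=(), none_of=()):
    """Boolean retrieval over an inverted index mapping term -> set of doc ids.

    all_of:  every one of these terms must appear (AND)
    any_of:  at least one of these terms must appear (OR), if any are given
    none_of: none of these terms may appear (NOT)
    """
    def postings(term):
        # a term that is not in the index matches no documents
        return inverted_index.get(term, set())

    if all_of:
        result = set(postings(all_of[0]))  # copy so the index is never mutated
        for term in all_of[1:]:
            result &= postings(term)
    else:
        # no AND terms: start from every document that appears in the index
        result = set().union(*inverted_index.values())
    if any_of:
        result &= set().union(*(postings(term) for term in any_of))
    for term in none_of:
        result -= postings(term)
    return result

toy_inverted = {'language': {1, 2, 3}, 'information': {3, 4}, 'spam': {2}}
print(sorted(boolean_search(toy_inverted, any_of=['language', 'information'])))  # [1, 2, 3, 4]
print(sorted(boolean_search(toy_inverted, all_of=['language', 'information'])))  # [3]
```

With the notebook's data, `boolean_search(inverted_index, any_of=['language', 'information'])` should match `lang_docs | info_docs`.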

Let's print out lines from our filter set. Here, the filter set is also the result set, but generally, the filter set is ranked by $r(q, D)$ to produce the result set.

Let's look at the lines in which we see the occurrences, to get an idea about our result set.

In [None]:
k = 1
for i in filter_set:
    path, text = doc_index.loc[i]
    lines = text.split('\n')
    print(path.split('/')[-1], 'length:', len(text))
    for line_number, line in enumerate(lines):
        if 'information' in line or 'language' in line:
            print(line_number, line)
    print()
    k += 1
    if k > 5:
        break

Now that we have our result set, how should we rank our results? We could just count the number of occurrences of our search term, but that would be biased toward long documents. Also, what happens if our query includes a very common word like "the"? If we just use the counts, common words like "the" will dominate our results. In our result set, the one with the most occurrences of the query terms has the longest text. We could say that the more terms found in the document, the more relevant the document is, but this has problems too. What do we do with one-term queries? In our example, only one document has both. Again, if our query has a common word—for example, "the cat in the hat"—should "the" and "in" have the same importance as "cat" and "hat"? To solve this problem, we need a more flexible model for our documents and queries.
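A tiny sketch makes the problem with raw counts concrete. The two documents are made up, and `count_score` is a hypothetical helper that sums the occurrences of the distinct query terms. The long document never mentions "cat" or "hat," yet it wins because it repeats "the" and "in."

```python
from collections import Counter

def count_score(query_terms, doc_terms):
    """Naive score: total occurrences of the distinct query terms in the document."""
    counts = Counter(doc_terms)
    return sum(counts[term] for term in set(query_terms))

query = ['the', 'cat', 'in', 'the', 'hat']
short_doc = ['the', 'cat', 'in', 'the', 'hat']
# a longer, made-up document that never mentions "cat" or "hat"
long_doc = ['the', 'dog', 'sat', 'in', 'the', 'park', 'in', 'the', 'rain'] * 3

print(count_score(query, short_doc))  # 5
print(count_score(query, long_doc))   # 15
```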

## Vector Space Model

In the previous chapter, we introduced the concept of vectorizing documents. We talked about creating binary vectors, where 1 means that the word is present in the document. We can also use the counts.

When we convert a corpus to a collection of vectors, we are implicitly modeling our language as a vector space. In this vector space, each dimension represents one term. This has both benefits and drawbacks. It is a simple way to represent our text in a manner that allows machine learning algorithms to work with it. It also allows us to represent the vectors sparsely, since each document contains only a small fraction of the vocabulary. On the other hand, we lose the information contained in the word order. This process also creates high-dimensional data sets, which can be problematic for some algorithms.
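As a small illustration, the sketch below maps a sentence to a dense count vector over a hypothetical six-term vocabulary, converts it to a sparse index-to-value form, and shows that a shuffled sentence produces the same vector. The names `toy_vocab`, `count_vector`, and `to_sparse` are assumptions for this example only.

```python
from collections import Counter

# Hypothetical toy vocabulary: each term is one dimension, in sorted order
toy_vocab = ['cat', 'dog', 'hat', 'in', 'mat', 'the']

def count_vector(tokens, vocab):
    """Dense count vector: one entry per vocabulary term."""
    counts = Counter(tokens)
    return [counts[term] for term in vocab]

def to_sparse(dense):
    """Sparse form: keep only the non-zero dimensions as index -> value."""
    return {i: v for i, v in enumerate(dense) if v != 0}

dense = count_vector(['the', 'cat', 'in', 'the', 'hat'], toy_vocab)
print(dense)             # [1, 0, 1, 1, 0, 2]
print(to_sparse(dense))  # {0: 1, 2: 1, 3: 1, 5: 2}

# Word order is lost: a shuffled sentence maps to the same vector
print(count_vector(['hat', 'the', 'in', 'cat', 'the'], toy_vocab) == dense)  # True
```

With a real vocabulary of tens of thousands of terms, almost every entry of the dense vector is zero, which is why the sparse form pays off.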

Let's calculate the vectors for our data set. In the previous chapter, we used the `CountVectorizer` for this. We will build the vectors in Python, but the way we will build them will help us understand how libraries implement vectorization.

In [None]:
class SparseVector(object):
    
    def __init__(self, indices, values, length):
        # if the indices are not in ascending order, we need 
        # to sort them
        is_ascending = True
        for i in range(len(indices) - 1):
            is_ascending = is_ascending and indices[i] < indices[i+1]
        if not is_ascending:
            pairs = zip(indices, values)
            sorted_pairs = sorted(pairs, key=lambda x: x[0])
            indices, values = zip(*sorted_pairs)
        self.indices = indices
        self.values = values
        self.length = length
        
    def __getitem__(self, index):
        try:
            return self.values[self.indices.index(index)]
        except ValueError:
            return 0.0
        
    def dot(self, other):
        assert isinstance(other, SparseVector)
        assert self.length == other.length
        res = 0
        i = j = 0
        while i < len(self.indices) and j < len(other.indices):
            if self.indices[i] == other.indices[j]:
                res += self.values[i] * other.values[j]
                i += 1
                j += 1
            elif self.indices[i] < other.indices[j]:
                i += 1
            elif self.indices[i] > other.indices[j]:
                j += 1
        return res
    
    def hadamard(self, other):
        assert isinstance(other, SparseVector)
        assert self.length == other.length
        res_indices = []
        res_values = []
        i = j = 0
        while i < len(self.indices) and j < len(other.indices):
            if self.indices[i] == other.indices[j]:
                res_indices.append(self.indices[i])
                res_values.append(self.values[i] * other.values[j])
                i += 1
                j += 1
            elif self.indices[i] < other.indices[j]:
                i += 1
            elif self.indices[i] > other.indices[j]:
                j += 1
        return SparseVector(res_indices, res_values, self.length)
    
    def sum(self):
        return sum(self.values)
    
    def __repr__(self):
        return 'SparseVector({}, {})'.format(
            dict(zip(self.indices, self.values)), self.length)

We need to make two passes over all the documents. In the first pass, we will get our vocabulary and the counts. In the second pass we will construct the vectors.

In [None]:
from collections import Counter

vocabulary = set()
vectors = {}

for row in indexed_w_tokens.toLocalIterator():
    counts = Counter(row['normalized'])
    vocabulary.update(counts.keys())
    vectors[row['index']] = counts
    
vocabulary = list(sorted(vocabulary))
inv_vocabulary = {term: ix for ix, term in enumerate(vocabulary)}
vocab_len = len(vocabulary)

Now that we have this information, we need to go back over our word counts and construct actual vectors.

In [None]:
for index in vectors:
    terms, values = zip(*vectors[index].items())
    indices = [inv_vocabulary[term] for term in terms]
    vectors[index] = SparseVector(indices, values, vocab_len)

In [None]:
vectors[42]

In [None]:
vocabulary[3598]

In [13]:
vocabulary[37876]


As we discussed previously, there are many drawbacks to using only the counts for a search. The concern is that words that are generally common in English will have more impact than less common words. There are a couple of strategies for addressing this. First, let's look at the simplest solution: removing the common words.

### Stop-Word Removal

These common words we are looking to remove are called stop words.  This term was coined in the 1950s by Hans Peter Luhn, a pioneer in information retrieval. Default stop-word lists are available, but it is often necessary to modify generic stop-word lists for different tasks.
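Here is a minimal sketch of adapting a generic list to a task. Both the short generic list and the adjustments are assumptions for illustration: newsgroup posts share header words such as "subject" and "lines," so we add them, and we keep "not" in case negation matters for the task.

```python
# A small generic list (hypothetical; real lists such as
# StopWordsRemover.loadDefaultStopWords("english") are much longer)
generic_stop_words = {'a', 'an', 'and', 'in', 'is', 'not', 'of', 'the', 'to'}

# Task-specific adjustments, assumed for illustration
custom_stop_words = (generic_stop_words | {'subject', 'lines'}) - {'not'}

def remove_stop_words(tokens, stop_words):
    """Drop tokens found in the stop-word set, preserving order."""
    return [t for t in tokens if t not in stop_words]

tokens = ['subject', 'the', 'cake', 'is', 'not', 'baked']
print(remove_stop_words(tokens, custom_stop_words))  # ['cake', 'not', 'baked']
```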

In [None]:
from pyspark.ml.feature import StopWordsRemover

sw_remover = StopWordsRemover() \
    .setInputCol("normalized") \
    .setOutputCol("filtered") \
    .setStopWords(StopWordsRemover.loadDefaultStopWords("english"))

filtered = sw_remover.transform(indexed_w_tokens)

In [None]:
from collections import Counter

vocabulary_filtered = set()
vectors_filtered = {}

for row in filtered.toLocalIterator():
    counts = Counter(row['filtered'])
    vocabulary_filtered.update(counts.keys())
    vectors_filtered[row['index']] = counts
    
vocabulary_filtered = list(sorted(vocabulary_filtered))
inv_vocabulary_filtered = {
    term: ix 
    for ix, term in enumerate(vocabulary_filtered)
}
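vocab_len_filtered = len(vocabulary_filtered)

In [None]:
for index in vectors_filtered:
    # a document whose tokens were all stop words gets an empty vector
    if not vectors_filtered[index]:
        vectors_filtered[index] = SparseVector([], [], vocab_len_filtered)
        continue
    terms, values = zip(*vectors_filtered[index].items())
    indices = [inv_vocabulary_filtered[term] for term in terms]
    vectors_filtered[index] = \
        SparseVector(indices, values, vocab_len_filtered)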

In [None]:
vectors_filtered[42]

In [None]:
# the terms that remain in document 42 after stop-word removal
[vocabulary_filtered[ix] for ix in vectors_filtered[42].indices]

The words "bake" and "timmons" seem more informative. You should explore your data when determining what words should be included in the stop-word list.

It may seem like a daunting task to list all the words we don't want. However, recalling what we discussed about morphology, we can narrow down what we want to remove. We want to remove unbound function morphemes.

A fluent speaker of a language, who knows these basics of morphology, is able to create a reasonably good list. However, this still leaves two concerns. What if we need to keep some common words? What if we want to remove some common lexical morphemes? You can modify the list, but that still leaves one last concern. How do we handle queries like "fictional cats"? The word "fictional" is less common than "cats," so it makes sense that the former should be more important in determining what documents are returned. Let's look at how we can implement this using our data.

## Inverse Document Frequency

Instead of manually editing our vocabulary, we can try to weight the words by their "commonness." One way to define "commonness" is the number of documents in our corpus that contain the word. This is generally called document frequency. We want words with high document frequency to be down-weighted, so we are interested in using inverse document frequency (IDF).

We take these values and multiply them by the term frequencies, which are the number of times each word occurs in a given document. The product of term frequency and inverse document frequency gives us the TF.IDF.

\begin{equation}
\begin{aligned} 
tf(t, d) &= \text{the number of times } t \text{ occurs in } d\\ 
df(t) &= \text{the number of documents } t \text{ occurs in }\\ 
idf(t) &= \frac{\text{the number of documents}}{df(t)} 
\end{aligned}
\end{equation}
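A worked example of these definitions on a made-up four-document corpus follows. The helper names (`raw_tf`, `raw_df`, `raw_idf`) are hypothetical. Note how "fictional," which appears in one document, gets twice the weight of "cat," which appears in two, echoing the "fictional cats" example above. A term that appears in no document would make `raw_idf` divide by zero, which is one motivation for the smoothed variant.

```python
# Made-up four-document corpus, already tokenized
toy_docs = [
    ['the', 'cat', 'sat', 'on', 'the', 'mat'],
    ['the', 'dog', 'sat'],
    ['the', 'fictional', 'cat'],
    ['the', 'end'],
]

def raw_tf(term, doc):
    """tf(t, d): the number of times t occurs in d."""
    return doc.count(term)

def raw_df(term, docs):
    """df(t): the number of documents t occurs in."""
    return sum(1 for doc in docs if term in doc)

def raw_idf(term, docs):
    """idf(t): the number of documents divided by df(t)."""
    return len(docs) / raw_df(term, docs)

for term in ['the', 'cat', 'fictional']:
    print(term, raw_df(term, toy_docs), raw_idf(term, toy_docs))
# the 4 1.0
# cat 2 2.0
# fictional 1 4.0
```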

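There are many different flavors of TF.IDF. A common one is smoothed logarithmic: both factors are log-scaled, and 1 is added to the term count and to the document frequency so that we never take the log of zero or divide by zero.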

\begin{equation}
\begin{aligned} 
tf(t, d) &= \log(1 + \text{the number of times } t \text{ occurs in } d)\\ 
df(t) &= \text{the number of documents } t \text{ occurs in }\\ 
idf(t) &= \log\left(\frac{\text{the number of documents}}{1+df(t)}\right) 
\end{aligned}
\end{equation}
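The sketch below applies the smoothed logarithmic formulas to the same kind of made-up corpus (the helper names `log_tf` and `smoothed_idf` are hypothetical). Thanks to the +1 in the denominator, a term that never occurs gets a finite weight instead of a division by zero.

```python
from math import log

# Made-up four-document corpus, already tokenized
toy_docs = [
    ['the', 'cat', 'sat', 'on', 'the', 'mat'],
    ['the', 'dog', 'sat'],
    ['the', 'fictional', 'cat'],
    ['the', 'end'],
]

def log_tf(term, doc):
    """Smoothed logarithmic tf: log(1 + number of times term occurs in doc)."""
    return log(1 + doc.count(term))

def smoothed_idf(term, docs):
    """log(N / (1 + df(t))), where N is the number of documents."""
    df = sum(1 for doc in docs if term in doc)
    return log(len(docs) / (1 + df))

for term in ['the', 'cat', 'fictional', 'unseen']:
    print(term, round(smoothed_idf(term, toy_docs), 3))
# the -0.223
# cat 0.288
# fictional 0.693
# unseen 1.386
```

With this particular smoothing, a term that appears in every document ("the") gets a slightly negative weight. Spark MLlib's `IDF` uses log((N + 1) / (df + 1)) instead, which bottoms out at zero.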

Let's calculate this with our vectors. We actually already have the term frequency, so all we need to do is calculate the idf, transform the values with log, and multiply tf and idf.

In [None]:
idf = Counter()

for vector in vectors.values():
    idf.update(vector.indices)

In [None]:
for ix, count in idf.most_common(20):
    print('{:5d} {:20s} {:d}'.format(ix, vocabulary[ix], count))

We can now make `idf` a `SparseVector`. We know it contains all the words, so it actually won't be sparse, but this will help us implement the next steps.

In [None]:
indices, values = zip(*idf.items())
idf = SparseVector(indices, values, vocab_len)

In [None]:
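from math import log

# tf: replace raw counts with log(1 + count); this mutates the vectors
# in place, so rerunning this cell would apply the log twice
for index, vector in vectors.items():
    vector.values = list(map(lambda v: log(1+v), vector.values))

# idf: log(N / (1 + df)), where N is the number of documents
# (not the vocabulary size)
n_docs = len(vectors)
idf.values = list(map(lambda v: log(n_docs / (1+v)), idf.values))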

In [None]:
tfidf = {index: tf.hadamard(idf) for index, tf in vectors.items()}

In [None]:
tfidf[42]

Let's look at the TF.IDF values for "be" and "the." Let's also look at one of the terms with a higher TF.IDF than these common words.

In [None]:
tfidf[42][3598] # be

In [None]:
tfidf[42][37876] # the

In [None]:
vocabulary[17236], tfidf[42][17236]

Let's look at the document to get an idea of why this word is so important.

In [None]:
print(doc_index.loc[42]['text'])

We can see that the document is about a person named "Maddi Hausman."

## In Spark
Spark MLlib has stages for calculating TF.IDF. If you have a column that contains arrays of strings, you can use either `CountVectorizer`, which we are already familiar with, or `HashingTF` to get the `tf` values, and then fit the `IDF` estimator on that column to rescale them. `HashingTF` uses the hashing trick: you decide on the size of the vector space beforehand and hash the words into that vector space. If two words hash to the same index (a collision), they are counted as the same feature. This lets you trade off between memory efficiency and accuracy. As you make your predetermined vector space larger, the output vectors become larger, but the chance of collisions decreases.
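To see the hashing trick in isolation, here is a pure-Python sketch. The `hashing_tf` helper is hypothetical, and it uses `zlib.crc32` as a deterministic stand-in hash: Python's built-in `hash()` of strings changes between processes, and Spark's `HashingTF` uses MurmurHash3, so the bucket numbers here will not match Spark's.

```python
import zlib

def hashing_tf(tokens, num_features=16):
    """Term frequencies via the hashing trick.

    Each token is hashed into one of `num_features` buckets and that bucket
    is incremented; distinct tokens that land in the same bucket collide
    and are counted together.
    """
    vector = {}
    for token in tokens:
        # crc32 is a deterministic stand-in for Spark's MurmurHash3
        bucket = zlib.crc32(token.encode('utf-8')) % num_features
        vector[bucket] = vector.get(bucket, 0) + 1
    return vector  # sparse: bucket index -> count

tokens = ['the', 'cat', 'in', 'the', 'hat']
print(hashing_tf(tokens, num_features=16))
# With a single bucket, every token collides:
print(hashing_tf(tokens, num_features=1))  # {0: 5}
```

No vocabulary has to be stored, which is the memory saving; the price is that collided words can no longer be told apart.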

Now that we know how to turn a document into a vector, in the next chapter we can explore how we can use that vector in classic machine learning tasks.  


## Exercises

Now that we have calculated the TF.IDF values, let's build a search function. First, we need a function to process the query.

In [None]:
def process_query(query, pipeline):
    data = spark.createDataFrame([(query,)], ['text'])
    return pipeline.transform(data).first()['normalized']

Then we need a function to get the filter set.

In [None]:
def get_filter_set(processed_query):
    filter_set = set()
    # find all the documents that contain any of the terms
    return filter_set

Next, we need a function that will compute the score for the document.

In [None]:
def get_score(index, terms):
    return # return a single score

We also want a function for displaying results.

In [None]:
def display(index, score, terms):
    # a term is a hit if it is in the vocabulary and has a positive TF.IDF here
    hits = [term for term in terms
            if term in inv_vocabulary and tfidf[index][inv_vocabulary[term]] > 0.]
    path, text = doc_index.loc[index]
    print('terms', terms, 'hits', hits)
    print('score', score)
    print('path', path)
    print('length', len(text))

Finally, we are ready for our search function.

In [None]:
def search(query, pipeline, k=5):
    processed_query = process_query(query, pipeline)
    filter_set = get_filter_set(processed_query)
    scored = {index: get_score(index, processed_query) for index in filter_set}
    display_list = list(sorted(filter_set, key=scored.get, reverse=True))[:k]
    for index in display_list:
        display(index, scored[index], processed_query)

In [None]:
search('search engine', pipeline)

You should be able to implement `get_filter_set` and `get_score` easily using examples in this chapter. Try out a few queries. You will likely notice that there are two big limitations here. There is no N-gram support, and the ranker is biased toward longer documents. What could you modify to fix these problems?
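As one possible starting point for N-gram support, here is a hedged sketch: generate bigrams from the normalized tokens and index them alongside unigrams, so that a query like "search engine" can match the phrase as a single term. The `ngrams` helper is hypothetical; Spark MLlib also provides an `NGram` transformer in `pyspark.ml.feature` that works on array columns.

```python
def ngrams(tokens, n=2):
    """Return the contiguous n-grams of a token list as space-joined strings."""
    return [' '.join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

print(ngrams(['search', 'engine', 'ranking']))  # ['search engine', 'engine ranking']
```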

## Resources

* An Introduction to Information Retrieval, by Christopher D. Manning, Prabhakar Raghavan, and Hinrich Schütze: this book covers many important aspects of information retrieval. Two of its three authors are also authors of Foundations of Statistical Natural Language Processing.
* Apache Lucene: this is the most widely used open source search engine library. Often, one of the search platforms built on top of Lucene, such as Apache Solr or Elasticsearch, is used instead.
* Lucene in Action, 2nd ed., by Michael McCandless, Erik Hatcher, and Otis Gospodnetic (Manning Publications): a guide to implementing searches using Lucene.
* Elasticsearch: The Definitive Guide, by Clinton Gormley and Zachary Tong (O'Reilly): a guide to implementing searches using Elasticsearch.
* Learning to Rank for Information Retrieval, by Tie-Yan Liu (Springer): learning to rank, building machine learning–based rankers, is an important part of modern search engines. Tie-Yan Liu is one of the most important contributors to the field of learning to rank.