# Term vector similarity

In this exercise you'll need to complete the code for computing the similarity between two documents that are represented by their term vectors.

In [None]:
pip install ipytest

In [None]:
import ipytest
import math
import pytest
from typing import List

ipytest.autoconfig()

## Jaccard similarity

This metric is a set similarity; that is, it only captures the presence and absence of terms with no regard to their frequency. Put simply, it captures the ratio of shared terms and total terms in the two documents.

$$sim_{Jaccard} = \frac{|X \cap Y|}{|X \cup Y|}$$

where $X$ and $Y$ denote the set of terms in documents $x$ and $y$, respectively.

If the two documents are given as term vectors, Jaccard similarity may be computed as:

$$sim_{\mathrm{Jaccard}}(\mathbf{x},\mathbf{y}) = \frac{\sum_{i} \mathbb{1}(x_i) \times \mathbb{1}(y_i)}{\sum_{i} \mathbb{1}(x_i+y_i)}$$

where $\mathbb{1}(x)$ is an indicator function ($1$ if $x>0$ and $0$ otherwise).

In [None]:
def jaccard(x: List[int], y: List[int]) -> float:
    """Computes the Jaccard similarity between two term vectors."""
    intersection = sum([x_i > 0 and y_i > 0 for x_i, y_i in zip(x, y)])
    union = sum([x_i > 0 or y_i > 0 for x_i, y_i in zip(x, y)])
    return intersection / union

Tests.

In [None]:
%%run_pytest[clean]

def test_no_common_terms():
    x = [0, 0, 0, 1, 2, 1]
    y = [1, 5, 3, 0, 0, 0]
    assert jaccard(x, y) == 0

def test_only_common_terms():
    x = [0, 1, 2, 1, 0, 1]
    y = [0, 5, 3, 7, 0, 1]
    assert jaccard(x, y) == 1

def test_some_common_terms():
    x = [0, 1, 1, 0, 1, 1]
    y = [5, 0, 3, 0, 7, 0]
    assert jaccard(x, y) == 0.4

## Cosine similarity

$$sim_{cos}(x,y) = \frac{\mathbf{x} \cdot \mathbf{y}}{||\mathbf{x}||~||\mathbf{y}||} = \frac{\sum_{i=1}^n x_i y_i}{\sqrt{\sum_{i=1}^n x_i^2} \sqrt{\sum_{i=1}^n y_i^2}}$$

In [None]:
def cosine(x: List[float], y: List[float]) -> float:
    """Computes the Cosine similarity between two term vectors."""
    dot_product = sum([x_i * y_i for x_i, y_i in zip(x, y)])
    norm_x = math.sqrt(sum(x_i**2 for x_i in x))
    norm_y = math.sqrt(sum(y_i**2 for y_i in y))
    return dot_product / (norm_x * norm_y)

Tests.

In [None]:
%%run_pytest[clean]

def test_no_common_terms():
    x = [0, 0, 0, 1, 2, 1]
    y = [1, 5, 3, 0, 0, 0]
    assert cosine(x, y) == 0

def test_identical_docs():
    x = [0, 0, 0, 1, 2, 1]
    assert cosine(x, x) == pytest.approx(1, rel=1e-3)

def test_short_docs():
    x = [4, 2]
    y = [1, 3]
    assert cosine(x, y) == pytest.approx(math.sqrt(2) / 2, rel=1e-3)

# Text preprocessing

In this exercise, you'll need to implement basic text preprocessing steps.

In [None]:
pip install ipytest

In [None]:
from typing import List, Set
import ipytest
import string
import re

ipytest.autoconfig()

## Task 1: Tokenization

Split an input text into tokens based on whitespaces, punctuation, hyphens, and HTML markup. Additionally, lowercase all tokens.

In [None]:
def tokenize(text: str)-> List[str]:    
    """Returns a sequence of terms given an input text."""
    # Remove HTML markup using a regular expression.
    re_html = re.compile("<[^>]+>")
    text = re_html.sub(" ", text)
    # Replace punctuation marks (including hyphens) with spaces.
    for c in string.punctuation:
        text = text.replace(c, " ")
    # Lowercase and split on whitespaces.
    return text.lower().split()

Tests.

In [None]:
%%run_pytest[clean]

def test_whitespace():
    assert tokenize("aaa bbb ccc") == ["aaa", "bbb", "ccc"]
    
def test_punctuation():
    assert tokenize("aaa! bbb.ccc,ddd:eee ff\"f") == ["aaa", "bbb", "ccc", "ddd", "eee", "ff", "f"]
    
def test_hyphens():
    assert tokenize("aaa bbb-Ccc") == ["aaa", "bbb", "ccc"]
    
def test_html():
    assert tokenize("aaa <bbb>ccc <ddd>eee</ddd></bbb>fff <ggg />") == ["aaa", "ccc", "eee", "fff"]

## Task 2: Stopwords removal

Remove stopwords from a sequence of tokens, given a set of stopwords.

In [None]:
def remove_stopwords(tokens: List[str], stopwords: Set[str]) -> List[str]:
    """Removes stopwords from a sequence of tokens."""
    return [token for token in tokens if token not in stopwords]

Tests.

In [None]:
%%run_pytest[clean]

def test_no_stopwords():
    assert remove_stopwords(["this", "is", "some", "text"], {}) == ["this", "is", "some", "text"]
    
def test_stopwords():
    assert remove_stopwords(["this", "is", "some", "text"], {"is", "this"}) == ["some", "text"]
    
def test_stopwords2():
    assert remove_stopwords(["this", "isolate", "otto"], {"is", "this", "to"}) == ["isolate", "otto"]    

## Task 3: Suffix-s stemming

Remove the s-suffix from all terms in a sequence.

In [None]:
def suffix_s_stemmer(terms: List[str]) -> List[str]:
    """Removes the s-suffix from all terms in a sequence."""
    stemmed_terms = []
    for term in terms:        
        stemmed_term = term[:-1] if term[-1] == "s" else term
        stemmed_terms.append(stemmed_term)
    return stemmed_terms

Tests.

In [None]:
%%run_pytest[clean]

def test_stemming():
    assert suffix_s_stemmer(["dogs", "better", "cats"]) == ["dog", "better", "cat"]

# Naive Bayes text classifier

In this exercise, you'll implement a Naive Bayes classifier for text from scratch.

In [None]:
pip install ipytest

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ipytest
  Downloading ipytest-0.13.0-py3-none-any.whl (14 kB)
Collecting pytest>=5.4
  Downloading pytest-7.2.0-py3-none-any.whl (316 kB)
[K     |████████████████████████████████| 316 kB 8.7 MB/s 
Collecting iniconfig
  Downloading iniconfig-1.1.1-py2.py3-none-any.whl (5.0 kB)
Collecting exceptiongroup>=1.0.0rc8
  Downloading exceptiongroup-1.0.4-py3-none-any.whl (14 kB)
Collecting pluggy<2.0,>=0.12
  Downloading pluggy-1.0.0-py2.py3-none-any.whl (13 kB)
Collecting jedi>=0.10
  Downloading jedi-0.18.1-py2.py3-none-any.whl (1.6 MB)
[K     |████████████████████████████████| 1.6 MB 48.3 MB/s 
Installing collected packages: pluggy, jedi, iniconfig, exceptiongroup, pytest, ipytest
  Attempting uninstall: pluggy
    Found existing installation: pluggy 0.7.1
    Uninstalling pluggy-0.7.1:
      Successfully uninstalled pluggy-0.7.1
  Attempting uninstall: pytest
    Found existing i

In [None]:
import ipytest
from typing import List

ipytest.autoconfig()

### Training the model

  - Calculate $P(y)$ for each class label in the training data
  - Calculate $P(x_i|y)$ for each feature (term) for each class label in the training data using Laplace (add-one) smoothing
  
$$P(x_i|y)=\frac{c_{i,y} + 1}{c_i + m}$$

where 
  - $c_{i,y}$ is the number of times term $x_i$ appears in class $y$
  - $c_i$ is the total number of times term $x_i$ appears in the collection
  - $m$ is the number of classes


### Applying the model

Return the class $y \in Y$ that maximizes $P(y) \prod_{x_i} P(x_i|y)$.

Note that we need to consider $x_i$ at each *word position* in the document. Thus, we need to multiply with $P(x_i|y)$ as many times as $x_i$ appears in the document.
We can rewrite it as: $$P(y|x) \propto P(y) \prod_{i \in d} P(x_i|y)^{c_{i,d}}$$ where $c_{i,d}$ is the number of times term $i$ appears in document $d$.

Finally, we perform the computations in the log domain, that is, $$\log P(y) +  \sum_{i=1}^n (c_{i,d} \log P(x_i|y))$$

## 1) Probability estimations

The estimation of probabilities $P(x_i|y)$ and $P(y)$ are refactored to a separate class to make them testable.

In [None]:
class NBProbabilityEstimator:
    
    def get_prior_prob(self, y: int, training_labels: List[int]) -> float:
        """Computes the class prior probability, P(y).
        
        Args:
            y: Class ID.
            training_labels: Class labels in training data.
            
        Returns:
            The probability P(y).
        """
        return training_labels.count(y) / len(training_labels)
    
    def get_term_prob(self, count_inclass: int, count_total: int, num_classes: int) -> float:
        """Computes the smoothed term probability for a given class, P(x_i|y).
        
        Args:
          count_inclass: Number of times the term appears in the given class.
          count_total: Number of times the term appears in the collection.
          num_classes: Number of classes.
          
        Returns:
          The probability P(x_i|y).
        """
        return (count_inclass + 1) / (count_total + num_classes)

### Tests

In [None]:
%%run_pytest[clean]

def test_prior_prob():
    nbpe = NBProbabilityEstimator()
    assert nbpe.get_prior_prob(1, [0, 1, 2, 3]) == 0.25
    assert nbpe.get_prior_prob(1, [1, 1, 2, 3]) == 0.5

def test_term_prob():
    nbpe = NBProbabilityEstimator()
    assert nbpe.get_term_prob(5, 20, 10) == 0.2
    assert nbpe.get_term_prob(74, 90, 10) == 0.75
    assert nbpe.get_term_prob(0, 6, 10) == 0.0625

%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).
ipytest.clean_tests is deprecated in favor of ipytest.clean


[32m.[0m[32m.[0m[32m                                                                                           [100%][0m
[32m[32m[1m2 passed[0m[32m in 0.05s[0m[0m


## 2) Naive Bayes classifier

Implement training and prediction for a Naive Bayes classifier.  We are operating with dense matrices for simplicity.

In [None]:
import numpy as np
import math

class NBClassifier:

    def __init__(self) -> None:
        self._nbprob = NBProbabilityEstimator()
        self._num_classes = 0
        self._prior_prob = None  # Holds P(y) values
        self._term_prob = None  # Holds P(x_i|y) values
        
    
    def fit(self, X_train: List[List[int]], y_train: List[int]) -> None:
        """Fits the model.
        
        Args:
          X_train: Document-term matrix for training data. 
              Rows correspond to documents and columns correspond to terms.
          y_train: Class labels corresponding to training documents.
        """        
        self._num_classes = len(np.unique(y_train))
        num_docs = len(X_train)
        num_terms = len(X_train[0])        
        self._term_prob = np.zeros((num_terms, self._num_classes))
        
        # Iterating through the vocabulary
        for i in range(num_terms):
            # Holds c_{i,j} values, i.e., the number of times term i appears with class j.
            class_count = [0] * self._num_classes
            for d in range(num_docs):
                class_count[y_train[d]] += X_train[d][i]
                        
            # Calculate P(x_i|y)
            total_count = sum(class_count)
            for j in range(self._num_classes):
                self._term_prob[i, j] = self._nbprob.get_term_prob(class_count[j], total_count, num_terms)
                
        # Pre-compute class prior probabilities
        self._prior_prob = []
        for y in range(self._num_classes):
            self._prior_prob.append(self._nbprob.get_prior_prob(y, y_train))

                
    def _predict_instance(self, x: List[int]) -> int:
        """Predict class for a single instance (document).
        
        Args:
          x: Document term vector.
          
        Returns:
          The predicted class label (0-indexed).
        """
        probs = []
        
        for y in range(self._num_classes):
            p = math.log(self._prior_prob[y])
            for i in range(len(x)):
                if x[i] > 0:
                    p += x[i] * math.log(self._term_prob[i][y])
            probs.append(p)
            
        # Get the class with the highest probability.
        return probs.index(max(probs))
        
    
    def predict(self, X_test: List[List[int]]) -> List[float]:
        """Make predictions for a set of documents.
        
        Args:
          X_test: Document-term matrix for test data.
          
        Returns:
          List with predictions.
        """
        return [self._predict_instance(x) for x in X_test]        

## 3) Testing on real data

We will be using a subset of the 20Newsgroups collection.

In [None]:
from sklearn.datasets import fetch_20newsgroups

categories = [
    "alt.atheism",
    "soc.religion.christian", 
    "talk.religion.misc",
    "comp.sys.ibm.pc.hardware",
    "comp.sys.mac.hardware"
]

train = fetch_20newsgroups(subset='train', categories=categories, shuffle=True, random_state=123)
test = fetch_20newsgroups(subset='test', categories=categories, shuffle=True, random_state=123)

### Feature extraction

Get term frequencies using `CountVectorizer`. (We ignore terms that appear in less than 10 documents to speed up computation.)

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

count_vect = CountVectorizer(min_df=10)
X_train_counts = count_vect.fit_transform(train.data)
X_test_counts = count_vect.transform(test.data)

### Train and apply model

Note that we convert sparse matrices to dense ones. This is not efficient and should be avoided when working with large datasets. Nevertheless, this simplifies the implementation for this exercise.

In [None]:
nb = NBClassifier()
nb.fit(X_train_counts.toarray(), train.target.tolist())
predicted = nb.predict(X_test_counts.toarray())

### Evaluation

In [None]:
from sklearn import metrics


print(f"{metrics.accuracy_score(test.target, np.asarray(predicted)):.3f}")

0.595


**TODO** Once you completed the exercise E2-3, check back here to see if the performance you got with the implementation from scratch is comparable to that of sklearn. Most likely, you'll see quite a bit difference. Can you find out the reason for that? You can share the solution at the next class session (for a bonus point).

## Optional exercises

If you're done, try to implement it without making a conversion to dense matrices.

Also, do we really need to precompute and store all term probabilities?

# Cross-validation

In this exercise, we want to split a dataset into train-test splits for k-fold cross-validation. Part of the excercise involves designing appropriate tests for this task.

In [None]:
pip install ipytest

In [None]:
import ipytest
import numpy as np
import random
from typing import Dict, List

ipytest.autoconfig()

## Task

The dataset is given as a list of instances (by their IDs). Your task is divide it into k folds to perform cross-validation.

Each fold should enumerate the instances for the train and test splits.

For examples, given `instances = [1, 2, 3]` and `k=3`, the method should return

```
folds = [
    {'train': [1, 2], 'test': [3]},
    {'train': [1, 3], 'test': [2]},
    {'train': [2, 3], 'test': [1]},
]
```

In [None]:
def create_folds(instances: List[int], k: int = 5) -> List[Dict[str, List[int]]]:
    """Given a set of instances, it returns k splits of train and test."""
    # Shuffle instances (by first making a copy of them).
    instances_shuffled = list(instances)
    # random.seed(10)  # Uncomment to check if the last test fails.
    random.shuffle(instances_shuffled)

    folds = []
    for fold_id in range(k):
        train, test = [], []
        for i in range(len(instances_shuffled)):
            if i % k == fold_id:
                test.append(instances_shuffled[i])
            else:
                train.append(instances_shuffled[i])
        
        folds.append({
            'train': train, 
            'test': test
        })
    return folds

### Tests

One simple test is provided, which merely checks if the required number of folds is generated and that each contains the correct number of train and test instances.

Part of the exercise is to create some more advanced tests. 

  - One test should test converage, that is, check that all instances are part of exactly one test fold and k-1 train folds.
  - Another test should checks that the folds are sufficiently random, i.e., that you're not always returning the exact same partitioning of instances.

**Note** You can test whether the last test works as intended, by fixing the random seed, i.e., uncommenting that line in `create_folds`. Then, the test should not pass (as it is not a truly random assignment anymore).

In [None]:
%%run_pytest[clean]

from collections import Counter, defaultdict


def test_fold_size():
    instances = list(range(100))
    folds = create_folds(instances, k=5)
    assert len(folds) == 5
    for fold in folds:
        assert len(fold['train']) == 80
        assert len(fold['test']) == 20

def test_coverage():
    instances = list(range(100))
    k = 5
    folds = create_folds(instances, k=k)
    train_counter, test_counter = Counter(), Counter()
    for fold in folds:
        train_counter.update(fold['train'])
        test_counter.update(fold['test'])    

    assert set(train_counter) == set(instances)
    assert set(train_counter.values()) == set([k-1])
    assert set(test_counter) == set(instances)
    assert set(test_counter.values()) == set([1])

def test_randomization():
    instances = list(range(100))
    k = 5
    # Create a sufficiently large sample (i.e., repeat 1000x)
    # and keep track of how many times we get the exact same test fold.
    num_test_fold = defaultdict(int)
    for _ in range(1000):
        folds = create_folds(instances, k=k)
        for fold_id, fold in enumerate(folds):
            # We create a "signature" for each test fold by concatenating
            # the ordered instance IDs.
            signature = "-".join([str(_) for _ in sorted(fold["test"])])
            num_test_fold[signature] += 1

    # If a given combination of instance IDs is observed more than 3 times,
    # then the partitioning is probably not sufficiently random.
    # Note, that 3 is a somewhat arbitrarily chosen theshold. It should be
    # done more precisely (any likely using some statistical test).
    # The smaller the number of instance, the more often this test fails 
    # incorrectly (e.g., with 28 instances, it'll fail once in a while; 
    # with less than 25 instances, it'll almost always fail.)
    for _, count in num_test_fold.items():
        assert count < 3

# Experiments with text classifiers in sklearn

In this exercise we'll be experimenting with various classification algorithms in scikit learn using the [20 Newsgroups collection](http://people.csail.mit.edu/jrennie/20Newsgroups/).

The first part of the notebook shows a detailed example usage of text classification using sklearn (based on [scikit learn's "Working with text data" tutorial](https://scikit-learn.org/stable/tutorial/text_analytics/working_with_text_data.html)).
The real exercise is at the bottom, where you'll be asked to perform various experiments.

## Load data

In order to get faster execution times, we will work on a partial dataset with only 5 categories out of the 20 available in the dataset:

In [None]:
pip install ipytest

In [None]:
categories = [
    "alt.atheism",
    "soc.religion.christian", 
    "talk.religion.misc",
    "comp.sys.ibm.pc.hardware",
    "comp.sys.mac.hardware"
]

We load the documents from those categories, divided into train and test sets.

In [None]:
from sklearn.datasets import fetch_20newsgroups

train = fetch_20newsgroups(subset="train", categories=categories, shuffle=True, random_state=123)
test = fetch_20newsgroups(subset="test", categories=categories, shuffle=True, random_state=123)

Check which categories got loaded.

In [None]:
print(train.target_names)

Check the size of training and test splits.

In [None]:
print("Training instances: {}".format(len(train.data)))
print("Test instances:     {}".format(len(test.data)))

Check target labels of some of the train and test instances.

In [None]:
print(train.target[:10])
print(test.target[:10])

## Train a model

Bag-of-words document representation, using raw term counts.

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

count_vect = CountVectorizer()
X_train_counts = count_vect.fit_transform(train.data)

Check dimensionality (instances x features).

In [None]:
print(X_train_counts.shape)

Check vocabulary (sample 10 terms).

In [None]:
for idx, term in enumerate(count_vect.vocabulary_.keys()):
    if idx < 10:
        print(f"{term} (ID: {count_vect.vocabulary_[term]})")

Learn a Naive Bayes model on the training data (by default it uses Laplace smoothing with alpha=1).

In [None]:
from sklearn.naive_bayes import MultinomialNB

classifier = MultinomialNB(alpha=1.0)
classifier.fit(X_train_counts, train.target)

## Apply the model

First, extract the same feature representation by re-using the `CountVectorizer` from before.

In [None]:
X_test_counts = count_vect.transform(test.data)

Check dimensionality (documents x features).

In [None]:
print(X_test_counts.shape)

Then, predict labels for test instances.

In [None]:
predicted = classifier.predict(X_test_counts)

Look at some of the predicted labels.

In [None]:
print(predicted[:10])

## Evaluate model performance

We use Accuracy as our measure here.

In [None]:
from sklearn import metrics

print(f"{metrics.accuracy_score(test.target, predicted):.3f}")

## Exercise

1) Use TF weighting instead of the raw counts. (See the [sklearn documentation](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfTransformer.html) for `TfidfTransformer` usage.)

2) Try at least one different classifier, e.g., [linear SVM](https://scikit-learn.org/stable/modules/generated/sklearn.svm.LinearSVC.html#sklearn.svm.LinearSVC) (or [other SVMs](https://scikit-learn.org/stable/modules/svm.html#svm-classification)).

3) Record the results you got in the table below. How far can you push accuracy?

### Solution

Building a pipeline for each row in the table, then running an evaluating them in a single loop.

In [None]:
from sklearn.linear_model import SGDClassifier
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.pipeline import Pipeline

#### Naive Bayes variants

In [None]:
pipeline_nb_raw = Pipeline([
    ('vect', CountVectorizer()),
    ('clf', MultinomialNB()),
])

In [None]:
pipeline_nb_tf = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer(use_idf=False)),
    ('clf', MultinomialNB()),
])

#### SVM variants

In [None]:
pipeline_svm_raw = Pipeline([
    ('vect', CountVectorizer()),
    ('clf', SGDClassifier()),
])

In [None]:
pipeline_svm_tf = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer(use_idf=False)),
    ('clf', SGDClassifier()),
])

In [None]:
for pipeline in [
    pipeline_nb_raw, pipeline_nb_tf, 
    pipeline_svm_raw, pipeline_svm_tf
]:
    pipeline.fit(train.data, train.target)
    predicted = pipeline.predict(test.data)
    print(f"{metrics.accuracy_score(test.target, predicted):.3f}")

### Results

| Model | Term weighting | Accuracy |
| -- | -- |:--:|
| Naive Bayes | Raw counts | 0.864 |
| Naive Bayes | TF | 0.667 |
| SVM | Raw counts | 0.819 |
| SVM | TF | 0.851 |
| ... | ... | ... | 


## Optional exercise

Can you push performance ever further? You could try, for example, more sophisticated text preprocessing (tokenization, stopwords removal, and stemming) using [NLTK](https://www.nltk.org/) (which is part of the Anaconda distribution). See, e.g., [this article](https://towardsdatascience.com/machine-learning-nlp-text-classification-using-scikit-learn-python-and-nltk-c52b92a7c73a) for some hints.

# Building an inverted index

  - You are given a sample (1000 documents) from the [The Reuters-21578 data collection](http://www.daviddlewis.com/resources/testcollections/reuters21578/) in `data/reuters21578-000.xml`
  - The code that parses the XML and extract a list of preprocessed terms (tokenized, lowercased, stopwords removed) is already given
  - You are also given an `InvertedIndex` class that manages the posting lists operations
  - Your task is to build an inverted index from the input collection.

In [None]:
pip install ipytest

In [None]:
import ipytest
import re

from typing import List, Dict, Union, Any, Callable
from collections import Counter, defaultdict
from xml.dom import minidom
from dataclasses import dataclass

ipytest.autoconfig()

## Parsing documents

Stopwords list

In [None]:
STOPWORDS = ["a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it", "no", "not", "of", "on", "or", "such", "that", "the", "their", "then", "there", "these", "they", "this", "to", "was", "will", "with"]

Stripping tags inside <> using regex

In [None]:
def striptags(text: str) -> str:
    """Removes xml tags.

    Args:
        text: Text string with xml tags.

    Returns:
        String without xml tags.
    """
    p = re.compile(r"<.*?>")
    return p.sub("", text)

Parse input text and return a list of indexable terms

In [None]:
def parse(text: str) -> List[str]:
    """Parses documents and removes xml tags and punctuation.

    Args:
        text: Text to parse.

    Returns:
        List of tokens.
    """
    terms = []
    # Replace specific characters with space
    chars = ["'", ".", ":", ",", "!", "?", "(", ")"]
    for ch in chars:
        text = text.replace(ch, " ")

    # Remove tags
    text = striptags(text)

    # Tokenization
    # default behavior of the split is to split on one or more whitespaces
    return [term.lower() for term in text.split() if term not in STOPWORDS]

## Processing the input document collection

  - The collection is given as a single XML file. 
  - Each document is inside `<REUTERS ...> </REUTERS>`.
  - We extract the contents of the `<DATE>`, `<TITLE>`, and `<BODY>` tags.
  - After each extracted document, the provided callback function is called and all document data is passed in a single dict argument.

In [None]:
def process_collection(input_file:str, callback: Callable) -> None:
    """Processes file and calls the callback function for each document in the
    file.

    Args:
        input_file: Path to file to process.
        callback: Function that will be called for each document.
    """
    xmldoc = minidom.parse(input_file)
    # Iterate documents in the XML file
    itemlist = xmldoc.getElementsByTagName("REUTERS")
    for doc_id, doc in enumerate(itemlist):
        date = doc.getElementsByTagName("DATE")[0].firstChild.nodeValue
        # Skip documents without a title or body
        if not (doc.getElementsByTagName("TITLE") and doc.getElementsByTagName("BODY")):
            continue
        title = doc.getElementsByTagName("TITLE")[0].firstChild.nodeValue
        body = doc.getElementsByTagName("BODY")[0].firstChild.nodeValue
        callback({
            "doc_id": doc_id+1,
            "date": date,
            "title": title,
            "body": body
            })

Prints a document"s contents (used as a callback function passed to `process_collection`)

In [None]:
def print_doc(doc: Dict[str, Union[str, int]]) -> None:
    """Print details of the first 5 documents.

    Args:
        doc: Dictionary with document details.
    """
    if doc["doc_id"] <= 5:  # print only the first 5 documents
        print("docID:", doc["doc_id"])
        print("date:", doc["date"])
        print("title:", doc["title"])
        print("body:", doc["body"])
        print("--")

In [None]:
!mkdir data
!wget --output-document="data/reuters21578-000.xml" "https://raw.githubusercontent.com/iai-group/ir-course-2022/main/resources/reuters21578-000.xml"

In [None]:
process_collection("data/reuters21578-000.xml", print_doc)

## Task 1: Complete the inverted index class

  - The inverted index is an object with methods for adding and fetching postings.
  - The data is stored in a map, where keys are terms and values are lists of postings.
  - Each posting is an object that holds the doc_id and an optional payload.

In [None]:
# Since this is a simple data class, intializing it can be abstracted with
# the use of dataclass decorator.
# https://docs.python.org/3/library/dataclasses.html

@dataclass
class Posting:
    doc_id: int
    payload: Any = None

In [None]:
class InvertedIndex:

    def __init__(self):
        self._index = defaultdict(list)
    
    def add_posting(self, term: str, doc_id: int, payload: Any=None) -> None:
        """Adds a document to the posting list of a term."""
        # append new posting to the posting list
        self._index[term].append(Posting(doc_id, payload))

    def get_postings(self, term: str) -> List[Posting]:
        """Fetches the posting list for a given term."""
        return self._index.get(term)

    def get_terms(self) -> List[str]:
        """Returns all unique terms in the index."""
        return self._index.keys() 
    
    def write_to_file(self, filename_index: str) -> None:
        """Saves the index to a textfile."""
        with open(filename_index, "w") as f:
            for term, postings in self._index.items():
                f.write(term)
                for posting in postings:
                    f.write(f" {posting.doc_id}")
                    if posting.payload:
                        f.write(f":{str(posting.payload)}")
                f.write("\n")

Tests.

In [None]:
%%run_pytest[clean]

def test_postings():
    ind = InvertedIndex()
    ind.add_posting("term", 1, 1)
    ind.add_posting("term", 2, 4)
    # Testing existing term
    postings = ind.get_postings("term")
    assert len(postings) == 2
    assert postings[0].doc_id == 1
    assert postings[0].payload == 1
    assert postings[1].doc_id == 2
    assert postings[1].payload == 4
    # Testing non-existent term
    assert ind.get_postings("xyx") is None

def test_vocabulary():
    ind = InvertedIndex()
    ind.add_posting("term1", 1)
    ind.add_posting("term2", 1)
    ind.add_posting("term3", 2)
    ind.add_posting("term2", 3)
    assert set(ind.get_terms()) == set(["term1", "term2", "term3"])

## Task 2: Build an inverted index from the input collection

**TODO**: Complete the code to index the entire document collection.  (The content for each document should be the title and body concatenated)

In [None]:
ind = InvertedIndex()

def index_doc(doc: Dict[str, Union[str, int]]) -> None:
    """Index document by concatenating document title and body.

    Args:
        doc: Document details.
    """
    text = doc["title"] + " " + doc["body"]
    terms = parse(text)  # list of terms in the document
    tc = Counter(terms)  # dict with term counts
    for term, freq in tc.items():
        ind.add_posting(term, doc["doc_id"], freq)
    
process_collection("data/reuters21578-000.xml", index_doc)

## Task 3: Save the inverted index to a file

Save the inverted index to a file (`data/index.dat`). Use a simple text format with `termID docID1:freq1 docID2:freq2 ...` per line, e.g.,

```
xxx 1:1 2:1 3:2
yyy 2:1 4:2
zzz 1:3 3:1 5:2
...
```

Implement this by (1) adding a `write_to_file(self, filename)` method to the `InvertedIndex` class and then (2) invoking that method in the cell below.

In [None]:
ind.write_to_file("data/index.dat")

## Task 4 (advanced, optional): Plot collection size against index size

Create a plot that compares the size of the document collection (bytes) against the size of the corresponding index (bytes) on the y-axis vs. with respect to the number of documents on the x-axis. You may use [Matplotlib](https://www.tutorialspoint.com/jupyter/jupyter_notebook_plotting.htm) for plotting.

In our solution, we create a different callback function and use that one for indexing.

In [None]:
import sys
import os

ind = InvertedIndex()
tmp_file = "data/index_tmp.dat"
stats = {
    "i": 0,
    "sum_bytes": 0,
    "num_docs": [],
    "size_index": [],
    "size_docs": []
}

def index_doc_with_stats(doc: Dict[str, Union[str, int]]) -> None:
    """Indexes documents and updates stats dictionary.

    Args:
        doc: [description]
    """
    index_doc(doc)
    # Stats are stored in a global variable (not very elegant but quick solution)
    stats["i"] += 1
    stats["sum_bytes"] += sys.getsizeof(str(doc))  # String document representation is a good proxy for doc size
    # We measure index size and document collection size after every 100 docs
    if stats["i"] % 100 == 0:
        stats["num_docs"].append(stats["i"])
        stats["size_docs"].append(stats["sum_bytes"])
        # To get index size, we dump it to a file and get file size
        # Alternatively, the pympler package may be used to measure the size of Python objects
        ind.write_to_file(tmp_file)
        stats["size_index"].append(os.path.getsize(tmp_file))
        
process_collection("data/reuters21578-000.xml", index_doc_with_stats)

In [None]:
import matplotlib.pyplot as plt

# Rendering plots inline in Jupyter notebooks.
%matplotlib inline

In [None]:
plt.plot(stats["num_docs"], stats["size_docs"], label="Collection size")
plt.plot(stats["num_docs"], stats["size_index"], label="Index size")
plt.xlabel("Number of documents")
plt.ylabel("Bytes")
plt.legend(loc="upper left")
plt.show()

# Query processing with document-at-a-time scoring

Implement term-at-a-time scoring using a simple retrieval function.

In [None]:
pip install ipytest

In [None]:
import ipytest
import pytest

from typing import Dict, List, Tuple
from collections import Counter

ipytest.autoconfig()


### Inverted index

For simplicity, the inverted index for the document collection is given as a dictionary, with a terms as keys and posting lists as values. Each posting is a (document ID, term frequency) tuple.

In [None]:
index = {
    "beijing": [(1, 1), (4, 1)],
    "dish": [(1, 1), (4, 1)],
    "duck": [(0, 3), (1, 2), (2, 2), (4, 1)],
    "rabbit": [(2, 1), (3, 1)],
    "recipe": [(2, 1), (3, 1), (4, 1)]
}

### Document lengths

The length of each document is provided in a list. (Normally, this information would be present in a document metadata store or in a forward index.)

In [None]:
doc_len = [3, 4, 4, 2, 4]

### Document-at-a-time scoring

We utilize the fact that the posting lists are ordered by document ID.  Then, it"s enough to iterate through each query term"s posting list only once.  We keep a pointer for each query term.

Normally, document scores would be kept in a priority queue. Here, for simplicity, we will keep them in a dictionary.

The retrieval function we use is the following:

$$score(q,d) = \sum_{t \in q} w_{t,d} \times w_{t,q}$$

where $w_{t,d}$ and $w_{t,q}$ are length-normalized term frequencies. I.e., $w_{t,d}=\frac{c_{t,d}}{|d|}$, where $c_{t,d}$ is the number of occurrences of term $t$ in document $d$ and $|d|$ is the document length (=total number of terms). (It goes analogously for the query.)

In [None]:
def score_collection(index: Dict[str, List[Tuple[int, int]]], 
                    doc_len: List[int], 
                    query: str) -> List[Tuple[int, float]]:
    """Scores all documents in the collection.
    
    Args:
        index: Dict holding the inverted index.
        doc_len: List with document lengths.
        query: Search query.
    
    Returns:
        List with (document_id, score) tuples, ordered by score desc.
    """
    
    # Turns the query string into a "term: freq" dictionary.
    query_freqs = dict(Counter(query.split()))
    # Computes query length (i.e., sum of all query term frequencies).
    query_len = sum(query_freqs.values())

    doc_scores = {}  # Holds the final document scores (this should be a priority list, but for simplicity we use a dictionary here).
    
    pos = {term: 0 for term in query_freqs}  # Holds a pointer for each query term"s posting list.
        
    # Iterate through each document.
    for doc_id in range(len(doc_len)):            
        # First, we collect the document term frequencies from the index.
        # (Essentially, we just "recover" the document"s contents from the index.)
        c_td = {}  # Holds the term frequencies in the document
        for term in query_freqs.keys(): 
            # Get the term frequency from the posting list.
            # Utilize the fact that the posting lists are ordered by document ID!
            if pos[term] == len(index[term]):  # The end of the posting list has been reached.
                continue
            (d, freq) = index[term][pos[term]]
            if d == doc_id:
                c_td[term] = freq
                pos[term] += 1
            else:
                # This means that d > doc_id, i.e., the term is not present in this doc.
                pass
                    
        # Then, we score the document.
        score = 0  # Holds the document"s retrieval score
        for term, c_tq in query_freqs.items():
            # Incement the document"s score according to the given query term
            w_td = c_td.get(term, 0) / doc_len[doc_id]
            w_tq = c_tq / query_len
            score += w_td * w_tq
        # Record final document score.
        doc_scores[doc_id] = score
        
    return sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)

Tests.

In [None]:
%%run_pytest[clean]

def test_scoring():
    scores = score_collection(index, doc_len, "beijing duck recipe")    
    assert scores[0][0] == 0
    assert scores[0][1] == pytest.approx(1/3, rel=1e-2)
    assert scores[2][0] == 2
    assert scores[2][1] == pytest.approx(1/4, rel=1e-2)
    assert scores[4][0] == 3
    assert scores[4][1] == pytest.approx(1/6, rel=1e-2)

# Document-term matrix generation

In this exercise, you'll have to generate a document-term matrix from an input list of preprocessed documents.

In [None]:
pip install ipytest

In [None]:
from typing import List, Tuple
import ipytest
import pytest

ipytest.autoconfig()

Input documents are given as lists of tokenized terms.

In [None]:
DOCUMENTS = [
    ["aaa", "bbb", "ccc"],
    ["eee", "fff"],
    ["aaa", "eee", "aaa", "ccc", "fff", "fff", "ggg", "aaa"],
    ["bbb", "bbb", "bbb"],
    ["ggg", "fff", "ccc", "aaa", "ccc"],
]

### Solution #1

Simple (naive) solution.

In [None]:
def get_doc_term_matrix(docs: List[List[str]]) -> Tuple[List[List[int]], List[str]]:
    """Generates a document-term matrix and the corresponding vocabulary.
    
    Args:
        docs: List of documents, each given by a list of tokenized terms.
        
    Returns:
        Tuple consisting of the document-term matrix and the corresponding vocabulary.
        In the document-term matrix row `i` corresponds to `docs[i]` and column `j`
        corresponds to the jth element of the vocabulary. Values represent the number
        of times the term appears in the document.
        Terms may be in any order in the vocabulary.
    """
    vocabulary = list(set([term for doc in docs for term in doc]))
    doc_term_matrix = []
    for doc in docs:
        doc_term_vector = []
        for term in vocabulary:
            doc_term_vector.append(doc.count(term))
        doc_term_matrix.append(doc_term_vector)
    return doc_term_matrix, vocabulary

### Solution #2

More effective solution avoiding iterating through document vectors multiple times.

In [None]:
def get_doc_term_matrix(docs: List[List[str]]) -> Tuple[List[List[int]], List[str]]:
    """Generates a document-term matrix and the corresponding vocabulary.
    
    Args:
        docs: List of documents, each given by a list of tokenized terms.
        
    Returns:
        Tuple consisting of the document-term matrix and the corresponding vocabulary.
        In the document-term matrix row `i` corresponds to `docs[i]` and column `j`
        corresponds to the jth element of the vocabulary. Values represent the number
        of times the term appears in the document.
        Terms may be in any order in the vocabulary.
    """
    # The vocabulary is represented internally as a dictionary 
    # to allow for effective lookup of term indices.
    vocabulary = {term: idx for idx, term in enumerate(set([term for doc in docs for term in doc]))}
    # Initialize doc-term matrix with zeros.
    doc_term_matrix = [[0] * len(vocabulary) for _ in range(len(docs))]
    # Iterate through the content of each document only once and increase term counts.
    for doc_idx, doc in enumerate(docs):
        for term in doc:
            term_idx = vocabulary[term]
            doc_term_matrix[doc_idx][term_idx] += 1
            
    return doc_term_matrix, list(vocabulary.keys())

Tests.

In [None]:
%%run_pytest[clean]

def test_num_docs():
    doc_term_matrix, _ = get_doc_term_matrix(DOCUMENTS)
    assert len(doc_term_matrix) == len(DOCUMENTS)
    
def test_vocabulary():
    _, vocabulary = get_doc_term_matrix(DOCUMENTS)
    assert set(vocabulary) == {"aaa", "bbb", "ccc", "eee", "fff", "ggg"}
    
def test_term_counts():
    doc_term_matrix, vocabulary = get_doc_term_matrix(DOCUMENTS)
    idx_aaa = vocabulary.index("aaa")
    idx_ccc = vocabulary.index("ccc")
    idx_fff = vocabulary.index("fff")
    assert doc_term_matrix[0][idx_aaa] == 1
    assert doc_term_matrix[0][idx_ccc] == 1
    assert doc_term_matrix[0][idx_fff] == 0
    assert doc_term_matrix[2][idx_aaa] == 3
    assert doc_term_matrix[2][idx_ccc] == 1
    assert doc_term_matrix[2][idx_fff] == 2

# TF-IDF weighting

In this exercise, you'll have to compute term weightings (TF, IDF, and TF-IDF) based on a document-term matrix.

In [None]:
pip install ipytest

In [None]:
from typing import List
import ipytest
import math
import pytest

ipytest.autoconfig()

The document-term vector contains the raw term frequencies for each term in the document.

In [None]:
DOC_TERM_MATRIX = [
    [0, 0, 3, 0, 0, 0],
    [1, 1, 2, 0, 0, 0],
    [0, 0, 2, 1, 1, 0],
    [0, 0, 0, 1, 1, 0],
    [1, 1, 1, 0, 1, 1]
]

## Task 1: TF weighting

Compute the L1-normalized term frequency vector for a given document.

The L1-normalized frequency of a single term in a document is given by:

$$tf_{t,d}=\frac{c_{t,d}}{|d|}$$ 

where $c_{t,d}$ is the count of occurrences of term $t$ in document $d$ and $|d|$ is the document length (total number of terms).

In [None]:
def get_tf_vector(doc_term_vector: List[int]) -> List[float]:    
    """Computes the normalized term frequency vector from a raw term-frequency vector."""
    sum_freq = sum(doc_term_vector)
    if sum_freq == 0:  # This would mean that the document has no content.
        return None    
    tf_vector = [freq / sum_freq for freq in doc_term_vector]
    return tf_vector

Tests.

In [None]:
%%run_pytest[clean]

def test_tf_doc0():
    assert get_tf_vector(DOC_TERM_MATRIX[0]) == [0, 0, 1, 0, 0, 0]
    
def test_tf_doc1():
    assert get_tf_vector(DOC_TERM_MATRIX[1]) == [0.25, 0.25, 0.5, 0, 0, 0]

## Task 2: IDF weighting

Compute the IDF weight of a term given by

$$idf_{t}=\log \frac{N}{n_t}$$ 

where $N$ is the total number of documents and $n_t$ is the number of documents that contain term $t$.
**Use base-10 logarithm in this exercise.**

In [None]:
def get_term_idf(doc_term_matrix: List[List[int]], term_index: int) -> float:
    """Computes the IDF value of a term, given by its index, based on a document-term matrix."""
    N = len(doc_term_matrix)
    n_t = sum([1 if doc_freqs[term_index] > 0 else 0 for doc_freqs in doc_term_matrix])
    return math.log10(N / n_t)

Tests.

In [None]:
%%run_pytest[clean]

def test_idf_term0():
    assert get_term_idf(DOC_TERM_MATRIX, 0) == pytest.approx(0.3979, rel=1e-3)
    
def test_idf_term2():
    assert get_term_idf(DOC_TERM_MATRIX, 2) == pytest.approx(0.0969, rel=1e-3)

## Task 3: TF-IDF weighting

Compute the TF-IDF vector for a given document, where the TF-IDF weight of a term in a document is given by:

$$ tfidf_{t,d} = tf_{t,d} \times idf_{t}$$

In [None]:
def get_tfidf_vector(doc_term_matrix: List[List[int]], doc_index: int) -> List[float]:
    """Computes the TFIDF vector from a raw term-frequency vector."""
    tf_vector = get_tf_vector(doc_term_matrix[doc_index])
    tfidf_vector = []
    for term_index, tf in enumerate(tf_vector):
        idf = get_term_idf(doc_term_matrix, term_index)
        tfidf_vector.append(tf * idf)
    return tfidf_vector

Tests.

In [None]:
%%run_pytest[clean]

def test_tfidf_doc0():
    assert get_tfidf_vector(DOC_TERM_MATRIX, 0) == pytest.approx([0, 0, 0.0969, 0, 0, 0], rel=1e-3)

# Vector space retrieval

This exercise is about scoring a (toy-sized) document collection against a query using various retrieval functions instantiated in the vector space model.

In [None]:
pip install ipytest

In [None]:
import ipytest
import math
import pytest

from typing import Dict, List, Tuple
from abc import ABC, abstractmethod

ipytest.autoconfig()

Term-document matrix.

In [None]:
TD_MATRIX_TYPE = Dict[str, List[int]]
DOCUMENT_SCORES_TYPE = List[Tuple[int, float]]
TD_MATRIX = {
    "beijing": [0, 1, 0, 0, 1],
    "dish": [0, 1, 0, 0, 1],
    "duck": [3, 2, 2, 0, 1],
    "rabbit": [0, 0, 1, 1, 0],
    "recipe": [0, 0, 1, 1, 1],
}

## Scoring

The general scoring function is 

$$score(d,q) = \sum_{t \in q} w_{t,d} \times w_{t,q}$$

where $w_{t,d}$ is the term"s weight in the document and $w_{t,q}$ is the term"s weight in the query.

The `Scorer` class below provides an abstract implementation of the above function. For a specific instantiation,  you"ll need to create a child class and implement `_get_query_term_weight()` and `_get_doc_term_weight()`.

For your convenience, the collection is provided in the form of a term-document matrix.

In [None]:
class AbstractScorer(ABC):
    def __init__(self, td_matrix: TD_MATRIX_TYPE) -> None:
        """Initialize the scorer abstract class.

        Args:
            td_matrix: Dictionary of "term: term count" pairs.
        """
        self._td_matrix = td_matrix
        self._num_docs = len(list(td_matrix.values()))
        self._query_terms = None

    def _parse_query(self, query: str) -> None:
        """Parses the input query to a sequence of vocabulary terms and stores
        it in a member variable.
        """
        self._query_terms = [term for term in query.split() if term in self._td_matrix]


        
    def score_documents(self, query: str) -> DOCUMENT_SCORES_TYPE:
        """Score all documents in the collection.
        
        Params:
            query: Query string
        
        Returns:
            List of (document ID, score) tuples ordered by score descending, then by doc ID ascending.
        """
        scores = {doc_id: 0 for doc_id in range(self._num_docs)}
        self._parse_query(query)
        
        for term in set(self._query_terms):
            for doc_id in range(self._num_docs):
                scores[doc_id] += self._get_doc_term_weight(doc_id, term) * self._get_query_term_weight(term)
                
        return [(doc_id, score) for doc_id, score in sorted(scores.items(), key=lambda x: (x[1], -x[0]), reverse=True)]
        
    @abstractmethod
    def _get_query_term_weight(self, term: str) -> float:
        return 1
    
    @abstractmethod
    def _get_doc_term_weight(self, doc_id: int, term: str) -> float:
        return 0

## Task 1: Binary scorer

Set $w_{t,d}$ to 1 if $t$ is present in the document otherwise $0$.
Similarly, Set $w_{t,q}$ to 1 if $t$ is present in the query otherwise $0$.

This method will then score documents based on the number of matching (unique) query terms.

In [None]:
class BinaryScorer(AbstractScorer):
    
    def _get_query_term_weight(self, term: str) -> int:
        return 1 if term in self._query_terms else 0
    
    def _get_doc_term_weight(self, doc_id: int, term: str) -> int:
        return 1 if self._td_matrix[term][doc_id] > 0 else 0

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize("td_matrix,query,correct_values", [
    (TD_MATRIX, "beijing", [(1, 1), (4, 1), (0, 0), (2, 0), (3, 0)]),
    (TD_MATRIX, "beijing duck recipe", [(4, 3), (1, 2), (2, 2), (0, 1), (3, 1)]),
])
def test_binary_scorer(td_matrix: TD_MATRIX_TYPE, query: str, correct_values: DOCUMENT_SCORES_TYPE):  
    scorer = BinaryScorer(td_matrix)
    assert scorer.score_documents(query) == correct_values

## Task 2: TF scorer

Set $w_{t,d}=\frac{c_{t,d}}{|d|}$, that is, the relative frequency of the term in the document.

For $w_{t,q}$, use the frequency (count) of the term in the query.

In [None]:
class TFScorer(AbstractScorer):
    
    def __init__(self, td_matrix: TD_MATRIX_TYPE) -> None:
        """Initialize TFScorer. Here, the lengths of documents are precomputed
        for more efficient scoring.

        Args:
            td_matrix: Dictionary of "term: term count" pairs.
        """
        super(TFScorer,self).__init__(td_matrix)
        # Pre-compute the length of documents for more efficient scoring.
        self._doc_len = {}
        for doc_id in range(self._num_docs):
            self._doc_len[doc_id] = sum(self._td_matrix[term][doc_id] for term in self._td_matrix.keys())
    
    def _get_query_term_weight(self, term: str) -> int:
        return self._query_terms.count(term)
    
    def _get_doc_term_weight(self, doc_id, term: str) -> float:
        return self._td_matrix[term][doc_id] / self._doc_len[doc_id]

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize("td_matrix,query,correct_values", [
    (TD_MATRIX, "beijing", [(1, 0.25), (4, 0.25), (0, 0), (2, 0), (3, 0)]),
    (TD_MATRIX, "duck duck", [(0, 2), (1, 1), (2, 1), (4, 0.5), (3, 0)]),
    (TD_MATRIX, "beijing duck recipe", [(0, 1.0), (1, 0.75), (2, 0.75), (4, 0.75), (3, 0.5)]),
])
def test_tf_scorer(td_matrix: DOCUMENT_SCORES_TYPE, query: str, correct_values: DOCUMENT_SCORES_TYPE):  
    scorer = TFScorer(td_matrix)
    assert scorer.score_documents(query) == correct_values

## Task 3: TD-IDF scorer

Implement the scoring function 

$$score(d,q) = \sum_{t \in q} tf_{t,q} \times tf_{t,d} \times idf_t$$

Use normalized frequencies for TF weight, i.e., $tf_{t,d}=\frac{c_{t,d}}{|d|}$, where $c_{t,d}$ is the number of occurrences of term $t$ in document $d$ and $|d|$ is the document length (=total number of terms). (Analogously for the query.)

Compute IDF values using the following formula: $idf_{t}=\log \frac{N}{n_t}$, where $N$ is the total number of documents and $n_t$ is the number of documents that contain term $t$.  Use base-10 the logarithm.

In [None]:
class TFIDFScorer(AbstractScorer):
    
    def __init__(self, td_matrix: TD_MATRIX_TYPE) -> None:
        """Initializes TFIDFScorer. Here, both document lengts and IDF values
        are precomputes.

        Args:
            td_matrix: Dictionary of "term: term count" pairs.
        """
        super(TFIDFScorer,self).__init__(td_matrix)
        # Pre-compute the length of documents for more efficient scoring.
        self._doc_len = {}
        for doc_id in range(self._num_docs):
            self._doc_len[doc_id] = sum(self._td_matrix[term][doc_id] for term in self._td_matrix.keys())
        # Pre-compute IDF values.
        self._idf = {}
        for term, freqs in self._td_matrix.items():
            nt = sum(1 if f > 0 else 0 for f in freqs)
            self._idf[term] = math.log10(self._num_docs / nt)
    
    def _get_query_term_weight(self, term: str) -> float:
        return self._query_terms.count(term) / len(self._query_terms)
    
    def _get_doc_term_weight(self, doc_id: int, term: str) -> float:
        return self._td_matrix[term][doc_id] / self._doc_len[doc_id] * self._idf[term]

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize("td_matrix,query,correct_values", [
    (TD_MATRIX, "beijing", [(1, 0.0995), (4, 0.0995), (0, 0), (2, 0), (3, 0)]),
    (TD_MATRIX, "duck duck", [(0, 0.0969), (1, 0.0485), (2, 0.0485), (4, 0.0242), (3, 0)]),
    (TD_MATRIX, "beijing duck recipe", [(4, 0.0597), (1, 0.0493), (3, 0.0369), (2, 0.0346), (0, 0.0323)]),
])
def test_tfidf_scorer(td_matrix: TD_MATRIX_TYPE, query: str, correct_values: DOCUMENT_SCORES_TYPE):  
    scorer = TFIDFScorer(td_matrix)
    ranking = scorer.score_documents(query)
    assert [x[0] for x in ranking] == [x[0] for x in correct_values]  # Checking ranking
    assert [x[1] for x in ranking] == pytest.approx([x[1] for x in correct_values], rel=1e-2)  # Checking scores

# Rocchio feedback

Compute an expanded query model using Rocchio feedback, given a set of positive and negative documents as expicit feedback.

In [None]:
pip install ipytest

In [None]:
import ipytest
import pytest
from typing import List

ipytest.autoconfig()

Vocabulary

In [None]:
VOCAB = ['news', 'about', 'presidental', 'campaign', 'food', 'text']

Query vector

In [None]:
Q = [1, 1, 1, 1, 0, 0]

Document-term matrix (each row corresponds to a document vector)

In [None]:
DT_MATRIX = [
    [1.5, 0.1, 0, 0, 0, 0],
    [1.5, 0.1, 0, 2, 2, 0],
    [1.5, 0, 3, 2, 0, 0],
    [1.5, 0, 4, 2, 0, 0], 
    [1.5, 0, 0, 6, 2, 0]
]

Feedback: IDs (indices) of positive and negative documents

In [None]:
D_POS = [2, 3]
D_NEG = [0, 1, 4]

## Rocchio feedback

Compute the updated query according to:
$$\vec{q}_m = \alpha \vec{q} + \frac{\beta}{|D^+|}\sum_{d \in D^+}\vec{d} - \frac{\gamma}{|D^-|}\sum_{d \in D^-}\vec{d}$$

where
  - $\vec{q}$ is the original query vector
  - $\vec{d}$ is the term vector of document $d$
  - $D^+, D^-$ are set of relevant and non-relevant feedback documents
  - $\alpha, \beta, \gamma$ are parameters that control the movement of the original vector

In [None]:
def get_updated_query(
    q: List[int], d_pos: List[int], d_neg: List[int], 
    alpha: float, beta: float, gamma: float
) -> List[int]:
    """Computes an updated query model using Rocchio feedback.
    
    Args:
        q: Query vector.
        d_pos: List of positive feedback document IDs.
        d_neg: List of positive feedback document IDs.
        alpha: Feedback parameter alpha.
        beta: Feedback parameter beta.
        gamma: Feedback parameter gamma.
    
    Returns:
        Updated query vector.
    """
    q_m = [alpha * t for t in q]
    
    # Positive feedback docs
    for idx in d_pos:
        for t in range(len(VOCAB)):
            q_m[t] += beta / len(d_pos) * DT_MATRIX[idx][t]
        
    # Negative feedback docs
    for idx in d_neg:
        for t in range(len(VOCAB)):
            q_m[t] -= gamma / len(d_neg) * DT_MATRIX[idx][t]
        
    return q_m

Tests.

In [None]:
%%run_pytest[clean]

def test_no_expansion():
    q_m = get_updated_query(Q, D_POS, D_NEG, 1, 0, 0)
    assert q_m == Q

def test_expansion():
    q_m = get_updated_query(Q, D_POS, D_NEG, 0.6, 0.2, 0.2)
    assert q_m == pytest.approx([0.600, 0.587, 1.300, 0.467, -0.267, 0], rel=1e-2)

# PageRank calculation

In [None]:
pip install ipytest

In [None]:
import ipytest
import pytest
from typing import Any, Dict, List, Set, Tuple

ipytest.autoconfig()

You're given a web graph in form of an edge list. Each edge is represented as a `(from_node, to_node)` tuple.
(We assume that there is at most one link between any pair of nodes and that the input is correct.)

## Input 1

![](https://raw.githubusercontent.com/iai-group/ir-course-2022/main/resources/pagerank1.png)

In [None]:
WEB_GRAPH_1 = [("A", "B"), ("A", "C"), ("B", "C"), ("C", "A")]

## Input 2

![](https://raw.githubusercontent.com/iai-group/ir-course-2022/main/resources/pagerank2.png)

Mind that this web graph contains rank sinks, i.e., nodes that have only incoming edges but no outgoing ones. You'll need to deal with those by adding an incoming link from all nodes (including the very node itself).

In [None]:
WEB_GRAPH_2 = [(1, 2), (1, 3), (3, 1), (3, 2), (3, 5), (4, 5), (4, 6), (5, 4), (5, 6), (6, 4)]

## Utilities

In [None]:
def get_all_nodes(web_graph: List[Tuple[Any, Any]]) -> Set[Any]:
    """Returns a list of nodes given a web graph.
    
    Params:
        web_graph: List of edges.

    Returns:
        Set of nodes.
    """
    nodes = set()
    for (from_node, to_node) in web_graph:
        nodes.add(from_node)
        nodes.add(to_node)
    
    return nodes

In [None]:
def get_outlinks_num(web_graph: List[Tuple[Any, Any]]) -> Dict[Any, int]:
    """Computes the number of outgoing links for each node in a web graph.
    
    Param:
        web_graph: List of edges.

    Returns:
        Dict with nodes as keys and the number of outgoing nodes as values.
    """
    outlinks = {node: 0 for node in get_all_nodes(web_graph)}
    for (from_node, to_node) in web_graph:
        outlinks[from_node] += 1    
    return outlinks

## PageRank calculation

The pagerank of a given node $a$ is computed using:

$$PR(a) = \frac{q}{T} + (1-q) \sum_{i=1}^n \frac{PR(p_i)}{L(p_i)}$$

where 
  - $q$ is the probability of jumping to a random page
  - $T$ is the total number of pages (nodes) in the Web graph
  - $p_1\dots p_n$ are pages that **point to** page $a$
  - $PR(p_i)$ is the PageRank value of page $p_i$
  - $L(p_i)$ is the number of outgoing links of page $p_i$

In [None]:
def pagerank(web_graph: List[Tuple[Any, Any]], q: float = 0.15, iterations: int = 3) -> Dict[Any, float]:
    """Computes PageRank for all nodes in a web graph.
    
    Params:
        web_graph: List of edges.
        q: Random jump probability.
        iterations: Number of iterations.
        
    Returns:
        Dict with node names as keys and PageRank scores as values.    
    """
    nodes = get_all_nodes(web_graph)
    # Calculate the number of outgoing links of each page.
    outlinks_num = get_outlinks_num(web_graph)
    # Collect all inlinks of a page for more efficient PageRank computation.
    inlinks = {node: [] for node in nodes}
    for (from_node, to_node) in web_graph:
        inlinks[to_node].append(from_node)
    
    # Identify and deal with rank sinks.
    for node, lnum in outlinks_num.items():
        if lnum == 0:
            print('Node {} is a rank sink!'.format(node))
            # Add links to all nodes (including the node itself).
            for to_node in nodes:
                inlinks[to_node].append(node)
            # Update outlinks count.
            outlinks_num[node] = len(nodes)
    
    # Initialize pagerank values.
    pr = {node: 1/len(nodes) for node in nodes}
    
    # Calculate pagerank scores iteratively.
    for i in range(iterations):
        pr_old = pr.copy()
        for node in pr.keys():
            pr[node] = q / len(nodes)
            # Iterating over all pages p_i that link to node. 
            for from_node in inlinks[node]:
                pr[node] += (1 - q) * pr_old[from_node] / outlinks_num[from_node]
    
    return pr

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize("web_graph,q,iterations,correct_values", [
    (WEB_GRAPH_1, 0.5, 0, {"A": 1/3, "B": 1/3, "C": 1/3}),
    (WEB_GRAPH_1, 0.5, 1, {"A": 0.3333, "B": 0.25, "C": 0.4166}),
    (WEB_GRAPH_1, 0.5, 2, {"A": 0.375, "B": 0.25, "C": 0.375}),
    (WEB_GRAPH_1, 0.5, 3, {"A": 0.3541, "B": 0.2604, "C": 0.3854}),
    (WEB_GRAPH_2, 0.15, 0, {1: 1/6, 2: 1/6, 3: 1/6, 4: 1/6, 5: 1/6, 6: 1/6}),
    (WEB_GRAPH_2, 0.15, 1, {1: 0.0958, 2: 0.1666, 3: 0.1194, 4: 0.2611, 5: 0.1666, 6: 0.1902}),
    (WEB_GRAPH_2, 0.15, 2, {1: 0.0824, 2: 0.1231, 3: 0.0893, 4: 0.2811, 5: 0.1934, 6: 0.2304}),
])
def test_pagerank(web_graph, q, iterations, correct_values):    
    assert pagerank(web_graph, q=q, iterations=iterations) == pytest.approx(correct_values, rel=1e-3)

# Retrieval Evaluation

Compare the effectiveness of System A and System B on a test collection consisting of three queries. The table below contains the rankings generated by the two systems as well as the ground truth. We assume that relevance is binary, i.e., the ground truth column contains a set of the relevant documents. 

Document rankings produced by two systems and binary relevance judgements:

\begin{array}{|l|l|l|l|}
    \hline    
    \textbf{Query} & \textbf{System A ranking} & \textbf{System B ranking} & \textbf{Ground truth} \\
    \hline
    \hline
    Q1 & \textbf{1}, 2, 4, 5, \textbf{3}, 6, 9, 8, 10, 7 & 2, 4, \textbf{3}, 10, 5, 6, 7, 8, 9, \textbf{1} & 1, 3 \\
    \hline
    Q2 & 1, \textbf{2}, \textbf{4}, \textbf{5}, 3, 9, 8, \textbf{6}, 10, 7 & \textbf{5}, \textbf{6}, \textbf{4}, 1, 7, 8, 9, 10, 3, \textbf{2} & 2, 4, 5, 6 \\
    \hline
    Q3 & 1, \textbf{7}, 4, 5, 3, 6, 9, 8, 10, 2 & 2, 4, 3, \textbf{7}, 5, 6, 1, 8, 9, 10 & 7 \\
    \hline
  \end{array}

We boldface the relevant documents in the table for a better overview.

## Solution


First we compute effectiveness metrics for individual queries (rows 1--3 in the table below. Then, we average these number over the set of queries (row 4).


Effectiveness measures fo **System A**: 

\begin{array}{|l||c|c|c|c|}
    \hline    
    \textbf{Query} & P@5 & P@10 & (M)AP & (M)RR \\
    \hline
    \hline
	Q1 & 
		\frac{2}{5} & \frac{2}{10} & (\frac{1}{1} + \frac{2}{5}) / 2 & \frac{1}{1} \\
    \hline
	Q2 & 
		\frac{3}{5} & \frac{4}{10} & (\frac{1}{2} + \frac{2}{3} + \frac{3}{4} + \frac{4}{8}) / 4 & \frac{1}{2} \\
    \hline
	Q3 & 
	 \frac{1}{5} & \frac{1}{10} & (\frac{1}{2}) / 1 & \frac{1}{2} \\
    \hline
    \hline		    
	Average & 
		0.4 & 0.233 & 0.601 & 0.666 \\
    \hline
  \end{array}


Effectiveness measures fo **System B**: 

\begin{array}{|l||c|c|c|c|}
    \hline    
    \textbf{Query} & P@5 & P@10 & (M)AP & (M)RR \\
    \hline
    \hline
	Q1 & \frac{1}{5} & \frac{2}{10} & (\frac{1}{3} + \frac{2}{10}) / 2 & \frac{1}{3} \\
    \hline
	Q2 & \frac{3}{5} & \frac{4}{10} & (\frac{1}{1} + \frac{2}{2} + \frac{3}{3} + \frac{4}{10}) / 4  & \frac{1}{1} \\
    \hline
	Q3 & \frac{1}{5} & \frac{1}{10} & (\frac{1}{4}) / 1 & \frac{1}{4} \\
    \hline
    \hline		    
	Average & 0.333 & 0.233 & 0.455 & 0.527 \\
    \hline
  \end{array}

# Interpolated Precision

In this exercise, you'll have to calculate interpolated precision for generating smooth precision-recall graphs.

In [None]:
pip install ipytest

In [None]:
import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline

You're given precision and recall values measured at various rank positions (indexed from 0) for a given ranking.

In [None]:
precision = [1.0, 0.5, 0.67, 0.75, 0.8, 0.83, 0.71, 0.63, 0.56, 0.6]
recall = [0.17, 0.17, 0.33, 0.5, 0.67, 0.83, 0.83, 0.83, 0.83, 1.0]

We can plot these values:

In [None]:
plt.plot(recall, precision)
plt.ylabel("Precision")
plt.xlabel("Recall")
plt.show()

As you can see, this is not exactly a pretty plot. 

Instead, we'd like to report on standard recall levels R'=(0.0, 0.1, ... 1.0) using interpolated precision values.

We calculate interpolated precision at a given recall level using

$$P(R) = \max \{ P' : R' \geq R \wedge (R',P') \in S \} ~,$$

where S is the set of observed (R,P) points.

In [None]:
recall_levels = np.arange(0, 1.1, 0.1)

In [None]:
interpolated_precision = []
for r_prime in recall_levels:
    interpolated_precision.append(
        max([p for (p, r) in zip(precision, recall) if r >= r_prime])
    )

We can now generate a new plot using standard recall levels and interpolated precision values.

In [None]:
plt.plot(recall_levels, interpolated_precision)
plt.ylabel("Precision")
plt.xlabel("Recall")
plt.show()

# NDCG Calculation

In this exercise, you'll have to evaluate system rankings, by computing the Normalized Discounted Cumulative Gain (NDCG) measure.

In [None]:
pip install ipytest

In [None]:
import ipytest
import math
import pytest

from typing import Dict, List

ipytest.autoconfig()

### Rankings produced for each query

The key is the query ID (string), the value is a list of document IDs (ints).

In [None]:
system_rankings = {
    "q1": [2, 1, 3, 4, 5, 6, 10, 7, 9, 8],
    "q2": [1, 2, 9, 4, 5, 6, 7, 8, 3, 10],
    "q3": [1, 7, 4, 5, 3, 6, 9, 8, 10, 2]
}

### Ground truth

The key is the query ID, the value is a dictionary with (document ID, relevance) pairs. Relevance is measured on a 3-point scale: non-relevant (0), poor (1), good (2), excellent (3). Documents not listed here are non-relevant (relevance=0).

In [None]:
ground_truth = {
    "q1": {4: 3, 1: 2, 2: 1},
    "q2": {3: 3, 4: 3, 1: 2, 2: 1, 8: 1},
    "q3": {1: 3, 4: 3, 7: 2, 5: 2, 6: 1, 8: 1}
}

## Computing evaluation metrics

Discounted cumulative gain at rank $k$ is computed as:

$$DCG_k = rel_1 + \sum_{i=2}^k\frac{rel_i}{\log_2 i}$$

Normalized discounted cumulative gain at rank $k$ is computed as:

$$NDCG_k = \frac{DCG_k}{IDCG_k}$$

where $IDCG_k$ is the $DCG_k$ score of an idealized (perfect) ranking.

In [None]:
def dcg(relevances: List[int], k: int) -> float:
    """Computes DCG@k, given the corresponding relevance levels for a ranked list of documents.
    
    For example, given a ranking [2, 3, 1] where the relevance levels according to the ground 
    truth are {1:3, 2:4, 3:1}, the input list will be [4, 1, 3].
    
    Args:
        relevances: List with the ground truth relevance levels corresponding to a ranked list of documents.
        k: Rank cut-off.
        
    Returns:
        DCG@k (float).
    """
    # Note: Rank position is indexed from 1.
    return relevances[0] + sum(
        rel / math.log(i + 2, 2) 
         for i, rel in enumerate(relevances[1:k])
    )

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize("relevances,k,correct_value", [
    ([4, 1, 3], 2, 5.0),
    ([4, 1, 3], 5, 6.893)
])
def test_dcg(relevances, k, correct_value):    
    assert dcg(relevances, k) == pytest.approx(correct_value, rel=1e-3)

In [None]:
def ndcg(system_ranking: List[int], ground_truth: List[int], k:int = 10) -> float:
    """Computes NDCG@k for a given system ranking.
    
    Args:
        system_ranking: Ranked list of document IDs (from most to least relevant).
        ground_truth: Dict with document ID: relevance level pairs. Document not present here are to be taken with relevance = 0.
        k: Rank cut-off.
    
    Returns:
        NDCG@k (float).
    """
    # Relevance levels for the ranked docs.
    relevances = [ground_truth.get(rank,0) for rank in system_ranking]

    # Relevance levels of the idealized ranking.
    relevances_ideal = sorted(ground_truth.values(), reverse=True)
    
    return dcg(relevances, k) / dcg(relevances_ideal, k)        

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize("qid,k,correct_value", [
    ("q1", 5, 0.799),
    ("q1", 10, 0.799),
    ("q2", 5, 0.549),
    ("q2", 10, 0.705),
    ("q3", 5, 0.908),
    ("q3", 10, 0.949),
])
def test_queries(qid, k, correct_value):    
    assert ndcg(system_rankings[qid], ground_truth[qid], k) == pytest.approx(correct_value, rel=1e-3)


# Student's paired sample t-test

In this exercise you'll need to complete the code for computing the t statistic and p-value of a Student's paired sample t-test.

$H_0: \bar{x}_A = \bar{x}_B$

$H_1: \bar{x}_A \neq \bar{x}_B$

In [None]:
pip install ipytest

In [None]:
import ipytest
import numpy as np
import pytest
from scipy import stats
from typing import List

ipytest.autoconfig()

## t statistic

$$t = \frac{\bar{x}_D}{\frac{s_D}{\sqrt[]{n}}}$$

with $\bar{x}_D$ and $s_D$ as the average and standard deviation of the differences between all pair.

In [None]:
def t_stat(a: List[float], b: List[float], n: int) -> float:
    """Computes the t statistic between two systems.
    
    Args:
      a: System A recorded metric for each topic.
      b: System B recorded metric for each topic.
      n: Size of the sample.

    Retuns:
      t statistic for t-test between two systems.
    """
    n = min(len(a), n)
    x = np.array(a[:n]) - np.array(b[:n])

    x_D = np.mean(x)
    s_D = np.sqrt(sum((x-x_D)**2) / (n-1))

    return x_D / (s_D/np.sqrt(n))

# p-value

$$\text{p-value} = P(T(X^*) \leq T(X_0) \mid H_0) + P(T(X^*) \geq T(X_0) \mid H_0)$$

Each probability composing the p-value can be computed using the cumulative distribution function (CDF) of the t-Student distribution. SciPy has an implementation of the [CDF](https://docs.scipy.org/doc/scipy-1.9.1/reference/generated/scipy.stats.t.html): `cdf(x, df, loc=0, scale=1)`.

In [None]:
def p_value(n: int, t_stat: float) -> float:
    """Computes the p-value.
    
    Args:
      n: Size of the sample.
      t_stat: t statisitic.
      
    Returns:
      p-value for t statistic.
    """
    df = n - 1
    p = (1.0 - stats.t.cdf(abs(t_stat), df)) * 2.0
    return p

## Tests

In [None]:
%%ipytest

def test_lecture_example():
    system_A = [0.2215, 0.3924, 0.654, 0.5611, 0.9186, 0.1104, 0.6086, 0.5062, 0.9688, 0.995]
    system_B = [0.0765, 0.0426, 0.5738, 0.1571, 0.9881, 0.7164, 0.7507, 0.435, 0.3959, 0.8709]
    n = len(system_A)
    t = t_stat(system_A, system_B, n)
    p = p_value(n, t)

    assert t.round(3) == 0.897
    assert p.round(3) == 0.393

# Elasticsearch

Run this example to index and search a toy-sized collection of documents using Elasticsearch.  There is nothing for you to add/complete here, it's just to make sure you're all set for the next exercise.

Before starting, make sure that you've 

1. Downloaded and started Elasticsearch
1. Installed the `elasticsearch` Python package
  - It's part of the standard Anaconda distribution; otherwise, you can run `conda install elasticsearch`.

In [None]:
!pip install ipytest
!pip install elasticsearch==7.9.0

In [None]:
%%bash

wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512
tar -xzf elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
sudo chown -R daemon:daemon elasticsearch-7.9.2/
shasum -a 512 -c elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512 

In [None]:
%%bash --bg

sudo -H -u daemon elasticsearch-7.9.2/bin/elasticsearch

In [None]:
# Sleep for few seconds to let the instance start.
import time
time.sleep(20)

In [None]:
from elasticsearch import Elasticsearch
from pprint import pprint

In [None]:
INDEX_NAME = "toy_index"  # the name of the index

INDEX_SETTINGS = {  # single shard with a single replica
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 1
        }
    }
}

The collection of documents is given here as a Python dictionary. Each document has two fields: title and content.

In [None]:
DOCS = {
    1: {"title": "Rap God",
        "content": "gonna, gonna, Look, I was gonna go easy on you and not to hurt your feelings"
        },
    2: {"title": "Lose Yourself",
        "content": "Yo, if you could just, for one minute Or one split second in time, forget everything Everything that bothers you, or your problems Everything, and follow me"
        },
    3: {"title": "Love The Way You Lie",
        "content": "Just gonna stand there and watch me burn But that's alright, because I like the way it hurts"
        },
    4: {"title": "The Monster",
        "content": ["gonna gonna I'm friends with the monster", "That's under my bed Get along with the voices inside of my head"]
        },
    5: {"title": "Beautiful",
        "content": "Lately I've been hard to reach I've been too long on my own Everybody has a private world Where they can be alone"
        }
}  # Eminem rulez ;)

### Create Elasticsearch object

In [None]:
es = Elasticsearch()

Check if service is running

In [None]:
es.info()

### Create index

If the index exists, we delete it (normally, you don't want to do this).

In [None]:
if es.indices.exists(INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

We set the number of shards and replicas to be used for each index when it's created. (We use a single shard instead of the default 5.)

In [None]:
es.indices.create(index=INDEX_NAME, body=INDEX_SETTINGS)

### Add documents to the index

In [None]:
for doc_id, doc in DOCS.items():
    es.index(index=INDEX_NAME, doc_type="_doc", id=doc_id, body=doc)

### Check what has been indexed

Get the contents of doc #3

In [None]:
doc = es.get(index=INDEX_NAME, id=3)

In [None]:
pprint(doc)

Get the term vector for doc #3.

`termvectors` returns information and statistics on terms in the fields of a particular document.

In [None]:
tv = es.termvectors(index=INDEX_NAME, doc_type="_doc", id=3, fields="title,content", term_statistics=True)

In [None]:
pprint(tv)

Interpretation of the returned values
  * `[{field}]['field_statistics']`: 
    - `doc_count`: how many documents contain this field
    - `sum_ttf`: the sum of all term frequencies in this field
  * `[{field}][{term}]`:
    - `doc_freq`: how many document contain this term
    - `term_freq`: frequency (number of occurrences) of the term in this document field
    - `ttf`: total term frequency, i.e., number of occurrences of the term in this field in all documents

Note that Elasticsearch splits indices into multiple shards (by default: 5). This means that when you ask for term statistics, these are computed by shard. In case of a large collection, this is typically not an issue as the statistics become "normalized" across the different shards and the differences are negligible. For smaller collections that fit on a single disk, you may set the number of shards to 1 to avoid this issue alltogether (like we've done in this example in `INDEX_SETTINGS`).

Check the following documents for further information:
  - https://www.elastic.co/guide/en/elasticsearch/reference/6.2/_basic_concepts.html
  - https://www.elastic.co/blog/practical-bm25-part-1-how-shards-affect-relevance-scoring-in-elasticsearch

### Search

In [None]:
query = "rap monster"
res = es.search(index=INDEX_NAME, q=query, _source=False, size=10)

Print full response (`hits` holds the results)

In [None]:
pprint(res)

Print only search results (ranked list of docs)

In [None]:
for hit in res["hits"]["hits"]:
    print("Doc ID: %3r  Score: %5.2f" % (hit["_id"], hit["_score"]))

## Elasticsearch query language

Elasticsearch supports structured queries based on its own [DSL query language](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html).

Mind that certain queries expect analyzed query terms (e.g., [term queries](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-term-query.html)), while other query types (e.g., [match](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query.html)) perform analysis as part of the processing. Make sure you check the respective documentation carefully.

### Building a second toy index with position information

In [None]:
INDEX_NAME2 = "toy_index2"  

INDEX_SETTINGS2 = {
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 1
        },
        "analysis": {
            "analyzer": {
                "my_english_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "stopwords": "_english_",
                    "filter": [
                        "lowercase",
                        "english_stop",
                        "filter_english_minimal"
                    ]                
                }
            },
            "filter" : {
                "filter_english_minimal" : {
                    "type": "stemmer",
                    "name": "minimal_english"
                },
                "english_stop": {
                    "type": "stop",
                    "stopwords": "_english_"
                }
            },
        }
    },
    "mappings": {
        "properties": {
            "title": {
                "type": "text",
                "term_vector": "with_positions",
                "analyzer": "my_english_analyzer"
            },
            "content": {
                "type": "text",
                "term_vector": "with_positions",
                "analyzer": "my_english_analyzer"
            }
        }
    }
}

In [None]:
if es.indices.exists(INDEX_NAME2):
    es.indices.delete(index=INDEX_NAME2)
    
es.indices.create(index=INDEX_NAME2, body=INDEX_SETTINGS2)

In [None]:
for doc_id, doc in DOCS.items():
    es.index(index=INDEX_NAME2, doc_type="_doc", id=doc_id, body=doc)

Check that term position information has been added to the index

In [None]:
tv = es.termvectors(index=INDEX_NAME2, doc_type="_doc", id=3, fields="title", term_statistics=True)

pprint(tv)

### Examples

Searching for documents that must match a [boolean combination](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html) of multiple terms (in any order).  

In [None]:
query = {
    "bool": {
        "must": [
            {"match": {"content": "gonna"}}, 
            {"match": {"content": "monster"}}
        ]
    }
}

res = es.search(index=INDEX_NAME2, body={"query": query})

pprint(res)

Searching for documents that match an [extract phrase](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-match-query-phrase.html) (terms in that exact order).

In [None]:
query = {"match_phrase": {"content": "split second"}}

res = es.search(index=INDEX_NAME2, body={'query': query})

pprint(res)

# Elasticsearch

In this exercise, you'll first build an Elasticsearch index of a toy document collection, then request various term statistics from that index.

Remember to make sure that the Elasticsearch service is running (i.e., has been started in a terminal window).

See [this document](Elasticsearch.md) for help on Elasticsearch usage.

In [None]:
!pip install ipytest
!pip install elasticsearch==7.9.0

In [None]:
%%bash

wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512
tar -xzf elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
sudo chown -R daemon:daemon elasticsearch-7.9.2/
shasum -a 512 -c elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512 

In [None]:
%%bash --bg

sudo -H -u daemon elasticsearch-7.9.2/bin/elasticsearch

In [None]:
# Sleep for few seconds to let the instance start.
import time
time.sleep(20)

In [None]:
from elasticsearch import Elasticsearch
from typing import Dict, List, Optional

import ipytest
import pytest

ipytest.autoconfig()

This is to check that the Elasticsearch service is running on your machine.

In [None]:
es = Elasticsearch()

## Indexing

We use a toy data collection with 5 documents, each with title and content fields.

In [None]:
DOCS = [
    {"doc_id": "D1",
     "title": "First document",
     "content": "House on the hill"
    },
    {"doc_id": "D2",
     "title": "Second title",
     "content": "Downtown Stavanger is beautiful"
    },
    {"doc_id": "D3",
     "title": "First, second, and third",
     "content": "Never step on snakes"
    },
    {"doc_id": "D4",
     "title": "Document number four",
     "content": "House, house. It's a beautiful house you have"
    },
    {"doc_id": "D5",
     "title": "This document is the last document",
     "content": "There can be only one matching result"
    }    
]

In [None]:
INDEX_SETTINGS = {  # single shard with a single replica
    "settings" : {
        "index" : {
            "number_of_shards" : 1,
            "number_of_replicas" : 1
        }
    }
}

In [None]:
INDEX_NAME = "test_e6-3"

In [None]:
if es.indices.exists(INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)
es.indices.create(index=INDEX_NAME, body=INDEX_SETTINGS)

Add documents in `DOC` to the index.

In [None]:
for doc in DOCS:
    es.index(index=INDEX_NAME, doc_type="_doc", id=doc["doc_id"],
             body={"title": doc["title"], "content": doc["content"]})

## Term statistics

Complete the methods below for getting various term statistics from the index.

Consult [this notebook](2-Elasticsearch.ipynb) for the interpretation of term vector statistics.

In [None]:
def get_doc_term_freqs(index_name: str, doc_id: str, field: str) -> Dict[str, int]:
    """Returns the terms along with their frequencies contained in a given document.
    
    Args:
        index_name: Name of index.
        doc_id: Document ID.
        field: Field name.
    
    Returns:
        Dict with terms as keys and corresponding frequencies (i.e., 
        number of occurrences within the given document field) as values.
    """
    tv = es.termvectors(index=index_name, doc_type="_doc", id=doc_id, fields=field, term_statistics=True)
    if tv["_id"] != doc_id:
        return None
    if field not in tv["term_vectors"]:
        return None
    term_freqs = {}
    for term, term_stat in tv["term_vectors"][field]["terms"].items():
        term_freqs[term] = term_stat["term_freq"]
    return term_freqs

In [None]:
def get_doc_field_len(index_name: str, doc_id: str, field: str) -> int:
    """Returns the length of a given document field.
    
    Length is defined as the total number of terms contained in that field.
    
    Args:
        index_name: Name of index.
        doc_id: Document ID.
        field: Field name.
    
    Returns:
        Field length.    
    """
    term_freqs = get_doc_term_freqs(index_name, doc_id, field)
    if term_freqs is not None:
        return sum(term_freqs.values())
    return None

In [None]:
def get_doc_containing_term(index_name: str, field: str, term: str) -> Optional[str]:
    """Returns any document ID that contains term in a given field or None.
    
    Args:
        index_name: Name of index.
        field: Field name.
        term: Term.

    Returns:
        ID of a document that contains `term` or None.
    """
    # Use a boolean query to find a document that contains the term.
    hits = es.search(index=index_name, body={"query": {"match": {field: term}}},
                               _source=False, size=1).get("hits", {}).get("hits", {})
    return hits[0]["_id"] if len(hits) > 0 else None

In [None]:
def get_term_doc_count(index_name: str, field: str, term: str) -> int:
    """Returns the total number of documents that contain a given term within a specific field.
    
    Args:
        index_name: Name of index.
        field: Field name.
        term: Term.
        
    Returns:
        Number of documents that contain the given term within `field`.
    """
    # Find a document that contains the term.
    doc_id = get_doc_containing_term(index_name, field, term)
    if doc_id is None:
        return 0
    # Request term statistics for that document and extract the 
    # requested information from there.
    tv = es.termvectors(index=index_name, doc_type="_doc", id=doc_id, fields=field, term_statistics=True)
    return tv["term_vectors"][field]["terms"][term]["doc_freq"]

In [None]:
def get_term_coll_freq(index_name: str, field: str, term: str) -> int:
    """Returns the total collection term frequency of a term in a given field.
    
    Args:
        index_name: Name of index.
        field: Field name.
        term: Term.
        
    Returns:
        Total number of occurrences of `term` in all documents within `field`.
    """
    # Find a document that contains the term.
    doc_id = get_doc_containing_term(index_name, field, term)
    if doc_id is None:
        return 0
    # Request term statistics for that document and extract the 
    # requested information from there.
    tv = es.termvectors(index=index_name, doc_type="_doc", id=doc_id, fields=field, term_statistics=True)
    return tv["term_vectors"][field]["terms"][term]["ttf"]

Tests.

In [None]:
%%run_pytest[clean]

def test_doc_term_freqs():
    assert get_doc_term_freqs(INDEX_NAME, "D2", "title") == {"second": 1, "title": 1}
    assert get_doc_term_freqs(INDEX_NAME, "D4", "content") == {"a": 1, "beautiful": 1, "have": 1,
                                                               "house": 3, "it's": 1, "you": 1}    
def test_doc_field_len():
    assert get_doc_field_len(INDEX_NAME, "D2", "title") == 2
    assert get_doc_field_len(INDEX_NAME, "D4", "content") == 8
    
def test_doc_containing_term():
    assert get_doc_containing_term(INDEX_NAME, "title", "document") in ["D1", "D4", "D5"]
    assert get_doc_containing_term(INDEX_NAME, "content", "house") in ["D1", "D4"]
    
def test_term_doc_count():
    assert get_term_doc_count(INDEX_NAME, "title", "document") == 3
    assert get_term_doc_count(INDEX_NAME, "content", "house") == 2    
    
def test_term_coll_freq():
    assert get_term_coll_freq(INDEX_NAME, "title", "this") == 1
    assert get_term_coll_freq(INDEX_NAME, "title", "document") == 4
    assert get_term_coll_freq(INDEX_NAME, "content", "house") == 4       

# Trivia quiz using DBpedia

In this exercise, you'll have to answer some trivia questions with the help of DBpedia.  

In [None]:
pip install ipytest

In [None]:
from typing import Any, Tuple

import hashlib
import numpy as np
import requests
import urllib.parse

Utility method for fetching all triples associated with a given entity from DBpedia.

In [None]:
def get_triples(entity_id: str) -> Tuple[str, str, Any]:
    """Returns all triples from DBpedia associated with a given entity.
        
    Args:
        entity_id: Unique entity identifier (e.g., "Kimi_Räikkönen")
    
    Yields:
        (subject, predicate, object) triples where the given entity
        stands either as subject or as object.
        
    Raises:
        ValueError: If invalid or not the canonical entity ID is provided.
    """
    # Fetch DBpedia entry in JSON format.
    # <SPO> triples are represented in the JSON as {S: P: [O1, O2, ...]},
    # i.e., multi-valued predicates are grouped together.
    data = requests.get(f"http://dbpedia.org/data/{entity_id}.json").json()
    
    # Check whether valid entity ID is provided.
    entity_uri = f"http://dbpedia.org/resource/{entity_id}"
    if entity_uri not in data:
        raise ValueError("Invalid entity ID.")
        
    # Check whether this is the canonical entity ID.
    if "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" not in data[entity_uri]:
        raise ValueError("Not the canonical entity ID.")

    for subj, vals in data.items():                
        for pred, objs in vals.items():
            for obj in objs:
                yield subj, pred, obj["value"]

## Example

Compare with the information at https://dbpedia.org/page/Kimi_R%C3%A4ikk%C3%B6nen.

In [None]:
triples_kimi = [(s, p, o) for s, p, o in get_triples("Kimi_Räikkönen")]

for s, p, o in triples_kimi:
    print(s, p, o)

Types.

In [None]:
for s, p, o in triples_kimi:
    if s != "http://dbpedia.org/resource/Kimi_Räikkönen":
        continue
    if p == "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
        print(o)

Birth place and date.

In [None]:
for s, p, o in triples_kimi:
    if s != "http://dbpedia.org/resource/Kimi_Räikkönen":
        continue
    if "birthPlace" in p or "birthDate" in p:
        print(o)

## Quiz questions

Answer the questions below.

You're allowed to find the exact answers either manually or programmatically, as long as you get the underlying data programmatically using the provided `fetch_entity()` method.

Since tests for this exercise would reveal the correct answer, they are based on MD5 hashes.

### 1) How many Formula 1 Grand Prixes has Kimi Räikkönen won?

In [None]:
wins = [s for s, p, o in triples_kimi if "ontology/firstDriver" in p and "Grand_Prix" in s]
num_wins = len(wins)

In [None]:
assert hashlib.md5(np.int8(num_wins)).hexdigest() == "f5a7e477cd3042b49a9085d62307cd28"

### 2) Is Kimi Räikkönen married?

Return the string "Y" or "N".

In [None]:
married = "N"
for s, p, o in triples_kimi:
    if "ontology/spouse" in p:
        married = "Y"
        break

In [None]:
assert hashlib.md5(bytes(married, "utf-8")).hexdigest() == "57cec4137b614c87cb4e24a3d003a3e0"

### 3) In which country did Kimi win his first Grand Prix?

Return the canonical name of the country as a string.

Utility functions to help answer this question.

In [None]:
def get_entity_id(uri: str) -> str:
    """Return the entity_id part of an URI (part after the last slash)."""
    return uri.split("/")[-1]

def is_country(uri: str) -> bool:
    """Determines if a given URI is a country."""
    for _, p, o in get_triples(get_entity_id(uri)):
        if p != "http://www.w3.org/1999/02/22-rdf-syntax-ns#type":
            continue
        if o == "http://dbpedia.org/ontology/Country":
            return True
    return False

In [None]:
# Get first win.
first_win = None
for s, p, o in triples_kimi:
    if "ontology/firstWin" in p:
        first_win = get_entity_id(o)  # Keep only entity_id, not the whole URI.
        break

# Determine the location of that GP.
country = None
for _, p, o in get_triples(first_win):
    if "ontology/location" in p:
        if is_country(o):
            country = get_entity_id(o)

In [None]:
assert hashlib.md5(bytes(country, "utf-8")).hexdigest() == "3f0e49c46cbde0c7adf5ea04a97ab261"

# Bigram matches in Elasticsearch

This exercise is about getting ordered and unordered bigram matches using Elasticsearch.

In [None]:
!pip install ipytest

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ipytest
  Downloading ipytest-0.12.0-py3-none-any.whl (15 kB)
Collecting pytest>=5.4
  Downloading pytest-7.1.3-py3-none-any.whl (298 kB)
[K     |████████████████████████████████| 298 kB 4.3 MB/s 
Collecting iniconfig
  Downloading iniconfig-1.1.1-py2.py3-none-any.whl (5.0 kB)
Collecting pluggy<2.0,>=0.12
  Downloading pluggy-1.0.0-py2.py3-none-any.whl (13 kB)
Collecting jedi>=0.10
  Downloading jedi-0.18.1-py2.py3-none-any.whl (1.6 MB)
[K     |████████████████████████████████| 1.6 MB 32.3 MB/s 
Installing collected packages: pluggy, jedi, iniconfig, pytest, ipytest
  Attempting uninstall: pluggy
    Found existing installation: pluggy 0.7.1
    Uninstalling pluggy-0.7.1:
      Successfully uninstalled pluggy-0.7.1
  Attempting uninstall: pytest
    Found existing installation: pytest 3.6.4
    Uninstalling pytest-3.6.4:
      Successfully uninstalled pytest-3.6.4
Successfull

In [None]:
!pip install elasticsearch==7.9.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting elasticsearch==7.9.0
  Downloading elasticsearch-7.9.0-py2.py3-none-any.whl (213 kB)
[K     |████████████████████████████████| 213 kB 4.1 MB/s 
Installing collected packages: elasticsearch
Successfully installed elasticsearch-7.9.0


In [None]:
%%bash

wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
wget -q https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512
tar -xzf elasticsearch-oss-7.9.2-linux-x86_64.tar.gz
sudo chown -R daemon:daemon elasticsearch-7.9.2/
shasum -a 512 -c elasticsearch-oss-7.9.2-linux-x86_64.tar.gz.sha512 

elasticsearch-oss-7.9.2-linux-x86_64.tar.gz: OK


In [None]:
%%bash --bg

sudo -H -u daemon elasticsearch-7.9.2/bin/elasticsearch

In [None]:
# Sleep for few seconds to let the instance start.
import time
time.sleep(20)

In [None]:
%%bash

ps -ef | grep elasticsearch

root         168     166  0 08:42 ?        00:00:00 sudo -H -u daemon elasticsearch-7.9.2/bin/elasticsearch
daemon       169     168 99 08:42 ?        00:00:20 /content/elasticsearch-7.9.2/jdk/bin/java -Xshare:auto -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djna.nosys=true -XX:-OmitStackTraceInFastThrow -XX:+ShowCodeDetailsInExceptionMessages -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 -Dio.netty.allocator.numDirectArenas=0 -Dlog4j.shutdownHookEnabled=false -Dlog4j2.disable.jmx=true -Djava.locale.providers=SPI,COMPAT -Xms1g -Xmx1g -XX:+UseG1GC -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -Djava.io.tmpdir=/tmp/elasticsearch-7080371589566594553 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=data -XX:ErrorFile=logs/hs_err_pid%p.log -Xlog:gc*,gc+age=trace,safepoint:file=logs/gc.log:utctime,pid,tags:filecou

In [None]:
%%bash

curl -sX GET "localhost:9200/"

{
  "name" : "449c557fdc0b",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "VkbUjcNeQ7-fcKUWfXBlXA",
  "version" : {
    "number" : "7.9.2",
    "build_flavor" : "oss",
    "build_type" : "tar",
    "build_hash" : "d34da0ea4a966c4e49417f2da2f244e3e97b4e6e",
    "build_date" : "2020-09-23T00:45:33.626720Z",
    "build_snapshot" : false,
    "lucene_version" : "8.6.2",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}


In [None]:
from collections import Counter
from elasticsearch import Elasticsearch
from pprint import pprint

import ipytest
import pytest

ipytest.autoconfig()

## Indexing a toy collection 

This time, we store **term position information** and perform minimal stemming, i.e., removing only plurals (for that, we specify a custom analyzer).

Check the [Elasticsearch documentation on analyzers](https://www.elastic.co/guide/en/elasticsearch/reference/current/analyzer.html).

In [None]:
INDEX_NAME = "toy_index"  

INDEX_SETTINGS = {
    'settings' : {
        'index' : {
            "number_of_shards" : 1,
            "number_of_replicas" : 1
        },
        'analysis': {
            'analyzer': {
                'my_english_analyzer': {
                    'type': "custom",
                    'tokenizer': "standard",
                    'stopwords': "_english_",
                    'filter': [
                        "lowercase",
                        "english_stop",
                        "filter_english_minimal"
                    ]                
                }
            },
            'filter' : {
                'filter_english_minimal' : {
                    'type': "stemmer",
                    'name': "minimal_english"
                },
                'english_stop': {
                    'type': "stop",
                    'stopwords': "_english_"
                }
            },
        }
    },
    'mappings': {
        'properties': {
            'title': {
                'type': "text",
                'term_vector': "with_positions",
                'analyzer': "my_english_analyzer"
            },
            'content': {
                'type': "text",
                'term_vector': "with_positions",
                'analyzer': "my_english_analyzer"
            }
        }
    }
}

In [None]:
DOCS = {
    1: {"title": "Rap God",
        "content": "gonna, gonna, Look, I was gonna go easy on you and not to hurt your feelings"
        },
    2: {"title": "Lose Yourself",
        "content": "Yo, if you could just, for one minute Or one split second in time, forget everything Everything that bothers you, or your problems Everything, and follow me"
        },
    3: {"title": "Love The Way You Lie",
        "content": "Just gonna stand there and watch me burn But that's alright, because I like the way it hurts"
        },
    4: {"title": "The Monster",
        "content": ["gonna gonna I'm friends with the monster", "That's under my bed Get along with the voices inside of my head"]
        },
    5: {"title": "Beautiful",
        "content": "Lately I've been hard to reach I've been too long on my own Everybody has a private world Where they can be alone"
        },
    6: {"title": "Fake Eminem 1",
        "content": "This is not real Eminem, just some text to get more matches for a split second for a split second."
        },
    7: {"title": "Fake Eminem 2",
        "content": "I have a monster friend and I'm friends with the monster and then there are some more friends who are monsters."
        },
}

In [None]:
ES_NODES = "http://localhost:9200"
es = Elasticsearch(hosts = [ES_NODES])

In [None]:
es.info()

{'name': '449c557fdc0b',
 'cluster_name': 'elasticsearch',
 'cluster_uuid': 'VkbUjcNeQ7-fcKUWfXBlXA',
 'version': {'number': '7.9.2',
  'build_flavor': 'oss',
  'build_type': 'tar',
  'build_hash': 'd34da0ea4a966c4e49417f2da2f244e3e97b4e6e',
  'build_date': '2020-09-23T00:45:33.626720Z',
  'build_snapshot': False,
  'lucene_version': '8.6.2',
  'minimum_wire_compatibility_version': '6.8.0',
  'minimum_index_compatibility_version': '6.0.0-beta1'},
 'tagline': 'You Know, for Search'}

In [None]:
if es.indices.exists(index=INDEX_NAME):
    es.indices.delete(index=INDEX_NAME)

In [None]:
es.indices.create(index=INDEX_NAME, body=INDEX_SETTINGS)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'toy_index'}

Testing our analyzer.

In [None]:
es.indices.analyze(index=INDEX_NAME, body={'analyzer': 'my_english_analyzer', 'text': 'monsters in my bed'})

{'tokens': [{'token': 'monster',
   'start_offset': 0,
   'end_offset': 8,
   'type': '<ALPHANUM>',
   'position': 0},
  {'token': 'my',
   'start_offset': 12,
   'end_offset': 14,
   'type': '<ALPHANUM>',
   'position': 2},
  {'token': 'bed',
   'start_offset': 15,
   'end_offset': 18,
   'type': '<ALPHANUM>',
   'position': 3}]}

In [None]:
for doc_id, doc in DOCS.items():
    es.index(index=INDEX_NAME, id=doc_id, body=doc)

Notice that you also get term position information when requesting a term vector.

In [None]:
tv = es.termvectors(index=INDEX_NAME, id=2, fields='title,content')
pprint(tv)

{'_id': '2',
 '_index': 'toy_index',
 '_type': '_doc',
 '_version': 1,
 'found': True,
 'term_vectors': {'content': {'field_statistics': {'doc_count': 7,
                                                   'sum_doc_freq': 85,
                                                   'sum_ttf': 101},
                              'terms': {'bother': {'term_freq': 1,
                                                   'tokens': [{'position': 18}]},
                                        'could': {'term_freq': 1,
                                                  'tokens': [{'position': 3}]},
                                        'everything': {'term_freq': 3,
                                                       'tokens': [{'position': 15},
                                                                  {'position': 16},
                                                                  {'position': 23}]},
                                        'follow': {'term_freq': 1,
                    

## Recovering ordered sequence of terms from inverted index

This method returns the sequence of terms for a given document field, with `None` values for stopwords that got removed.

In [None]:
def get_term_sequence(es, doc_id, field):
    tv = es.termvectors(index=INDEX_NAME, id=doc_id, fields=[field])
    # We first put terms in a position-indexed dict.
    pos = {}
    for term, tinfo in tv['term_vectors'][field]['terms'].items():
        for token in tinfo['tokens']:
            pos[token['position']] = term
    # Then, turn that dict to a list.
    seq = [None] * (max(pos.keys()) + 1)
    for p, term in pos.items():
        seq[p] = term
    return seq

Tests.

In [None]:
%%run_pytest[clean]

def test_get_term_sequence():
    assert get_term_sequence(es, 4, 'title') == [None, 'monster']
    assert get_term_sequence(es, 7, 'content') == ['i', 'have', None, 'monster', 'friend', None, "i'm", 'friend', None, None, 'monster', None, None, None, None, 'some', 'more', 'friend', 'who', None, 'monster']

%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).


[32m.[0m[32m                                                                                            [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.03s[0m[0m


## Getting ordered bigram matches

Use the `get_term_sequence()` method to get the document field's content as a sequence of terms, then check for ordered bigram matches yourself.

In [None]:
def count_ordered_bigram_matches(es, doc_id, field, bigram):
    """Counts the number of ordered bigram matches in a given document field. 
    
    Args:
        es: Elasticsearch instance
        doc_id: Document ID
        field: Document field
        bigram: A sequence of two terms given as a list
    
    Returns:
        Number of times the bigram can be found in this exact order.
    """
    # Get the document field's content as a sequence of terms.
    text = get_term_sequence(es, doc_id, field)
    # Count the number of matches    
    count = 0
    for i in range(len(text) - 1):
        if text[i] == bigram[0]:
            if text[i + 1] == bigram[1]:
                count += 1
    return count

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize('doc_id, field, bigram, correct_value', [
    (6, 'content', ['split', 'second'], 2),
    (2, 'content', ['split', 'second'], 1),
    (1, 'content', ['split', 'second'], 0),
])
def test_count_ordered_bigram_matches(doc_id, field, bigram, correct_value):
    assert count_ordered_bigram_matches(es, doc_id, field, bigram) == correct_value

[32m.[0m[32m.[0m[32m.[0m[32m                                                                                          [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.04s[0m[0m


%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).


## Getting unordered bigram matches

As before, use the `get_term_sequence()` method to get the document field's content as a sequence of terms, then check for ordered bigram matches yourself.

In [None]:
def count_unordered_bigram_matches(es, doc_id, field, bigram, w=4):
    """Counts the number of unordered bigram matches in a given document field. 
    
    Args:
        es: Elasticsearch instance
        doc_id: Document ID
        field: Document field
        bigram: A sequence of two terms given as a list
        w: The maximum distance between the two query terms that still counts as a match
    
    Returns:
        Number of times the bigram can be found within a distance of w from each other in any order.
    """
    text = get_term_sequence(es, doc_id, "content")
    count = 0
    for i in range(len(text) - 1):
        if text[i] in bigram:
            other_term = bigram[0] if text[i] == bigram[1] else bigram[1]
            count += Counter(text[i+1:i+w])[other_term]
    return count

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize('doc_id, field, bigram, correct_value', [
    (7, 'title', ['friend', 'monster'], 3),
    (4, 'title', ['friend', 'monster'], 1),
    (1, 'title', ['friend', 'monster'], 0),
])
def test_count_ordered_bigram_matches(doc_id, field, bigram, correct_value):
    assert count_unordered_bigram_matches(es, doc_id, field, bigram) == correct_value

%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).


[32m.[0m[32m.[0m[32m.[0m[32m                                                                                          [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.06s[0m[0m


## Feedback

Please give (anonymous) feedback on this exercise by filling out [this form](https://forms.gle/2jPayczbFhEcC9K68).

# Entity linking

You are provided with the data from a knowledge graph and asked to annotate an input document using a general entity linking pipeline approach, consisting of mention detection, candidate selection, and disambiguation steps. 

In [None]:
pip install ipytest

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ipytest
  Downloading ipytest-0.12.0-py3-none-any.whl (15 kB)
Collecting pytest>=5.4
  Downloading pytest-7.1.3-py3-none-any.whl (298 kB)
[K     |████████████████████████████████| 298 kB 5.5 MB/s 
Collecting iniconfig
  Downloading iniconfig-1.1.1-py2.py3-none-any.whl (5.0 kB)
Collecting pluggy<2.0,>=0.12
  Downloading pluggy-1.0.0-py2.py3-none-any.whl (13 kB)
Collecting jedi>=0.10
  Downloading jedi-0.18.1-py2.py3-none-any.whl (1.6 MB)
[K     |████████████████████████████████| 1.6 MB 45.6 MB/s 
Installing collected packages: pluggy, jedi, iniconfig, pytest, ipytest
  Attempting uninstall: pluggy
    Found existing installation: pluggy 0.7.1
    Uninstalling pluggy-0.7.1:
      Successfully uninstalled pluggy-0.7.1
  Attempting uninstall: pytest
    Found existing installation: pytest 3.6.4
    Uninstalling pytest-3.6.4:
      Successfully uninstalled pytest-3.6.4
Successfull

In [None]:
import ipytest
import pytest

ipytest.autoconfig()

## 1) Mention detection

You are given an excerpt from a surface form dictionary in the format `SF_DICT[mention][entity] = count`, where `count` refers to the number of times `mention` was linked to `entity` in a given training corpus.
The total count of linked occurrences of a mention is given as the key `_total` (i.e., this is the number of times mention was linked to any entity in the training corpus).
Note that not all linked entities are listed in the dictionary, hence the counts do not necessarily sum up to `_total`.

In [None]:
SF_DICT = {
    "1992 elections": {
        "wikipedia:Philippine_general_election,_1992": 9,
        "wikipedia:Angolan_presidential_election,_1992": 1,
        "_total": 98
    },
    "angola": {
        "wikipedia:Angola": 4026,
        "wikipedia:Angola_(Portugal)": 6,
        "wikipedia:Angola_national_football_team": 120,
        "_total": 4298
    },
    "democracy": {
        "wikipedia:Democracy": 108,
        "wikipedia:Democracy_(album)": 3,
        "_total": 2162
    },
    "multiparty democracy": {
        "wikipedia:multiparty_democracy": 11,
        "_total": 11
    },
    "one party": {
        "wikipedia:Non-possessors": 1,
        "wikipedia:Single-party_state": 5,
        "_total": 983
    }
}

In [None]:
TEXT = (
    "Angola changed from a one-party Marxist-Leninist system "
    "ruled by the MPLA to a formal multiparty democracy "
    "following the 1992 elections"
).lower()

We perform mention detection based on the following heuristic:

  - At each term position
    - Start with the longest possible n-gram (n = 8). 
    - If the n-gram is found in the dictionary, the mention and the corresponding entities are kept (and the shorter n-grams are ignored). Otherwise, we try to match the (n-1)-grams. 
    - Repeat until a mention is found or n reaches 0.

In [None]:
def detect_mentions(text, sf_dict):
    """Performs mention detection in text against a given surface form dictionary.
    
    Args:
        text: Input text.
        sf_dict: Surface form dictionary.
    
    Returns:
        List of matches as `(pos, mention, entity)` tuples ordered by pos, mention, and entity.
        (Term positions are indexed from 0.)
    """
    matches = []
    tokens = text.split()
    for pos, term in enumerate(tokens):
        n = max(8, len(tokens) - pos)
        while n > 0:
            # Check for matching n-gram
            n_gram = " ".join(tokens[pos:pos+n])
            if n_gram in sf_dict:
                for entity in sorted(sf_dict[n_gram].keys()):
                    if entity != "_total":
                        matches.append((pos, n_gram, entity))
                break
            n -= 1
    return matches

Tests.

In [None]:
%%run_pytest[clean]

def test_detect_mentions():
    assert detect_mentions(TEXT, SF_DICT) == [
        (0, "angola", "wikipedia:Angola"),
        (0, "angola", "wikipedia:Angola_(Portugal)"),
        (0, "angola", "wikipedia:Angola_national_football_team"),
        (14, "multiparty democracy", "wikipedia:multiparty_democracy"),
        (15, "democracy", "wikipedia:Democracy"),
        (15, "democracy", "wikipedia:Democracy_(album)"),
        (18, "1992 elections", "wikipedia:Angolan_presidential_election,_1992"),
        (18, "1992 elections", "wikipedia:Philippine_general_election,_1992")
    ]

%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).


[32m.[0m[32m                                                                                            [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.02s[0m[0m


## 2) Entity ranking

Entity ranking is based on the commonness score:

$$Commonness(e, m) = p(e|m) = \frac{n(m, e)}{\sum_{e'} n(m, e')}$$

where $n(m, e)$ denotes the number of times entity $e$ is the link target of mention $m$.

In [None]:
def commonness(entity, mention, sf_dict):
    """Computes the commonness for an entity-mention pair given a surface form dictionary.
    
    Args:
        entity: Entity.
        mention: Mention.
        sf_dict: Surface form dictionary (containing entity-mention count statistics).
        
    Returns:
        Commonness (float).    
    """
    if mention not in sf_dict:
        return None
    return sf_dict[mention].get(entity, 0) / sf_dict[mention]["_total"]

In [None]:
def rank_entities(mentions, sf_dict, k=5):
    """Ranks candidate entities for each mention based on commonness and retains 
    the top-k highest-scoring entities for each mention.
    
    Args:
        mentions: Detected mentions (list of `(mention, entity, pos)` tuples).
        sf_dict: Surface form dictionary.
        k: Number of top-scoring entities to keep for each mention.
        
    Returns:
        Candidate entities with scores for each mention. Each mention is 
        represented as a dict `{'mention': xxx, 'pos': yyy, 'entities': zzz`,
        where entities is a list of `(entity, score)` tuples ordered by score desc.
    """
    # Reorganize input for more convenient processing.
    entities_per_mention = {}
    for (pos, mention, entity) in mentions:
        key = "{}::{}".format(pos, mention)
        if key not in entities_per_mention:
            entities_per_mention[key] = []
        entities_per_mention[key].append(entity)
    
    # Score all candidate entities for each mention.
    mentions_entities = []
    for key, entities in entities_per_mention.items():
        pos, mention = key.split("::")
        entity_scores = sorted([(entity, commonness(entity, mention, sf_dict))
                               for entity in entities], key=lambda x: x[1], reverse=True)
        mentions_entities.append({
            "mention": mention,
            "pos": int(pos),
            "entities": entity_scores[:k]
        })
    return mentions_entities

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize("mention,entity,correct_value", [
    ("1992 elections", "wikipedia:Philippine_general_election,_1992", 9/98),
    ("1992 elections", "wikipedia:Angolan_presidential_election,_1992", 1/98),
    ("angola", "wikipedia:Angola", 4026 / 4298),
    ("angola", "wikipedia:Angola_national_football_team", 120 / 4298),
    ("democracy", "wikipedia:Democracy", 108/2162),
    ("democracy", "wikipedia:Democracy_(album)", 3/2162),
    ("multiparty democracy", "wikipedia:multiparty_democracy", 1)    
])
def test_commonness(entity, mention, correct_value):    
    assert commonness(entity, mention, SF_DICT) == pytest.approx(correct_value, rel=1e-3)
    
def test_rank_entities():
    mentions = detect_mentions(TEXT, SF_DICT)
    ranked_entities = rank_entities(mentions, SF_DICT, k=2)
    assert ranked_entities[0] == {"mention": "angola",
                                  "pos": 0,
                                  "entities": [
                                      ("wikipedia:Angola", 0.9367147510469986),
                                      ("wikipedia:Angola_national_football_team", 0.02791996277338297)
                                  ]
                                 }
    assert ranked_entities[1] == {"mention": "multiparty democracy",
                                  "pos": 14,
                                  "entities": [("wikipedia:multiparty_democracy", 1.0)]}

[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                                     [100%][0m
[32m[32m[1m8 passed[0m[32m in 0.07s[0m[0m


%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).


## 3) Disambiguation

Perform disambiguation by simply returning the entity for each mention with the highest score and only if it is above the given threshold.

In case of containment or overlapping mentions, keep only the one with the higher score.

In [None]:
def disambiguate(ranked_entities, threshold=0.1):
    """Disambiguates entities for each mention by keeping only the highest-scoring one.
    
    Args:
        ranked_entities: List of mentions along with a ranked list of candidate entities.
        threshold: Score threshold
    
    Returns:
        Entity annotations as a list of `(pos, mention, entity)` tuples.
    """
    # For each term position, we keep track of the highest scoring entity
    # that is linked to a mention in that position. We can greedily replace 
    # the annotations in case a higher scoring one is found.
    annotations = {}
    for candidates in ranked_entities:
        (entity, score) = candidates["entities"][0]
        if score < threshold:
            continue
        start_pos = candidates["pos"]
        mention = candidates["mention"]
        mention_length = len(mention.split())
        # Add mention-entity annotation if all term positions are
        # empty or are lower scoring.
        add_annotation = True
        for pos in range(start_pos, start_pos + mention_length):
            if pos in annotations:
                if annotations[pos]["score"] > score:
                    add_annotation = False
        
        if add_annotation:
            # For each term position, check if there is an existing 
            # annotation to be replaced.
            for pos in range(start_pos, start_pos + mention_length):
                if pos in annotations:
                    print("Replace on ", pos)
                    # Replace existing annotation.
                    start_pos_old = annotatations[pos]["start_pos"]
                    mention_length_old = annotatations[pos]["mention_length"]
                    for i in range(start_pos_old, start_pos_old + mention_length_old):
                        del annotations[i]
                
                # Store new annotation.
                annotations[pos] = {
                    "score": score,
                    "entity": entity,
                    "mention": mention,
                    "start_pos": start_pos,
                    "mention_length": mention_length
                }

    # Converting output to desired format.
    linked_entities = []
    for pos, annotation in sorted(annotations.items()):
        if pos == annotation["start_pos"]:
            linked_entities.append((pos, annotation["mention"], annotation["entity"]))
    return linked_entities

Tests.

In [None]:
%%run_pytest[clean]

def test_disambiguate():
    mentions = detect_mentions(TEXT, SF_DICT)
    ranked_entities = rank_entities(mentions, SF_DICT)
    linked_entities = disambiguate(ranked_entities, threshold=0.01)
    assert linked_entities == [
        (0, "angola", "wikipedia:Angola"),
        (14, "multiparty democracy", "wikipedia:multiparty_democracy"),
        (18, "1992 elections", "wikipedia:Philippine_general_election,_1992")
    ]

%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).


[32m.[0m[32m                                                                                            [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.02s[0m[0m


# Entity linking evaluation
You are provided with the documents annotations along with ground truth annotations and asked to evaluate them.

In [None]:
pip install ipytest

In [None]:
import ipytest
import pytest

ipytest.autoconfig()

The annotations given by a entity linking system under evaluation.

In [None]:
LINKED_ENTITIES_1 = [ 
    (0, 'angola', 'wikipedia:Angola'),
    (14, 'multiparty democracy', 'wikipedia:multiparty_democracy'),
    (18, '1992 elections', 'wikipedia:Philippine_general_election,_1992')
]

LINKED_ENTITIES_2 = [
    (5, 'angola', 'wikipedia:Angola'),
    (10, '1975', 'wikipedia:Philippine_general_election,_1992'),
    (13, 'one party', 'wikipedia:Single-party_state')
]

Ground truth annotations (reference annotations).

In [None]:
GROUND_TRUTH_ANNOTATIONS_1 = [ 
    (0, 'angola', 'wikipedia:Angola'),
    (4, 'one-party', 'wikipedia:Single-party_state'),
    (14, 'multiparty democracy', 'wikipedia:multiparty_democracy'),
    (18, '1992 elections', 'wikipedia:Philippine_general_election,_1992')
]

GROUND_TRUTH_ANNOTATIONS_2 = [
    (5, 'angola', 'wikipedia:Angola'),
    (13, 'one party', 'wikipedia:Single-party_state'),
    (14, 'Republic', 'wikipedia:Republic')
]

Set-based metrics where:
- precision is defined as the fraction of correctly linked entities that have been annotated by the system
- recall is defined as fraction of correctly linked entities that should be annotated 
- F-measure is a harmonic mean between precision and recall

In [None]:
def set_based_precision(annotations, relevance_annotations):
  """Computes set-based precision.
  
  Args:
      annotations: All annotations for a set of documents.
      relevance_annotations: All reference (ground truth) annotations for a set of documents.
      
  Returns:
      Set-based precision.    
  """
  return len(set(annotations).intersection(relevance_annotations))/len(annotations)

In [None]:
def set_based_recall(annotations, relevance_annotations):
  """Computes set-based recall.
  
  Args:
      annotations: All annotations for a set of documents.
      relevance_annotations: All reference (ground truth) annotations for a set of documents.
      
  Returns:
      Set-based recall.    
  """
  return len(set(annotations).intersection(relevance_annotations))/len(relevance_annotations)

In [None]:
def f1_score(precision, recall):
  """Computes F-measure.
  
  Args:
      annotations: All annotations for a set of documents.
      relevance_annotations: All reference (ground truth) annotations for a set of documents.
      
  Returns:
      F-measure.    
  """
  return 2 * precision * recall / (precision + recall)

## Metrics over the collection of documents

Micro-averaged - averaged across mentions

In [None]:
import itertools 

def micro_precision(annotations, ground_truth_annotations):
  """Computes micro-averaged precision.
  
  Args:
      annotations: All annotations for a set of documents.
      relevance_annotations: All reference (ground truth) annotations for a set of documents.
      
  Returns:
      Micro-averaged precision.    
  """
  all_annotations = list(itertools.chain(*annotations))
  all_ground_truth_annotations = list(itertools.chain(*ground_truth_annotations))
  return set_based_precision(all_annotations, all_ground_truth_annotations)

In [None]:
def micro_recall(all_annotations, ground_truth_annotations):
  """Computes micro-averaged recall.
  
  Args:
      annotations: All annotations for a set of documents.
      relevance_annotations: All reference (ground truth) annotations for a set of documents.
      
  Returns:
      Micro-averaged recall.    
  """
  all_annotations = list(itertools.chain(*all_annotations))
  all_ground_truth_annotations = list(itertools.chain(*ground_truth_annotations))
  return set_based_recall(all_annotations, all_ground_truth_annotations)

Tests

In [None]:
%%run_pytest[clean]

def test_micro_precision():
  assert micro_precision([LINKED_ENTITIES_1, LINKED_ENTITIES_2], [GROUND_TRUTH_ANNOTATIONS_1, GROUND_TRUTH_ANNOTATIONS_2]) == pytest.approx(5/6, rel=1e-2)

def test_micro_recall():
  assert micro_recall([LINKED_ENTITIES_1, LINKED_ENTITIES_2], [GROUND_TRUTH_ANNOTATIONS_1, GROUND_TRUTH_ANNOTATIONS_2]) == pytest.approx(5/7, rel=1e-2)

def test_micro_f1():
  micro_p = micro_precision([LINKED_ENTITIES_1, LINKED_ENTITIES_2], [GROUND_TRUTH_ANNOTATIONS_1, GROUND_TRUTH_ANNOTATIONS_2])
  micro_r = micro_recall([LINKED_ENTITIES_1, LINKED_ENTITIES_2], [GROUND_TRUTH_ANNOTATIONS_1, GROUND_TRUTH_ANNOTATIONS_2])
  assert f1_score(micro_p, micro_r) == pytest.approx((2 * 5/6 * 5/7) / (5/6 + 5/7), rel=1e-2)

Macro-averaged - averaged across documents

In [None]:
def macro_precision(annotations, ground_truth_annotations):
  """Computes macro-averaged precision.
  
  Args:
      annotations: All annotations for a set of documents.
      relevance_annotations: All reference (ground truth) annotations for a set of documents.
      
  Returns:
      Macro-averaged precision.    
  """
  return sum(set_based_precision(annotation, ground_truth) for annotation, ground_truth 
             in zip(annotations, ground_truth_annotations))/len(ground_truth_annotations)

In [None]:
def macro_recall(annotations, ground_truth_annotations):
  """Computes macro-averaged recall.
  
  Args:
      annotations: All annotations for a set of documents.
      relevance_annotations: All reference (ground truth) annotations for a set of documents.
      
  Returns:
      Macro-averaged recall.    
  """
  return sum(set_based_recall(annotation, ground_truth) for annotation, ground_truth 
             in zip(annotations, ground_truth_annotations))/len(ground_truth_annotations)

Tests

In [None]:
%%run_pytest[clean]

def test_macro_precision():
  assert macro_precision([LINKED_ENTITIES_1, LINKED_ENTITIES_2], [GROUND_TRUTH_ANNOTATIONS_1, GROUND_TRUTH_ANNOTATIONS_2]) == pytest.approx((1 + 2/3)/2, rel=1e-2)

def test_macro_recall():
  assert macro_recall([LINKED_ENTITIES_1, LINKED_ENTITIES_2], [GROUND_TRUTH_ANNOTATIONS_1, GROUND_TRUTH_ANNOTATIONS_2]) == pytest.approx((3/4 + 2/3)/2, rel=1e-2)

def test_macro_f1():
  macro_p = macro_precision([LINKED_ENTITIES_1, LINKED_ENTITIES_2], [GROUND_TRUTH_ANNOTATIONS_1, GROUND_TRUTH_ANNOTATIONS_2])
  macro_r = macro_recall([LINKED_ENTITIES_1, LINKED_ENTITIES_2], [GROUND_TRUTH_ANNOTATIONS_1, GROUND_TRUTH_ANNOTATIONS_2])
  assert f1_score(macro_p, macro_r) == pytest.approx((2 * 5/6 * 17/24) / (5/6 + 17/24), rel=1e-2)

# Entity linking incorporated retrieval (ELR)

In this exercise you will implement the entity matches feature function:  
$$		      	f_{\mathcal{E}}(e_i; e) = \log \sum_{f \in \mathcal{\tilde{F}}} w_{f}^{\mathcal{E}} \left( (1- \lambda )\, \mathbb{1}(e_i , f_{\tilde{e}}) + \lambda\, \frac{\sum_{e' \in \mathcal{E}} \mathbb{1}(e_i,f_{\tilde{e}'})}{|\{e' \in \mathcal{E} : f_{\tilde{e}'} \neq \emptyset\}|} \right) 
$$

In [None]:
!pip install ipytest

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ipytest
  Downloading ipytest-0.12.0-py3-none-any.whl (15 kB)
Collecting pytest>=5.4
  Downloading pytest-7.1.3-py3-none-any.whl (298 kB)
[K     |████████████████████████████████| 298 kB 20.4 MB/s 
Collecting iniconfig
  Downloading iniconfig-1.1.1-py2.py3-none-any.whl (5.0 kB)
Collecting pluggy<2.0,>=0.12
  Downloading pluggy-1.0.0-py2.py3-none-any.whl (13 kB)
Collecting jedi>=0.10
  Downloading jedi-0.18.1-py2.py3-none-any.whl (1.6 MB)
[K     |████████████████████████████████| 1.6 MB 16.7 MB/s 
Installing collected packages: pluggy, jedi, iniconfig, pytest, ipytest
  Attempting uninstall: pluggy
    Found existing installation: pluggy 0.7.1
    Uninstalling pluggy-0.7.1:
      Successfully uninstalled pluggy-0.7.1
  Attempting uninstall: pytest
    Found existing installation: pytest 3.6.4
    Uninstalling pytest-3.6.4:
      Successfully uninstalled pytest-3.6.4
Successful

In [None]:
import ipytest
import math
import pytest
from typing import Dict, List, Tuple

ipytest.autoconfig()

Term-based representations. These representations are only given to provide some context for a better understanding of the entity-based representations.


In [None]:
TERM_BASED_REPS = [{
    "label": "Ann Dunham",
     "abstract": """Stanley Ann Dunham the mother Barack Obama, was an American
        anthropologist who ...""",
     "birthPlace": "Honolulu Hawaii ...",
     "child": "Barack Obama",
     "wikiPageWikiLink": "United States Family Barack Obama",
     },
     {
    "label": "Michael Jackson",
     "abstract": """Michael Joseph Jackson (August 29, 1958 – June 25, 2009) 
        was an American singer, songwriter, and dancer. Dubbed the "King of 
        Pop", he is regarded as one of the most significant cultural figures 
        of the 20th century. Over a four-decade career, his contributions to 
        music, dance, and fashion...""",
     "birthPlace": "Gary Indiana",
     "wikiPageWikiLink": "35th_Annual_Grammy_Awards, A._R._Rahman, ...",
}]

Entity-based representations


In [None]:
ENTITY_BASED_REPS = [{
    "birthPlace": ["<Honolulu>", "<Hawaii>"],
    "child": ["<Barack_Obama>"],
    "wikiPageWikiLink": ["<United_States>", "<Family_of_Barack_Obama>"],
    },
    {
    "birthPlace": ["<Gary_Indiana>"],
    "wikiPageWikiLink": ["<35th_Annual_Grammy_Awards>", "<A._R._Rahman>"],
}]

Field weights

In [None]:
FIELD_WEIGHTS = {
    "birthPlace": 0.4,
    "child": 0.4,
    "wikiPageWikiLink": 0.2,
}

Query

In [None]:
QUERY = ("barack obama parents", ["<Barack_Obama>"])

## Entity matches feature function

$$		      	f_{\mathcal{E}}(e_i; e) = \log \sum_{f \in \mathcal{\tilde{F}}} w_{f}^{\mathcal{E}} \left( (1- \lambda )\, \mathbb{1}(e_i , f_{\tilde{e}}) + \lambda\, \frac{\sum_{e' \in \mathcal{E}} \mathbb{1}(e_i,f_{\tilde{e}'})}{|\{e' \in \mathcal{E} : f_{\tilde{e}'} \neq \emptyset\}|} \right) 
$$

First, we implement the binary indicator function:
$$\mathbb{1}(e_i , f_{\tilde{e}})$$

In [None]:
def binary_indicator_function(entity: str, field_uris: List[str]) -> int:
  """Indicates whether or not the entity is present in the field

  Args: 
    entity: URI string.
    field_uris: List of URI string in field.

  Returns:
    1 if entity is in the field, 0 otherwise.
  """
  return 1 if entity in field_uris else 0

Then, we implement a function to get document frequencies.

$$df_{e,f} = \sum_{e' \in \mathcal{E}} \mathbb{1}(e_i,f_{\tilde{e}'})$$

$$df_f = |\{e' \in \mathcal{E} : f_{\tilde{e}'} \neq \emptyset\}|$$

In [None]:
def get_document_frequencies(f: str, entity: str, entity_based_reps: List[Dict]) -> Tuple[int, int]:
  """Computes document frequencies for entity matches feature score.

  df_e_f is the total number of documents that contain the entity e in field f.
  df_f is the number of documents with a non-empty field f.

  Args:
    f: Field.
    entity: URI string.
    entity_based_reps: All entity-based representations.
    
  Returns:
    Tuple with df_e_f and df_f.
  """
  df_e_f, df_f = 0, 0
  for e in entity_based_reps: 
    if f in e.keys():
      df_f += 1
      if entity in e[f]:
        df_e_f += 1

  return df_e_f, df_f

Based on the two previous functions, we implement the entity matches feature score.

$$		      	f_{\mathcal{E}}(e_i; e) = \log \sum_{f \in \mathcal{\tilde{F}}} w_{f}^{\mathcal{E}} \left( (1- \lambda )\, \mathbb{1}(e_i , f_{\tilde{e}}) + \lambda\, \frac{\sum_{e' \in \mathcal{E}} \mathbb{1}(e_i,f_{\tilde{e}'})}{|\{e' \in \mathcal{E} : f_{\tilde{e}'} \neq \emptyset\}|} \right) 
$$

In [None]:
def compute_entity_matches_feature(entity:str, entity_based_rep:Dict, entity_based_reps:List[Dict], field_weights: Dict[str,float], smoothing_param:float=0.1) -> float:
  """Computes entity matches feature score for an entity.
  
  Args:
    entity: URI string.
    entity_based_rep: Entity-based representation.
    entity_based_reps: All entity-based representations.
    field_weights: Field weights may be set manually or via dynamic mapping 
      using PRMS.
    smoothing_param: Smoothing parameter.Defaults to 0.1.
  Returns:
    Entity matches feature score.
  """
  sum = 0
  for f, w_f_e in field_weights.items():
    e_presence = binary_indicator_function(entity, entity_based_rep[f]) if f in entity_based_rep else 0
    df_e_f, df_f = get_document_frequencies(f, entity, entity_based_reps)
    sum += w_f_e * ((1 - smoothing_param) * e_presence + smoothing_param * df_e_f / df_f)
  return math.log(sum)

Tests

In [None]:
%%run_pytest[clean]

def test_binary_indicator_function():
  assert 1 == binary_indicator_function("<Honolulu>", ["<Honolulu>", "<Hawaii>"])
  assert 0 == binary_indicator_function("<Honolulu>", ["<Gary_Indiana>"])

def test_get_document_frequencies():
  assert (1, 1) == get_document_frequencies("child", QUERY[1][0], ENTITY_BASED_REPS)
  assert (0, 2) == get_document_frequencies("birthPlace", QUERY[1][0], ENTITY_BASED_REPS)

def test_compute_entity_matches_feature():
  assert pytest.approx(math.log(0.4), rel=1e-2) == compute_entity_matches_feature(QUERY[1][0], ENTITY_BASED_REPS[0], ENTITY_BASED_REPS, FIELD_WEIGHTS)
  assert pytest.approx(math.log(0.04), rel=1e-2) == compute_entity_matches_feature(QUERY[1][0], ENTITY_BASED_REPS[1], ENTITY_BASED_REPS, FIELD_WEIGHTS)


[32m.[0m[32m.[0m[32m.[0m[32m                                                                                          [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.02s[0m[0m


%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).


# Target Entity Type Identification Evaluation

In this exercise, you'll need to implement lenient evaluation measures for the target entity type identification task.

As a reminder, _target entity type identification_ is the task of finding the target types of a given input query, from a type taxonomy, such that these types correspond to most specific types of entities that are relevant to the query.  Target types cannot lie on the same branch in the taxonomy.

Our final measure is normalized discounted cumulative gain (NDCG), but we need to compute the gain values of answer types based on their distance from ground truth types in the type taxonomy.

In [None]:
pip install ipytest

In [None]:
!wget --output-document="dbpedia_types.tsv" "https://raw.githubusercontent.com/iai-group/ir-course-2022/main/resources/dbpedia_types.tsv"

In [None]:
import ipytest
import math
import operator
import pytest
from typing import Callable, List, Optional, Set

ipytest.autoconfig()

## Type taxonomy

We use the DBpedia Ontology as our type taxonomy. It is given to you in a preprocessed format in `dbpedia_types.tsv`, where each line corresponds to a type, and the tab-separated columns, respectively, are: type identifier, depth in the hierarchy, and parent type.

In [None]:
class TypeTaxonomy:
    
    ROOT = "owl:Thing"
    
    def __init__(self, tsv_filename: str) -> None:
        """Initializes the type taxonomy by loading it from a TSV file.
        
        Args:
            tsv_filename: Name of TSV file, with type_id, depth, and parent_id columns.
        """
        self._types = {self.ROOT: {"parent": None, "depth": 0}}
        self._max_depth = 0
        with open(tsv_filename, "r") as tsv_file:
            next(tsv_file)  # Skip header row
            for line in tsv_file:
                fields = line.rstrip().split("\t")
                type_id, depth, parent_type = fields[0], int(fields[1]), fields[2]
                self._types[type_id] = {"parent": parent_type, "depth": depth}
                self._max_depth = max(depth, self._max_depth)
                
        # Once all types have been read in, we also populate each type with a list
        # of its children for convenience (if the taxonomy is to be traversed
        # downwards not just upwards).
        for type_id in self._types:
            if type_id == self.ROOT:
                continue
            parent_type = self._types[type_id]["parent"]            
            if "children" not in self._types[parent_type]:
                self._types[parent_type]["children"] = set()
            self._types[parent_type]["children"].add(type_id)
                        
    def max_depth(self) -> int:
        """Returns the maximum depth of the type taxonomy."""
        return self._max_depth
    
    def is_root(self, type_id: str) -> bool:
        """Returns true if the type is the root of the taxonomy.
        
        Args:
            type_id: Type ID.
            
        Returns:
            True if root.
        """
        return type_id == self.ROOT
    
    def depth(self, type_id: str) -> int:
        """Returns the depth of a type in the taxonomy.
        
        Args:
            type_id: Type ID.
            
        Returns:
            The depth of the type in the hierarchy (0 for root).
        """
        return self._types.get(type_id, {}).get("depth")

    def parent(self, type_id: str) -> Optional[str]:
        """Returns the parent type of a type in the taxonomy.
        
        Args:
            type_id: Type ID.
            
        Returns:
            Parent type ID, or None if the input type is root.
        """
        return self._types.get(type_id, {}).get("parent")

    def children(self, type_id: str) -> Set[str]:
        """Returns the set of children types of a type in the taxonomy.
        
        Args:
            type_id: Type ID.
            
        Returns:
            Set of type IDs (empty set if leaf type).
        """
        return self._types.get(type_id, {}).get("children", set())
    
    def dist(self, type_id1: str, type_id2: str) -> float:
        """Computes the distance between two types in the taxonomy.
        
        Args:
            type_id1: ID of first type.
            type_id2: ID of second type.
            
        Returns:
            The distance between the two types in the type taxonomy, which is
            the number of steps between them if they lie on the same branch,
            and otherwise `math.inf`.
        """
        # Find which type has higher depth and set if to type_a; the other is type_b.        
        type_a, type_b = (type_id2, type_id1) if self.depth(type_id1) < self.depth(type_id2) \
                         else (type_id1, type_id2)
        dist = self.depth(type_a) - self.depth(type_b)
        
        # If they lie on the same branch, then when traversing type_a for dist steps
        # would make us end up with type_b; otherwise, they're not on the same branch.
        for _ in range(dist):
            type_a = self.parent(type_a)
        
        return dist if type_a == type_b else math.inf

Tests.

In [None]:
%%run_pytest[clean]

@pytest.fixture
def dbpedia_types():
    return TypeTaxonomy("dbpedia_types.tsv")

def test_max_depth(dbpedia_types):
    assert dbpedia_types.max_depth() == 7

@pytest.mark.parametrize("type_id,depth", [
    ("owl:Thing", 0),
    ("dbo:Agent", 1),
    ("dbo:SportFacility", 4),
    ("dbo:RaceTrack", 5)
])
def test_depth(dbpedia_types, type_id, depth):
    assert dbpedia_types.depth(type_id) == depth
    
@pytest.mark.parametrize("type_id,parent", [
    ("owl:Thing", None),
    ("dbo:Agent", "owl:Thing"),
    ("dbo:SportFacility", "dbo:ArchitecturalStructure"),
    ("dbo:RaceTrack", "dbo:SportFacility")
])
def test_depth(dbpedia_types, type_id, parent):
    assert dbpedia_types.parent(type_id) == parent

@pytest.mark.parametrize("type_id1,type_id2,distance", [
    ("dbo:Agent", "dbo:Agent", 0),  # same type
    ("dbo:Agent", "dbo:Person", 1),  # type2 is more specific
    ("dbo:Artist", "dbo:Agent", 2),  # type2 is more generic
    ("dbo:Artist", "dbo:Broadcaster", math.inf)  # different branch
])  
def test_distance(dbpedia_types, type_id1, type_id2, distance):
    assert dbpedia_types.dist(type_id1, type_id2) == distance

## Computing gain values

For simplicity, refer to this global variable in the gain computations.

In [None]:
type_taxonomy = TypeTaxonomy("dbpedia_types.tsv")

When defined in a _linear_ fashion, the gain of a type is computed as:

$$r(y) = \max_{\hat{y} \in \hat{\mathcal{T}}_q} \big( 1 - \frac{d(y,\hat{y})}{h} \big)$$

where $\hat{\mathcal{T}}_q$ is the set of ground truth types, $\hat{y}$ is a ground truth type, $y$ is an answer type, $d(y, \hat{y})$ is the distance between types in the taxonomy, and $h$ is the maximum depth of the type taxonomy.

In [None]:
def gain_linear(gt_types: Set[str], answer_type_id: str) -> float:
    """Computes the gain of an answer type in a linear fashion.
    
    Args:
        gt_types: Set of ground truth type IDs.
        answer_type_id: Answer type ID.
    
    Returns:
        Gain value.
    """
    # Note: if the distance between two types is inf, we set the linear gain to 0.
    return max([
        1 - type_taxonomy.dist(gt_type_id, answer_type_id) / type_taxonomy.max_depth() 
        if type_taxonomy.dist(gt_type_id, answer_type_id) < math.inf else 0
        for gt_type_id in gt_types
    ])

Alternatively, the gain of an answer type can be defined using an _exponential_ decay function:

$$r(y) = \max_{\hat{y} \in \hat{\mathcal{T}}_q} \big ( b^{-d(y,\hat{y})} \big )$$

where $b$ is the base of the exponential function (here: $b=2$).

In [None]:
def gain_exponential(gt_types: Set[str], answer_type_id: str) -> float:
    """Computes the gain of an answer type using exponential decay.
    
    Args:
        gt_types: Set of ground truth type IDs.
        answer_type_id: Answer type ID.
    
    Returns:
        Gain value.
    """
    # Note: if the distance between two types is inf, we set the exponential gain to 0.
    return max([
        2**(-type_taxonomy.dist(gt_type_id, answer_type_id))
        if type_taxonomy.dist(gt_type_id, answer_type_id) < math.inf else 0
        for gt_type_id in gt_types
    ])

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize("gt_types,answer_type_id,gain", [
    (["dbo:Agent"], "dbo:Agent", 1),  # same type
    (["dbo:Agent"], "dbo:Person", 1-1/7),  # type2 is more specific
    (["dbo:Artist"], "dbo:Agent", 1-2/7),  # type2 is more generic
    (["dbo:NationalSoccerClub"], "dbo:Organisation", 1-3/7),  # type2 is more generic
    (["dbo:Artist"], "dbo:Broadcaster", 0),  # different branch
    (["dbo:DisneyCharacter"], "dbo:MythologicalFigure", 0),  # sibling categories
])
def test_gain_linear(gt_types, answer_type_id, gain):
    assert gain_linear(gt_types, answer_type_id) == pytest.approx(gain)
    
@pytest.mark.parametrize("gt_types,answer_type_id,gain", [
    (["dbo:Agent"], "dbo:Agent", 1),  # same type
    (["dbo:Agent"], "dbo:Person", 1/2),  # type2 is more specific
    (["dbo:Artist"], "dbo:Agent", 1/4),  # type2 is more generic
    (["dbo:NationalSoccerClub"], "dbo:Organisation", 1/8),  # type2 is more generic
    (["dbo:Artist"], "dbo:Broadcaster", 0),  # different branch
    (["dbo:DisneyCharacter"], "dbo:MythologicalFigure", 0),  # sibling categories
])
def test_gain_exponential(gt_types, answer_type_id, gain):
    assert gain_exponential(gt_types, answer_type_id) == pytest.approx(gain)

## Putting everything together

Plug the gain values computed using either linear or exponential into the NDCG computation to get a final evaluation score.

The DCG and NDCG computation parts are given. The only part that needs completing is the construction of the ideal ranking.

In [None]:
def get_ideal_ranking(gt_types: Set[str]) -> List[str]:
    """Generates an ideal ranking corresponding to a set of ground truth types.
    
    Args:
        gt_types: Set of ground truth types.
    
    Returns:
        A ranked list of types that constitute an ideal ranking gain-wise.
    """
    gains = {}
    
    for gt_type in gt_types:
        # Ground truth type has max gain.
        gains[gt_type] = type_taxonomy.max_depth()
        
        # Traverse upwards to add parent types.
        parent_type = type_taxonomy.parent(gt_type)
        gain = type_taxonomy.max_depth() - 1
        while not type_taxonomy.is_root(parent_type):
            gains[parent_type] = max(gains.get(parent_type, 0), gain)
            gain -= 1
            parent_type = type_taxonomy.parent(parent_type)

        # Traverse downwards to add children types.
        children_types = type_taxonomy.children(gt_type)
        gain = type_taxonomy.max_depth() - 1
        while len(children_types) > 0:
            grandchildren_types = set()
            for t in children_types:
                gains[t] = max(gains.get(t, 0), gain)
                grandchildren_types.update(type_taxonomy.children(t))
            gain -= 1
            children_types = grandchildren_types
    
    # Return types ordered by gain desc.
    return [k for k, v in sorted(gains.items(), key=operator.itemgetter(1), reverse=True)]    

In [None]:
def dcg(relevances: List[float], k: int) -> float:
    """Computes DCG@k, given the corresponding relevance levels for a ranked list of types.
    
    Args:
        relevances: List with the relevance levels corresponding to a ranked list of types.
        k: Rank cut-off.
        
    Returns:
        DCG@k (float).
    """
    return relevances[0] + sum(
        [relevances[i] / math.log(i + 1, 2) 
         for i in range(1, min(k, len(relevances)))]
    )

In [None]:
def ndcg(system_ranking: List[str], gt_types: Set[str], gain_function: Callable, k:int = 10) -> float:
    """Computes NDCG@k for a given system ranking.
    
    Args:
        system_ranking: Ranked list of answer type IDs (from most to least relevant).
        gt_types: Set of ground truth types.
        gain_function: Function for computing the gain of an answer type.
        k: Rank cut-off.
    
    Returns:
        NDCG@k (float).
    """
    # Relevance (gain) levels for the ranked docs.
    relevances = [gain_function(gt_types, type_id) for type_id in system_ranking]

    # Relevance levels (gains) of the idealized ranking.
    relevances_ideal = [gain_function(gt_types, type_id) 
                        for type_id in get_ideal_ranking(gt_types)]
    
    return dcg(relevances, k) / dcg(relevances_ideal, k)        

Tests.

In [None]:
%%run_pytest[clean]

def test_ideal_ranking_single_gt():
    ideal_rankings = get_ideal_ranking({"dbo:Person"})
    # Ideal ranking starts with ground truth type.
    assert "dbo:Person" in ideal_rankings 
    assert ideal_rankings.index("dbo:Person") == 0
    # Types that are not parent or children types are not present.
    assert "dbo:Organisation" not in ideal_rankings
    # Parent types are present and ranked lower than GT type.
    assert "dbo:Agent" in ideal_rankings 
    assert ideal_rankings.index("dbo:Agent") > 0
    # Children types are present and ranked lower than GT type.    
    assert "dbo:Politician" in ideal_rankings 
    assert ideal_rankings.index("dbo:Politician") > 0
    assert "dbo:President" in ideal_rankings
    # Relative ranking of children types is correct.
    assert ideal_rankings.index("dbo:President") > ideal_rankings.index("dbo:Politician")    
    # Relative ranking of parent vs. children types is correct.
    assert ideal_rankings.index("dbo:Agent") < ideal_rankings.index("dbo:President")    

def test_ideal_ranking_multi_gt():
    # Ground truth types are subtypes of person but on different branches
    # and at different depths:
    # Skier -> WinterSportPlayer -> Athlete -> Person
    # DisneyCharacter -> FictionalCharacter -> Person
    ideal_rankings = get_ideal_ranking({"dbo:Skier", "dbo:DisneyCharacter"})
    # Ground truth type at the top of the ideal ranking in any order
    assert set(ideal_rankings[:2]) == {"dbo:Skier", "dbo:DisneyCharacter"}
    # Path tho Skier.
    assert "dbo:WinterSportPlayer" in ideal_rankings 
    assert "dbo:Athlete" in ideal_rankings 
    assert "dbo:Person" in ideal_rankings 
    assert "dbo:Agent" in ideal_rankings 
    assert ideal_rankings.index("dbo:WinterSportPlayer") < ideal_rankings.index("dbo:Athlete")
    assert ideal_rankings.index("dbo:Person") < ideal_rankings.index("dbo:Agent")
    # Path to DisneyCharacter.
    assert "dbo:FictionalCharacter" in ideal_rankings 
    assert ideal_rankings.index("dbo:FictionalCharacter") < ideal_rankings.index("dbo:Person")
    assert ideal_rankings.index("dbo:FictionalCharacter") < ideal_rankings.index("dbo:Athlete")    
    # Sibling types to ground truth types.
    assert "dbo:Ski_jumper" not in ideal_rankings
    assert "dbo:SoapCharacter" not in ideal_rankings
    # Types on other branches.
    assert "dbo:Gymnast" not in ideal_rankings
    assert "dbo:ShoppingMall" not in ideal_rankings


def test_ndcg():
    # Perfect ranking.
    assert ndcg(["dbo:Person", "dbo:Agent", "dbo:Politician"], 
                {"dbo:Person"}, gain_linear, k=3) == 1.0
    assert ndcg(["dbo:Organisation", "dbo:Agent", "dbo:Politician"], 
                {"dbo:Person"}, gain_linear, k=3) == pytest.approx(0.583, rel=1e-3)
    assert ndcg(["dbo:Athlete", "dbo:Agent", "dbo:FictionalCharacter"], 
                {"dbo:Skier", "dbo:DisneyCharacter"}, 
                gain_linear, k=3) == pytest.approx(0.719, rel=1e-3)
    assert ndcg(["dbo:Athlete", "dbo:Agent", "dbo:DisneyCharacter"], 
                {"dbo:Skier", "dbo:DisneyCharacter"}, 
                gain_linear, k=3) == pytest.approx(0.754, rel=1e-3)    

# Exploring Word2Vec with Gensim

## Overview

Word2Vec is an approach to learning *word embeddings*, vector representations of words that capture semantic and syntactic relationships between words based on their co-occurrences in natural language text. 

This unsupervised learning approach also reduces the dimensionality of the vectors representing words, which can be helpful for memory and to manage the *curse of dimensionality*, whereby high-dimensional vector spaces lead to a relative data sparsity, e.g., for machine learning. 

In this exercise you will look at the capabilities of Word2Vec as implemented in the module Gensim. 

## Requirements 

Uncomment the lines below, run the installations once as needed, then comment the code out again.

In [None]:
pip install ipytest

In [None]:
# !pip install --upgrade pip
# !pip install --upgrade Cython
# !pip install --upgrade gensim

Import all necessary libraries. 

In [None]:
# Import modules and set up logging.
from typing import List, Generator
import gensim.downloader as api
from gensim.models import Word2Vec
import logging
import numpy as np
import os

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

import ipytest
import pytest

ipytest.autoconfig()

## Download data

In [None]:
# Load the Text8 corpus.
print(api.info('text8'))
text8_corpus = api.load('text8')

{'num_records': 1701, 'record_format': 'list of str (tokens)', 'file_size': 33182058, 'reader_code': 'https://github.com/RaRe-Technologies/gensim-data/releases/download/text8/__init__.py', 'license': 'not found', 'description': 'First 100,000,000 bytes of plain text from Wikipedia. Used for testing purposes; see wiki-english-* for proper full Wikipedia datasets.', 'checksum': '68799af40b6bda07dfa47a32612e5364', 'file_name': 'text8.gz', 'read_more': ['http://mattmahoney.net/dc/textdata.html'], 'parts': 1}


## Train a model

In [None]:
# Train a Word2Vec model on the Text8 corpus with default hyperparameters. 
model = Word2Vec(text8_corpus)  

# Perform a sanity check on the trained model.
print(model.wv.similarity('tree', 'leaf')) 

0.68252116


In [None]:
# Reduce logging level.
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.WARNING)

In [None]:
print(model.wv.most_similar('tree')) 
print(model.wv.most_similar('leaf')) 

[('leaf', 0.6825212240219116), ('trees', 0.6816262006759644), ('bark', 0.6624914407730103), ('avl', 0.6079550981521606), ('fruit', 0.6029431819915771), ('flower', 0.6008095741271973), ('cactus', 0.6005957126617432), ('grass', 0.5823603868484497), ('bird', 0.5789984464645386), ('pond', 0.5772603750228882)]
[('bark', 0.7890558838844299), ('pigment', 0.7656276822090149), ('colored', 0.7648026347160339), ('reddish', 0.7575005888938904), ('coloured', 0.7569463849067688), ('grass', 0.7538576126098633), ('yellowish', 0.752110481262207), ('beetle', 0.7462701797485352), ('aloe', 0.7384286522865295), ('fleshy', 0.73822021484375)]


## Relationships

Investigate the relationships between words in terms of trained representations. 

### Evaluate  analogies
With the model you have trained, evaluate the analogy
`king-man+woman =~ queen`

In [None]:
print(model.wv.most_similar(positive=['king', 'woman'], negative=['man'], topn=5))

[('queen', 0.6803272366523743), ('empress', 0.6377588510513306), ('daughter', 0.6349191665649414), ('emperor', 0.6257573366165161), ('prince', 0.6172590255737305)]


Evaluate the analogy `ship-boat+rocket =~ spacecraft`. How similar are the left-hand side of the analogy to the right-hand side? Implement a function that can find the answer for analogies in general. We assume the right-hand side of the analogy will always be a single, positive term. 

In [None]:
def eval_analogy(model: Word2Vec, lhs_pos: List[str], lhs_neg: List[str], rhs: str)->float:
    """Returns the similarity between the left-hand and right-hand sides of an anaology.
    
        Arguments: 
            model: Trained Gensim word2vec model to use.
            lhs_pos: List of terms that are positive on the left-hand side in the analogy. 
            lhs_neg: List of terms that are negative on the left-hand side in the analogy. 
            rhs: A single positive term on the right-hand side in the analogy.
            
        Returns:
            Float of the similarity if right-hand side term is found in the top 500 most similar terms.
            Otherwise, return None."""
    # How similar are the left-hand side of the analogy to the right-hand side? 
    # Implement a function that can find the answer for analogies in general.
    # TODO: Complete.
    similarities_list = model.most_similar(positive=lhs_pos, negative=lhs_neg, topn=500)
    similarities_dict = {}
    for term, sim in similarities_list:
        similarities_dict[term] = sim
    if rhs in similarities_dict:
        return similarities_dict[rhs]
    else:
        print("Right-hand side term not found in top 500 most similar terms to the left-hand side analogy.")
        None

Test:

In [None]:
%%run_pytest[clean]

def test_eval_analogy():
    assert eval_analogy(model.wv, ['ship', 'rocket'], ['boat'], 'spacecraft') == pytest.approx(0.7, abs=1e-1)

[32m.[0m[32m                                                                                            [100%][0m

%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).



[32m[32m[1m1 passed[0m[32m in 0.03s[0m[0m


## Load a pre-trained model

In [None]:
import gensim.downloader as api
model_loaded = api.load('word2vec-google-news-300')



In [None]:
loaded_analogy_eval = -1
# Evaluate the analogy 'king'-'man'+'woman' compared to 'queen' using the loaded model 
# and assign the value to the variable `loaded_analogy_eval`.
# TODO: Complete.
loaded_analogy_eval = eval_analogy(model_loaded, ['king', 'woman'], ['man'], 'queen')

In [None]:
%%run_pytest[clean]

def test_loaded_analogy_eval():
    assert loaded_analogy_eval != -1
    assert loaded_analogy_eval == pytest.approx(0.7, abs=1e-1)

%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).


[32m.[0m[32m                                                                                            [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.01s[0m[0m


## Train Word2Vec on different corpora

In [None]:
# Download the rap lyrics of Kanye West.
! wget https://raw.githubusercontent.com/gsurma/text_predictor/master/data/kanye/input.txt
! mv input.txt kanye.txt

# Download the complete works of William Shakespeare.
! wget https://raw.githubusercontent.com/gsurma/text_predictor/master/data/shakespeare/input.txt
! mv input.txt shakespeare.txt

--2022-09-27 18:42:21--  https://raw.githubusercontent.com/gsurma/text_predictor/master/data/kanye/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 330453 (323K) [text/plain]
Saving to: ‘input.txt’


2022-09-27 18:42:22 (9.42 MB/s) - ‘input.txt’ saved [330453/330453]

--2022-09-27 18:42:22--  https://raw.githubusercontent.com/gsurma/text_predictor/master/data/shakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4573338 (4.4M) [text/plain]
Saving to: ‘input.txt’


2022-09-27 18:42:23 (58.1 MB/s) - ‘input.t

In [None]:
from gensim.test.utils import datapath
from gensim import utils

class MyCorpus:
    """An interator that yields sentences (lists of str)."""
    def __init__(self, data: str) -> None:
        self.data = data

    def __iter__(self) -> Generator[List[str], None, None]:
        corpus_path = datapath(self.data)
        for line in open(corpus_path):
            # assume there's one document per line, tokens separated by whitespace
            yield utils.simple_preprocess(line)

Separately train two new models using the two different datasets, and compare how these datasets affect relationships between 

In [None]:
kanye_data = MyCorpus(os.getcwd()+'/kanye.txt')
shakespeare_data = MyCorpus(os.getcwd()+'/shakespeare.txt')

In [None]:
kanye_model = None
# Train a Word2Vec model on the Kanye corpus, and name it `kanye_model`.
# TODO: Complete
kanye_model = Word2Vec(sentences=kanye_data)

In [None]:
shakespeare_model = None
# Train a Word2Vec model on the Shakespeare corpus, and name it `shakespeare_model`.
# TODO: Complete
shakespeare_model = Word2Vec(sentences=shakespeare_data)

For each of the models, we can easily find words where the two models learn very different similarities.

In [None]:
# For example, compare:
print(kanye_model.wv.most_similar(positive=['king'], topn=5))
print(shakespeare_model.wv.most_similar(positive=['king'], topn=5))

[('big', 0.9997344017028809), ('his', 0.9997167587280273), ('by', 0.9996941089630127), ('might', 0.9996739625930786), ('ooh', 0.9996687769889832)]
[('prince', 0.8771281838417053), ('duke', 0.7768160104751587), ('fifth', 0.6678986549377441), ('gaunt', 0.664115846157074), ('queen', 0.6587375402450562)]


## Compare Skip-gram and CBOW

By using the arguments of the model (training) method in `gensim.models.Word2Vec()` you can select either Skip-gram or CBOW explicitly, as well as modifying other hyperparameters. 

Train a Skip-gram model on the Text8 corpus and compare with the default CBOW model on the same dataset, with the same context window size, and compare how relationships are expressed in terms of the resulting embedding vectors.

**Hint:** Use the keyword argument `sg` in when instantiating the model object to specify Skip-gram, rather than the defaul CBOW setting.

In [None]:
model_sg = None
# Train a skip-gram Word2Vec model on `text8_corpus` and name it `model_sg``
# TODO: Complete
model_sg = Word2Vec(text8_corpus, sg=1)

In [None]:
loaded_analogy_eval_sg = eval_analogy(model_sg, ['king', 'woman'], ['man'], 'queen')

  app.launch_new_instance()


In [None]:
loaded_analogy_eval_cbow = eval_analogy(model, ['king', 'woman'], ['man'], 'queen')

  app.launch_new_instance()


**Discuss:** Which of the models produces the highest similarity for the example analogy? Will this always be the case? Why or why not?

For more information about Gensim, see https://radimrehurek.com/gensim.

# Dense retrieval with ANCE

We will use a [PyTerrier plugin for ANCE](https://github.com/terrierteam/pyterrier_ance) for dense passage retrieval. 

[ANCE](https://github.com/microsoft/ANCE) is a dense retrieval system leveraging single representations to encode documents and queries. ANCE does not require combination with sparse retrieval. ANCE leverages a training mechanism that constructs negatives from an Approximate Nearest Neighbor (ANN) index of the corpus, which is parallelly updated with the learning process to select more realistic negative training instances than the negative training instances selected by a sparse retrieval mechanism.

The experiments are run on [CORD19 corpus](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7251955/) and [TREC Covid test collection](https://ir.nist.gov/covidSubmit/).

This is exercise is based on the [example](https://colab.research.google.com/github/terrierteam/pyterrier_ance/blob/master/pyterrier_ance_vaswani.ipynb) provided by the PyTerrier team and [CIKM 2021 Tutorial Notebook](https://notebooks.githubusercontent.com/view/ipynb?browser=chrome&color_mode=auto&commit=451eb743b6a9202f20fde3ac85dbe6ad00103506&device=unknown&enc_url=68747470733a2f2f7261772e67697468756275736572636f6e74656e742e636f6d2f746572726965722d6f72672f63696b6d323032317475746f7269616c2f343531656237343362366139323032663230666465336163383564626536616430303130333530362f6e6f7465626f6f6b732f6e6f7465626f6f6b345f322e6970796e62&logged_in=false&nwo=terrier-org%2Fcikm2021tutorial&path=notebooks%2Fnotebook4_2.ipynb&platform=android&repository_id=416829271&repository_type=Repository&version=96).

In [None]:
%%capture
!pip install -q python-terrier
!apt install -q --upgrade libomp-dev
!pip install -q --upgrade faiss-gpu==1.6.3
!pip install -q git+https://github.com/terrierteam/pyterrier_ance.git
!pip install -q ipytest

In [None]:
%%capture
import pyterrier as pt
if not pt.started():
  pt.init(tqdm='notebook')

In [None]:
%%capture
# Collecting the topics and qrels of the TREC-COVID19 dataset
cord19 = pt.datasets.get_dataset('irds:cord19/trec-covid')
topics = cord19.get_topics(variant='title')
qrels = cord19.get_qrels()

### BM25 inverted index

We use a pre-built Terrier inverted index for the TREC-COVID19 collection.

In [None]:
%%capture
bm25_index = pt.datasets.get_dataset('trec-covid').get_index('terrier_stemmed')

### ANCE dense index


We download a pre-built ANCE FAISS index for the TREC-COVID19 collection. The indexing procedure generates a number of FAISS shards, together with some additional files.


In [None]:
%%capture
ance_index = pt.datasets.get_dataset('trec-covid').get_index('ance_msmarco_psg')
!ls /content/anceindex

### Retrieval

We create BM25 baseline transformer, and the ANCE retrieve transformer. Since most documents exceed the maximum length supported by ANCE, a sliding window of 150 tokens is used (stride 75, prepending title) to construct passages. As such, passage scores need to be aggregated, e.g., using pt.text.max_passage().



In [None]:
%%capture
from pyterrier_ance import ANCERetrieval

bm25_retriever = pt.BatchRetrieve(bm25_index, wmodel="BM25")
ance_retriever = ANCERetrieval.from_dataset('trec-covid', 'ance_msmarco_psg') >> pt.text.max_passage()

We retrieve the top 50 ranked documents for the official topics, and compute several effectiveness metrics.

In [None]:
pt.Experiment(
    [bm25_retriever % 50, ance_retriever % 50], 
    topics,
    qrels,
    eval_metrics=["map", "recip_rank", "P_10", "ndcg_cut_3", "recall_100"],
    names=['BM25', 'ANCE'],
)

***** inference of 50 queries *****


Inferencing: 0it [00:00, ?it/s]

Not running in distributed mode


Inferencing: 1it [00:09, 10.00s/it]

***** faiss search for 50 queries on 1 shards *****





  0%|          | 0/1 [00:00<?, ?shard/s]

Unnamed: 0,name,map,recip_rank,P_10,ndcg_cut_3,recall_100
0,BM25,0.04827,0.807094,0.678,0.631732,0.062774
1,ANCE,0.024658,0.656,0.452,0.479889,0.037527


The underperforming results computed our ANCE retriever are due to the lack of fine-tuning of the underlying BERT-based model with COVID19 and medical-related documents.

### Retrieval

In [None]:
%%capture
def show_res_with_text_labels(retriever, qid):
    """Displays the texts of the retrieved documents.
    
    Args:
        retriever: The retriever to be used to retreive documents.
        qid: Query ID.
        
    Returns:
        Retrieved documents with the text.    
    """
    def make_doi_url(df):
      df["doi"] = df["doi"].apply(lambda doi: "https://doi.org/" + doi)
      return df
    pipe = (retriever % 10) >> pt.text.get_text(cord19, ["title", "doi"]) >> pt.apply.generic(make_doi_url)
    res = pipe.transform(topics[topics.qid == qid])
    res = res.merge(qrels, how='left')
    def make_clickable(val):
        return '<a target="_blank" href="{}">{}</a>'.format(val, val)
    res = res.sort_values("rank", ascending=True)
    res.style.format({'doi': make_clickable})
    return res

In [None]:
show_res_with_text_labels(bm25_retriever, "1")

Unnamed: 0,qid,docid,docno,rank,score,query,title,doi,label,iteration
0,1,122553,75773gwg,0,11.558188,coronavirus origin,Zoonotic origins of human coronavirus 2019 (HC...,https://doi.org/,2,5.0
1,1,122554,kn2z7lho,1,11.558188,coronavirus origin,Zoonotic origins of human coronavirus 2019 (HC...,https://doi.org/,2,3.0
2,1,122555,4fb291hq,2,11.558188,coronavirus origin,Zoonotic origins of human coronavirus 2019 (HC...,https://doi.org/,1,3.0
3,1,135022,ne5r4d4b,3,11.558188,coronavirus origin,Origin and evolution of pathogenic coronaviruses,https://doi.org/10.1038/s41579-018-0118-9,0,1.5
4,1,186652,hl967ekh,4,11.558188,coronavirus origin,Zoonotic origins of human coronavirus 2019 (HC...,https://doi.org/10.24272/j.issn.2095-8137.2020...,2,3.0
5,1,120776,kqqantwg,5,11.417061,coronavirus origin,Possible Bat Origin of Severe Acute Respirator...,https://doi.org/,2,5.0
6,1,158983,12dcftwt,6,11.417061,coronavirus origin,Possible Bat Origin of Severe Acute Respirator...,https://doi.org/10.3201/eid2607.200092,2,5.0
7,1,81979,8ccl9aui,7,11.324364,coronavirus origin,Mosaic evolution of the severe acute respirato...,https://doi.org/,2,1.0
8,1,68472,4dtk1kyh,8,11.255136,coronavirus origin,Origin of Novel Coronavirus (COVID-19): A Comp...,https://doi.org/10.1101/2020.05.12.091397,2,3.0
9,1,104060,pl48ev5o,9,11.230726,coronavirus origin,Origin and evolution of the 2019 novel coronav...,https://doi.org/,1,4.0


In [None]:
show_res_with_text_labels(ance_retriever, "1")

***** inference of 1 queries *****


Inferencing: 0it [00:00, ?it/s]

Not running in distributed mode


Inferencing: 1it [00:00,  3.43it/s]

***** faiss search for 1 queries on 1 shards *****





  0%|          | 0/1 [00:00<?, ?shard/s]

Unnamed: 0,qid,query,score,docno,rank,title,doi,label,iteration
6,1,coronavirus origin,715.294006,j1cdoxqs,0,Coronavirus,https://doi.org/,,
3,1,coronavirus origin,714.861938,be0mr85h,1,Coronavirus.,https://doi.org/10.1177/0025817220933546,,
2,1,coronavirus origin,714.552856,9pla28n4,2,Coronaviruses: origin and evolution,https://doi.org/,,
7,1,coronavirus origin,714.552856,jkejiuf2,3,Coronaviruses: origin and evolution,https://doi.org/,2.0,3.0
4,1,coronavirus origin,714.37854,bp9xz9wk,4,Coronavirus?,https://doi.org/,,
8,1,coronavirus origin,714.107727,l0sbncnp,5,Exploration on mechanism of anti-coronavirus o...,https://doi.org/10.7501/j.issn.0253-2670.2020....,,
5,1,coronavirus origin,714.046753,hmvo5b0q,6,Understanding Coronavirus,https://doi.org/,1.0,5.0
1,1,coronavirus origin,713.970276,8l411r1w,7,"Discovery of a novel coronavirus, China Rattus...",https://doi.org/10.1128/jvi.02420-14,0.0,1.0
9,1,coronavirus origin,713.963501,utsr0zv7,8,The Human Coronavirus Disease COVID-19: Its Or...,https://doi.org/10.3390/pathogens9050331,1.0,3.0
0,1,coronavirus origin,713.77887,7mfedn03,9,Coronavirus Infections,https://doi.org/10.1016/b978-1-4160-2406-4.500...,,
