
# Ranking explanation

In this demonstration, we explain a ranking based on the query term scores for each ranked document.  
A similar idea is used in this [paper](https://cs.brown.edu/research/pubs/theses/ugrad/2019/ramos.jerome.pdf) to create an explainable search engine.


In [None]:
!pip install ipytest

In [None]:
!pip install jupyter-dash

In [None]:
import math
from dataclasses import dataclass, field
from typing import Dict, List

import ipytest
import matplotlib.pyplot as plt
import pytest
import plotly.graph_objs as go

ipytest.autoconfig()

Class to represent retrieved documents.

In [None]:
@dataclass
class ScoredDocument:
    """Representation of a retrieved document.

    Attributes:
        doc_id: Identifier of the document. The scorer uses the document's
            index in the term-document matrix rows as its ID.
        score: Overall ranking score of the document for the query.
        query_terms_scores: Contribution of each query term to the score,
            as "term: score" pairs.
    """

    # Documents are identified by their integer index throughout the scorer
    # and the tests, so the annotation is `int` rather than `str`.
    doc_id: int
    score: float = 0
    # A factory is used so that instances never share one mutable dictionary.
    query_terms_scores: Dict[str, float] = field(default_factory=dict)

Term-document matrix.

In [None]:
# Type alias for a term-document matrix: maps each term to a list of counts.
TD_MATRIX_TYPE = Dict[str, List[int]]
# Type alias for a ranking: a list of scored documents.
DOCUMENT_SCORES_TYPE = List[ScoredDocument]
# Example term-document matrix. Each key is a term; TFIDFScorer indexes the
# list of counts by document ID (position 0 is document 0, and so on).
TD_MATRIX = {
    "beijing": [0, 1, 0, 0, 1],
    "dish": [0, 1, 0, 0, 1],
    "duck": [3, 2, 2, 0, 1],
    "rabbit": [0, 0, 1, 1, 0],
    "recipe": [0, 0, 1, 1, 1],
}

## TF-IDF scorer
*Note*: re-use code from E3-5.

Implement the scoring function 

$$score(d,q) = \sum_{t \in q} tf_{t,q} \times tf_{t,d} \times idf_t$$

Use normalized frequencies for TF weight, i.e., $tf_{t,d}=\frac{c_{t,d}}{|d|}$, where $c_{t,d}$ is the number of occurrences of term $t$ in document $d$ and $|d|$ is the document length (=total number of terms). (Analogously for the query.)

Compute IDF values using the following formula: $idf_{t}=\log \frac{N}{n_t}$, where $N$ is the total number of documents and $n_t$ is the number of documents that contain term $t$. Use the base-10 logarithm.

In [None]:
class TFIDFScorer:
    """Scores documents against a query using TF-IDF term weighting."""

    def __init__(self, td_matrix: TD_MATRIX_TYPE) -> None:
        """Initializes the TF-IDF scorer. Document lengths and IDF values
        are precomputed.

        Args:
            td_matrix: Dictionary of "term: list of term counts" pairs, where
                position i of each list is the count of the term in
                document i. All lists must have the same length.

        Raises:
            ValueError: If the rows of the matrix differ in length.
        """
        self._td_matrix = td_matrix
        # The number of documents is the length of a row (one count per
        # document), not the number of rows (which is the vocabulary size).
        row_lengths = {len(freqs) for freqs in td_matrix.values()}
        if len(row_lengths) > 1:
            raise ValueError(
                "All rows of the term-document matrix must have the same length."
            )
        self._num_docs = row_lengths.pop() if row_lengths else 0
        self._query_terms = None

        # Pre-compute the length of documents for more efficient scoring.
        self._doc_len = {
            doc_id: sum(freqs[doc_id] for freqs in td_matrix.values())
            for doc_id in range(self._num_docs)
        }

        # Pre-compute IDF values.
        self._idf = {}
        for term, freqs in td_matrix.items():
            nt = sum(1 for f in freqs if f > 0)
            # A term that occurs in no document cannot contribute to any
            # score; give it an IDF of 0 instead of dividing by zero.
            self._idf[term] = math.log10(self._num_docs / nt) if nt else 0.0

    def _parse_query(self, query: str) -> None:
        """Parses the input query to a sequence of vocabulary terms and stores
        it in a member variable.

        The query is split on whitespace and terms that are not in the
        term-document matrix are dropped.

        Args:
            query: Query string.
        """
        self._query_terms = [term for term in query.split() if term in self._td_matrix]

    def score_documents(self, query: str) -> DOCUMENT_SCORES_TYPE:
        """Scores all documents in the collection.

        Args:
            query: Query string.

        Returns:
            List of ScoredDocuments ordered by score descending, then by doc ID
            ascending. Each ScoredDocument also holds the score contributed by
            every distinct query term.
        """
        self._parse_query(query)
        # Distinct query terms in order of first occurrence. Unlike a set,
        # this keeps the iteration order (and thus the key order of the
        # per-term score dictionaries) deterministic across runs.
        distinct_terms = list(dict.fromkeys(self._query_terms))

        scores = {doc_id: 0.0 for doc_id in range(self._num_docs)}
        query_terms_scores = {doc_id: {} for doc_id in range(self._num_docs)}

        for term in distinct_terms:
            # The query-side weight does not depend on the document.
            query_weight = self._get_query_term_weight(term)
            for doc_id in range(self._num_docs):
                query_term_score = self._get_doc_term_weight(doc_id, term) * query_weight
                scores[doc_id] += query_term_score
                query_terms_scores[doc_id][term] = query_term_score

        # Sort by score descending; ties are broken by doc ID ascending.
        ordered = sorted(scores.items(), key=lambda item: (-item[1], item[0]))
        return [
            ScoredDocument(doc_id, score, query_terms_scores[doc_id])
            for doc_id, score in ordered
        ]

    def _get_query_term_weight(self, term: str) -> float:
        """Returns the normalized frequency of the term in the parsed query.

        Only query terms that are in the vocabulary are counted, because the
        parsed query contains no other terms.

        Args:
            term: Query term.

        Returns:
            Term count divided by the number of parsed query terms, or 0 if
            there are no parsed query terms.
        """
        if not self._query_terms:
            return 0.0
        return self._query_terms.count(term) / len(self._query_terms)

    def _get_doc_term_weight(self, doc_id: int, term: str) -> float:
        """Returns the TF-IDF weight of the term in the document.

        Args:
            doc_id: Index of the document in the term-document matrix rows.
            term: Vocabulary term.

        Returns:
            Normalized term frequency multiplied by the IDF of the term, or 0
            for an empty document.
        """
        doc_len = self._doc_len[doc_id]
        if doc_len == 0:
            # An empty document contains no terms; avoid dividing by zero.
            return 0.0
        return self._td_matrix[term][doc_id] / doc_len * self._idf[term]

Display the detailed scores for the ranked documents.

In [None]:
def explain_ranking(ranking: DOCUMENT_SCORES_TYPE) -> None:
    """Displays a stacked bar chart with the per-term scores of each document.

    One bar is drawn per document; each bar is split into segments showing
    how much every query term contributes to the document's score. Nothing is
    displayed for an empty ranking.

    Args:
        ranking: List of ScoredDocument.
    """
    if not ranking:
        # Nothing to explain; also avoids indexing into an empty list below.
        return

    fig = go.Figure()
    doc_ids = [doc.doc_id for doc in ranking]

    # The query terms are taken from the top-ranked document.
    for term in ranking[0].query_terms_scores:
        term_scores = [doc.query_terms_scores.get(term, 0) for doc in ranking]
        fig.add_bar(x=doc_ids, y=term_scores, name=term)

    fig.update_layout(
        barmode="stack",
        title="Detailed score for ranked documents",
        showlegend=True,
        # Numeric doc IDs would otherwise be placed on a linear axis, which
        # sorts the bars by ID. A category axis keeps them in ranking order.
        xaxis={"type": "category"},
    )
    fig.show()


In [None]:
# Score every document for an example query and plot the per-term breakdown.
query = "beijing duck recipe"
scorer = TFIDFScorer(TD_MATRIX)
ranking = scorer.score_documents(query)
explain_ranking(ranking)

Tests.

In [None]:
%%run_pytest[clean]

@pytest.mark.parametrize(
    "td_matrix,query,correct_values",
    [
        (
            TD_MATRIX,
            "beijing",
            [
                ScoredDocument(1, 0.0995, {"beijing": 0.0995}),
                ScoredDocument(4, 0.0995, {"beijing": 0.0995}),
                ScoredDocument(0, 0, {"beijing": 0}),
                ScoredDocument(2, 0, {"beijing": 0.0}),
                ScoredDocument(3, 0, {"beijing": 0}),
            ],
        ),
        (
            TD_MATRIX,
            "duck duck",
            [
                ScoredDocument(0, 0.0969, {"duck": 0.0969}),
                ScoredDocument(1, 0.0485, {"duck": 0.0485}),
                ScoredDocument(2, 0.0485, {"duck": 0.0485}),
                ScoredDocument(4, 0.0242, {"duck": 0.0242}),
                ScoredDocument(3, 0, {"duck": 0}),
            ],
        ),
        (
            TD_MATRIX,
            "beijing duck recipe",
            [
                ScoredDocument(
                    4, 0.0597, {"beijing": 0.0332, "duck": 0.0081, "recipe": 0.0184}
                ),
                ScoredDocument(
                    1, 0.0493, {"beijing": 0.0332, "duck": 0.0161, "recipe": 0}
                ),
                ScoredDocument(3, 0.0369, {"beijing": 0, "duck": 0, "recipe": 0.0369}),
                ScoredDocument(
                    2, 0.0346, {"beijing": 0, "duck": 0.0161, "recipe": 0.0185}
                ),
                ScoredDocument(0, 0.0323, {"beijing": 0, "duck": 0.0323, "recipe": 0}),
            ],
        ),
    ],
)
def test_tfidf_scorer(
    td_matrix: TD_MATRIX_TYPE, query: str, correct_values: DOCUMENT_SCORES_TYPE
):
    """Checks the ranking order, the total scores, and the per-term scores.

    Args:
        td_matrix: Term-document matrix to build the scorer from.
        query: Query string to score the documents against.
        correct_values: Expected ranking, with scores rounded to four
            decimals (hence the relative tolerance in the comparisons).
    """
    scorer = TFIDFScorer(td_matrix)
    ranking = scorer.score_documents(query)
    assert [x.doc_id for x in ranking] == [
        x.doc_id for x in correct_values
    ]  # Checking ranking
    assert [x.score for x in ranking] == pytest.approx(
        [x.score for x in correct_values], rel=1e-2
    )  # Checking scores
    # Checking query terms scores for every ranked document, not only the top
    # one. The doc ID lists matched above, so zip pairs up equal documents.
    for actual, expected in zip(ranking, correct_values):
        assert actual.query_terms_scores == pytest.approx(
            expected.query_terms_scores, rel=1e-2
        )


%%run_pytest[clean] and %%run_pytest are deprecated in favor of %%ipytest. %%ipytest will clean tests, evaluate the cell and then run pytest. To disable cleaning, configure ipytest with ipytest.config(clean=False).


[32m.[0m[32m.[0m[32m.[0m[32m                                                                                          [100%][0m
[32m[32m[1m3 passed[0m[32m in 0.03s[0m[0m
