In this notebook we index the French dump of the Wikipedia dataset and use it as the retrieval corpus for our QA system.

In [40]:
import numpy as np
from haystack.utils import clean_wiki_text, convert_files_to_docs, fetch_archive_from_http, print_answers

In [2]:
from datasets import load_dataset

In [3]:
wiki_corpus = load_dataset("wikipedia", "20220301.fr", split="train")



In [4]:
wiki_corpus = wiki_corpus.shuffle(seed=42)



In [5]:
sample_wiki = wiki_corpus.shard(100, index=0)

In [6]:
sample_doc_dict = sample_wiki[1]

In [7]:
sample_doc_dict

{'id': '12576878',
 'url': 'https://fr.wikipedia.org/wiki/Teinture%20naturelle',
 'title': 'Teinture naturelle',
 'text': "Une teinture naturelle est une matière colorante dérivée de plantes ou d'invertébrés. On parle de teinture quand il s'agit de colorer des textiles au moyen de produits solubles dans l'eau ; les minéraux donnent des colorants insolubles appelés pigments qui s'appliquent mal à cet usage.\n\nLa plupart des teintures naturelles proviennent de racines, de baies, d'écorce, de feuilles ou de bois de plantes tinctoriales ou bien de champignons ou de lichens.\n\nHistoire \n\nLe concept de teinture naturelle se développe par opposition aux teintures issues de la chimie organique dans la deuxième moitié du . Il oppose aux produits de l'industrie chimique, les teintures que des artisans peuvent extraire de plantes, de lichens, d'insectes. Auparavant, on ne considérait comme naturelles que les quelques substances utilisables sans transformation, tandis que la plupart des teintu

In [10]:
from haystack.schema import Document

In [11]:
docs = Document.from_dict(sample_doc_dict, field_map={"text": "content"})

#### Building the Elasticsearch Index

In [12]:
from haystack.utils import clean_wiki_text, convert_files_to_docs

In [39]:
from haystack.errors import HaystackError
from haystack.schema import Document
from typing import List, Optional, Generator, Set, Union, Callable, Dict
from copy import deepcopy
from haystack.nodes import PreProcessor
import re

In [14]:
from gensim.utils import deaccent

def remove_accents(document):
    input_without_accent = deaccent(document)
    return input_without_accent
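
The effect of `deaccent` can be approximated with the standard library alone. The sketch below is only an illustration, not gensim's code; `strip_accents` is a hypothetical helper name. It shows why accent removal is applied to both the indexed text and the questions later on: once both sides are normalized the same way, a plain substring comparison can match them.

```python
import unicodedata


def strip_accents(text: str) -> str:
    """Approximate accent removal: decompose characters, then drop combining marks."""
    # NFD splits "é" into "e" followed by a combining accent (category "Mn").
    decomposed = unicodedata.normalize("NFD", text)
    without_marks = "".join(
        ch for ch in decomposed if unicodedata.category(ch) != "Mn"
    )
    # Recompose whatever is left into canonical form.
    return unicodedata.normalize("NFC", without_marks)


print(strip_accents("Quels plats de viandes épicées sont préparés ?"))
```

Characters that have no decomposition, such as the ligature "œ", are left unchanged by this approach.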

In [15]:
async def convert_wiki_article_to_docs(
    item: dict,
    clean_func: Optional[Callable] = None,
    split_paragraphs: bool = False,
) -> List[Document]:
    """
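    Convert a single Wikipedia article into a list of Haystack Documents.
    Accents are removed from the text before any cleaning is applied.

    :param item: dict describing one article, with the keys "id", "title" and "text"
    :param clean_func: a custom cleaning function that gets applied to the article text (input: str, output: str)
    :param split_paragraphs: if True, split the text on newlines and emit one Document per paragraph of 200 to 2000 characters.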
    """
    documents = []
    processed = 0
    text = item.get("text")
    text = remove_accents(text)
    if clean_func:
        text = clean_func(text)
    if split_paragraphs:
        for para in text.split("\n"):
            if 200 <= len(para.strip()) <= 2000:
                # keep only paragraphs whose length is between 200 and 2000 characters
                processed += 1
                doc = Document(content=para, meta={"title": item.get("title")}, id=item.get("id"))
                documents.append(doc)
            else:
                continue
    else:
        processed += 1
        doc = Document(content=text, meta={"title": item.get("title")}, id=item.get("id"))
        documents.append(doc)
    
    return documents
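
The paragraph filter above can be tried without Haystack. The following is a minimal sketch that uses plain dicts in place of `Document` objects; `split_into_passages` and the `"{id}-{n}"` identifier scheme are assumptions made for illustration, not part of the original cell.

```python
from typing import Dict, List

MIN_CHARS = 200   # same bounds as the length check in the cell above
MAX_CHARS = 2000


def split_into_passages(item: Dict[str, str]) -> List[Dict[str, str]]:
    """Split an article's text on newlines and keep the mid-sized paragraphs."""
    passages = []
    for n, para in enumerate(item["text"].split("\n")):
        if MIN_CHARS <= len(para.strip()) <= MAX_CHARS:
            passages.append(
                {
                    # Assumption: one identifier per paragraph, built from the article id.
                    "id": f"{item['id']}-{n}",
                    "title": item["title"],
                    "content": para,
                }
            )
    return passages


article = {
    "id": "42",
    "title": "Exemple",
    # A short heading, an empty line, one paragraph of 250 chars, one of 3000 chars.
    "text": "Histoire \n\n" + "a" * 250 + "\n" + "b" * 3000,
}
passages = split_into_passages(article)
print(len(passages))
```

Only the 250-character paragraph survives: section headings and empty lines are too short, and the last paragraph is too long. The sketch gives each passage its own identifier because the cell above reuses the article id for every paragraph, and documents that share an id may be treated as duplicates by the document store. Whether that matters depends on the `duplicate_documents` setting in use.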

In [16]:
import asyncio

#### Saving the Documents in the Document Store

In [17]:
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
from haystack.document_stores import ElasticsearchDocumentStore


In [18]:
document_store = ElasticsearchDocumentStore(index="fr-wikipedia", recreate_index=True, analyzer="french")

INFO - haystack.document_stores.elasticsearch -  Index 'fr-wikipedia' deleted.
INFO - haystack.document_stores.elasticsearch -  Index 'label' deleted.


In [76]:
%%script false --no-raise-error
all_docs = []
for i in range(0, 100):
    shard = wiki_corpus.shard(100, index=i)
    with tqdm(total=shard.shape[0]) as pbar:
        docs_in_shard = tqdm_asyncio.gather(*[convert_wiki_article_to_docs(item, clean_func=clean_wiki_text, split_paragraphs=True) for item in shard])
        all_docs.append(docs_in_shard)
    print("done with shard ", i)

In [None]:
from functools import reduce
from operator import iconcat

In [77]:
%%script false --no-raise-error
with tqdm(total=len(all_docs)) as pbar:
    scan_results = await tqdm_asyncio.gather(*all_docs[90:100])

In [78]:
%%script false --no-raise-error
scan_results = reduce(iconcat, scan_results, [])
scan_results = reduce(iconcat, scan_results, [])
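
The two `reduce` calls are needed because the gathered results are nested twice: a list of shards, each holding a list of articles, each holding a list of documents. The sketch below illustrates the same two-level flattening on toy data with `itertools.chain`; the strings stand in for `Document` objects and are purely illustrative.

```python
from itertools import chain

# Toy stand-in for the gathered results: shards -> articles -> passages.
nested = [
    [["p1", "p2"], ["p3"]],  # shard 0: two articles
    [[], ["p4"]],            # shard 1: the first article produced no passage
]

per_article = chain.from_iterable(nested)      # removes the shard level
flat = list(chain.from_iterable(per_article))  # removes the article level
print(flat)  # ['p1', 'p2', 'p3', 'p4']
```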

With our documents indexed in Elasticsearch, we can now search them. We use the PIAF dataset, which provides questions and answers without their paragraphs, and retrieve supporting contexts for them from the index.

In [67]:
from collections import deque

In [68]:
from elasticsearch.helpers import bulk, parallel_bulk, scan
def write_documents_parallel(
        self,
        documents: Union[List[dict], List[Document]],
        index: Optional[str] = None,
        batch_size: int = 10_000,
        duplicate_documents: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
    ):
        """
        Indexes documents for later queries in Elasticsearch. This is a variant of the indexing method that uses Elasticsearch's `parallel_bulk` helper.

        Behaviour if a document with the same ID already exists in ElasticSearch:
        a) (Default) Throw Elastic's standard error message for duplicate IDs.
        b) If `self.update_existing_documents=True` for DocumentStore: Overwrite existing documents.
        (This is only relevant if you pass your own ID when initializing a `Document`.
        If you don't set custom IDs for your Documents or just pass a list of dictionaries here,
        they will automatically get UUIDs assigned. See the `Document` class for details)

        :param documents: a list of Python dictionaries or a list of Haystack Document objects.
                          For documents as dictionaries, the format is {"content": "<the-actual-text>"}.
                          Optionally: Include meta data via {"content": "<the-actual-text>",
                          "meta": {"name": "<some-document-name>", "author": "somebody", ...}}
                          It can be used for filtering and is accessible in the responses of the Finder.
                          Advanced: If you are using your own Elasticsearch mapping, the key names in the dictionary
                          should be changed to what you have set for self.content_field and self.name_field.
        :param index: Elasticsearch index where the documents should be indexed. If not supplied, self.index will be used.
        :param batch_size: Number of documents that are passed to Elasticsearch's bulk function at a time.
        :param duplicate_documents: Handle duplicates document based on parameter options.
                                    Parameter options : ( 'skip','overwrite','fail')
                                    skip: Ignore the duplicates documents
                                    overwrite: Update any existing documents with the same ID when adding documents.
                                    fail: an error is raised if the document ID of the document being added already
                                    exists.
        :param headers: Custom HTTP headers to pass to elasticsearch client (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='})
                Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information.
        :raises DuplicateDocumentError: Exception trigger on duplicate document
        :return: None
        """

        if index and not self.client.indices.exists(index=index, headers=headers):
            self._create_document_index(index, headers=headers)

        if index is None:
            index = self.index
        duplicate_documents = duplicate_documents or self.duplicate_documents
        assert (
            duplicate_documents in self.duplicate_documents_options
        ), f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}"

        field_map = self._create_document_field_map()
        document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents]
        document_objects = self._handle_duplicate_documents(
            documents=document_objects, index=index, duplicate_documents=duplicate_documents, headers=headers
        )
        documents_to_index = []
        for doc in tqdm(document_objects):
            _doc = {
                "_op_type": "index" if duplicate_documents == "overwrite" else "create",
                "_index": index,
                **doc.to_dict(field_map=self._create_document_field_map()),
            }  # type: Dict[str, Any]

            # cast embedding type as ES cannot deal with np.array
            if _doc[self.embedding_field] is not None:
                if type(_doc[self.embedding_field]) == np.ndarray:
                    _doc[self.embedding_field] = _doc[self.embedding_field].tolist()

            # rename id for elastic
            _doc["_id"] = str(_doc.pop("id"))

            # don't index query score and empty fields
            _ = _doc.pop("score", None)
            _doc = {k: v for k, v in _doc.items() if v is not None}

            # In order to have a flat structure in elastic + similar behaviour to the other DocumentStores,
            # we "unnest" all value within "meta"
            if "meta" in _doc.keys():
                for k, v in _doc["meta"].items():
                    _doc[k] = v
                _doc.pop("meta")
            documents_to_index.append(_doc)

            # Pass batch_size number of documents to bulk
            if len(documents_to_index) % batch_size == 0:
                pb_ = parallel_bulk(self.client, 
                              documents_to_index, 
                              chunk_size=10000, 
                              thread_count=8, 
                              queue_size=8,
                              refresh=self.refresh_type, 
                              headers=headers)
                deque(pb_, maxlen=0)
                documents_to_index = []

        if documents_to_index:
            pb_= parallel_bulk(self.client, 
                          documents_to_index, 
                          chunk_size=10000, 
                          thread_count=8, 
                          queue_size=8,
                          refresh=self.refresh_type, 
                          headers=headers)
            deque(pb_, maxlen=0)
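
Two details of the function above are easy to miss: documents are sent in batches, and `parallel_bulk` returns a generator, so nothing is sent until that generator is iterated. `deque(pb_, maxlen=0)` drains it while discarding the results. The sketch below reproduces both ideas without Elasticsearch; `fake_parallel_bulk` and `batches` are hypothetical stand-ins written for this illustration.

```python
from collections import deque
from itertools import islice
from typing import Iterable, Iterator, List

sent: List[List[int]] = []  # records what the fake helper has "sent"


def fake_parallel_bulk(actions: List[int]) -> Iterator[bool]:
    """Hypothetical lazy bulk helper: the work only happens once it is iterated."""
    sent.append(list(actions))
    for _ in actions:
        yield True


def batches(items: Iterable[int], size: int) -> Iterator[List[int]]:
    """Yield successive lists of at most `size` items."""
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


for batch in batches(range(7), size=3):
    results = fake_parallel_bulk(batch)  # nothing has been sent yet
    deque(results, maxlen=0)             # drain the generator, discarding results

print(sent)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Draining with a zero-length `deque` avoids keeping one result per document in memory, which matters when indexing millions of passages.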

In [43]:
document_store.write_documents_parallel = write_documents_parallel

In [79]:
%%script false --no-raise-error
write_documents_parallel(document_store, scan_results)

In [81]:
from haystack.nodes import BM25Retriever

In [82]:
bm25_retriever = BM25Retriever(document_store=document_store, all_terms_must_match=False)

In [83]:
import pandas as pd

In [84]:
from pathlib import Path
DATA_PATH = Path.cwd().joinpath("data")
assert DATA_PATH.exists(), "the data path does not exist"

In [85]:
piaf_file = DATA_PATH.joinpath("corpus", "raw", "piaf", "questoin-reponse.csv")

In [86]:
assert piaf_file.exists(), "the piaf dataset does not exist"

piaf_question = data

In [87]:
piaf_df_without_context = pd.read_csv(piaf_file)

In [88]:
sample_question_response = piaf_df_without_context.sample(1)
question = deaccent(sample_question_response.question.values[0])
response = deaccent(sample_question_response.reponse.values[0])


In [89]:
question

'Quels plats de viandes epicees sont prepares a partir de ces ingredients ?'

In [90]:
def get_positive_context(retriever: BM25Retriever, search_query: str, answer: str, positive_documents: int = 100) -> List[dict]:
        """Retrieve the positive contexts for a question, i.e. the passages that contain its answer.

        We first retrieve the top `positive_documents` documents for the query.
        - If some of the top 40 documents contain the answer, those documents are returned.
        - Otherwise we look for the answer in the documents ranked 41 to 100 and return the matches.
        - If no document contains the answer, an empty list is returned and the question can be discarded.

        Args:
            retriever (BM25Retriever): retriever used to query the document store.
            search_query (str): the question, used as the search query.
            answer (str): the expected answer; matching is a case-insensitive substring test.
            positive_documents (int, optional): number of documents to retrieve. Defaults to 100.

        Returns:
            A list of dicts with the keys "title" and "content", one per matching document.
        """
        list_pos_ctxs = []
        retrieved_docs = retriever.retrieve(query=search_query, top_k=positive_documents)
        for index, retrieve_doc in enumerate(retrieved_docs[0:40]):
            if answer.lower() in retrieve_doc.content.lower():
                list_pos_ctxs.append(
                    {"title": retrieve_doc.meta.get("title"), "content": retrieve_doc.content}
                )
        if len(list_pos_ctxs) == 0:
            for index, retrieve_doc in enumerate(retrieved_docs[40:100]):
                if answer.lower() in retrieve_doc.content.lower():
                    list_pos_ctxs.append(
                        {"title": retrieve_doc.meta.get("title"), "content": retrieve_doc.content}
                    )
        else:
            pass
        if len(list_pos_ctxs) == 0:
            return []
        return list_pos_ctxs
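
The two-tier lookup can be exercised without a running retriever. The following is a minimal sketch that works on an already ranked list of plain dicts; `find_positive_contexts` and its `first_tier` parameter are hypothetical names introduced for this illustration.

```python
from typing import Dict, List


def find_positive_contexts(
    ranked: List[Dict[str, str]], answer: str, first_tier: int = 40
) -> List[Dict[str, str]]:
    """Return the passages containing the answer, preferring the best-ranked tier."""
    needle = answer.lower()

    def matches(docs: List[Dict[str, str]]) -> List[Dict[str, str]]:
        return [d for d in docs if needle in d["content"].lower()]

    # Fall back to the lower-ranked tier only when the first tier has no match.
    return matches(ranked[:first_tier]) or matches(ranked[first_tier:])


ranked = [
    {"title": "A", "content": "Rien a voir"},
    {"title": "B", "content": "Toujours rien"},
    {"title": "C", "content": "L'ordre du Temple fut supprime en 1312"},
]
found = find_positive_contexts(ranked, "l'ordre du Temple", first_tier=2)
print([d["title"] for d in found])  # ['C']
```

Because the test is a plain substring match, very short answers (a single digit, for instance) will match many unrelated passages, so contexts found this way may need additional filtering.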

#### Use Piaf Dataset to query context

In [91]:
piaf_dataset = load_dataset("piaf")



  0%|          | 0/1 [00:00<?, ?it/s]

In [186]:
piaf_dataset = piaf_dataset["train"]

In [92]:
for index in tqdm(piaf_df_without_context.index):
    question = piaf_df_without_context.loc[index].question
    response = piaf_df_without_context.loc[index].reponse
    question = deaccent(question)
    response = deaccent(response)
    retrieved_docs = get_positive_context(retriever=bm25_retriever, search_query=question, answer=response, positive_documents=100)
    piaf_df_without_context.loc[index, "positive_context"] = retrieved_docs



Using our wiki corpus, we are able to find a positive context for some of the questions; we will use these to build our QA system.

In [93]:
piaf_with_context = piaf_df_without_context.loc[piaf_df_without_context.positive_context.apply(lambda x: len(x)) > 0]

In [94]:
piaf_with_context.shape

(2459, 3)

With our dataset, we find that {{piaf_with_context.shape[0]}} questions have a positive context, which will be useful for fine-tuning our model.

In [95]:
piaf_with_context = piaf_with_context.assign(length_positive_context =piaf_with_context["positive_context"].apply(lambda x: len(x)))

In [96]:
piaf_with_context.loc[piaf_with_context.length_positive_context > 1]

Unnamed: 0,question,reponse,positive_context,length_positive_context
6,Quel créancier du roi fut supprimé en 1312 ?,l'ordre du Temple,"[{'title': 'Commanderie de Bisham', 'content': 'Quand l'ordre du Temple fut ...",2
7,Quel créancier du roi fut supprimé en 1312 ?,l'ordre du Temple,"[{'title': 'Commanderie de Bisham', 'content': 'Quand l'ordre du Temple fut ...",2
29,Quel roi a poursuivi l'unification du royaume initié par Louis VI ?,Louis VII,"[{'title': 'Chennevières-lès-Louvres', 'content': 'L'abbe Suger, ne selon Ch...",2
89,Combien de gradient de rouge permet la palette de couleurs garanties ?,6,"[{'title': 'Truecolor', 'content': 'En mode truecolor la couleur d'un point ...",4
99,Combien de villages sont situés sur la côte des blancs ?,quatre,"[{'title': 'Le Tour (Chamonix-Mont-Blanc)', 'content': 'Le Tour est un villa...",2
...,...,...,...,...
11796,Le président tunisien est élu selon quel mode de scrutin ?,suffrage universel,"[{'title': 'Président de Sierra Leone', 'content': 'Le president de la repub...",6
11797,Par qui Valérien fut-il vaincu ?,les Perses,"[{'title': 'Cyriadès', 'content': 'En 259-260 dans l'anarchie qui suit la de...",2
11800,Qui fut le père de Gallien ?,Valérien,"[{'title': 'Musée Art et Histoire', 'content': ' Un portrait realiste de l'e...",3
11801,Qui succède à Valérien ?,Gallien,"[{'title': 'Valérien II', 'content': 'Valerien II mourut en ete 258, de rais...",4


Within our dataset, 740 rows have more than one positive context.