## Building LLM-powered pipelines to extract and process data with Haystack

In this notebook, we will show how to build a pipeline to extract and process data using Haystack by deepset.

For extraction, we will look into extracting and processing content from: 

* The internet 

* Files of different formats: PDF, txt, Markdown, JSON, CSV 

For cleaning and processing, we will focus on: 

* Removing certain characters and white space 

* Chunking and splitting text  

Once the data has been extracted and cleaned, we will store it in a Haystack document store.

### Extracting content from the internet

We will use the following components to extract content from the internet:

* `SerperDevWebSearch()` - this component will enable us to perform web searches using natural language queries.
* `LinkContentFetcher()` - this component will enable us to fetch content from the links returned by the web search.

In [None]:
!pip install --upgrade haystack-ai
!pip install markdown-it-py mdit_plain

Let's start by importing the appropriate modules and classes.

In [1]:
from haystack.components.websearch import SerperDevWebSearch
from haystack.components.fetchers import LinkContentFetcher

Let's initialize the `SerperDevWebSearch()` and `LinkContentFetcher()` components and use them to perform a web search and fetch content from the links returned by the web search. 

**Note:** You will need a Serper API key to use the `SerperDevWebSearch()` component. You can get a free or paid API key by signing up at [https://serper.dev/](https://serper.dev/).

This notebook assumes you have a `.env` file in the root directory of this repository with the following content:

```bash
SERPERDEV_API_KEY=your_api_key
OPENAI_API_KEY=your_api_key
```

In [2]:
from dotenv import load_dotenv
import os

load_dotenv("./../../.env")

open_ai_key = os.getenv("OPENAI_API_KEY")
serper_api_key = os.getenv("SERPERDEV_API_KEY")
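
Because `os.getenv` returns `None` instead of raising when a variable is missing, a misnamed key in the `.env` file can go unnoticed until the first API call fails. The sketch below shows one way to fail early. `require_env` is a hypothetical helper written for this example (it is not part of Haystack or python-dotenv), and `EXAMPLE_API_KEY` is a placeholder variable so the snippet runs anywhere.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error.

    Hypothetical helper for illustration; not part of Haystack or python-dotenv.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set; check your .env file."
        )
    return value


# Placeholder variable set here only so the example is self-contained.
os.environ["EXAMPLE_API_KEY"] = "your_api_key"
print(require_env("EXAMPLE_API_KEY"))
```

In the notebook, the same check could be applied to the two keys loaded above.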

Initialize the components.

For the `SerperDevWebSearch()` component, we will use the following parameters:

* `api_key` - by default this is read from the `SERPERDEV_API_KEY` environment variable, so as long as we have loaded it, we don't need to pass it explicitly
* `top_k` - the number of search results to return
* `allowed_domains` - a list of domains to restrict the search to
* `search_params` - a dictionary of search parameters to pass to the Serper API

For the `LinkContentFetcher()` component, we will use the following parameters:

* `retry_attempts` - the number of times to retry fetching content from a link
* `timeout` - the number of seconds to wait for a response from a link before giving up on that request


Let's limit our search to five results from Wikipedia and Encyclopedia Britannica.

In [10]:
web_search = SerperDevWebSearch(top_k=5,
                                allowed_domains=["https://en.wikipedia.org/",
                                                 "https://www.britannica.com/"])
link_content = LinkContentFetcher(retry_attempts=3,
                                  timeout=10)

### Connecting the components

We will initialize the `Pipeline()` class, add the components and connect them.


In [11]:
from haystack import Pipeline

# Initialize pipeline
pipeline = Pipeline()

# Add components
pipeline.add_component(name='search', instance=web_search)
pipeline.add_component(name='fetcher', instance=link_content)

# Connect components to one another
pipeline.connect("search.links", "fetcher.urls")

# Draw pipeline
pipeline.draw("./images/search_fetch_pipeline.png")
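
The connection string `"search.links"` to `"fetcher.urls"` reads as "send the `links` output of the `search` component to the `urls` input of the `fetcher` component". The library-free sketch below mimics that data flow with two plain functions. `fake_search` and `fake_fetch` are hypothetical stand-ins with made-up return values; they only illustrate the shape of the data and are not how Haystack implements its components.

```python
from typing import Dict, List


def fake_search(query: str) -> Dict[str, List[str]]:
    """Stand-in for a web search component: returns snippets and links."""
    return {
        "documents": [f"snippet about {query}"],
        "links": ["https://example.com/a", "https://example.com/b"],
    }


def fake_fetch(urls: List[str]) -> Dict[str, List[bytes]]:
    """Stand-in for a fetcher component: returns one byte stream per URL."""
    return {"streams": [f"<html>{url}</html>".encode("utf-8") for url in urls]}


# Equivalent in spirit to connect("search.links", "fetcher.urls"):
# the "links" output of the first step becomes the "urls" input of the second.
search_out = fake_search("year of the dragon")
fetch_out = fake_fetch(urls=search_out["links"])

# Results are grouped by component name.
output = {"search": search_out, "fetcher": fetch_out}
print(list(output.keys()))
print(len(output["fetcher"]["streams"]))
```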



We can execute the pipeline as follows:

In [12]:
query = "What can you tell me about the year of the dragon?"
output = pipeline.run(data={"search":{"query":query}})

Let's take a look at the results:

In [13]:
output.keys()

dict_keys(['search', 'fetcher'])

Let's look at a few of the search results. Due to the length of the response, we will only display the first 100 characters of each snippet.

In [14]:
for item in output["search"]["documents"]:
    print(item.content[0:100])
    print("Title: ", item.meta['title'], "URL: ", item.meta['link'])
    print("-------")

The dragon, also known as loong is the fifth of the 12-year cycle of animals that appear in the Chin
Title:  Dragon (zodiac) - Wikipedia URL:  https://en.wikipedia.org/wiki/Dragon_(zodiac)
-------
Dragon, Chenshi, 07:00 to 08:59, Dragons are hovering in the sky to give rain. Snake, Sishi, 09:00 t
Title:  Chinese zodiac - Wikipedia URL:  https://en.wikipedia.org/wiki/Chinese_zodiac
-------
The Chinese Dragon, also known as the loong, long or lung is a legendary creature in Chinese mytholo
Title:  Chinese dragon - Wikipedia URL:  https://en.wikipedia.org/wiki/Chinese_dragon
-------
Note: according to this website, Abraham Lincoln was born in the year of the Snake, which means Darw
Title:  Talk:Dragon (zodiac) - Wikipedia URL:  https://en.wikipedia.org/wiki/Talk%3ADragon_(zodiac)
-------
Feb. 12, 2024, 5:02 AM ET (Yahoo News). Year of the dragon, dance of the lion: NL Chinese Associatio
Title:  Dragon | Description, Mythical Dragons, Types, & Facts | Britannica URL:  https://www.britannic

Let's take a look at the scraped content from the first link. We will only show the first 50 characters.

In [15]:
output["fetcher"]['streams'][0].data[0:50]

b'<!DOCTYPE html>\n<html class="client-nojs vector-fe'
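
The fetcher returns raw bytes, so the HTML still has to be decoded and stripped of markup before it is useful as text. As a rough illustration of that conversion step, the sketch below uses only the standard library's `html.parser`. The sample bytes and the `TextExtractor` class are made up for the example, and this is not a description of what Haystack does internally.

```python
from html.parser import HTMLParser


class TextExtractor(HTMLParser):
    """Collect visible text and ignore tags (hypothetical helper for illustration)."""

    def __init__(self) -> None:
        super().__init__()
        self.parts = []

    def handle_data(self, data: str) -> None:
        # Keep only non-blank text nodes.
        if data.strip():
            self.parts.append(data.strip())


# Made-up sample standing in for the bytes of one fetched stream.
raw = b"<!DOCTYPE html>\n<html><body><h1>Dragon</h1><p>Fifth zodiac animal.</p></body></html>"

parser = TextExtractor()
parser.feed(raw.decode("utf-8"))
parser.close()

text = " ".join(parser.parts)
print(text)
```

A real page also contains scripts, styles, and navigation text, which is why a dedicated converter is a better fit than this toy parser.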

## Adding conversion, cleaning, and writing components

We will reinitialize the `Pipeline()` class and add new instances of each of the following components:

* `SerperDevWebSearch()` - to perform web searches using natural language queries
* `LinkContentFetcher()` - to fetch content from the links returned by the web search
* `HTMLToDocument()` - to convert the HTML content to a Haystack document
* `DocumentCleaner()` - to clean the content of the document
* `DocumentWriter()` - to write the document to a Haystack document store

In [None]:
from haystack.components.converters import HTMLToDocument
from haystack.components.preprocessors import DocumentCleaner
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.writers import DocumentWriter

# Initialize document store
document_store = InMemoryDocumentStore()

# Initialize components
web_search = SerperDevWebSearch(top_k=5, 
                                allowed_domains=["https://en.wikipedia.org/",
                                                 "https://www.britannica.com/"],
                                search_params={"type":"search"})
link_content = LinkContentFetcher(retry_attempts=3,
                                  timeout=10)
html_to_document = HTMLToDocument()
cleaner = DocumentCleaner(
    remove_empty_lines=True,
    remove_extra_whitespaces=True,
    remove_repeated_substrings=False)
writer = DocumentWriter(document_store=document_store)

# Initialize pipeline
pipeline = Pipeline()

# Add components
pipeline.add_component(name="search", instance=web_search)
pipeline.add_component(name="fetcher", instance=link_content)
pipeline.add_component(name="html_to_document", instance=html_to_document)
pipeline.add_component(name="cleaner", instance=cleaner)
pipeline.add_component(name="writer", instance=writer)

# Connect components to one another
pipeline.connect("search.links", "fetcher.urls")
pipeline.connect("fetcher", "html_to_document")
pipeline.connect("html_to_document.documents", "cleaner.documents")
pipeline.connect("cleaner.documents", "writer.documents")

# Draw pipeline
pipeline.draw("./images/search_fetch_clean_save_pipeline.png")

In [None]:
query = "What can you tell me about the year of the dragon?"
result = pipeline.run(data={"search":{"query":query}})

Show documents in the document store.

In [None]:
document_store.filter_documents()
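
To make the `remove_empty_lines` and `remove_extra_whitespaces` flags more concrete, here is a small approximation of that kind of cleaning in plain Python. The `clean_text` function is a hypothetical helper written for this example; Haystack's `DocumentCleaner` applies its own rules, which may differ in detail (for instance in how line breaks are treated).

```python
import re


def clean_text(
    text: str,
    remove_empty_lines: bool = True,
    remove_extra_whitespaces: bool = True,
) -> str:
    """Approximate whitespace cleanup; hypothetical helper for illustration."""
    if remove_extra_whitespaces:
        # Collapse runs of spaces and tabs into one space, then trim each line.
        text = "\n".join(
            re.sub(r"[ \t]+", " ", line).strip() for line in text.split("\n")
        )
    if remove_empty_lines:
        # Drop lines that contain nothing but whitespace.
        text = "\n".join(line for line in text.split("\n") if line.strip())
    return text


sample = "  The   dragon  \n\n\n is the fifth\tanimal.  \n"
print(repr(clean_text(sample)))
```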

## Extracting and processing content from files

Haystack provides the following components for converting files into documents:


| Component | Description |
| --- | --- |
| `AzureOCRDocumentConverter` | Converts PDF (both searchable and image-only), JPEG, PNG, BMP, TIFF, DOCX, XLSX, PPTX, and HTML to Documents. |
| `HTMLToDocument` | Converts HTML files to Documents. |
| `MarkdownToDocument` | Converts markdown files to Documents. |
| `PyPDFToDocument` | Converts PDF files to Documents. |
| `TikaDocumentConverter` | Converts various file types to Documents using Apache Tika. |
| `TextFileToDocument` | Converts text files to Documents. |




### Workflow to incorporate the components into a pipeline

The workflow below may be adapted to create a preprocessing pipeline:

1. Choose the appropriate file converter.  

2. Initialize the component to convert files into Haystack document objects. 
 
3. Initialize components to clean and store into a document store.  

4. Initialize a pipeline instance. 

5. Add components to the pipeline.  

6. Connect components in the correct order.  

7. Execute the pipeline.
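
For step 1, one simple approach is to pick the converter from the file extension. The sketch below groups file paths by the converter that would handle them. The mapping and the `group_by_converter` helper are assumptions made for this example, using converter names from the table above; the file paths are invented.

```python
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List

# Hypothetical mapping from file suffix to a converter named in the table above.
CONVERTER_BY_SUFFIX = {
    ".md": "MarkdownToDocument",
    ".txt": "TextFileToDocument",
    ".pdf": "PyPDFToDocument",
    ".html": "HTMLToDocument",
}


def group_by_converter(paths: Iterable[str]) -> Dict[str, List[str]]:
    """Group paths by converter name; unknown suffixes go under 'unsupported'."""
    groups = defaultdict(list)
    for path in paths:
        suffix = Path(path).suffix.lower()
        converter = CONVERTER_BY_SUFFIX.get(suffix, "unsupported")
        groups[converter].append(str(path))
    return dict(groups)


# Invented file names, for illustration only.
files = ["notes/page1.md", "notes/page2.TXT", "reports/q1.pdf", "data/table.xlsx"]
print(group_by_converter(files))
```

Each group could then be passed as `sources` to a pipeline built around the matching converter.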

Let's create a couple of examples with the `MarkdownToDocument` and `TextFileToDocument` components.

In [5]:
from haystack import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.converters import MarkdownToDocument, TextFileToDocument
from haystack.components.preprocessors import DocumentCleaner
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from pathlib import Path

# Initialize document store and components
document_store = InMemoryDocumentStore()
markdown_converter = MarkdownToDocument()
document_cleaner = DocumentCleaner(
                    remove_empty_lines=True,
                    remove_extra_whitespaces=True,
                    remove_repeated_substrings=False
                )
document_splitter = DocumentSplitter(split_by="word", split_length=5)
document_writer = DocumentWriter(document_store=document_store)

# Initialize pipeline
pipeline = Pipeline()

# Add components
pipeline.add_component("converter", markdown_converter)
pipeline.add_component("cleaner", document_cleaner)
pipeline.add_component("splitter", document_splitter)
pipeline.add_component("writer", document_writer)

# Connect components to one another
pipeline.connect("converter", "cleaner")
pipeline.connect("cleaner.documents", "splitter.documents")
pipeline.connect("splitter.documents", "writer.documents")

# Execute pipeline
file_names = [str(f) for f in Path("./markdown_pages").rglob("*.md")]
pipeline.run({"converter": {"sources": file_names}})
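
The splitter above is configured with `split_by="word"` and `split_length=5`. As a minimal sketch of what word-based chunking means, the function below groups whitespace-separated words into chunks of at most five words. `split_by_word` is a hypothetical helper; it ignores overlap and other options, so it should be read as an approximation rather than as Haystack's implementation.

```python
from typing import List


def split_by_word(text: str, split_length: int) -> List[str]:
    """Group whitespace-separated words into chunks of at most split_length words."""
    words = text.split()
    return [
        " ".join(words[i:i + split_length])
        for i in range(0, len(words), split_length)
    ]


sample = (
    "This is page 1 of the text file. "
    "Lorem ipsum dolor sit amet, consectetur adipiscing elit."
)
for chunk in split_by_word(sample, split_length=5):
    print(chunk)
```

With such a small `split_length`, even a short file produces many chunks, so a larger value is usually more practical outside of a demonstration.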

In [3]:
document_store.filter_documents()

[Document(id=f10a8f3a4b330064c1c0660196236d13e816f94b864a92c0749698c50fd70d03, content: 'Tables | Option | Description |
 | ------ | ----------- |
 | data | path to data files to supply the d...', meta: {'file_path': 'markdown_pages/page3.md', 'source_id': 'daafb2334a53a68782bd589997cc0b01d8e4f60aaf94abbb99448f2c848010a0'}),
 Document(id=3b2ba3eb5de58bcf3caf8ede5900c01febb44b1256c97ce406d90b432536036b, content: 'files. | Links link text link with title Autoconverted link https://github.com/nodeca/pica (enable l...', meta: {'file_path': 'markdown_pages/page3.md', 'source_id': 'daafb2334a53a68782bd589997cc0b01d8e4f60aaf94abbb99448f2c848010a0'}),
 Document(id=31e41a3879d0fd48c26cef9e076af64c9383675cd19dff14d7606be29f800596, content: 'Advertisement :) pica - high quality and fast image
 resize in browser.
 babelfish - developer friendl...', meta: {'file_path': 'markdown_pages/page2.md', 'source_id': 'd07c32e07de151200abfb29f5f22dc759300ddcc75e064078d7d36ab7930f490'}),
 Document(id=94bdc3fc

Let's create a pipeline for the `TextFileToDocument` component.

In [15]:
# Initialize document store and components
document_store = InMemoryDocumentStore()
text_converter = TextFileToDocument()
document_cleaner = DocumentCleaner(
                    remove_empty_lines=True,
                    remove_extra_whitespaces=True,
                    remove_repeated_substrings=False
                )
document_splitter = DocumentSplitter(split_by="word", split_length=5)
document_writer = DocumentWriter(document_store=document_store)

# Initialize pipeline
pipeline = Pipeline()

# Add components
pipeline.add_component("converter", text_converter)
pipeline.add_component("cleaner", document_cleaner)
pipeline.add_component("splitter", document_splitter)
pipeline.add_component("writer", document_writer)

# Connect components to one another
pipeline.connect("converter", "cleaner")
pipeline.connect("cleaner.documents", "splitter.documents")
pipeline.connect("splitter.documents", "writer.documents")


<haystack.pipeline.Pipeline at 0x7f88f17b6d40>

In [18]:
pipeline.draw("./images/text_file_to_document_pipeline.png")

In [16]:
# Execute pipeline
file_names = [str(f) for f in Path("./textfile-pages").rglob("*.txt")]
pipeline.run({"converter": {"sources": file_names}})

{'writer': {'documents_written': 3}}

In [17]:
document_store.filter_documents()

[Document(id=d5a6e4891649cc95de12efe9c221e7956d9964fdd4e1860ecb209d9ddd088d91, content: 'This is page 1 of the text file.
 Lorem ipsum dolor sit amet, consectetur adipiscing elit.
 Sed euismo...', meta: {'file_path': 'textfile-pages/page1.txt', 'source_id': '8c31b71f452f1d2bcaafeb90b6aee523eea0002ba49cb12ff39946b2532c7c1f'}),
 Document(id=e912e8a17c8e6524f68e23a64d7e5918ab6251ae99b5ee02fb75be55e6a2885d, content: 'This is page 2 of the text file.
 Lorem ipsum dolor sit amet, consectetur adipiscing elit.
 Nullam auc...', meta: {'file_path': 'textfile-pages/page2.txt', 'source_id': '964dbbbb44b231a6409552a36be07d3d97439bce04f47540370e25148535a216'}),
 Document(id=22ec06e031cd39e68c5380be4dfd82e05aaa5e6d2b18293d05136b49f59bd6f1, content: 'This is page 3 of the text file.
 Lorem ipsum dolor sit amet, consectetur adipiscing elit.
 Sed euismo...', meta: {'file_path': 'textfile-pages/page3.txt', 'source_id': '936add0cc365ed200afc5caf76cb6ddc686ea3c1ce231bdd989065731f3ea9bf'})]

### Incorporating a duplicate policy

The `DuplicatePolicy` class defines how a DocumentStore handles a document whose ID already exists in the store. The three policies covered here are:

* `OVERWRITE`: Indicates that if a document with the same ID already exists in the DocumentStore, it should be overwritten with the new document.
* `SKIP`: If a document with the same ID already exists, the new document will be skipped and not added to the DocumentStore.
* `FAIL`: Raises an error if a document with the same ID already exists in the DocumentStore. It prevents duplicate documents from being added.
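
The behavior of the three policies can be sketched with a toy store backed by a plain dictionary. Everything in this example is hypothetical: the `Policy` enum and `write_documents` function only mirror the descriptions above and are not Haystack's `DuplicatePolicy` or `InMemoryDocumentStore`.

```python
from enum import Enum
from typing import Dict, List


class Policy(Enum):
    """Hypothetical stand-in mirroring the three policies described above."""

    OVERWRITE = "overwrite"
    SKIP = "skip"
    FAIL = "fail"


def write_documents(store: Dict[str, str], docs: List[dict], policy: Policy) -> int:
    """Write docs into a dict keyed by ID and return how many were written."""
    written = 0
    for doc in docs:
        exists = doc["id"] in store
        if exists and policy is Policy.SKIP:
            continue  # keep the stored version and ignore the new one
        if exists and policy is Policy.FAIL:
            raise ValueError(f"Document {doc['id']} already exists")
        store[doc["id"]] = doc["content"]  # new document, or OVERWRITE
        written += 1
    return written


store = {"a": "old"}
incoming = [{"id": "a", "content": "new"}, {"id": "b", "content": "fresh"}]

# Each call works on a copy so the policies can be compared side by side.
print(write_documents(dict(store), incoming, Policy.SKIP))
print(write_documents(dict(store), incoming, Policy.OVERWRITE))
```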

We will pass the duplicate policy when we initialize the `DocumentWriter()` component.

In [21]:
from haystack.document_stores.types import DuplicatePolicy

# Initialize document store and components
document_store = InMemoryDocumentStore()
text_converter = TextFileToDocument()
document_cleaner = DocumentCleaner(
                    remove_empty_lines=True,
                    remove_extra_whitespaces=True,
                    remove_repeated_substrings=False
                )
document_splitter = DocumentSplitter(split_by="word", split_length=5)
document_writer = DocumentWriter(document_store=document_store,
                                 policy=DuplicatePolicy.SKIP)

# Initialize pipeline
pipeline = Pipeline()

# Add components
pipeline.add_component("converter", text_converter)
pipeline.add_component("cleaner", document_cleaner)
pipeline.add_component("splitter", document_splitter)
pipeline.add_component("writer", document_writer)

# Connect components to one another
pipeline.connect("converter", "cleaner")
pipeline.connect("cleaner.documents", "splitter.documents")
pipeline.connect("splitter.documents", "writer.documents")

file_names = [str(f) for f in Path("./textfile-pages").rglob("*.txt")]
pipeline.run({"converter": {"sources": file_names}})

{'writer': {'documents_written': 47}}