#### [LangChain Handbook](https://pinecone.io/learn/langchain)

# Retrieval Augmentation

**L**arge **L**anguage **M**odels (LLMs) have a data freshness problem. The most powerful LLMs in the world, like GPT-4, have no idea about recent world events.

The world of LLMs is frozen in time: they see only a static snapshot of the world as it was in their training data.

A solution to this problem is *retrieval augmentation*. The idea behind this is that we retrieve relevant information from an external knowledge base and give that information to our LLM. In this notebook we will learn how to do that.

To begin, we must install the prerequisite libraries used in this notebook. Installing all libraries at once produces a conflict with the Hugging Face `datasets` library, so we install everything in a specific order:

In [None]:
!pip install -qU \
    datasets==2.12.0 \
    apache_beam \
    mwparserfromhell


## Building the Knowledge Base

In [None]:
# !pip install unstructured pdfminer pdf2image install PIL pdfminer.high_level
# !pip install pdfminer
!pip install langchain --upgrade
!pip install pdfminer.six pypdf


In [None]:
from langchain.document_loaders import UnstructuredPDFLoader, OnlinePDFLoader, PyPDFLoader, PyPDFDirectoryLoader, WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [None]:
# loader = UnstructuredPDFLoader("./Learning_Apache_Flink.pdf")
# loader = PyPDFLoader("Introduction_to_Apache_Flink.pdf")
import os

pdf_folder_path = './pdf'
# loader = [PyPDFLoader(os.path.join(pdf_folder_path, fn)) for fn in os.listdir(pdf_folder_path)]
loader = PyPDFDirectoryLoader(pdf_folder_path)

In [None]:
data = loader.load()



In [None]:
print(f"You have {len(data)} document(s) in your data.")
print(f"There are {len(data[30].page_content)} characters in your document.")

You have 1000 document(s) in your data.
There are 1461 characters in your document.


In [None]:
print(type(data))
print(type(data[0]))

<class 'list'>
<class 'langchain.schema.document.Document'>


In [None]:
data[2]

Document(page_content='Figure 1: The execution graph for incremental word\ncount\nto maintain the current count for each word as their\ninternal state.\n1val env : StreamExecutionEnvironment = ...\n2env.setParallelism(2)\n3\n4val wordStream = env.readTextFile(path)\n5val countStream = wordStream.groupBy(_).count\n6countStream.print\nExample 1: Incremental Word Count\n3.2 Distributed Dataﬂow Execution\nWhen a user executes an application all DataStream\noperators compile into an execution graph that is in\nprinciple a directed graph G= (T,E), similarly to Na-\niad [11] where vertices Trepresent tasks and edges E\nrepresent data channels between tasks. An execution\ngraph is depicted in Fig. 1 for the incremental word\ncount example. As shown, every instance of an opera-\ntor is encapsulated on a respective task. Tasks can be\nfurther classiﬁed as sources when they have no input\nchannels and sinks when no output channels are set.\nFurthermore, Mdenotes the set of all records trans-\nfer

[**RecursiveUrlLoader**](https://python.langchain.com/docs/integrations/document_loaders/recursive_url_loader): We may want to load all URLs under a root directory.

**Parameters**
*   url: str, the target url to crawl.
*   exclude_dirs: Optional[str], webpage directories to exclude.
*   use_async: Optional[bool], whether to use async requests. Async requests are usually faster for large tasks, but they disable lazy loading (the function still works, it is just not lazy). By default, it is set to False.
*   extractor: Optional[Callable[[str], str]], a function to extract the text of the document from the webpage. By default, it returns the page as it is. Tools like goose3 and beautifulsoup are recommended for extracting the text.
*   max_depth: Optional[int] = None, the maximum depth to crawl. By default, it is set to 2. To crawl the whole website, set it to a sufficiently large number.
*   timeout: Optional[int] = None, the timeout for each request, in the unit of seconds. By default, it is set to 10.
*   prevent_outside: Optional[bool] = None, whether to prevent crawling outside the root url. By default, it is set to True.
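
As a rough illustration of the `extractor` parameter described above, the sketch below shows one way to write a function that takes raw HTML and returns plain text using only the Python standard library. The names `html_to_text` and `_TextCollector` are hypothetical and not part of LangChain; this is a minimal sketch, not a replacement for goose3 or beautifulsoup.

```python
from html.parser import HTMLParser


class _TextCollector(HTMLParser):
    """Collects visible text, skipping the contents of <script> and <style>."""

    def __init__(self):
        super().__init__()
        self._skip_depth = 0  # > 0 while inside a <script> or <style> element
        self.parts = []

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        # keep only non-empty text that is outside script/style blocks
        if self._skip_depth == 0 and data.strip():
            self.parts.append(data.strip())


def html_to_text(raw_html: str) -> str:
    """Hypothetical extractor: raw HTML in, one text fragment per line out."""
    collector = _TextCollector()
    collector.feed(raw_html)
    collector.close()
    return "\n".join(collector.parts)
```

Because the function has the `Callable[[str], str]` shape listed above, it could in principle be passed as `extractor=html_to_text`, assuming the loader accepts any callable of that shape.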

In [None]:
from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader
from bs4 import BeautifulSoup as Soup

# crawl the Flink docs up to two levels deep, keeping only the page text
url = "https://nightlies.apache.org/flink/flink-docs-master/"
loader = RecursiveUrlLoader(url=url, max_depth=2, extractor=lambda x: Soup(x, "html.parser").text)
webs2 = loader.load()  # webs2 is used again in the indexing step
print(len(webs2))
print(webs2[0])


646
page_content='\n\n\n\n\n\n\n\n\n\n\n\nLogging | Apache Flink\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nv1.19-SNAPSHOT\n\n\n\n\n\n\n\n\n\xa0\xa0Try Flinkâ\x96¾\n\n\n\nFirst steps\n\n\nFraud Detection with the DataStream API\n\n\nReal Time Reporting with the Table API\n\n\nFlink Operations Playground\n\n\n\n\n\n\xa0\xa0Learn Flinkâ\x96¾\n\n\n\nOverview\n\n\nIntro to the DataStream API\n\n\nData Pipelines & ETL\n\n\nStreaming Analytics\n\n\nEvent-driven Applications\n\n\nFault Tolerance\n\n\n\n\n\n\xa0\xa0Conceptsâ\x96¾\n\n\n\nOverview\n\n\nStateful Stream Processing\n\n\nTimely Stream Processing\n\n\nFlink Architecture\n\n\nGlossary\n\n\n\n\n\n\n\xa0\xa0Application Developmentâ\x96¾\n\n\n\n\nProject Configurationâ\x96¾\n\n\n\nOverview\n\n\nUsing Maven\n\n\nUsing Gradle\n\n\nConnectors and Formats\n\n\nTest Dependencies\n\n\nAdvanced Configuration\n\n\n\n\n\nDataStream APIâ\x96¾\n\n\n\nOverview\n\n\nExecution Mode (Batch/Streaming)\n\n\n\nEvent Timeâ\x96¾\n\n\n\nGenerating Watermarks\n

In [None]:
print(type(webs2))
print(type(webs2[0]))

<class 'list'>
<class 'langchain.schema.document.Document'>


[**URLLoader**](https://python.langchain.com/docs/integrations/document_loaders/url):
This covers how to load HTML documents from a list of URLs into a document format that we can use downstream.
Note that the data format could be different from WebBaseLoader.

In [None]:
from langchain.document_loaders import UnstructuredURLLoader

# This is for testing source in GPT.
urls = [
    "https://www.understandingwar.org/backgrounder/russian-offensive-campaign-assessment-february-8-2023",
    "https://www.understandingwar.org/backgrounder/russian-offensive-campaign-assessment-february-9-2023",
]
loader = UnstructuredURLLoader(urls=urls)
webs3 = loader.load()

[**WebBaseLoader**](https://python.langchain.com/docs/integrations/document_loaders/web_base): This covers how to use WebBaseLoader to load all text from HTML webpages into a document format that we can use downstream. For more custom logic for loading webpages look at some child class examples such as IMSDbLoader, AZLyricsLoader, and CollegeConfidentialLoader.

*   Load HTML pages using urllib and parse them with `BeautifulSoup`.
*   Initialize with webpage path.

In [None]:
list_of_websites = ["https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home",
                    "https://twitter.com/ApacheFlink",
                    "https://www.meetup.com/topics/apache-flink/",
                    "https://flink.apache.org/posts/",
                    "https://www.ververica.com/blog",
                    "https://rawkintrevo.org/category/flink/",
                    "https://data-flair.training/blogs/category/flink/",
                    "https://www.bytefish.de/blog/apache_flink_series_1.html",
                    "https://www.bytefish.de/blog/apache_flink_series_2.html",
                    "https://www.bytefish.de/blog/apache_flink_series_3.html",
                    "https://www.bytefish.de/blog/apache_flink_series_4.html",
                    "https://www.bytefish.de/blog/apache_flink_series_5.html",
                    ]
loader = WebBaseLoader(list_of_websites)
webs = loader.load()
webs

[Document(page_content='\n\n\n\nApache Flink Home - Apache Flink - Apache Software Foundation\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nLog in\nSkip to sidebar\nSkip to main content\nLinked ApplicationsLoading...Apache Software Foundation\n\n\nSpaces\n\n\n\n\n\n\n\nHit enter to search\n\n\n\nHelp\n\n\n\n\n\n\n        Online Help\n\n\n\n\n        Keyboard Shortcuts\n\n\n\n\n        Feed Builder\n\n\n\n\n        What’s new\n\n\n\n\n        What’s new\n\n\n\n\n        Available Gadgets\n\n\n\n\n        About Confluence\n\n\n\n\n\n\n\n\n\n\n\n\n\n        Log in\n\n\n\n\n        Sign up\n\n\n\n\n\n\n\n\n\n\nApache FlinkPage tree\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nBrowse pagesConfigureSpace tools\n\n\n\n\n\n\n\n\n\n\n\n\n \n\n\n\n\n\n                                Attachments (0)\n             \n\n\n\n\n                                Page History\n 

In [None]:
webs[2]

Document(page_content='Skip to content', metadata={'source': 'https://www.meetup.com/topics/apache-flink/', 'language': 'en-US'})

Now we install the remaining libraries:

In [None]:
# Please ignore the library conflicts.
!pip3 install -qU \
  langchain==0.0.162 \
  openai==0.27.7 \
  tiktoken==0.4.0 \
  "pinecone-client[grpc]"==2.2.2


---

🚨 _Note: the above `pip install` is formatted for Jupyter notebooks. If running elsewhere you may need to drop the `!`._

---

Every record contains *a lot* of text. Our first task is therefore to find a good preprocessing method for splitting these documents into more concise chunks, which are later embedded and stored in our Pinecone vector database.

For this we use LangChain's `RecursiveCharacterTextSplitter` to split our text into chunks of a specified max length.

In [None]:
import tiktoken

tiktoken.encoding_for_model('gpt-3.5-turbo')

<Encoding 'cl100k_base'>

In [None]:
import tiktoken

tokenizer = tiktoken.get_encoding('cl100k_base')

# create the length function
def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

tiktoken_len("hello I am a chunk of text and using the tiktoken_len function "
             "we can find the length of this chunk of text in tokens")

26

In [None]:
# create the text splitter used for chunking and, later, for indexing
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=300,
    chunk_overlap=0,
    length_function=tiktoken_len,
    separators=["\n\n", "\n", " ", ""]
)

In [None]:
chunks = text_splitter.split_documents(data)
# chunks

In [None]:
print(chunks[0].metadata)

{'source': 'pdf/1506.08603.pdf', 'page': 0}


In [None]:
tiktoken_len(chunks[0].page_content), tiktoken_len(chunks[1].page_content), tiktoken_len(chunks[2].page_content)

(379, 384, 55)

Using the `text_splitter` we get much better sized chunks of text. We'll use this functionality during the indexing process later. Now let's take a look at embedding.
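
To give a feel for what recursive splitting does, here is a simplified sketch in plain Python. It is not LangChain's implementation: `split_recursive` is a hypothetical function, it ignores chunk overlap, and it measures length with `len` by default (a token counter such as `tiktoken_len` could be passed as `length` instead). It tries the coarsest separator first and falls back to finer ones only for pieces that are still too long.

```python
def split_recursive(text, max_len, separators=("\n\n", "\n", " ", ""), length=len):
    """Split text into chunks of at most max_len, preferring coarse separators."""
    if length(text) <= max_len or not separators:
        return [text] if text else []
    sep, finer = separators[0], separators[1:]
    # the empty separator means "split between characters" as a last resort
    pieces = list(text) if sep == "" else text.split(sep)
    chunks, current = [], ""
    for piece in pieces:
        if not piece:
            continue  # skip empty pieces produced by repeated separators
        candidate = piece if not current else current + sep + piece
        if length(candidate) <= max_len:
            current = candidate  # piece still fits in the chunk being built
            continue
        if current:
            chunks.append(current)
            current = ""
        if length(piece) <= max_len:
            current = piece
        else:
            # the piece alone is too long: retry with the finer separators
            chunks.extend(split_recursive(piece, max_len, finer, length))
    if current:
        chunks.append(current)
    return chunks


print(split_recursive("aaa bbb ccc\n\nddd eee", max_len=7))
# → ['aaa bbb', 'ccc', 'ddd eee']
```

The paragraph break is respected where possible, and the first paragraph is split on spaces only because it does not fit within `max_len` as a whole.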

## Creating Embeddings

Building embeddings using LangChain's OpenAI embedding support is fairly straightforward. We first need to add our [OpenAI API key](https://platform.openai.com) by running the next cell:

In [None]:
import os

# get openai api key from platform.openai.com
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') or 'OPENAI_API_KEY'

*(Note that OpenAI is a paid service and so running the remainder of this notebook may incur some small cost)*

After initializing the API key we can initialize our `text-embedding-ada-002` embedding model like so:

In [None]:
from langchain.embeddings.openai import OpenAIEmbeddings

model_name = 'text-embedding-ada-002'

embed = OpenAIEmbeddings(
    model=model_name,
    openai_api_key=OPENAI_API_KEY
)

Now we embed some text like so:

In [None]:
texts = [
    'this is the first chunk of text',
    'then another second chunk of text is here'
]

res = embed.embed_documents(texts)
len(res), len(res[0])

(2, 1536)

From this we get *two* (aligning to our two chunks of text) 1536-dimensional embeddings.
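
Embeddings are useful because similar texts map to nearby vectors. The index created in the next section uses `metric='cosine'`, so as a minimal sketch, here is how cosine similarity between two vectors can be computed by hand. The three-dimensional vectors are made-up toy values, not real model outputs, and `cosine_similarity` is a hypothetical helper.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# toy vectors standing in for embeddings
query_vec = [1.0, 0.0, 0.0]
same_direction = [2.0, 0.0, 0.0]
orthogonal = [0.0, 3.0, 0.0]

print(cosine_similarity(query_vec, same_direction))  # → 1.0
print(cosine_similarity(query_vec, orthogonal))      # → 0.0
```

Cosine similarity depends only on direction, not magnitude, which is why the scaled vector still scores `1.0`.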

Now we move on to initializing our Pinecone vector database.

## Vector Database

To create our vector database we first need a [free API key from Pinecone](https://app.pinecone.io). Then we initialize like so:

In [None]:
index_name = 'YOUR_PINECONE_INDEX_NAME'

In [None]:
import pinecone

# find API key in console at app.pinecone.io
PINECONE_API_KEY = os.getenv('PINECONE_API_KEY') or 'PINECONE_API_KEY'
# find ENV (cloud region) next to API key in console
PINECONE_ENVIRONMENT = os.getenv('PINECONE_ENVIRONMENT') or 'PINECONE_ENVIRONMENT'

# pass the variables defined above, not string literals
pinecone.init(
    api_key=PINECONE_API_KEY,
    environment=PINECONE_ENVIRONMENT
)

if index_name not in pinecone.list_indexes():
    # we create a new index
    pinecone.create_index(
        name=index_name,
        metric='cosine',
        dimension=len(res[0])  # 1536 dim of text-embedding-ada-002
    )


Then we connect to the new index:

In [None]:
index = pinecone.GRPCIndex(index_name)

index.describe_index_stats()

{'dimension': 1536,
 'index_fullness': 0.0,
 'namespaces': {'': {'vector_count': 0}},
 'total_vector_count': 0}

We should see that the new Pinecone index has a `total_vector_count` of `0`, as we haven't added any vectors yet.

## Indexing

We can perform the indexing task using the LangChain vector store object. But for now it is much faster to do it via the Pinecone python client directly. We will do this in batches of `100` or more.
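
The batching idea on its own can be sketched with a small generator. `iter_batches` is a hypothetical helper, not part of the Pinecone client, and it yields batches of exactly `batch_size` items (except the last). The loop used in this notebook differs slightly: it flushes whenever the buffer reaches at least `batch_limit` chunks, which is why its batches can exceed 100.

```python
from itertools import islice


def iter_batches(items, batch_size):
    """Yield consecutive lists of up to batch_size items from any iterable."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # iterable exhausted
        yield batch


for batch in iter_batches(range(7), 3):
    print(batch)
# → [0, 1, 2]
# → [3, 4, 5]
# → [6]
```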

In [None]:
final_data = []
for i in data:
  final_data.append(i)
print(len(final_data))
for i in webs:
  final_data.append(i)
print(len(final_data))
for i in webs2:
  final_data.append(i)
print(len(final_data))


1000
1012
1658


In [None]:
from tqdm.auto import tqdm
from uuid import uuid4

batch_limit = 100

texts = []
metadatas = []

for i, document in enumerate(tqdm(final_data)):
    # first get metadata fields for this record
    metadata = {
        'source': document.metadata['source']
    }
    # now we create chunks from the record text
    record_texts = text_splitter.split_text(document.page_content)
    # create individual metadata dicts for each chunk
    record_metadatas = [{
        "chunk": j, "text": text, **metadata
    } for j, text in enumerate(record_texts)]
    # append these to current batches
    texts.extend(record_texts)
    metadatas.extend(record_metadatas)
    # if we have reached the batch_limit we can add texts
    if len(texts) >= batch_limit:
        ids = [str(uuid4()) for _ in range(len(texts))]
        embeds = embed.embed_documents(texts)
        index.upsert(vectors=zip(ids, embeds, metadatas))
        texts = []
        metadatas = []

if len(texts) > 0:
    ids = [str(uuid4()) for _ in range(len(texts))]
    embeds = embed.embed_documents(texts)
    index.upsert(vectors=zip(ids, embeds, metadatas))

We've now indexed everything. We can check the number of vectors in our index like so:

In [None]:
index.describe_index_stats()

## Creating a Vector Store and Querying

Now that we've built our index we can switch back over to LangChain. We start by initializing a vector store using the same index we just built. We do that like so:

In [None]:
from langchain.vectorstores import Pinecone

text_field = "text"

# switch back to normal index for langchain
index = pinecone.Index(index_name)

vectorstore = Pinecone(
    index, embed.embed_query, text_field
)


In [None]:
query = "What is the architecture of flink?"

vectorstore.similarity_search(
    query,  # our search query
    k=3  # return 3 most relevant docs
)

[Document(page_content='Chapter 3. \nThe Architecture of\nApache Flink\nChapter 2\n discussed\n important concepts of distributed stream\nprocessing, such as parallelization, time, and state. In this chapter, we\ngive a high-level introduction to Flink’s architecture and describe how\nFlink addresses the aspects of stream processing we discussed earlier.\nIn particular, we explain Flink’s distributed architecture, show how it\nhandles time and state in streaming applications, and discuss its fault-\ntolerance mechanisms. This chapter provides relevant background\ninformation to successfully implement and operate advanced streaming\napplications with Apache Flink. It will help you to understand Flink’s\ninternals and to reason about the performance and behavior of\nstreaming applications.\nSystem Architecture\nFlink\n is a distributed system for stateful parallel data stream\nprocessing. A Flink setup consists of multiple processes that typically\nrun distributed across multiple machine

All of these are good, relevant results. But what can we do with them? There are many possible tasks; one of the most interesting (and well supported by LangChain) is called _"Generative Question-Answering"_ or GQA.

## Generative Question-Answering

In GQA we pass the query as a question to an LLM, but the LLM must answer based on the information returned from the `vectorstore`.

To do this we initialize a `RetrievalQA` object like so:

In [None]:
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA

# completion llm
llm = ChatOpenAI(
    openai_api_key=OPENAI_API_KEY,
    model_name='gpt-3.5-turbo',
    temperature=0.0
)

qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever()
)
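
Conceptually, `chain_type="stuff"` places all retrieved passages into a single prompt alongside the question. The sketch below illustrates that idea in plain Python. The helper name `build_stuffed_prompt`, the prompt wording, and the `max_docs` parameter are all assumptions for illustration and are not LangChain's actual template.

```python
def build_stuffed_prompt(question, documents, max_docs=4):
    """Hypothetical helper: put retrieved passages and the question in one prompt."""
    context = "\n\n".join(documents[:max_docs])
    return (
        "Answer the question using only the context below. "
        "If the answer is not in the context, say you don't know.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )


# made-up passages standing in for retrieved chunks
retrieved = [
    "Flink is a distributed system for stateful parallel data stream processing.",
    "A Flink setup consists of multiple processes.",
]
print(build_stuffed_prompt("What is the architecture of flink?", retrieved))
```

The trade-off of this approach is that every retrieved passage must fit within the model's context window at once, which is one reason the chunks were kept small earlier.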

In [None]:
qa.run(query)

"The architecture of Apache Flink is a distributed system for stateful parallel data stream processing. It consists of multiple processes that typically run distributed across multiple machines. The main components of Flink's architecture are:\n\n1. JobManager: The JobManager is responsible for coordinating the execution of Flink applications. It receives the dataflow program from the client, schedules tasks, and manages the overall execution of the application.\n\n2. TaskManagers: TaskManagers are responsible for executing the tasks assigned to them by the JobManager. They run the actual computations and process the data streams. TaskManagers can run on multiple machines in a cluster.\n\n3. Client: The client is not part of the runtime and program execution, but it is used to prepare and send the dataflow program to the JobManager. The client can disconnect after submitting the program or stay connected to receive progress reports.\n\n4. Cluster Resource Manager: Flink integrates with

We can also include the sources of information that the LLM is using to answer our question. We can do this using a slightly different version of `RetrievalQA` called `RetrievalQAWithSourcesChain`:

In [None]:
from langchain.chains import RetrievalQAWithSourcesChain

qa_with_sources = RetrievalQAWithSourcesChain.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever()
)

In [None]:
qa_with_sources(query)

{'question': 'What is the architecture of flink?',
 'answer': 'The architecture of Flink consists of multiple processes that run distributed across multiple machines. It is a distributed system for stateful parallel data stream processing. The Flink runtime consists of a JobManager and one or more TaskManagers. The JobManager is responsible for coordinating the execution of the dataflow program, while the TaskManagers are responsible for executing the actual tasks. Flink integrates with cluster resource managers such as Hadoop YARN and Kubernetes, but can also be set up to run as a standalone cluster or as a library.\n',
 'sources': 'pdf/Stream Processing with Apache Flink.pdf, https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/, pdf/FULLTEXT01.pdf'}

Now we answer the question being asked *and* return the sources of the information used by the LLM.

---