# Chains

> Chain-based functions for processing PDFs.

In [None]:
# | default_exp pdf.chains

In [None]:
# | export

from langchain_ray.imports import *
from langchain_ray.chains import *
from langchain_ray.pdf.utils import *

In [None]:
# | export


def pdf_docs_df_chain(
    chunk_size=200,
    chunk_overlap=20,
    input_variables=["pdf_folder"],
    output_variables=["docs_df"],
    verbose=False,
):
    """
    Chain that takes a PDF folder and returns a DataFrame of Documents.

    Two transform steps run in sequence:
    1. `create_pdf_df` turns the folder into an intermediate DataFrame stored
       under the key "pdfs_df".
    2. `df_pdf_docs` splits that DataFrame into Documents using `chunk_size`
       and `chunk_overlap`, writing the result to `output_variables`.
    """
    intermediate_key = "pdfs_df"
    splitter_kwargs = dict(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    steps = [
        transform_chain(
            create_pdf_df,
            input_variables=input_variables,
            output_variables=[intermediate_key],
        ),
        transform_chain(
            df_pdf_docs,
            input_variables=[intermediate_key],
            output_variables=output_variables,
            transform_kwargs=splitter_kwargs,
        ),
    ]
    return SequentialChain(
        chains=steps,
        input_variables=input_variables,
        output_variables=output_variables,
        verbose=verbose,
    )


def pdfs_to_docs_chain(
    chunk_size=200,
    chunk_overlap=20,
    input_variables=["pdf_folder"],
    output_variables=["docs"],
    verbose=False,
):
    """
    Chain that takes a PDF folder and returns the Documents extracted from its PDFs.

    Step 1 runs `pdf_files` on the folder and stores the result under the
    intermediate key "pdfs". Step 2 runs `pdfs_to_docs` on it, splitting the text
    with `chunk_size` / `chunk_overlap`, and writes the Documents to
    `output_variables`. The output is whatever `pdfs_to_docs` returns
    (presumably a list of `Document`s rather than a DataFrame — confirm against
    `pdfs_to_docs`).
    """
    pdf_chain = transform_chain(
        pdf_files,
        input_variables=input_variables,
        output_variables=["pdfs"],
    )
    docs_chain = transform_chain(
        pdfs_to_docs,
        input_variables=["pdfs"],
        output_variables=output_variables,
        transform_kwargs={"chunk_size": chunk_size, "chunk_overlap": chunk_overlap},
    )
    return SequentialChain(
        chains=[pdf_chain, docs_chain],
        input_variables=input_variables,
        output_variables=output_variables,
        verbose=verbose,
    )


def pdf_cats_chain(
    cats_model,
    input_variables=["docs_df"],
    output_variables=["cats_df"],
):
    """
    Wrap `df_docs_cat` in a transform chain that adds categories to a DataFrame of Documents.

    `cats_model` (a SetFit model) is forwarded to `df_docs_cat` as the
    `cats_model` keyword argument.
    """
    model_kwargs = {"cats_model": cats_model}
    cats_chain = transform_chain(
        df_docs_cat,
        input_variables=input_variables,
        output_variables=output_variables,
        transform_kwargs=model_kwargs,
    )
    return cats_chain


def pdf_ems_chain(
    ems_model,
    ems_folder,
    input_variables=["docs_df"],
    output_variables=["ems_df"],
):
    """
    Chain that takes a DataFrame of Documents and writes embeddings to `ems_folder` using `ems_model`.

    Wraps `df_docs_ems` in a transform chain; `ems_model` and `ems_folder` are
    forwarded to it as keyword arguments.

    Returns:
        The transform chain. It must be returned explicitly, otherwise callers
        would receive `None` and could not compose it into a `SequentialChain`.
    """
    return transform_chain(
        df_docs_ems,
        input_variables=input_variables,
        output_variables=output_variables,
        transform_kwargs={
            "ems_model": ems_model,
            "ems_folder": ems_folder,
        },
    )


def docs_faiss_chain(
    ems_model,
    index_folder,
    index_name,
    input_variables=["docs_df"],
    output_variables=["faiss_df"],
):
    """
    Wrap `df_to_faiss` so a DataFrame of Documents is added to a FAISS index in `index_folder`.

    `ems_model`, `index_folder` and `index_name` are passed through to
    `df_to_faiss` as keyword arguments.
    """
    faiss_kwargs = dict(
        ems_model=ems_model,
        index_folder=index_folder,
        index_name=index_name,
    )
    return transform_chain(
        df_to_faiss,
        transform_kwargs=faiss_kwargs,
        input_variables=input_variables,
        output_variables=output_variables,
    )


def docs_to_faiss_chain(
    ems_model,
    index_folder,
    index_name,
    input_variables=["docs"],
    output_variables=["docs"],
    verbose=False,
):
    """
    Chain that takes Documents (not a DataFrame) and adds them to a FAISS index in `index_folder`.

    The value stored under the first key of `input_variables` is handed to
    `docs_to_faiss` as its `docs` argument (via `data_kwargs_mapping`), so the
    input key can have any name. For the same reason `input_variables` must
    contain at least one key. `ems_model`, `index_folder` and `index_name` are
    forwarded to `docs_to_faiss` as keyword arguments.
    """
    return transform_chain(
        docs_to_faiss,
        input_variables=input_variables,
        output_variables=output_variables,
        transform_kwargs={
            "ems_model": ems_model,
            "index_folder": index_folder,
            "index_name": index_name,
        },
        # Map whatever the caller named the input key onto docs_to_faiss's `docs` parameter.
        data_kwargs_mapping={input_variables[0]: "docs"},
        verbose=verbose,
    )


def pdfs_to_faiss_chain(
    ems_model,  # Embeddings model used to build the vectorstore.
    index_folder,  # Folder where the FAISS index(es) are stored.
    index_name,  # Name of the FAISS index.
    input_variables=["pdf_folder"],  # Input key holding the PDF folder.
    output_variables=["docs"],  # Output key for the indexed Documents.
    chunk_size=200,  # Number of characters per Document.
    chunk_overlap=20,  # Number of characters shared by consecutive Documents.
    docs_block_size=1500,  # Number of Documents processed in a single Ray task.
    num_cpus=12,  # Number of CPUs to use for Ray.
    num_gpus=1,  # Number of GPUs to use for Ray.
    verbose=False,
):
    """
    Chain that takes a `pdf_folder`, splits its PDFs into Documents and adds them to FAISS indexes in `index_folder`.

    Two stages are linked through the intermediate key "dc":
    - extraction: `pdfs_to_docs_chain` (PDF folder -> Documents);
    - indexing: `docs_to_faiss_chain`, wrapped in `ray_chain` with
      `block_size=docs_block_size` so large inputs can be spread over several
      Ray tasks, each building its own index.
    """
    handoff = "dc"

    extract = pdfs_to_docs_chain(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        input_variables=input_variables,
        output_variables=[handoff],
        verbose=verbose,
    )
    index = ray_chain(
        docs_to_faiss_chain(
            ems_model=ems_model,
            index_folder=index_folder,
            index_name=index_name,
            input_variables=[handoff],
            output_variables=output_variables,
            verbose=verbose,
        ),
        block_size=docs_block_size,
        num_cpus=num_cpus,
        num_gpus=num_gpus,
        verbose=verbose,
    )

    return SequentialChain(
        chains=[extract, index],
        input_variables=input_variables,
        output_variables=output_variables,
        verbose=verbose,
    )


def index_query_chain(
    ems_model,  # The SentenceTransformer model to use for vectorstore embeddings.
    index_folder,  # The folder with the FAISS indexes.
    index_name,  # The name of the FAISS index.
    input_variables=["query", "k"],  # The input keys: the query text and the number of results `k`.
    output_variables=["search_results"],  # The output key for the search results.
    block_size=10,  # The number of indexes to process in a single Ray task.
    num_cpus=12,  # The number of CPUs to use for Ray.
    num_gpus=1,  # The number of GPUs to use for Ray.
    verbose=False,
):
    """
    Chain that takes a query and returns the top `k` results from the FAISS indexes in `index_folder`.
    If there are more than `block_size` indexes, search will be distributed using Ray.

    NOTE(review): this definition is shadowed by the second `index_query_chain`
    defined later in this same cell, so callers can never reach it. It is a
    DataFrame-based variant (query DataFrame -> row-wise search -> flatten).
    Consider renaming it or removing it.
    """
    # Build a query DataFrame from `query` plus the index folder/name
    # (presumably one row per saved index — confirm in `create_idx_q_df`).
    q_df_chain = transform_chain(
        create_idx_q_df,
        input_variables=["query"],
        output_variables=["q_df"],
        transform_kwargs={"index_folder": index_folder, "index_name": index_name},
    )

    def apply_search(df, ems_model, k):
        # Run `df_search_faiss` on every row of the query DataFrame.
        return df.apply(df_search_faiss, axis=1, ems_model=ems_model, k=k)

    search_chain = transform_chain(
        apply_search,
        transform_kwargs={"ems_model": ems_model},
        input_variables=["q_df", "k"],
        output_variables=["search_df"],
    )

    def flatten_res(df, k):
        # Merge the per-row `results`, sort ascending on the second element of each hit
        # (assumed to be a score where lower is better — confirm), and keep the first `k`.
        # NOTE(review): unlike the later definition, `k` is not unwrapped if it arrives as a list.
        return [sorted(flatten_list(df.results), key=lambda x: x[1])[:k]]

    res_chain = transform_chain(
        flatten_res,
        input_variables=["search_df", "k"],
        output_variables=output_variables,
    )

    # The whole pipeline is wrapped in `ray_chain`; `verbose` is not forwarded to it here.
    return ray_chain(
        SequentialChain(
            chains=[q_df_chain, search_chain, res_chain],
            input_variables=input_variables,
            output_variables=output_variables,
            verbose=verbose,
        ),
        block_size=block_size,
        num_cpus=num_cpus,
        num_gpus=num_gpus,
    )


def index_query_chain(
    ems_model,  # The SentenceTransformer model to use for vectorstore embeddings.
    index_folder,  # The folder with the FAISS indexes.
    index_name,  # The name of the FAISS index.
    input_variables=["query", "k"],  # The input keys: the query text and the number of results `k`.
    output_variables=["search_results"],  # The output key for the search results.
    block_size=10,  # The number of indexes to process in a single Ray task.
    num_cpus=12,  # The number of CPUs to use for Ray.
    num_gpus=1,  # The number of GPUs to use for Ray.
    verbose=False,
):
    """
    Chain that takes a query and returns the top `k` results from the FAISS indexes in `index_folder`.
    If there are more than `block_size` indexes, search will be distributed using Ray.

    Steps:
    1. `index_names` collects the names of the indexes for `index_name` in `index_folder`.
    2. `search_faiss` searches those indexes for `query`; this step is wrapped in
       `ray_chain` so the index list can be split into blocks of `block_size`.
    3. The per-index hits are merged, sorted and cut to the best `k`.

    `verbose` is applied to the Ray stage as well as to the outer chain.
    """
    # NOTE(review): `k` is wired in as this step's input although the lookup itself
    # is driven by `transform_kwargs`; presumably transform_chain needs an input key — confirm.
    index_names_chain = transform_chain(
        index_names,
        transform_kwargs={"index_folder": index_folder, "index_name": index_name},
        input_variables=["k"],
        output_variables=["index_names"],
    )

    search_faiss_chain = transform_chain(
        search_faiss,
        transform_kwargs={"index_folder": index_folder, "ems_model": ems_model},
        input_variables=["index_names", "query", "k"],
        output_variables=["res"],
    )

    # Forward `verbose` like `pdfs_to_faiss_chain` does, so the Ray stage logs
    # consistently with the rest of the chain.
    search_faiss_chain = ray_chain(
        search_faiss_chain,
        block_size=block_size,
        num_cpus=num_cpus,
        num_gpus=num_gpus,
        verbose=verbose,
    )

    def flatten_res(res, k):
        # `k` may come back as a list after the Ray stage; use its first value.
        if is_list(k):
            k = k[0]
        # Ascending sort on the second element: assumes hits are (Document, score)
        # pairs where a lower score is a closer match — confirm against `search_faiss`.
        return [sorted(flatten_list(res), key=lambda x: x[1])[:k]]

    res_chain = transform_chain(
        flatten_res,
        input_variables=["res", "k"],
        output_variables=output_variables,
    )

    return SequentialChain(
        chains=[index_names_chain, search_faiss_chain, res_chain],
        input_variables=input_variables,
        output_variables=output_variables,
        verbose=verbose,
    )

In [None]:
# # | export


# def pdf_faiss_chain2(
#     ems_model,  # The SentenceTransformer model to use for vectorestore embeddings.
#     index_folder,  # The folder to store the FAISS index.
#     index_name,  # The name of the FAISS index.
#     input_variables=["pdf_folder"],  # The input key for the PDF folder.
#     output_variables=["df"],  # The output key for the final DataFrame.
#     chunk_size=200,  # The number of characters per Document.
#     chunk_overlap=20,  # The number of characters to overlap between Documents.
#     docs_block_size=1500,  # The number of Documents to process in a single Ray task.
#     cats_model=None,  # The HuggingFace model to use for categorization.
#     ems_chain_model=None,  # The SentenceTransformer model to use for chain embeddings.
#     ems_folder=None,  # The folder to store the embeddings.
#     verbose=False,
# ):
#     """
#     Chain that takes a PDF folder and adds them to FAISS indexes in `index_folder`. With optional categorization and chain embeddings.
#     If there are more than `docs_block_size` Documents, it will be divided and distributed into multiple indexes using Ray.
#     """
#     chain1 = pdf_docs_chain(
#         chunk_size=chunk_size, chunk_overlap=chunk_overlap, input_variables=input_variables
#     )
#     index_chains = []
#     if cats_model is not None:
#         cats_chain = pdf_cats_chain(cats_model)
#         index_chains.append(cats_chain)
#     if ems_folder is not None and ems_chain_model is not None:
#         ems_chain = pdf_ems_chain(ems_chain_model, ems_folder)
#         index_chains.append(ems_chain)

#     faiss_chain = docs_faiss_chain(
#         ems_model, index_folder, index_name, output_variables=output_variables
#     )
#     index_chains.append(faiss_chain)
#     chain2 = ray_chain(
#         SequentialChain(chains=index_chains, output_variables=output_variables),
#         block_size=docs_block_size,
#         cuda=True,
#     )
#     return SequentialChain(
#         chains=[chain1, chain2],
#         input_variables=input_variables,
#         output_variables=output_variables,
#         verbose=verbose
#     )

## Usage Example

First, we load our embedding model using LangChain's `SentenceTransformerEmbeddings`.

In [None]:
# | eval: false

# Load the embedding model that is used below for both indexing and querying.
device = "cuda"  # device passed to the SentenceTransformer model (GPU here)
model_name = "HamzaFarhan/PDFSegs"

ems_model = SentenceTransformerEmbeddings(
    model_name=model_name, model_kwargs={"device": device}
)

Then we define the `index_folder` and `index_name`.

In [None]:
# | eval: false
# | output: false


# Machine-specific absolute path; point this at your own data folder.
data_folder = Path("/media/hamza/data2/faiss_data/")
index_folder = data_folder / "saved_indexes"
# Saved index files start with this name (the cleanup cells glob on it).
index_name = "chain_index"

In [None]:
# | hide
# | eval: false


# Delete index files left over from earlier runs so the example starts with no saved indexes.
for f in index_folder.glob(f"{index_name}*"):
    f.unlink()

Next, we create a chain that builds the FAISS index(es).

<br>We're using job resumes in our example, and we want to split the text into chunks of about 3 lines. A resume typically has 60-80 characters per line, so we use a `chunk_size` of 200 (the default). This gives roughly (number of lines / 3) `Documents` per PDF.

<br>Also, suppose we have thousands of extracted `Documents` and we want to parallelize the indexing process.
<br>That's where `docs_block_size` comes in. It's the number of `Documents` handled by a single `Ray` task: the `Documents` are split into blocks of this size, the blocks are indexed in parallel, and each task creates a separate FAISS index.
<br>You can pass the `num_cpus` and `num_gpus` arguments to specify the number of CPUs and GPUs to use for indexing. Those resources will be distributed evenly across the tasks.


In [None]:
# | eval: false
# | output: false


verbose = True

# Build the PDF -> Documents -> FAISS chain. With docs_block_size=200, the
# 148 Documents from the sample folder fit in a single block.
faiss_chain = pdfs_to_faiss_chain(
    ems_model=ems_model,
    index_folder=index_folder,
    index_name=index_name,
    docs_block_size=200,
    verbose=verbose,
)

Let's run the chain on a sample folder of 5 PDFs.

In [None]:
# | eval: false

# Sample folder of 5 PDF resumes (relative to this notebook).
pdf_folder = Path("../../resumes_5/")

# A single positional input is assigned to the chain's only input key, `pdf_folder`.
faiss_docs = faiss_chain(pdf_folder)



[1m> Entering new  chain...[0m


[1m> Entering new  chain...[0m

[1m> Finished chain.[0m


[1m> Entering new  chain...[0m


[1m> Entering new  chain...[0m

[1m> Finished chain.[0m

[1m> Finished chain.[0m

[1m> Finished chain.[0m


In [None]:
#| eval: false

# Show the first 5 Documents returned under the `docs` output key.
faiss_docs['docs'][:5]

[Document(page_content='Spearheaded complete purchase cycl e at Ruwais Adnoc Project \nand expedite the materials & services from source to final delivery \n In-depth knowledge of export control regulations and import', metadata={'source': '../../resumes_5/0f479ee8-5fd9-4f55-b254-5e8feef08038.pdf', 'page': 0, 'start_index': 1547}),
 Document(page_content='procedures inMiddle East \n Resourceful in finalizing the specifications of materials, \nestablishing quali ty & quantity limits for effective inventory \ncontrol and reducing wastages', metadata={'source': '../../resumes_5/0f479ee8-5fd9-4f55-b254-5e8feef08038.pdf', 'page': 0, 'start_index': 1745}),
 Document(page_content='Skilled in planning and monitoring warehouse operations of \nreceipt, storage, return of unused stock, inventory control and \nmonitoring inbound /outbound logistics', metadata={'source': '../../resumes_5/0f479ee8-5fd9-4f55-b254-5e8feef08038.pdf', 'page': 0, 'start_index': 1932}),
 Document(page_content="Successfull

There were only 148 `Documents`, which is fewer than `docs_block_size` (200), so Ray was not used. We can lower `docs_block_size` to force Ray to be used.

In [None]:
# | hide
# | eval: false


# Delete the indexes built by the previous run so the Ray run starts with no saved indexes.
for f in index_folder.glob(f"{index_name}*"):
    f.unlink()

In [None]:
# | eval: false

# Same chain, but with a block size smaller than the 148 Documents so Ray splits the work.
faiss_chain2 = pdfs_to_faiss_chain(
    ems_model=ems_model,
    index_folder=index_folder,
    index_name=index_name,
    docs_block_size=50, # Changed: 148 Documents / 50 -> 3 Ray blocks
    verbose=verbose,
)

In [None]:
# | eval: false


# Run the Ray-parallel version on the same sample folder.
faiss_docs2 = faiss_chain2(pdf_folder)



[1m> Entering new  chain...[0m


[1m> Entering new  chain...[0m

[1m> Finished chain.[0m


[1m> Entering new  chain...[0m
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


2023-07-10 00:01:10,122	INFO worker.py:1627 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)

[38;5;4mℹ Running chain on 3 blocks.[0m




Learn more here: https://docs.ray.io/en/master/data/faq.html#migrating-to-strict-mode[0m
2023-07-10 00:01:12,124	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2023-07-10 00:01:12,125	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-10 00:01:12,125	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


- Repartition 1:   0%|          | 0/3 [00:00<?, ?it/s]

Repartition 2:   0%|          | 0/3 [00:00<?, ?it/s]

Running 0:   0%|          | 0/3 [00:00<?, ?it/s]

[2m[36m(MapBatches(<lambda>) pid=121928)[0m 
[2m[36m(MapBatches(<lambda>) pid=121928)[0m 
[2m[36m(MapBatches(<lambda>) pid=121928)[0m [1m> Entering new  chain...[0m
[2m[36m(MapBatches(<lambda>) pid=121928)[0m [1m> Finished chain.[0m


2023-07-10 00:01:19,126	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-8, stopped daemon 140169780901632)>.



[1m> Finished chain.[0m

[1m> Finished chain.[0m


In [None]:
#| eval: false

# The Ray run returns the same leading Documents as the single-block run.
faiss_docs2['docs'][:5]

[Document(page_content='Spearheaded complete purchase cycl e at Ruwais Adnoc Project \nand expedite the materials & services from source to final delivery \n In-depth knowledge of export control regulations and import', metadata={'source': '../../resumes_5/0f479ee8-5fd9-4f55-b254-5e8feef08038.pdf', 'page': 0, 'start_index': 1547}),
 Document(page_content='procedures inMiddle East \n Resourceful in finalizing the specifications of materials, \nestablishing quali ty & quantity limits for effective inventory \ncontrol and reducing wastages', metadata={'source': '../../resumes_5/0f479ee8-5fd9-4f55-b254-5e8feef08038.pdf', 'page': 0, 'start_index': 1745}),
 Document(page_content='Skilled in planning and monitoring warehouse operations of \nreceipt, storage, return of unused stock, inventory control and \nmonitoring inbound /outbound logistics', metadata={'source': '../../resumes_5/0f479ee8-5fd9-4f55-b254-5e8feef08038.pdf', 'page': 0, 'start_index': 1932}),
 Document(page_content="Successfull

It's that simple! We can now use the FAISS indexes to search for similar Documents.

Next, we create a query chain with `index_query_chain`.

In [None]:
# | eval: false

# Use the same embedding model that built the indexes so query and Document
# vectors are comparable; block_size is the number of indexes per Ray task.
query_chain = index_query_chain(
    ems_model=ems_model,
    index_folder=index_folder,
    index_name=index_name,
    block_size=10,
    verbose=verbose,
)

In [None]:
# | eval: false

query = "I got my degree from the University of Toronto"
# Inputs are passed as a dict because the chain has two input keys (`query`, `k`).
search_res = query_chain(dict(query=query, k=3))['search_results']



[1m> Entering new  chain...[0m

[1m> Finished chain.[0m


In [None]:
# | eval: false

print("Search Results:\n")
for doc in search_res:
    print(f"+{'-'*100}+")
    print()
    # Each hit is indexed with [0] to get its Document (presumably a (Document, score) pair).
    print_doc(doc[0])

Search Results:

+----------------------------------------------------------------------------------------------------+

[1mPage_Content:[0m Bachelor of Commerce (B. Com) - University of Mumbai 2008 - 2011

[1mMetadata:[0m {'source': '../../resumes_5/0cf20170-8051-41ba-9060-1a82d43f4289.pdf', 'page': 0, 'start_index': 3474}

+----------------------------------------------------------------------------------------------------+

[1mPage_Content:[0m in 1997 
 
 B.A. from Punjab University, Lahore 
in 1991 
 
 
CE R T I F I C A T I O N S :
 
 
 CTLP (Certified Trade & Logistics 
Professional) from Dubai World, 
Dubai - UAE in 2012

[1mMetadata:[0m {'source': '../../resumes_5/0f479ee8-5fd9-4f55-b254-5e8feef08038.pdf', 'page': 0, 'start_index': 356}

+----------------------------------------------------------------------------------------------------+

[1mPage_Content:[0m EDUCATION 
Cloud Computing for Big Data , Post Graduate Diploma (GPA 3.61 Dean's Honor list) Jan 2019 - Aug 202

In [None]:
# | hide
# Regenerate the library modules from the `# | export` cells.
import nbdev

nbdev.nbdev_export()