In [1]:
from typing import Dict, List
from pathlib import Path

# from llama_index import download_loader
from llama_hub.file.pymu_pdf.base import PyMuPDFReader
from llama_index import Document, download_loader

# Step 1: Logic for loading and parsing the files into llama_index documents.
# UnstructuredReader = download_loader("UnstructuredReader")
loader = PyMuPDFReader()
# loader = UnstructuredReader()

def load_and_parse_files(file_row: Dict[str, Path]) -> List[Dict[str, Document]]:
    documents = []
    file = file_row["path"]
    if file.is_dir():
        return []
    # Skip all non-html files like png, jpg, etc.
    if file.suffix.lower() == ".pdf":
        loaded_doc = loader.load(file)
        loaded_doc[0].extra_info = {"path": str(file)}
        documents.extend(loaded_doc)
    return [{"doc": doc} for doc in documents]

In [2]:
import ray
from pathlib import Path
# Get the paths for the locally downloaded documentation.
all_docs_gen = Path(".").rglob("*.pdf")
all_docs = [{"path": doc.resolve()} for doc in all_docs_gen]

In [3]:
# Create the Ray Dataset pipeline
ds = ray.data.from_items(all_docs)

# Use `flat_map` since there is a 1:N relationship.
# Each filepath returns multiple documents.
loaded_docs = ds.flat_map(load_and_parse_files)

2023-10-09 20:01:29,948	INFO worker.py:1642 -- Started a local Ray instance.


In [4]:
loaded_docs.show(500)

2023-10-09 20:01:58,828	INFO dataset.py:2380 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2023-10-09 20:01:58,831	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(load_and_parse_files)] -> LimitOperator[limit=500]
2023-10-09 20:01:58,831	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-10-09 20:01:58,832	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

{'doc': Document(id_='7ec81496-907b-4db4-af61-2351e85e8b7b', embedding=None, metadata={'path': '/home/saadsameerkhan/Personal/LlamIndex/normal_docs/CPA_contingencymemo_3.pdf'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='2ccb0620b6ffaa89f4a8305b0b915ce8459f0b602a4d3b18d7ec8b1122a9b2bf', text='C O N T I N G E N C Y  P L A N N I N G  M E M O R A N D U M  N O .  3  \nCrisis Between   \nUkraine and Russia \nSteven Pifer  \nJuly 2009 \n', start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n')}
{'doc': Document(id_='ac67e80e-5789-4054-9357-6c7f763647bd', embedding=None, metadata={'total_pages': 11, 'file_path': PosixPath('/home/saadsameerkhan/Personal/LlamIndex/normal_docs/CPA_contingencymemo_3.pdf'), 'source': '2'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='4e1a198a7d31c7c6c94a60d3aebc3740d83de305c58e4e64f344f3af

In [5]:
# Step 2: Convert the loaded documents into llama_index Nodes. This will split the documents into chunks.
from llama_index.node_parser import SimpleNodeParser
from llama_index.data_structs import Node

def convert_documents_into_nodes(documents: Dict[str, Document]) -> List[Dict[str, Node]]:
    parser = SimpleNodeParser.from_defaults(chunk_size=700, chunk_overlap=20)
    document = documents["doc"]
    nodes = parser.get_nodes_from_documents([document]) 
    return [{"node": node} for node in nodes]

In [6]:
# Use `flat_map` since there is a 1:N relationship. Each document returns multiple nodes.
nodes = loaded_docs.flat_map(convert_documents_into_nodes)

In [7]:
nodes.show(100)

2023-10-09 20:02:10,445	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(load_and_parse_files)->FlatMap(convert_documents_into_nodes)] -> LimitOperator[limit=100]
2023-10-09 20:02:10,446	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-10-09 20:02:10,446	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

{'node': TextNode(id_='8f39e628-f08b-4ea6-bbcf-5318d1c7daea', embedding=None, metadata={'path': '/home/saadsameerkhan/Personal/LlamIndex/normal_docs/CPA_contingencymemo_3.pdf'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='ea6cf66f-55be-4787-9ab9-8b99acb41383', node_type=None, metadata={'path': '/home/saadsameerkhan/Personal/LlamIndex/normal_docs/CPA_contingencymemo_3.pdf'}, hash='2ccb0620b6ffaa89f4a8305b0b915ce8459f0b602a4d3b18d7ec8b1122a9b2bf')}, hash='92c78ab6375fad1fe26b36203eb85a3c2ee8edf2ced6c79f4610773f1181a144', text='C O N T I N G E N C Y  P L A N N I N G  M E M O R A N D U M  N O .  3  \nCrisis Between   \nUkraine and Russia \nSteven Pifer  \nJuly 2009', start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n')}
{'node': TextNode(id_='11793ec7-43a8-4d75-bceb-75aeabf49411', embedding=None, metadata={'

In [8]:
# Step 3: Embed each node using a local embedding model.
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from llama_index.embeddings import LangchainEmbedding

class EmbedNodes:
    def __init__(self):
        self.embedding_model = HuggingFaceEmbeddings(
            # Use all-mpnet-base-v2 Sentence_transformer.
            # This is the default embedding model for LlamaIndex/Langchain.
            model_name="BAAI/bge-large-en-v1.5", 
            model_kwargs={"device": "cpu"},
            cache_folder='./bge_model',
            # Use GPU for embedding and specify a large enough batch size to maximize GPU utilization.
            # Remove the "device": "cuda" to use CPU instead.
            # encode_kwargs={"device": "cpu"}
            )

        # self.embedding_model = LangchainEmbedding(
        #     HuggingFaceEmbeddings(
        #         model_name="sentence-transformers/all-mpnet-base-v2",
        #         model_kwargs={"device": "cpu"},
        #         cache_folder='./mnpet_model',
        #         encode_kwargs={"device": "cpu"}
        #         )
        # ) 

    def __call__(self, node_batch: Dict[str, List[Node]]) -> Dict[str, List[Node]]:
        nodes = node_batch["node"]
        text = [node.text for node in nodes]
        embeddings = self.embedding_model.embed_documents(text)
        assert len(nodes) == len(embeddings)

        for node, embedding in zip(nodes, embeddings):
            node.embedding = embedding
        return {"embedded_nodes": nodes}

In [None]:
import torch
torch.set_num_threads(8)

In [42]:
# Use `map_batches` to specify a batch size to maximize GPU utilization.
# We define `EmbedNodes` as a class instead of a function so we only initialize the embedding model once. 
from ray.data import ActorPoolStrategy

# This state can be reused for multiple batches.
embedded_nodes = nodes.map_batches(
    EmbedNodes, 
    batch_size=5, 
    # Use 1 GPU per actor.
    num_cpus=1,
    # There are 4 GPUs in the cluster. Each actor uses 1 GPU. So we want 4 total actors.
    compute=ActorPoolStrategy(size=5)
    )


In [None]:
# alternate way of embedding:

# def embed_nodes(nodes: List[Node]) -> List[Node]:
#     embedding_model = LangchainEmbedding(
#             HuggingFaceEmbeddings(
#                 model_name="BAAI/bge-large-en-v1.5",
#                 model_kwargs={"device": "cpu"},
#                 cache_folder='./bge_model',
#                 encode_kwargs={"device": "cpu"}
#                 )
#     )

#     embeddings = embedding_model.embed_documents([node.text for node in nodes])
#     assert len(nodes) == len(embeddings)

#     for node, embedding in zip(nodes, embeddings):
#         node.embedding = embedding

#     return nodes

# embedded_nodes = nodes.map_batches(
#     embed_nodes,
#     batch_size=100,
# )


In [43]:
# Step 5: Trigger execution and collect all the embedded nodes.
ray_docs_nodes = []
for row in embedded_nodes.iter_rows():
    node = row["embedded_nodes"]
    assert node.embedding is not None
    ray_docs_nodes.append(node) 

2023-10-09 20:29:50,298	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[FlatMap(load_and_parse_files)->FlatMap(convert_documents_into_nodes)->MapBatches(EmbedNodes)]
2023-10-09 20:29:50,299	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-10-09 20:29:50,301	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-10-09 20:29:50,366	INFO actor_pool_map_operator.py:106 -- FlatMap(load_and_parse_files)->FlatMap(convert_documents_into_nodes)->MapBatches(EmbedNodes): Waiting for 5 pool actors to start...


In [None]:
Running 1/8 cpus when numgpus 0 actor pool size 1
ray logs raylet.out -ip 192.168.0.112

In [32]:
!ray status

Node status
---------------------------------------------------------------
Healthy:
 1 node_cb677e80d04e07db2a9d31875cade0cf0da6208569be98b7a39feccc
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 8.0/8.0 CPU
 0B/7.71GiB memory
 19.17KiB/3.85GiB object_store_memory

Demands:
 {'CPU': 8.0}: 1+ pending tasks/actors
[0m

