In [None]:
%%capture
!pip install llama-index==0.10.22 llama-index-embeddings-openai qdrant-client llama-index-vector-stores-qdrant

In [None]:
# Standard library
import os
from getpass import getpass

# Third-party
import nest_asyncio

# Patch asyncio for this session. Presumably needed because the notebook kernel
# already runs an event loop and later calls start their own -- TODO confirm.
nest_asyncio.apply()

In [None]:
# Prompt for the Cohere key without echoing it, and expose it through the
# CO_API_KEY environment variable for the rest of the session.
os.environ["CO_API_KEY"] = getpass(prompt="Enter your Cohere API key: ")

In [None]:
# Read the Qdrant endpoint through getpass so it is not echoed into the notebook output.
QDRANT_URL = getpass(prompt="Enter your Qdrant URL:")

In [None]:
# Read the Qdrant API key without echoing it into the notebook output.
QDRANT_API_KEY = getpass(prompt="Enter your Qdrant API Key:")

In [None]:
from llama_index.core.settings import Settings
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from llama_index.llms.cohere import Cohere

# Build the two models first, then register them on the shared Settings object:
# Cohere "command-r" as the LLM and FastEmbed "BAAI/bge-small-en-v1.5" for embeddings.
cohere_llm = Cohere(model="command-r")
fastembed_model = FastEmbedEmbedding(model_name="BAAI/bge-small-en-v1.5")

Settings.llm = cohere_llm
Settings.embed_model = fastembed_model

# Ingestion Pipeline

- 🔄 **IngestionPipeline Overview**: An `IngestionPipeline` applies a sequence of `Transformations` to the input data, turning it into nodes that are either returned or inserted into a vector database.

- 💾 **Caching Mechanism**: Each node + transformation pair is cached, so a later run that repeats the same operation reuses the cached result instead of recomputing it.


### Using an `IngestionPipeline`

First, let's read in some data. 

In [None]:
from llama_index.core import SimpleDirectoryReader

# Read the single source text file; `filename_as_id=True` asks the reader to
# use the file name as the document id.
reader = SimpleDirectoryReader(
    input_files=["../data/it_can_be_done.txt"],
    filename_as_id=True,
)
documents = reader.load_data()

# Ingestion Pipeline with Document Management


 •  💾 **Caching in IngestionPipeline**: Hashes and stores each node + transformation combination to speed up future processes with identical data.

 •  📁 **Local Cache Management**: The input nodes list and transformation pair are cached in the pipeline. When we apply the same transformation to that list of nodes again, the output nodes are retrieved from the cache.

 •  📚 **Docstore Attachment**:  Enables document management in the ingestion pipeline, using `doc_id` or `node.ref_doc_id` for identification. Prevents running a transformation on the same document multiple times by using the document ID and the hash of the document content to manage duplicates.

 •  🗂️ **Duplicate Handling**:
  - Maintains a `doc_id` to `document_hash` map to identify duplicates.

  - Re-processes documents if the same `doc_id` is found with a changed hash.

  - Skips documents if the same `doc_id` is found but the hash remains unchanged.

 •  🚫 **Without Vector Store**:
  - Limited to checking and removing duplicate inputs.

 •  ✨ **With Vector Store**:
  - Enables handling of upserts for updated documents, offering advanced management capabilities.

In [None]:
from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.core.ingestion import IngestionCache, IngestionPipeline
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient

# The Qdrant collection and the ingestion cache collection share one name.
COLLECTION_NAME = "it_can_be_done"

# Connect to Qdrant with the URL and API key entered in the earlier cells.
client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
vector_store = QdrantVectorStore(client=client, collection_name=COLLECTION_NAME)
ingest_cache = IngestionCache(collection=COLLECTION_NAME)

# Transformations, listed in order: token-based splitting into 256-token chunks
# with a 16-token overlap, followed by the globally configured embedding model.
token_splitter = TokenTextSplitter(chunk_size=256, chunk_overlap=16)
transformations = [token_splitter, Settings.embed_model]

# Attach a docstore, the vector store and the cache to the pipeline.
pipeline = IngestionPipeline(
    transformations=transformations,
    docstore=SimpleDocumentStore(),
    vector_store=vector_store,
    cache=ingest_cache,
)

# Run the pipeline over the loaded documents and keep the returned nodes.
nodes = pipeline.run(documents=documents)

In [None]:
# Build the index on top of the vector store that the pipeline was given above.
index = VectorStoreIndex.from_vector_store(vector_store)

The ingestion pipeline can save its cache and docstore to disk; the default folder is `./pipeline_storage`.

When the pipeline runs, it reuses the cache and skips duplicate documents that are already in the docstore.

In [None]:
# Write the pipeline's state to the local ./pipeline_storage folder.
pipeline.persist(persist_dir="./pipeline_storage")