In [1]:
%%capture
%pip install llama-index qdrant-client llama-index-vector-stores-qdrant llama-index-llms-cohere llama-index-embeddings-cohere

In [2]:
import os
from dotenv import load_dotenv
from getpass import getpass

import nest_asyncio

nest_asyncio.apply()
load_dotenv()

True

In [3]:
# .get() returns None when the variable is unset, so the getpass fallback can run
CO_API_KEY = os.environ.get('CO_API_KEY') or getpass("Enter your Cohere API key: ")

In [None]:
#OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY') or getpass("Enter your OpenAI API key: ")

In [4]:
QDRANT_URL = os.environ.get('QDRANT_URL') or getpass("Enter your Qdrant URL: ")

In [5]:
QDRANT_API_KEY = os.environ.get('QDRANT_API_KEY') or getpass("Enter your Qdrant API Key: ")

In [6]:
from llama_index.core.settings import Settings
from llama_index.llms.cohere import Cohere
#from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.embeddings.cohere import CohereEmbedding

Settings.llm = Cohere(model="command-r-plus", api_key=CO_API_KEY)

#Settings.embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")
Settings.embed_model = CohereEmbedding(model_name="embed-english-v3.0", api_key=CO_API_KEY)



# Ingestion Pipeline

- 🔄 **IngestionPipeline Overview**: An `IngestionPipeline` applies a sequence of `Transformations` to the input data. The transformations turn the data into nodes, which are returned and, when a vector store is attached, also inserted into it.

- 💾 **Caching Mechanism**: Each node + transformation pair is cached, so running the same transformation on the same nodes again reads the cached result instead of recomputing it.
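
The two points above can be illustrated with a minimal, library-free sketch. It is not LlamaIndex's implementation: the toy transformations, `cache_key`, and `run_pipeline` are hypothetical names, and the cache is assumed to be a plain dictionary keyed by a hash of the input nodes plus the transformation's name.

```python
import hashlib
import json


def split_into_chunks(nodes, chunk_size=20):
    """Toy transformation: split each text into fixed-size character chunks."""
    return [
        text[i:i + chunk_size]
        for text in nodes
        for i in range(0, len(text), chunk_size)
    ]


def uppercase(nodes):
    """Toy transformation: upper-case every chunk."""
    return [text.upper() for text in nodes]


def cache_key(nodes, transformation):
    """Hash the input nodes together with the transformation's name (assumed key scheme)."""
    payload = json.dumps({"nodes": nodes, "transform": transformation.__name__})
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_pipeline(nodes, transformations, cache):
    """Apply each transformation in order, reusing cached outputs when available."""
    hits = 0
    for transformation in transformations:
        key = cache_key(nodes, transformation)
        if key in cache:
            nodes = cache[key]  # identical input + transformation: reuse the result
            hits += 1
        else:
            nodes = transformation(nodes)
            cache[key] = nodes
    return nodes, hits


cache = {}
docs = ["If you can keep your head when all about you are losing theirs"]
steps = [split_into_chunks, uppercase]

first_nodes, first_hits = run_pipeline(docs, steps, cache)
second_nodes, second_hits = run_pipeline(docs, steps, cache)
print(first_hits, second_hits)  # 0 2
```

The first run computes both steps and fills the cache; the second run finds both steps in the cache and does no transformation work.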


### Using an `IngestionPipeline`

First, let's read in some data. 

In [7]:
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader(
    input_files=["../02_Fundamental_Concepts_in_LlamaIndex/data/pg10763.txt"],
    filename_as_id=True,
).load_data()

# Ingestion Pipeline with Document Management


- 💾 **Caching in IngestionPipeline**: Each node + transformation combination is hashed and stored, which speeds up later runs over identical data.

- 📁 **Local Cache Management**: The pipeline caches each pair of input node list and transformation. When the same transformation is applied to that list of nodes again, the output nodes are retrieved from the cache.

- 📚 **Docstore Attachment**: Attaching a docstore enables document management in the ingestion pipeline. Documents are identified by `doc_id` or `node.ref_doc_id`, and the document ID together with a hash of the document content is used to avoid running transformations on the same document more than once.

- 🗂️ **Duplicate Handling**:
  - Maintains a `doc_id` to `document_hash` map to identify duplicates.
  - Re-processes a document if the same `doc_id` is found with a changed hash.
  - Skips a document if the same `doc_id` is found and the hash is unchanged.

- 🚫 **Without Vector Store**:
  - Limited to checking for and removing duplicate inputs.

- ✨ **With Vector Store**:
  - Enables upserts for updated documents, so a changed document replaces its earlier version in the store.
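
The duplicate-handling rules above can be sketched in plain Python. This is an illustration under stated assumptions, not the library's code: `ingest`, `content_hash`, and the dictionaries standing in for the docstore map and the vector store are all hypothetical, and "processing" is reduced to upper-casing the text.

```python
import hashlib


def content_hash(text):
    """Hash the document content so that changes can be detected."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def ingest(documents, doc_hashes, vector_store):
    """Process new or changed documents and return the doc_ids that were processed.

    documents:    dict mapping doc_id -> text
    doc_hashes:   dict mapping doc_id -> document_hash (stand-in for the docstore)
    vector_store: dict mapping doc_id -> processed text (stand-in for a vector store)
    """
    processed = []
    for doc_id, text in documents.items():
        new_hash = content_hash(text)
        if doc_hashes.get(doc_id) == new_hash:
            continue  # same doc_id, unchanged hash: skip
        # New doc_id, or same doc_id with a changed hash: (re-)process
        doc_hashes[doc_id] = new_hash
        vector_store[doc_id] = text.upper()  # upsert: overwrite any earlier version
        processed.append(doc_id)
    return processed


doc_hashes = {}
vector_store = {}

run1 = ingest({"a.txt": "hello", "b.txt": "world"}, doc_hashes, vector_store)
run2 = ingest({"a.txt": "hello", "b.txt": "world!"}, doc_hashes, vector_store)
print(run1, run2)  # ['a.txt', 'b.txt'] ['b.txt']
```

On the second run only `b.txt` is processed, because its content (and therefore its hash) changed, and its entry in the toy store is overwritten rather than duplicated.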

In [20]:
from qdrant_client import QdrantClient

from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.core.ingestion import IngestionCache, IngestionPipeline
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.vector_stores.qdrant import QdrantVectorStore

client = QdrantClient(
    url=QDRANT_URL, 
    api_key=QDRANT_API_KEY,
)

vector_store = QdrantVectorStore(
    client=client, 
    collection_name="it_can_be_done")

ingest_cache = IngestionCache(
    collection="it_can_be_done",
)

# create pipeline with transformations
pipeline = IngestionPipeline(
    transformations=[
        # chunk_size changed from 256 to 1024 to work with the token limits of the free tier
        TokenTextSplitter(chunk_size=1024, chunk_overlap=16),
        Settings.embed_model,
    ],
    docstore=SimpleDocumentStore(),
    vector_store=vector_store,
    cache=ingest_cache,
)

# run the pipeline
nodes = pipeline.run(documents = documents)

In [12]:
nodes[0].__dict__.keys()

dict_keys(['id_', 'embedding', 'metadata', 'excluded_embed_metadata_keys', 'excluded_llm_metadata_keys', 'relationships', 'text', 'mimetype', 'start_char_idx', 'end_char_idx', 'text_template', 'metadata_template', 'metadata_seperator'])

In [13]:
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

In [14]:
retriever = index.as_retriever(
    similarity_top_k=7, 
    return_sources=True
    )

In [15]:
retrieved_nodes = retriever.retrieve("Poems about starting where you stand, and not making dreams your master")

In [16]:
retrieved_nodes

[NodeWithScore(node=TextNode(id_='dde8063b-0a48-4f1e-a96f-86b85123b615', embedding=None, metadata={'file_path': '../02_Fundamental_Concepts_in_LlamaIndex/data/pg10763.txt', 'file_name': 'pg10763.txt', 'file_type': 'text/plain', 'file_size': 405245, 'creation_date': '2025-02-02', 'last_modified_date': '2024-11-05'}, excluded_embed_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], excluded_llm_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='../02_Fundamental_Concepts_in_LlamaIndex/data/pg10763.txt', node_type=<ObjectType.DOCUMENT: '4'>, metadata={'file_path': '../02_Fundamental_Concepts_in_LlamaIndex/data/pg10763.txt', 'file_name': 'pg10763.txt', 'file_type': 'text/plain', 'file_size': 405245, 'creation_date': '2025-02-02', 'last_modified_date': '2024-11-05'}, hash='7ed8c6d21497c7614e4

In [17]:
print(retrieved_nodes[0].get_text())

To serve your turn long after they are gone,
  And so hold on when there is nothing in you
    Except the Will which says to them; "Hold on!"

  If you can talk with crowds and keep your virtue,
    Or walk with Kings--nor lose the common touch,
  If neither foes nor loving friends can hurt you,
    If all men count with you, but none too much;
  If you can fill the unforgiving minute
    With sixty seconds' worth of distance run,
  Yours is the Earth and everything that's in it,
    And--which is more--you'll be a Man, my son!


_Rudyard Kipling._

From "Rudyard Kipling's Verse, 1885-1918."




INVICTUS


Triumph in spirit over adverse conditions is the keynote of this poem of
courage undismayed. It rings with the power of the individual to guide
his own destiny.


  Out of the night that covers me,
    Black as the Pit from pole to pole,
  I thank whatever gods may be
    For my unconquerable soul.

  In the fell clutch of circumstance
    I have not winced nor cried aloud.
  Under t

In [18]:
print(retrieved_nodes[0].get_score())

0.4767366


The ingestion pipeline can save its cache and docstore to disk, by default in the `./pipeline_storage` folder.

When the pipeline runs again, it reuses the cache and skips documents that are already in the docstore.



In [19]:
pipeline.persist('./pipeline_storage')
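
To make the persist-then-reuse idea concrete, here is a small standard-library sketch. It does not reproduce LlamaIndex's on-disk format: the `doc_hashes.json` file name, the `persist_state` and `load_state` helpers, and the placeholder hash value are assumptions made for illustration only.

```python
import json
import tempfile
from pathlib import Path


def persist_state(doc_hashes, persist_dir):
    """Write the doc_id -> hash map to a JSON file inside persist_dir."""
    path = Path(persist_dir)
    path.mkdir(parents=True, exist_ok=True)
    (path / "doc_hashes.json").write_text(json.dumps(doc_hashes))


def load_state(persist_dir):
    """Read the map back, or start empty if nothing has been persisted yet."""
    state_file = Path(persist_dir) / "doc_hashes.json"
    if not state_file.exists():
        return {}
    return json.loads(state_file.read_text())


with tempfile.TemporaryDirectory() as tmp:
    storage = Path(tmp) / "pipeline_storage"
    before = load_state(storage)  # nothing saved yet
    persist_state({"pg10763.txt": "placeholder-hash"}, storage)
    restored = load_state(storage)  # a later session sees the earlier run's state

print(before, restored)
```

A later session that loads the saved state already knows which documents were ingested, which is what allows it to skip them.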