# Ingestion Pipeline and Token Counting

In this notebook, we explore the `IngestionPipeline` in LlamaIndex, which makes it easy to create nodes and then index and query them. We also show how to estimate the cost of a pipeline from the number of tokens it uses.

1. Ingestion Pipeline - Easily Ingesting data.
2. Transformation caching in memory.
3. Custom Transformations.
4. Tokenization and Token Counting.

We will delve into each of these features in detail throughout the notebook.

## Setup

In [1]:
import nest_asyncio
nest_asyncio.apply()

In [2]:
import os
# from dotenv import load_dotenv, find_dotenv
# load_dotenv('D:/.env')
# OPENAI_API_KEY = os.environ['OPENAI_API_KEY']

## Download Data

We will use the text of a Paul Graham essay for this tutorial.

In [4]:
!mkdir -p 'data/paul_graham/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/9607a05a923ddf07deee86a56d386b42943ce381/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'

--2024-06-06 08:36:29--  https://raw.githubusercontent.com/run-llama/llama_index/9607a05a923ddf07deee86a56d386b42943ce381/docs/docs/examples/data/paul_graham/paul_graham_essay.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 75042 (73K) [text/plain]
Saving to: ‘data/paul_graham/paul_graham_essay.txt’


2024-06-06 08:36:30 (11.7 MB/s) - ‘data/paul_graham/paul_graham_essay.txt’ saved [75042/75042]



## Load Data

In [5]:
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader('./data/paul_graham/').load_data()

## Ingestion Pipeline - Easily Ingesting data.

An `IngestionPipeline` applies a sequence of `Transformations` to the input data, in the order they are listed.

A transformation can be any of the following:

1. text splitter
2. node parser
3. metadata extractor
4. embeddings model

Once the data is ingested, you can build an index and start querying.
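To make the idea of chained transformations concrete, here is a minimal sketch in plain Python. It does not use LlamaIndex: `ToyNode`, `split_on_blank_lines`, `add_word_count`, and `run_pipeline` are hypothetical names invented for illustration, and the real pipeline does considerably more (caching, async execution, vector store integration). The sketch only shows the core pattern, where each step receives a list of nodes and hands a list of nodes to the next step.

```python
from dataclasses import dataclass, field


@dataclass
class ToyNode:
    """Hypothetical stand-in for a node: a piece of text plus metadata."""
    text: str
    metadata: dict = field(default_factory=dict)


def split_on_blank_lines(nodes):
    """Toy 'text splitter': emit one node per paragraph, copying metadata."""
    out = []
    for node in nodes:
        for part in node.text.split("\n\n"):
            if part.strip():
                out.append(ToyNode(part.strip(), dict(node.metadata)))
    return out


def add_word_count(nodes):
    """Toy 'metadata extractor': record the number of words in each node."""
    for node in nodes:
        node.metadata["word_count"] = len(node.text.split())
    return nodes


def run_pipeline(nodes, transformations):
    """Apply each transformation in order, feeding its output to the next."""
    for transform in transformations:
        nodes = transform(nodes)
    return nodes


docs = [ToyNode("First paragraph here.\n\nSecond one.", {"file_name": "toy.txt"})]
toy_nodes = run_pipeline(docs, [split_on_blank_lines, add_word_count])
for n in toy_nodes:
    print(n.text, n.metadata)
```

The order of the list matters: the word count is computed per paragraph only because the splitter runs first.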

In [6]:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.core.text_splitter import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline

In [7]:
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=100),
    ]
)
nodes = pipeline.run(documents=documents)

In [8]:
nodes[0]

TextNode(id_='550c894b-69c4-4d7e-b3ff-1896ac8c598e', embedding=None, metadata={'file_path': '/home/jupyter-prashant/RAG systems using LlamaIndex/Module 5 - Customization in LlamaIndex/M5_L3_Ingestion Pipeline and Token Counting/data/paul_graham/paul_graham_essay.txt', 'file_name': 'paul_graham_essay.txt', 'file_type': 'text/plain', 'file_size': 75042, 'creation_date': '2024-06-06', 'last_modified_date': '2024-06-06'}, excluded_embed_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], excluded_llm_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='92d66897-2a11-46bb-b827-61eb0128e79e', node_type=<ObjectType.DOCUMENT: '4'>, metadata={'file_path': '/home/jupyter-prashant/RAG systems using LlamaIndex/Module 5 - Customization in LlamaIndex/M5_L3_Ingestion Pipeline and Token Counting/data/paul_

In [9]:
nodes[0].metadata

{'file_path': '/home/jupyter-prashant/RAG systems using LlamaIndex/Module 5 - Customization in LlamaIndex/M5_L3_Ingestion Pipeline and Token Counting/data/paul_graham/paul_graham_essay.txt',
 'file_name': 'paul_graham_essay.txt',
 'file_type': 'text/plain',
 'file_size': 75042,
 'creation_date': '2024-06-06',
 'last_modified_date': '2024-06-06'}

In [10]:
len(documents)

1

In [11]:
len(nodes)

20

In [12]:
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=100),
        TitleExtractor()
    ]
)
nodes = pipeline.run(documents=documents)

100%|██████████| 5/5 [00:01<00:00,  3.46it/s]


In [13]:
nodes[0].metadata

{'file_path': '/home/jupyter-prashant/RAG systems using LlamaIndex/Module 5 - Customization in LlamaIndex/M5_L3_Ingestion Pipeline and Token Counting/data/paul_graham/paul_graham_essay.txt',
 'file_name': 'paul_graham_essay.txt',
 'file_type': 'text/plain',
 'file_size': 75042,
 'creation_date': '2024-06-06',
 'last_modified_date': '2024-06-06',
 'document_title': '"Exploring the Intersection of Writing, Programming, Philosophy, and Art: A Journey from College to the Evolution of AI and Lisp"'}

In [14]:
nodes[0].metadata['document_title']

'"Exploring the Intersection of Writing, Programming, Philosophy, and Art: A Journey from College to the Evolution of AI and Lisp"'

In [15]:
len(nodes)

20

In [16]:
nodes[0].embedding

The embedding is `None` because the pipeline does not yet include an embedding model. Let's add one.

In [17]:
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=100),
        TitleExtractor(),
        OpenAIEmbedding(model='text-embedding-3-small')
    ]
)
nodes = pipeline.run(documents=documents)

100%|██████████| 5/5 [00:01<00:00,  3.49it/s]


In [18]:
nodes[0].embedding[:10]

[-0.037085600197315216,
 0.02660352922976017,
 -0.010638287290930748,
 -0.01540286559611559,
 0.062236323952674866,
 -0.02219824679195881,
 0.03178989142179489,
 0.06604798883199692,
 -0.0502389594912529,
 0.03311772271990776]

See also: [Async Ingestion Pipeline + Metadata Extraction](https://docs.llamaindex.ai/en/latest/examples/ingestion/async_ingestion_pipeline.html#)

## Transformation Caching

Each time an `IngestionPipeline` runs, it caches the output of every transformation under a hash that combines the input nodes with that transformation. On later runs, if the hash matches a cache entry, the pipeline skips the transformation and reuses the cached output. This speeds up repeated runs and makes it quicker to iterate when choosing transformations.
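The caching idea can be sketched in a few lines of plain Python. This is an illustration of the general technique (keying results on a hash of inputs plus transformation settings), not LlamaIndex's actual implementation; `cache_key`, `ToyCache`, and `truncate` are hypothetical names made up for this example.

```python
import hashlib
import json


def cache_key(texts, transform_name, transform_params):
    """Hash the input texts together with the transformation's configuration."""
    payload = json.dumps(
        {"texts": texts, "name": transform_name, "params": transform_params},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ToyCache:
    """Hypothetical in-memory cache mapping a hash to a transformation's output."""

    def __init__(self):
        self.store = {}
        self.hits = 0

    def run(self, texts, transform, name, params):
        key = cache_key(texts, name, params)
        if key in self.store:
            # Same inputs and same settings: reuse the stored output.
            self.hits += 1
            return self.store[key]
        result = transform(texts, **params)
        self.store[key] = result
        return result


def truncate(texts, max_chars):
    """Toy transformation: keep only the first max_chars characters."""
    return [t[:max_chars] for t in texts]


cache = ToyCache()
texts = ["alpha beta gamma", "delta epsilon"]
first = cache.run(texts, truncate, "truncate", {"max_chars": 5})
second = cache.run(texts, truncate, "truncate", {"max_chars": 5})  # cache hit
third = cache.run(texts, truncate, "truncate", {"max_chars": 8})   # new settings: miss
print(first, third, cache.hits)
```

Note that changing either the input or any parameter of the transformation produces a different key, so the step is recomputed. The same reasoning suggests that switching the embedding model between runs would bypass cached embeddings.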

In [19]:
from llama_index.core.ingestion import IngestionCache

In [20]:
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=100),
        TitleExtractor(),
    ]
)
nodes = pipeline.run(documents=documents)

100%|██████████| 5/5 [00:01<00:00,  3.65it/s]


In [21]:
# save and load
pipeline.cache.persist("./llama_cache.json")
new_cache = IngestionCache.from_persist_path("./llama_cache.json")

In [22]:
new_pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=100),
        TitleExtractor(),
    ],
    cache=new_cache,
)

### With the cache, this run completes almost instantly

This is especially useful when a pipeline extracts metadata or creates embeddings, since those steps call an LLM or an embedding model.

In [23]:
nodes = new_pipeline.run(documents=documents)

Now let's add embeddings to the pipeline. Node parsing and title extraction are loaded from the cache, so only the OpenAI embeddings are computed in this run.

In [24]:
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=100),
        TitleExtractor(),
        OpenAIEmbedding(model='text-embedding-3-small'),
    ],
    cache=new_cache,
)
nodes = pipeline.run(documents=documents)

In [25]:
# save and load
pipeline.cache.persist("./nodes_embedding.json")
nodes_embedding_cache = IngestionCache.from_persist_path("./nodes_embedding.json")

In [26]:
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=100),
        TitleExtractor(),
        OpenAIEmbedding(model='text-embedding-3-small')
    ],
    cache=nodes_embedding_cache,
)

# Results load from the cache when the transformations match those of the cached run.
nodes = pipeline.run(documents=documents)

## Custom Transformations

Implementing a custom transformation is straightforward.

Let's add a transformation that removes special characters from the text before generating embeddings.

A transformation only needs to accept a list of nodes as input and return a list of (possibly modified) nodes.

In [27]:
from llama_index.core.schema import TransformComponent
import re

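class TextCleaner(TransformComponent):
    """Remove every character that is not a letter, a digit, or a space."""

    def __call__(self, nodes, **kwargs):
        for node in nodes:
            # Note: this also strips punctuation and newlines.
            node.text = re.sub(r'[^0-9A-Za-z ]', "", node.text)
        return nodes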

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=100),
        TextCleaner(),
    ],
)

nodes = pipeline.run(documents=documents)
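To see what the regular expression in `TextCleaner` does, it helps to run it on a short string outside the pipeline. The helper names `clean_text` and `clean_text_keep_spacing` below are hypothetical; the second function is an alternative sketch (not part of the original notebook) that keeps word boundaries when a newline separates two words.

```python
import re


def clean_text(text: str) -> str:
    """Same pattern as TextCleaner: keep only letters, digits, and spaces."""
    return re.sub(r"[^0-9A-Za-z ]", "", text)


def clean_text_keep_spacing(text: str) -> str:
    """Alternative: drop special characters, then collapse all whitespace to single spaces."""
    no_specials = re.sub(r"[^0-9A-Za-z\s]", "", text)
    return re.sub(r"\s+", " ", no_specials).strip()


sample = "Hello, world!\nIt's 2024."
cleaned = clean_text(sample)
cleaned_spaced = clean_text_keep_spacing(sample)
print(repr(cleaned))         # 'Hello worldIts 2024'
print(repr(cleaned_spaced))  # 'Hello world Its 2024'
```

Because the original pattern removes newlines without replacing them, words on either side of a line break are joined together. Whether that matters depends on the downstream embedding model and the text being cleaned.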

## Tokenization and Token Counting

In [28]:
import tiktoken
from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
from llama_index.core import Settings

In [29]:
token_counter = TokenCountingHandler(tokenizer=tiktoken.encoding_for_model("gpt-3.5-turbo").encode)

# To use a different LLM, pass that model's tokenizer instead, for example:
# from transformers import AutoTokenizer
# tokenizer=AutoTokenizer.from_pretrained("HuggingFaceH4/zephyr-7b-beta").encode

In [30]:
Settings.llm = OpenAI(model="gpt-3.5-turbo", temperature=0.2)

In [31]:
# Callback manager handles callbacks for events within LlamaIndex.
Settings.callback_manager = CallbackManager([token_counter])

In [32]:
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex(nodes)

In [33]:
token_counter.total_embedding_token_count

17164

In [34]:
# reset the counts at your discretion!
token_counter.reset_counts()
print(token_counter.total_embedding_token_count)

0


In [35]:
query_engine = index.as_query_engine()
response = query_engine.query("What did the author do growing up?")

In [36]:
print(response.response)

The author worked on writing and programming outside of school before college.


In [37]:
print(
    "Embedding Tokens: ",
    token_counter.total_embedding_token_count,
    "\n",
    "LLM Prompt Tokens: ",
    token_counter.prompt_llm_token_count,
    "\n",
    "LLM Completion Tokens: ",
    token_counter.completion_llm_token_count,
    "\n",
    "Total LLM Token Count: ",
    token_counter.total_llm_token_count,
)

Embedding Tokens:  8 
 LLM Prompt Tokens:  1893 
 LLM Completion Tokens:  13 
 Total LLM Token Count:  1906
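These counts can be turned into a rough cost estimate by multiplying each one by a per-token price. The sketch below uses the token counts reported above, but the prices in `PRICE_PER_1M` are placeholder values chosen for illustration only, and `estimate_cost` is a hypothetical helper; substitute the current rates from your provider's pricing page.

```python
# Hypothetical prices in USD per 1 million tokens. These are NOT real quotes;
# replace them with your provider's current rates.
PRICE_PER_1M = {"embedding": 0.02, "prompt": 0.50, "completion": 1.50}


def estimate_cost(embedding_tokens, prompt_tokens, completion_tokens, prices=PRICE_PER_1M):
    """Return the estimated cost in USD for the given token counts."""
    total = (
        embedding_tokens * prices["embedding"]
        + prompt_tokens * prices["prompt"]
        + completion_tokens * prices["completion"]
    )
    return total / 1_000_000


# Token counts taken from the cells above.
indexing_cost = estimate_cost(embedding_tokens=17164, prompt_tokens=0, completion_tokens=0)
query_cost = estimate_cost(embedding_tokens=8, prompt_tokens=1893, completion_tokens=13)

print(f"Indexing: ${indexing_cost:.6f}")
print(f"Query:    ${query_cost:.6f}")
```

In a live session you would pass `token_counter.total_embedding_token_count`, `token_counter.prompt_llm_token_count`, and `token_counter.completion_llm_token_count` instead of hard-coded numbers. Indexing is a one-time cost, while the query cost recurs with every call.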
