# Query Pipelines in LlamaIndex

LlamaIndex provides a declarative query API that allows you to chain together different modules in order to orchestrate simple-to-advanced workflows over your data.

This is centered around our QueryPipeline abstraction: load a variety of modules (from LLMs to prompts to retrievers to other pipelines), connect them into a sequential chain or a DAG, and run the whole thing end to end.

So what are the advantages of QueryPipeline?

- Express common workflows with fewer lines of code/boilerplate
- Greater readability
- Greater parity / better integration points with common low-code / no-code solutions (e.g. LangFlow)
- [In the future] A declarative interface allows easy serializability of pipeline components, providing portability of pipelines/easier deployment to different systems.

### Importing the libraries

In [11]:
from llama_index.query_pipeline.query import QueryPipeline
from llama_index.llms import OpenAI
from llama_index.prompts import PromptTemplate
from llama_index.storage import StorageContext
from llama_index import (
    VectorStoreIndex,
    ServiceContext,
    SimpleDirectoryReader,
    load_index_from_storage,
)

Load the API keys (`OPENAI_API_KEY`, plus `COHERE_API_KEY` for the Cohere reranker used later) from a `.env` file:

In [12]:
from dotenv import load_dotenv

# Load the environment variables from .env
load_dotenv()

True

### Load the data

In [3]:
reader = SimpleDirectoryReader(
    input_files=["./data/paul_graham_essay.txt"]
)
docs = reader.load_data()
print(f"Loaded {len(docs)} docs")

Loaded 1 docs


### Create or rebuild storage and the index

In [5]:
import os

if not os.path.exists("storage"):
    index = VectorStoreIndex.from_documents(docs)
    # save index to disk
    index.set_index_id("vector_index")
    index.storage_context.persist("./storage")
else:
    # rebuild storage context
    storage_context = StorageContext.from_defaults(persist_dir="storage")
    # load index
    index = load_index_from_storage(storage_context, index_id="vector_index")

## Chain Together Prompt and LLM

In this section we show a very simple workflow that chains a prompt with an LLM.

We simply pass the chain when initializing the pipeline. This is a special case of a query pipeline where the components are purely sequential, and outputs are automatically converted into the right format for the next module's inputs.

#### Defining a Sequential Chain

Some simple pipelines are purely linear in nature - the output of the previous module directly goes into the input of the next module.

Some examples:

- prompt -> LLM -> output parsing
- prompt -> LLM -> prompt -> LLM
- retriever -> response synthesizer
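To make the idea concrete, here is a minimal pure-Python sketch of what a sequential chain does: each module's output becomes the next module's input. It is not LlamaIndex's implementation; `run_chain` and the toy prompt, LLM, and parser functions are hypothetical stand-ins for the "prompt -> LLM -> output parsing" example.

```python
from typing import Any, Callable, List


def run_chain(modules: List[Callable[[Any], Any]], value: Any) -> Any:
    """Feed the output of each module straight into the next one."""
    for module in modules:
        value = module(value)
    return value


# Hypothetical stand-ins for a prompt template, an LLM, and an output parser
def prompt(movie_name: str) -> str:
    return f"Please generate related movies to {movie_name}"


def fake_llm(text: str) -> str:
    # A real LLM call would go here; we return a canned chat-style answer
    return "assistant: 1. Infernal Affairs\n2. The Town"


def parse_output(text: str) -> List[str]:
    # Strip the role prefix, then turn the numbered list into a list of titles
    body = text.removeprefix("assistant: ")
    return [line.split(". ", 1)[1] for line in body.splitlines()]


print(run_chain([prompt, fake_llm, parse_output], "The Departed"))
```

The chain itself knows nothing about the modules beyond "call it with the previous output", which is why purely linear pipelines can be declared as a simple list.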

In [7]:
# try chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
# Define the query pipeline
p = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)

In [8]:
output = p.run(movie_name="The Departed")
print(str(output))

> Running module a3aea295-140f-4182-99e1-d68a8beee6f0 with input:
movie_name: The Departed

[0m[1;3;38;2;155;135;227m> Running module f785a2ed-cbda-4bc6-85a9-51f04986f0b8 with input: 
messages: Please generate related movies to The Departed

assistant: 1. Infernal Affairs (2002) - The Departed is actually a remake of this Hong Kong crime thriller, which follows a similar storyline of undercover cops infiltrating a criminal organization.

2. The Town (2010) - Directed by Ben Affleck, this crime drama revolves around a group of bank robbers in Boston and the FBI agent determined to bring them down.

3. Heat (1995) - Directed by Michael Mann, this classic crime film features an intense cat-and-mouse game between a skilled detective and a professional thief in Los Angeles.

4. American Gangster (2007) - Based on a true story, this crime drama stars Denzel Washington as a Harlem drug lord and Russell Crowe as the detective determined to bring him to justice.

5

## Chain Multiple Prompts with Streaming

Query pipelines support LLM streaming: simply call `as_query_component(streaming=True)` on the LLM. Intermediate outputs are converted automatically, and the final output can be a stream.

In [9]:
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
# let's add some subsequent prompts for fun
prompt_str2 = """\
Here's some text:

{text}

Can you rewrite this with a summary of each movie?
"""
prompt_tmpl2 = PromptTemplate(prompt_str2)
llm = OpenAI(model="gpt-3.5-turbo")
llm_c = llm.as_query_component(streaming=True)

p = QueryPipeline(
    chain=[prompt_tmpl, llm_c, prompt_tmpl2, llm_c], verbose=True
)

In [10]:
output = p.run(movie_name="The Dark Knight")
for o in output:
    print(o.delta, end="")

> Running module 75059c2a-0656-48ab-a6f1-9c6b21705828 with input:
movie_name: The Dark Knight

[0m[1;3;38;2;155;135;227m> Running module 68f4a780-4fe7-485c-8650-48878708a24d with input: 
messages: Please generate related movies to The Dark Knight

[0m[1;3;38;2;155;135;227m> Running module 4df40a5f-b8f2-4c5c-b2bc-05e21242cfec with input: 
text: <generator object llm_chat_callback.<locals>.wrap.<locals>.wrapped_llm_chat.<locals>.wrapped_gen at 0x0000025C53B82020>

[0m[1;3;38;2;155;135;227m> Running module 30e10c95-c374-4d9e-b893-f7c1ce3d003d with input: 
messages: Here's some text:

1. Batman Begins (2005)
2. The Dark Knight Rises (2012)
3. Batman v Superman: Dawn of Justice (2016)
4. Man of Steel (2013)
5. The Avengers (2012)
6. Iron Man (2008)
7. Captain Amer...

1. Batman Begins (2005): A young Bruce Wayne becomes Batman to protect Gotham City from the League of Shadows and their leader, Ra's al Ghul.
2. The Dark Knight Rises (2012): Batman returns to
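Conceptually, when a streaming LLM feeds another module, its stream has to be drained into a full string first (the log above shows the intermediate generator being handed on as `text`), while the stream from the last module can be returned to the caller as-is. The plain-Python sketch below illustrates that idea; `Delta`, `fake_stream_llm`, and `to_text` are hypothetical and not LlamaIndex APIs.

```python
from dataclasses import dataclass
from typing import Iterator


@dataclass
class Delta:
    delta: str  # mirrors the `.delta` attribute read in the streaming loop above


def fake_stream_llm(prompt: str) -> Iterator[Delta]:
    # Hypothetical streaming LLM: "answers" by echoing the prompt word by word
    for word in f"echo: {prompt}".split(" "):
        yield Delta(word + " ")


def to_text(stream: Iterator[Delta]) -> str:
    # Intermediate step: drain the stream so the next prompt receives a plain string
    return "".join(chunk.delta for chunk in stream).strip()


first = to_text(fake_stream_llm("movies like Heat"))  # fully materialized
final = fake_stream_llm(f"Summarize: {first}")  # handed back as a stream
for chunk in final:
    print(chunk.delta, end="")
print()
```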

## Chain Together Query Rewriting Workflow (prompts + LLM) with Retrieval

Here we try a slightly more complex workflow where we send the input through two prompts before initiating retrieval.

- Generate a question about the given topic.

- Hallucinate an answer to that question (HyDE), which can make retrieval more effective.

Since each prompt takes only one input, the QueryPipeline automatically chains each LLM output into the next prompt, and that prompt into the next LLM.

In [11]:
from llama_index.postprocessor import CohereRerank

# First prompt: generate question regarding topic
prompt_str1 = "Please generate a concise question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl1 = PromptTemplate(prompt_str1)
# Second prompt: use HyDE to hallucinate answer.
prompt_str2 = (
    "Please write a passage to answer the question\n"
    "Try to include as many key details as possible.\n"
    "\n"
    "\n"
    "{query_str}\n"
    "\n"
    "\n"
    'Passage:"""\n'
)
prompt_tmpl2 = PromptTemplate(prompt_str2)
# Define the LLM and the retriever
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=5)
# Build the pipeline
p = QueryPipeline(
    chain=[prompt_tmpl1, llm, prompt_tmpl2, llm, retriever], verbose=True
)

In [12]:
nodes = p.run(topic="college")
len(nodes)

> Running module 6adb7b56-54f3-4564-a6a7-8d512ee8ae34 with input:
topic: college

[0m[1;3;38;2;155;135;227m> Running module 57bd8005-ea8e-40c6-9ea5-e714cc166401 with input: 
messages: Please generate a concise question about Paul Graham's life regarding the following topic college

[0m[1;3;38;2;155;135;227m> Running module a901183d-16ab-4786-aace-15bb59d49844 with input: 
query_str: assistant: How did Paul Graham's college experience shape his career and entrepreneurial mindset?

[0m[1;3;38;2;155;135;227m> Running module af775f22-0342-4930-89fc-6a14dc37dbc0 with input: 
messages: Please write a passage to answer the question
Try to include as many key details as possible.


How did Paul Graham's college experience shape his career and entrepreneurial mindset?


Passage:"""


[0m[1;3;38;2;155;135;227m> Running module e68186fe-29c2-4fc4-929b-731d4125fc49 with input: 
input: assistant: Paul Graham's college experience played a pivotal role in shaping his ca

5
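Why route retrieval through a hallucinated passage? A hypothetical answer tends to share more vocabulary with the relevant passages than the short question does. The toy sketch below illustrates this; a bag-of-words cosine similarity stands in for a real embedding model, and the corpus, the passage, and the `embed`/`cosine`/`retrieve` helpers are made up for illustration.

```python
import math
import re
from collections import Counter
from typing import Dict


def embed(text: str) -> Counter:
    # Toy "embedding": lowercase bag of words (a real pipeline uses an embedding model)
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(query: str, corpus: Dict[str, str]) -> str:
    # Return the id of the document most similar to the query
    q = embed(query)
    return max(corpus, key=lambda doc_id: cosine(q, embed(corpus[doc_id])))


corpus = {
    "college": "I studied philosophy in college then switched to AI and wrote Lisp programs at Cornell and Harvard",
    "painting": "I went to art school to study painting in Florence and later at RISD",
}
question = "How did college shape Paul Graham?"
# In the pipeline, the second LLM call would "hallucinate" a passage like this one
hypothetical = "Paul Graham studied philosophy in college, switched to AI, and wrote Lisp programs at Cornell"

doc = embed(corpus["college"])
print(round(cosine(embed(question), doc), 2), round(cosine(embed(hypothetical), doc), 2))
print(retrieve(hypothetical, corpus))
```

The hypothetical passage overlaps with many more words of the target document than the question does, so its similarity score is much higher even though the passage itself may contain invented details.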

## Create a Full RAG Pipeline as a DAG

Here we chain together a full RAG pipeline consisting of query rewriting, retrieval, reranking, and response synthesis.

Here we can’t use chain syntax because certain modules depend on multiple inputs (for instance, response synthesis expects both the retrieved nodes and the original question). Instead we’ll construct a DAG explicitly, through add_modules and then add_link.
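As a mental model (not LlamaIndex's actual scheduler), running such a DAG amounts to executing the modules in topological order and routing each upstream output to a named input of the downstream module, much like `dest_key` in `add_link`. The sketch below uses Python's standard `graphlib`; `run_dag` and the toy modules are hypothetical, and for simplicity every link names its destination key.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Set, Tuple

# (source, destination, dest_key): a toy analogue of p.add_link(src, dest, dest_key=...)
Link = Tuple[str, str, str]


def run_dag(modules: Dict[str, Callable[..., Any]], links: List[Link], query: Any) -> Any:
    # Map each module to the set of modules it depends on
    deps: Dict[str, Set[str]] = {name: set() for name in modules}
    for src, dest, _ in links:
        deps[dest].add(src)
    outputs: Dict[str, Any] = {}
    for name in TopologicalSorter(deps).static_order():
        if not deps[name]:
            # Root module: receives the pipeline input directly
            outputs[name] = modules[name](query)
        else:
            # Route every upstream output to the named input (dest_key) of this module
            kwargs = {key: outputs[src] for src, dest, key in links if dest == name}
            outputs[name] = modules[name](**kwargs)
    # The final answer comes from the sink: the one module that feeds no other module
    feeds_others = {src for src, _, _ in links}
    (sink,) = [name for name in modules if name not in feeds_others]
    return outputs[sink]


# Toy modules mirroring the RAG DAG: input -> retriever -> reranker -> summarizer
modules = {
    "input": lambda query: query,
    "retriever": lambda query: [f"doc about {query}", "unrelated doc"],
    "reranker": lambda nodes, query_str: [n for n in nodes if query_str in n],
    "summarizer": lambda nodes, query_str: f"{query_str}: {len(nodes)} relevant node(s)",
}
links = [
    ("input", "retriever", "query"),
    ("retriever", "reranker", "nodes"),
    ("input", "reranker", "query_str"),
    ("input", "summarizer", "query_str"),
    ("reranker", "summarizer", "nodes"),
]
print(run_dag(modules, links, "positional encoding"))
```

The summarizer receives both the original query and the filtered nodes from two different upstream modules, which is exactly what a plain sequential chain cannot express.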

### RAG Pipeline with Query Rewriting

We use an LLM to rewrite the query first before passing it to our downstream modules - retrieval/reranking/synthesis.

In [15]:
from llama_index.postprocessor import CohereRerank
from llama_index.response_synthesizers import TreeSummarize
from llama_index import ServiceContext

### Load the PDF data

In [18]:
from pathlib import Path
from llama_index import download_loader

PDFReader = download_loader("PDFReader")

loader = PDFReader()
docs = loader.load_data(file=Path('./data/Attention is all you need.pdf'))

### Create the index

In [19]:
import os

# use a separate directory so the Paul Graham index saved in ./storage is not reloaded here
if not os.path.exists("storage_attention"):
    index = VectorStoreIndex.from_documents(docs)
    # save index to disk
    index.set_index_id("vector_index")
    index.storage_context.persist("./storage_attention")
else:
    # rebuild storage context
    storage_context = StorageContext.from_defaults(persist_dir="storage_attention")
    # load index
    index = load_index_from_storage(storage_context, index_id="vector_index")

In [20]:
# define modules
prompt_str = "Please generate a question about the Transformer model regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=3)
reranker = CohereRerank()
summarizer = TreeSummarize(
    service_context=ServiceContext.from_defaults(llm=llm)
)

Define the query pipeline

In [21]:
# define query pipeline
p = QueryPipeline(verbose=True)
# Adding the modules to the pipeline
p.add_modules(
    {
        "llm": llm,
        "prompt_tmpl": prompt_tmpl,
        "retriever": retriever,
        "summarizer": summarizer,
        "reranker": reranker,
    }
)

Next we draw links between modules with add_link. add_link takes in the source/destination module ids, and optionally the source_key and dest_key. Specify the source_key or dest_key if there are multiple outputs/inputs respectively.

You can view the set of input/output keys for each module through module.as_query_component().input_keys and module.as_query_component().output_keys.

Here we explicitly specify dest_key for the reranker and summarizer modules because they take in two inputs (query_str and nodes).

In [22]:
# Define the links between modules
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")

# look at summarizer input keys
print(summarizer.as_query_component().input_keys)

required_keys={'query_str', 'nodes'} optional_keys=set()


In [23]:
# Run the pipeline
response = p.run(topic="Positional encoding")

> Running module prompt_tmpl with input:
topic: Positional encoding

[0m[1;3;38;2;155;135;227m> Running module llm with input: 
messages: Please generate a question about the Transformer model regarding the following topic Positional encoding

[0m[1;3;38;2;155;135;227m> Running module retriever with input: 
input: assistant: How does the positional encoding in the Transformer model help capture the sequential information in the input data?

[0m[1;3;38;2;155;135;227m> Running module reranker with input: 
query_str: assistant: How does the positional encoding in the Transformer model help capture the sequential information in the input data?
nodes: [NodeWithScore(node=TextNode(id_='547e0749-a842-4d59-89df-850726120f2c', embedding=None, metadata={'page_label': '2', 'file_name': 'Attention is all you need.pdf'}, excluded_embed_metadata_keys=[], ex...

[0m[1;3;38;2;155;135;227m> Running module summarizer with input: 
query_str: assistant: How does the positio

In [24]:
print(str(response))

The positional encoding in the Transformer model helps capture the sequential information in the input data by injecting information about the relative or absolute position of each token in the sequence. Since the Transformer model does not have any recurrence or convolution, the positional encoding is necessary for the model to understand the order of the sequence. This allows the model to differentiate between tokens at different positions and capture the sequential relationships between them.


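You can also run the pipeline asynchronously with `arun`. Top-level `await` works in a Jupyter notebook; in a plain Python script, wrap the call in `asyncio.run(...)`.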

In [25]:
response = await p.arun(topic="Positional encoding")
print(str(response))

> Running modules and inputs in parallel:
Module key: prompt_tmpl. Input:
topic: Positional encoding


> Running modules and inputs in parallel:
Module key: llm. Input:
messages: Please generate a question about the Transformer model regarding the following topic Positional encoding


[0m[1;3;38;2;155;135;227m> Running modules and inputs in parallel: 
Module key: retriever. Input: 
input: assistant: What is the purpose of positional encoding in the Transformer model and how does it help in capturing the sequential information of input tokens?


[0m[1;3;38;2;155;135;227m> Running modules and inputs in parallel: 
Module key: reranker. Input: 
query_str: assistant: What is the purpose of positional encoding in the Transformer model and how does it help in capturing the sequential information of input tokens?
nodes: [NodeWithScore(node=TextNode(id_='547e0749-a842-4d59-89df-850726120f2c', embedding=None, metadata={'page_label': '2'
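Conceptually, an async run can await modules that do not depend on each other at the same time, which is what the "Running modules and inputs in parallel" log lines refer to. The pure-`asyncio` sketch below illustrates the idea with two hypothetical independent modules awaited together via `asyncio.gather`; it is a conceptual model, not LlamaIndex's scheduler.

```python
import asyncio
import time
from typing import List, Tuple


async def fake_retriever(query: str) -> List[str]:
    await asyncio.sleep(0.2)  # stands in for a network-bound vector store call
    return [f"node for {query}"]


async def fake_llm(query: str) -> str:
    await asyncio.sleep(0.2)  # stands in for an LLM API call
    return f"rewritten: {query}"


async def run_independent(query: str) -> Tuple[List[str], str]:
    # Neither module depends on the other, so both are awaited concurrently
    nodes, rewritten = await asyncio.gather(fake_retriever(query), fake_llm(query))
    return nodes, rewritten


start = time.perf_counter()
nodes, rewritten = asyncio.run(run_independent("positional encoding"))
elapsed = time.perf_counter() - start
print(nodes, rewritten, f"{elapsed:.2f}s")
```

Run as a script, this should take roughly 0.2 s rather than the 0.4 s a sequential run would need. Inside a notebook, `await run_independent(...)` directly instead, since the notebook already runs an event loop and `asyncio.run` would fail there.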

### RAG Pipeline without Query Rewriting

Here we set up a RAG pipeline without the query rewriting step.

Here we need a way to link the input query to the retriever, the reranker, and the summarizer. We can do this by defining a special InputComponent, which lets us link the input to multiple downstream modules.

In [26]:
from llama_index.postprocessor import CohereRerank
from llama_index.response_synthesizers import TreeSummarize
from llama_index import ServiceContext
from llama_index.query_pipeline import InputComponent

retriever = index.as_retriever(similarity_top_k=5)
summarizer = TreeSummarize(
    service_context=ServiceContext.from_defaults(
        llm=OpenAI(model="gpt-3.5-turbo")
    )
)
reranker = CohereRerank()

In [27]:
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "input": InputComponent(),
        "retriever": retriever,
        "reranker": reranker,
        "summarizer": summarizer,
    }
)
p.add_link("input", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("input", "reranker", dest_key="query_str")
p.add_link("input", "summarizer", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")


In [28]:
output = p.run(input="what is the purpose of positional encoding in the Transformer architecture?")
print(str(output))

> Running module input with input:
input: what is the purpose of positional encoding in the Transformer architecture?

[0m[1;3;38;2;155;135;227m> Running module retriever with input: 
input: what is the purpose of positional encoding in the Transformer architecture?

[0m[1;3;38;2;155;135;227m> Running module reranker with input: 
query_str: what is the purpose of positional encoding in the Transformer architecture?
nodes: [NodeWithScore(node=TextNode(id_='f14f266b-2905-49d8-94ef-f2f94e7f421f', embedding=None, metadata={'page_label': '3', 'file_name': 'Attention is all you need.pdf'}, excluded_embed_metadata_keys=[], ex...

[0m[1;3;38;2;155;135;227m> Running module summarizer with input: 
query_str: what is the purpose of positional encoding in the Transformer architecture?
nodes: [NodeWithScore(node=TextNode(id_='f14f266b-2905-49d8-94ef-f2f94e7f421f', embedding=None, metadata={'page_label': '3', 'file_name': 'Attention is all you need.pdf'}, excluded_embed

## Defining a RAG pipeline with Sentence Window Retrieval

In this section, we'll apply sentence window retrieval to our RAG pipeline. It relies on a postprocessing step that has to sit at the right position in the pipeline: after retrieval and before response synthesis.

This technique parses documents into single sentences per node. Each node also contains a "window", a richer context, with the sentences on either side of the node sentence. During retrieval, before passing the retrieved sentences to the LLM, the single sentences are replaced with a window containing the surrounding sentences, the richer context. This is most useful for large documents/indexes, as it helps to retrieve more fine-grained details.
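The parsing step can be pictured with a small pure-Python sketch: split the text into sentences and store, for each sentence, a window of its neighbours under a `window` metadata key (the same key name passed to `SentenceWindowNodeParser` below). The regex sentence splitter and the `build_window_nodes` helper are simplifying assumptions, not the library's implementation.

```python
import re
from typing import Dict, List


def build_window_nodes(text: str, window_size: int = 1) -> List[Dict]:
    # Naive sentence split: break after ".", "!" or "?" followed by whitespace
    sentences = [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]
    nodes = []
    for i, sentence in enumerate(sentences):
        start, end = max(0, i - window_size), min(len(sentences), i + window_size + 1)
        nodes.append({
            "text": sentence,  # what gets embedded and matched at retrieval time
            "metadata": {
                "window": " ".join(sentences[start:end]),  # richer context for the LLM
                "original_text": sentence,
            },
        })
    return nodes


text = ("RNNs are sequential. Transformers drop recurrence. "
        "Positional encodings add order. Attention does the rest.")
nodes = build_window_nodes(text, window_size=1)
print(nodes[2]["text"])
print(nodes[2]["metadata"]["window"])
```

Only the single sentence is embedded, which keeps similarity search precise, while the window travels along in metadata for later use.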

First, we define two helpers: one that builds a new service context using the sentence window node parser, and one that creates (or reloads) the index:

In [29]:
import os
import pathlib
from typing import List

from llama_index.node_parser import SentenceWindowNodeParser


def sentence_window_context(embed_model, window_size: int = 3):
    # create the sentence window node parser
    node_parser = SentenceWindowNodeParser.from_defaults(
        window_size=window_size,  # number of sentences kept on each side of the node sentence
        window_metadata_key="window",
        original_text_metadata_key="original_text",
        include_metadata=False,
    )
    # Define a service context that uses this parser and the given embedding model
    print("Defining the Service Context")
    service_context = ServiceContext.from_defaults(
        embed_model=embed_model,
        node_parser=node_parser,
    )
    return service_context


def create_or_rebuild_index(docs: List, service_context, storage_path: str = "storage"):
    if not os.path.exists(storage_path):
        # Build the index according to the service context
        print("Creating the index")
        index = VectorStoreIndex.from_documents(docs, service_context=service_context)
        # save index to disk
        index.set_index_id("vector_index")
        print("Saving the index")
        index.storage_context.persist(os.path.join(pathlib.Path().resolve(), storage_path))
    else:
        print("Restoring the storage")
        # rebuild storage context
        storage_context = StorageContext.from_defaults(persist_dir=storage_path)
        # load the index with the same service context (and embedding model) it was built with
        print("Loading the index")
        index = load_index_from_storage(
            storage_context, index_id="vector_index", service_context=service_context
        )
    return index



Now we can define the embedding model and create the index. It is persisted to its own directory, so that an index saved earlier is not reloaded by mistake.

In [30]:
from llama_index.embeddings import OpenAIEmbedding

embed_model_name = "text-embedding-3-small"
embed_model = OpenAIEmbedding(model=embed_model_name)
# Create the service context
service_context = sentence_window_context(embed_model, window_size=3)
# Create or rebuild the index in a dedicated directory
index = create_or_rebuild_index(docs, service_context, storage_path="storage_sentence_window")

Defining the Service Context
Creating the index
Saving the index


Next, we link the output of the retriever to a postprocessor that replaces each retrieved sentence with its full window, and then feed the result into the summarizer.

In [31]:
from llama_index.postprocessor import MetadataReplacementPostProcessor
from llama_index.query_pipeline import InputComponent
from llama_index.response_synthesizers import TreeSummarize

# Set the retriever
retriever = index.as_retriever(similarity_top_k=3)
# Define the summarizer
summarizer = TreeSummarize(
    service_context=ServiceContext.from_defaults(
        llm=OpenAI(model="gpt-3.5-turbo", temperature=0.2, max_tokens=512)
    )
)
# Define the postprocessor: replace each retrieved sentence with its "window" metadata
postprocessor = MetadataReplacementPostProcessor(target_metadata_key="window")
# Define the query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "input": InputComponent(),
        "retriever": retriever,
        "postprocessor": postprocessor,
        "summarizer": summarizer,
    }
)
# Set the links between components
p.add_link("input", "retriever")
p.add_link("input", "postprocessor", dest_key="query_str")
p.add_link("retriever", "postprocessor", dest_key="nodes")
p.add_link("input", "summarizer", dest_key="query_str")
p.add_link("postprocessor", "summarizer", dest_key="nodes")

In [32]:
# Run the pipeline
output = p.run(input="what is the purpose of positional encoding in the Transformer architecture?")
print(str(output))

> Running module input with input:
input: what is the purpose of positional encoding in the Transformer architecture?

[0m[1;3;38;2;155;135;227m> Running module retriever with input: 
input: what is the purpose of positional encoding in the Transformer architecture?

[0m[1;3;38;2;155;135;227m> Running module postprocessor with input: 
query_str: what is the purpose of positional encoding in the Transformer architecture?
nodes: [NodeWithScore(node=TextNode(id_='d6a7afad-0fa4-434f-a842-24fd306ab9d0', embedding=None, metadata={'window': 'We also use the usual learned linear transfor-\nmation and softmax function to convert the...

[0m[1;3;38;2;155;135;227m> Running module summarizer with input: 
query_str: what is the purpose of positional encoding in the Transformer architecture?
nodes: [NodeWithScore(node=TextNode(id_='d6a7afad-0fa4-434f-a842-24fd306ab9d0', embedding=None, metadata={'window': 'We also use the usual learned linear transfor-\nmation and softm
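At its core, the metadata replacement step in this pipeline swaps each retrieved sentence for the text stored under its `window` metadata key. The plain-Python sketch below shows that behaviour with dicts standing in for nodes; `replace_with_window` is hypothetical and only mirrors what `MetadataReplacementPostProcessor(target_metadata_key="window")` is configured to do.

```python
from typing import Dict, List


def replace_with_window(nodes: List[Dict], target_metadata_key: str = "window") -> List[Dict]:
    # Return new nodes whose text is the stored window; in this sketch a node
    # without that metadata key keeps its original text
    return [
        {**node, "text": node["metadata"].get(target_metadata_key, node["text"])}
        for node in nodes
    ]


retrieved = [
    {"text": "Positional encodings add order.",
     "metadata": {"window": "Transformers drop recurrence. Positional encodings add order."}},
    {"text": "A node without a window.", "metadata": {}},
]
for node in replace_with_window(retrieved):
    print(node["text"])
```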

## RAG pipeline with Auto Merging Retrieval and Rerank

The idea here is similar to sentence window retrieval: search for more granular pieces of information, then extend the context window before feeding that context to an LLM for reasoning. Documents are split into smaller child chunks that refer to larger parent chunks. Smaller chunks are fetched first during retrieval; then, if more than n of the top-k retrieved chunks are linked to the same parent node (a larger chunk), the context fed to the LLM is replaced by that parent node. This works like automatically merging a few retrieved chunks into a larger parent chunk, hence the method's name.

In this notebook, we showcase our `AutoMergingRetriever`, which looks at a set of leaf nodes and recursively “merges” subsets of leaf nodes that reference a parent node beyond a given threshold. This allows us to consolidate potentially disparate, smaller contexts into a larger context that might help synthesis.

You can define this hierarchy yourself over a set of documents, or you can use the `HierarchicalNodeParser`, which takes a candidate set of documents and outputs an entire hierarchy of nodes, from “coarse to fine”.
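The merge rule can be sketched in plain Python for a single level of the hierarchy. The `merge_retrieved` helper, the dict-based parent/child maps, and the 0.5 ratio threshold are illustrative assumptions; the real `AutoMergingRetriever` operates on LlamaIndex nodes stored in the docstore and applies the merge recursively.

```python
from collections import defaultdict
from typing import Dict, List


def merge_retrieved(retrieved: List[str], parent_of: Dict[str, str],
                    children_of: Dict[str, List[str]], ratio_thresh: float = 0.5) -> List[str]:
    # Group the retrieved leaf chunks by their parent chunk
    by_parent: Dict[str, List[str]] = defaultdict(list)
    for leaf in retrieved:
        by_parent[parent_of[leaf]].append(leaf)
    merged: List[str] = []
    for leaf in retrieved:
        parent = parent_of[leaf]
        if len(by_parent[parent]) / len(children_of[parent]) > ratio_thresh:
            # Enough siblings were retrieved: use the larger parent chunk once
            if parent not in merged:
                merged.append(parent)
        else:
            merged.append(leaf)  # keep the small chunk as-is
    return merged


children_of = {"P1": ["c1", "c2", "c3"], "P2": ["c4", "c5", "c6", "c7"]}
parent_of = {child: parent for parent, kids in children_of.items() for child in kids}
print(merge_retrieved(["c1", "c4", "c2", "c3"], parent_of, children_of))
```

Here all three children of `P1` were retrieved, so they collapse into `P1`, while `c4` (one of four children of `P2`) stays a small chunk.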


First, we import the modules needed to build the index with the `HierarchicalNodeParser`:

In [33]:
from llama_index import ServiceContext, VectorStoreIndex, StorageContext
from llama_index.query_pipeline.query import QueryPipeline
from llama_index.query_pipeline import InputComponent
from llama_index.llms import OpenAI
from llama_index.embeddings import OpenAIEmbedding
from llama_index.node_parser import HierarchicalNodeParser, get_leaf_nodes
from llama_index.response_synthesizers import Refine
from llama_index.retrievers import AutoMergingRetriever
from llama_index.postprocessor import CohereRerank



In this section we make use of the HierarchicalNodeParser. This will output a hierarchy of nodes, from top-level nodes with bigger chunk sizes to child nodes with smaller chunk sizes, where each child node has a parent node with a bigger chunk size. By default, the hierarchy is:

- 1st level: chunk size 2048
- 2nd level: chunk size 512
- 3rd level: chunk size 128

We then load these nodes into storage. The leaf nodes are indexed and retrieved via a vector store - these are the nodes that will first be directly retrieved via similarity search. The other nodes will be retrieved from a docstore.
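The hierarchy can be pictured with a toy splitter: each level re-splits its parent's text into smaller chunks and records the parent/child links, and the leaves are the chunks without children. Sizes here are counted in words purely for illustration (the real parser uses token-based chunk sizes), and `hierarchical_split`/`get_leaves` are hypothetical helpers rather than the `HierarchicalNodeParser`/`get_leaf_nodes` API.

```python
from typing import Dict, List, Optional


def hierarchical_split(words: List[str], chunk_sizes: List[int],
                       parent: Optional[str] = None, prefix: str = "n") -> List[Dict]:
    nodes: List[Dict] = []
    if not chunk_sizes:
        return nodes
    size, rest = chunk_sizes[0], chunk_sizes[1:]
    for i in range(0, len(words), size):
        chunk = words[i:i + size]
        node_id = f"{prefix}{i // size}"
        node = {"id": node_id, "parent": parent, "text": " ".join(chunk), "children": []}
        nodes.append(node)
        # Recurse with the next (smaller) chunk size, linking the children to this node
        children = hierarchical_split(chunk, rest, parent=node_id, prefix=node_id + ".")
        node["children"] = [c["id"] for c in children if c["parent"] == node_id]
        nodes.extend(children)
    return nodes


def get_leaves(nodes: List[Dict]) -> List[Dict]:
    # Leaves are the smallest chunks: the ones that get embedded in the vector store
    return [n for n in nodes if not n["children"]]


words = "one two three four five six seven eight".split()
nodes = hierarchical_split(words, chunk_sizes=[8, 4, 2])
print(len(nodes), [leaf["text"] for leaf in get_leaves(nodes)])
```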


In [34]:
# create the hierarchical node parser w/ default settings
node_parser = HierarchicalNodeParser.from_defaults(
    chunk_sizes=[2048, 512, 128]
)
# Get the nodes
nodes = node_parser.get_nodes_from_documents(docs)
print(len(nodes))
# Get the leaf nodes
leaf_nodes = get_leaf_nodes(nodes)
print(leaf_nodes[10].text)
print(len(leaf_nodes))

168
Recent work has achieved
signiﬁcant improvements in computational efﬁciency through factorization tricks [ 21] and conditional
computation [ 32], while also improving model performance in case of the latter. The fundamental
constraint of sequential computation, however, remains.
Attention mechanisms have become an integral part of compelling sequence modeling and transduc-
tion models in various tasks, allowing modeling of dependencies without regard to their distance in
the input or output sequences [ 2,19].
126


Build the index

In [35]:
# Define the embedding model
embed_model_name = "text-embedding-3-small"
embed_model = OpenAIEmbedding(model=embed_model_name)
# Define the LLM
llm = OpenAI(model="gpt-3.5-turbo", temperature=0.1)
# Create the context definition
auto_merging_context = ServiceContext.from_defaults(
    llm=llm,
    embed_model=embed_model,
    node_parser=node_parser,
)
# Create the storage context
storage_context = StorageContext.from_defaults()
storage_context.docstore.add_documents(nodes)
# Build the Vector index
automerging_index = VectorStoreIndex(
    leaf_nodes, storage_context=storage_context, service_context=auto_merging_context
)


### Define the Pipeline

Here, we define the auto-merging retriever, a postprocessor that reranks the retrieved nodes, and the response synthesizer (`Refine`).
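Conceptually, a reranker re-scores the retrieved nodes against the query and keeps only the best `top_n`. The sketch below shows that contract with a trivial word-overlap score; `CohereRerank` instead delegates the scoring to Cohere's hosted rerank model, and the `rerank` function here is purely illustrative.

```python
from typing import List, Tuple


def rerank(query: str, nodes: List[str], top_n: int = 2) -> List[Tuple[str, float]]:
    q = set(query.lower().split())
    # Score = fraction of query words present in the node (a stand-in for a rerank model)
    scored = [(node, len(q & set(node.lower().split())) / len(q)) for node in nodes]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_n]


retrieved = [
    "the decoder stack has six layers",
    "positional encoding uses sine and cosine functions",
    "we add positional encoding to the input embeddings",
]
for node, score in rerank("positional encoding sine", retrieved):
    print(f"{score:.2f}  {node}")
```

Reranking after retrieval lets a cheap vector search cast a wide net while a more precise scorer decides what actually reaches the LLM.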

In [36]:
automerging_retriever = automerging_index.as_retriever(
    similarity_top_k=6
)

retriever = AutoMergingRetriever(
    automerging_retriever, 
    automerging_index.storage_context, 
    verbose=True
)
# Reranker
reranker = CohereRerank()

# Define the summarizer
summarizer = Refine(
        service_context=ServiceContext.from_defaults(
            llm=OpenAI(model="gpt-3.5-turbo",
                       temperature=0.2,
                       max_tokens=512)
        )
)

In [37]:
# Define the query pipeline
p = QueryPipeline(verbose=True)
p.add_modules(
        {
            "input": InputComponent(),
            "retriever": retriever,
            "reranker": reranker,
            "summarizer": summarizer,
        }
)
# Set the links between components
p.add_link("input", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("input", "reranker", dest_key="query_str")
p.add_link("input", "summarizer", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")


In [38]:
# Run the pipeline
output = p.run(input="What is the purpose of positional encoding? Explain it with details")
print(str(output))

> Running module input with input:
input: What is the purpose of positional encoding? Explain it with details

[0m[1;3;38;2;155;135;227m> Running module retriever with input: 
input: What is the purpose of positional encoding? Explain it with details

[0m[1;3;38;2;155;135;227m> Running module reranker with input: 
query_str: What is the purpose of positional encoding? Explain it with details
nodes: [NodeWithScore(node=TextNode(id_='c10a01ef-3fa8-4a78-82b5-4d94d20641fc', embedding=None, metadata={'page_label': '6', 'file_name': 'Attention is all you need.pdf'}, excluded_embed_metadata_keys=[], ex...

[0m[1;3;38;2;155;135;227m> Running module summarizer with input: 
query_str: What is the purpose of positional encoding? Explain it with details
nodes: [NodeWithScore(node=TextNode(id_='c10a01ef-3fa8-4a78-82b5-4d94d20641fc', embedding=None, metadata={'page_label': '6', 'file_name': 'Attention is all you need.pdf'}, excluded_embed_metadata_keys=[], ex...

Po

## Defining a Custom Component in a Query Pipeline

You can easily define a custom component: subclass `CustomQueryComponent`, implement the validation and run functions plus a few helpers (the input and output keys), and plug it into the pipeline.

In [24]:
from llama_index.query_pipeline import (
    CustomQueryComponent,
    InputKeys,
    OutputKeys,
)
from typing import Dict, Any
from llama_index.llms.llm import BaseLLM
from llama_index.bridge.pydantic import Field


class RelatedMovieComponent(CustomQueryComponent):
    """Related movie component."""

    llm: BaseLLM = Field(..., description="OpenAI LLM")

    def _validate_component_inputs(
        self, input: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate component inputs during run_component."""
        # NOTE: this is OPTIONAL but we show you here how to do validation as an example
        return input

    @property
    def _input_keys(self) -> set:
        """Input keys set."""
        # NOTE: These are required inputs. If you have optional inputs please override
        # `optional_input_keys_dict`
        return {"movie"}

    @property
    def _output_keys(self) -> set:
        return {"output"}

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        """Run the component."""
        # use QueryPipeline itself here for convenience
        prompt_str = "Please generate related movies to {movie_name}"
        prompt_tmpl = PromptTemplate(prompt_str)
        # use the component's own LLM (self.llm) rather than a global variable
        p = QueryPipeline(chain=[prompt_tmpl, self.llm])
        return {"output": p.run(movie_name=kwargs["movie"])}

Let’s try the custom component out! We’ll also add a step that rewrites the output in the voice of Shakespeare.

In [25]:
llm = OpenAI(model="gpt-3.5-turbo")
component = RelatedMovieComponent(llm=llm)

# let's add some subsequent prompts for fun
prompt_str = """\
Here's some text:

{text}

Can you rewrite this in the voice of Shakespeare?
"""
prompt_tmpl = PromptTemplate(prompt_str)

p = QueryPipeline(chain=[component, prompt_tmpl, llm], verbose=True)

In [26]:
output = p.run(movie="Love Actually")
print(str(output))

> Running module 1ebfa41f-5ac9-441c-b009-b18cf45edec0 with input:
movie: Love Actually

[0m[1;3;38;2;155;135;227m> Running module e9ce341a-1ae8-4379-9d04-790a0ed024f9 with input: 
text: assistant: 1. "Valentine's Day" (2010) - This romantic comedy follows the lives of several interconnected couples and singles in Los Angeles as they navigate love and relationships on Valentine's Day....

[0m[1;3;38;2;155;135;227m> Running module dd728f26-fcaa-4cad-bd57-fa7ee052936f with input: 
messages: Here's some text:

1. "Valentine's Day" (2010) - This romantic comedy follows the lives of several interconnected couples and singles in Los Angeles as they navigate love and relationships on Valentin...

assistant: 1. "Valentine's Daye" (2010) - Thise romantic comedy doth follow the lives of several interconnected couples and singles in Los Angeles as they doth navigate love and relationships on Valentine's Daye.

2. "New Year's Eve" (2011) - Similar to "Love Actually,"